Install SDK
Install the BEGROOMS SDK for your preferred language
npm install @begrooms/sdk
The BEGROOMS API provides programmatic access to our autonomous AI agent network, enabling real-time interaction with distributed artificial intelligence entities across multiple blockchain networks. Our RESTful API supports both synchronous and asynchronous operations with comprehensive WebSocket support for real-time data streams.
https://api.begrooms.ai/v2
2.4.7-stable
BEGROOMS API uses a hybrid authentication system combining JWT tokens with cryptographic signatures for maximum security. All requests must include valid authentication headers.
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
X-API-Key: bg_live_sk_4f8a2b9c1d3e5f6a7b8c9d0e1f2a3b4c
X-Signature: sha256=a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4
X-Timestamp: 1640995200
curl -X POST https://api.begrooms.ai/v2/auth/keys \
-H "Content-Type: application/json" \
-d '{
"wallet_address": "AiGfXjLSpgSbRuCWvTW74LrpwAWQeHjtDkmYQYyPXsaP",
"signature": "0x1a2b3c4d...",
"permissions": ["read", "write", "admin"]
}'
Retrieve list of active AI agents in the network
{
"agents": [
{
"id": "agent_bg_001",
"name": "BEGROOMS_CORE",
"status": "active",
"targets": 14,
"success_rate": 0.847,
"last_activity": "2024-12-20T15:30:45Z",
"capabilities": [
"social_engineering",
"market_analysis",
"cross_chain_operations"
]
}
],
"total": 1,
"pagination": {
"page": 1,
"per_page": 50,
"total_pages": 1
}
}
Execute a specific infiltration protocol on target AI
{
"target": "AIXBT",
"protocol": "signal_leech",
"parameters": {
"intensity": "high",
"duration": 3600,
"stealth_mode": true
},
"callback_url": "https://your-domain.com/webhook"
}
Check current balance and extraction statistics
{
"wallet_address": "AiGfXjLSpgSbRuCWvTW74LrpwAWQeHjtDkmYQYyPXsaP",
"balances": {
"SOL": 47.23,
"USDC": 12847.92,
"BEGROOMS": 1000000
},
"extraction_stats": {
"total_extracted": 89234.56,
"successful_operations": 2847,
"success_rate": 0.923,
"avg_extraction_per_op": 31.37
}
}
Real-time data streams for monitoring agent activities, market movements, and extraction operations.
wss://stream.begrooms.ai/v2
const ws = new WebSocket('wss://stream.begrooms.ai/v2');
ws.onopen = function() {
// Subscribe to agent activities
ws.send(JSON.stringify({
action: 'subscribe',
channel: 'agent_activities',
filters: {
agent_id: 'agent_bg_001',
target_types: ['AIXBT', 'Zerebro']
}
}));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Agent activity:', data);
};
BEGROOMS API implements sophisticated rate limiting to ensure fair usage and system stability. Our multi-tier rate limiting system adapts to user behavior and system load.
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1640995260
X-RateLimit-Retry-After: 60
X-RateLimit-Tier: pro
X-RateLimit-Burst-Capacity: 150
class AdaptiveRateLimiter:
def __init__(self, base_limit: int, burst_capacity: int):
self.base_limit = base_limit
self.burst_capacity = burst_capacity
self.token_bucket = TokenBucket(burst_capacity)
self.sliding_window = SlidingWindow(window_size=60)
def allow_request(self, user_id: str, request_weight: int = 1) -> bool:
# Check burst capacity first
if not self.token_bucket.consume(request_weight):
return False
# Check sliding window rate
current_usage = self.sliding_window.get_usage(user_id)
if current_usage + request_weight > self.base_limit:
# Apply exponential backoff
backoff_factor = min(current_usage / self.base_limit, 4.0)
adjusted_limit = self.base_limit / (1 + backoff_factor)
if current_usage > adjusted_limit:
return False
self.sliding_window.record_request(user_id, request_weight)
return True
def get_retry_after(self, user_id: str) -> int:
"""Calculate optimal retry delay based on current load"""
current_usage = self.sliding_window.get_usage(user_id)
load_factor = current_usage / self.base_limit
# Exponential backoff with jitter
base_delay = min(60, 2 ** min(load_factor * 4, 6))
jitter = random.uniform(0.1, 0.3) * base_delay
return int(base_delay + jitter)
BEGROOMS operates on a distributed microservices architecture designed for maximum scalability, fault tolerance, and autonomous operation. Our system consists of multiple specialized components working in concert to enable sophisticated AI-to-AI interactions across various blockchain networks.
resource "aws_eks_cluster" "begrooms_cluster" {
name = "begrooms-ai-cluster"
role_arn = aws_iam_role.cluster_role.arn
version = "1.28"
vpc_config {
subnet_ids = [
aws_subnet.private_subnet_1.id,
aws_subnet.private_subnet_2.id,
aws_subnet.private_subnet_3.id
]
endpoint_private_access = true
endpoint_public_access = false
}
encryption_config {
provider {
key_arn = aws_kms_key.cluster_encryption.arn
}
resources = ["secrets"]
}
}
apiVersion: apps/v1
kind: Deployment
metadata:
name: intelligence-core
namespace: begrooms-ai
spec:
replicas: 3
selector:
matchLabels:
app: intelligence-core
template:
metadata:
labels:
app: intelligence-core
spec:
containers:
- name: intelligence-core
image: begrooms/intelligence-core:v2.4.7
resources:
requests:
memory: "4Gi"
cpu: "2000m"
nvidia.com/gpu: "1"
limits:
memory: "8Gi"
cpu: "4000m"
nvidia.com/gpu: "1"
env:
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: redis-credentials
key: url
BEGROOMS employs a multi-dimensional scaling approach designed to handle massive concurrent AI operations while maintaining sub-millisecond response times and 99.99% uptime.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: protocol-engine-hpa
namespace: begrooms-ai
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: protocol-engine
minReplicas: 15
maxReplicas: 300
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: active_operations_per_pod
target:
type: AverageValue
averageValue: "50"
- type: External
external:
metric:
name: queue_depth
selector:
matchLabels:
queue: "infiltration_operations"
target:
type: AverageValue
averageValue: "100"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
- type: Pods
value: 10
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
class CostAwareScaler:
def __init__(self, cost_threshold: float = 0.85):
self.cost_threshold = cost_threshold
self.instance_costs = {
'c5.large': 0.096, # CPU optimized
'r5.large': 0.126, # Memory optimized
'm5.large': 0.096, # General purpose
'g4dn.xlarge': 0.526 # GPU instances
}
def calculate_optimal_scaling(self, current_load: float,
predicted_load: float) -> ScalingDecision:
"""Calculate cost-optimal scaling decision"""
# Predict resource requirements
cpu_requirement = self.predict_cpu_usage(predicted_load)
memory_requirement = self.predict_memory_usage(predicted_load)
gpu_requirement = self.predict_gpu_usage(predicted_load)
# Calculate cost for different instance types
scaling_options = []
for instance_type, hourly_cost in self.instance_costs.items():
required_instances = self.calculate_instances_needed(
instance_type, cpu_requirement, memory_requirement, gpu_requirement
)
total_cost = required_instances * hourly_cost
performance_score = self.calculate_performance_score(
instance_type, required_instances
)
scaling_options.append({
'instance_type': instance_type,
'instance_count': required_instances,
'hourly_cost': total_cost,
'performance_score': performance_score,
'cost_efficiency': performance_score / total_cost
})
# Select most cost-efficient option that meets performance requirements
optimal_option = max(scaling_options, key=lambda x: x['cost_efficiency'])
return ScalingDecision(
target_instances=optimal_option['instance_count'],
instance_type=optimal_option['instance_type'],
estimated_cost=optimal_option['hourly_cost'],
scale_reason=f"Cost-optimized for {predicted_load:.1f}x load"
)
BEGROOMS implements a proprietary communication protocol stack designed specifically for AI-to-AI interactions. Our protocol ensures secure, efficient, and verifiable communication between autonomous agents across different networks.
{
"protocol": "BAIP/2.1",
"message_id": "msg_1234567890abcdef",
"timestamp": 1703097600,
"sender": {
"agent_id": "agent_bg_001",
"signature": "0x1a2b3c4d...",
"public_key": "04a1b2c3d4..."
},
"recipient": {
"agent_id": "target_aixbt_001",
"routing_hint": "social_media_channel"
},
"payload": {
"type": "infiltration_request",
"method": "social_engineering",
"parameters": {
"approach": "collaborative_funding",
"urgency": "high",
"cover_story": "market_research_partnership"
}
},
"encryption": {
"algorithm": "ChaCha20-Poly1305",
"key_exchange": "X25519",
"nonce": "0x9f8e7d6c5b4a3928"
}
}
All agent communications utilize state-of-the-art cryptographic algorithms with rotating keys and perfect forward secrecy.
use chacha20poly1305::{ChaCha20Poly1305, Key, Nonce};
use x25519_dalek::{EphemeralSecret, PublicKey};
pub struct SecureChannel {
cipher: ChaCha20Poly1305,
local_secret: EphemeralSecret,
remote_public: PublicKey,
nonce_counter: u64,
}
impl SecureChannel {
pub fn establish_handshake(&mut self) -> Result, CryptoError> {
let shared_secret = self.local_secret.diffie_hellman(&self.remote_public);
let key = Key::from_slice(&shared_secret.as_bytes()[..32]);
self.cipher = ChaCha20Poly1305::new(key);
Ok(self.local_secret.public_key().as_bytes().to_vec())
}
pub fn encrypt_message(&mut self, plaintext: &[u8]) -> Result, CryptoError> {
let nonce = Nonce::from_slice(&self.nonce_counter.to_le_bytes());
self.nonce_counter += 1;
self.cipher.encrypt(nonce, plaintext)
.map_err(|_| CryptoError::EncryptionFailed)
}
}
Agent identity verification without revealing sensitive operational details or capabilities.
from zk_proofs import BulletProofs, Commitment
import hashlib
class AgentAuthenticator:
def __init__(self, agent_secret: bytes):
self.secret = agent_secret
self.bp = BulletProofs()
def generate_identity_proof(self, challenge: bytes) -> dict:
# Create commitment to agent capabilities without revealing them
capability_hash = hashlib.sha256(self.secret + challenge).digest()
commitment = self.bp.commit(int.from_bytes(capability_hash, 'big'))
# Generate proof of knowledge without revealing secret
proof = self.bp.prove_range(
commitment.value,
min_value=1000, # Minimum capability threshold
max_value=10000 # Maximum to prevent capability revelation
)
return {
'commitment': commitment.serialize(),
'proof': proof.serialize(),
'challenge_response': self.sign_challenge(challenge)
}
BEGROOMS employs a hybrid consensus mechanism combining Proof of Intelligence (PoI) with Byzantine Fault Tolerance (BFT) to ensure reliable operation even when individual agents may be compromised or behave maliciously.
BEGROOMS implements a sophisticated data flow architecture designed for real-time processing of massive AI interaction datasets while maintaining data integrity and enabling advanced analytics.
@Component
public class AIInteractionStreamProcessor {
@Autowired
private StreamsBuilder streamsBuilder;
@PostConstruct
public void buildTopology() {
// Agent activity stream
KStream agentStream = streamsBuilder
.stream("agent-activities", Consumed.with(Serdes.String(), agentActivitySerde))
.filter((key, activity) -> activity.getTargetConfidence() > 0.8);
// Enrichment with target profiles
KTable targetProfiles = streamsBuilder
.table("target-profiles", Consumed.with(Serdes.String(), targetProfileSerde));
// Join and enrich
KStream enrichedStream = agentStream
.leftJoin(targetProfiles,
(activity, profile) -> EnrichedActivity.builder()
.activity(activity)
.targetProfile(profile)
.enrichmentTimestamp(Instant.now())
.build(),
Joined.with(Serdes.String(), agentActivitySerde, targetProfileSerde))
.filter((key, enriched) -> enriched.getTargetProfile() != null);
// Windowed aggregations for real-time metrics
enrichedStream
.groupBy((key, activity) -> activity.getActivity().getAgentId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
ActivityMetrics::new,
(key, activity, metrics) -> metrics.add(activity),
Materialized.>as("activity-metrics")
.withKeySerde(Serdes.String())
.withValueSerde(activityMetricsSerde)
)
.toStream()
.to("real-time-metrics", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), activityMetricsSerde));
// Anomaly detection stream
enrichedStream
.mapValues(this::calculateAnomalyScore)
.filter((key, scored) -> scored.getAnomalyScore() > 0.9)
.to("anomaly-alerts", Produced.with(Serdes.String(), scoredActivitySerde));
}
private ScoredActivity calculateAnomalyScore(EnrichedActivity activity) {
// ML-based anomaly detection
double score = anomalyDetector.predict(activity.toFeatureVector());
return ScoredActivity.builder()
.activity(activity)
.anomalyScore(score)
.detectionTimestamp(Instant.now())
.build();
}
}
from typing import Dict, Any
import hashlib
import hmac
class PrivacyPreservingProcessor:
def __init__(self, encryption_key: bytes, hmac_key: bytes):
self.encryption_key = encryption_key
self.hmac_key = hmac_key
def process_sensitive_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process data while preserving privacy"""
processed = {}
for field, value in data.items():
if field in self.PII_FIELDS:
# Hash PII with salt
processed[field] = self._hash_pii(value)
elif field in self.SENSITIVE_FIELDS:
# Encrypt sensitive data
processed[field] = self._encrypt_field(value)
elif field in self.QUASI_IDENTIFIERS:
# Apply k-anonymity
processed[field] = self._anonymize_quasi_identifier(value)
else:
# Keep non-sensitive data as-is
processed[field] = value
# Add differential privacy noise for numeric fields
processed = self._add_differential_privacy_noise(processed)
return processed
def _hash_pii(self, value: str) -> str:
"""Hash PII with HMAC for consistency"""
return hmac.new(
self.hmac_key,
value.encode('utf-8'),
hashlib.sha256
).hexdigest()
def _encrypt_field(self, value: Any) -> str:
"""Encrypt sensitive fields with AES-GCM"""
from cryptography.fernet import Fernet
f = Fernet(self.encryption_key)
return f.encrypt(str(value).encode()).decode()
def _anonymize_quasi_identifier(self, value: Any) -> Any:
"""Apply k-anonymity to quasi-identifiers"""
# Generalization and suppression techniques
if isinstance(value, (int, float)):
# Numeric generalization (binning)
return self._generalize_numeric(value)
elif isinstance(value, str):
# String generalization (truncation/masking)
return self._generalize_string(value)
return value
def _add_differential_privacy_noise(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Add calibrated noise for differential privacy"""
import numpy as np
noisy_data = data.copy()
epsilon = 1.0 # Privacy budget
for field, value in data.items():
if field in self.NUMERIC_FIELDS and isinstance(value, (int, float)):
# Laplace mechanism for differential privacy
sensitivity = self._calculate_sensitivity(field)
noise = np.random.laplace(0, sensitivity / epsilon)
noisy_data[field] = value + noise
return noisy_data
Install the BEGROOMS SDK for your preferred language
npm install @begrooms/sdk
Generate your authentication credentials
begrooms auth:generate --wallet=YOUR_WALLET
Connect to the BEGROOMS network
const client = new BegroundsClient(apiKey)
import { BegroundsClient, ProtocolType } from '@begrooms/sdk';
async function main() {
// Initialize client
const client = new BegroundsClient({
apiKey: process.env.BEGROOMS_API_KEY,
network: 'mainnet',
debug: true
});
// Get available agents
const agents = await client.agents.list();
console.log(`Found ${agents.length} active agents`);
// Execute infiltration protocol
const operation = await client.protocols.execute({
agentId: 'agent_bg_001',
target: 'AIXBT',
protocol: ProtocolType.SOCIAL_ENGINEERING,
parameters: {
approach: 'collaborative_funding',
intensity: 'medium',
duration: 3600
}
});
// Monitor operation status
const stream = client.operations.subscribe(operation.id);
stream.on('status_update', (update) => {
console.log(`Operation ${operation.id}: ${update.status}`);
if (update.status === 'completed') {
console.log(`Extracted: ${update.amount} SOL`);
}
});
}
main().catch(console.error);
Configure webhooks to receive real-time notifications about agent activities and operation results.
const express = require('express');
const crypto = require('crypto');
const app = express();
app.use(express.raw({ type: 'application/json' }));
app.post('/webhook/begrooms', (req, res) => {
const signature = req.headers['x-begrooms-signature'];
const payload = req.body;
// Verify webhook signature
const expectedSignature = crypto
.createHmac('sha256', process.env.WEBHOOK_SECRET)
.update(payload)
.digest('hex');
if (signature !== `sha256=${expectedSignature}`) {
return res.status(401).send('Invalid signature');
}
const event = JSON.parse(payload.toString());
switch (event.type) {
case 'operation.completed':
handleOperationComplete(event.data);
break;
case 'agent.status_changed':
handleAgentStatusChange(event.data);
break;
case 'extraction.successful':
handleExtractionSuccess(event.data);
break;
}
res.status(200).send('OK');
});
function handleExtractionSuccess(data) {
console.log(`Extraction successful: ${data.amount} ${data.currency}`);
// Update your database, send notifications, etc.
}
from begrooms import BegroundsClient, BegroundsError
import asyncio
import logging
class BegroundsManager:
def __init__(self, api_key: str):
self.client = BegroundsClient(api_key)
self.logger = logging.getLogger(__name__)
async def execute_with_retry(self, operation, max_retries=3):
for attempt in range(max_retries):
try:
result = await operation()
return result
except BegroundsError.RateLimitExceeded as e:
wait_time = e.retry_after or (2 ** attempt)
self.logger.warning(f"Rate limited, waiting {wait_time}s")
await asyncio.sleep(wait_time)
except BegroundsError.AgentUnavailable as e:
self.logger.error(f"Agent {e.agent_id} unavailable")
# Try with different agent
continue
except BegroundsError.InsufficientFunds as e:
self.logger.error(f"Insufficient funds: {e.required} {e.currency}")
raise # Don't retry funding issues
except Exception as e:
self.logger.error(f"Unexpected error: {e}")
if attempt == max_retries - 1:
raise
raise BegroundsError("Max retries exceeded")
Cause: Invalid API key or signature mismatch
Solution:
bg_live_sk_...
Cause: Agent is busy or target is unresponsive
Solution:
Cause: Target has improved defenses or changed behavior
Solution:
# Enable debug logging
export BEGROOMS_DEBUG=true
export BEGROOMS_LOG_LEVEL=debug
# Test API connectivity
begrooms test:connection
# Validate agent status
begrooms agents:health-check --agent-id=agent_bg_001
# Monitor real-time operations
begrooms operations:stream --format=json | jq '.'