Build comprehensive risk management and logging systems for cryptocurrency trading bots. This final guide in our series covers advanced risk controls, audit trails, compliance logging, production deployment, and operational monitoring to ensure your bot operates safely and transparently in live markets.
Risk management is the difference between sustainable profits and catastrophic losses. Even the most profitable strategy can destroy an account without proper risk controls.
| Risk Scenario | Without Controls | With Risk Management | Outcome |
|---|---|---|---|
| Flash Crash | -85% portfolio loss | -12% max drawdown stop | 73% capital preserved |
| API Failure | Stuck positions, -45% | Emergency close all | -3% controlled exit |
| Strategy Malfunction | Runaway trading, -100% | Daily loss limit hit | -5% daily limit |
| Market Manipulation | Exploited for -60% | Position size limits | -8% limited exposure |
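In the Flash Crash row, the uncontrolled account keeps 15% of its capital and the controlled one keeps 88%, so the "73% capital preserved" figure matches the gap between the two. The drawdown stop is measured from the account's running peak, not from its starting capital. The following is a minimal standalone sketch of that peak-to-trough measurement. The function names are illustrative and are not part of the bot's code.

```python
from typing import Iterable


def drawdown_pct(peak: float, current: float) -> float:
    """Percentage decline of `current` below the running `peak`."""
    if peak <= 0:
        return 0.0
    return (peak - current) / peak * 100


def max_drawdown_pct(equity_curve: Iterable[float]) -> float:
    """Largest peak-to-trough decline seen along an equity curve."""
    peak = float("-inf")
    worst = 0.0
    for value in equity_curve:
        peak = max(peak, value)  # the peak only ever moves up
        worst = max(worst, drawdown_pct(peak, value))
    return worst


def breaches_limit(equity_curve: Iterable[float], limit_pct: float) -> bool:
    """True if the curve ever falls more than `limit_pct` below its peak."""
    return max_drawdown_pct(equity_curve) > limit_pct
```

For example, a curve of 100, 120, 90, 110, 60 has a 50% maximum drawdown (120 down to 60), even though it is only 40% below its starting value.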
Create src/risk_management/risk_manager.py:
```python
import time
import logging
import threading
from typing import Dict, List, Optional, Callable
from datetime import datetime
from dataclasses import dataclass
from enum import Enum


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class RiskAction(Enum):
    CONTINUE = "continue"
    REDUCE_POSITION = "reduce_position"
    STOP_TRADING = "stop_trading"
    EMERGENCY_EXIT = "emergency_exit"


@dataclass
class RiskLimit:
    """Risk limit configuration"""
    name: str
    limit_type: str  # 'percentage', 'absolute', 'count'
    threshold: float
    action: RiskAction
    enabled: bool = True
    description: str = ""


@dataclass
class RiskViolation:
    """Risk violation record"""
    timestamp: datetime
    limit_name: str
    current_value: float
    threshold: float
    severity: RiskLevel
    action_taken: RiskAction
    description: str


class RiskManager:
    """
    Comprehensive risk management system for trading bots

    Features:
    - Multi-layer risk limits
    - Real-time position monitoring
    - Dynamic risk adjustment
    - Emergency controls
    - Risk violation tracking
    """

    def __init__(self, trading_engine, initial_capital: float):
        self.trading_engine = trading_engine
        self.initial_capital = initial_capital
        self.logger = logging.getLogger(__name__)

        # Risk limits configuration
        self.risk_limits: Dict[str, RiskLimit] = {}
        self.risk_violations: List[RiskViolation] = []

        # Risk monitoring state
        self.monitoring_active = False
        self.monitor_thread = None

        # Performance tracking (the trade cap itself lives in the 'max_daily_trades' limit)
        self.daily_start_balance = initial_capital
        self.peak_balance = initial_capital
        self.daily_trade_count = 0

        # Emergency controls
        self.emergency_stop_triggered = False
        self.trading_suspended = False

        # Risk callbacks
        self.risk_callbacks: List[Callable[[RiskViolation], None]] = []

        # Initialize default risk limits
        self._setup_default_risk_limits()

    def _setup_default_risk_limits(self):
        """Set up conservative default risk limits"""
        # Portfolio-level limits
        self.add_risk_limit(
            name="max_drawdown",
            limit_type="percentage",
            threshold=15.0,  # 15% max drawdown
            action=RiskAction.STOP_TRADING,
            description="Maximum portfolio drawdown from peak"
        )
        self.add_risk_limit(
            name="daily_loss_limit",
            limit_type="percentage",
            threshold=5.0,  # 5% daily loss
            action=RiskAction.STOP_TRADING,
            description="Maximum daily loss limit"
        )
        self.add_risk_limit(
            name="position_concentration",
            limit_type="percentage",
            threshold=25.0,  # 25% max in single position
            action=RiskAction.REDUCE_POSITION,
            description="Maximum position size relative to portfolio"
        )

        # Trading activity limits
        self.add_risk_limit(
            name="max_daily_trades",
            limit_type="count",
            threshold=50,  # Max 50 trades per day
            action=RiskAction.STOP_TRADING,
            description="Maximum number of trades per day"
        )

        # Volatility-based limits
        # Note: _check_risk_limit has no branch for this limit, so it is never
        # evaluated until you add a price-movement check fed by your market data.
        self.add_risk_limit(
            name="volatility_spike",
            limit_type="percentage",
            threshold=50.0,  # 50% price movement
            action=RiskAction.EMERGENCY_EXIT,
            description="Emergency exit on extreme volatility"
        )

    def add_risk_limit(self, name: str, limit_type: str, threshold: float,
                       action: RiskAction, description: str = ""):
        """Add or update a risk limit"""
        self.risk_limits[name] = RiskLimit(
            name=name,
            limit_type=limit_type,
            threshold=threshold,
            action=action,
            description=description
        )
        self.logger.info(f"Risk limit added: {name} = {threshold}")

    def start_monitoring(self):
        """Start risk monitoring"""
        if self.monitoring_active:
            return

        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("Risk monitoring started")

    def stop_monitoring(self):
        """Stop risk monitoring"""
        if not self.monitoring_active:
            return

        self.monitoring_active = False
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        self.logger.info("Risk monitoring stopped")

    def _monitoring_loop(self):
        """Main risk monitoring loop"""
        while self.monitoring_active:
            try:
                self._check_all_risk_limits()
                time.sleep(5)  # Check every 5 seconds
            except Exception as e:
                self.logger.error(f"Error in risk monitoring: {e}")
                time.sleep(30)  # Back off after an error

    def _check_all_risk_limits(self):
        """Check all risk limits"""
        if self.emergency_stop_triggered:
            return

        portfolio = self.trading_engine.get_portfolio_summary()
        current_balance = portfolio['total_value']

        # Update peak balance
        if current_balance > self.peak_balance:
            self.peak_balance = current_balance

        # Check each risk limit
        for risk_limit in self.risk_limits.values():
            if not risk_limit.enabled:
                continue
            # Once trading is suspended, don't re-fire STOP_TRADING limits every cycle
            if self.trading_suspended and risk_limit.action == RiskAction.STOP_TRADING:
                continue

            violation = self._check_risk_limit(risk_limit, portfolio)
            if violation:
                self._handle_risk_violation(violation)

    def _check_risk_limit(self, risk_limit: RiskLimit, portfolio: Dict) -> Optional[RiskViolation]:
        """Check an individual risk limit"""
        current_balance = portfolio['total_value']

        if risk_limit.name == "max_drawdown":
            current_drawdown = ((self.peak_balance - current_balance) / self.peak_balance) * 100
            if current_drawdown > risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=current_drawdown,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.CRITICAL,
                    action_taken=risk_limit.action,
                    description=f"Drawdown {current_drawdown:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )

        elif risk_limit.name == "daily_loss_limit":
            daily_return = ((current_balance - self.daily_start_balance) / self.daily_start_balance) * 100
            if daily_return < -risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=daily_return,
                    threshold=-risk_limit.threshold,
                    severity=RiskLevel.CRITICAL,
                    action_taken=risk_limit.action,
                    description=f"Daily loss {daily_return:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )

        elif risk_limit.name == "position_concentration":
            max_position_pct = self._calculate_max_position_percentage(portfolio)
            if max_position_pct > risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=max_position_pct,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.HIGH,
                    action_taken=risk_limit.action,
                    description=f"Position concentration {max_position_pct:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )

        elif risk_limit.name == "max_daily_trades":
            # >= so that trading stops once the limit is reached, not one trade later
            if self.daily_trade_count >= risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=self.daily_trade_count,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.MEDIUM,
                    action_taken=risk_limit.action,
                    description=f"Daily trades {self.daily_trade_count} reached limit {risk_limit.threshold:.0f}"
                )

        return None

    def _calculate_max_position_percentage(self, portfolio: Dict) -> float:
        """Calculate the largest position as a percentage of the portfolio"""
        # This needs to be implemented based on your position tracking.
        # Placeholder: returning 0.0 means the concentration limit never fires.
        return 0.0

    def _handle_risk_violation(self, violation: RiskViolation):
        """Handle a risk limit violation"""
        self.risk_violations.append(violation)

        # Log violation
        log_level = logging.ERROR if violation.severity == RiskLevel.CRITICAL else logging.WARNING
        self.logger.log(log_level, f"RISK VIOLATION: {violation.description}")

        # Execute risk action
        if violation.action_taken == RiskAction.STOP_TRADING:
            self._stop_trading(violation)
        elif violation.action_taken == RiskAction.EMERGENCY_EXIT:
            self._emergency_exit(violation)
        elif violation.action_taken == RiskAction.REDUCE_POSITION:
            self._reduce_positions(violation)

        # Notify callbacks
        for callback in self.risk_callbacks:
            try:
                callback(violation)
            except Exception as e:
                self.logger.error(f"Risk callback error: {e}")

    def _stop_trading(self, violation: RiskViolation):
        """Stop all trading activity"""
        self.trading_suspended = True

        # Cancel all pending orders
        try:
            open_orders = self.trading_engine.get_open_orders()
            for order in open_orders:
                self.trading_engine.cancel_order(order.order_id, order.symbol)
            self.logger.critical(f"TRADING STOPPED due to: {violation.description}")
        except Exception as e:
            self.logger.error(f"Error stopping trading: {e}")

    def _emergency_exit(self, violation: RiskViolation):
        """Emergency exit of all positions"""
        self.emergency_stop_triggered = True

        try:
            # Cancel all orders first
            open_orders = self.trading_engine.get_open_orders()
            for order in open_orders:
                self.trading_engine.cancel_order(order.order_id, order.symbol)

            # Close all positions at market price.
            # Iterate over a copy: placing orders may mutate the engine's positions dict.
            for symbol, position in list(self.trading_engine.positions.items()):
                if position.quantity != 0:
                    side = 'sell' if position.quantity > 0 else 'buy'
                    self.trading_engine.place_order(
                        symbol=symbol,
                        side=side,
                        order_type='market',
                        quantity=abs(position.quantity)
                    )

            self.logger.critical(f"EMERGENCY EXIT triggered: {violation.description}")
        except Exception as e:
            self.logger.error(f"Error in emergency exit: {e}")

    def _reduce_positions(self, violation: RiskViolation):
        """Reduce position sizes"""
        try:
            # Implementation depends on your specific position management
            self.logger.warning(f"Position reduction triggered: {violation.description}")
        except Exception as e:
            self.logger.error(f"Error reducing positions: {e}")

    def record_trade(self):
        """Count an executed trade toward 'max_daily_trades' (call this from your fill handler)"""
        self.daily_trade_count += 1

    def add_risk_callback(self, callback: Callable[[RiskViolation], None]):
        """Add a callback for risk violations"""
        self.risk_callbacks.append(callback)

    def get_risk_status(self) -> Dict:
        """Get the current risk status"""
        portfolio = self.trading_engine.get_portfolio_summary()
        current_balance = portfolio['total_value']

        current_drawdown = ((self.peak_balance - current_balance) / self.peak_balance) * 100
        daily_return = ((current_balance - self.daily_start_balance) / self.daily_start_balance) * 100

        recent_violations = [v for v in self.risk_violations
                             if (datetime.now() - v.timestamp).total_seconds() < 3600]

        return {
            'monitoring_active': self.monitoring_active,
            'trading_suspended': self.trading_suspended,
            'emergency_stop': self.emergency_stop_triggered,
            'current_drawdown_pct': current_drawdown,
            'daily_return_pct': daily_return,
            'daily_trade_count': self.daily_trade_count,
            'peak_balance': self.peak_balance,
            'total_violations': len(self.risk_violations),
            'recent_violations': len(recent_violations),
            'risk_limits_count': len(self.risk_limits)
        }

    def reset_daily_counters(self):
        """Reset daily risk counters"""
        portfolio = self.trading_engine.get_portfolio_summary()
        self.daily_start_balance = portfolio['total_value']
        self.daily_trade_count = 0
        self.logger.info("Daily risk counters reset")

    def manual_override_emergency_stop(self, reason: str):
        """Manual override of the emergency stop (use with extreme caution)"""
        self.emergency_stop_triggered = False
        self.trading_suspended = False
        self.logger.warning(f"MANUAL OVERRIDE: Emergency stop disabled. Reason: {reason}")

    def export_risk_report(self) -> str:
        """Export a comprehensive risk report"""
        risk_status = self.get_risk_status()

        report = f"""
=== RISK MANAGEMENT REPORT ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

CURRENT STATUS
--------------
Monitoring Active: {risk_status['monitoring_active']}
Trading Suspended: {risk_status['trading_suspended']}
Emergency Stop: {risk_status['emergency_stop']}

PERFORMANCE METRICS
-------------------
Current Drawdown: {risk_status['current_drawdown_pct']:.2f}%
Daily Return: {risk_status['daily_return_pct']:.2f}%
Peak Balance: ${risk_status['peak_balance']:,.2f}
Daily Trades: {risk_status['daily_trade_count']}

RISK LIMITS STATUS
------------------
"""
        for name, limit in self.risk_limits.items():
            status = "ENABLED" if limit.enabled else "DISABLED"
            report += f"{name}: {limit.threshold} ({limit.limit_type}) - {status}\n"

        report += f"""
VIOLATION HISTORY
-----------------
Total Violations: {risk_status['total_violations']}
Recent Violations (1h): {risk_status['recent_violations']}
"""
        if self.risk_violations:
            report += "Recent Violations:\n"
            recent_violations = sorted(self.risk_violations, key=lambda x: x.timestamp, reverse=True)[:5]
            for violation in recent_violations:
                report += f"  {violation.timestamp.strftime('%Y-%m-%d %H:%M:%S')}: {violation.description}\n"

        return report
```
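`reset_daily_counters()` must run at the start of each trading day. Otherwise `daily_loss_limit` and `max_daily_trades` keep accumulating indefinitely, and nothing in the code above schedules the reset. One possible approach is a small rollover detector polled from a loop that already exists, for example the 5-second risk monitor. This sketch assumes that a "trading day" means a UTC calendar day, because crypto markets have no daily close. The `DailyRollover` class is hypothetical and is not part of the bot.

```python
from datetime import datetime, timezone
from typing import Callable, Optional


class DailyRollover:
    """Fire a callback once whenever the UTC calendar date changes (hypothetical helper)."""

    def __init__(self, on_new_day: Callable[[], None],
                 clock: Optional[Callable[[], datetime]] = None):
        self.on_new_day = on_new_day
        # Injectable clock so the behaviour can be tested deterministically
        self.clock = clock or (lambda: datetime.now(timezone.utc))
        self.current_day = self.clock().date()

    def check(self) -> bool:
        """Run the callback if the UTC date has advanced; return True if it ran."""
        today = self.clock().date()
        if today != self.current_day:
            self.current_day = today
            self.on_new_day()
            return True
        return False
```

In the bot, you would construct it with `DailyRollover(risk_manager.reset_daily_counters)` and call `check()` on each monitoring cycle. Polling keeps the design single-threaded, so no extra timer thread has to be started and stopped alongside the monitor.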
Create src/risk_management/position_manager.py:
```python
import logging
from typing import Dict, List


class PositionSizer:
    """
    Advanced position sizing with risk-based allocation

    Features:
    - Kelly Criterion optimization
    - Volatility-adjusted sizing
    - Correlation-based limits
    - Heat map monitoring
    """

    def __init__(self, max_portfolio_risk: float = 0.02):
        self.max_portfolio_risk = max_portfolio_risk  # 2% max portfolio risk
        self.logger = logging.getLogger(__name__)

        # Position tracking
        self.position_history: List[Dict] = []
        self.volatility_cache: Dict[str, float] = {}

    def calculate_position_size(self, symbol: str, entry_price: float,
                                stop_loss_price: float, portfolio_value: float,
                                win_rate: float = 0.6, avg_win_loss_ratio: float = 1.5) -> float:
        """
        Calculate the optimal position size using multiple methods

        Args:
            symbol: Trading pair
            entry_price: Intended entry price
            stop_loss_price: Stop loss price
            portfolio_value: Current portfolio value
            win_rate: Historical win rate (0-1)
            avg_win_loss_ratio: Average win/loss ratio

        Returns:
            Recommended position size in base currency
        """
        # Guard against a zero stop distance or invalid price (division by zero below)
        risk_per_share = abs(entry_price - stop_loss_price)
        if risk_per_share <= 0 or entry_price <= 0:
            self.logger.warning(f"Invalid entry/stop for {symbol}; position size set to 0")
            return 0.0

        # Method 1: Fixed percentage risk
        max_risk_amount = portfolio_value * self.max_portfolio_risk
        fixed_risk_size = max_risk_amount / risk_per_share

        # Method 2: Kelly Criterion
        kelly_size = self._kelly_criterion_size(
            portfolio_value, entry_price, stop_loss_price,
            win_rate, avg_win_loss_ratio
        )

        # Method 3: Volatility-adjusted sizing
        volatility = self._get_volatility(symbol)
        vol_adjusted_size = self._volatility_adjusted_size(
            portfolio_value, entry_price, volatility
        )

        # Method 4: Correlation-adjusted sizing
        correlation_limit = self._correlation_adjusted_limit(symbol, portfolio_value, entry_price)

        # Take the minimum of all methods for conservative sizing
        recommended_size = min(fixed_risk_size, kelly_size, vol_adjusted_size, correlation_limit)

        self.logger.info(f"Position sizing for {symbol}:")
        self.logger.info(f"  Fixed Risk: {fixed_risk_size:.6f}")
        self.logger.info(f"  Kelly Criterion: {kelly_size:.6f}")
        self.logger.info(f"  Volatility Adjusted: {vol_adjusted_size:.6f}")
        self.logger.info(f"  Correlation Limit: {correlation_limit:.6f}")
        self.logger.info(f"  Recommended: {recommended_size:.6f}")

        return max(0.0, recommended_size)

    def _kelly_criterion_size(self, portfolio_value: float, entry_price: float,
                              stop_loss_price: float, win_rate: float,
                              avg_win_loss_ratio: float) -> float:
        """Calculate position size using the Kelly Criterion"""
        # Kelly formula: f = (bp - q) / b
        # where:
        #   f = fraction of capital to wager (here: to risk down to the stop)
        #   b = odds received (avg_win_loss_ratio)
        #   p = probability of winning (win_rate)
        #   q = probability of losing (1 - win_rate)
        if win_rate <= 0 or avg_win_loss_ratio <= 0:
            return 0

        kelly_fraction = ((avg_win_loss_ratio * win_rate) - (1 - win_rate)) / avg_win_loss_ratio

        # Cap Kelly at 25% for safety, and never go negative
        kelly_fraction = min(kelly_fraction, 0.25)
        kelly_fraction = max(kelly_fraction, 0)

        # Convert to position size
        risk_per_share = abs(entry_price - stop_loss_price)
        kelly_risk_amount = portfolio_value * kelly_fraction

        return kelly_risk_amount / risk_per_share if risk_per_share > 0 else 0

    def _volatility_adjusted_size(self, portfolio_value: float,
                                  entry_price: float, volatility: float) -> float:
        """Adjust position size based on asset volatility"""
        if volatility <= 0:
            return portfolio_value * 0.01 / entry_price  # 1% default

        # Base allocation inversely proportional to volatility
        base_allocation = 0.05  # 5% base
        volatility_factor = min(0.2 / volatility, 2.0)  # Scale factor
        adjusted_allocation = base_allocation * volatility_factor
        adjusted_allocation = min(adjusted_allocation, 0.1)  # Cap at 10%

        return (portfolio_value * adjusted_allocation) / entry_price

    def _correlation_adjusted_limit(self, symbol: str, portfolio_value: float,
                                    entry_price: float) -> float:
        """Limit position size based on correlation with existing positions"""
        # This is a simplified implementation.
        # In practice, you'd calculate actual correlations between assets.
        # For now, assume a 5% max allocation per asset, converted to base units
        # so it is comparable with the other methods.
        max_allocation = 0.05
        return portfolio_value * max_allocation / entry_price

    def _get_volatility(self, symbol: str) -> float:
        """Get or calculate volatility for a symbol"""
        if symbol in self.volatility_cache:
            return self.volatility_cache[symbol]

        # Default volatility estimates for common crypto pairs
        default_volatilities = {
            'BTCUSDT': 0.04,  # 4% daily volatility
            'ETHUSDT': 0.05,  # 5% daily volatility
            'ADAUSDT': 0.08,  # 8% daily volatility
            'DOTUSDT': 0.10,  # 10% daily volatility
        }

        volatility = default_volatilities.get(symbol, 0.06)  # 6% default
        self.volatility_cache[symbol] = volatility
        return volatility


class PortfolioHeatMap:
    """
    Portfolio heat map for risk visualization and monitoring

    Tracks:
    - Position concentrations
    - Sector/correlation exposure
    - Risk distribution
    - Maximum loss scenarios
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def generate_risk_heatmap(self, positions: Dict, current_prices: Dict,
                              portfolio_value: float) -> Dict:
        """Generate a comprehensive risk heat map"""
        risk_map = {
            'position_weights': {},
            'risk_contributions': {},
            'sector_exposure': {},
            'worst_case_scenarios': {},
            'diversification_metrics': {}
        }

        # Calculate position weights
        for symbol, position in positions.items():
            if position.quantity == 0:
                continue

            current_price = current_prices.get(symbol, position.avg_price)
            position_value = abs(position.quantity) * current_price
            weight = position_value / portfolio_value
            risk_map['position_weights'][symbol] = weight

            # Estimate risk contribution (simplified: weight x volatility)
            volatility = self._estimate_volatility(symbol)
            risk_contribution = weight * volatility
            risk_map['risk_contributions'][symbol] = risk_contribution

        # Calculate diversification metrics
        risk_map['diversification_metrics'] = self._calculate_diversification_metrics(
            risk_map['position_weights']
        )

        # Worst-case scenario analysis
        risk_map['worst_case_scenarios'] = self._worst_case_analysis(
            positions, current_prices, portfolio_value
        )

        return risk_map

    def _estimate_volatility(self, symbol: str) -> float:
        """Estimate volatility for risk calculations"""
        volatility_estimates = {
            'BTCUSDT': 0.04,
            'ETHUSDT': 0.05,
            'ADAUSDT': 0.08,
            'DOTUSDT': 0.10,
        }
        return volatility_estimates.get(symbol, 0.06)

    def _calculate_diversification_metrics(self, weights: Dict) -> Dict:
        """Calculate portfolio diversification metrics"""
        if not weights:
            return {'herfindahl_index': 0, 'effective_positions': 0}

        # Herfindahl-Hirschman Index (concentration measure)
        hhi = sum(weight ** 2 for weight in weights.values())

        # Effective number of positions
        effective_positions = 1 / hhi if hhi > 0 else 0

        return {
            'herfindahl_index': hhi,
            'effective_positions': effective_positions,
            'concentration_risk': 'High' if hhi > 0.25 else 'Medium' if hhi > 0.1 else 'Low'
        }

    def _worst_case_analysis(self, positions: Dict, current_prices: Dict,
                             portfolio_value: float) -> Dict:
        """Analyze worst-case loss scenarios"""
        scenarios = {
            'flash_crash_20pct': 0,
            'sector_crash_50pct': 0,
            'single_asset_90pct': 0
        }

        for symbol, position in positions.items():
            if position.quantity == 0:
                continue

            current_price = current_prices.get(symbol, position.avg_price)
            position_value = abs(position.quantity) * current_price

            # Flash crash: 20% drop across all assets
            scenarios['flash_crash_20pct'] += position_value * 0.20

            # Sector crash: 50% drop (simplified - all crypto)
            scenarios['sector_crash_50pct'] += position_value * 0.50

            # Single asset crash: 90% drop in the largest position
            scenarios['single_asset_90pct'] = max(
                scenarios['single_asset_90pct'],
                position_value * 0.90
            )

        # Convert to percentage of portfolio
        for scenario, loss in scenarios.items():
            scenarios[scenario] = (loss / portfolio_value) * 100

        return scenarios
```
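The following standalone recomputation shows how the four sizing methods interact, using illustrative inputs. It assumes a $10,000 portfolio and a BTCUSDT entry at 50,000 with a stop at 49,000, together with the class's defaults: a 60% win rate, a 1.5 win/loss ratio, and 4% daily volatility. The function name and dictionary keys are hypothetical. The flat cap assumes that the 5% per-asset allocation is converted to base units by dividing by the entry price.

```python
def size_candidates(portfolio_value: float, entry: float, stop: float,
                    win_rate: float, win_loss_ratio: float, daily_vol: float,
                    max_risk: float = 0.02, max_alloc: float = 0.05) -> dict:
    """Recompute the four candidate sizes (in base units) and their minimum."""
    risk_per_unit = abs(entry - stop)
    if risk_per_unit == 0 or entry <= 0:
        return {"recommended": 0.0}
    # 1. Fixed fractional: lose at most `max_risk` of equity if the stop is hit
    fixed = portfolio_value * max_risk / risk_per_unit
    # 2. Kelly: f = (b*p - q) / b, clamped to [0, 0.25]
    if win_rate > 0 and win_loss_ratio > 0:
        kelly_f = (win_loss_ratio * win_rate - (1 - win_rate)) / win_loss_ratio
        kelly_f = min(max(kelly_f, 0.0), 0.25)
    else:
        kelly_f = 0.0
    kelly = portfolio_value * kelly_f / risk_per_unit
    # 3. Volatility-adjusted: 5% base scaled by 0.2/vol (max 2x), capped at 10%
    alloc = min(0.05 * min(0.2 / daily_vol, 2.0), 0.10) if daily_vol > 0 else 0.01
    vol_adj = portfolio_value * alloc / entry
    # 4. Flat per-asset cap standing in for the correlation limit
    cap = portfolio_value * max_alloc / entry
    sizes = {"fixed": fixed, "kelly": kelly, "volatility": vol_adj, "cap": cap}
    sizes["recommended"] = min(sizes.values())
    return sizes
```

With these inputs, fixed-fractional risk alone would allow 0.2 BTC, and the raw Kelly fraction of about 33% is clamped to 25%, which gives 2.5 BTC. The volatility method allows 0.02 BTC, and the flat 5% cap allows 0.01 BTC (about $500). The cap is therefore the binding constraint. Taking the minimum means the most conservative method always wins, so loosening any single method has no effect unless it is the one that binds.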
Create src/logging/trading_logger.py:
```python
import os
import json
import gzip
import shutil
import hashlib
import logging
import threading
import queue
from typing import Dict, Optional
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from dataclasses import dataclass, asdict


@dataclass
class TradeLog:
    """Structured trade log entry"""
    timestamp: str
    trade_id: str
    symbol: str
    side: str
    quantity: float
    price: float
    fees: float
    strategy: str
    order_type: str
    execution_time_ms: float
    slippage: float
    portfolio_value_before: float
    portfolio_value_after: float


@dataclass
class PerformanceLog:
    """Performance metrics log entry"""
    timestamp: str
    portfolio_value: float
    total_return_pct: float
    daily_return_pct: float
    max_drawdown_pct: float
    sharpe_ratio: float
    active_positions: int
    cash_balance: float
    unrealized_pnl: float
    realized_pnl: float


@dataclass
class RiskLog:
    """Risk management log entry"""
    timestamp: str
    risk_level: str
    event_type: str
    description: str
    current_value: float
    threshold: float
    action_taken: str
    portfolio_impact: str


@dataclass
class SystemLog:
    """System health and error log entry"""
    timestamp: str
    level: str
    component: str
    event: str
    details: Dict
    error_stack: Optional[str] = None


class TradingLogger:
    """
    Comprehensive logging system for trading bots

    Features:
    - Structured logging (JSON format)
    - Multiple log levels and types
    - Automatic log rotation
    - Compression and archiving
    - Real-time log monitoring
    - Compliance audit trails
    """

    def __init__(self, log_directory: str = "data/logs"):
        self.log_directory = log_directory
        self.ensure_log_directory()

        # Async logging queue (entries accumulate until start_async_logging() is called)
        self.log_queue = queue.Queue()
        self.logging_thread = None
        self.logging_active = False

        # Set up the different loggers
        self._setup_loggers()

        # Performance tracking
        self.log_stats = {
            'trades_logged': 0,
            'performance_logs': 0,
            'risk_events': 0,
            'system_errors': 0
        }

    def ensure_log_directory(self):
        """Ensure the log directory structure exists"""
        subdirs = ['trades', 'performance', 'risk', 'system', 'compliance', 'archived']
        for subdir in subdirs:
            path = os.path.join(self.log_directory, subdir)
            os.makedirs(path, exist_ok=True)

    def _setup_loggers(self):
        """Set up specialized loggers for different data types"""
        # Trade logger - JSON format with size-based rotation
        self.trade_logger = self._create_json_logger(
            'trades',
            os.path.join(self.log_directory, 'trades', 'trades.jsonl'),
            max_bytes=50*1024*1024  # 50MB
        )

        # Performance logger - daily timed rotation
        self.performance_logger = self._create_json_logger(
            'performance',
            os.path.join(self.log_directory, 'performance', 'performance.jsonl'),
            when='midnight',
            interval=1
        )

        # Risk logger - smaller size-based rotation
        self.risk_logger = self._create_json_logger(
            'risk',
            os.path.join(self.log_directory, 'risk', 'risk.jsonl'),
            max_bytes=10*1024*1024  # 10MB
        )

        # System logger - standard Python logging
        self.system_logger = logging.getLogger('system')
        self.system_logger.setLevel(logging.INFO)
        self.system_logger.handlers.clear()  # avoid duplicate handlers if re-initialized
        system_handler = RotatingFileHandler(
            os.path.join(self.log_directory, 'system', 'system.log'),
            maxBytes=20*1024*1024,  # 20MB
            backupCount=5
        )
        system_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        system_handler.setFormatter(system_formatter)
        self.system_logger.addHandler(system_handler)

        # Compliance logger - long-term retention
        self.compliance_logger = self._create_json_logger(
            'compliance',
            os.path.join(self.log_directory, 'compliance', 'compliance.jsonl'),
            when='W0',  # Weekly rotation (Mondays)
            interval=1,
            backup_count=52  # Keep 1 year
        )

    def _create_json_logger(self, name: str, filename: str, max_bytes: Optional[int] = None,
                            when: str = 'midnight', interval: int = 1, backup_count: int = 10):
        """Create a JSON logger with size-based (max_bytes) or timed rotation"""
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # keep JSON records out of root/console handlers

        # Clear existing handlers
        logger.handlers.clear()

        if max_bytes:
            handler = RotatingFileHandler(filename, maxBytes=max_bytes, backupCount=backup_count)
        else:
            handler = TimedRotatingFileHandler(filename, when=when, interval=interval, backupCount=backup_count)

        # JSON formatter
        formatter = JsonFormatter()
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        return logger

    def start_async_logging(self):
        """Start the asynchronous logging thread"""
        if self.logging_active:
            return

        self.logging_active = True
        self.logging_thread = threading.Thread(target=self._async_logging_worker, daemon=True)
        self.logging_thread.start()
        self.system_logger.info("Async logging started")

    def stop_async_logging(self):
        """Stop asynchronous logging after flushing queued entries"""
        if not self.logging_active:
            return

        # Wait until the worker has processed every queued entry, then signal it to exit.
        # (Clearing the flag first would stop the worker and leave the queue undrained.)
        self.log_queue.join()
        self.logging_active = False

        if self.logging_thread and self.logging_thread.is_alive():
            self.logging_thread.join(timeout=5)

        self.system_logger.info("Async logging stopped")

    def _async_logging_worker(self):
        """Async logging worker thread"""
        while self.logging_active:
            try:
                log_entry = self.log_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._process_log_entry(log_entry)
            except Exception as e:
                print(f"Logging error: {e}")  # Can't use the logger here without risking recursion
            finally:
                self.log_queue.task_done()  # lets stop_async_logging() know the entry is done

    def _process_log_entry(self, log_entry: Dict):
        """Process a log entry"""
        log_type = log_entry.get('log_type')
        data = log_entry.get('data')

        if log_type == 'trade':
            self.trade_logger.info(json.dumps(asdict(data)))
            self.log_stats['trades_logged'] += 1
        elif log_type == 'performance':
            self.performance_logger.info(json.dumps(asdict(data)))
            self.log_stats['performance_logs'] += 1
        elif log_type == 'risk':
            self.risk_logger.info(json.dumps(asdict(data)))
            self.log_stats['risk_events'] += 1
        elif log_type == 'system':
            level = data.get('level', logging.INFO)
            self.system_logger.log(level, data.get('message', ''))
            if level >= logging.ERROR:
                self.log_stats['system_errors'] += 1

    def log_trade(self, trade_data: Dict):
        """Log a trade execution"""
        trade_log = TradeLog(
            timestamp=datetime.now().isoformat(),
            trade_id=trade_data.get('trade_id', ''),
            symbol=trade_data.get('symbol', ''),
            side=trade_data.get('side', ''),
            quantity=trade_data.get('quantity', 0.0),
            price=trade_data.get('price', 0.0),
            fees=trade_data.get('fees', 0.0),
            strategy=trade_data.get('strategy', ''),
            order_type=trade_data.get('order_type', ''),
            execution_time_ms=trade_data.get('execution_time_ms', 0.0),
            slippage=trade_data.get('slippage', 0.0),
            portfolio_value_before=trade_data.get('portfolio_value_before', 0.0),
            portfolio_value_after=trade_data.get('portfolio_value_after', 0.0)
        )

        self.log_queue.put({
            'log_type': 'trade',
            'data': trade_log
        })

        # Also log to compliance for the audit trail
        self._log_compliance_event('trade_execution', asdict(trade_log))

    def log_performance(self, portfolio_data: Dict):
        """Log performance metrics"""
        perf_log = PerformanceLog(
            timestamp=datetime.now().isoformat(),
            portfolio_value=portfolio_data.get('total_value', 0.0),
            total_return_pct=portfolio_data.get('total_return_pct', 0.0),
            daily_return_pct=portfolio_data.get('daily_return_pct', 0.0),
            max_drawdown_pct=portfolio_data.get('max_drawdown_pct', 0.0),
            sharpe_ratio=portfolio_data.get('sharpe_ratio', 0.0),
            active_positions=portfolio_data.get('num_positions', 0),
            cash_balance=portfolio_data.get('cash_balance', 0.0),
            unrealized_pnl=portfolio_data.get('unrealized_pnl', 0.0),
            realized_pnl=portfolio_data.get('realized_pnl', 0.0)
        )

        self.log_queue.put({
            'log_type': 'performance',
            'data': perf_log
        })

    def log_risk_event(self, risk_data: Dict):
        """Log risk management events"""
        risk_log = RiskLog(
            timestamp=datetime.now().isoformat(),
            risk_level=risk_data.get('severity', 'medium'),
            event_type=risk_data.get('event_type', ''),
            description=risk_data.get('description', ''),
            current_value=risk_data.get('current_value', 0.0),
            threshold=risk_data.get('threshold', 0.0),
            action_taken=risk_data.get('action_taken', ''),
            portfolio_impact=risk_data.get('portfolio_impact', '')
        )

        self.log_queue.put({
            'log_type': 'risk',
            'data': risk_log
        })

        # Critical and high-severity risk events also go to compliance
        if risk_data.get('severity') in ['critical', 'high']:
            self._log_compliance_event('risk_violation', asdict(risk_log))

    def log_system_event(self, level: int, component: str, event: str,
                         details: Dict, error_stack: Optional[str] = None):
        """Log system events and errors"""
        system_log = SystemLog(
            timestamp=datetime.now().isoformat(),
            level=logging.getLevelName(level),
            component=component,
            event=event,
            details=details,
            error_stack=error_stack
        )

        self.log_queue.put({
            'log_type': 'system',
            'data': {
                'level': level,
                'message': f"{component}: {event} - {json.dumps(details)}"
            }
        })

        # Errors and above also go to compliance
        if level >= logging.ERROR:
            self._log_compliance_event('system_error', asdict(system_log))

    def _log_compliance_event(self, event_type: str, data: Dict):
        """Log compliance/audit events (written synchronously, not via the queue)"""
        compliance_entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'data': data,
            'hash': self._calculate_log_hash(data)
        }

        self.compliance_logger.info(json.dumps(compliance_entry))

    def _calculate_log_hash(self, data: Dict) -> str:
        """Calculate a per-entry hash (truncated SHA-256) for integrity checks"""
        # Detects accidental corruption of an entry; it does not stop someone who
        # can edit the file from recomputing it or deleting whole entries.
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]

    def compress_old_logs(self, days_old: int = 30):
        """Compress rotated log files older than the specified number of days"""
        cutoff_date = datetime.now() - timedelta(days=days_old)

        for root, dirs, files in os.walk(self.log_directory):
            for file in files:
                # Only rotated backups (e.g. trades.jsonl.1, performance.jsonl.2024-01-01);
                # never touch the active files the handlers still hold open
                is_rotated = '.jsonl.' in file or '.log.' in file
                if not is_rotated or file.endswith('.gz'):
                    continue

                file_path = os.path.join(root, file)
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                if file_time < cutoff_date:
                    # Compress the file, then remove the original
                    with open(file_path, 'rb') as f_in:
                        with gzip.open(f"{file_path}.gz", 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    os.remove(file_path)
                    self.system_logger.info(f"Compressed old log: {file}")

    def get_log_statistics(self) -> Dict:
        """Get logging statistics"""
        log_sizes = {}

        for root, dirs, files in os.walk(self.log_directory):
            for file in files:
                file_path = os.path.join(root, file)
                size_mb = os.path.getsize(file_path) / (1024 * 1024)
                log_sizes[file] = size_mb

        return {
            'log_stats': self.log_stats,
            'total_log_size_mb': sum(log_sizes.values()),
            'log_files': log_sizes,
            'queue_size': self.log_queue.qsize()
        }


class JsonFormatter(logging.Formatter):
    """Custom JSON formatter for structured logging"""

    def format(self, record):
        """Format a log record as JSON"""
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage()
        }

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_entry)
```
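The `hash` field written by `_log_compliance_event` is computed from each entry alone and truncated to 16 hex characters. Anyone who can edit the file can therefore recompute it, so it catches accidental corruption but not deliberate edits or deleted lines. If your audit trail needs tamper evidence, one common alternative is a hash chain, in which each entry's hash also covers the previous entry's hash. The following minimal in-memory sketch illustrates the idea. The class name, field names, and genesis value are hypothetical.

```python
import hashlib
import json
from typing import Dict, List


def _entry_hash(prev_hash: str, payload: Dict) -> str:
    """SHA-256 over the previous hash plus the canonical JSON of this payload."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


class HashChainedLog:
    """Append-only audit log in which every entry commits to its predecessor."""

    GENESIS = "0" * 64  # placeholder "previous hash" for the first entry

    def __init__(self):
        self.entries: List[Dict] = []

    def append(self, payload: Dict) -> Dict:
        prev = self.entries[-1]["hash"] if self.entries else self.GENESIS
        entry = {"payload": payload, "prev_hash": prev,
                 "hash": _entry_hash(prev, payload)}
        self.entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every link; editing, removing or reordering an entry breaks it."""
        prev = self.GENESIS
        for entry in self.entries:
            if entry["prev_hash"] != prev:
                return False
            if entry["hash"] != _entry_hash(prev, entry["payload"]):
                return False
            prev = entry["hash"]
        return True
```

A chain cannot reveal that its newest entries were truncated. In practice you would persist each entry as a JSON line and periodically record the latest hash somewhere the bot cannot overwrite.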
Create src/deployment/production_manager.py:
```python
import os
import sys
import signal
import time
import psutil
import threading
import subprocess
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import logging
import json
import requests


class ProductionManager:
    """
    Production deployment and monitoring manager

    Features:
    - Health monitoring
    - Automatic restarts
    - Resource monitoring
    - Alert integration
    - Backup management
    - Update deployment
    """

    def __init__(self, config_file: str = "config/production.json"):
        # Create the logger before loading config so load errors can be logged
        self.logger = logging.getLogger(__name__)
        self.config = self._load_config(config_file)

        # Monitoring state
        self.monitoring_active = False
        self.monitor_thread = None

        # Health metrics
        self.health_metrics = {
            'uptime_seconds': 0,
            'cpu_usage_pct': 0.0,
            'memory_usage_mb': 0.0,
            'disk_usage_pct': 0.0,
            'network_connections': 0,
            'last_trade_time': None,
            'api_response_time_ms': 0.0
        }

        # Alert thresholds
        self.alert_thresholds = {
            'cpu_usage_pct': 80.0,
            'memory_usage_mb': 8000.0,
            'disk_usage_pct': 90.0,
            'api_response_time_ms': 5000.0,
            'no_trades_minutes': 30
        }

        # Process management
        self.start_time = datetime.now()
        self.restart_count = 0
        self.max_restarts = 5

    def _load_config(self, config_file: str) -> Dict:
        """Load production configuration, falling back to defaults"""
        default_config = {
            'monitoring_interval_seconds': 30,
            'health_check_url': 'http://localhost:8080/health',
            'alert_webhook_url': None,
            'backup_interval_hours': 6,
            'log_retention_days': 90,
            'auto_restart_on_failure': True,
            'resource_limits': {
                'max_cpu_pct': 80,
                'max_memory_mb': 8000,
                'max_disk_pct': 90
            }
        }
        if os.path.exists(config_file):
            try:
                with open(config_file, 'r') as f:
                    loaded_config = json.load(f)
                # Shallow merge: a nested dict such as 'resource_limits' is replaced, not merged
                default_config.update(loaded_config)
            except Exception as e:
                self.logger.warning(f"Error loading config: {e}, using defaults")
        return default_config

    def start_monitoring(self):
        """Start production monitoring in a background thread"""
        if self.monitoring_active:
            return
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("Production monitoring started")

    def stop_monitoring(self):
        """Stop production monitoring"""
        if not self.monitoring_active:
            return
        self.monitoring_active = False
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        self.logger.info("Production monitoring stopped")

    def _monitoring_loop(self):
        """Main monitoring loop"""
        while self.monitoring_active:
            try:
                self._collect_health_metrics()
                self._check_alert_conditions()
                self._perform_health_check()
                time.sleep(self.config['monitoring_interval_seconds'])
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                time.sleep(60)

    def _collect_health_metrics(self):
        """Collect system health metrics"""
        try:
            # System metrics
            self.health_metrics['uptime_seconds'] = (datetime.now() - self.start_time).total_seconds()
            # Blocks for 1 second while psutil samples CPU usage
            self.health_metrics['cpu_usage_pct'] = psutil.cpu_percent(interval=1)
            # System-wide memory in use, not just this process
            memory = psutil.virtual_memory()
            self.health_metrics['memory_usage_mb'] = memory.used / (1024 * 1024)
            disk = psutil.disk_usage('/')
            self.health_metrics['disk_usage_pct'] = (disk.used / disk.total) * 100
            try:
                self.health_metrics['network_connections'] = len(psutil.net_connections())
            except psutil.AccessDenied:
                # net_connections() needs root on some platforms (e.g. macOS); -1 = unavailable
                self.health_metrics['network_connections'] = -1
            # API response time
            self.health_metrics['api_response_time_ms'] = self._measure_api_response_time()
        except Exception as e:
            self.logger.error(f"Error collecting health metrics: {e}")

    def _measure_api_response_time(self) -> float:
        """Measure Binance API round-trip time in milliseconds (9999.0 on failure)"""
        try:
            start_time = time.perf_counter()
            # Test Binance API response time
            response = requests.get('https://api.binance.com/api/v3/ping', timeout=10)
            if response.status_code == 200:
                return (time.perf_counter() - start_time) * 1000  # Convert to ms
            else:
                return 9999.0  # Error indicator
        except Exception:
            return 9999.0  # Error indicator

    def _check_alert_conditions(self):
        """Check for alert conditions"""
        alerts = []
        # Check CPU usage
        if self.health_metrics['cpu_usage_pct'] > self.alert_thresholds['cpu_usage_pct']:
            alerts.append({
                'type': 'high_cpu_usage',
                'message': f"High CPU usage: {self.health_metrics['cpu_usage_pct']:.1f}%",
                'severity': 'warning'
            })
        # Check memory usage
        if self.health_metrics['memory_usage_mb'] > self.alert_thresholds['memory_usage_mb']:
            alerts.append({
                'type': 'high_memory_usage',
                'message': f"High memory usage: {self.health_metrics['memory_usage_mb']:.1f} MB",
                'severity': 'warning'
            })
        # Check disk usage
        if self.health_metrics['disk_usage_pct'] > self.alert_thresholds['disk_usage_pct']:
            alerts.append({
                'type': 'high_disk_usage',
                'message': f"High disk usage: {self.health_metrics['disk_usage_pct']:.1f}%",
                'severity': 'critical'
            })
        # Check API response time
        if self.health_metrics['api_response_time_ms'] > self.alert_thresholds['api_response_time_ms']:
            alerts.append({
                'type': 'slow_api_response',
                'message': f"Slow API response: {self.health_metrics['api_response_time_ms']:.1f} ms",
                'severity': 'warning'
            })
        # Send alerts
        for alert in alerts:
            self._send_alert(alert)

    def _perform_health_check(self):
        """Perform application health check"""
        try:
            # NOTE: this requires the bot to expose an HTTP health endpoint; set
            # 'health_check_url' to null in the config if you do not run one,
            # otherwise every check fails and raises an alert.
            health_url = self.config.get('health_check_url')
            if health_url:
                response = requests.get(health_url, timeout=10)
                if response.status_code != 200:
                    self._send_alert({
                        'type': 'health_check_failed',
                        'message': f"Health check failed: HTTP {response.status_code}",
                        'severity': 'critical'
                    })
                    if self.config.get('auto_restart_on_failure', True):
                        self._attempt_restart()
        except Exception as e:
            self.logger.error(f"Health check error: {e}")
            self._send_alert({
                'type': 'health_check_error',
                'message': f"Health check error: {str(e)}",
                'severity': 'critical'
            })

    def _send_alert(self, alert: Dict):
        """Send alert notification to the webhook (if configured) and the log"""
        try:
            webhook_url = self.config.get('alert_webhook_url')
            if webhook_url:
                alert_payload = {
                    'timestamp': datetime.now().isoformat(),
                    'hostname': os.uname().nodename,  # Unix-only, like SIGUSR1 below
                    'service': 'crypto_trading_bot',
                    **alert
                }
                requests.post(webhook_url, json=alert_payload, timeout=10)
            # Also log the alert
            log_level = logging.CRITICAL if alert['severity'] == 'critical' else logging.WARNING
            self.logger.log(log_level, f"ALERT {alert['type']}: {alert['message']}")
        except Exception as e:
            self.logger.error(f"Error sending alert: {e}")

    def _attempt_restart(self):
        """Request a restart of the application"""
        if self.restart_count >= self.max_restarts:
            self.logger.critical(f"Max restart attempts ({self.max_restarts}) reached")
            self._send_alert({
                'type': 'max_restarts_reached',
                'message': f"Maximum restart attempts reached: {self.restart_count}",
                'severity': 'critical'
            })
            return
        self.restart_count += 1
        self.logger.warning(f"Attempting restart #{self.restart_count}")
        try:
            # Send SIGUSR1 to our own process. The bot's handler only performs a
            # graceful shutdown; an external supervisor (e.g. systemd or a Docker
            # restart policy) must start the process again.
            os.kill(os.getpid(), signal.SIGUSR1)
        except Exception as e:
            self.logger.error(f"Error during restart: {e}")

    def create_backup(self) -> bool:
        """Create a compressed backup of logs, config, and historical data"""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_dir = f"backups/backup_{timestamp}"
            os.makedirs(backup_dir, exist_ok=True)
            # Backup directories
            backup_items = [
                'data/logs',
                'config',
                'data/historical_data.db'
            ]
            for item in backup_items:
                if os.path.exists(item):
                    if os.path.isdir(item):
                        subprocess.run(['cp', '-r', item, backup_dir], check=True)
                    else:
                        subprocess.run(['cp', item, backup_dir], check=True)
            # Compress backup, then remove the uncompressed copy
            subprocess.run(['tar', '-czf', f"{backup_dir}.tar.gz", backup_dir], check=True)
            subprocess.run(['rm', '-rf', backup_dir], check=True)
            self.logger.info(f"Backup created: {backup_dir}.tar.gz")
            return True
        except Exception as e:
            self.logger.error(f"Backup failed: {e}")
            return False

    def cleanup_old_backups(self, keep_days: int = 30):
        """Delete .tar.gz backups older than `keep_days` days"""
        try:
            backup_dir = 'backups'
            if not os.path.exists(backup_dir):
                return
            cutoff_time = time.time() - (keep_days * 24 * 3600)
            for filename in os.listdir(backup_dir):
                if filename.endswith('.tar.gz'):
                    file_path = os.path.join(backup_dir, filename)
                    if os.path.getmtime(file_path) < cutoff_time:
                        os.remove(file_path)
                        self.logger.info(f"Removed old backup: {filename}")
        except Exception as e:
            self.logger.error(f"Error cleaning up backups: {e}")

    def get_system_status(self) -> Dict:
        """Get comprehensive system status"""
        return {
            'uptime_hours': self.health_metrics['uptime_seconds'] / 3600,
            'health_metrics': self.health_metrics,
            'alert_thresholds': self.alert_thresholds,
            'restart_count': self.restart_count,
            'monitoring_active': self.monitoring_active,
            'config': self.config
        }

    def generate_status_report(self) -> str:
        """Generate detailed status report"""
        status = self.get_system_status()
        report = f"""
=== PRODUCTION STATUS REPORT ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

SYSTEM HEALTH
-------------
Uptime: {status['uptime_hours']:.1f} hours
CPU Usage: {status['health_metrics']['cpu_usage_pct']:.1f}%
Memory Usage: {status['health_metrics']['memory_usage_mb']:.1f} MB
Disk Usage: {status['health_metrics']['disk_usage_pct']:.1f}%
Network Connections: {status['health_metrics']['network_connections']}
API Response Time: {status['health_metrics']['api_response_time_ms']:.1f} ms

OPERATIONAL STATUS
------------------
Monitoring Active: {status['monitoring_active']}
Restart Count: {status['restart_count']}/{self.max_restarts}
Last Trade: {status['health_metrics']['last_trade_time'] or 'N/A'}

ALERT THRESHOLDS
----------------
Max CPU: {status['alert_thresholds']['cpu_usage_pct']:.1f}%
Max Memory: {status['alert_thresholds']['memory_usage_mb']:.1f} MB
Max Disk: {status['alert_thresholds']['disk_usage_pct']:.1f}%
Max API Response: {status['alert_thresholds']['api_response_time_ms']:.1f} ms

CONFIGURATION
-------------
Monitoring Interval: {status['config']['monitoring_interval_seconds']} seconds
Auto Restart: {status['config']['auto_restart_on_failure']}
Backup Interval: {status['config']['backup_interval_hours']} hours
"""
        return report
```
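`create_backup` shells out to `cp`, `tar`, and `rm`, which ties it to Unix-like systems. A portable alternative, sketched here under the assumption that the same list of paths is backed up, stages copies in a temporary directory and lets `shutil.make_archive` build the `.tar.gz`. The function name `create_backup_archive` is hypothetical and not part of `ProductionManager`.

```python
import os
import shutil
import tempfile
from datetime import datetime


def create_backup_archive(items, backup_root: str = 'backups') -> str:
    """Copy each existing file/directory into a staging dir and gzip-tar it.

    Hypothetical portable replacement for the cp/tar/rm calls in create_backup.
    Returns the path of the created .tar.gz archive.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    os.makedirs(backup_root, exist_ok=True)
    with tempfile.TemporaryDirectory() as staging:
        for item in items:
            if not os.path.exists(item):
                continue  # skip missing paths, as the original method does
            # Keep only the last path component, like `cp -r data/logs backup_dir`
            target = os.path.join(staging, os.path.basename(os.path.normpath(item)))
            if os.path.isdir(item):
                shutil.copytree(item, target)
            else:
                shutil.copy2(item, target)  # copy2 also preserves timestamps
        base_name = os.path.join(backup_root, f'backup_{timestamp}')
        # make_archive appends '.tar.gz' for the 'gztar' format and returns the file name
        return shutil.make_archive(base_name, 'gztar', root_dir=staging)


# Example call with the paths used by create_backup:
# create_backup_archive(['data/logs', 'config', 'data/historical_data.db'])
```

The staging directory is removed automatically when the `with` block exits, even if a copy fails, so no half-finished `backup_*` folder is left behind.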
Create run_production_bot.py:
```python
#!/usr/bin/env python3
"""
Production cryptocurrency trading bot
Integrates all components: trading, risk management, logging, monitoring
"""
import sys
import os
import signal
import time
import argparse
import logging
from datetime import datetime
from typing import Optional

# Add src to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

from data.realtime_data import BinanceWebSocketClient
from exchanges.binance_client import BinanceClient
from config.binance_config import BinanceConfig
from strategies.grid_trading import GridTradingStrategy, GridConfig
from risk_management.risk_manager import RiskManager
# NOTE: if TradingLogger lives in src/logging/, that package is shadowed by the
# standard-library `logging` module imported above, and this import fails with
# ModuleNotFoundError. Renaming the package (e.g. to src/trade_logging/) and
# updating this line avoids the clash.
from logging.trading_logger import TradingLogger
from deployment.production_manager import ProductionManager


class ProductionTradingBot:
    """
    Production-ready cryptocurrency trading bot

    Features:
    - Multi-strategy support
    - Comprehensive risk management
    - Professional logging
    - Health monitoring
    - Automatic recovery
    """

    def __init__(self, config_file: Optional[str] = None):
        self.setup_logging()
        self.logger = logging.getLogger(__name__)

        # Configuration (config_file is currently unused; BinanceConfig() uses its defaults)
        self.config_file = config_file
        self.config = BinanceConfig()

        # Core components
        self.client: Optional[BinanceClient] = None
        self.ws_client: Optional[BinanceWebSocketClient] = None
        self.strategy: Optional[GridTradingStrategy] = None
        self.risk_manager: Optional[RiskManager] = None
        self.trading_logger: Optional[TradingLogger] = None
        self.production_manager: Optional[ProductionManager] = None

        # Application state
        self.running = False
        self.shutdown_requested = False

        # Performance tracking
        self.start_time = datetime.now()
        self.initial_capital = 0.0
        self.trade_count = 0
        self.last_health_check = datetime.now()

        # Setup signal handlers (SIGUSR1 does not exist on Windows)
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        if hasattr(signal, 'SIGUSR1'):
            signal.signal(signal.SIGUSR1, self._restart_signal_handler)

    def setup_logging(self):
        """Setup basic logging before components initialize"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout)
            ]
        )

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        self.logger.info(f"Received signal {signum}. Initiating graceful shutdown...")
        self.shutdown_requested = True
        self.running = False

    def _restart_signal_handler(self, signum, frame):
        """Handle restart signal from production manager (shuts down; a supervisor restarts)"""
        self.logger.info("Received restart signal from production manager")
        self.shutdown_requested = True
        self.running = False

    def initialize_components(self) -> bool:
        """Initialize all bot components"""
        try:
            self.logger.info("Initializing Production Trading Bot")

            # Initialize trading logger first so later steps are recorded
            self.trading_logger = TradingLogger()
            self.trading_logger.start_async_logging()

            # Initialize Binance client
            self.client = BinanceClient(self.config)

            # Test API connection
            if not self.client.test_connection():
                self.logger.error("Failed to connect to Binance API")
                return False

            # Initialize WebSocket client
            symbols = ['BTCUSDT']  # Add more symbols as needed
            self.ws_client = BinanceWebSocketClient(symbols)
            self.ws_client.add_tick_callback(self._on_market_tick)

            # Initialize risk manager with the starting balance
            self.initial_capital = self._get_initial_capital()
            self.risk_manager = RiskManager(self.client, self.initial_capital)
            self.risk_manager.add_risk_callback(self._on_risk_violation)

            # Initialize trading strategy
            grid_config = GridConfig(
                symbol='BTCUSDT',
                grid_spacing=0.005,  # 0.5%
                num_grids_up=10,
                num_grids_down=10,
                base_order_size=0.001,
                max_position_size=0.1,
                stop_loss_percentage=0.05
            )
            self.strategy = GridTradingStrategy(self.client, grid_config)

            # Initialize production manager
            self.production_manager = ProductionManager()

            self.logger.info("All components initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Component initialization failed: {e}")
            return False

    def _get_initial_capital(self) -> float:
        """Get initial capital from the free USDT balance"""
        try:
            account = self.client.get_account_info()
            if account['success']:
                usdt_balance = account['balances'].get('USDT', {}).get('free', 0)
                return float(usdt_balance)
            else:
                self.logger.warning("Could not get account balance, using default capital")
                return 10000.0  # Default
        except Exception as e:
            self.logger.error(f"Error getting initial capital: {e}")
            return 10000.0

    def start_bot(self) -> bool:
        """Start the trading bot"""
        try:
            self.logger.info("Starting Production Trading Bot")

            # Start the market data feed
            self.ws_client.start()

            # Wait for initial market data
            self._wait_for_market_data()

            # Start risk monitoring
            self.risk_manager.start_monitoring()

            # Start production monitoring
            self.production_manager.start_monitoring()

            # Initialize the grid strategy
            if not self.strategy.initialize_strategy():
                self.logger.error("Failed to initialize trading strategy")
                return False

            # Log startup
            self._log_startup_event()

            self.running = True
            self.logger.info("Production trading bot started successfully!")
            return True
        except Exception as e:
            self.logger.error(f"Failed to start bot: {e}")
            return False

    def _wait_for_market_data(self, timeout: int = 30):
        """Wait for the first price update from the WebSocket feed"""
        self.logger.info("Waiting for market data...")
        start_time = time.time()
        while time.time() - start_time < timeout:
            current_price = self.ws_client.get_current_price('BTCUSDT')
            if current_price:
                self.logger.info(f"Current BTC/USDT: ${current_price:,.2f}")
                return
            time.sleep(0.5)
        raise TimeoutError(f"No market data received within {timeout} seconds")

    def run_main_loop(self):
        """Main bot execution loop"""
        # time.monotonic() is unaffected by wall-clock changes, unlike time.time()
        last_status_update = time.monotonic()
        last_performance_log = time.monotonic()
        last_backup = time.monotonic()
        status_interval = 300  # 5 minutes
        performance_interval = 60  # 1 minute
        backup_interval = self.production_manager.config.get('backup_interval_hours', 6) * 3600

        try:
            while self.running and not self.shutdown_requested:
                current_time = time.monotonic()

                # Update strategy
                try:
                    update_result = self.strategy.update_strategy()
                    if update_result['success'] and update_result['filled_orders'] > 0:
                        self.trade_count += update_result['filled_orders']
                        self._log_trade_activity(update_result)
                except Exception as e:
                    self.logger.error(f"Strategy update error: {e}")
                    self.trading_logger.log_system_event(
                        logging.ERROR, 'strategy', 'update_failed', {'error': str(e)}
                    )

                # Periodic status update
                if current_time - last_status_update >= status_interval:
                    self._log_status_update()
                    last_status_update = current_time

                # Performance logging
                if current_time - last_performance_log >= performance_interval:
                    self._log_performance_metrics()
                    last_performance_log = current_time

                # Backup creation and cleanup of old backups
                if current_time - last_backup >= backup_interval:
                    self.production_manager.create_backup()
                    self.production_manager.cleanup_old_backups()
                    last_backup = current_time

                # Heartbeat timestamp for health checks
                self.last_health_check = datetime.now()

                time.sleep(5)  # Main loop interval
        except KeyboardInterrupt:
            self.logger.info("Keyboard interrupt received")
        except Exception as e:
            self.logger.error(f"Critical error in main loop: {e}")
            self.trading_logger.log_system_event(
                logging.CRITICAL, 'main_loop', 'critical_error',
                {'error': str(e)}, str(e)
            )
        finally:
            self.shutdown_bot()

    def shutdown_bot(self):
        """Gracefully shutdown the bot"""
        self.logger.info("Shutting down production trading bot...")
        try:
            # Stop strategy first
            if self.strategy:
                self.strategy.stop_strategy()

            # Stop monitoring
            if self.risk_manager:
                self.risk_manager.stop_monitoring()
            if self.production_manager:
                self.production_manager.stop_monitoring()

            # Stop data feeds
            if self.ws_client:
                self.ws_client.stop()

            # Stop logging
            if self.trading_logger:
                self.trading_logger.stop_async_logging()

            # Generate final reports
            self._generate_final_reports()

            self.logger.info("Bot shutdown complete")
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")

    def _on_market_tick(self, tick):
        """Handle market data updates"""
        # Update risk manager with current prices
        if self.risk_manager:
            # This would need to be implemented based on your risk manager's interface
            pass

    def _on_risk_violation(self, violation):
        """Handle risk violations"""
        self.trading_logger.log_risk_event({
            'event_type': violation.limit_name,
            'severity': violation.severity.value,
            'description': violation.description,
            'current_value': violation.current_value,
            'threshold': violation.threshold,
            'action_taken': violation.action_taken.value,
            'portfolio_impact': 'high' if violation.severity.value == 'critical' else 'medium'
        })

        # Take action based on the action the risk manager selected
        if violation.action_taken.value == 'stop_trading':
            self.logger.critical("Trading stopped due to risk violation")
            if self.strategy:
                self.strategy.stop_strategy()
        elif violation.action_taken.value == 'emergency_exit':
            # This only requests a graceful shutdown; it does not by itself
            # liquidate open positions.
            self.logger.critical("Emergency exit triggered")
            self.shutdown_requested = True

    def _log_startup_event(self):
        """Log bot startup"""
        startup_data = {
            'timestamp': datetime.now().isoformat(),
            'version': '1.0.0',
            'config': {
                'testnet': self.config.TESTNET,
                'symbol': 'BTCUSDT',
                'initial_capital': self.initial_capital
            }
        }
        self.trading_logger.log_system_event(
            logging.INFO, 'startup', 'bot_started', startup_data
        )

    def _log_trade_activity(self, update_result):
        """Log trade activity"""
        # This would need to be implemented based on your strategy's trade format
        pass

    def _log_status_update(self):
        """Log periodic status update"""
        try:
            # Get portfolio status
            account = self.client.get_account_info()
            strategy_status = self.strategy.get_status()
            risk_status = self.risk_manager.get_risk_status()
            system_status = self.production_manager.get_system_status()

            self.logger.info("=== PRODUCTION STATUS UPDATE ===")
            if account['success']:
                # Balances may arrive as strings (as in Binance's raw API), so convert first
                usdt_balance = float(account['balances'].get('USDT', {}).get('free', 0))
                self.logger.info(f"Free USDT Balance: ${usdt_balance:,.2f}")
            self.logger.info(f"Strategy Profit: ${strategy_status.get('total_profit', 0):.2f}")
            self.logger.info(f"Active Orders: {strategy_status.get('active_orders', 0)}")
            self.logger.info(f"Total Trades: {self.trade_count}")
            self.logger.info(f"Risk Status: {risk_status.get('current_drawdown_pct', 0):.2f}% drawdown")
            self.logger.info(
                f"System Health: CPU {system_status['health_metrics']['cpu_usage_pct']:.1f}%, "
                f"Mem {system_status['health_metrics']['memory_usage_mb']:.1f}MB"
            )
            self.logger.info("=====================================")
        except Exception as e:
            self.logger.error(f"Error in status update: {e}")

    def _log_performance_metrics(self):
        """Log performance metrics"""
        try:
            account = self.client.get_account_info()
            if account['success']:
                balances = account['balances']
                # Balances may arrive as strings, so convert before comparing
                usdt_free = float(balances.get('USDT', {}).get('free', 0))
                portfolio_data = {
                    'total_value': usdt_free,  # free USDT only; other assets are not valued here
                    'total_return_pct': 0,  # Would need to calculate this
                    'daily_return_pct': 0,  # Would need to calculate this
                    'max_drawdown_pct': 0,  # Would need to track this
                    'sharpe_ratio': 0,  # Would need to calculate this
                    'num_positions': len([b for b in balances.values() if float(b.get('free', 0)) > 0]),
                    'cash_balance': usdt_free,
                    'unrealized_pnl': 0,  # Would need to calculate this
                    'realized_pnl': 0  # Would need to track this
                }
                self.trading_logger.log_performance(portfolio_data)
        except Exception as e:
            self.logger.error(f"Error logging performance: {e}")

    def _generate_final_reports(self):
        """Generate final reports"""
        try:
            # Runtime summary
            runtime = datetime.now() - self.start_time
            final_report = f"""
=== FINAL TRADING SESSION REPORT ===

SESSION SUMMARY
---------------
Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}
End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Total Runtime: {runtime}
Total Trades: {self.trade_count}

PERFORMANCE SUMMARY
-------------------
{self.strategy.get_status() if self.strategy else 'Strategy not available'}

RISK SUMMARY
------------
{self.risk_manager.export_risk_report() if self.risk_manager else 'Risk manager not available'}

SYSTEM SUMMARY
--------------
{self.production_manager.generate_status_report() if self.production_manager else 'Production manager not available'}
"""
            # Save final report (create data/ if it does not exist yet)
            os.makedirs('data', exist_ok=True)
            report_path = f"data/final_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            with open(report_path, 'w') as f:
                f.write(final_report)
            print(final_report)
        except Exception as e:
            self.logger.error(f"Error generating final reports: {e}")


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description='Production Cryptocurrency Trading Bot')
    parser.add_argument('--config', help='Configuration file path')
    # NOTE: --dry-run is parsed but not yet passed to any component
    parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode')
    args = parser.parse_args()

    print("Starting Production Cryptocurrency Trading Bot")
    print("=" * 60)

    # Create bot instance
    bot = ProductionTradingBot(args.config)

    # Initialize components
    if not bot.initialize_components():
        print("Failed to initialize bot components")
        sys.exit(1)

    # Start bot
    if not bot.start_bot():
        print("Failed to start trading bot")
        # Stop any feeds and monitors that were already started
        bot.shutdown_bot()
        sys.exit(1)

    print("Production bot started successfully!")
    print("Press Ctrl+C to stop gracefully")
    print("Monitor logs for real-time updates")

    # Run main loop (it shuts the bot down when it exits)
    bot.run_main_loop()
    print("Trading bot session completed")


if __name__ == "__main__":
    main()
```
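The main loop tracks its periodic jobs (status, performance, backups) with separate `last_*` timestamps. The same pattern can be factored into a small scheduler. The sketch below is a hypothetical `IntervalScheduler`, not part of the bot's code: it uses `time.monotonic()` by default and accepts an injectable clock so the timing can be tested without sleeping.

```python
import time
from typing import Callable, Dict, List, Tuple


class IntervalScheduler:
    """Hypothetical helper: run named tasks once their interval has elapsed."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._tasks: Dict[str, dict] = {}
        self.errors: List[Tuple[str, Exception]] = []

    def add(self, name: str, interval_s: float, func: Callable[[], None]) -> None:
        # Each timer starts "now", like last_status_update in run_main_loop
        self._tasks[name] = {'interval': interval_s, 'func': func, 'last': self._clock()}

    def run_due(self) -> List[str]:
        """Call every task whose interval has elapsed; return the names that ran."""
        now = self._clock()
        ran = []
        for name, task in self._tasks.items():
            if now - task['last'] >= task['interval']:
                task['last'] = now  # reset first so a failing task is not retried every tick
                try:
                    task['func']()
                except Exception as exc:  # one failing task must not block the others
                    self.errors.append((name, exc))
                ran.append(name)
        return ran


# Usage with a fake clock so the timing is deterministic
fake_now = [0.0]
scheduler = IntervalScheduler(clock=lambda: fake_now[0])
scheduler.add('performance', 60, lambda: None)
scheduler.add('status', 300, lambda: None)
fake_now[0] = 59.0
first = scheduler.run_due()    # nothing is due yet
fake_now[0] = 60.0
second = scheduler.run_due()   # performance only
fake_now[0] = 300.0
third = scheduler.run_due()    # performance and status
```

In the bot, the loop body would shrink to `scheduler.run_due()` followed by `time.sleep(5)`, with the three intervals registered once before the loop starts.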
| Alert Type | Trigger Condition | Response Time | Action Required |
|---|---|---|---|
| Critical Risk | Drawdown >15% or emergency stop | Immediate | Manual intervention required |
| System Health | CPU >80%, Memory >8GB | 5 minutes | Check resource usage |
| API Issues | Response time >5s or failures | 2 minutes | Verify connectivity |
| Trading Halt | No trades for 30+ minutes | 30 minutes | Check strategy status |
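The trigger conditions in the table can be expressed as data, which keeps every threshold in one place. This is a minimal sketch: the metric names (`drawdown_pct`, `minutes_since_last_trade`, `emergency_stop`, and so on) are hypothetical, and memory uses the 8000 MB threshold from `ProductionManager` rather than exactly 8 GB.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class AlertRule:
    alert_type: str
    severity: str
    triggered: Callable[[Dict[str, float]], bool]


# Thresholds from the table above; metric names are hypothetical
ALERT_RULES = [
    AlertRule('critical_risk', 'critical',
              lambda m: m.get('drawdown_pct', 0.0) > 15.0 or bool(m.get('emergency_stop', 0.0))),
    AlertRule('system_health', 'warning',
              lambda m: m.get('cpu_usage_pct', 0.0) > 80.0 or m.get('memory_usage_mb', 0.0) > 8000.0),
    # The 9999 ms error value from _measure_api_response_time also exceeds 5000 ms,
    # so API failures trip this rule as well as slow responses
    AlertRule('api_issues', 'warning',
              lambda m: m.get('api_response_time_ms', 0.0) > 5000.0),
    AlertRule('trading_halt', 'warning',
              lambda m: m.get('minutes_since_last_trade', 0.0) >= 30.0),
]


def evaluate_alerts(metrics: Dict[str, float]) -> List[Dict[str, str]]:
    """Return one alert dict per rule that fires, in rule order."""
    return [{'type': rule.alert_type, 'severity': rule.severity}
            for rule in ALERT_RULES if rule.triggered(metrics)]


healthy = {'drawdown_pct': 3.0, 'cpu_usage_pct': 20.0, 'memory_usage_mb': 1200.0,
           'api_response_time_ms': 150.0, 'minutes_since_last_trade': 4.0}
stressed = dict(healthy, cpu_usage_pct=91.0, api_response_time_ms=9999.0)
print([a['type'] for a in evaluate_alerts(stressed)])  # ['system_health', 'api_issues']
```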
Daily:
Weekly:
Monthly:
Ensure your trading bot complies with relevant regulations:
Maintain comprehensive audit trails:
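One general technique for making an audit trail tamper-evident is to chain each record to the previous one with a SHA-256 hash, so that editing or deleting an earlier entry breaks verification. The sketch below illustrates that technique only; it is not the `TradingLogger` file format, and the function names and event fields are hypothetical.

```python
import hashlib
import json
from typing import Dict, List

GENESIS = '0' * 64  # placeholder "previous hash" for the first record


def _record_hash(prev_hash: str, event: Dict) -> str:
    # sort_keys gives a canonical serialization, so equal events hash identically
    payload = prev_hash + json.dumps(event, sort_keys=True)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


def append_event(chain: List[Dict], event: Dict) -> Dict:
    """Append an event linked to the hash of the previous record."""
    prev_hash = chain[-1]['hash'] if chain else GENESIS
    record = {'event': event, 'prev_hash': prev_hash, 'hash': _record_hash(prev_hash, event)}
    chain.append(record)
    return record


def verify_chain(chain: List[Dict]) -> bool:
    """Recompute every link; any edited, reordered, or removed record fails."""
    prev_hash = GENESIS
    for record in chain:
        if record['prev_hash'] != prev_hash:
            return False
        if record['hash'] != _record_hash(prev_hash, record['event']):
            return False
        prev_hash = record['hash']
    return True


audit_log: List[Dict] = []
append_event(audit_log, {'ts': '2024-01-01T00:00:00', 'action': 'order_placed', 'qty': 0.001})
append_event(audit_log, {'ts': '2024-01-01T00:00:05', 'action': 'order_filled', 'qty': 0.001})
print(verify_chain(audit_log))       # True
audit_log[0]['event']['qty'] = 0.01  # tamper with an earlier record
print(verify_chain(audit_log))       # False
```

A chain only proves tampering if the latest hash is also stored somewhere an attacker cannot rewrite (for example, an external log service), since someone with full write access could recompute every hash.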
This comprehensive series has taken you from basic concepts to a production-ready cryptocurrency trading bot. The key components of a professional system include:
With proper implementation, you can expect:
Remember: Trading bots are tools that amplify both good and bad decisions. The quality of your strategy, risk management, and operational discipline determine long-term success. Stay disciplined, keep learning, and always prioritize capital preservation over profit maximization.
This concludes our comprehensive guide to building professional cryptocurrency trading bots. You now have all the components needed to build safe, well-monitored, and scalable automated trading systems.
Nikolai Fischer is the founder of Kommune3 (since 2007) and a leading expert in Drupal development and tech entrepreneurship. With 17+ years of experience, he has led hundreds of projects and achieved #1 on Hacker News. As host of the "Kommit mich" podcast and founder of skillution, he combines technical expertise with entrepreneurial thinking. His articles about Supabase, modern web development, and systematic problem-solving have influenced thousands of developers worldwide.