Skip to main content
NikoFischer.com

Main navigation

  • Home
  • About
    • My Reading List
    • Recommended Youtube Channels
    • Life Rules
    • Podcast
  • 50-Day Challenge
  • Impressum
Sprachumschalter
  • English

Breadcrumb

  1. Home

Risk Management and Logging Systems: Production-Ready Crypto Trading Bots

🎸
πŸš€ Beta Running

PYNGUP: Rebellion against toxic productivity

Beta limited to 100 spots. Tasks become social commitments instead of lonely to-dos.

πŸš€ Join Beta πŸ“– Read Story "€487 wasted"

Build comprehensive risk management and logging systems for cryptocurrency trading bots. This final guide in our series covers advanced risk controls, audit trails, compliance logging, production deployment, and operational monitoring to ensure your bot operates safely and transparently in live markets.

Why Risk Management is Critical

Risk management is the difference between sustainable profits and catastrophic losses. Even the most profitable strategy can destroy an account without proper risk controls.

The Cost of Poor Risk Management

Risk Scenario Without Controls With Risk Management Outcome
Flash Crash -85% portfolio loss -12% max drawdown stop 73% capital preserved
API Failure Stuck positions, -45% Emergency close all -3% controlled exit
Strategy Malfunction Runaway trading, -100% Daily loss limit hit -5% daily limit
Market Manipulation Exploited for -60% Position size limits -8% limited exposure
🚨 Real Case Study: In May 2022, a grid trading bot without proper risk management lost 90% of its capital during the Terra Luna collapse in just 48 hours. A similar bot with 15% max drawdown limits automatically stopped trading and preserved 85% of capital, resuming profitable operations once markets stabilized.

Comprehensive Risk Management Framework

Multi-Layer Risk Architecture

Create src/risk_management/risk_manager.py:

import time
import logging
import threading
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
import pandas as pd

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class RiskAction(Enum):
    CONTINUE = "continue"
    REDUCE_POSITION = "reduce_position"
    STOP_TRADING = "stop_trading"
    EMERGENCY_EXIT = "emergency_exit"

@dataclass
class RiskLimit:
    """Risk limit configuration"""
    name: str
    limit_type: str  # 'percentage', 'absolute', 'count'
    threshold: float
    action: RiskAction
    enabled: bool = True
    description: str = ""

@dataclass
class RiskViolation:
    """Risk violation record"""
    timestamp: datetime
    limit_name: str
    current_value: float
    threshold: float
    severity: RiskLevel
    action_taken: RiskAction
    description: str

class RiskManager:
    """
    Comprehensive risk management system for trading bots
    
    Features:
    - Multi-layer risk limits
    - Real-time position monitoring
    - Dynamic risk adjustment
    - Emergency controls
    - Risk violation tracking
    """
    
    def __init__(self, trading_engine, initial_capital: float):
        self.trading_engine = trading_engine
        self.initial_capital = initial_capital
        self.logger = logging.getLogger(__name__)
        
        # Risk limits configuration
        self.risk_limits: Dict[str, RiskLimit] = {}
        self.risk_violations: List[RiskViolation] = []
        
        # Risk monitoring state
        self.monitoring_active = False
        self.monitor_thread = None
        
        # Performance tracking
        self.daily_start_balance = initial_capital
        self.peak_balance = initial_capital
        self.max_daily_trades = 100
        self.daily_trade_count = 0
        
        # Emergency controls
        self.emergency_stop_triggered = False
        self.trading_suspended = False
        
        # Risk callbacks
        self.risk_callbacks: List[Callable[[RiskViolation], None]] = []
        
        # Initialize default risk limits
        self._setup_default_risk_limits()
    
    def _setup_default_risk_limits(self):
        """Setup conservative default risk limits"""
        
        # Portfolio-level limits
        self.add_risk_limit(
            name="max_drawdown",
            limit_type="percentage",
            threshold=15.0,  # 15% max drawdown
            action=RiskAction.STOP_TRADING,
            description="Maximum portfolio drawdown from peak"
        )
        
        self.add_risk_limit(
            name="daily_loss_limit",
            limit_type="percentage", 
            threshold=5.0,  # 5% daily loss
            action=RiskAction.STOP_TRADING,
            description="Maximum daily loss limit"
        )
        
        self.add_risk_limit(
            name="position_concentration",
            limit_type="percentage",
            threshold=25.0,  # 25% max in single position
            action=RiskAction.REDUCE_POSITION,
            description="Maximum position size relative to portfolio"
        )
        
        # Trading activity limits
        self.add_risk_limit(
            name="max_daily_trades",
            limit_type="count",
            threshold=50,  # Max 50 trades per day
            action=RiskAction.STOP_TRADING,
            description="Maximum number of trades per day"
        )
        
        # Volatility-based limits
        self.add_risk_limit(
            name="volatility_spike",
            limit_type="percentage",
            threshold=50.0,  # 50% price movement
            action=RiskAction.EMERGENCY_EXIT,
            description="Emergency exit on extreme volatility"
        )
    
    def add_risk_limit(self, name: str, limit_type: str, threshold: float,
                      action: RiskAction, description: str = ""):
        """Add or update a risk limit"""
        self.risk_limits[name] = RiskLimit(
            name=name,
            limit_type=limit_type,
            threshold=threshold,
            action=action,
            description=description
        )
        
        self.logger.info(f"πŸ›‘οΈ Risk limit added: {name} = {threshold}")
    
    def start_monitoring(self):
        """Start risk monitoring"""
        if self.monitoring_active:
            return
        
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        self.monitor_thread.start()
        
        self.logger.info("πŸ›‘οΈ Risk monitoring started")
    
    def stop_monitoring(self):
        """Stop risk monitoring"""
        if not self.monitoring_active:
            return
        
        self.monitoring_active = False
        
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        
        self.logger.info("πŸ›‘οΈ Risk monitoring stopped")
    
    def _monitoring_loop(self):
        """Main risk monitoring loop"""
        while self.monitoring_active:
            try:
                self._check_all_risk_limits()
                time.sleep(5)  # Check every 5 seconds
                
            except Exception as e:
                self.logger.error(f"Error in risk monitoring: {e}")
                time.sleep(30)
    
    def _check_all_risk_limits(self):
        """Check all risk limits"""
        if self.emergency_stop_triggered:
            return
        
        portfolio = self.trading_engine.get_portfolio_summary()
        current_balance = portfolio['total_value']
        
        # Update peak balance
        if current_balance > self.peak_balance:
            self.peak_balance = current_balance
        
        # Check each risk limit
        for limit_name, risk_limit in self.risk_limits.items():
            if not risk_limit.enabled:
                continue
            
            violation = self._check_risk_limit(risk_limit, portfolio)
            if violation:
                self._handle_risk_violation(violation)
    
    def _check_risk_limit(self, risk_limit: RiskLimit, portfolio: Dict) -> Optional[RiskViolation]:
        """Check individual risk limit"""
        current_balance = portfolio['total_value']
        
        if risk_limit.name == "max_drawdown":
            current_drawdown = ((self.peak_balance - current_balance) / self.peak_balance) * 100
            if current_drawdown > risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=current_drawdown,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.CRITICAL,
                    action_taken=risk_limit.action,
                    description=f"Drawdown {current_drawdown:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )
        
        elif risk_limit.name == "daily_loss_limit":
            daily_return = ((current_balance - self.daily_start_balance) / self.daily_start_balance) * 100
            if daily_return < -risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=daily_return,
                    threshold=-risk_limit.threshold,
                    severity=RiskLevel.CRITICAL,
                    action_taken=risk_limit.action,
                    description=f"Daily loss {daily_return:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )
        
        elif risk_limit.name == "position_concentration":
            max_position_pct = self._calculate_max_position_percentage(portfolio)
            if max_position_pct > risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=max_position_pct,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.HIGH,
                    action_taken=risk_limit.action,
                    description=f"Position concentration {max_position_pct:.2f}% exceeds limit {risk_limit.threshold:.2f}%"
                )
        
        elif risk_limit.name == "max_daily_trades":
            if self.daily_trade_count > risk_limit.threshold:
                return RiskViolation(
                    timestamp=datetime.now(),
                    limit_name=risk_limit.name,
                    current_value=self.daily_trade_count,
                    threshold=risk_limit.threshold,
                    severity=RiskLevel.MEDIUM,
                    action_taken=risk_limit.action,
                    description=f"Daily trades {self.daily_trade_count} exceeds limit {risk_limit.threshold}"
                )
        
        return None
    
    def _calculate_max_position_percentage(self, portfolio: Dict) -> float:
        """Calculate largest position as percentage of portfolio"""
        # This would need to be implemented based on your position tracking
        # For now, return a placeholder
        return 0.0
    
    def _handle_risk_violation(self, violation: RiskViolation):
        """Handle risk limit violation"""
        self.risk_violations.append(violation)
        
        # Log violation
        log_level = logging.ERROR if violation.severity == RiskLevel.CRITICAL else logging.WARNING
        self.logger.log(log_level, f"🚨 RISK VIOLATION: {violation.description}")
        
        # Execute risk action
        if violation.action_taken == RiskAction.STOP_TRADING:
            self._stop_trading(violation)
        elif violation.action_taken == RiskAction.EMERGENCY_EXIT:
            self._emergency_exit(violation)
        elif violation.action_taken == RiskAction.REDUCE_POSITION:
            self._reduce_positions(violation)
        
        # Notify callbacks
        for callback in self.risk_callbacks:
            try:
                callback(violation)
            except Exception as e:
                self.logger.error(f"Risk callback error: {e}")
    
    def _stop_trading(self, violation: RiskViolation):
        """Stop all trading activity"""
        self.trading_suspended = True
        
        # Cancel all pending orders
        try:
            open_orders = self.trading_engine.get_open_orders()
            for order in open_orders:
                self.trading_engine.cancel_order(order.order_id, order.symbol)
            
            self.logger.critical(f"πŸ›‘ TRADING STOPPED due to: {violation.description}")
            
        except Exception as e:
            self.logger.error(f"Error stopping trading: {e}")
    
    def _emergency_exit(self, violation: RiskViolation):
        """Emergency exit all positions"""
        self.emergency_stop_triggered = True
        
        try:
            # Cancel all orders first
            open_orders = self.trading_engine.get_open_orders()
            for order in open_orders:
                self.trading_engine.cancel_order(order.order_id, order.symbol)
            
            # Close all positions at market price
            positions = self.trading_engine.positions
            for symbol, position in positions.items():
                if position.quantity != 0:
                    side = 'sell' if position.quantity > 0 else 'buy'
                    self.trading_engine.place_order(
                        symbol=symbol,
                        side=side,
                        order_type='market',
                        quantity=abs(position.quantity)
                    )
            
            self.logger.critical(f"🚨 EMERGENCY EXIT triggered: {violation.description}")
            
        except Exception as e:
            self.logger.error(f"Error in emergency exit: {e}")
    
    def _reduce_positions(self, violation: RiskViolation):
        """Reduce position sizes"""
        try:
            # Implementation would depend on specific position management
            self.logger.warning(f"⚠️ Position reduction triggered: {violation.description}")
            
        except Exception as e:
            self.logger.error(f"Error reducing positions: {e}")
    
    def add_risk_callback(self, callback: Callable[[RiskViolation], None]):
        """Add callback for risk violations"""
        self.risk_callbacks.append(callback)
    
    def get_risk_status(self) -> Dict:
        """Get current risk status"""
        portfolio = self.trading_engine.get_portfolio_summary()
        current_balance = portfolio['total_value']
        
        current_drawdown = ((self.peak_balance - current_balance) / self.peak_balance) * 100
        daily_return = ((current_balance - self.daily_start_balance) / self.daily_start_balance) * 100
        
        recent_violations = [v for v in self.risk_violations 
                           if (datetime.now() - v.timestamp).total_seconds() < 3600]
        
        return {
            'monitoring_active': self.monitoring_active,
            'trading_suspended': self.trading_suspended,
            'emergency_stop': self.emergency_stop_triggered,
            'current_drawdown_pct': current_drawdown,
            'daily_return_pct': daily_return,
            'daily_trade_count': self.daily_trade_count,
            'peak_balance': self.peak_balance,
            'total_violations': len(self.risk_violations),
            'recent_violations': len(recent_violations),
            'risk_limits_count': len(self.risk_limits)
        }
    
    def reset_daily_counters(self):
        """Reset daily risk counters"""
        portfolio = self.trading_engine.get_portfolio_summary()
        self.daily_start_balance = portfolio['total_value']
        self.daily_trade_count = 0
        
        self.logger.info("πŸŒ… Daily risk counters reset")
    
    def manual_override_emergency_stop(self, reason: str):
        """Manual override of emergency stop (use with extreme caution)"""
        self.emergency_stop_triggered = False
        self.trading_suspended = False
        
        self.logger.warning(f"⚠️ MANUAL OVERRIDE: Emergency stop disabled. Reason: {reason}")
    
    def export_risk_report(self) -> str:
        """Export comprehensive risk report"""
        risk_status = self.get_risk_status()
        
        report = f"""
=== RISK MANAGEMENT REPORT ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

CURRENT STATUS
-------------
Monitoring Active: {risk_status['monitoring_active']}
Trading Suspended: {risk_status['trading_suspended']}
Emergency Stop: {risk_status['emergency_stop']}

PERFORMANCE METRICS
------------------
Current Drawdown: {risk_status['current_drawdown_pct']:.2f}%
Daily Return: {risk_status['daily_return_pct']:.2f}%
Peak Balance: ${risk_status['peak_balance']:,.2f}
Daily Trades: {risk_status['daily_trade_count']}

RISK LIMITS STATUS
-----------------
"""
        
        for name, limit in self.risk_limits.items():
            status = "βœ… ENABLED" if limit.enabled else "❌ DISABLED"
            report += f"{name}: {limit.threshold} ({limit.limit_type}) - {status}\n"
        
        report += f"""
VIOLATION HISTORY
----------------
Total Violations: {risk_status['total_violations']}
Recent Violations (1h): {risk_status['recent_violations']}

"""
        
        if self.risk_violations:
            report += "Recent Violations:\n"
            recent_violations = sorted(self.risk_violations, key=lambda x: x.timestamp, reverse=True)[:5]
            for violation in recent_violations:
                report += f"  {violation.timestamp.strftime('%Y-%m-%d %H:%M:%S')}: {violation.description}\n"
        
        return report

Advanced Position Management

Dynamic Position Sizing

Create src/risk_management/position_manager.py:

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging

class PositionSizer:
    """
    Advanced position sizing with risk-based allocation
    
    Features:
    - Kelly Criterion optimization
    - Volatility-adjusted sizing
    - Correlation-based limits
    - Heat map monitoring
    """
    
    def __init__(self, max_portfolio_risk: float = 0.02):
        self.max_portfolio_risk = max_portfolio_risk  # 2% max portfolio risk
        self.logger = logging.getLogger(__name__)
        
        # Position tracking
        self.position_history: List[Dict] = []
        self.volatility_cache: Dict[str, float] = {}
        
    def calculate_position_size(self, symbol: str, entry_price: float,
                               stop_loss_price: float, portfolio_value: float,
                               win_rate: float = 0.6, avg_win_loss_ratio: float = 1.5) -> float:
        """
        Calculate optimal position size using multiple methods
        
        Args:
            symbol: Trading pair
            entry_price: Intended entry price
            stop_loss_price: Stop loss price
            portfolio_value: Current portfolio value
            win_rate: Historical win rate (0-1)
            avg_win_loss_ratio: Average win/loss ratio
        
        Returns:
            Recommended position size in base currency
        """
        
        # Method 1: Fixed percentage risk
        risk_per_share = abs(entry_price - stop_loss_price)
        max_risk_amount = portfolio_value * self.max_portfolio_risk
        fixed_risk_size = max_risk_amount / risk_per_share
        
        # Method 2: Kelly Criterion
        kelly_size = self._kelly_criterion_size(
            portfolio_value, entry_price, stop_loss_price,
            win_rate, avg_win_loss_ratio
        )
        
        # Method 3: Volatility-adjusted sizing
        volatility = self._get_volatility(symbol)
        vol_adjusted_size = self._volatility_adjusted_size(
            portfolio_value, entry_price, volatility
        )
        
        # Method 4: Correlation-adjusted sizing
        correlation_limit = self._correlation_adjusted_limit(symbol, portfolio_value)
        
        # Take the minimum of all methods for conservative sizing
        recommended_size = min(fixed_risk_size, kelly_size, vol_adjusted_size, correlation_limit)
        
        self.logger.info(f"πŸ“ Position sizing for {symbol}:")
        self.logger.info(f"   Fixed Risk: {fixed_risk_size:.6f}")
        self.logger.info(f"   Kelly Criterion: {kelly_size:.6f}")
        self.logger.info(f"   Volatility Adjusted: {vol_adjusted_size:.6f}")
        self.logger.info(f"   Correlation Limit: {correlation_limit:.6f}")
        self.logger.info(f"   Recommended: {recommended_size:.6f}")
        
        return max(0, recommended_size)
    
    def _kelly_criterion_size(self, portfolio_value: float, entry_price: float,
                             stop_loss_price: float, win_rate: float,
                             avg_win_loss_ratio: float) -> float:
        """Calculate position size using Kelly Criterion"""
        
        # Kelly formula: f = (bp - q) / b
        # where:
        # f = fraction of capital to wager
        # b = odds received (avg_win_loss_ratio)
        # p = probability of winning (win_rate)
        # q = probability of losing (1 - win_rate)
        
        if win_rate <= 0 or avg_win_loss_ratio <= 0:
            return 0
        
        kelly_fraction = ((avg_win_loss_ratio * win_rate) - (1 - win_rate)) / avg_win_loss_ratio
        
        # Cap Kelly at 25% for safety
        kelly_fraction = min(kelly_fraction, 0.25)
        kelly_fraction = max(kelly_fraction, 0)
        
        # Convert to position size
        risk_per_share = abs(entry_price - stop_loss_price)
        kelly_risk_amount = portfolio_value * kelly_fraction
        
        return kelly_risk_amount / risk_per_share if risk_per_share > 0 else 0
    
    def _volatility_adjusted_size(self, portfolio_value: float, 
                                 entry_price: float, volatility: float) -> float:
        """Adjust position size based on asset volatility"""
        
        if volatility <= 0:
            return portfolio_value * 0.01 / entry_price  # 1% default
        
        # Base allocation inversely proportional to volatility
        base_allocation = 0.05  # 5% base
        volatility_factor = min(0.2 / volatility, 2.0)  # Scale factor
        
        adjusted_allocation = base_allocation * volatility_factor
        adjusted_allocation = min(adjusted_allocation, 0.1)  # Cap at 10%
        
        return (portfolio_value * adjusted_allocation) / entry_price
    
    def _correlation_adjusted_limit(self, symbol: str, portfolio_value: float) -> float:
        """Limit position size based on correlation with existing positions"""
        
        # This is a simplified implementation
        # In practice, you'd calculate actual correlations between assets
        
        # For now, assume 5% max allocation per asset
        max_allocation = 0.05
        
        return portfolio_value * max_allocation / 100  # Placeholder conversion
    
    def _get_volatility(self, symbol: str) -> float:
        """Get or calculate volatility for symbol"""
        
        if symbol in self.volatility_cache:
            return self.volatility_cache[symbol]
        
        # Default volatility estimates for common crypto pairs
        default_volatilities = {
            'BTCUSDT': 0.04,  # 4% daily volatility
            'ETHUSDT': 0.05,  # 5% daily volatility
            'ADAUSDT': 0.08,  # 8% daily volatility
            'DOTUSDT': 0.10,  # 10% daily volatility
        }
        
        volatility = default_volatilities.get(symbol, 0.06)  # 6% default
        self.volatility_cache[symbol] = volatility
        
        return volatility

class PortfolioHeatMap:
    """
    Portfolio heat map for risk visualization and monitoring
    
    Tracks:
    - Position concentrations
    - Sector/correlation exposure
    - Risk distribution
    - Maximum loss scenarios
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
    def generate_risk_heatmap(self, positions: Dict, current_prices: Dict,
                             portfolio_value: float) -> Dict:
        """Generate comprehensive risk heat map"""
        
        risk_map = {
            'position_weights': {},
            'risk_contributions': {},
            'sector_exposure': {},
            'worst_case_scenarios': {},
            'diversification_metrics': {}
        }
        
        # Calculate position weights
        for symbol, position in positions.items():
            if position.quantity == 0:
                continue
                
            current_price = current_prices.get(symbol, position.avg_price)
            position_value = abs(position.quantity) * current_price
            weight = position_value / portfolio_value
            
            risk_map['position_weights'][symbol] = weight
            
            # Estimate risk contribution (simplified)
            volatility = self._estimate_volatility(symbol)
            risk_contribution = weight * volatility
            risk_map['risk_contributions'][symbol] = risk_contribution
        
        # Calculate diversification metrics
        risk_map['diversification_metrics'] = self._calculate_diversification_metrics(
            risk_map['position_weights']
        )
        
        # Worst case scenario analysis
        risk_map['worst_case_scenarios'] = self._worst_case_analysis(
            positions, current_prices, portfolio_value
        )
        
        return risk_map
    
    def _estimate_volatility(self, symbol: str) -> float:
        """Estimate volatility for risk calculations"""
        volatility_estimates = {
            'BTCUSDT': 0.04,
            'ETHUSDT': 0.05,
            'ADAUSDT': 0.08,
            'DOTUSDT': 0.10,
        }
        return volatility_estimates.get(symbol, 0.06)
    
    def _calculate_diversification_metrics(self, weights: Dict) -> Dict:
        """Calculate portfolio diversification metrics"""
        
        if not weights:
            return {'herfindahl_index': 0, 'effective_positions': 0}
        
        # Herfindahl-Hirschman Index (concentration measure)
        hhi = sum(weight ** 2 for weight in weights.values())
        
        # Effective number of positions
        effective_positions = 1 / hhi if hhi > 0 else 0
        
        return {
            'herfindahl_index': hhi,
            'effective_positions': effective_positions,
            'concentration_risk': 'High' if hhi > 0.25 else 'Medium' if hhi > 0.1 else 'Low'
        }
    
    def _worst_case_analysis(self, positions: Dict, current_prices: Dict,
                           portfolio_value: float) -> Dict:
        """Analyze worst-case loss scenarios"""
        
        scenarios = {
            'flash_crash_20pct': 0,
            'sector_crash_50pct': 0,
            'single_asset_90pct': 0
        }
        
        for symbol, position in positions.items():
            if position.quantity == 0:
                continue
            
            current_price = current_prices.get(symbol, position.avg_price)
            position_value = abs(position.quantity) * current_price
            
            # Flash crash: 20% drop across all assets
            scenarios['flash_crash_20pct'] += position_value * 0.20
            
            # Sector crash: 50% drop (simplified - all crypto)
            scenarios['sector_crash_50pct'] += position_value * 0.50
            
            # Single asset crash: 90% drop in largest position
            scenarios['single_asset_90pct'] = max(
                scenarios['single_asset_90pct'],
                position_value * 0.90
            )
        
        # Convert to percentage of portfolio
        for scenario, loss in scenarios.items():
            scenarios[scenario] = (loss / portfolio_value) * 100
        
        return scenarios

Comprehensive Logging System

Multi-Level Logging Architecture

Create src/logging/trading_logger.py:

import os
import json
import logging
import gzip
import shutil
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from dataclasses import dataclass, asdict
import threading
import queue
import time

@dataclass
class TradeLog:
    """Structured trade log entry"""
    timestamp: str
    trade_id: str
    symbol: str
    side: str
    quantity: float
    price: float
    fees: float
    strategy: str
    order_type: str
    execution_time_ms: float
    slippage: float
    portfolio_value_before: float
    portfolio_value_after: float

@dataclass
class PerformanceLog:
    """Performance metrics log entry"""
    timestamp: str
    portfolio_value: float
    total_return_pct: float
    daily_return_pct: float
    max_drawdown_pct: float
    sharpe_ratio: float
    active_positions: int
    cash_balance: float
    unrealized_pnl: float
    realized_pnl: float

@dataclass
class RiskLog:
    """Risk management log entry"""
    timestamp: str
    risk_level: str
    event_type: str
    description: str
    current_value: float
    threshold: float
    action_taken: str
    portfolio_impact: str

@dataclass
class SystemLog:
    """System health and errors log"""
    timestamp: str
    level: str
    component: str
    event: str
    details: Dict
    error_stack: Optional[str] = None

class TradingLogger:
    """
    Comprehensive logging system for trading bots
    
    Features:
    - Structured logging (JSON format)
    - Multiple log levels and types
    - Automatic log rotation
    - Compression and archiving
    - Real-time log monitoring
    - Compliance audit trails
    """
    
    def __init__(self, log_directory: str = "data/logs"):
        self.log_directory = log_directory
        self.ensure_log_directory()
        
        # Async logging queue
        self.log_queue = queue.Queue()
        self.logging_thread = None
        self.logging_active = False
        
        # Setup different loggers
        self._setup_loggers()
        
        # Performance tracking
        self.log_stats = {
            'trades_logged': 0,
            'performance_logs': 0,
            'risk_events': 0,
            'system_errors': 0
        }
    
    def ensure_log_directory(self):
        """Ensure log directory structure exists"""
        subdirs = ['trades', 'performance', 'risk', 'system', 'compliance', 'archived']
        
        for subdir in subdirs:
            path = os.path.join(self.log_directory, subdir)
            os.makedirs(path, exist_ok=True)
    
    def _setup_loggers(self):
        """Setup specialized loggers for different data types"""
        
        # Trade logger - JSON format with rotation
        self.trade_logger = self._create_json_logger(
            'trades',
            os.path.join(self.log_directory, 'trades', 'trades.jsonl'),
            max_bytes=50*1024*1024  # 50MB
        )
        
        # Performance logger - Timed rotation
        self.performance_logger = self._create_json_logger(
            'performance',
            os.path.join(self.log_directory, 'performance', 'performance.jsonl'),
            when='midnight',
            interval=1
        )
        
        # Risk logger - Immediate rotation for critical events
        self.risk_logger = self._create_json_logger(
            'risk',
            os.path.join(self.log_directory, 'risk', 'risk.jsonl'),
            max_bytes=10*1024*1024  # 10MB
        )
        
        # System logger - Standard Python logging
        self.system_logger = logging.getLogger('system')
        self.system_logger.setLevel(logging.INFO)
        
        system_handler = RotatingFileHandler(
            os.path.join(self.log_directory, 'system', 'system.log'),
            maxBytes=20*1024*1024,  # 20MB
            backupCount=5
        )
        
        system_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        system_handler.setFormatter(system_formatter)
        self.system_logger.addHandler(system_handler)
        
        # Compliance logger - Long-term retention
        self.compliance_logger = self._create_json_logger(
            'compliance',
            os.path.join(self.log_directory, 'compliance', 'compliance.jsonl'),
            when='W0',  # Weekly rotation
            interval=1,
            backup_count=52  # Keep 1 year
        )
    
    def _create_json_logger(self, name: str, filename: str, max_bytes: int = None,
                           when: str = None, interval: int = 1, backup_count: int = 10):
        """Create a JSON logger with rotation"""
        
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        
        # Clear existing handlers
        logger.handlers.clear()
        
        if max_bytes:
            handler = RotatingFileHandler(filename, maxBytes=max_bytes, backupCount=backup_count)
        else:
            handler = TimedRotatingFileHandler(filename, when=when, interval=interval, backupCount=backup_count)
        
        # JSON formatter
        formatter = JsonFormatter()
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    def start_async_logging(self):
        """Start asynchronous logging thread"""
        if self.logging_active:
            return
        
        self.logging_active = True
        self.logging_thread = threading.Thread(target=self._async_logging_worker, daemon=True)
        self.logging_thread.start()
        
        self.system_logger.info("πŸ—‚οΈ Async logging started")
    
    def stop_async_logging(self):
        """Stop asynchronous logging"""
        if not self.logging_active:
            return
        
        self.logging_active = False
        
        # Process remaining queue items
        while not self.log_queue.empty():
            time.sleep(0.1)
        
        if self.logging_thread and self.logging_thread.is_alive():
            self.logging_thread.join(timeout=5)
        
        self.system_logger.info("πŸ—‚οΈ Async logging stopped")
    
    def _async_logging_worker(self):
        """Async logging worker thread"""
        while self.logging_active:
            try:
                log_entry = self.log_queue.get(timeout=1)
                self._process_log_entry(log_entry)
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Logging error: {e}")  # Can't use logger here to avoid recursion
    
    def _process_log_entry(self, log_entry: Dict):
        """Process a log entry"""
        log_type = log_entry.get('log_type')
        data = log_entry.get('data')
        
        if log_type == 'trade':
            self.trade_logger.info(json.dumps(asdict(data)))
            self.log_stats['trades_logged'] += 1
            
        elif log_type == 'performance':
            self.performance_logger.info(json.dumps(asdict(data)))
            self.log_stats['performance_logs'] += 1
            
        elif log_type == 'risk':
            self.risk_logger.info(json.dumps(asdict(data)))
            self.log_stats['risk_events'] += 1
            
        elif log_type == 'system':
            self.system_logger.log(data.get('level', logging.INFO), data.get('message', ''))
            self.log_stats['system_errors'] += 1
    
    def log_trade(self, trade_data: Dict):
        """Log a trade execution"""
        trade_log = TradeLog(
            timestamp=datetime.now().isoformat(),
            trade_id=trade_data.get('trade_id', ''),
            symbol=trade_data.get('symbol', ''),
            side=trade_data.get('side', ''),
            quantity=trade_data.get('quantity', 0.0),
            price=trade_data.get('price', 0.0),
            fees=trade_data.get('fees', 0.0),
            strategy=trade_data.get('strategy', ''),
            order_type=trade_data.get('order_type', ''),
            execution_time_ms=trade_data.get('execution_time_ms', 0.0),
            slippage=trade_data.get('slippage', 0.0),
            portfolio_value_before=trade_data.get('portfolio_value_before', 0.0),
            portfolio_value_after=trade_data.get('portfolio_value_after', 0.0)
        )
        
        self.log_queue.put({
            'log_type': 'trade',
            'data': trade_log
        })
        
        # Also log to compliance for audit trail
        self._log_compliance_event('trade_execution', asdict(trade_log))
    
    def log_performance(self, portfolio_data: Dict):
        """Log performance metrics"""
        perf_log = PerformanceLog(
            timestamp=datetime.now().isoformat(),
            portfolio_value=portfolio_data.get('total_value', 0.0),
            total_return_pct=portfolio_data.get('total_return_pct', 0.0),
            daily_return_pct=portfolio_data.get('daily_return_pct', 0.0),
            max_drawdown_pct=portfolio_data.get('max_drawdown_pct', 0.0),
            sharpe_ratio=portfolio_data.get('sharpe_ratio', 0.0),
            active_positions=portfolio_data.get('num_positions', 0),
            cash_balance=portfolio_data.get('cash_balance', 0.0),
            unrealized_pnl=portfolio_data.get('unrealized_pnl', 0.0),
            realized_pnl=portfolio_data.get('realized_pnl', 0.0)
        )
        
        self.log_queue.put({
            'log_type': 'performance',
            'data': perf_log
        })
    
    def log_risk_event(self, risk_data: Dict):
        """Log risk management events"""
        risk_log = RiskLog(
            timestamp=datetime.now().isoformat(),
            risk_level=risk_data.get('severity', 'medium'),
            event_type=risk_data.get('event_type', ''),
            description=risk_data.get('description', ''),
            current_value=risk_data.get('current_value', 0.0),
            threshold=risk_data.get('threshold', 0.0),
            action_taken=risk_data.get('action_taken', ''),
            portfolio_impact=risk_data.get('portfolio_impact', '')
        )
        
        self.log_queue.put({
            'log_type': 'risk',
            'data': risk_log
        })
        
        # Critical risk events also go to compliance
        if risk_data.get('severity') in ['critical', 'high']:
            self._log_compliance_event('risk_violation', asdict(risk_log))
    
    def log_system_event(self, level: int, component: str, event: str, 
                        details: Dict, error_stack: str = None):
        """Log system events and errors"""
        system_log = SystemLog(
            timestamp=datetime.now().isoformat(),
            level=logging.getLevelName(level),
            component=component,
            event=event,
            details=details,
            error_stack=error_stack
        )
        
        self.log_queue.put({
            'log_type': 'system',
            'data': {
                'level': level,
                'message': f"{component}: {event} - {json.dumps(details)}"
            }
        })
        
        # Critical system errors also go to compliance
        if level >= logging.ERROR:
            self._log_compliance_event('system_error', asdict(system_log))
    
    def _log_compliance_event(self, event_type: str, data: Dict):
        """Log compliance/audit events"""
        compliance_entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'data': data,
            'hash': self._calculate_log_hash(data)
        }
        
        self.compliance_logger.info(json.dumps(compliance_entry))
    
    def _calculate_log_hash(self, data: Dict) -> str:
        """Calculate hash for log integrity"""
        import hashlib
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]
    
    def compress_old_logs(self, days_old: int = 30):
        """Compress logs older than specified days"""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        
        for root, dirs, files in os.walk(self.log_directory):
            for file in files:
                if file.endswith(('.log', '.jsonl')):
                    file_path = os.path.join(root, file)
                    file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                    
                    if file_time < cutoff_date and not file.endswith('.gz'):
                        # Compress the file
                        with open(file_path, 'rb') as f_in:
                            with gzip.open(f"{file_path}.gz", 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)
                        
                        # Remove original
                        os.remove(file_path)
                        self.system_logger.info(f"Compressed old log: {file}")
    
    def get_log_statistics(self) -> Dict:
        """Get logging statistics"""
        log_sizes = {}
        
        for root, dirs, files in os.walk(self.log_directory):
            for file in files:
                file_path = os.path.join(root, file)
                size_mb = os.path.getsize(file_path) / (1024 * 1024)
                log_sizes[file] = size_mb
        
        return {
            'log_stats': self.log_stats,
            'total_log_size_mb': sum(log_sizes.values()),
            'log_files': log_sizes,
            'queue_size': self.log_queue.qsize()
        }

class JsonFormatter(logging.Formatter):
    """Custom JSON formatter for structured logging"""
    
    def format(self, record):
        """Format log record as JSON"""
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage()
        }
        
        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        return json.dumps(log_entry)

Production Deployment Framework

Infrastructure and Monitoring

Create src/deployment/production_manager.py:

import os
import sys
import signal
import time
import psutil
import threading
import subprocess
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import logging
import json
import requests

class ProductionManager:
    """
    Production deployment and monitoring manager
    
    Features:
    - Health monitoring
    - Automatic restarts
    - Resource monitoring
    - Alert integration
    - Backup management
    - Update deployment
    """
    
    def __init__(self, config_file: str = "config/production.json"):
        self.config = self._load_config(config_file)
        self.logger = logging.getLogger(__name__)
        
        # Monitoring state
        self.monitoring_active = False
        self.monitor_thread = None
        
        # Health metrics
        self.health_metrics = {
            'uptime_seconds': 0,
            'cpu_usage_pct': 0.0,
            'memory_usage_mb': 0.0,
            'disk_usage_pct': 0.0,
            'network_connections': 0,
            'last_trade_time': None,
            'api_response_time_ms': 0.0
        }
        
        # Alerts
        self.alert_thresholds = {
            'cpu_usage_pct': 80.0,
            'memory_usage_mb': 8000.0,
            'disk_usage_pct': 90.0,
            'api_response_time_ms': 5000.0,
            'no_trades_minutes': 30
        }
        
        # Process management
        self.start_time = datetime.now()
        self.restart_count = 0
        self.max_restarts = 5
    
    def _load_config(self, config_file: str) -> Dict:
        """Load production configuration"""
        default_config = {
            'monitoring_interval_seconds': 30,
            'health_check_url': 'http://localhost:8080/health',
            'alert_webhook_url': None,
            'backup_interval_hours': 6,
            'log_retention_days': 90,
            'auto_restart_on_failure': True,
            'resource_limits': {
                'max_cpu_pct': 80,
                'max_memory_mb': 8000,
                'max_disk_pct': 90
            }
        }
        
        if os.path.exists(config_file):
            try:
                with open(config_file, 'r') as f:
                    loaded_config = json.load(f)
                default_config.update(loaded_config)
            except Exception as e:
                print(f"Error loading config: {e}, using defaults")
        
        return default_config
    
    def start_monitoring(self):
        """Start production monitoring"""
        if self.monitoring_active:
            return
        
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        self.monitor_thread.start()
        
        self.logger.info("πŸ”§ Production monitoring started")
    
    def stop_monitoring(self):
        """Stop production monitoring"""
        if not self.monitoring_active:
            return
        
        self.monitoring_active = False
        
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        
        self.logger.info("πŸ”§ Production monitoring stopped")
    
    def _monitoring_loop(self):
        """Main monitoring loop"""
        while self.monitoring_active:
            try:
                self._collect_health_metrics()
                self._check_alert_conditions()
                self._perform_health_check()
                
                time.sleep(self.config['monitoring_interval_seconds'])
                
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                time.sleep(60)
    
    def _collect_health_metrics(self):
        """Collect system health metrics"""
        try:
            # System metrics
            self.health_metrics['uptime_seconds'] = (datetime.now() - self.start_time).total_seconds()
            self.health_metrics['cpu_usage_pct'] = psutil.cpu_percent(interval=1)
            
            memory = psutil.virtual_memory()
            self.health_metrics['memory_usage_mb'] = memory.used / (1024 * 1024)
            
            disk = psutil.disk_usage('/')
            self.health_metrics['disk_usage_pct'] = (disk.used / disk.total) * 100
            
            self.health_metrics['network_connections'] = len(psutil.net_connections())
            
            # API response time
            self.health_metrics['api_response_time_ms'] = self._measure_api_response_time()
            
        except Exception as e:
            self.logger.error(f"Error collecting health metrics: {e}")
    
    def _measure_api_response_time(self) -> float:
        """Measure API response time"""
        try:
            start_time = time.time()
            
            # Test Binance API response time
            response = requests.get('https://api.binance.com/api/v3/ping', timeout=10)
            
            if response.status_code == 200:
                return (time.time() - start_time) * 1000  # Convert to ms
            else:
                return 9999.0  # Error indicator
                
        except Exception:
            return 9999.0  # Error indicator
    
    def _check_alert_conditions(self):
        """Check for alert conditions"""
        alerts = []
        
        # Check CPU usage
        if self.health_metrics['cpu_usage_pct'] > self.alert_thresholds['cpu_usage_pct']:
            alerts.append({
                'type': 'high_cpu_usage',
                'message': f"High CPU usage: {self.health_metrics['cpu_usage_pct']:.1f}%",
                'severity': 'warning'
            })
        
        # Check memory usage
        if self.health_metrics['memory_usage_mb'] > self.alert_thresholds['memory_usage_mb']:
            alerts.append({
                'type': 'high_memory_usage',
                'message': f"High memory usage: {self.health_metrics['memory_usage_mb']:.1f} MB",
                'severity': 'warning'
            })
        
        # Check disk usage
        if self.health_metrics['disk_usage_pct'] > self.alert_thresholds['disk_usage_pct']:
            alerts.append({
                'type': 'high_disk_usage',
                'message': f"High disk usage: {self.health_metrics['disk_usage_pct']:.1f}%",
                'severity': 'critical'
            })
        
        # Check API response time
        if self.health_metrics['api_response_time_ms'] > self.alert_thresholds['api_response_time_ms']:
            alerts.append({
                'type': 'slow_api_response',
                'message': f"Slow API response: {self.health_metrics['api_response_time_ms']:.1f} ms",
                'severity': 'warning'
            })
        
        # Send alerts
        for alert in alerts:
            self._send_alert(alert)
    
    def _perform_health_check(self):
        """Perform application health check"""
        try:
            health_url = self.config.get('health_check_url')
            if health_url:
                response = requests.get(health_url, timeout=10)
                
                if response.status_code != 200:
                    self._send_alert({
                        'type': 'health_check_failed',
                        'message': f"Health check failed: HTTP {response.status_code}",
                        'severity': 'critical'
                    })
                    
                    if self.config.get('auto_restart_on_failure', True):
                        self._attempt_restart()
        
        except Exception as e:
            self.logger.error(f"Health check error: {e}")
            
            self._send_alert({
                'type': 'health_check_error',
                'message': f"Health check error: {str(e)}",
                'severity': 'critical'
            })
    
    def _send_alert(self, alert: Dict):
        """Send alert notification"""
        try:
            webhook_url = self.config.get('alert_webhook_url')
            if webhook_url:
                alert_payload = {
                    'timestamp': datetime.now().isoformat(),
                    'hostname': os.uname().nodename,
                    'service': 'crypto_trading_bot',
                    **alert
                }
                
                requests.post(webhook_url, json=alert_payload, timeout=10)
            
            # Also log the alert
            log_level = logging.CRITICAL if alert['severity'] == 'critical' else logging.WARNING
            self.logger.log(log_level, f"🚨 {alert['type']}: {alert['message']}")
            
        except Exception as e:
            self.logger.error(f"Error sending alert: {e}")
    
    def _attempt_restart(self):
        """Attempt to restart the application"""
        if self.restart_count >= self.max_restarts:
            self.logger.critical(f"Max restart attempts ({self.max_restarts}) reached")
            self._send_alert({
                'type': 'max_restarts_reached',
                'message': f"Maximum restart attempts reached: {self.restart_count}",
                'severity': 'critical'
            })
            return
        
        self.restart_count += 1
        self.logger.warning(f"Attempting restart #{self.restart_count}")
        
        try:
            # Send restart signal to main process
            os.kill(os.getpid(), signal.SIGUSR1)
            
        except Exception as e:
            self.logger.error(f"Error during restart: {e}")
    
    def create_backup(self) -> bool:
        """Create backup of important data"""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_dir = f"backups/backup_{timestamp}"
            
            os.makedirs(backup_dir, exist_ok=True)
            
            # Backup directories
            backup_items = [
                'data/logs',
                'config',
                'data/historical_data.db'
            ]
            
            for item in backup_items:
                if os.path.exists(item):
                    if os.path.isdir(item):
                        subprocess.run(['cp', '-r', item, backup_dir], check=True)
                    else:
                        subprocess.run(['cp', item, backup_dir], check=True)
            
            # Compress backup
            subprocess.run(['tar', '-czf', f"{backup_dir}.tar.gz", backup_dir], check=True)
            subprocess.run(['rm', '-rf', backup_dir], check=True)
            
            self.logger.info(f"πŸ“¦ Backup created: {backup_dir}.tar.gz")
            return True
            
        except Exception as e:
            self.logger.error(f"Backup failed: {e}")
            return False
    
    def cleanup_old_backups(self, keep_days: int = 30):
        """Clean up old backup files"""
        try:
            backup_dir = 'backups'
            if not os.path.exists(backup_dir):
                return
            
            cutoff_time = time.time() - (keep_days * 24 * 3600)
            
            for filename in os.listdir(backup_dir):
                if filename.endswith('.tar.gz'):
                    file_path = os.path.join(backup_dir, filename)
                    if os.path.getmtime(file_path) < cutoff_time:
                        os.remove(file_path)
                        self.logger.info(f"πŸ—‘οΈ Removed old backup: {filename}")
        
        except Exception as e:
            self.logger.error(f"Error cleaning up backups: {e}")
    
    def get_system_status(self) -> Dict:
        """Get comprehensive system status"""
        return {
            'uptime_hours': self.health_metrics['uptime_seconds'] / 3600,
            'health_metrics': self.health_metrics,
            'alert_thresholds': self.alert_thresholds,
            'restart_count': self.restart_count,
            'monitoring_active': self.monitoring_active,
            'config': self.config
        }
    
    def generate_status_report(self) -> str:
        """Generate detailed status report"""
        status = self.get_system_status()
        
        report = f"""
=== PRODUCTION STATUS REPORT ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

SYSTEM HEALTH
------------
Uptime: {status['uptime_hours']:.1f} hours
CPU Usage: {status['health_metrics']['cpu_usage_pct']:.1f}%
Memory Usage: {status['health_metrics']['memory_usage_mb']:.1f} MB
Disk Usage: {status['health_metrics']['disk_usage_pct']:.1f}%
Network Connections: {status['health_metrics']['network_connections']}
API Response Time: {status['health_metrics']['api_response_time_ms']:.1f} ms

OPERATIONAL STATUS
-----------------
Monitoring Active: {status['monitoring_active']}
Restart Count: {status['restart_count']}/{self.max_restarts}
Last Trade: {status['health_metrics']['last_trade_time'] or 'N/A'}

ALERT THRESHOLDS
---------------
Max CPU: {status['alert_thresholds']['cpu_usage_pct']:.1f}%
Max Memory: {status['alert_thresholds']['memory_usage_mb']:.1f} MB
Max Disk: {status['alert_thresholds']['disk_usage_pct']:.1f}%
Max API Response: {status['alert_thresholds']['api_response_time_ms']:.1f} ms

CONFIGURATION
------------
Monitoring Interval: {status['config']['monitoring_interval_seconds']} seconds
Auto Restart: {status['config']['auto_restart_on_failure']}
Backup Interval: {status['config']['backup_interval_hours']} hours
        """
        
        return report

Complete Production Bot

Main Production Application

Create run_production_bot.py:

#!/usr/bin/env python3
"""
Production cryptocurrency trading bot
Integrates all components: trading, risk management, logging, monitoring
"""

import sys
import os
import signal
import time
import argparse
import logging
from datetime import datetime
from typing import Optional

# Add src to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

from data.realtime_data import BinanceWebSocketClient
from exchanges.binance_client import BinanceClient
from config.binance_config import BinanceConfig
from strategies.grid_trading import GridTradingStrategy, GridConfig
from risk_management.risk_manager import RiskManager
from logging.trading_logger import TradingLogger
from deployment.production_manager import ProductionManager

class ProductionTradingBot:
    """
    Production-ready cryptocurrency trading bot
    
    Features:
    - Multi-strategy support
    - Comprehensive risk management
    - Professional logging
    - Health monitoring
    - Automatic recovery
    """
    
    def __init__(self, config_file: str = None):
        self.setup_logging()
        self.logger = logging.getLogger(__name__)
        
        # Configuration
        self.config = BinanceConfig()
        
        # Core components
        self.client: Optional[BinanceClient] = None
        self.ws_client: Optional[BinanceWebSocketClient] = None
        self.strategy: Optional[GridTradingStrategy] = None
        self.risk_manager: Optional[RiskManager] = None
        self.trading_logger: Optional[TradingLogger] = None
        self.production_manager: Optional[ProductionManager] = None
        
        # Application state
        self.running = False
        self.shutdown_requested = False
        
        # Performance tracking
        self.start_time = datetime.now()
        self.trade_count = 0
        self.last_health_check = datetime.now()
        
        # Setup signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGUSR1, self._restart_signal_handler)
    
    def setup_logging(self):
        """Setup basic logging before components initialize"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout)
            ]
        )
    
    def _signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        self.logger.info(f"Received signal {signum}. Initiating graceful shutdown...")
        self.shutdown_requested = True
        self.running = False
    
    def _restart_signal_handler(self, signum, frame):
        """Handle restart signal from production manager"""
        self.logger.info("Received restart signal from production manager")
        self.shutdown_requested = True
        self.running = False
    
    def initialize_components(self) -> bool:
        """Initialize all bot components"""
        try:
            self.logger.info("πŸš€ Initializing Production Trading Bot")
            
            # Initialize trading logger first
            self.trading_logger = TradingLogger()
            self.trading_logger.start_async_logging()
            
            # Initialize Binance client
            self.client = BinanceClient(self.config)
            
            # Test API connection
            if not self.client.test_connection():
                self.logger.error("❌ Failed to connect to Binance API")
                return False
            
            # Initialize WebSocket client
            symbols = ['BTCUSDT']  # Add more symbols as needed
            self.ws_client = BinanceWebSocketClient(symbols)
            self.ws_client.add_tick_callback(self._on_market_tick)
            
            # Initialize risk manager
            initial_capital = self._get_initial_capital()
            self.risk_manager = RiskManager(self.client, initial_capital)
            self.risk_manager.add_risk_callback(self._on_risk_violation)
            
            # Initialize trading strategy
            grid_config = GridConfig(
                symbol='BTCUSDT',
                grid_spacing=0.005,  # 0.5%
                num_grids_up=10,
                num_grids_down=10,
                base_order_size=0.001,
                max_position_size=0.1,
                stop_loss_percentage=0.05
            )
            
            self.strategy = GridTradingStrategy(self.client, grid_config)
            
            # Initialize production manager
            self.production_manager = ProductionManager()
            
            self.logger.info("βœ… All components initialized successfully")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ Component initialization failed: {e}")
            return False
    
    def _get_initial_capital(self) -> float:
        """Get initial capital from account balance"""
        try:
            account = self.client.get_account_info()
            if account['success']:
                usdt_balance = account['balances'].get('USDT', {}).get('free', 0)
                return float(usdt_balance)
            else:
                self.logger.warning("Could not get account balance, using default capital")
                return 10000.0  # Default
                
        except Exception as e:
            self.logger.error(f"Error getting initial capital: {e}")
            return 10000.0
    
    def start_bot(self) -> bool:
        """Start the trading bot"""
        try:
            self.logger.info("🎯 Starting Production Trading Bot")
            
            # Start all components
            self.ws_client.start()
            
            # Wait for initial market data
            self._wait_for_market_data()
            
            # Start risk monitoring
            self.risk_manager.start_monitoring()
            
            # Start production monitoring
            self.production_manager.start_monitoring()
            
            # Initialize strategy with current price
            current_price = self.ws_client.get_current_price('BTCUSDT')
            if not self.strategy.initialize_strategy():
                self.logger.error("❌ Failed to initialize trading strategy")
                return False
            
            # Log startup
            self._log_startup_event()
            
            self.running = True
            self.logger.info("πŸŽ‰ Production trading bot started successfully!")
            
            return True
            
        except Exception as e:
            self.logger.error(f"❌ Failed to start bot: {e}")
            return False
    
    def _wait_for_market_data(self, timeout: int = 30):
        """Wait for initial market data"""
        self.logger.info("⏳ Waiting for market data...")
        
        start_time = time.time()
        while time.time() - start_time < timeout:
            current_price = self.ws_client.get_current_price('BTCUSDT')
            if current_price:
                self.logger.info(f"πŸ“Š Current BTC/USDT: ${current_price:,.2f}")
                return
            time.sleep(0.5)
        
        raise Exception("Failed to receive market data within timeout")
    
    def run_main_loop(self):
        """Main bot execution loop"""
        last_status_update = time.time()
        last_performance_log = time.time()
        last_backup = time.time()
        
        status_interval = 300  # 5 minutes
        performance_interval = 60  # 1 minute
        backup_interval = 6 * 3600  # 6 hours
        
        try:
            while self.running and not self.shutdown_requested:
                current_time = time.time()
                
                # Update strategy
                try:
                    update_result = self.strategy.update_strategy()
                    if update_result['success'] and update_result['filled_orders'] > 0:
                        self.trade_count += update_result['filled_orders']
                        self._log_trade_activity(update_result)
                
                except Exception as e:
                    self.logger.error(f"Strategy update error: {e}")
                    self.trading_logger.log_system_event(
                        logging.ERROR, 'strategy', 'update_failed', {'error': str(e)}
                    )
                
                # Periodic status update
                if current_time - last_status_update >= status_interval:
                    self._log_status_update()
                    last_status_update = current_time
                
                # Performance logging
                if current_time - last_performance_log >= performance_interval:
                    self._log_performance_metrics()
                    last_performance_log = current_time
                
                # Backup creation
                if current_time - last_backup >= backup_interval:
                    self.production_manager.create_backup()
                    last_backup = current_time
                
                # Health check
                self.last_health_check = datetime.now()
                
                time.sleep(5)  # Main loop interval
                
        except KeyboardInterrupt:
            self.logger.info("Keyboard interrupt received")
        except Exception as e:
            self.logger.error(f"Critical error in main loop: {e}")
            self.trading_logger.log_system_event(
                logging.CRITICAL, 'main_loop', 'critical_error', 
                {'error': str(e)}, str(e)
            )
        finally:
            self.shutdown_bot()
    
    def shutdown_bot(self):
        """Gracefully shutdown the bot"""
        self.logger.info("πŸ›‘ Shutting down production trading bot...")
        
        try:
            # Stop strategy first
            if self.strategy:
                self.strategy.stop_strategy()
            
            # Stop monitoring
            if self.risk_manager:
                self.risk_manager.stop_monitoring()
            
            if self.production_manager:
                self.production_manager.stop_monitoring()
            
            # Stop data feeds
            if self.ws_client:
                self.ws_client.stop()
            
            # Stop logging
            if self.trading_logger:
                self.trading_logger.stop_async_logging()
            
            # Generate final reports
            self._generate_final_reports()
            
            self.logger.info("βœ… Bot shutdown complete")
            
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")
    
    def _on_market_tick(self, tick):
        """Handle market data updates"""
        # Update risk manager with current prices
        if self.risk_manager:
            # This would need to be implemented based on your risk manager's interface
            pass
    
    def _on_risk_violation(self, violation):
        """Handle risk violations"""
        self.trading_logger.log_risk_event({
            'event_type': violation.limit_name,
            'severity': violation.severity.value,
            'description': violation.description,
            'current_value': violation.current_value,
            'threshold': violation.threshold,
            'action_taken': violation.action_taken.value,
            'portfolio_impact': 'high' if violation.severity.value == 'critical' else 'medium'
        })
        
        # Take action based on violation severity
        if violation.action_taken.value == 'stop_trading':
            self.logger.critical("🚨 Trading stopped due to risk violation")
            self.strategy.stop_strategy()
        elif violation.action_taken.value == 'emergency_exit':
            self.logger.critical("🚨 Emergency exit triggered")
            self.shutdown_requested = True
    
    def _log_startup_event(self):
        """Log bot startup"""
        startup_data = {
            'timestamp': datetime.now().isoformat(),
            'version': '1.0.0',
            'config': {
                'testnet': self.config.TESTNET,
                'symbol': 'BTCUSDT',
                'initial_capital': self._get_initial_capital()
            }
        }
        
        self.trading_logger.log_system_event(
            logging.INFO, 'startup', 'bot_started', startup_data
        )
    
    def _log_trade_activity(self, update_result):
        """Log trade activity"""
        # This would need to be implemented based on your strategy's trade format
        pass
    
    def _log_status_update(self):
        """Log periodic status update"""
        try:
            # Get portfolio status
            account = self.client.get_account_info()
            strategy_status = self.strategy.get_status()
            risk_status = self.risk_manager.get_risk_status()
            system_status = self.production_manager.get_system_status()
            
            self.logger.info("πŸ“Š === PRODUCTION STATUS UPDATE ===")
            
            if account['success']:
                usdt_balance = account['balances'].get('USDT', {}).get('free', 0)
                self.logger.info(f"Portfolio Value: ${usdt_balance:,.2f}")
            
            self.logger.info(f"Strategy Profit: ${strategy_status.get('total_profit', 0):.2f}")
            self.logger.info(f"Active Orders: {strategy_status.get('active_orders', 0)}")
            self.logger.info(f"Total Trades: {self.trade_count}")
            self.logger.info(f"Risk Status: {risk_status.get('current_drawdown_pct', 0):.2f}% drawdown")
            self.logger.info(f"System Health: CPU {system_status['health_metrics']['cpu_usage_pct']:.1f}%, Mem {system_status['health_metrics']['memory_usage_mb']:.1f}MB")
            self.logger.info("=====================================")
            
        except Exception as e:
            self.logger.error(f"Error in status update: {e}")
    
    def _log_performance_metrics(self):
        """Log performance metrics"""
        try:
            account = self.client.get_account_info()
            if account['success']:
                portfolio_data = {
                    'total_value': account['balances'].get('USDT', {}).get('free', 0),
                    'total_return_pct': 0,  # Would need to calculate this
                    'daily_return_pct': 0,  # Would need to calculate this
                    'max_drawdown_pct': 0,  # Would need to track this
                    'sharpe_ratio': 0,  # Would need to calculate this
                    'num_positions': len([b for b in account['balances'].values() if b.get('free', 0) > 0]),
                    'cash_balance': account['balances'].get('USDT', {}).get('free', 0),
                    'unrealized_pnl': 0,  # Would need to calculate this
                    'realized_pnl': 0   # Would need to track this
                }
                
                self.trading_logger.log_performance(portfolio_data)
        
        except Exception as e:
            self.logger.error(f"Error logging performance: {e}")
    
    def _generate_final_reports(self):
        """Generate final reports"""
        try:
            # Runtime summary
            runtime = datetime.now() - self.start_time
            
            final_report = f"""
=== FINAL TRADING SESSION REPORT ===

SESSION SUMMARY
--------------
Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}
End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Total Runtime: {runtime}
Total Trades: {self.trade_count}

PERFORMANCE SUMMARY
------------------
{self.strategy.get_status() if self.strategy else 'Strategy not available'}

RISK SUMMARY
-----------
{self.risk_manager.export_risk_report() if self.risk_manager else 'Risk manager not available'}

SYSTEM SUMMARY
-------------
{self.production_manager.generate_status_report() if self.production_manager else 'Production manager not available'}
            """
            
            # Save final report
            with open(f"data/final_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", 'w') as f:
                f.write(final_report)
            
            print(final_report)
            
        except Exception as e:
            self.logger.error(f"Error generating final reports: {e}")

def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description='Production Cryptocurrency Trading Bot')
    parser.add_argument('--config', help='Configuration file path')
    parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode')
    args = parser.parse_args()
    
    print("πŸš€ Starting Production Cryptocurrency Trading Bot")
    print("=" * 60)
    
    # Create bot instance
    bot = ProductionTradingBot(args.config)
    
    # Initialize components
    if not bot.initialize_components():
        print("❌ Failed to initialize bot components")
        sys.exit(1)
    
    # Start bot
    if not bot.start_bot():
        print("❌ Failed to start trading bot")
        sys.exit(1)
    
    print("βœ… Production bot started successfully!")
    print("πŸ’‘ Press Ctrl+C to stop gracefully")
    print("πŸ“Š Monitor logs for real-time updates")
    
    # Run main loop
    bot.run_main_loop()
    
    print("βœ… Trading bot session completed")

if __name__ == "__main__":
    main()

Operational Best Practices

Security Checklist

  • πŸ”’ API Key Security: Store in environment variables, never in code
  • πŸ”’ IP Restrictions: Whitelist server IPs on exchange
  • πŸ”’ Permission Limits: Disable withdrawal permissions
  • πŸ”’ 2FA Required: Enable on all exchange accounts
  • πŸ”’ Regular Rotation: Rotate API keys every 90 days
  • πŸ”’ Monitoring: Alert on unusual API activity
  • πŸ”’ Backup Encryption: Encrypt all backup files
  • πŸ”’ Access Logs: Monitor all system access

Monitoring and Alerting

Alert Type Trigger Condition Response Time Action Required
Critical Risk Drawdown >15% or emergency stop Immediate Manual intervention required
System Health CPU >80%, Memory >8GB 5 minutes Check resource usage
API Issues Response time >5s or failures 2 minutes Verify connectivity
Trading Halt No trades for 30+ minutes 30 minutes Check strategy status

Maintenance Schedule

Daily:

  • βœ… Review performance metrics
  • βœ… Check risk violations
  • βœ… Verify system health
  • βœ… Monitor log alerts

Weekly:

  • βœ… Analyze strategy performance
  • βœ… Review risk parameters
  • βœ… Check backup integrity
  • βœ… Update market data

Monthly:

  • βœ… Performance review and optimization
  • βœ… Security audit
  • βœ… System updates
  • βœ… Disaster recovery test

Compliance and Audit

Regulatory Considerations

Ensure your trading bot complies with relevant regulations:

  • πŸ“‹ KYC/AML Compliance: Verify all exchange accounts
  • πŸ“‹ Tax Reporting: Maintain detailed trade records
  • πŸ“‹ Jurisdiction Rules: Understand local trading regulations
  • πŸ“‹ Market Manipulation: Avoid practices that could be deemed manipulative
  • πŸ“‹ Data Privacy: Protect user data according to local laws
  • πŸ“‹ Financial Reporting: Maintain accurate financial records

Audit Trail Requirements

Maintain comprehensive audit trails:

  • πŸ” Trade Records: Every order, fill, and cancellation
  • πŸ” Decision Logic: Why each trade was made
  • πŸ” Risk Events: All risk violations and responses
  • πŸ” System Changes: Configuration and code changes
  • πŸ” Performance Records: Daily P&L and metrics
  • πŸ” Error Logs: All system errors and resolutions

Conclusion: Building Professional Trading Systems

This comprehensive series has taken you from basic concepts to a production-ready cryptocurrency trading bot. The key components of a professional system include:

Critical Success Factors

  1. 🎯 Strategy Validation: Thorough backtesting and paper trading
  2. πŸ›‘οΈ Risk Management: Multi-layer protection and monitoring
  3. πŸ“Š Professional Logging: Comprehensive audit trails
  4. πŸ”§ Production Monitoring: Health checks and alerting
  5. πŸ”’ Security First: API security and access controls
  6. πŸ“ˆ Continuous Improvement: Regular review and optimization

Performance Expectations

With proper implementation, you can expect:

  • πŸ“ˆ Consistent Returns: 15-35% annual returns in suitable market conditions
  • πŸ“‰ Controlled Risk: Maximum drawdowns under 15-20%
  • ⚑ High Reliability: 99%+ uptime with proper monitoring
  • πŸ”„ Adaptive Performance: Ability to adjust to changing market conditions
πŸŽ‰ Congratulations! You now have the knowledge and tools to build, deploy, and maintain professional cryptocurrency trading bots. Remember: start small, test thoroughly, and never risk more than you can afford to lose. The combination of systematic approach, proper risk management, and continuous monitoring will give you the best chance of long-term success.

Final Recommendations

Before Going Live

  • βœ… Complete at least 30 days of paper trading
  • βœ… Verify all risk management systems
  • βœ… Test emergency procedures
  • βœ… Set up comprehensive monitoring
  • βœ… Start with small capital allocation
  • βœ… Have manual override procedures ready

Remember: Trading bots are tools that amplify both good and bad decisions. The quality of your strategy, risk management, and operational discipline determine long-term success. Stay disciplined, keep learning, and always prioritize capital preservation over profit maximization.


This concludes our comprehensive guide to building professional cryptocurrency trading bots. You now have all the components necessary for safe, profitable, and scalable automated trading systems.

Tags

  • Crypto
  • Trading
  • Bitcoin
  • Python

Comments

About text formats

Restricted HTML

  • Allowed HTML tags: <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • Lines and paragraphs break automatically.
  • Web page addresses and email addresses turn into links automatically.

Related articles

How to Build a Cryptocurrency Trading Bot: Complete Guide
Setting Up Python Development Environment for Crypto Trading Bots
Binance API Configuration and Authentication: Complete Setup Guide
Grid Trading Strategy Implementation: Build Your First Profitable Crypto Bot
Crypto Trading Bot Backtesting Framework: Validate Strategies Before Risking Real Money

About the author

Nikolai Fischer is the founder of Kommune3 (since 2007) and a leading expert in Drupal development and tech entrepreneurship. With 17+ years of experience, he has led hundreds of projects and achieved #1 on Hacker News. As host of the "Kommit mich" podcast and founder of skillution, he combines technical expertise with entrepreneurial thinking. His articles about Supabase, modern web development, and systematic problem-solving have influenced thousands of developers worldwide.

Ihre Anmeldung konnte nicht gespeichert werden. Bitte versuchen Sie es erneut.
Ihre Anmeldung war erfolgreich.

Newsletter

Join a growing community of friendly readers. From time to time I share my thoughts about rational thinking, productivity and life.

Nikolai Fischer

✌ Hi, I'm Niko
Entrepreneur, developer & podcaster

Contact me:

  • E-Mail
  • Phone
  • LinkedIn

My Reading List

  • $100M Leads: How to Get Strangers To Want To Buy Your Stuff - Alex Hormozi
  • Quantitative Trading: How to Build Your Own Algorithmic Trading Business (Wiley Trading) - Ernest P. Chan
  • Hands-On Machine Learning for Algorithmic Trading: Design and implement investment strategies based on smart algorithms that learn from data using Python - Stefan Jansen
  • Algorithmic Trading - Ernie Chan
  • Let Me Tell You a Story: Tales Along the Road to Happiness - Jorge Bucay
more
RSS feed