🎸
🚀 Beta Running
PYNGUP: Rebellion against toxic productivity
Beta limited to 100 spots. Tasks become social commitments instead of lonely to-dos.
Build a professional backtesting framework to validate your cryptocurrency trading strategies with historical data. This comprehensive guide covers data acquisition, performance metrics, risk analysis, and advanced validation techniques to ensure your bot is profitable before live trading.
Backtesting is the process of testing a trading strategy against historical market data to evaluate its performance. It's the difference between gambling and systematic trading.
Consider these scenarios:
Strategy | Live Performance | Backtest Result | Outcome |
---|---|---|---|
Grid 0.1% spacing | -15% (fee heavy) | -12% predicted | ✅ Avoided loss |
Grid 0.5% spacing | +28% return | +31% predicted | ✅ Successful deployment |
Momentum strategy | -8% (trend reversal) | +45% (overfitted) | ❌ Flawed backtest |
Quality data is essential for reliable backtesting results:
Data Source | Quality | Cost | Best For |
---|---|---|---|
Binance API | Excellent | Free | Binance-specific strategies |
CoinGecko | Good | Free/Paid | Multi-exchange comparison |
CryptoCompare | Very Good | Paid | Professional backtesting |
Alpha Vantage | Good | Free/Paid | Traditional + crypto data |
Create `src/data/historical_data.py`:
import logging
import os
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from src.exchanges.binance_client import BinanceClient
class HistoricalDataManager:
    """Manage historical cryptocurrency data for backtesting.

    OHLCV candles are downloaded through the wrapped exchange client and
    cached in a local SQLite database (table ``ohlcv_data``), from which they
    can be read back as pandas DataFrames.

    Candle timestamps are stored as epoch milliseconds. Every ``'YYYY-MM-DD'``
    date accepted or reported by this class is interpreted as UTC, so results
    do not depend on the local timezone of the machine running the code.
    """

    # Time span requested per call, sized so that one call stays within the
    # 1000-candle request limit used in _download_klines_chunk.
    _CHUNK_SPANS = {
        '1m': timedelta(hours=16),   # ~1000 minutes
        '5m': timedelta(days=3),     # ~864 5-minute intervals
        '1h': timedelta(days=41),    # ~1000 hours
        '1d': timedelta(days=1000),  # 1000 days
    }
    # Used for every timeframe that has no entry in _CHUNK_SPANS.
    _DEFAULT_CHUNK_SPAN = timedelta(days=1)

    def __init__(self, client: "BinanceClient", db_path: str = "data/historical_data.db"):
        """Store the exchange client and make sure the database schema exists.

        Args:
            client: Project exchange wrapper; its ``client`` attribute must
                expose ``fetch_ohlcv`` (used when downloading).
            db_path: Location of the SQLite file. Missing parent directories
                are created.
        """
        self.client = client
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        self._init_database()

    @staticmethod
    def _parse_date(date_str: str) -> datetime:
        """Parse a 'YYYY-MM-DD' string as midnight UTC of that day."""
        return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)

    @staticmethod
    def _format_ms(timestamp_ms: int, fmt: str) -> str:
        """Format an epoch-millisecond timestamp as a UTC date string."""
        return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).strftime(fmt)

    def _init_database(self):
        """Initialize SQLite database for historical data storage"""
        # dirname() is '' for a bare filename, and os.makedirs('') raises.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        # closing() guarantees the connection is released even if a statement
        # fails; sqlite3's own context manager only handles transactions.
        with closing(sqlite3.connect(self.db_path)) as conn:
            # Create OHLCV table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS ohlcv_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    timeframe TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    open REAL NOT NULL,
                    high REAL NOT NULL,
                    low REAL NOT NULL,
                    close REAL NOT NULL,
                    volume REAL NOT NULL,
                    trades INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(symbol, timeframe, timestamp)
                )
            ''')
            # Create indexes for faster queries
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_symbol_timeframe_timestamp
                ON ohlcv_data(symbol, timeframe, timestamp)
            ''')
            conn.commit()

        self.logger.info(f"Historical data database initialized: {self.db_path}")

    def download_historical_data(self, symbol: str, timeframe: str,
                                 start_date: str, end_date: Optional[str] = None,
                                 force_update: bool = False) -> bool:
        """
        Download historical OHLCV data from Binance

        Args:
            symbol: Trading pair (e.g., 'BTCUSDT')
            timeframe: Kline interval ('1m', '5m', '1h', '1d', etc.)
            start_date: Start date in 'YYYY-MM-DD' format (midnight UTC)
            end_date: End date in 'YYYY-MM-DD' format (midnight UTC at the
                start of that day; default: now)
            force_update: When False, the range already stored is looked up
                and logged first. Stored candles are never overwritten either
                way, because rows are written with INSERT OR IGNORE.

        Returns:
            True when the download loop ran to completion, False when an
            exception aborted it. A chunk whose download fails is logged by
            ``_download_klines_chunk`` and skipped; it does not turn the
            result into False.
        """
        try:
            # Parse dates
            start_datetime = self._parse_date(start_date)
            end_datetime = self._parse_date(end_date) if end_date else datetime.now(timezone.utc)
            self.logger.info(f"Downloading {symbol} {timeframe} data from {start_date} to {end_datetime.strftime('%Y-%m-%d')}")

            # Check what data already exists
            if not force_update:
                existing_range = self._get_existing_data_range(symbol, timeframe)
                if existing_range:
                    self.logger.info(f"Existing data: {existing_range}")

            # Download data in chunks (Binance limit: 1000 klines per request)
            chunk_span = self._CHUNK_SPANS.get(timeframe, self._DEFAULT_CHUNK_SPAN)
            current_date = start_datetime
            total_records = 0
            while current_date < end_datetime:
                chunk_end = min(current_date + chunk_span, end_datetime)

                # Download chunk
                klines = self._download_klines_chunk(symbol, timeframe, current_date, chunk_end)
                if klines:
                    # Save to database
                    records_saved = self._save_klines_to_db(symbol, timeframe, klines)
                    total_records += records_saved
                    self.logger.info(f"Downloaded {len(klines)} klines, saved {records_saved} new records")

                current_date = chunk_end
                time.sleep(0.1)  # Rate limiting

            self.logger.info(f"Download complete. Total new records: {total_records}")
            return True
        except Exception as e:
            self.logger.error(f"Error downloading historical data: {e}")
            return False

    def _download_klines_chunk(self, symbol: str, timeframe: str,
                               start_time: datetime, end_time: datetime) -> List:
        """Download a chunk of klines data.

        Returns the candles whose open timestamp is <= ``end_time``, or an
        empty list when the request fails (the error is logged).
        """
        try:
            # Convert to milliseconds
            start_ms = int(start_time.timestamp() * 1000)
            end_ms = int(end_time.timestamp() * 1000)

            # Use CCXT to fetch OHLCV data
            klines = self.client.client.fetch_ohlcv(
                symbol=symbol,
                timeframe=timeframe,
                since=start_ms,
                limit=1000
            )

            # Filter by end time
            return [k for k in klines if k[0] <= end_ms]
        except Exception as e:
            self.logger.error(f"Error downloading klines chunk: {e}")
            return []

    def _save_klines_to_db(self, symbol: str, timeframe: str, klines: List) -> int:
        """Save klines to database.

        Each kline is indexed as
        ``[timestamp_ms, open, high, low, close, volume]``.

        Returns:
            Number of rows actually inserted; candles already stored for the
            same (symbol, timeframe, timestamp) are ignored and not counted.
        """
        saved_count = 0
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            for kline in klines:
                try:
                    cursor.execute('''
                        INSERT OR IGNORE INTO ohlcv_data
                        (symbol, timeframe, timestamp, open, high, low, close, volume)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        symbol, timeframe, kline[0],              # timestamp
                        kline[1], kline[2], kline[3], kline[4],   # OHLC
                        kline[5]                                  # volume
                    ))
                    if cursor.rowcount > 0:
                        saved_count += 1
                except sqlite3.Error as e:
                    # Best effort: a bad row is logged and the rest are kept.
                    self.logger.error(f"Database error: {e}")
            conn.commit()
        return saved_count

    def _get_existing_data_range(self, symbol: str, timeframe: str) -> Optional[Tuple[str, str]]:
        """Get the date range of existing data.

        Returns:
            ``(first_day, last_day)`` as 'YYYY-MM-DD' UTC strings, or None
            when nothing is stored for this symbol/timeframe.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            result = conn.execute('''
                SELECT MIN(timestamp), MAX(timestamp)
                FROM ohlcv_data
                WHERE symbol = ? AND timeframe = ?
            ''', (symbol, timeframe)).fetchone()

        # MIN/MAX over zero rows yields NULLs.
        if result and result[0] is not None and result[1] is not None:
            return (self._format_ms(result[0], '%Y-%m-%d'),
                    self._format_ms(result[1], '%Y-%m-%d'))
        return None

    def get_historical_data(self, symbol: str, timeframe: str,
                            start_date: str, end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Retrieve historical data as pandas DataFrame

        Args:
            symbol: Trading pair (e.g., 'BTCUSDT')
            timeframe: Kline interval the data was stored under
            start_date: First day in 'YYYY-MM-DD' format (midnight UTC, inclusive)
            end_date: Upper bound in 'YYYY-MM-DD' format (midnight UTC at the
                start of that day, inclusive; default: now)

        Returns:
            DataFrame with columns: timestamp, open, high, low, close, volume,
            indexed by a ``datetime`` index derived from ``timestamp``. An
            empty DataFrame is returned when no rows match or on any error
            (which is logged).
        """
        try:
            # Parse dates
            start_datetime = self._parse_date(start_date)
            end_datetime = self._parse_date(end_date) if end_date else datetime.now(timezone.utc)
            start_ms = int(start_datetime.timestamp() * 1000)
            end_ms = int(end_datetime.timestamp() * 1000)

            query = '''
                SELECT timestamp, open, high, low, close, volume
                FROM ohlcv_data
                WHERE symbol = ? AND timeframe = ?
                AND timestamp >= ? AND timestamp <= ?
                ORDER BY timestamp ASC
            '''
            with closing(sqlite3.connect(self.db_path)) as conn:
                df = pd.read_sql_query(query, conn, params=(symbol, timeframe, start_ms, end_ms))

            if df.empty:
                self.logger.warning(f"No data found for {symbol} {timeframe} between {start_date} and {end_datetime.strftime('%Y-%m-%d')}")
                return pd.DataFrame()

            # Convert timestamp to datetime
            df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('datetime', inplace=True)
            self.logger.info(f"Retrieved {len(df)} records for {symbol} {timeframe}")
            return df
        except Exception as e:
            self.logger.error(f"Error retrieving historical data: {e}")
            return pd.DataFrame()

    def get_data_summary(self) -> List[Dict]:
        """Get summary of available historical data.

        Returns:
            One dict per stored (symbol, timeframe) pair, ordered by symbol
            then timeframe, with keys ``symbol``, ``timeframe``, ``records``,
            ``start_date`` and ``end_date`` (dates as 'YYYY-MM-DD HH:MM' UTC).
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute('''
                SELECT symbol, timeframe,
                       COUNT(*) as record_count,
                       MIN(timestamp) as start_timestamp,
                       MAX(timestamp) as end_timestamp
                FROM ohlcv_data
                GROUP BY symbol, timeframe
                ORDER BY symbol, timeframe
            ''').fetchall()

        return [
            {
                'symbol': symbol,
                'timeframe': timeframe,
                'records': record_count,
                'start_date': self._format_ms(start_ts, '%Y-%m-%d %H:%M'),
                'end_date': self._format_ms(end_ts, '%Y-%m-%d %H:%M'),
            }
            for symbol, timeframe, record_count, start_ts, end_ts in rows
        ]
Create `src/backtesting/backtest_engine.py`:
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import logging
from abc import ABC, abstractmethod
@dataclass
class Trade:
    """One order fill: what was traded, when, at which price and fee."""

    timestamp: datetime
    symbol: str
    side: str  # 'buy' or 'sell'
    quantity: float
    price: float
    fee: float
    trade_id: str = ""

    @property
    def value(self) -> float:
        """Gross size of the trade in quote currency (quantity x price)."""
        gross = self.quantity * self.price
        return gross

    @property
    def net_value(self) -> float:
        """Gross trade value with the fee subtracted."""
        gross = self.value
        return gross - self.fee
@dataclass
class Position:
    """Represents current position in an asset.

    ``avg_price`` is the fee-inclusive average cost per unit of the units
    currently held. ``unrealized_pnl`` is carried as a plain field and is not
    modified by ``update_position``.
    """

    symbol: str
    quantity: float = 0.0
    avg_price: float = 0.0
    unrealized_pnl: float = 0.0

    def update_position(self, trade: "Trade"):
        """Update position with new trade.

        A buy adds ``trade.quantity`` and re-averages the entry price. A sell
        removes ``trade.quantity`` and leaves the average price untouched
        unless the position is closed, in which case both fields reset to 0.
        Any other ``side`` is ignored.
        """
        if trade.side == 'buy':
            # Adding to position. The buyer pays the gross value PLUS the fee,
            # so the fee raises the cost basis (using value - fee here would
            # understate the entry price and overstate later profits).
            total_cost = (self.quantity * self.avg_price) + trade.value + trade.fee
            self.quantity += trade.quantity
            self.avg_price = total_cost / self.quantity if self.quantity > 0 else 0
        elif trade.side == 'sell':
            # Reducing position
            self.quantity -= trade.quantity
            if self.quantity <= 0:
                # Fully closed (or oversold): clamp to a flat position.
                self.quantity = 0
                self.avg_price = 0
@dataclass
class BacktestResults:
    """Everything a finished backtest reports: headline numbers, trade and
    risk statistics, and the raw series they were derived from."""

    # Basic metrics
    start_date: datetime
    end_date: datetime
    initial_capital: float
    final_capital: float
    total_return: float
    total_return_pct: float

    # Trade statistics
    total_trades: int
    winning_trades: int
    losing_trades: int
    win_rate: float
    avg_win: float
    avg_loss: float
    profit_factor: float

    # Risk metrics
    max_drawdown: float
    max_drawdown_pct: float
    sharpe_ratio: float
    sortino_ratio: float
    calmar_ratio: float

    # Time-based metrics
    trades_per_day: float
    avg_trade_duration: timedelta

    # Detailed data
    trades: List[Trade] = field(default_factory=list)
    equity_curve: pd.Series = field(default_factory=pd.Series)
    daily_returns: pd.Series = field(default_factory=pd.Series)

    def to_dict(self) -> Dict:
        """Return the scalar metrics as a plain dict.

        The two dates are rendered with ``isoformat()``. The trade list, the
        two series and ``avg_trade_duration`` are not included.
        """
        summary = {
            'start_date': self.start_date.isoformat(),
            'end_date': self.end_date.isoformat(),
        }
        # Remaining entries are copied verbatim, in reporting order.
        copied_fields = (
            'initial_capital', 'final_capital',
            'total_return', 'total_return_pct',
            'total_trades', 'winning_trades', 'losing_trades',
            'win_rate', 'avg_win', 'avg_loss', 'profit_factor',
            'max_drawdown', 'max_drawdown_pct',
            'sharpe_ratio', 'sortino_ratio', 'calmar_ratio',
            'trades_per_day',
        )
        for field_name in copied_fields:
            summary[field_name] = getattr(self, field_name)
        return summary
class BacktestStrategy(ABC):
    """Interface a strategy must implement to be driven by the backtester.

    The three hooks cover the strategy's life cycle: set-up, one call per
    bar, and tear-down.
    """

    @abstractmethod
    def initialize(self, data: pd.DataFrame, initial_capital: float):
        """Prepare the strategy from the historical data and starting capital."""

    @abstractmethod
    def next(self, current_bar: pd.Series) -> List[Trade]:
        """Consume one bar and return the trades it triggers (possibly none)."""

    @abstractmethod
    def finalize(self) -> Dict:
        """Release any state and report the strategy's own closing metrics."""
class BacktestEngine:
    """
    Professional backtesting engine for cryptocurrency trading strategies

    Features:
    - Realistic trade execution simulation
    - Comprehensive performance metrics
    - Risk analysis and drawdown calculation
    - Slippage and fee modeling
    - Position tracking

    Open positions are valued with the ``close`` of the bar being processed
    regardless of their symbol, so results are only meaningful when ``data``
    describes the one symbol the strategy trades.
    """

    def __init__(self, initial_capital: float = 10000,
                 commission: float = 0.001, slippage: float = 0.0005):
        """
        Args:
            initial_capital: Starting cash in quote currency.
            commission: Fee charged on every fill, as a fraction of its value.
            slippage: Adverse price move applied to every fill, as a fraction
                of the requested price.
        """
        self.initial_capital = initial_capital
        self.commission = commission  # 0.1% default
        self.slippage = slippage  # 0.05% default slippage
        self.logger = logging.getLogger(__name__)

        # Backtest state
        self.current_capital = initial_capital
        self.positions: Dict[str, Position] = {}
        self.trades: List[Trade] = []
        self.equity_curve: List[Tuple[datetime, float]] = []
        # Realised P&L of every executed sell, keyed by id(trade). The trade
        # objects stay alive in self.trades, so the ids remain unique.
        self._realized_pnl: Dict[int, float] = {}

        # Performance tracking
        self.peak_capital = initial_capital
        self.max_drawdown = 0.0       # largest peak-to-trough loss, as a fraction
        self._max_drawdown_abs = 0.0  # largest peak-to-trough loss, in currency

    def run_backtest(self, strategy: "BacktestStrategy",
                     data: pd.DataFrame,
                     start_date: str = None,
                     end_date: str = None) -> "BacktestResults":
        """
        Run complete backtest

        Args:
            strategy: Strategy instance implementing BacktestStrategy
            data: Historical OHLCV data
            start_date: Start date for backtesting (YYYY-MM-DD)
            end_date: End date for backtesting (YYYY-MM-DD)

        Returns:
            The aggregated BacktestResults for the run.

        Raises:
            ValueError: If no rows remain after applying the date filter.
            Exception: Anything raised by the strategy or the metric
                calculation is logged and re-raised unchanged.
        """
        try:
            self.logger.info("Starting backtesting...")

            # Filter data by date range
            if start_date:
                data = data[data.index >= start_date]
            if end_date:
                data = data[data.index <= end_date]
            if data.empty:
                raise ValueError("No data available for specified date range")

            # Reset state
            self._reset_state()

            # Initialize strategy
            strategy.initialize(data, self.initial_capital)

            # Main backtesting loop
            for timestamp, bar in data.iterrows():
                # Generate and execute the trades for this bar; a strategy
                # returning None is treated as "no trades".
                for trade in strategy.next(bar) or []:
                    self._execute_trade(trade, bar)

                # Update equity curve
                current_equity = self._calculate_total_equity(bar)
                self.equity_curve.append((timestamp, current_equity))

                # Update drawdown
                self._update_drawdown(current_equity)

            # Finalize strategy
            strategy_metrics = strategy.finalize()
            self.logger.debug(f"Strategy metrics: {strategy_metrics}")

            # Calculate final results
            results = self._calculate_results(data.index[0], data.index[-1])
            self.logger.info(f"Backtesting complete. Final return: {results.total_return_pct:.2f}%")
            return results
        except Exception as e:
            self.logger.error(f"Backtesting failed: {e}")
            raise

    def _reset_state(self):
        """Reset backtesting state"""
        self.current_capital = self.initial_capital
        self.positions.clear()
        self.trades.clear()
        self.equity_curve.clear()
        self._realized_pnl.clear()
        self.peak_capital = self.initial_capital
        self.max_drawdown = 0.0
        self._max_drawdown_abs = 0.0

    def _execute_trade(self, trade: "Trade", current_bar: pd.Series) -> bool:
        """Execute a trade with realistic simulation.

        Applies slippage and commission, checks that enough cash (buy) or
        inventory (sell) is available, then updates cash, the position and the
        trade log. ``trade.price`` and ``trade.fee`` are overwritten with the
        simulated execution values.

        Returns:
            True if the trade was executed, False if it was rejected or an
            error occurred (both are logged).
        """
        try:
            # Apply slippage
            if trade.side == 'buy':
                # For buy orders, we pay slightly more
                execution_price = trade.price * (1 + self.slippage)
            elif trade.side == 'sell':
                # For sell orders, we receive slightly less
                execution_price = trade.price * (1 - self.slippage)
            else:
                self.logger.warning(f"Unknown trade side: {trade.side}")
                return False

            # Calculate trade value and fees
            trade_value = trade.quantity * execution_price
            fee = trade_value * self.commission
            realized_pnl = None

            # Check if we have sufficient capital/position
            if trade.side == 'buy':
                total_cost = trade_value + fee
                if total_cost > self.current_capital:
                    self.logger.warning(f"Insufficient capital for trade: {total_cost:.2f} > {self.current_capital:.2f}")
                    return False
                self.current_capital -= total_cost
            else:
                position = self.positions.get(trade.symbol)
                held_quantity = position.quantity if position is not None else 0.0
                if trade.quantity > held_quantity:
                    self.logger.warning(f"Insufficient position for sell: {trade.quantity} > {held_quantity}")
                    return False
                proceeds = trade_value - fee
                # Profit relative to the position's average entry price; read
                # before update_position() can reset it on a full close.
                entry_price = position.avg_price if position is not None else 0.0
                realized_pnl = proceeds - entry_price * trade.quantity
                self.current_capital += proceeds

            # Update the trade with actual execution details
            trade.price = execution_price
            trade.fee = fee

            # Update position
            if trade.symbol not in self.positions:
                self.positions[trade.symbol] = Position(trade.symbol)
            self.positions[trade.symbol].update_position(trade)

            # Record trade
            self.trades.append(trade)
            if realized_pnl is not None:
                self._realized_pnl[id(trade)] = realized_pnl
            self.logger.debug(f"Executed trade: {trade.side} {trade.quantity} {trade.symbol} @ {execution_price:.2f}")
            return True
        except Exception as e:
            self.logger.error(f"Error executing trade: {e}")
            return False

    def _calculate_total_equity(self, current_bar: pd.Series) -> float:
        """Calculate total portfolio equity at current prices"""
        total_equity = self.current_capital

        # Add value of all positions at current prices
        for position in self.positions.values():
            if position.quantity > 0:
                # Use current close price to value position (every position
                # is priced with this bar, see the class docstring).
                total_equity += position.quantity * current_bar['close']
        return total_equity

    def _update_drawdown(self, current_equity: float):
        """Update maximum drawdown tracking"""
        if current_equity > self.peak_capital:
            self.peak_capital = current_equity
        if self.peak_capital <= 0:
            # A relative drawdown is undefined without a positive peak.
            return
        loss_from_peak = self.peak_capital - current_equity
        self.max_drawdown = max(self.max_drawdown, loss_from_peak / self.peak_capital)
        self._max_drawdown_abs = max(self._max_drawdown_abs, loss_from_peak)

    @staticmethod
    def _daily_returns(equity_series: pd.Series) -> pd.Series:
        """Return day-over-day returns of an equity curve.

        A datetime-indexed curve is first reduced to its last value per
        calendar day, so the sqrt(365) annualisation used for the Sharpe and
        Sortino ratios is valid for intraday bars too. Other index types fall
        back to bar-over-bar returns.
        """
        if not equity_series.empty and isinstance(equity_series.index, pd.DatetimeIndex):
            equity_series = equity_series.resample('D').last().dropna()
        return equity_series.pct_change().dropna()

    def _annualized_return(self, final_capital: float, backtest_days: int) -> float:
        """Compound the total growth to a 365-day year.

        Returns 0.0 when the backtest spans less than one full day or the
        initial capital is not positive, because annualising is undefined
        there.
        """
        if backtest_days <= 0 or self.initial_capital <= 0:
            return 0.0
        # Clamp at 0 so a negative equity cannot produce a complex power.
        growth = max(final_capital / self.initial_capital, 0.0)
        try:
            return growth ** (365 / backtest_days) - 1
        except OverflowError:
            return float('inf')

    def _calculate_results(self, start_date: datetime, end_date: datetime) -> "BacktestResults":
        """Calculate comprehensive backtest results.

        Win/loss statistics cover closing (sell) trades only, since a buy has
        no realised profit or loss. ``total_trades`` still counts every fill.
        """
        # Basic returns
        final_capital = self.equity_curve[-1][1] if self.equity_curve else self.initial_capital
        total_return = final_capital - self.initial_capital
        total_return_pct = (total_return / self.initial_capital) * 100

        # Trade statistics
        closed_trades = [t for t in self.trades if id(t) in self._realized_pnl]
        winning_trades = [t for t in closed_trades if self._is_winning_trade(t)]
        losing_trades = [t for t in closed_trades if self._trade_pnl(t) < 0]
        win_rate = len(winning_trades) / len(closed_trades) if closed_trades else 0
        gross_profit = sum(self._trade_pnl(t) for t in winning_trades)
        gross_loss = -sum(self._trade_pnl(t) for t in losing_trades)
        avg_win = gross_profit / len(winning_trades) if winning_trades else 0
        avg_loss = -gross_loss / len(losing_trades) if losing_trades else 0
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else float('inf')

        # Risk metrics
        equity_series = pd.Series([eq[1] for eq in self.equity_curve],
                                  index=[eq[0] for eq in self.equity_curve])
        daily_returns = self._daily_returns(equity_series)

        # Sharpe ratio (assuming 0% risk-free rate); std() is NaN with fewer
        # than two returns, which the comparison treats as "not computable".
        returns_std = daily_returns.std()
        sharpe_ratio = daily_returns.mean() / returns_std * np.sqrt(365) if returns_std > 0 else 0

        # Sortino ratio (downside deviation)
        downside_std = daily_returns[daily_returns < 0].std()
        sortino_ratio = daily_returns.mean() / downside_std * np.sqrt(365) if downside_std > 0 else 0

        # Calmar ratio
        backtest_days = (end_date - start_date).days
        annualized_return = self._annualized_return(final_capital, backtest_days)
        calmar_ratio = annualized_return / self.max_drawdown if self.max_drawdown > 0 else float('inf')

        # Time-based metrics
        trades_per_day = len(self.trades) / backtest_days if backtest_days > 0 else 0

        return BacktestResults(
            start_date=start_date,
            end_date=end_date,
            initial_capital=self.initial_capital,
            final_capital=final_capital,
            total_return=total_return,
            total_return_pct=total_return_pct,
            total_trades=len(self.trades),
            winning_trades=len(winning_trades),
            losing_trades=len(losing_trades),
            win_rate=win_rate,
            avg_win=avg_win,
            avg_loss=avg_loss,
            profit_factor=profit_factor,
            max_drawdown=self._max_drawdown_abs,
            max_drawdown_pct=self.max_drawdown * 100,
            sharpe_ratio=sharpe_ratio,
            sortino_ratio=sortino_ratio,
            calmar_ratio=calmar_ratio,
            trades_per_day=trades_per_day,
            avg_trade_duration=timedelta(hours=0),  # TODO: Calculate this
            # Copy: self.trades is cleared in place by the next run.
            trades=list(self.trades),
            equity_curve=equity_series,
            daily_returns=daily_returns
        )

    def _is_winning_trade(self, trade: "Trade") -> bool:
        """Return True if the trade closed inventory at a profit."""
        return self._trade_pnl(trade) > 0

    def _trade_pnl(self, trade: "Trade") -> float:
        """Return the realised P&L recorded for a trade.

        Only executed sells carry a realised P&L; any other trade yields 0.0.
        """
        return self._realized_pnl.get(id(trade), 0.0)
Create `src/backtesting/grid_strategy_bt.py`:
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime
from src.backtesting.backtest_engine import BacktestStrategy, Trade
class GridTradingBacktest(BacktestStrategy):
    """Grid trading strategy implementation for backtesting.

    A ladder of buy orders below and sell orders above the first close is
    created at start-up. Whenever the close crosses an order's price the
    order is reported as a trade and replaced by the opposite order one grid
    step away.

    NOTE(review): fills are detected from the bar's close only, and the
    strategy assumes every trade it returns is executed. It cannot see a
    rejection by the engine (e.g. a sell without inventory) - confirm this is
    acceptable for the intended use.
    """

    def __init__(self, grid_spacing: float = 0.005,
                 num_grids: int = 10,
                 base_order_size: float = 0.001,
                 symbol: str = 'BTCUSDT'):
        """
        Args:
            grid_spacing: Distance between grid levels as a fraction of price.
            num_grids: Number of levels created on each side of the centre.
            base_order_size: Quantity of every grid order.
            symbol: Symbol stamped on the generated trades.
        """
        self.grid_spacing = grid_spacing
        self.num_grids = num_grids
        self.base_order_size = base_order_size
        self.symbol = symbol

        # Strategy state
        self.center_price = 0.0
        self.grid_levels: List[Dict] = []
        self.active_orders: List[Dict] = []
        self.total_profit = 0.0
        self.trades_executed = 0

    def initialize(self, data: pd.DataFrame, initial_capital: float):
        """Initialize grid trading strategy"""
        # Set initial center price from first data point
        self.center_price = data.iloc[0]['close']

        # Create initial grid levels
        self._create_grid_levels()

        print(f"Grid strategy initialized:")
        print(f" Center price: ${self.center_price:,.2f}")
        print(f" Grid spacing: {self.grid_spacing:.1%}")
        print(f" Number of grids: {self.num_grids}")
        print(f" Base order size: {self.base_order_size}")

    def next(self, current_bar: pd.Series) -> List[Trade]:
        """Process next price bar and generate trades.

        Returns one Trade per active order whose price was reached by the
        bar's close. Each filled order is replaced by the opposite order one
        grid step away; replacements only become eligible on later bars.
        """
        current_price = current_bar['close']
        current_time = current_bar.name
        trades = []

        # Build the next order book in separate lists instead of mutating
        # self.active_orders while iterating over it.
        open_orders = []
        replacement_orders = []
        for order in self.active_orders:
            side = order['side']
            if side == 'buy' and current_price <= order['price']:
                # Buy order filled -> place the matching sell one step above
                counter_side = 'sell'
                counter_price = order['price'] * (1 + self.grid_spacing)
            elif side == 'sell' and current_price >= order['price']:
                # Sell order filled -> place the matching buy one step below
                counter_side = 'buy'
                counter_price = order['price'] * (1 - self.grid_spacing)
            else:
                open_orders.append(order)
                continue

            trades.append(Trade(
                timestamp=current_time,
                symbol=self.symbol,
                side=side,
                quantity=order['quantity'],
                price=order['price'],
                fee=0.0,  # Will be calculated in engine
                trade_id=f"grid_{side}_{self.trades_executed}"
            ))
            # Count per trade so ids stay unique within a single bar.
            self.trades_executed += 1
            replacement_orders.append({
                'side': counter_side,
                'price': counter_price,
                'quantity': order['quantity'],
                'type': 'grid'
            })

        # Unfilled orders keep their order; new ones are appended after them.
        self.active_orders[:] = open_orders + replacement_orders
        return trades

    def finalize(self) -> Dict:
        """Finalize strategy and return metrics"""
        return {
            'total_trades': self.trades_executed,
            'grid_levels': len(self.grid_levels),
            'final_active_orders': len(self.active_orders)
        }

    def _create_grid_levels(self):
        """Create initial grid of buy and sell orders"""
        self.grid_levels.clear()
        self.active_orders.clear()

        # Buy orders below the center price first, then sell orders above it.
        for side, direction in (('buy', -1), ('sell', 1)):
            for i in range(1, self.num_grids + 1):
                order = {
                    'side': side,
                    'price': self.center_price * (1 + direction * self.grid_spacing * i),
                    'quantity': self.base_order_size,
                    'type': 'grid'
                }
                self.active_orders.append(order)
                self.grid_levels.append(order)
Create `src/backtesting/performance_metrics.py`:
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
class PerformanceAnalyzer:
    """Comprehensive performance analysis for trading strategies.

    Works on a finished backtest result and derives annualised, drawdown,
    streak and monthly statistics from its equity curve and return series.
    """

    def __init__(self, backtest_results):
        """Keep the results object and shortcuts to the series it carries.

        ``backtest_results`` must provide ``equity_curve``, ``trades`` and
        ``daily_returns`` plus the scalar fields read by the methods below.
        """
        self.results = backtest_results
        self.equity_curve = backtest_results.equity_curve
        self.trades = backtest_results.trades
        self.daily_returns = backtest_results.daily_returns

    @staticmethod
    def _annualized_return(initial_capital: float, final_capital: float, total_days: int) -> float:
        """Compound the total growth to a 365-day year.

        Returns 0.0 when the period is shorter than one full day or the
        initial capital is not positive, because annualising is undefined
        there.
        """
        if total_days <= 0 or initial_capital <= 0:
            return 0.0
        # Clamp at 0 so a negative equity cannot produce a complex power.
        growth = max(final_capital / initial_capital, 0.0)
        try:
            return growth ** (365 / total_days) - 1
        except OverflowError:
            return float('inf')

    def calculate_advanced_metrics(self) -> Dict:
        """Calculate advanced performance metrics.

        Returns:
            Dict with annualised return and volatility, Sharpe and Sortino
            ratios (both based on the annualised return), drawdown durations,
            recovery factor (annualised return divided by the maximum drawdown
            fraction), profit to maximum drawdown, streak lengths and monthly
            statistics.
        """
        # Basic metrics
        total_days = (self.results.end_date - self.results.start_date).days
        annualized_return = self._annualized_return(
            self.results.initial_capital, self.results.final_capital, total_days)

        # Volatility metrics
        daily_vol = self.daily_returns.std()
        annualized_vol = daily_vol * np.sqrt(365)

        # Risk-adjusted returns (a NaN volatility fails the > 0 test -> 0)
        sharpe_ratio = annualized_return / annualized_vol if annualized_vol > 0 else 0

        # Downside metrics
        downside_returns = self.daily_returns[self.daily_returns < 0]
        downside_deviation = downside_returns.std() * np.sqrt(365)
        sortino_ratio = annualized_return / downside_deviation if downside_deviation > 0 else 0

        # Drawdown analysis
        drawdown_analysis = self._analyze_drawdowns()

        # Win/Loss streaks
        streak_analysis = self._analyze_streaks()

        # Monthly returns
        monthly_returns = self._calculate_monthly_returns()

        # max_drawdown_pct is a percentage while annualized_return is a
        # fraction; bring both to the same unit before dividing.
        max_drawdown_fraction = self.results.max_drawdown_pct / 100

        return {
            'annualized_return': annualized_return,
            'annualized_volatility': annualized_vol,
            'sharpe_ratio': sharpe_ratio,
            'sortino_ratio': sortino_ratio,
            'max_drawdown_duration': drawdown_analysis['max_duration'],
            'avg_drawdown_duration': drawdown_analysis['avg_duration'],
            'recovery_factor': annualized_return / max_drawdown_fraction if max_drawdown_fraction > 0 else float('inf'),
            'profit_to_max_drawdown': self.results.total_return / self.results.max_drawdown if self.results.max_drawdown > 0 else float('inf'),
            'longest_win_streak': streak_analysis['longest_win_streak'],
            'longest_loss_streak': streak_analysis['longest_loss_streak'],
            'monthly_win_rate': monthly_returns['win_rate'],
            'best_month': monthly_returns['best_month'],
            'worst_month': monthly_returns['worst_month']
        }

    def _analyze_drawdowns(self) -> Dict:
        """Analyze drawdown periods.

        A drawdown period is a run of consecutive equity points more than 1%
        below the running peak. Durations are counted in equity-curve points,
        and a drawdown still open at the end of the curve is included.
        """
        equity_values = self.equity_curve.values
        if len(equity_values) == 0:
            return {'max_duration': 0, 'avg_duration': 0, 'num_drawdown_periods': 0}

        cumulative_max = np.maximum.accumulate(equity_values)
        drawdowns = (cumulative_max - equity_values) / cumulative_max

        # Find drawdown periods
        in_drawdown = drawdowns > 0.01  # Consider 1% as minimum drawdown
        drawdown_periods = []
        start_idx = None
        for i, in_dd in enumerate(in_drawdown):
            if in_dd and start_idx is None:
                start_idx = i
            elif not in_dd and start_idx is not None:
                drawdown_periods.append((start_idx, i))
                start_idx = None
        if start_idx is not None:
            # The curve ends while still under water; without this the final
            # (possibly longest) drawdown would be silently dropped.
            drawdown_periods.append((start_idx, len(in_drawdown)))

        # Calculate durations
        durations = [(end - start) for start, end in drawdown_periods]
        return {
            'max_duration': max(durations) if durations else 0,
            'avg_duration': np.mean(durations) if durations else 0,
            'num_drawdown_periods': len(drawdown_periods)
        }

    def _analyze_streaks(self) -> Dict:
        """Analyze winning and losing streaks.

        Simplified: streaks are measured on consecutive positive / negative
        entries of the return series, not on individual trade P&L. A zero
        return neither extends nor resets a streak.
        """
        if not self.trades:
            return {'longest_win_streak': 0, 'longest_loss_streak': 0}

        returns = self.daily_returns.dropna()
        win_streak = 0
        loss_streak = 0
        max_win_streak = 0
        max_loss_streak = 0
        for ret in returns:
            if ret > 0:
                win_streak += 1
                loss_streak = 0
                max_win_streak = max(max_win_streak, win_streak)
            elif ret < 0:
                loss_streak += 1
                win_streak = 0
                max_loss_streak = max(max_loss_streak, loss_streak)
        return {
            'longest_win_streak': max_win_streak,
            'longest_loss_streak': max_loss_streak
        }

    def _calculate_monthly_returns(self) -> Dict:
        """Calculate monthly return statistics.

        Uses the last equity value of each calendar month that contains data.
        Returns zeros when the curve is empty, is not datetime-indexed or
        covers fewer than two months.
        """
        no_data = {'win_rate': 0, 'best_month': 0, 'worst_month': 0}
        curve = self.equity_curve
        if curve.empty or not isinstance(curve.index, pd.DatetimeIndex):
            return no_data

        # Group by (year, month) rather than resample('M'): the 'M' alias is
        # deprecated in recent pandas releases.
        month_end_equity = curve.groupby([curve.index.year, curve.index.month]).last()
        monthly_returns = month_end_equity.pct_change().dropna()
        if monthly_returns.empty:
            return no_data

        return {
            'win_rate': (monthly_returns > 0).mean(),
            'best_month': monthly_returns.max(),
            'worst_month': monthly_returns.min()
        }

    def generate_performance_report(self) -> str:
        """Generate comprehensive performance report.

        Returns a plain-text report of the basic and advanced metrics,
        followed by a verdict on total return versus drawdown and one on the
        Sharpe ratio.
        """
        metrics = self.calculate_advanced_metrics()
        report = f"""
=== CRYPTOCURRENCY TRADING BOT PERFORMANCE REPORT ===
BASIC PERFORMANCE
-----------------
Initial Capital: ${self.results.initial_capital:,.2f}
Final Capital: ${self.results.final_capital:,.2f}
Total Return: ${self.results.total_return:,.2f} ({self.results.total_return_pct:.2f}%)
Annualized Return: {metrics['annualized_return']:.2%}
RISK METRICS
------------
Maximum Drawdown: {self.results.max_drawdown_pct:.2f}%
Annualized Volatility: {metrics['annualized_volatility']:.2%}
Sharpe Ratio: {metrics['sharpe_ratio']:.3f}
Sortino Ratio: {metrics['sortino_ratio']:.3f}
Recovery Factor: {metrics['recovery_factor']:.2f}
TRADING STATISTICS
------------------
Total Trades: {self.results.total_trades}
Winning Trades: {self.results.winning_trades} ({self.results.win_rate:.1%})
Losing Trades: {self.results.losing_trades}
Profit Factor: {self.results.profit_factor:.2f}
Trades Per Day: {self.results.trades_per_day:.1f}
STREAK ANALYSIS
---------------
Longest Win Streak: {metrics['longest_win_streak']} periods
Longest Loss Streak: {metrics['longest_loss_streak']} periods
Monthly Win Rate: {metrics['monthly_win_rate']:.1%}
BEST/WORST PERIODS
------------------
Best Month: {metrics['best_month']:.2%}
Worst Month: {metrics['worst_month']:.2%}
Max Drawdown Duration: {metrics['max_drawdown_duration']} periods
EVALUATION
----------
"""
        # Strategy evaluation
        if self.results.total_return_pct > 15 and self.results.max_drawdown_pct < 20:
            report += "✅ EXCELLENT: High returns with controlled risk\n"
        elif self.results.total_return_pct > 10 and self.results.max_drawdown_pct < 30:
            report += "✅ GOOD: Positive returns with acceptable risk\n"
        elif self.results.total_return_pct > 0:
            report += "⚠️ MARGINAL: Positive but may not justify risk\n"
        else:
            report += "❌ POOR: Negative returns - strategy needs improvement\n"

        if metrics['sharpe_ratio'] > 1.0:
            report += "✅ EXCELLENT risk-adjusted returns (Sharpe > 1.0)\n"
        elif metrics['sharpe_ratio'] > 0.5:
            report += "✅ GOOD risk-adjusted returns (Sharpe > 0.5)\n"
        else:
            report += "⚠️ POOR risk-adjusted returns (Sharpe < 0.5)\n"
        return report
Bias Type | Description | Prevention |
---|---|---|
Look-ahead Bias | Using future data in current decisions | Strict chronological processing |
Survivorship Bias | Only testing active cryptocurrencies | Include delisted coins in historical tests |
Overfitting | Optimizing parameters to specific data | Out-of-sample testing, walk-forward |
Selection Bias | Cherry-picking favorable time periods | Test across multiple market cycles |
Create `src/backtesting/walk_forward.py`:
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
from datetime import datetime, timedelta
from src.backtesting.backtest_engine import BacktestEngine
from src.backtesting.grid_strategy_bt import GridTradingBacktest
class WalkForwardAnalysis:
"""
Walk-forward analysis to prevent overfitting
Splits data into training/testing periods and optimizes
parameters on training data, then tests on out-of-sample data
"""
def __init__(self, train_period_months: int = 6,
test_period_months: int = 3,
step_months: int = 1):
self.train_period_months = train_period_months
self.test_period_months = test_period_months
self.step_months = step_months
def run_walk_forward(self, data: pd.DataFrame,
                     parameter_ranges: Dict) -> Dict:
    """
    Walk through the data window by window: tune on the training slice,
    evaluate the tuned parameters on the following out-of-sample slice.

    Args:
        data: Historical OHLCV data
        parameter_ranges: Dict of parameter names and ranges to test

    Returns:
        Whatever ``_aggregate_walk_forward_results`` builds from the per-step
        records. Windows with an empty training or test slice are skipped.
    """
    windows = self._generate_date_ranges(data.index[0], data.index[-1])
    window_count = len(windows)
    step_records = []

    for step, (train_start, train_end, test_start, test_end) in enumerate(windows, start=1):
        print(f"Walk-forward step {step}/{window_count}")
        print(f"Train: {train_start.date()} to {train_end.date()}")
        print(f"Test: {test_start.date()} to {test_end.date()}")

        # Slice out both windows (bounds inclusive on either side)
        in_train = (data.index >= train_start) & (data.index <= train_end)
        in_test = (data.index >= test_start) & (data.index <= test_end)
        train_data = data[in_train]
        test_data = data[in_test]
        if train_data.empty or test_data.empty:
            continue

        # Tune on the training window, then score on unseen data
        best_params = self._optimize_parameters(train_data, parameter_ranges)
        test_result = self._test_parameters(test_data, best_params)

        step_records.append({
            'step': step,
            'train_start': train_start,
            'train_end': train_end,
            'test_start': test_start,
            'test_end': test_end,
            'best_params': best_params,
            'test_result': test_result
        })

    # Combine the per-step records into the final summary
    return self._aggregate_walk_forward_results(step_records)
def _generate_date_ranges(self, start_date: datetime,
end_date: datetime) -> List[Tuple[datetime, datetime, datetime, datetime]]:
"""Generate overlapping train/test date ranges"""
ranges = []
current_date = start_date
while current_date < end_date:
train_start = current_date
train_end = current_date + timedelta(days=30 * self.train_period_months)
test_start = train_end + timedelta(days=1)
test_end = test_start + timedelta(days=30 * self.test_period_months)
if test_end > end_date:
break
ranges.append((train_start, train_end, test_start, test_end))
current_date += timedelta(days=30 * self.step_months)
return ranges
def _optimize_parameters(self, train_data: pd.DataFrame,
parameter_ranges: Dict) -> Dict:
"""Optimize strategy parameters on training data"""
best_params = {}
best_score = -float('inf')
# Generate parameter combinations
param_combinations = self._generate_param_combinations(parameter_ranges)
for params in param_combinations:
try:
# Create strategy with current parameters
strategy = GridTradingBacktest(
grid_spacing=params.get('grid_spacing', 0.005),
num_grids=params.get('num_grids', 10),
base_order_size=params.get('base_order_size', 0.001)
)
# Run backtest
engine = BacktestEngine(initial_capital=10000)
result = engine.run_backtest(strategy, train_data)
# Score based on risk-adjusted return
score = result.total_return_pct / (result.max_drawdown_pct + 0.1)
if score > best_score:
best_score = score
best_params = params.copy()
except Exception as e:
print(f"Error testing parameters {params}: {e}")
continue
return best_params
def _test_parameters(self, test_data: pd.DataFrame, params: Dict) -> Dict:
"""Test optimized parameters on out-of-sample data"""
try:
strategy = GridTradingBacktest(
grid_spacing=params.get('grid_spacing', 0.005),
num_grids=params.get('num_grids', 10),
base_order_size=params.get('base_order_size', 0.001)
)
engine = BacktestEngine(initial_capital=10000)
result = engine.run_backtest(strategy, test_data)
return result.to_dict()
except Exception as e:
print(f"Error in out-of-sample test: {e}")
return {}
def _generate_param_combinations(self, parameter_ranges: Dict) -> List[Dict]:
"""Generate all parameter combinations"""
import itertools
keys = list(parameter_ranges.keys())
values = list(parameter_ranges.values())
combinations = []
for combo in itertools.product(*values):
param_dict = dict(zip(keys, combo))
combinations.append(param_dict)
return combinations
def _aggregate_walk_forward_results(self, results: List[Dict]) -> Dict:
"""Aggregate walk-forward analysis results"""
if not results:
return {}
# Extract test results
test_returns = [r['test_result'].get('total_return_pct', 0) for r in results if r['test_result']]
test_drawdowns = [r['test_result'].get('max_drawdown_pct', 0) for r in results if r['test_result']]
# Calculate aggregate metrics
avg_return = np.mean(test_returns) if test_returns else 0
std_return = np.std(test_returns) if test_returns else 0
max_drawdown = max(test_drawdowns) if test_drawdowns else 0
# Stability metrics
positive_periods = sum(1 for r in test_returns if r > 0)
consistency = positive_periods / len(test_returns) if test_returns else 0
return {
'total_periods': len(results),
'avg_return': avg_return,
'return_volatility': std_return,
'max_drawdown': max_drawdown,
'consistency': consistency,
'positive_periods': positive_periods,
'all_results': results
}
Create `src/backtesting/visualizations.py`:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from typing import Dict, List
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class BacktestVisualizer:
    """
    Generate comprehensive backtest visualizations

    Reads equity_curve, daily_returns, initial_capital, total_trades,
    winning_trades, losing_trades and win_rate from the results object
    it is given.
    """

    def __init__(self, results):
        """Store the backtest results and select the seaborn matplotlib style"""
        self.results = results
        # Note: this changes the matplotlib style globally, not just for this instance
        plt.style.use('seaborn-v0_8')

    def create_performance_dashboard(self, save_path: str = None):
        """
        Create comprehensive performance dashboard

        Draws a 2x3 grid of static matplotlib charts and shows it.

        Args:
            save_path: Optional file path; when given, the figure is also
                saved there before being shown
        """
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Cryptocurrency Trading Bot Performance Dashboard', fontsize=16, fontweight='bold')
        # 1. Equity Curve
        self._plot_equity_curve(axes[0, 0])
        # 2. Drawdown Chart
        self._plot_drawdown(axes[0, 1])
        # 3. Monthly Returns Heatmap
        self._plot_monthly_returns(axes[0, 2])
        # 4. Return Distribution
        self._plot_return_distribution(axes[1, 0])
        # 5. Rolling Sharpe Ratio
        self._plot_rolling_sharpe(axes[1, 1])
        # 6. Trade Analysis
        self._plot_trade_analysis(axes[1, 2])
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Performance dashboard saved to {save_path}")
        plt.show()

    def _plot_equity_curve(self, ax):
        """Plot equity curve with the initial capital as a reference line"""
        equity_curve = self.results.equity_curve
        ax.plot(equity_curve.index, equity_curve.values, linewidth=2, color='#2E8B57')
        ax.fill_between(equity_curve.index, equity_curve.values, alpha=0.3, color='#2E8B57')
        # Add benchmark line (initial capital)
        ax.axhline(y=self.results.initial_capital, color='red', linestyle='--', alpha=0.7, label='Initial Capital')
        ax.set_title('Portfolio Equity Curve')
        ax.set_ylabel('Portfolio Value ($)')
        ax.grid(True, alpha=0.3)
        ax.legend()
        # Format y-axis as whole dollars with thousands separators
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))

    def _plot_drawdown(self, ax):
        """Plot drawdown chart (percent below the running equity peak)"""
        equity_values = self.results.equity_curve.values
        cumulative_max = np.maximum.accumulate(equity_values)
        drawdown = (cumulative_max - equity_values) / cumulative_max * 100
        ax.fill_between(self.results.equity_curve.index, drawdown, alpha=0.7, color='red')
        ax.plot(self.results.equity_curve.index, drawdown, color='darkred', linewidth=1)
        ax.set_title('Drawdown Analysis')
        ax.set_ylabel('Drawdown (%)')
        ax.grid(True, alpha=0.3)
        # Highlight maximum drawdown
        max_dd_idx = np.argmax(drawdown)
        max_dd_date = self.results.equity_curve.index[max_dd_idx]
        ax.annotate(f'Max DD: {drawdown[max_dd_idx]:.1f}%',
                    xy=(max_dd_date, drawdown[max_dd_idx]),
                    xytext=(10, 10), textcoords='offset points',
                    bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7),
                    arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))

    def _plot_monthly_returns(self, ax):
        """Plot monthly returns heatmap (years as rows, months as columns)"""
        monthly_returns = self.results.equity_curve.resample('M').last().pct_change().dropna() * 100
        if len(monthly_returns) > 0:
            # Create monthly returns matrix
            monthly_data = [[date.year, date.month, ret]
                            for date, ret in monthly_returns.items()]
            df = pd.DataFrame(monthly_data, columns=['Year', 'Month', 'Return'])
            pivot_table = df.pivot(index='Year', columns='Month', values='Return')
            # Plot heatmap
            sns.heatmap(pivot_table, annot=True, fmt='.1f', cmap='RdYlGn', center=0,
                        ax=ax, cbar_kws={'label': 'Return (%)'})
            ax.set_title('Monthly Returns Heatmap')
        else:
            ax.text(0.5, 0.5, 'Insufficient data for monthly analysis',
                    ha='center', va='center', transform=ax.transAxes)
            ax.set_title('Monthly Returns (Insufficient Data)')

    def _plot_return_distribution(self, ax):
        """Plot daily return distribution as a histogram with the mean marked"""
        daily_returns = self.results.daily_returns.dropna() * 100
        if len(daily_returns) > 0:
            ax.hist(daily_returns, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
            ax.axvline(daily_returns.mean(), color='red', linestyle='--',
                       label=f'Mean: {daily_returns.mean():.2f}%')
            ax.set_title('Daily Returns Distribution')
            ax.set_xlabel('Daily Return (%)')
            ax.set_ylabel('Frequency')
            ax.legend()
            ax.grid(True, alpha=0.3)
        else:
            ax.text(0.5, 0.5, 'No return data available',
                    ha='center', va='center', transform=ax.transAxes)

    def _plot_rolling_sharpe(self, ax):
        """Plot 30-day rolling Sharpe ratio, annualized with sqrt(365)"""
        daily_returns = self.results.daily_returns.dropna()
        if len(daily_returns) > 30:
            window = daily_returns.rolling(window=30)
            rolling_sharpe = window.mean() / window.std() * np.sqrt(365)
            ax.plot(rolling_sharpe.index, rolling_sharpe.values, linewidth=2, color='purple')
            ax.axhline(y=1.0, color='green', linestyle='--', alpha=0.7, label='Sharpe = 1.0')
            ax.axhline(y=0.5, color='orange', linestyle='--', alpha=0.7, label='Sharpe = 0.5')
            ax.set_title('30-Day Rolling Sharpe Ratio')
            ax.set_ylabel('Sharpe Ratio')
            ax.legend()
            ax.grid(True, alpha=0.3)
        else:
            ax.text(0.5, 0.5, 'Insufficient data for rolling Sharpe',
                    ha='center', va='center', transform=ax.transAxes)

    def _plot_trade_analysis(self, ax):
        """Plot winning vs. losing trades as a pie chart with the win rate below it"""
        if self.results.total_trades > 0:
            # Create trade statistics
            labels = ['Winning Trades', 'Losing Trades']
            sizes = [self.results.winning_trades, self.results.losing_trades]
            colors = ['green', 'red']
            ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%',
                   startangle=90)
            ax.set_title(f'Trade Analysis\n(Total: {self.results.total_trades} trades)')
            # Add win rate text just below the axes. The position is in axes
            # coordinates (0..1 spans the axes), so y must stay close to 0;
            # a data-style offset such as -1.3 would land far outside the plot.
            ax.text(0.5, -0.05, f'Win Rate: {self.results.win_rate:.1%}',
                    ha='center', va='top', transform=ax.transAxes, fontweight='bold')
        else:
            ax.text(0.5, 0.5, 'No trades executed',
                    ha='center', va='center', transform=ax.transAxes)

    def create_interactive_dashboard(self) -> go.Figure:
        """
        Create interactive Plotly dashboard

        Returns:
            A 2x2 Plotly figure: equity curve, drawdown, daily-return
            histogram and a win/loss pie chart
        """
        # Create subplots
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Equity Curve', 'Drawdown', 'Daily Returns', 'Trade Statistics'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"type": "pie"}]]
        )
        # Equity curve
        fig.add_trace(
            go.Scatter(
                x=self.results.equity_curve.index,
                y=self.results.equity_curve.values,
                mode='lines',
                name='Portfolio Value',
                line=dict(color='green', width=2)
            ),
            row=1, col=1
        )
        # Drawdown (plotted as negative values so it hangs below zero)
        equity_values = self.results.equity_curve.values
        cumulative_max = np.maximum.accumulate(equity_values)
        drawdown = (cumulative_max - equity_values) / cumulative_max * 100
        fig.add_trace(
            go.Scatter(
                x=self.results.equity_curve.index,
                y=-drawdown,
                mode='lines',
                # Fill to the zero line explicitly rather than relying on
                # there being no earlier trace to fill towards
                fill='tozeroy',
                name='Drawdown',
                line=dict(color='red')
            ),
            row=1, col=2
        )
        # Daily returns histogram
        daily_returns = self.results.daily_returns.dropna() * 100
        if len(daily_returns) > 0:
            fig.add_trace(
                go.Histogram(
                    x=daily_returns,
                    nbinsx=30,
                    name='Daily Returns',
                    marker_color='skyblue'
                ),
                row=2, col=1
            )
        # Trade statistics pie chart
        if self.results.total_trades > 0:
            fig.add_trace(
                go.Pie(
                    labels=['Winning Trades', 'Losing Trades'],
                    values=[self.results.winning_trades, self.results.losing_trades],
                    name="Trade Stats"
                ),
                row=2, col=2
            )
        # Update layout
        fig.update_layout(
            title_text="Cryptocurrency Trading Bot Performance Dashboard",
            showlegend=True,
            height=800
        )
        return fig
Create `run_backtest.py`:
#!/usr/bin/env python3
"""
Complete backtesting script for cryptocurrency trading strategies
"""
import sys
import logging
from datetime import datetime, timedelta
from src.data.historical_data import HistoricalDataManager
from src.exchanges.binance_client import BinanceClient
from src.backtesting.backtest_engine import BacktestEngine
from src.backtesting.grid_strategy_bt import GridTradingBacktest
from src.backtesting.performance_metrics import PerformanceAnalyzer
from src.backtesting.visualizations import BacktestVisualizer
from src.backtesting.walk_forward import WalkForwardAnalysis
def setup_logging():
    """
    Setup logging configuration

    Logs at INFO level to both data/logs/backtest.log and stdout.
    The log directory is created when it does not exist yet, because
    FileHandler opens the file immediately and would otherwise raise
    FileNotFoundError on a fresh checkout.
    """
    import os

    log_path = 'data/logs/backtest.log'
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8 so non-ASCII log messages do not depend on the locale
            logging.FileHandler(log_path, encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )
def run_complete_backtest():
    """
    Run complete backtesting analysis

    Loads (downloading if needed) one year of hourly BTCUSDT data, backtests
    three grid configurations, reports on the one with the best
    risk-adjusted score, saves a text report and a dashboard image under
    data/, runs a walk-forward analysis and prints a recommendation.

    Returns:
        True when the analysis completed, False when data could not be
        obtained or no backtest succeeded
    """
    print("🚀 Starting Comprehensive Cryptocurrency Trading Bot Backtest")
    print("=" * 70)
    # Initialize components
    client = BinanceClient()
    data_manager = HistoricalDataManager(client)
    # Download/load historical data
    symbol = 'BTCUSDT'
    timeframe = '1h'
    start_date = '2023-01-01'
    end_date = '2024-01-01'
    print(f"📊 Loading historical data: {symbol} {timeframe}")
    print(f" Period: {start_date} to {end_date}")
    # Check if data exists, download if necessary.
    # Only the stored start date is compared; the end date is not checked.
    data_summary = data_manager.get_data_summary()
    needs_download = not any(
        item['symbol'] == symbol and
        item['timeframe'] == timeframe and
        start_date in item['start_date']
        for item in data_summary
    )
    if needs_download:
        print(" Downloading historical data...")
        success = data_manager.download_historical_data(
            symbol=symbol,
            timeframe=timeframe,
            start_date=start_date,
            end_date=end_date
        )
        if not success:
            print("❌ Failed to download historical data")
            return False
    # Load data for backtesting
    data = data_manager.get_historical_data(symbol, timeframe, start_date, end_date)
    if data.empty:
        print("❌ No historical data available")
        return False
    print(f"✅ Loaded {len(data)} data points")
    print(f" Date range: {data.index[0].date()} to {data.index[-1].date()}")
    # Configure backtesting parameters
    test_configs = [
        {'grid_spacing': 0.003, 'num_grids': 15, 'name': 'Tight Grid (0.3%)'},
        {'grid_spacing': 0.005, 'num_grids': 10, 'name': 'Standard Grid (0.5%)'},
        {'grid_spacing': 0.007, 'num_grids': 8, 'name': 'Wide Grid (0.7%)'},
    ]
    backtest_results = []
    print("\n🔬 Running Strategy Backtests")
    print("-" * 40)
    for i, config in enumerate(test_configs, 1):
        # Report progress against the real number of configs, not a fixed 3
        print(f"\n📈 Test {i}/{len(test_configs)}: {config['name']}")
        print(f" Grid spacing: {config['grid_spacing']:.1%}")
        print(f" Number of grids: {config['num_grids']}")
        try:
            # Create strategy
            strategy = GridTradingBacktest(
                grid_spacing=config['grid_spacing'],
                num_grids=config['num_grids'],
                base_order_size=0.001
            )
            # Run backtest
            engine = BacktestEngine(initial_capital=10000, commission=0.001)
            result = engine.run_backtest(strategy, data)
            # Store results
            result.strategy_name = config['name']
            result.strategy_params = config
            backtest_results.append(result)
            # Quick results summary
            print(f" ✅ Total Return: {result.total_return_pct:.2f}%")
            print(f" 📊 Max Drawdown: {result.max_drawdown_pct:.2f}%")
            print(f" 🎯 Sharpe Ratio: {result.sharpe_ratio:.3f}")
            print(f" 📈 Total Trades: {result.total_trades}")
        except Exception as e:
            # Best effort: a failing config is reported and skipped
            print(f" ❌ Backtest failed: {e}")
            continue
    if not backtest_results:
        print("❌ No successful backtests completed")
        return False
    # Find best strategy by risk-adjusted score; the +0.1 keeps the
    # denominator non-zero when there was no drawdown
    best_strategy = max(backtest_results,
                        key=lambda x: x.total_return_pct / (x.max_drawdown_pct + 0.1))
    print(f"\n🏆 BEST STRATEGY: {best_strategy.strategy_name}")
    print(f" Return: {best_strategy.total_return_pct:.2f}%")
    print(f" Max Drawdown: {best_strategy.max_drawdown_pct:.2f}%")
    print(f" Risk-Adjusted Score: {best_strategy.total_return_pct / (best_strategy.max_drawdown_pct + 0.1):.2f}")
    # Generate comprehensive analysis
    print("\n📊 Generating Performance Analysis for Best Strategy...")
    analyzer = PerformanceAnalyzer(best_strategy)
    performance_report = analyzer.generate_performance_report()
    print(performance_report)
    # Save detailed report; explicit UTF-8 so any non-ASCII characters in the
    # report do not fail on platforms whose default encoding is not UTF-8
    with open('data/backtest_report.txt', 'w', encoding='utf-8') as f:
        f.write(performance_report)
    print("📄 Detailed report saved to: data/backtest_report.txt")
    # Generate visualizations
    print("📈 Generating Performance Visualizations...")
    visualizer = BacktestVisualizer(best_strategy)
    visualizer.create_performance_dashboard('data/performance_dashboard.png')
    # Run walk-forward analysis on best strategy
    print("\n🔄 Running Walk-Forward Analysis...")
    parameter_ranges = {
        'grid_spacing': [0.003, 0.005, 0.007],
        'num_grids': [8, 10, 12],
        'base_order_size': [0.001]
    }
    walk_forward = WalkForwardAnalysis(
        train_period_months=3,
        test_period_months=1,
        step_months=1
    )
    try:
        wf_results = walk_forward.run_walk_forward(data, parameter_ranges)
        if wf_results:
            print("🎯 Walk-Forward Results:")
            print(f" Average Return: {wf_results['avg_return']:.2f}%")
            print(f" Return Volatility: {wf_results['return_volatility']:.2f}%")
            print(f" Consistency: {wf_results['consistency']:.1%}")
            print(f" Positive Periods: {wf_results['positive_periods']}/{wf_results['total_periods']}")
        else:
            # An empty result means no train/test window could be evaluated;
            # say so instead of failing on a missing key
            print("⚠️ Walk-forward analysis produced no results (not enough data for a train/test window)")
    except Exception as e:
        print(f"⚠️ Walk-forward analysis failed: {e}")
    # Final recommendations
    print("\n💡 RECOMMENDATIONS")
    print("=" * 50)
    if best_strategy.total_return_pct > 15 and best_strategy.max_drawdown_pct < 20:
        print("✅ DEPLOY: Strategy shows excellent risk-adjusted returns")
        print(" • Start with small position size")
        print(" • Monitor performance closely")
        print(" • Consider live testing on testnet first")
    elif best_strategy.total_return_pct > 0 and best_strategy.max_drawdown_pct < 30:
        print("⚠️ CAUTION: Strategy shows positive returns but needs optimization")
        print(" • Consider parameter tuning")
        print(" • Test on different market conditions")
        print(" • Implement strict risk management")
    else:
        print("❌ DO NOT DEPLOY: Strategy shows poor performance")
        print(" • Revisit strategy logic")
        print(" • Test different market conditions")
        print(" • Consider alternative approaches")
    print("\n🎉 Backtesting Analysis Complete!")
    print("📁 Results saved to: data/")
    return True
if __name__ == "__main__":
    # Script entry point: configure logging, run the analysis, and map
    # its boolean outcome to the process exit code (0 = ok, 1 = failed).
    setup_logging()
    if run_complete_backtest():
        print("✅ Backtesting completed successfully")
        sys.exit(0)
    print("❌ Backtesting failed")
    sys.exit(1)
Metric | Excellent | Good | Poor |
---|---|---|---|
Annual Return | >30% | 15-30% | <10% |
Max Drawdown | <15% | 15-25% | >25% |
Sharpe Ratio | >1.5 | 0.8-1.5 | <0.8 |
Win Rate | >60% | 45-60% | <45% |
Profit Factor | >2.0 | 1.3-2.0 | <1.3 |
Your backtesting framework is now complete and ready to validate trading strategies! In the next article, we'll cover:
Always backtest thoroughly before risking real capital! A robust backtesting process is your first line of defense against unprofitable strategies and helps ensure your bot performs as expected in live markets.
The next article covers implementing paper trading to bridge the gap between backtesting and live trading, allowing you to validate your strategy with real market conditions without financial risk.
Nikolai Fischer is the founder of Kommune3 (since 2007) and a leading expert in Drupal development and tech entrepreneurship. With 17+ years of experience, he has led hundreds of projects and achieved #1 on Hacker News. As host of the "Kommit mich" podcast and founder of skillution, he combines technical expertise with entrepreneurial thinking. His articles about Supabase, modern web development, and systematic problem-solving have influenced thousands of developers worldwide.
Comments