Implement a professional paper trading system that validates your cryptocurrency trading strategies with real market data without financial risk. This comprehensive guide covers real-time data integration, order simulation, performance monitoring, and the transition to live trading.
Paper trading, also known as simulated trading, is the critical bridge between backtesting and live trading. It allows you to test your strategy with real market conditions without risking actual capital.
Aspect | Backtesting | Paper Trading | Live Trading |
---|---|---|---|
Data Source | Historical data | Real-time market data | Real-time market data |
Execution | Perfect/simulated | Simulated realistic | Real exchange orders |
Market Impact | None | Simulated | Real impact |
Risk | None | None | Real financial risk |
Speed | Very fast (years in minutes) | Real-time | Real-time |
Real-time data is essential for paper trading. WebSocket connections push each update as it happens, which avoids the latency and request overhead of repeatedly polling a REST endpoint.
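As a rough sketch of what happens on the wire, the snippet below builds a combined-stream URL and routes one incoming message by its stream name. The helper names `build_stream_url` and `route_message` are illustrative assumptions and not part of any library; the endpoint and the `stream`/`data` envelope are the ones used by the client in this guide.

```python
import json
from typing import List, Optional, Tuple

BASE_URL = "wss://stream.binance.com:9443/stream"  # combined-stream endpoint used in this guide


def build_stream_url(symbols: List[str]) -> str:
    """Build one URL that subscribes to ticker and depth streams for each symbol."""
    streams = []
    for symbol in symbols:
        name = symbol.lower()  # stream names use lower-case symbols
        streams.append(f"{name}@ticker")
        streams.append(f"{name}@depth10@100ms")
    return f"{BASE_URL}?streams={'/'.join(streams)}"


def route_message(raw: str) -> Optional[Tuple[str, str, dict]]:
    """Return (kind, symbol, payload) for a combined-stream message, or None."""
    msg = json.loads(raw)
    if "stream" not in msg or "data" not in msg:
        return None  # not a market-data envelope
    stream = msg["stream"]
    symbol = stream.split("@")[0].upper()  # symbol is the part before the first '@'
    if "@ticker" in stream:
        kind = "ticker"
    elif "@depth" in stream:
        kind = "depth"
    else:
        return None
    return kind, symbol, msg["data"]
```

Taking the symbol from the stream name rather than from the payload keeps the routing independent of which fields a particular stream includes.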
Create src/data/realtime_data.py:
import json
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Optional

import websocket  # provided by the websocket-client package (pip install websocket-client)
@dataclass
class MarketTick:
"""Real-time market data tick"""
symbol: str
price: float
volume: float
timestamp: datetime
bid: Optional[float] = None
ask: Optional[float] = None
@dataclass
class OrderBookLevel:
"""Order book price level"""
price: float
quantity: float
@dataclass
class OrderBookSnapshot:
"""Order book snapshot"""
symbol: str
bids: List[OrderBookLevel]
asks: List[OrderBookLevel]
timestamp: datetime
class BinanceWebSocketClient:
"""
Real-time Binance WebSocket client for market data
Features:
- Automatic reconnection
- Multiple symbol support
- Order book and ticker streams
- Data buffering and callbacks
"""
    def __init__(self, symbols: List[str], max_buffer_size: int = 1000):
        self.symbols = [s.lower() for s in symbols]
        self.max_buffer_size = max_buffer_size
        self.logger = logging.getLogger(__name__)
        # WebSocket connection
        self.ws = None
        self.ws_thread = None
        self.running = False
        # Data storage
        self.tickers: Dict[str, MarketTick] = {}
        self.order_books: Dict[str, OrderBookSnapshot] = {}
        self.price_history: Dict[str, deque] = {}
        # Callbacks
        self.tick_callbacks: List[Callable[[MarketTick], None]] = []
        self.orderbook_callbacks: List[Callable[[OrderBookSnapshot], None]] = []
        # Initialize price history buffers, keyed by upper-case symbol to match
        # the keys used when ticks are stored and looked up
        for symbol in self.symbols:
            self.price_history[symbol.upper()] = deque(maxlen=max_buffer_size)
def add_tick_callback(self, callback: Callable[[MarketTick], None]):
"""Add callback for ticker updates"""
self.tick_callbacks.append(callback)
def add_orderbook_callback(self, callback: Callable[[OrderBookSnapshot], None]):
"""Add callback for order book updates"""
self.orderbook_callbacks.append(callback)
def start(self):
"""Start WebSocket connection"""
if self.running:
self.logger.warning("WebSocket client already running")
return
self.running = True
self.ws_thread = threading.Thread(target=self._run_websocket, daemon=True)
self.ws_thread.start()
self.logger.info(f"Started WebSocket client for {len(self.symbols)} symbols")
def stop(self):
"""Stop WebSocket connection"""
if not self.running:
return
self.running = False
if self.ws:
self.ws.close()
if self.ws_thread and self.ws_thread.is_alive():
self.ws_thread.join(timeout=5)
self.logger.info("Stopped WebSocket client")
def _run_websocket(self):
"""Main WebSocket connection loop with reconnection"""
while self.running:
try:
self._connect_websocket()
except Exception as e:
self.logger.error(f"WebSocket error: {e}")
if self.running:
self.logger.info("Reconnecting in 5 seconds...")
time.sleep(5)
def _connect_websocket(self):
"""Connect to Binance WebSocket"""
# Create stream URLs
streams = []
# Add ticker streams
for symbol in self.symbols:
streams.append(f"{symbol}@ticker")
streams.append(f"{symbol}@depth10@100ms") # Order book depth
stream_url = f"wss://stream.binance.com:9443/stream?streams={'/'.join(streams)}"
self.logger.info(f"Connecting to WebSocket: {len(streams)} streams")
self.ws = websocket.WebSocketApp(
stream_url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
self.ws.run_forever()
    def _on_open(self, ws):
        """WebSocket connection opened"""
        self.logger.info("WebSocket connection established")
    def _on_message(self, ws, message):
        """Process incoming WebSocket message"""
        try:
            data = json.loads(message)
            if 'stream' in data and 'data' in data:
                stream = data['stream']
                payload = data['data']
                if '@ticker' in stream:
                    self._process_ticker(payload)
                elif '@depth' in stream:
                    # Partial depth payloads carry no 's' field, so take the
                    # symbol from the stream name (e.g. "btcusdt@depth10@100ms")
                    payload.setdefault('s', stream.split('@')[0])
                    self._process_order_book(payload)
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")
def _on_error(self, ws, error):
"""WebSocket error handler"""
self.logger.error(f"WebSocket error: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""WebSocket connection closed"""
if self.running:
self.logger.warning("WebSocket connection closed, will reconnect")
else:
self.logger.info("WebSocket connection closed")
def _process_ticker(self, data):
"""Process ticker update"""
try:
symbol = data['s'].upper()
tick = MarketTick(
symbol=symbol,
price=float(data['c']), # Close price
volume=float(data['v']), # Volume
timestamp=datetime.fromtimestamp(data['E'] / 1000), # Event time
bid=float(data['b']), # Best bid
ask=float(data['a']) # Best ask
)
# Store ticker
self.tickers[symbol] = tick
# Add to price history
if symbol in self.price_history:
self.price_history[symbol].append((tick.timestamp, tick.price))
# Notify callbacks
for callback in self.tick_callbacks:
try:
callback(tick)
except Exception as e:
self.logger.error(f"Tick callback error: {e}")
except Exception as e:
self.logger.error(f"Error processing ticker: {e}")
def _process_order_book(self, data):
"""Process order book update"""
try:
symbol = data['s'].upper()
# Parse bids and asks
bids = [OrderBookLevel(float(bid[0]), float(bid[1]))
for bid in data['bids']]
asks = [OrderBookLevel(float(ask[0]), float(ask[1]))
for ask in data['asks']]
order_book = OrderBookSnapshot(
symbol=symbol,
bids=bids,
asks=asks,
timestamp=datetime.now()
)
# Store order book
self.order_books[symbol] = order_book
# Notify callbacks
for callback in self.orderbook_callbacks:
try:
callback(order_book)
except Exception as e:
self.logger.error(f"Order book callback error: {e}")
except Exception as e:
self.logger.error(f"Error processing order book: {e}")
def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for symbol"""
ticker = self.tickers.get(symbol.upper())
return ticker.price if ticker else None
def get_current_spread(self, symbol: str) -> Optional[float]:
"""Get current bid-ask spread"""
ticker = self.tickers.get(symbol.upper())
if ticker and ticker.bid and ticker.ask:
return ticker.ask - ticker.bid
return None
def get_price_history(self, symbol: str, count: int = 100) -> List[tuple]:
"""Get recent price history"""
history = self.price_history.get(symbol.upper(), deque())
return list(history)[-count:]
def get_order_book(self, symbol: str) -> Optional[OrderBookSnapshot]:
"""Get current order book snapshot"""
return self.order_books.get(symbol.upper())
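To illustrate what can be derived from an order book snapshot like the one stored above, here is a minimal sketch that computes the best bid, best ask, and the spread in basis points. The `Level` dataclass and the function names are hypothetical stand-ins chosen to keep the example self-contained; they mirror the shape of `OrderBookLevel` but are not part of the module.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Level:
    """Stand-in for one order book price level (assumed shape: price, quantity)."""
    price: float
    quantity: float


def best_bid_ask(bids: List[Level], asks: List[Level]) -> Optional[Tuple[float, float]]:
    """Return (best_bid, best_ask), or None if either side is empty."""
    if not bids or not asks:
        return None
    # Do not rely on ordering: the best bid is the highest price,
    # the best ask is the lowest price.
    return max(lvl.price for lvl in bids), min(lvl.price for lvl in asks)


def spread_bps(bids: List[Level], asks: List[Level]) -> Optional[float]:
    """Bid-ask spread relative to the mid price, in basis points."""
    top = best_bid_ask(bids, asks)
    if top is None:
        return None
    bid, ask = top
    mid = (bid + ask) / 2
    return (ask - bid) / mid * 10_000
```

Expressing the spread relative to the mid price makes it comparable across symbols with very different price levels.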
Create src/paper_trading/paper_trading_engine.py:
import time
import logging
import uuid
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import asyncio
import threading
class OrderStatus(Enum):
PENDING = "pending"
FILLED = "filled"
PARTIALLY_FILLED = "partially_filled"
CANCELLED = "cancelled"
REJECTED = "rejected"
class OrderSide(Enum):
BUY = "buy"
SELL = "sell"
class OrderType(Enum):
MARKET = "market"
LIMIT = "limit"
STOP_LOSS = "stop_loss"
TAKE_PROFIT = "take_profit"
@dataclass
class PaperOrder:
"""Paper trading order"""
order_id: str
symbol: str
side: OrderSide
order_type: OrderType
quantity: float
price: Optional[float] = None
stop_price: Optional[float] = None
status: OrderStatus = OrderStatus.PENDING
filled_quantity: float = 0.0
avg_fill_price: float = 0.0
created_at: datetime = field(default_factory=datetime.now)
filled_at: Optional[datetime] = None
fees: float = 0.0
@property
def remaining_quantity(self) -> float:
return self.quantity - self.filled_quantity
@property
def is_filled(self) -> bool:
return self.status == OrderStatus.FILLED
@property
def is_active(self) -> bool:
return self.status in [OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED]
@dataclass
class PaperPosition:
"""Paper trading position"""
symbol: str
quantity: float = 0.0
avg_price: float = 0.0
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
@property
def market_value(self) -> float:
return abs(self.quantity) * self.avg_price
def update_position(self, trade_quantity: float, trade_price: float, fees: float = 0.0):
"""Update position with new trade"""
if self.quantity == 0:
# Opening new position
self.quantity = trade_quantity
self.avg_price = trade_price
else:
if (self.quantity > 0 and trade_quantity > 0) or (self.quantity < 0 and trade_quantity < 0):
# Adding to existing position
total_cost = (self.quantity * self.avg_price) + (trade_quantity * trade_price)
self.quantity += trade_quantity
self.avg_price = total_cost / self.quantity if self.quantity != 0 else 0
else:
# Reducing or closing position
close_quantity = min(abs(self.quantity), abs(trade_quantity))
# Calculate realized P&L
if self.quantity > 0: # Long position
self.realized_pnl += close_quantity * (trade_price - self.avg_price) - fees
else: # Short position
self.realized_pnl += close_quantity * (self.avg_price - trade_price) - fees
# Update position
if abs(trade_quantity) >= abs(self.quantity):
# Position closed or reversed
remaining_quantity = abs(trade_quantity) - abs(self.quantity)
self.quantity = remaining_quantity * (1 if trade_quantity > 0 else -1)
self.avg_price = trade_price if remaining_quantity > 0 else 0
else:
# Position reduced
self.quantity += trade_quantity
class PaperTradingEngine:
"""
Comprehensive paper trading engine with realistic order simulation
Features:
- Realistic order fills based on market data
- Slippage and fee simulation
- Portfolio tracking
- Performance metrics
- Risk management
"""
def __init__(self, initial_balance: float = 10000,
commission_rate: float = 0.001,
slippage_rate: float = 0.0005):
self.initial_balance = initial_balance
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
self.logger = logging.getLogger(__name__)
# Portfolio state
self.cash_balance = initial_balance
self.positions: Dict[str, PaperPosition] = {}
self.orders: Dict[str, PaperOrder] = {}
self.trade_history: List[Dict] = []
# Market data
self.current_prices: Dict[str, float] = {}
self.order_books: Dict[str, object] = {}
# Performance tracking
self.equity_history: List[tuple] = []
self.start_time = datetime.now()
# Order callbacks
self.order_callbacks: List[Callable[[PaperOrder], None]] = []
# Execution engine
self.running = False
self.execution_thread = None
def add_order_callback(self, callback: Callable[[PaperOrder], None]):
"""Add callback for order updates"""
self.order_callbacks.append(callback)
def start(self):
"""Start the paper trading engine"""
if self.running:
return
self.running = True
self.execution_thread = threading.Thread(target=self._execution_loop, daemon=True)
self.execution_thread.start()
        self.logger.info("Paper trading engine started")
def stop(self):
"""Stop the paper trading engine"""
if not self.running:
return
self.running = False
if self.execution_thread and self.execution_thread.is_alive():
self.execution_thread.join(timeout=5)
        self.logger.info("Paper trading engine stopped")
    def update_market_data(self, symbol: str, price: float, order_book: Optional[object] = None):
        """Update current market data"""
        self.current_prices[symbol] = price
        if order_book:
            self.order_books[symbol] = order_book
        # Update unrealized P&L for positions
        if symbol in self.positions:
            position = self.positions[symbol]
            if position.quantity != 0:
                # quantity is signed (negative for shorts), so a single formula
                # gives the correct sign for both long and short positions
                position.unrealized_pnl = position.quantity * (price - position.avg_price)
    def place_order(self, symbol: str, side: str, order_type: str,
                    quantity: float, price: Optional[float] = None,
                    stop_price: Optional[float] = None) -> Optional[str]:
        """
        Place a paper trading order
        Returns order_id or None if order rejected
        """
        try:
            # Validate order
            if not self._validate_order(symbol, side, order_type, quantity, price):
                return None
            # Create order
            order = PaperOrder(
                order_id=str(uuid.uuid4()),
                symbol=symbol,
                side=OrderSide(side.lower()),
                order_type=OrderType(order_type.lower()),
                quantity=quantity,
                price=price,
                stop_price=stop_price
            )
            # Track every order so it can be looked up or cancelled later
            self.orders[order.order_id] = order
            # Market orders execute immediately; other types wait for the execution loop
            if order.order_type == OrderType.MARKET:
                self._execute_market_order(order)
                if order.status == OrderStatus.REJECTED:
                    return None
            self.logger.info(f"Order placed: {order.side.value} {order.quantity} {order.symbol} @ {order.price}")
            return order.order_id
        except Exception as e:
            self.logger.error(f"Error placing order: {e}")
            return None
def cancel_order(self, order_id: str) -> bool:
"""Cancel a pending order"""
if order_id not in self.orders:
self.logger.warning(f"Order {order_id} not found")
return False
order = self.orders[order_id]
if not order.is_active:
self.logger.warning(f"Cannot cancel order {order_id} - status: {order.status}")
return False
order.status = OrderStatus.CANCELLED
self._notify_order_update(order)
        self.logger.info(f"Order cancelled: {order_id}")
return True
def get_portfolio_value(self) -> float:
"""Calculate total portfolio value"""
total_value = self.cash_balance
for symbol, position in self.positions.items():
if position.quantity != 0 and symbol in self.current_prices:
current_price = self.current_prices[symbol]
position_value = position.quantity * current_price
total_value += position_value
return total_value
def get_portfolio_summary(self) -> Dict:
"""Get comprehensive portfolio summary"""
total_value = self.get_portfolio_value()
total_return = total_value - self.initial_balance
total_return_pct = (total_return / self.initial_balance) * 100
# Calculate realized and unrealized P&L
total_realized_pnl = sum(pos.realized_pnl for pos in self.positions.values())
total_unrealized_pnl = sum(pos.unrealized_pnl for pos in self.positions.values())
return {
'total_value': total_value,
'cash_balance': self.cash_balance,
'total_return': total_return,
'total_return_pct': total_return_pct,
'realized_pnl': total_realized_pnl,
'unrealized_pnl': total_unrealized_pnl,
'num_positions': len([p for p in self.positions.values() if p.quantity != 0]),
'num_active_orders': len([o for o in self.orders.values() if o.is_active]),
'total_trades': len(self.trade_history)
}
def get_position(self, symbol: str) -> Optional[PaperPosition]:
"""Get position for symbol"""
return self.positions.get(symbol)
    def get_open_orders(self, symbol: Optional[str] = None) -> List[PaperOrder]:
"""Get open orders, optionally filtered by symbol"""
open_orders = [order for order in self.orders.values() if order.is_active]
if symbol:
open_orders = [order for order in open_orders if order.symbol == symbol]
return open_orders
def _execution_loop(self):
"""Main execution loop for processing orders"""
while self.running:
try:
self._process_pending_orders()
self._update_equity_history()
time.sleep(0.1) # 100ms execution cycle
except Exception as e:
self.logger.error(f"Error in execution loop: {e}")
time.sleep(1)
def _process_pending_orders(self):
"""Process all pending orders"""
for order in list(self.orders.values()):
if not order.is_active:
continue
if order.symbol not in self.current_prices:
continue
current_price = self.current_prices[order.symbol]
# Check if order should be filled
if self._should_fill_order(order, current_price):
self._execute_limit_order(order, current_price)
    def _should_fill_order(self, order: PaperOrder, current_price: float) -> bool:
        """Determine if order should be filled at current price"""
        if order.order_type == OrderType.LIMIT:
            if order.side == OrderSide.BUY:
                return current_price <= order.price
            else:  # SELL
                return current_price >= order.price
        elif order.order_type == OrderType.STOP_LOSS:
            # Validation only guarantees `price`, so fall back to it when no
            # separate stop_price was supplied
            trigger = order.stop_price if order.stop_price is not None else order.price
            if order.side == OrderSide.BUY:
                return current_price >= trigger
            else:  # SELL
                return current_price <= trigger
        return False
def _execute_market_order(self, order: PaperOrder):
"""Execute market order immediately"""
if order.symbol not in self.current_prices:
order.status = OrderStatus.REJECTED
self._notify_order_update(order)
return
current_price = self.current_prices[order.symbol]
# Apply slippage for market orders
if order.side == OrderSide.BUY:
execution_price = current_price * (1 + self.slippage_rate)
else:
execution_price = current_price * (1 - self.slippage_rate)
self._fill_order(order, execution_price, order.quantity)
def _execute_limit_order(self, order: PaperOrder, current_price: float):
"""Execute limit order at specified price"""
# For limit orders, execute at the limit price (assuming liquidity)
execution_price = order.price
self._fill_order(order, execution_price, order.quantity)
def _fill_order(self, order: PaperOrder, fill_price: float, fill_quantity: float):
"""Fill order with specified price and quantity"""
# Calculate fees
trade_value = fill_quantity * fill_price
fees = trade_value * self.commission_rate
# Check if sufficient funds/position for trade
if not self._check_sufficient_funds(order, fill_price, fill_quantity, fees):
order.status = OrderStatus.REJECTED
self._notify_order_update(order)
return
# Update order
order.filled_quantity += fill_quantity
order.avg_fill_price = ((order.avg_fill_price * (order.filled_quantity - fill_quantity)) +
(fill_price * fill_quantity)) / order.filled_quantity
order.fees += fees
order.filled_at = datetime.now()
if order.filled_quantity >= order.quantity:
order.status = OrderStatus.FILLED
else:
order.status = OrderStatus.PARTIALLY_FILLED
# Update portfolio
self._update_portfolio(order, fill_price, fill_quantity, fees)
# Record trade
self._record_trade(order, fill_price, fill_quantity, fees)
# Notify callbacks
self._notify_order_update(order)
        self.logger.info(f"Order filled: {order.side.value} {fill_quantity} {order.symbol} @ {fill_price:.2f}")
def _check_sufficient_funds(self, order: PaperOrder, price: float,
quantity: float, fees: float) -> bool:
"""Check if sufficient funds/position for trade"""
if order.side == OrderSide.BUY:
# Check cash balance
required_cash = (price * quantity) + fees
return self.cash_balance >= required_cash
else: # SELL
# Check position
position = self.positions.get(order.symbol, PaperPosition(order.symbol))
return position.quantity >= quantity
def _update_portfolio(self, order: PaperOrder, fill_price: float,
fill_quantity: float, fees: float):
"""Update portfolio with trade"""
# Update position
if order.symbol not in self.positions:
self.positions[order.symbol] = PaperPosition(order.symbol)
position = self.positions[order.symbol]
if order.side == OrderSide.BUY:
# Buying - decrease cash, increase position
self.cash_balance -= (fill_price * fill_quantity) + fees
position.update_position(fill_quantity, fill_price, fees)
else:
# Selling - increase cash, decrease position
self.cash_balance += (fill_price * fill_quantity) - fees
position.update_position(-fill_quantity, fill_price, fees)
def _record_trade(self, order: PaperOrder, fill_price: float,
fill_quantity: float, fees: float):
"""Record trade in history"""
trade = {
'timestamp': datetime.now(),
'order_id': order.order_id,
'symbol': order.symbol,
'side': order.side.value,
'quantity': fill_quantity,
'price': fill_price,
'fees': fees,
'trade_value': fill_quantity * fill_price
}
self.trade_history.append(trade)
def _update_equity_history(self):
"""Update equity curve history"""
current_time = datetime.now()
current_equity = self.get_portfolio_value()
self.equity_history.append((current_time, current_equity))
        # Once the history passes 10,000 points, keep only the most recent 5,000 to bound memory
        if len(self.equity_history) > 10000:
            self.equity_history = self.equity_history[-5000:]
def _validate_order(self, symbol: str, side: str, order_type: str,
quantity: float, price: Optional[float]) -> bool:
"""Validate order parameters"""
if quantity <= 0:
self.logger.error("Order quantity must be positive")
return False
if order_type.lower() in ['limit', 'stop_loss'] and price is None:
self.logger.error("Price required for limit/stop orders")
return False
if order_type.lower() == 'limit' and price <= 0:
self.logger.error("Limit price must be positive")
return False
return True
def _notify_order_update(self, order: PaperOrder):
"""Notify callbacks of order update"""
for callback in self.order_callbacks:
try:
callback(order)
except Exception as e:
self.logger.error(f"Order callback error: {e}")
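The fill arithmetic in the engine can be checked in isolation. The sketch below reproduces the slippage and commission steps for a market order as a standalone function; `simulate_market_fill` is a hypothetical helper written for this illustration, and its default rates simply copy the engine's constructor defaults.

```python
from typing import Tuple


def simulate_market_fill(price: float, quantity: float, side: str,
                         slippage_rate: float = 0.0005,
                         commission_rate: float = 0.001) -> Tuple[float, float, float]:
    """Return (execution_price, fees, cash_change) for a simulated market order."""
    if side not in ("buy", "sell"):
        raise ValueError("side must be 'buy' or 'sell'")
    # Slippage moves the price against the trader: buys pay more, sells receive less.
    direction = 1 if side == "buy" else -1
    execution_price = price * (1 + direction * slippage_rate)
    trade_value = execution_price * quantity
    fees = trade_value * commission_rate
    # Buys reduce cash by value plus fees; sells add value minus fees.
    cash_change = -(trade_value + fees) if side == "buy" else trade_value - fees
    return execution_price, fees, cash_change


if __name__ == "__main__":
    exec_price, fees, cash = simulate_market_fill(50_000.0, 0.1, "buy")
    print(f"buy 0.1 @ {exec_price:.2f}, fees {fees:.4f}, cash change {cash:.4f}")
```

With these defaults a round trip costs roughly 0.3% of the traded value (two commissions plus slippage on both legs), which any strategy's per-trade edge has to exceed.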
Create src/paper_trading/paper_grid_strategy.py:
import time
import logging
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from src.paper_trading.paper_trading_engine import PaperTradingEngine, PaperOrder
from src.data.realtime_data import BinanceWebSocketClient, MarketTick
@dataclass
class GridLevel:
"""Grid trading level for paper trading"""
price: float
quantity: float
side: str # 'buy' or 'sell'
order_id: Optional[str] = None
is_filled: bool = False
class PaperGridStrategy:
"""
Grid trading strategy for paper trading
Features:
- Real-time price monitoring
- Dynamic grid adjustments
- Performance tracking
- Risk management
"""
def __init__(self, trading_engine: PaperTradingEngine,
symbol: str = 'BTCUSDT',
grid_spacing: float = 0.005,
num_grids: int = 10,
base_order_size: float = 0.001):
self.trading_engine = trading_engine
self.symbol = symbol
self.grid_spacing = grid_spacing
self.num_grids = num_grids
self.base_order_size = base_order_size
self.logger = logging.getLogger(__name__)
# Strategy state
self.center_price = 0.0
self.grid_levels: List[GridLevel] = []
self.active_orders: Dict[str, GridLevel] = {}
# Performance tracking
self.total_profit = 0.0
self.completed_cycles = 0
self.start_time = datetime.now()
# Register callbacks
self.trading_engine.add_order_callback(self._on_order_update)
# Strategy status
self.is_running = False
def start_strategy(self, initial_price: float):
"""Start the grid trading strategy"""
if self.is_running:
self.logger.warning("Strategy already running")
return
self.center_price = initial_price
self.is_running = True
# Create initial grid
self._create_initial_grid()
# Place initial orders
self._place_grid_orders()
        self.logger.info(f"Grid strategy started at ${initial_price:,.2f}")
self.logger.info(f" Grid spacing: {self.grid_spacing:.1%}")
self.logger.info(f" Number of grids: {self.num_grids}")
self.logger.info(f" Base order size: {self.base_order_size}")
def stop_strategy(self):
"""Stop the grid trading strategy"""
if not self.is_running:
return
self.is_running = False
# Cancel all active orders
for order_id in list(self.active_orders.keys()):
self.trading_engine.cancel_order(order_id)
        self.logger.info("Grid strategy stopped")
self._log_performance_summary()
def update_market_price(self, current_price: float):
"""Update strategy with current market price"""
if not self.is_running:
return
# Check if we need to adjust grid (significant price movement)
price_change = abs(current_price - self.center_price) / self.center_price
if price_change > 0.1: # 10% price movement
            self.logger.info(f"Significant price movement detected: {price_change:.1%}")
self._adjust_grid_center(current_price)
def get_strategy_status(self) -> Dict:
"""Get current strategy status"""
portfolio = self.trading_engine.get_portfolio_summary()
return {
'is_running': self.is_running,
'center_price': self.center_price,
'active_grids': len(self.active_orders),
'completed_cycles': self.completed_cycles,
'total_profit': self.total_profit,
'portfolio_value': portfolio['total_value'],
'unrealized_pnl': portfolio['unrealized_pnl'],
'realized_pnl': portfolio['realized_pnl'],
'runtime_minutes': (datetime.now() - self.start_time).total_seconds() / 60
}
def _create_initial_grid(self):
"""Create initial grid levels around center price"""
self.grid_levels.clear()
# Create buy levels (below center price)
for i in range(1, self.num_grids + 1):
price = self.center_price * (1 - self.grid_spacing * i)
grid_level = GridLevel(
price=price,
quantity=self.base_order_size,
side='buy'
)
self.grid_levels.append(grid_level)
# Create sell levels (above center price)
for i in range(1, self.num_grids + 1):
price = self.center_price * (1 + self.grid_spacing * i)
grid_level = GridLevel(
price=price,
quantity=self.base_order_size,
side='sell'
)
self.grid_levels.append(grid_level)
        self.logger.info(f"Created {len(self.grid_levels)} grid levels")
def _place_grid_orders(self):
"""Place all grid orders"""
orders_placed = 0
for grid_level in self.grid_levels:
if grid_level.is_filled:
continue
order_id = self.trading_engine.place_order(
symbol=self.symbol,
side=grid_level.side,
order_type='limit',
quantity=grid_level.quantity,
price=grid_level.price
)
if order_id:
grid_level.order_id = order_id
self.active_orders[order_id] = grid_level
orders_placed += 1
# Rate limiting
time.sleep(0.1)
        self.logger.info(f"Placed {orders_placed} grid orders")
def _on_order_update(self, order: PaperOrder):
"""Handle order updates from trading engine"""
if order.order_id not in self.active_orders:
return
grid_level = self.active_orders[order.order_id]
if order.is_filled:
            self.logger.info(f"Grid order filled: {order.side.value} {order.quantity} @ ${order.avg_fill_price:.2f}")
# Mark grid level as filled
grid_level.is_filled = True
del self.active_orders[order.order_id]
# Place opposite order
self._place_opposite_order(grid_level, order.avg_fill_price)
# Update profit tracking
if order.side.value == 'sell':
# Completed a buy->sell cycle
profit = (order.avg_fill_price - self._get_corresponding_buy_price(order.avg_fill_price)) * order.quantity
profit -= order.fees
self.total_profit += profit
self.completed_cycles += 1
                self.logger.info(f"Completed cycle profit: ${profit:.2f} (Total: ${self.total_profit:.2f})")
def _place_opposite_order(self, filled_grid: GridLevel, fill_price: float):
"""Place opposite order after grid level is filled"""
if filled_grid.side == 'buy':
# Buy order filled, place sell order one level up
new_price = fill_price * (1 + self.grid_spacing)
new_side = 'sell'
else:
# Sell order filled, place buy order one level down
new_price = fill_price * (1 - self.grid_spacing)
new_side = 'buy'
order_id = self.trading_engine.place_order(
symbol=self.symbol,
side=new_side,
order_type='limit',
quantity=filled_grid.quantity,
price=new_price
)
if order_id:
# Create new grid level
new_grid_level = GridLevel(
price=new_price,
quantity=filled_grid.quantity,
side=new_side,
order_id=order_id
)
self.active_orders[order_id] = new_grid_level
            self.logger.info(f"Placed opposite order: {new_side} {filled_grid.quantity} @ ${new_price:.2f}")
def _get_corresponding_buy_price(self, sell_price: float) -> float:
"""Get corresponding buy price for profit calculation"""
# Simplified - assumes buy price was one grid level below
return sell_price * (1 - self.grid_spacing)
def _adjust_grid_center(self, new_center_price: float):
"""Adjust grid center due to significant price movement"""
        self.logger.info(f"Adjusting grid center: ${self.center_price:.2f} -> ${new_center_price:.2f}")
# Cancel existing orders that are too far from new center
orders_to_cancel = []
for order_id, grid_level in self.active_orders.items():
price_deviation = abs(grid_level.price - new_center_price) / new_center_price
if price_deviation > 0.15: # 15% deviation threshold
orders_to_cancel.append(order_id)
# Cancel far orders
for order_id in orders_to_cancel:
self.trading_engine.cancel_order(order_id)
del self.active_orders[order_id]
# Update center price
self.center_price = new_center_price
# Create new grid levels around new center
self._create_additional_grid_levels()
def _create_additional_grid_levels(self):
"""Create additional grid levels around new center"""
# Find price ranges that need new orders
existing_prices = [gl.price for gl in self.active_orders.values()]
if not existing_prices:
# No active orders, recreate full grid
self._create_initial_grid()
self._place_grid_orders()
return
min_existing = min(existing_prices)
max_existing = max(existing_prices)
# Add buy orders below minimum existing
for i in range(1, self.num_grids + 1):
price = self.center_price * (1 - self.grid_spacing * i)
if price < min_existing:
order_id = self.trading_engine.place_order(
symbol=self.symbol,
side='buy',
order_type='limit',
quantity=self.base_order_size,
price=price
)
if order_id:
grid_level = GridLevel(
price=price,
quantity=self.base_order_size,
side='buy',
order_id=order_id
)
self.active_orders[order_id] = grid_level
# Add sell orders above maximum existing
for i in range(1, self.num_grids + 1):
price = self.center_price * (1 + self.grid_spacing * i)
if price > max_existing:
order_id = self.trading_engine.place_order(
symbol=self.symbol,
side='sell',
order_type='limit',
quantity=self.base_order_size,
price=price
)
if order_id:
grid_level = GridLevel(
price=price,
quantity=self.base_order_size,
side='sell',
order_id=order_id
)
self.active_orders[order_id] = grid_level
def _log_performance_summary(self):
"""Log strategy performance summary"""
runtime = datetime.now() - self.start_time
runtime_hours = runtime.total_seconds() / 3600
portfolio = self.trading_engine.get_portfolio_summary()
        self.logger.info("=== GRID STRATEGY PERFORMANCE SUMMARY ===")
self.logger.info(f"Runtime: {runtime_hours:.1f} hours")
self.logger.info(f"Completed cycles: {self.completed_cycles}")
self.logger.info(f"Total profit: ${self.total_profit:.2f}")
self.logger.info(f"Portfolio return: {portfolio['total_return_pct']:.2f}%")
self.logger.info(f"Realized P&L: ${portfolio['realized_pnl']:.2f}")
self.logger.info(f"Unrealized P&L: ${portfolio['unrealized_pnl']:.2f}")
if runtime_hours > 0:
profit_per_hour = self.total_profit / runtime_hours
cycles_per_hour = self.completed_cycles / runtime_hours
self.logger.info(f"Profit per hour: ${profit_per_hour:.2f}")
self.logger.info(f"Cycles per hour: {cycles_per_hour:.1f}")
self.logger.info("=============================================")
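The grid arithmetic used by the strategy can be sketched separately to see how spacing and fees interact. The function names below are illustrative only. One assumption differs from the listing: `cycle_profit` charges commission on both the buy and the sell leg, whereas the strategy's profit tracker subtracts only the sell order's fees.

```python
from typing import List, Tuple


def grid_prices(center: float, spacing: float, num_grids: int) -> Tuple[List[float], List[float]]:
    """Return (buy_prices, sell_prices) spaced linearly around the center price."""
    buys = [center * (1 - spacing * i) for i in range(1, num_grids + 1)]
    sells = [center * (1 + spacing * i) for i in range(1, num_grids + 1)]
    return buys, sells


def cycle_profit(buy_price: float, spacing: float, quantity: float,
                 commission_rate: float = 0.001) -> float:
    """Net profit of one buy->sell cycle, selling one grid step above the buy fill."""
    sell_price = buy_price * (1 + spacing)
    gross = (sell_price - buy_price) * quantity
    # Assumption: commission is paid on both legs of the cycle.
    fees = (buy_price + sell_price) * quantity * commission_rate
    return gross - fees


if __name__ == "__main__":
    buys, sells = grid_prices(100.0, 0.005, 3)
    print("buy levels:", [round(p, 2) for p in buys])
    print("sell levels:", [round(p, 2) for p in sells])
    print("net per cycle:", round(cycle_profit(100.0, 0.005, 1.0), 4))
```

Under these assumptions a cycle only turns a profit when the grid spacing is wider than about twice the commission rate, so a 0.1% spacing with a 0.1% commission loses money on every completed cycle.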
Create src/paper_trading/monitoring.py:
import time
import logging
import smtplib
import json
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
@dataclass
class Alert:
"""Performance alert"""
alert_type: str
message: str
severity: str # 'info', 'warning', 'critical'
timestamp: datetime
value: Optional[float] = None
threshold: Optional[float] = None
class PerformanceMonitor:
"""
Real-time performance monitoring with alerts
Features:
- Drawdown monitoring
- Profit/loss alerts
- Risk limit checking
- Email notifications
- Performance dashboard
"""
def __init__(self, trading_engine, initial_balance: float):
self.trading_engine = trading_engine
self.initial_balance = initial_balance
self.logger = logging.getLogger(__name__)
# Monitoring thresholds
self.max_drawdown_pct = 10.0 # 10% max drawdown
self.daily_loss_limit_pct = 5.0 # 5% daily loss limit
self.profit_target_pct = 2.0 # 2% daily profit target
# Performance tracking
self.daily_start_balance = initial_balance
self.peak_balance = initial_balance
self.alerts: List[Alert] = []
# Monitoring state
self.monitoring = False
self.monitor_thread = None
# Notification settings
self.email_alerts_enabled = False
self.email_config = {}
# Callbacks
self.alert_callbacks: List[Callable[[Alert], None]] = []
def add_alert_callback(self, callback: Callable[[Alert], None]):
"""Add callback for alert notifications"""
self.alert_callbacks.append(callback)
def configure_email_alerts(self, smtp_server: str, smtp_port: int,
email: str, password: str, recipients: List[str]):
"""Configure email alert notifications"""
self.email_config = {
'smtp_server': smtp_server,
'smtp_port': smtp_port,
'email': email,
'password': password,
'recipients': recipients
}
self.email_alerts_enabled = True
self.logger.info("Email alerts configured")
def start_monitoring(self):
"""Start real-time monitoring"""
if self.monitoring:
return
self.monitoring = True
self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
self.monitor_thread.start()
self.logger.info("Performance monitoring started")
def stop_monitoring(self):
"""Stop monitoring"""
if not self.monitoring:
return
self.monitoring = False
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=5)
self.logger.info("Performance monitoring stopped")
def _monitoring_loop(self):
"""Main monitoring loop"""
last_check = datetime.now()
last_reset_date = last_check.date()
while self.monitoring:
try:
current_time = datetime.now()
# Check performance every 30 seconds
if (current_time - last_check).total_seconds() >= 30:
self._check_performance()
last_check = current_time
# Reset daily tracking once per day, when the calendar date changes
# (comparing hour and minute to 0 would fire on every pass during 00:00)
if current_time.date() != last_reset_date:
self._reset_daily_tracking()
last_reset_date = current_time.date()
time.sleep(5)
except Exception as e:
self.logger.error(f"Error in monitoring loop: {e}")
time.sleep(30)
def _check_performance(self):
"""Check current performance against thresholds"""
portfolio = self.trading_engine.get_portfolio_summary()
current_balance = portfolio['total_value']
# Update peak balance
if current_balance > self.peak_balance:
self.peak_balance = current_balance
# Check drawdown
self._check_drawdown(current_balance)
# Check daily performance
self._check_daily_performance(current_balance)
# Check profit targets
self._check_profit_targets(current_balance)
# Log periodic status
self._log_periodic_status(portfolio)
def _check_drawdown(self, current_balance: float):
"""Check for excessive drawdown"""
current_drawdown_pct = ((self.peak_balance - current_balance) / self.peak_balance) * 100
if current_drawdown_pct > self.max_drawdown_pct:
alert = Alert(
alert_type='max_drawdown_exceeded',
message=f'Maximum drawdown exceeded: {current_drawdown_pct:.2f}% (limit: {self.max_drawdown_pct:.2f}%)',
severity='critical',
timestamp=datetime.now(),
value=current_drawdown_pct,
threshold=self.max_drawdown_pct
)
self._send_alert(alert)
def _check_daily_performance(self, current_balance: float):
"""Check daily performance limits"""
daily_return = current_balance - self.daily_start_balance
daily_return_pct = (daily_return / self.daily_start_balance) * 100
if daily_return_pct < -self.daily_loss_limit_pct:
alert = Alert(
alert_type='daily_loss_limit',
message=f'Daily loss limit exceeded: {daily_return_pct:.2f}% (limit: -{self.daily_loss_limit_pct:.2f}%)',
severity='critical',
timestamp=datetime.now(),
value=daily_return_pct,
threshold=-self.daily_loss_limit_pct
)
self._send_alert(alert)
def _check_profit_targets(self, current_balance: float):
"""Check if profit targets are met"""
daily_return = current_balance - self.daily_start_balance
daily_return_pct = (daily_return / self.daily_start_balance) * 100
if daily_return_pct >= self.profit_target_pct:
alert = Alert(
alert_type='profit_target_reached',
message=f'Daily profit target reached: {daily_return_pct:.2f}% (target: {self.profit_target_pct:.2f}%)',
severity='info',
timestamp=datetime.now(),
value=daily_return_pct,
threshold=self.profit_target_pct
)
self._send_alert(alert)
def _log_periodic_status(self, portfolio: Dict):
"""Log periodic status update"""
# Log every 5 minutes
if datetime.now().minute % 5 == 0:
self.logger.info("Portfolio Status:")
self.logger.info(f" Total Value: ${portfolio['total_value']:,.2f}")
self.logger.info(f" Total Return: {portfolio['total_return_pct']:.2f}%")
self.logger.info(f" Realized P&L: ${portfolio['realized_pnl']:.2f}")
self.logger.info(f" Unrealized P&L: ${portfolio['unrealized_pnl']:.2f}")
self.logger.info(f" Active Orders: {portfolio['num_active_orders']}")
def _send_alert(self, alert: Alert):
"""Send alert through configured channels"""
# Add to alerts list
self.alerts.append(alert)
# Log alert
log_level = logging.INFO if alert.severity == 'info' else logging.WARNING if alert.severity == 'warning' else logging.ERROR
self.logger.log(log_level, f"{alert.alert_type.upper()}: {alert.message}")
# Notify callbacks
for callback in self.alert_callbacks:
try:
callback(alert)
except Exception as e:
self.logger.error(f"Alert callback error: {e}")
# Send email if configured
if self.email_alerts_enabled and alert.severity in ['warning', 'critical']:
self._send_email_alert(alert)
def _send_email_alert(self, alert: Alert):
"""Send email alert notification"""
try:
msg = MIMEMultipart()
msg['From'] = self.email_config['email']
msg['To'] = ', '.join(self.email_config['recipients'])
msg['Subject'] = f"Trading Bot Alert: {alert.alert_type.replace('_', ' ').title()}"
body = f"""
Trading Bot Performance Alert
Alert Type: {alert.alert_type.replace('_', ' ').title()}
Severity: {alert.severity.upper()}
Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
Message: {alert.message}
Portfolio Summary:
{self._get_portfolio_summary_text()}
This is an automated message from your cryptocurrency trading bot.
"""
msg.attach(MIMEText(body, 'plain'))
# Send email; the context manager closes the connection even if login or send fails,
# and the timeout keeps a stalled SMTP server from blocking the monitoring thread
with smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'], timeout=30) as server:
server.starttls()
server.login(self.email_config['email'], self.email_config['password'])
server.sendmail(self.email_config['email'], self.email_config['recipients'], msg.as_string())
self.logger.info("Email alert sent successfully")
except Exception as e:
self.logger.error(f"Failed to send email alert: {e}")
def _get_portfolio_summary_text(self) -> str:
"""Get portfolio summary as formatted text"""
portfolio = self.trading_engine.get_portfolio_summary()
return f"""
Total Value: ${portfolio['total_value']:,.2f}
Cash Balance: ${portfolio['cash_balance']:,.2f}
Total Return: {portfolio['total_return_pct']:.2f}%
Realized P&L: ${portfolio['realized_pnl']:.2f}
Unrealized P&L: ${portfolio['unrealized_pnl']:.2f}
Active Positions: {portfolio['num_positions']}
Active Orders: {portfolio['num_active_orders']}
Total Trades: {portfolio['total_trades']}
"""
def _reset_daily_tracking(self):
"""Reset daily performance tracking"""
portfolio = self.trading_engine.get_portfolio_summary()
self.daily_start_balance = portfolio['total_value']
self.logger.info("Daily performance tracking reset")
def get_monitoring_summary(self) -> Dict:
"""Get monitoring summary"""
portfolio = self.trading_engine.get_portfolio_summary()
current_balance = portfolio['total_value']
# Calculate metrics
total_return = current_balance - self.initial_balance
total_return_pct = (total_return / self.initial_balance) * 100
current_drawdown_pct = ((self.peak_balance - current_balance) / self.peak_balance) * 100
daily_return = current_balance - self.daily_start_balance
daily_return_pct = (daily_return / self.daily_start_balance) * 100
# Alert summary
recent_alerts = [a for a in self.alerts if (datetime.now() - a.timestamp).total_seconds() < 3600] # Last hour
critical_alerts = [a for a in recent_alerts if a.severity == 'critical']
return {
'monitoring_active': self.monitoring,
'current_balance': current_balance,
'total_return_pct': total_return_pct,
'daily_return_pct': daily_return_pct,
'current_drawdown_pct': current_drawdown_pct,
'peak_balance': self.peak_balance,
'total_alerts': len(self.alerts),
'recent_alerts': len(recent_alerts),
'critical_alerts': len(critical_alerts),
'email_alerts_enabled': self.email_alerts_enabled
}
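Because `_check_performance` runs every 30 seconds and `_send_alert` fires on every breach, a drawdown that persists for an hour would produce roughly 120 identical alerts, each with its own email. A minimal sketch of a cooldown filter follows; `AlertThrottle` and its 15-minute default are hypothetical and not part of the module above.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional


class AlertThrottle:
    """Suppress repeats of the same alert type within a cooldown window.

    Hypothetical helper, not part of the original monitoring module.
    """

    def __init__(self, cooldown_seconds: float = 900.0):
        self.cooldown = timedelta(seconds=cooldown_seconds)
        # Maps alert_type -> time the last alert of that type was let through
        self._last_sent: Dict[str, datetime] = {}

    def should_send(self, alert_type: str, now: Optional[datetime] = None) -> bool:
        """Return True unless an alert of this type passed within the cooldown."""
        now = now or datetime.now()
        last = self._last_sent.get(alert_type)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_sent[alert_type] = now
        return True


if __name__ == "__main__":
    throttle = AlertThrottle(cooldown_seconds=900)
    start = datetime(2024, 1, 1, 12, 0, 0)
    # First breach passes, the repeat 30 seconds later is suppressed,
    # and a breach after the cooldown passes again.
    for offset in (0, 30, 960):
        when = start + timedelta(seconds=offset)
        print(offset, throttle.should_send("max_drawdown_exceeded", when))
```

One way to use it would be to return early from `_send_alert` when `should_send(alert.alert_type)` is false, so the alert list, callbacks and email all see one alert per cooldown window.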
Create run_paper_trading.py:
#!/usr/bin/env python3
"""
Complete paper trading application
Integrates real-time data, trading engine, strategy, and monitoring
"""
import sys
import time
import signal
import logging
from datetime import datetime
from src.data.realtime_data import BinanceWebSocketClient
from src.paper_trading.paper_trading_engine import PaperTradingEngine
from src.paper_trading.paper_grid_strategy import PaperGridStrategy
from src.paper_trading.monitoring import PerformanceMonitor
class PaperTradingApplication:
"""Complete paper trading application"""
def __init__(self):
self.setup_logging()
self.logger = logging.getLogger(__name__)
# Configuration
self.symbol = 'BTCUSDT'
self.initial_balance = 10000.0
# Components
self.ws_client = None
self.trading_engine = None
self.strategy = None
self.monitor = None
# Application state
self.running = False
# Setup signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def setup_logging(self):
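"""Configure logging"""
# Create the log directory first; FileHandler raises FileNotFoundError if it is missing
from pathlib import Path
Path('data/logs').mkdir(parents=True, exist_ok=True)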
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data/logs/paper_trading.log'),
logging.StreamHandler(sys.stdout)
]
)
def signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
self.logger.info(f"Received signal {signum}. Shutting down gracefully...")
self.running = False
def initialize_components(self):
"""Initialize all application components"""
try:
self.logger.info("Initializing Paper Trading Application")
# Initialize trading engine
self.trading_engine = PaperTradingEngine(
initial_balance=self.initial_balance,
commission_rate=0.001, # 0.1% commission
slippage_rate=0.0005 # 0.05% slippage
)
# Initialize WebSocket client
self.ws_client = BinanceWebSocketClient([self.symbol])
# Setup market data callback
self.ws_client.add_tick_callback(self._on_market_tick)
# Initialize strategy
self.strategy = PaperGridStrategy(
trading_engine=self.trading_engine,
symbol=self.symbol,
grid_spacing=0.005, # 0.5%
num_grids=10,
base_order_size=0.001
)
# Initialize performance monitor
self.monitor = PerformanceMonitor(
trading_engine=self.trading_engine,
initial_balance=self.initial_balance
)
# Configure email alerts (optional)
# self.monitor.configure_email_alerts(
# smtp_server='smtp.gmail.com',
# smtp_port=587,
# email='your_email@gmail.com',
# password='your_app_password',
# recipients=['alerts@yourdomain.com']
# )
self.logger.info("All components initialized successfully")
return True
except Exception as e:
self.logger.error(f"Failed to initialize components: {e}")
return False
def start_application(self):
"""Start the paper trading application"""
try:
if not self.initialize_components():
return False
# Start components
self.logger.info("Starting paper trading components...")
# Start trading engine
self.trading_engine.start()
# Start WebSocket client
self.ws_client.start()
# Wait for initial market data
self.logger.info("Waiting for initial market data...")
timeout = 30
start_time = time.time()
while time.time() - start_time < timeout:
current_price = self.ws_client.get_current_price(self.symbol)
if current_price:
break
time.sleep(0.5)
if not current_price:
self.logger.error("Failed to receive market data")
return False
self.logger.info(f"Initial {self.symbol} price: ${current_price:,.2f}")
# Start strategy
self.strategy.start_strategy(current_price)
# Start monitoring
self.monitor.start_monitoring()
self.running = True
self.logger.info("Paper trading application started successfully!")
return True
except Exception as e:
self.logger.error(f"Failed to start application: {e}")
return False
def run_main_loop(self):
"""Main application loop"""
last_status_time = time.time()
status_interval = 300 # 5 minutes
try:
while self.running:
current_time = time.time()
# Periodic status update
if current_time - last_status_time >= status_interval:
self._log_status_update()
last_status_time = current_time
# Check for user input (optional - for interactive mode)
# self._check_user_input()
time.sleep(1)
except KeyboardInterrupt:
self.logger.info("Keyboard interrupt received")
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
finally:
self.shutdown_application()
def shutdown_application(self):
"""Gracefully shutdown the application"""
self.logger.info("Shutting down paper trading application...")
try:
# Stop strategy first
if self.strategy:
self.strategy.stop_strategy()
# Stop monitoring
if self.monitor:
self.monitor.stop_monitoring()
# Stop WebSocket client
if self.ws_client:
self.ws_client.stop()
# Stop trading engine
if self.trading_engine:
self.trading_engine.stop()
# Generate final report
self._generate_final_report()
self.logger.info("Application shutdown complete")
except Exception as e:
self.logger.error(f"Error during shutdown: {e}")
def _on_market_tick(self, tick):
"""Handle real-time market data"""
# Update trading engine with current price
self.trading_engine.update_market_data(tick.symbol, tick.price)
# Update strategy with current price
if self.strategy and self.strategy.is_running:
self.strategy.update_market_price(tick.price)
def _log_status_update(self):
"""Log periodic status update"""
try:
# Get portfolio status
portfolio = self.trading_engine.get_portfolio_summary()
# Get strategy status
strategy_status = self.strategy.get_strategy_status() if self.strategy else {}
# Get monitoring status
monitor_status = self.monitor.get_monitoring_summary() if self.monitor else {}
self.logger.info("=== PAPER TRADING STATUS UPDATE ===")
self.logger.info(f"Portfolio Value: ${portfolio['total_value']:,.2f}")
self.logger.info(f"Total Return: {portfolio['total_return_pct']:.2f}%")
self.logger.info(f"Cash Balance: ${portfolio['cash_balance']:,.2f}")
self.logger.info(f"Realized P&L: ${portfolio['realized_pnl']:.2f}")
self.logger.info(f"Unrealized P&L: ${portfolio['unrealized_pnl']:.2f}")
self.logger.info(f"Active Orders: {portfolio['num_active_orders']}")
self.logger.info(f"Total Trades: {portfolio['total_trades']}")
if strategy_status:
self.logger.info(f"Grid Cycles: {strategy_status['completed_cycles']}")
self.logger.info(f"Strategy Profit: ${strategy_status['total_profit']:.2f}")
self.logger.info(f"Runtime: {strategy_status['runtime_minutes']:.1f} minutes")
if monitor_status:
self.logger.info(f"Current Drawdown: {monitor_status['current_drawdown_pct']:.2f}%")
self.logger.info(f"Daily Return: {monitor_status['daily_return_pct']:.2f}%")
self.logger.info(f"Recent Alerts: {monitor_status['recent_alerts']}")
# Current price
current_price = self.ws_client.get_current_price(self.symbol)
if current_price:
self.logger.info(f"Current {self.symbol}: ${current_price:,.2f}")
self.logger.info("========================================")
except Exception as e:
self.logger.error(f"Error generating status update: {e}")
def _generate_final_report(self):
"""Generate final performance report"""
try:
portfolio = self.trading_engine.get_portfolio_summary()
strategy_status = self.strategy.get_strategy_status() if self.strategy else {}
report = f"""
=== PAPER TRADING FINAL REPORT ===
PORTFOLIO PERFORMANCE
--------------------
Initial Balance: ${self.initial_balance:,.2f}
Final Balance: ${portfolio['total_value']:,.2f}
Total Return: ${portfolio['total_return']:,.2f} ({portfolio['total_return_pct']:.2f}%)
Realized P&L: ${portfolio['realized_pnl']:.2f}
Unrealized P&L: ${portfolio['unrealized_pnl']:.2f}
TRADING STATISTICS
-----------------
Total Trades: {portfolio['total_trades']}
Active Positions: {portfolio['num_positions']}
Final Cash Balance: ${portfolio['cash_balance']:,.2f}
STRATEGY PERFORMANCE
-------------------
Grid Cycles: {strategy_status.get('completed_cycles', 0)}
Strategy Profit: ${strategy_status.get('total_profit', 0):.2f}
Runtime: {strategy_status.get('runtime_minutes', 0):.1f} minutes
EVALUATION
----------
"""
# Strategy evaluation
if portfolio['total_return_pct'] > 5:
report += "SUCCESSFUL: Positive returns achieved\n"
elif portfolio['total_return_pct'] > 0:
report += "MARGINAL: Small positive returns\n"
else:
report += "UNSUCCESSFUL: Negative returns\n"
if strategy_status.get('completed_cycles', 0) > 5:
report += "ACTIVE: Strategy executed multiple cycles\n"
else:
report += "LOW ACTIVITY: Few strategy cycles completed\n"
# Save report
with open('data/paper_trading_report.txt', 'w') as f:
f.write(report)
print(report)
self.logger.info("Final report saved to: data/paper_trading_report.txt")
except Exception as e:
self.logger.error(f"Error generating final report: {e}")
def main():
"""Main entry point"""
app = PaperTradingApplication()
print("Starting Cryptocurrency Paper Trading Bot")
print("=" * 50)
if app.start_application():
print("Paper trading started successfully!")
print("Press Ctrl+C to stop gracefully")
print("Check logs for real-time updates")
app.run_main_loop()
else:
print("Failed to start paper trading application")
sys.exit(1)
if __name__ == "__main__":
main()
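`PerformanceMonitor` exposes `add_alert_callback`, but the application above never registers one, so a critical alert is logged without any effect on trading. The sketch below shows a callback that halts the strategy on critical alerts. `make_halt_callback` and `StubStrategy` are hypothetical names that exist only to make the example runnable on its own; the `Alert` dataclass mirrors the one in `monitoring.py`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional


@dataclass
class Alert:
    """Mirrors the Alert dataclass from monitoring.py."""
    alert_type: str
    message: str
    severity: str  # 'info', 'warning', 'critical'
    timestamp: datetime
    value: Optional[float] = None
    threshold: Optional[float] = None


class StubStrategy:
    """Stand-in for PaperGridStrategy with only what the callback needs."""

    def __init__(self):
        self.is_running = True

    def stop_strategy(self):
        self.is_running = False


def make_halt_callback(strategy) -> Callable[[Alert], None]:
    """Build a callback that stops the strategy on a critical alert."""

    def on_alert(alert: Alert) -> None:
        # Info and warning alerts are ignored; only critical ones halt trading
        if alert.severity == 'critical' and strategy.is_running:
            strategy.stop_strategy()

    return on_alert


if __name__ == "__main__":
    strategy = StubStrategy()
    callback = make_halt_callback(strategy)
    callback(Alert('profit_target_reached', 'target hit', 'info', datetime.now()))
    print("running after info alert:", strategy.is_running)
    callback(Alert('daily_loss_limit', 'limit hit', 'critical', datetime.now()))
    print("running after critical alert:", strategy.is_running)
```

In `initialize_components`, this could be wired up with `self.monitor.add_alert_callback(make_halt_callback(self.strategy))`. Callbacks run on the monitoring thread, so whether `stop_strategy` is safe to call from there depends on how the strategy is implemented.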
Compare your paper trading results with backtesting to validate strategy effectiveness:
Metric | Backtest Result | Paper Trading | Expected Difference |
---|---|---|---|
Annual Return | 31% | 27-33% | ±3% due to market conditions |
Max Drawdown | 12% | 10-15% | Similar range expected |
Sharpe Ratio | 1.8 | 1.5-2.0 | Slight decrease due to real volatility |
Win Rate | 65% | 60-70% | Similar performance expected |
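The comparison in the table can be automated. The sketch below flags metrics whose paper-trading value drifts from the backtest by more than a tolerance. The function name, the tolerance values and the sample paper-trading figures are illustrative assumptions, not results from this series.

```python
from typing import Dict, List, Tuple


def compare_to_backtest(
    backtest: Dict[str, float],
    paper: Dict[str, float],
    tolerances: Dict[str, float],
) -> List[Tuple[str, float, bool]]:
    """Return (metric, paper minus backtest, within_tolerance) per metric."""
    rows = []
    for metric, expected in backtest.items():
        diff = paper[metric] - expected
        rows.append((metric, diff, abs(diff) <= tolerances[metric]))
    return rows


if __name__ == "__main__":
    # Backtest column from the table above
    backtest = {'annual_return_pct': 31.0, 'max_drawdown_pct': 12.0,
                'sharpe_ratio': 1.8, 'win_rate_pct': 65.0}
    # Hypothetical paper-trading results, for illustration only
    paper = {'annual_return_pct': 26.0, 'max_drawdown_pct': 14.0,
             'sharpe_ratio': 1.6, 'win_rate_pct': 63.0}
    # Assumed tolerances, loosely based on the ranges in the table
    tolerances = {'annual_return_pct': 3.0, 'max_drawdown_pct': 3.0,
                  'sharpe_ratio': 0.3, 'win_rate_pct': 5.0}

    for metric, diff, ok in compare_to_backtest(backtest, paper, tolerances):
        status = "ok" if ok else "INVESTIGATE"
        print(f"{metric:20s} {diff:+.2f}  {status}")
```

A metric flagged for investigation does not prove the strategy is broken. It is a prompt to check whether the backtest assumptions (fees, fills, data quality) match what the paper trading engine simulates.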
Before transitioning from paper trading to live trading:
When transitioning to live trading, implement these additional safeguards:
Paper trading assumes perfect order execution, which may not reflect live trading:
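One way to keep simulated fills from being overly optimistic is to move every fill price against the trader and charge a fee on the traded value. The sketch below uses the 0.1% commission and 0.05% slippage rates configured in `run_paper_trading.py`. `simulate_fill` is a hypothetical helper and not the `PaperTradingEngine` implementation, and a fixed slippage percentage is itself a simplification of real order book behavior.

```python
from typing import Tuple


def simulate_fill(
    side: str,
    price: float,
    quantity: float,
    commission_rate: float = 0.001,
    slippage_rate: float = 0.0005,
) -> Tuple[float, float, float]:
    """Return (fill_price, commission, cash_delta) for a simulated market order.

    Buys fill above the quoted price and sells fill below it, so slippage
    always works against the trader.
    """
    if side not in ('buy', 'sell'):
        raise ValueError(f"side must be 'buy' or 'sell', got {side!r}")
    direction = 1 if side == 'buy' else -1
    fill_price = price * (1 + direction * slippage_rate)
    notional = fill_price * quantity
    commission = notional * commission_rate
    # Cash leaves the account on a buy and enters on a sell; the fee is paid either way
    cash_delta = -direction * notional - commission
    return fill_price, commission, cash_delta


if __name__ == "__main__":
    for side in ('buy', 'sell'):
        fill_price, commission, cash_delta = simulate_fill(side, 50000.0, 0.001)
        print(f"{side}: fill={fill_price:.2f} fee={commission:.4f} cash={cash_delta:+.4f}")
```

Buying and then selling the same quantity at an unchanged quote leaves the account slightly down, which is the cost a grid strategy has to clear on every cycle before it shows a profit.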
Markets evolve, and strategies effective during paper trading may not work later:
Based on extensive testing, here are realistic expectations for paper trading:
Time Period | Expected Return | Max Drawdown | Trade Frequency |
---|---|---|---|
1 Week | 0.5-2% | 2-5% | 5-15 trades |
1 Month | 2-8% | 5-12% | 20-80 trades |
3 Months | 8-25% | 8-18% | 100-300 trades |
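To place your own run against ranges like these, total return and maximum drawdown can be computed from periodic `total_value` snapshots. The sketch below assumes the snapshots have been collected into a plain list, which the code in this article does not do on its own. The sample values are made up.

```python
from typing import List, Tuple


def return_and_max_drawdown(equity: List[float]) -> Tuple[float, float]:
    """Return (total_return_pct, max_drawdown_pct) for a series of portfolio values."""
    if len(equity) < 2:
        raise ValueError("need at least two equity snapshots")
    peak = equity[0]
    max_drawdown_pct = 0.0
    for value in equity:
        # Track the running peak and the deepest drop below it
        peak = max(peak, value)
        max_drawdown_pct = max(max_drawdown_pct, (peak - value) / peak * 100)
    total_return_pct = (equity[-1] - equity[0]) / equity[0] * 100
    return total_return_pct, max_drawdown_pct


if __name__ == "__main__":
    # Made-up snapshots of portfolio['total_value'] over a run
    snapshots = [10000.0, 10150.0, 10080.0, 9950.0, 10220.0, 10310.0]
    total_return_pct, max_drawdown_pct = return_and_max_drawdown(snapshots)
    print(f"Total return: {total_return_pct:.2f}%")
    print(f"Max drawdown: {max_drawdown_pct:.2f}%")
```

This measures drawdown from the running peak, the same definition `_check_drawdown` uses for its alert threshold.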
Your paper trading system is now complete and ready for validation. The final article of this series covers implementing robust risk management and logging systems to ensure your trading bot operates safely and transparently in production environments.
Paper trade for at least 30 days before considering live trading! This validation period is crucial for building confidence and identifying any issues with your strategy in real market conditions.
Nikolai Fischer is the founder of Kommune3 (since 2007) and a leading expert in Drupal development and tech entrepreneurship. With 17+ years of experience, he has led hundreds of projects and achieved #1 on Hacker News. As host of the "Kommit mich" podcast and founder of skillution, he combines technical expertise with entrepreneurial thinking. His articles about Supabase, modern web development, and systematic problem-solving have influenced thousands of developers worldwide.