"""
Strategy Analyzer Module
========================
Analyzes trading patterns and reverse-engineers strategies from trade history.
Identifies entry/exit signals, sizing rules, timing patterns, and risk management.
"""

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import logging
from scipy import stats

try:
    from sklearn.cluster import KMeans
    from sklearn.preprocessing import StandardScaler
except ImportError:
    # scikit-learn is only needed by StrategyAnalyzer.cluster_trades(); keep
    # the rest of the module usable when it is not installed.
    KMeans = None
    StandardScaler = None

logger = logging.getLogger(__name__)


class TradeSignal(Enum):
    """Trade signal types."""

    MOMENTUM = "momentum"
    MEAN_REVERSION = "mean_reversion"
    BREAKOUT = "breakout"
    VALUE = "value"
    NEWS = "news"
    ARBITRAGE = "arbitrage"
    UNKNOWN = "unknown"


@dataclass
class StrategyProfile:
    """
    Profile of a reverse-engineered trading strategy.

    Contains identified patterns, parameters, and metrics.
    """

    name: str
    description: str

    # Entry patterns
    avg_entry_price: float = 0.0
    entry_price_std: float = 0.0
    preferred_entry_range: Tuple[float, float] = (0.3, 0.7)
    entry_signal_type: TradeSignal = TradeSignal.UNKNOWN

    # Position sizing
    avg_position_size: float = 0.0
    max_position_size: float = 0.0
    position_size_scaling: str = "fixed"  # fixed, proportional, kelly

    # Timing
    avg_holding_period_hours: float = 0.0
    preferred_entry_hours: List[int] = field(default_factory=list)
    preferred_entry_days: List[int] = field(default_factory=list)

    # Market preferences
    preferred_markets: List[str] = field(default_factory=list)
    market_categories: List[str] = field(default_factory=list)

    # Risk management
    estimated_win_rate: float = 0.0
    avg_win_size: float = 0.0
    avg_loss_size: float = 0.0
    profit_factor: float = 0.0
    sharpe_ratio: float = 0.0
    max_drawdown: float = 0.0

    # Behavioral patterns
    is_contrarian: bool = False
    trades_momentum: bool = False
    uses_scaling: bool = False
    quick_exit_pattern: bool = False

    # Confidence metrics
    sample_size: int = 0
    confidence_score: float = 0.0

    def to_dict(self) -> Dict:
        """Convert to dictionary for serialization."""
        return {
            "name": self.name,
            "description": self.description,
            "entry_patterns": {
                "avg_entry_price": self.avg_entry_price,
                "entry_price_std": self.entry_price_std,
                "preferred_entry_range": self.preferred_entry_range,
                "entry_signal_type": self.entry_signal_type.value,
            },
            "position_sizing": {
                "avg_position_size": self.avg_position_size,
                "max_position_size": self.max_position_size,
                "position_size_scaling": self.position_size_scaling,
            },
            "timing": {
                "avg_holding_period_hours": self.avg_holding_period_hours,
                "preferred_entry_hours": self.preferred_entry_hours,
                "preferred_entry_days": self.preferred_entry_days,
            },
            "risk_management": {
                "estimated_win_rate": self.estimated_win_rate,
                "avg_win_size": self.avg_win_size,
                "avg_loss_size": self.avg_loss_size,
                "profit_factor": self.profit_factor,
                "sharpe_ratio": self.sharpe_ratio,
                "max_drawdown": self.max_drawdown,
            },
            "behavioral_patterns": {
                "is_contrarian": self.is_contrarian,
                "trades_momentum": self.trades_momentum,
                "uses_scaling": self.uses_scaling,
                "quick_exit_pattern": self.quick_exit_pattern,
            },
            "confidence": {
                "sample_size": self.sample_size,
                "confidence_score": self.confidence_score,
            },
        }


class StrategyAnalyzer:
    """
    Analyzes trade history to reverse-engineer trading strategies.

    Identifies:
    - Entry/exit patterns
    - Position sizing rules
    - Timing preferences
    - Risk management approach
    - Market selection criteria
    """

    def __init__(self, trades_df: pd.DataFrame,
                 price_data: Optional[Dict[str, pd.DataFrame]] = None):
        """
        Initialize analyzer with trade history.

        Args:
            trades_df: DataFrame with columns: timestamp, conditionId, side,
                price, size, outcome. The frame is copied, never mutated.
            price_data: Dict mapping conditionId to price history DataFrame
        """
        self.trades = trades_df.copy()
        self.price_data = price_data or {}

        # Ensure proper data types
        if 'timestamp' in self.trades.columns:
            self.trades['timestamp'] = pd.to_datetime(self.trades['timestamp'])
        for column in ('price', 'size'):
            if column in self.trades.columns:
                # Unparseable values become NaN instead of raising.
                self.trades[column] = pd.to_numeric(self.trades[column], errors='coerce')

        # Add derived columns
        self._add_derived_columns()

    def _add_derived_columns(self):
        """Add derived columns (hour, dayofweek, date, value) for analysis."""
        columns = self.trades.columns

        if 'timestamp' in columns:
            timestamps = self.trades['timestamp']
            self.trades['hour'] = timestamps.dt.hour
            self.trades['dayofweek'] = timestamps.dt.dayofweek
            self.trades['date'] = timestamps.dt.date

        # Dollar value does not depend on the timestamp, so compute it even
        # when no timestamp column is available.
        if 'size' in columns and 'price' in columns:
            self.trades['value'] = self.trades['size'] * self.trades['price']

    def _buy_trades(self) -> pd.DataFrame:
        """Return the BUY rows, or an empty frame when there is no 'side' column."""
        if 'side' not in self.trades.columns:
            return self.trades.iloc[0:0]
        return self.trades[self.trades['side'] == 'BUY']

    def analyze(self, strategy_name: str = "Analyzed Strategy") -> StrategyProfile:
        """
        Perform comprehensive strategy analysis.

        Args:
            strategy_name: Name for the strategy profile

        Returns:
            StrategyProfile with identified patterns. With fewer than five
            trades only ``name``, ``description`` and ``sample_size`` are set.
        """
        profile = StrategyProfile(
            name=strategy_name,
            description="Reverse-engineered trading strategy",
            sample_size=len(self.trades),
        )

        if len(self.trades) < 5:
            logger.warning("Insufficient trades for analysis")
            return profile

        # Analyze different aspects
        self._analyze_entry_patterns(profile)
        self._analyze_position_sizing(profile)
        self._analyze_timing(profile)
        self._analyze_risk_metrics(profile)
        self._analyze_behavioral_patterns(profile)
        # The signal type must be identified before scoring: the confidence
        # score awards points for a non-UNKNOWN signal.
        self._identify_signal_type(profile)
        self._calculate_confidence_score(profile)

        return profile

    def _analyze_entry_patterns(self, profile: StrategyProfile):
        """Analyze entry price patterns of BUY trades."""
        buy_trades = self._buy_trades()
        if buy_trades.empty or 'price' not in buy_trades.columns:
            return

        prices = buy_trades['price']
        # Cast to native floats so the profile stays JSON-serializable.
        profile.avg_entry_price = float(prices.mean())
        profile.entry_price_std = float(prices.std())

        # Calculate preferred entry range (middle 80%)
        p10 = float(prices.quantile(0.1))
        p90 = float(prices.quantile(0.9))
        profile.preferred_entry_range = (round(p10, 3), round(p90, 3))

    def _analyze_position_sizing(self, profile: StrategyProfile):
        """Analyze position sizing patterns."""
        if 'size' not in self.trades.columns:
            return

        sizes = self.trades['size']
        mean_size = sizes.mean()
        profile.avg_position_size = float(mean_size)
        profile.max_position_size = float(sizes.max())

        # Detect scaling pattern from the coefficient of variation
        size_cv = sizes.std() / mean_size if mean_size > 0 else 0
        if size_cv < 0.2:
            profile.position_size_scaling = "fixed"
        elif size_cv < 0.5:
            profile.position_size_scaling = "moderate_scaling"
        else:
            profile.position_size_scaling = "aggressive_scaling"

        # Check for Kelly-like sizing (larger on higher confidence)
        if 'price' in self.trades.columns:
            buys = self._buy_trades()
            if len(buys) > 10:
                # Higher prices = lower value = should have smaller size
                correlation = buys['size'].corr(buys['price'])
                # A NaN correlation (e.g. constant sizes) compares False.
                if correlation < -0.3:
                    profile.position_size_scaling = "kelly_style"

    def _analyze_timing(self, profile: StrategyProfile):
        """Analyze timing patterns (entry hours/days and holding period)."""
        columns = self.trades.columns
        if 'hour' not in columns:
            return

        # Hour distribution
        hour_counts = self.trades['hour'].value_counts()
        profile.preferred_entry_hours = sorted(hour_counts.nlargest(5).index.tolist())

        # Day of week distribution
        if 'dayofweek' in columns:
            day_counts = self.trades['dayofweek'].value_counts()
            profile.preferred_entry_days = sorted(day_counts.nlargest(3).index.tolist())

        # Estimate holding period (time between trades in same market)
        if 'conditionId' in columns and 'side' in columns:
            holding_periods = []
            for _, group in self.trades.groupby('conditionId'):
                if len(group) < 2:
                    continue
                ordered = group.sort_values('timestamp')
                # A BUY immediately followed (in time) by a SELL.
                is_round_trip = (
                    (ordered['side'] == 'BUY')
                    & (ordered['side'].shift(-1) == 'SELL')
                )
                gaps = ordered['timestamp'].shift(-1) - ordered['timestamp']
                hours = gaps[is_round_trip].dt.total_seconds() / 3600
                holding_periods.extend(hours.tolist())

            if holding_periods:
                profile.avg_holding_period_hours = float(np.median(holding_periods))

    def _analyze_risk_metrics(self, profile: StrategyProfile):
        """Analyze risk and return metrics from per-market P&L."""
        required = ('price', 'size', 'side', 'conditionId')
        if any(column not in self.trades.columns for column in required):
            return

        # Calculate P&L for each market
        market_pnl = {}
        for market_id, trades in self.trades.groupby('conditionId'):
            buys = trades[trades['side'] == 'BUY']
            sells = trades[trades['side'] == 'SELL']

            buy_cost = (buys['price'] * buys['size']).sum()
            sell_revenue = (sells['price'] * sells['size']).sum()

            # For positions still held, assume 50% fair value for unrealized
            net_position = buys['size'].sum() - sells['size'].sum()
            estimated_value = net_position * 0.5

            market_pnl[market_id] = sell_revenue - buy_cost + estimated_value

        pnl_values = [float(pnl) for pnl in market_pnl.values()]
        if not pnl_values:
            return

        wins = [p for p in pnl_values if p > 0]
        losses = [p for p in pnl_values if p <= 0]  # break-even counts as a loss

        profile.estimated_win_rate = len(wins) / len(pnl_values)
        profile.avg_win_size = float(np.mean(wins)) if wins else 0.0
        profile.avg_loss_size = float(abs(np.mean(losses))) if losses else 0.0

        total_wins = sum(wins)
        total_losses = abs(sum(losses))
        if total_losses > 0:
            profile.profit_factor = total_wins / total_losses
        elif total_wins > 0:
            profile.profit_factor = float('inf')
        else:
            # Neither gains nor losses: there is no edge to report.
            profile.profit_factor = 0.0

        # Sharpe-like ratio
        # NOTE(review): sqrt(252) annualisation assumes daily returns, but the
        # inputs here are per-market P&L values -- confirm this is intended.
        pnl_std = np.std(pnl_values)
        if pnl_std > 0:
            profile.sharpe_ratio = float(np.mean(pnl_values) / pnl_std * np.sqrt(252))

        # Max drawdown (simplified)
        # NOTE(review): markets are accumulated in groupby (conditionId) order,
        # not chronological order.
        cumulative = np.cumsum(pnl_values)
        running_max = np.maximum.accumulate(cumulative)
        drawdowns = running_max - cumulative
        profile.max_drawdown = float(np.max(drawdowns)) if len(drawdowns) > 0 else 0.0

    def _analyze_behavioral_patterns(self, profile: StrategyProfile):
        """Identify behavioral trading patterns."""
        if 'price' not in self.trades.columns:
            return

        buys = self._buy_trades()
        if len(buys) > 0:
            # Contrarian: buys when prices are low
            avg_buy_price = buys['price'].mean()
            # bool(...) avoids storing numpy.bool_, which json cannot encode.
            profile.is_contrarian = bool(avg_buy_price < 0.4)

            # Momentum: buys when prices are high and rising
            # (Would need price history for accurate detection)
            high_price_buys = buys[buys['price'] > 0.6]
            profile.trades_momentum = len(high_price_buys) / len(buys) > 0.3

        # Check for scaling (multiple trades in same market)
        if 'conditionId' in self.trades.columns:
            trades_per_market = self.trades.groupby('conditionId').size()
            profile.uses_scaling = bool(trades_per_market.mean() > 2.5)

        # Quick exit pattern
        if 0 < profile.avg_holding_period_hours < 24:
            profile.quick_exit_pattern = True

    def _identify_signal_type(self, profile: StrategyProfile):
        """Identify the primary signal type used."""
        # Based on patterns, determine likely signal type
        if profile.is_contrarian and profile.avg_entry_price < 0.35:
            profile.entry_signal_type = TradeSignal.VALUE
        elif profile.trades_momentum:
            profile.entry_signal_type = TradeSignal.MOMENTUM
        elif profile.quick_exit_pattern and profile.uses_scaling:
            profile.entry_signal_type = TradeSignal.ARBITRAGE
        elif profile.is_contrarian:
            profile.entry_signal_type = TradeSignal.MEAN_REVERSION
        else:
            profile.entry_signal_type = TradeSignal.UNKNOWN

    def _calculate_confidence_score(self, profile: StrategyProfile):
        """
        Calculate confidence score (0.0-1.0) for the analysis.

        Must run after ``_identify_signal_type`` because the signal type
        contributes to the score.
        """
        score = 0.0

        # Sample size contribution (0-30 points)
        if profile.sample_size >= 100:
            score += 30
        elif profile.sample_size >= 50:
            score += 20
        elif profile.sample_size >= 20:
            score += 10
        elif profile.sample_size >= 10:
            score += 5

        # Win rate consistency (0-20 points)
        if profile.estimated_win_rate >= 0.55:
            score += 20
        elif profile.estimated_win_rate >= 0.50:
            score += 10

        # Profit factor (0-20 points)
        if profile.profit_factor >= 2.0:
            score += 20
        elif profile.profit_factor >= 1.5:
            score += 15
        elif profile.profit_factor >= 1.2:
            score += 10
        elif profile.profit_factor >= 1.0:
            score += 5

        # Pattern clarity (0-30 points)
        if profile.entry_signal_type != TradeSignal.UNKNOWN:
            score += 15
        if profile.entry_price_std < 0.15:  # Consistent entry prices
            score += 10
        if profile.position_size_scaling != "aggressive_scaling":
            score += 5

        profile.confidence_score = score / 100.0

    def cluster_trades(self, n_clusters: int = 3) -> Dict[int, pd.DataFrame]:
        """
        Cluster trades by characteristics to identify different strategies.

        Features are price, size, side (BUY=1, otherwise 0) and hour of day
        (scaled to 0-1, 0.5 when unknown); they are standardized before
        running k-means. Rows with a missing price or size are left out.

        Args:
            n_clusters: Number of clusters to identify

        Returns:
            Dict mapping cluster ID to trades DataFrame. When there is too
            little usable data, ``{0: <all trades>}`` is returned instead.

        Raises:
            ImportError: if clustering is required but scikit-learn is not
                installed.
        """
        trades = self.trades
        if len(trades) < n_clusters * 5:
            return {0: trades}

        columns = trades.columns
        if not {'price', 'size', 'side'}.issubset(columns):
            return {0: trades}

        # Positional mask (not label-based) so duplicate index labels cannot
        # select extra rows.
        usable_mask = (trades['price'].notna() & trades['size'].notna()).to_numpy()
        usable = trades[usable_mask]
        if len(usable) < n_clusters * 3:
            return {0: trades}

        if KMeans is None or StandardScaler is None:
            raise ImportError("scikit-learn is required for cluster_trades()")

        if 'hour' in columns:
            hour_feature = (usable['hour'] / 24.0).fillna(0.5).to_numpy(dtype=float)
        else:
            hour_feature = np.full(len(usable), 0.5)

        features = np.column_stack([
            usable['price'].to_numpy(dtype=float),
            usable['size'].to_numpy(dtype=float),
            (usable['side'] == 'BUY').to_numpy(dtype=float),
            hour_feature,
        ])

        # Normalize and cluster
        features_scaled = StandardScaler().fit_transform(features)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        labels = kmeans.fit_predict(features_scaled)

        # Group trades by cluster
        return {
            cluster_id: usable[labels == cluster_id]
            for cluster_id in range(n_clusters)
        }

    def compare_to_baseline(self, baseline_profile: StrategyProfile) -> Dict[str, float]:
        """
        Compare current strategy to a baseline profile.

        Args:
            baseline_profile: Reference strategy to compare against

        Returns:
            Dictionary of similarity scores in [0, 1]. ``entry_price`` and
            ``position_size`` are omitted when the baseline carries no data
            for them; omitted components contribute nothing to ``overall``.
        """
        current_profile = self.analyze()

        similarities = {}

        # Entry price similarity
        if baseline_profile.entry_price_std > 0:
            price_diff = abs(current_profile.avg_entry_price - baseline_profile.avg_entry_price)
            similarities['entry_price'] = max(0, 1 - price_diff / 0.5)

        # Position sizing similarity
        if baseline_profile.avg_position_size > 0:
            size_ratio = current_profile.avg_position_size / baseline_profile.avg_position_size
            similarities['position_size'] = (
                1 / (1 + abs(np.log(size_ratio))) if size_ratio > 0 else 0
            )

        # Win rate similarity
        wr_diff = abs(current_profile.estimated_win_rate - baseline_profile.estimated_win_rate)
        similarities['win_rate'] = max(0, 1 - wr_diff / 0.3)

        # Timing similarity
        hour_overlap = (
            set(current_profile.preferred_entry_hours)
            & set(baseline_profile.preferred_entry_hours)
        )
        max_hours = max(len(current_profile.preferred_entry_hours),
                        len(baseline_profile.preferred_entry_hours))
        similarities['timing'] = len(hour_overlap) / max_hours if max_hours > 0 else 0

        # Signal type match
        same_signal = current_profile.entry_signal_type == baseline_profile.entry_signal_type
        similarities['signal_type'] = 1.0 if same_signal else 0.0

        # Overall similarity (weighted average); skip components that could
        # not be computed instead of failing with a KeyError.
        weights = {'entry_price': 0.25, 'position_size': 0.2, 'win_rate': 0.25,
                   'timing': 0.15, 'signal_type': 0.15}
        similarities['overall'] = sum(
            similarities[key] * weight
            for key, weight in weights.items()
            if key in similarities
        )

        return similarities

    def generate_strategy_rules(self) -> List[str]:
        """
        Generate human-readable trading rules from analysis.

        Returns:
            List of rule descriptions
        """
        profile = self.analyze()
        rules = []

        # Entry rules
        low, high = profile.preferred_entry_range
        rules.append(f"ENTRY: Target prices between {low:.0%} and {high:.0%}")

        if profile.is_contrarian:
            rules.append("ENTRY: Prefer contrarian positions (buying low probability outcomes)")
        elif profile.trades_momentum:
            rules.append("ENTRY: Follow momentum (buying high probability outcomes)")

        # Sizing rules
        rules.append(f"SIZE: Average position ${profile.avg_position_size:.2f}, max ${profile.max_position_size:.2f}")
        rules.append(f"SIZE: Use {profile.position_size_scaling.replace('_', ' ')} position sizing")

        # Timing rules
        if profile.preferred_entry_hours:
            hours_str = ', '.join(f"{h}:00" for h in profile.preferred_entry_hours)
            rules.append(f"TIMING: Preferred entry hours: {hours_str}")

        # Risk rules
        rules.append(f"RISK: Target win rate {profile.estimated_win_rate:.0%}")
        if profile.profit_factor < float('inf'):
            rules.append(f"RISK: Target profit factor {profile.profit_factor:.2f}")

        # Exit rules
        if profile.quick_exit_pattern:
            rules.append(f"EXIT: Quick exits, avg holding period {profile.avg_holding_period_hours:.1f} hours")
        else:
            rules.append("EXIT: Hold to resolution or significant price movement")

        if profile.uses_scaling:
            rules.append("SCALING: Build positions gradually across multiple entries")

        return rules


# Example usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Generate sample trades
    from data_fetcher import MockDataFetcher

    mock = MockDataFetcher()
    trades = mock.generate_trader_trades(
        num_trades=200,
        win_rate=0.6
    )

    # Analyze
    analyzer = StrategyAnalyzer(trades)
    profile = analyzer.analyze("Test Strategy")

    print("\n=== Strategy Profile ===")
    print(f"Sample size: {profile.sample_size}")
    print(f"Avg entry price: {profile.avg_entry_price:.2%}")
    print(f"Entry range: {profile.preferred_entry_range}")
    print(f"Signal type: {profile.entry_signal_type.value}")
    print(f"Win rate: {profile.estimated_win_rate:.1%}")
    print(f"Profit factor: {profile.profit_factor:.2f}")
    print(f"Confidence: {profile.confidence_score:.0%}")

    print("\n=== Strategy Rules ===")
    for rule in analyzer.generate_strategy_rules():
        print(f"  • {rule}")