h1', setup.get('rsi', 0)), setup.get('rsi_h2', setup.get('rsi', 0)), setup.get('rsi_h4', setup.get('rsi', 0)), setup.get('bb_squeeze_pct', 0), atr, entry_price, 0, # exit placeholder setup.get('stop_loss', 0), setup.get('take_profit1', 0), signal_price, round(slippage, 2), hour, day, vol_regime, h4_trend, 0, 0, # MFE/MAE placeholder "", # exit reason placeholder 0, # PnL placeholder "OPEN", setup.get('rr_ratio', 0), paper_mode ] self._write_csv_row(self.trades_file, row) return trade_id def update_price_extremes(self, trade_id, current_price): """Track MFE/MAE while trade is open.""" if trade_id not in self.active_trades: return trade = self.active_trades[trade_id] direction = trade['direction'] entry = trade['entry_price'] if direction == 'LONG': trade['highest_price'] = max(trade['highest_price'], current_price) trade['lowest_price'] = min(trade['lowest_price'], current_price) else: trade['highest_price'] = max(trade['highest_price'], current_price) trade['lowest_price'] = min(trade['lowest_price'], current_price) def log_trade_exit(self, trade_id, exit_price, exit_reason, pnl_pct): """Log trade exit and calculate final metrics.""" timestamp = datetime.now(timezone.utc).isoformat() print(f"[TRADE LOGGER] log_trade_exit called: trade_id={trade_id}, exit_price={exit_price:.2f}, pnl_pct={pnl_pct:.2f}%") if trade_id not in self.active_trades: print(f"[TRADE LOGGER] WARNING: trade_id {trade_id} not in active_trades!") print(f"[TRADE LOGGER] Active trades: {list(self.active_trades.keys())}") return trade = self.active_trades[trade_id] entry_time = datetime.fromisoformat(trade['entry_time']) exit_time = datetime.now(timezone.utc) hold_time_min = (exit_time - entry_time).total_seconds() / 60 entry = trade['entry_price'] direction = trade['direction'] # Calculate MFE/MAE if direction == 'LONG': mfe = (trade['highest_price'] - entry) / entry * 100 mae = (entry - trade['lowest_price']) / entry * 100 else: mfe = (entry - trade['lowest_price']) / entry * 100 mae = 
(trade['highest_price'] - entry) / entry * 100
        result = "WIN" if pnl_pct > 0 else "LOSS"
        print(f"[TRADE LOGGER] Updating trade row: {trade_id}, result={result}, pnl_pct={pnl_pct:.2f}%")
        # Re-read and update the CSV row (inefficient but simple)
        update_result = self._update_trade_row(trade_id, timestamp, hold_time_min, exit_price, exit_reason, pnl_pct, result, mfe, mae)
        if update_result:
            print(f"[TRADE LOGGER] Trade {trade_id} updated successfully")
        else:
            print(f"[TRADE LOGGER] ERROR: Failed to update trade {trade_id}")
        # Drop from the in-memory open-trade map regardless of CSV outcome.
        del self.active_trades[trade_id]

    def _update_trade_row(self, trade_id, exit_time, hold_time, exit_price, exit_reason, pnl_pct, result, mfe, mae):
        """Update existing trade row with exit data.

        Rewrites the whole trades CSV: reads every row, patches the row whose
        first column matches trade_id at fixed column positions, then atomically
        replaces the file via a temp file + shutil.move. Returns True on success.
        """
        import csv
        import tempfile
        import os
        print(f"[TRADE LOGGER] _update_trade_row: Looking for trade_id={trade_id} in {self.trades_file}")
        if not os.path.exists(self.trades_file):
            print(f"[TRADE LOGGER] ERROR: File does not exist: {self.trades_file}")
            return False
        rows = []
        updated = False
        found = False
        try:
            with open(self.trades_file, 'r', newline='') as f:
                reader = csv.reader(f)
                headers = next(reader)
                rows.append(headers)
                for row in reader:
                    if len(row) > 0 and row[0] == trade_id:
                        found = True
                        print(f"[TRADE LOGGER] Found trade row to update: {trade_id}")
                        # Update exit fields -- indices are fixed positions in the
                        # trades CSV schema written by log_trade_entry.
                        row[2] = exit_time
                        row[3] = round(hold_time, 1)
                        row[20] = exit_price
                        row[27] = round(mfe, 2)
                        row[28] = round(mae, 2)
                        row[29] = exit_reason
                        row[30] = round(pnl_pct, 3)
                        row[31] = result
                        updated = True
                    rows.append(row)
            if not found:
                print(f"[TRADE LOGGER] WARNING: Trade {trade_id} not found in file!")
                return False
            if not updated:
                print(f"[TRADE LOGGER] WARNING: Trade {trade_id} found but not updated (already closed?)")
                return False
            # Write back
            with tempfile.NamedTemporaryFile(mode='w', delete=False, newline='') as tmp:
                writer = csv.writer(tmp)
                writer.writerows(rows)
                tmp_path = tmp.name
            # Use shutil.move for cross-device compatibility
            import shutil
            shutil.move(tmp_path, self.trades_file)
            print(f"[TRADE LOGGER] Trade {trade_id} updated successfully")
            return True
        except Exception as e:
            print(f"[TRADE LOGGER] ERROR updating trade: {e}")
            import traceback
            traceback.print_exc()
            return False

    def get_stats_summary(self):
        """Quick stats for UI display.

        Reads the trades CSV and returns totals, win/loss counts, overall win
        rate, and a per-setup-type win-rate breakdown. Any failure (missing
        file, malformed row) is reported as {"error": str(e)}.
        """
        import csv
        try:
            with open(self.trades_file, 'r', newline='') as f:
                reader = csv.DictReader(f)
                # Rows still marked OPEN have no outcome yet -- exclude them.
                closed = [r for r in reader if r['result'] != 'OPEN']
            if not closed:
                return {"total": 0, "wins": 0, "losses": 0, "win_rate": 0}
            wins = sum(1 for r in closed if r['result'] == 'WIN')
            total = len(closed)
            # Win rate by setup type
            by_setup = {}
            for r in closed:
                st = r['setup_type']
                if st not in by_setup:
                    by_setup[st] = {"total": 0, "wins": 0}
                by_setup[st]["total"] += 1
                if r['result'] == 'WIN':
                    by_setup[st]["wins"] += 1
            for st in by_setup:
                by_setup[st]["win_rate"] = round(
                    by_setup[st]["wins"] / by_setup[st]["total"] * 100, 1
                )
            return {
                "total": total,
                "wins": wins,
                "losses": total - wins,
                "win_rate": round(wins / total * 100, 1),
                "by_setup": by_setup
            }
        except Exception as e:
            return {"error": str(e)}


class SetupAnalyticsLogger:
    """
    Advanced setup and trade analytics for continuous algorithm improvement.
    Tracks ALL setups (taken and missed) and analyzes outcomes for optimization.
    """

    def __init__(self, data_dir=DATA_DIR):
        # Root data directory; all analytics CSV/JSON files live under
        # <data_dir>/analytics.
        self.data_dir = data_dir
        self.analytics_dir = os.path.join(data_dir, "analytics")
        os.makedirs(self.analytics_dir, exist_ok=True)
        # File paths
        self.all_setups_file = os.path.join(self.analytics_dir, "all_setups.csv")
        self.traded_setups_file = os.path.join(self.analytics_dir, "traded_setups.csv")
        self.outcomes_file = os.path.join(self.analytics_dir, "trade_outcomes.csv")
        self.performance_summary_file = os.path.join(self.analytics_dir, "performance_summary.json")
        self.improvement_suggestions_file = os.path.join(self.analytics_dir, "improvement_suggestions.json")
        self._ensure_files_exist()
        self.pending_setups = {}  # Track setups waiting for outcome

    def _ensure_files_exist(self):
        """Create analytics files with proper headers.

        Only writes a header row when the file does not exist yet, so existing
        data is never clobbered. Column order here must match the row order
        written by log_setup_detected.
        """
        import csv
        # All setups detected (taken or not)
        if not os.path.exists(self.all_setups_file):
            headers = [
                "timestamp", "pair", "direction", "grade", "score", "confluence_score",
                "m3_dir", "m5_dir", "m15_dir", "h1_dir", "h2_dir", "h4_dir",
                "rsi", "rsi_m3", "rsi_m5", "rsi_m15", "rsi_h1", "rsi_h2", "rsi_h4",
                "bb_squeeze", "bb_percent_b", "supertrend", "supertrend_signal",
                "obv_trend", "obv_divergence", "vp_poc", "vp_value_area_low",
                "vp_value_area_high", "fib_nearest", "pivot_nearest",
                "entry", "stop_loss", "take_profit1", "take_profit2", "rr_ratio",
                "taken", "skip_reason", "hour", "session", "volatility_regime",
                "funding_rate", "bid_ask_ratio", "liquidity_score",
                "exhaustion_score", "indicator_score", "weighted_score", "setup_id"
            ]
            self._write_csv_row(self.all_setups_file, headers)
        # Setups that were actually traded
        if not os.path.exists(self.traded_setups_file):
            headers = [
                "timestamp", "pair", "direction", "grade", "score",
                "entry", "stop_loss", "take_profit1", "take_profit2",
                "position_size", "leverage", "rr_ratio", "setup_id", "trade_id"
            ]
            self._write_csv_row(self.traded_setups_file, headers)
        # Trade outcomes (updated when position closes)
        if not os.path.exists(self.outcomes_file):
            headers = [
                "setup_id", "trade_id", "pair", "direction", "entry_price", "exit_price",
                "stop_loss", "take_profit1", "take_profit2", "pnl_pct", "pnl_usd",
                "exit_reason", "time_in_trade_min", "max_favorable_excursion",
                "max_adverse_excursion", "tp1_hit", "tp2_hit", "sl_hit",
                "breakeven_hit", "trailing_stop_hit", "predicted_rr", "actual_rr",
                "grade", "score_at_entry", "outcome_vs_prediction", "improvement_notes"
            ]
            self._write_csv_row(self.outcomes_file, headers)

    def _write_csv_row(self, filepath, row):
        """Append row to CSV file (best-effort: errors are logged, not raised)."""
        import csv
        try:
            with open(filepath, 'a', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(row)
        except Exception as e:
            print(f"[ANALYTICS] Error writing to {filepath}: {e}")

    def log_setup_detected(self, setup, confluence_data, indicators, taken=False, skip_reason=None):
        """Log EVERY setup detected by the bot.

        Writes one row to all_setups.csv (and, when taken, a second row to
        traded_setups.csv), registers the setup in pending_setups, and returns
        the generated 8-char setup_id. Row order must match the header order
        in _ensure_files_exist.
        """
        import uuid
        from datetime import datetime, timezone
        timestamp = datetime.now(timezone.utc).isoformat()
        hour = datetime.now(timezone.utc).hour
        # Coarse session bucket by UTC hour: 0-7 ASIA, 8-15 EUROPE, else US.
        session = "ASIA" if 0 <= hour < 8 else ("EUROPE" if 8 <= hour < 16 else "US")
        setup_id = str(uuid.uuid4())[:8]
        self.pending_setups[setup_id] = {
            'timestamp': timestamp,
            'pair': setup.get('pair', ''),
            'direction': setup.get('direction', ''),
            'entry': setup.get('entry', 0),
            'grade': setup.get('grade', 'F'),
            'score': setup.get('score', 0)
        }
        row = [
            timestamp, setup.get('pair', ''), setup.get('direction', ''),
            setup.get('grade', ''), setup.get('score', 0),
            confluence_data.get('confluence_score', 0),
            confluence_data.get('m3_dir', ''), confluence_data.get('m5_dir', ''),
            confluence_data.get('m15_dir', ''), confluence_data.get('h1_dir', ''),
            confluence_data.get('h2_dir', ''), confluence_data.get('h4_dir', ''),
            indicators.get('rsi', 0), indicators.get('rsi_m3', 0),
            indicators.get('rsi_m5', 0), indicators.get('rsi_m15', 0),
            indicators.get('rsi_h1', 0), indicators.get('rsi_h2', 0),
            indicators.get('rsi_h4', 0),
            indicators.get('bb_squeeze', False), indicators.get('bb_percent_b', 50),
            indicators.get('supertrend',
                           'NEUTRAL'),
            indicators.get('supertrend_signal', False),
            indicators.get('obv_trend', 'NEUTRAL'),
            indicators.get('obv_divergence', None),
            indicators.get('vp_poc', 0),
            indicators.get('vp_value_area_low', 0),
            indicators.get('vp_value_area_high', 0),
            indicators.get('fib_nearest', ''),
            indicators.get('pivot_nearest', ''),
            setup.get('entry', 0), setup.get('stop_loss', 0),
            setup.get('take_profit1', 0), setup.get('take_profit2', 0),
            setup.get('rr_ratio', 0),
            taken, skip_reason or '',
            hour, session,
            indicators.get('volatility_regime', 'NORMAL'),
            indicators.get('funding_rate', 0),
            indicators.get('bid_ask_ratio', 1.0),
            indicators.get('liquidity_score', 50),
            setup.get('exhaustion_score', 50),
            setup.get('indicator_score', 0),
            confluence_data.get('weighted_score', 0),
            setup_id
        ]
        self._write_csv_row(self.all_setups_file, row)
        # Setups that were actually traded get a second, slimmer row keyed by
        # the same setup_id so outcomes can be joined later.
        if taken:
            trade_row = [
                timestamp, setup.get('pair', ''), setup.get('direction', ''),
                setup.get('grade', ''), setup.get('score', 0),
                setup.get('entry', 0), setup.get('stop_loss', 0),
                setup.get('take_profit1', 0), setup.get('take_profit2', 0),
                setup.get('position_size', 0), setup.get('leverage', 1),
                setup.get('rr_ratio', 0), setup_id, setup.get('trade_id', '')
            ]
            self._write_csv_row(self.traded_setups_file, trade_row)
        return setup_id


class ImprovementLogger:
    """
    Tracks user-reported improvements and issues with setups.
    Used for continuous algorithm refinement based on user feedback.
    """

    # User-selectable feedback codes -> human-readable labels.
    IMPROVEMENT_OPTIONS = {
        'tp1_wide': 'TP1 too wide',
        'tp1_narrow': 'TP1 too narrow',
        'tp2_wide': 'TP2 too wide',
        'tp2_narrow': 'TP2 too narrow',
        'sl_wide': 'SL too wide',
        'sl_narrow': 'SL too narrow',
        'entry_early': 'Entry too soon',
        'entry_late': 'Entry too late',
        'dir_long': 'Correct to LONG direction',
        'dir_short': 'Correct to SHORT direction',
        'monitor_loss': 'Better monitoring to minimize loss',
        'monitor_gain': 'Better monitoring to secure gains',
        'monitor_general': 'Better monitoring in general'
    }

    def __init__(self, data_dir=DATA_DIR):
        self.data_dir = data_dir
        # Create snitch folder inside data
        self.snitch_dir = os.path.join(data_dir, "snitch")
        os.makedirs(self.snitch_dir, exist_ok=True)
        # Text log instead of CSV
        self.improvements_file = os.path.join(self.snitch_dir, "snitch_log.txt")
        self.disqualified_file = os.path.join(self.snitch_dir, "blocked_pairs.json")
        self._ensure_files_exist()
        self.disqualified_pairs = self._load_disqualified()

    def _ensure_files_exist(self):
        """Create improvement log file with header if needed."""
        if not os.path.exists(self.improvements_file):
            # NOTE(review): line breaks inside this banner string were lost in a
            # whitespace-mangled paste; reconstructed at the obvious boundaries
            # -- confirm against version control.
            header = """╔══════════════════════════════════════════════════════════════════════════════╗
║                              ROB-BOT SNITCH LOG                              ║
╚══════════════════════════════════════════════════════════════════════════════╝

Format: [TIMESTAMP] PAIR DIRECTION | Grade: X | Score: Y
    Type: ISSUE_TYPE
    Notes: User notes here
    Entry: $X | SL: $Y | TP1: $Z | TP2: $W
────────────────────────────────────────────────────────────────────────────────
"""
            try:
                with open(self.improvements_file, 'w') as f:
                    f.write(header)
            except Exception as e:
                print(f"[SNITCH LOGGER] Error creating log file: {e}")

    def _write_log_entry(self, entry_text):
        """Append entry to text log file."""
        try:
            with open(self.improvements_file, 'a') as f:
                f.write(entry_text + "\n")
        except Exception as e:
            print(f"[SNITCH LOGGER] Error writing: {e}")

    def _load_disqualified(self):
        """Load blocked pairs from JSON; returns {} when missing/unreadable."""
        try:
            if os.path.exists(self.disqualified_file):
                with \
                    open(self.disqualified_file, 'r') as f:
                    return json.load(f)
        except Exception as e:
            print(f"[SNITCH LOGGER] Error loading blocked pairs: {e}")
        return {}

    def _save_disqualified(self):
        """Save blocked pairs to JSON."""
        try:
            with open(self.disqualified_file, 'w') as f:
                json.dump(self.disqualified_pairs, f, indent=2)
        except Exception as e:
            print(f"[SNITCH LOGGER] Error saving blocked pairs: {e}")

    def log_improvement(self, setup_data, improvement_type, notes=""):
        """Log a user-reported improvement to text file.

        improvement_type should be a key of IMPROVEMENT_OPTIONS; unknown codes
        are logged verbatim. Always returns True.
        """
        from datetime import datetime, timezone
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        pair = setup_data.get('pair', 'Unknown')
        direction = setup_data.get('direction', 'LONG')
        grade = setup_data.get('grade', 'F')
        score = setup_data.get('score', 0)
        entry = setup_data.get('entry', 0)
        sl = setup_data.get('stop_loss', 0)
        tp1 = setup_data.get('take_profit1', 0)
        tp2 = setup_data.get('take_profit2', 0)
        label = self.IMPROVEMENT_OPTIONS.get(improvement_type, improvement_type)
        # Build formatted text entry. The indented continuation lines are what
        # generate_improvement_report's regex expects ('\n\s+Type: ...').
        entry_text = f"""[{timestamp}] {pair} {direction} | Grade: {grade} | Score: {score}
    Type: {improvement_type} ({label})
    Notes: {notes if notes else 'None'}
    Entry: ${entry} | SL: ${sl} | TP1: ${tp1} | TP2: ${tp2}
────────────────────────────────────────────────────────────────────────────────
"""
        self._write_log_entry(entry_text)
        print(f"[SNITCH LOGGED] {pair}: {label}")
        return True

    def disqualify_pair(self, setup_data, block_trades=True, improvement_types=None, notes=""):
        """
        Disqualify a pair/setup from trading and optionally block new trades.
        Args:
            setup_data: The setup dict
            block_trades: If True, prevent new trades on this pair
            improvement_types: List of improvement codes
            notes: Additional notes
        """
        from datetime import datetime, timezone
        pair = setup_data.get('pair', '')
        direction = setup_data.get('direction', 'LONG')
        # Entries are keyed per pair AND direction, so e.g. BTCUSDT_LONG can be
        # blocked while BTCUSDT_SHORT remains tradable.
        key = f"{pair}_{direction}"
        # Log improvements first
        if improvement_types:
            for imp_type in improvement_types:
                self.log_improvement(setup_data, imp_type, notes)
        # Add to disqualified list
        self.disqualified_pairs[key] = {
            'pair': pair,
            'direction': direction,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'grade': setup_data.get('grade', 'F'),
            'score': setup_data.get('score', 0),
            'entry': setup_data.get('entry', 0),
            'block_trades': block_trades,
            'improvements': improvement_types or [],
            'notes': notes,
            'status': 'BLOCKED' if block_trades else 'ANALYZE_ONLY'
        }
        self._save_disqualified()
        print(f"[BLOCKED] {pair} {direction} - Status: {self.disqualified_pairs[key]['status']}")
        return True

    def release_pair(self, pair, direction=None):
        """
        Release a disqualified pair (allow trading again).
Args: pair: The pair to release direction: Specific direction or None for both """ released = [] keys_to_remove = [] for key, data in self.disqualified_pairs.items(): if data.get('pair') == pair: if direction is None or data.get('direction') == direction: keys_to_remove.append(key) released.append(f"{data.get('pair')} {data.get('direction')}") for key in keys_to_remove: del self.disqualified_pairs[key] if released: self._save_disqualified() print(f"[RELEASED] {', '.join(released)}") return released def is_blocked(self, pair, direction): """Check if a pair/direction is blocked from trading.""" key = f"{pair}_{direction}" if key in self.disqualified_pairs: return self.disqualified_pairs[key].get('block_trades', False) return False def get_blocked_pairs(self): """Get list of currently blocked pairs.""" blocked = [] for key, data in self.disqualified_pairs.items(): if data.get('block_trades', False): blocked.append(data) return blocked def get_all_disqualified(self): """Get all disqualified entries (blocked or just flagged).""" return list(self.disqualified_pairs.values()) def generate_improvement_report(self, days=7): """Generate a report of common improvement patterns from text log.""" try: from datetime import datetime, timezone, timedelta import re if not os.path.exists(self.improvements_file): return {"error": "No snitch data available"} # Parse text log cutoff = datetime.now(timezone.utc) - timedelta(days=days) entries = [] with open(self.improvements_file, 'r') as f: content = f.read() # Parse entries using regex # Format: [2024-01-15 10:30:00 UTC] PAIR DIRECTION | Grade: X | Score: Y pattern = r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} UTC)\] (\w+) (\w+) \| Grade: (\w+) \| Score: (\d+)\n\s+Type: (\w+) \(([^)]+)\)' matches = re.findall(pattern, content) for match in matches: timestamp_str, pair, direction, grade, score, imp_type, label = match try: entry_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S UTC') entry_time = 
entry_time.replace(tzinfo=timezone.utc) if entry_time >= cutoff: entries.append({ 'timestamp': timestamp_str, 'pair': pair, 'direction': direction, 'grade': grade, 'score': int(score), 'improvement_type': imp_type, 'label': label }) except: continue if len(entries) == 0: return {"error": f"No snitches in last {days} days"} # Count by improvement type from collections import Counter type_counts = Counter(e['improvement_type'] for e in entries) pair_counts = Counter(e['pair'] for e in entries) report = { "period_days": days, "total_snitches": len(entries), "by_type": dict(type_counts.most_common()), "by_pair": dict(pair_counts.most_common()), "top_issues": [t[0] for t in type_counts.most_common(5)], "blocked_pairs": len(self.get_blocked_pairs()), "recommendations": [] } # Generate recommendations if type_counts.get('tp1_wide', 0) > 3: report['recommendations'].append("Consider tightening TP1 targets - multiple reports of TP1 being too wide") if type_counts.get('sl_wide', 0) > 3: report['recommendations'].append("Review SL calculations - frequent reports of SL being too wide") if type_counts.get('entry_early', 0) > 3: report['recommendations'].append("Entry timing may be too aggressive - consider waiting for better confirmation") return report except Exception as e: return {"error": f"Report generation failed: {e}"} class BinanceClient: """Reusable Binance API client with shared thread pool for performance.""" # Class-level shared thread pool to avoid creating new ones per request _shared_thread_pool = None def __init__(self, api_key=None, api_secret=None, paper=True): self.api_key = api_key self.api_secret = api_secret self.paper = paper self.base_url = BINANCE_FAPI self._last_call_time = 0 self._min_interval = 0.05 self._call_count = 0 self._count_reset = time.time() self._max_calls_per_min = 1200 # Performance tracking for TEST tab self._last_fetch_time_ms = 0 self._calls_per_minute = 0 self._last_timing_update = time.time() # Initialize shared thread pool if not 
exists if BinanceClient._shared_thread_pool is None: BinanceClient._shared_thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="binance_worker") print("[BinanceClient] Shared thread pool initialized") def _rate_limit(self): now = time.time() if now - self._count_reset > 60: self._call_count = 0 self._count_reset = now if self._call_count >= self._max_calls_per_min: sleep_time = 60 - (now - self._count_reset) if sleep_time > 0: print(f"[RATE LIMIT] Sleeping {sleep_time:.1f}s to avoid API ban") time.sleep(sleep_time) self._call_count = 0 self._count_reset = time.time() elapsed = now - self._last_call_time if elapsed < self._min_interval: time.sleep(self._min_interval - elapsed) self._last_call_time = time.time() self._call_count += 1 def _get_signature(self, query_string): return hmac.new(self.api_secret.encode(), query_string.encode(), hashlib.sha256).hexdigest() def get_klines(self, symbol, interval='1h', limit=100): import time start_time = time.time() self._rate_limit() if not REQUESTS_AVAILABLE: return self._mock_klines(symbol, limit) try: url = f"{self.base_url}/fapi/v1/klines" params = {"symbol": symbol, "interval": interval, "limit": limit} response = requests.get(url, params=params, timeout=5) # Track fetch time elapsed_ms = (time.time() - start_time) * 1000 self._last_fetch_time_ms = elapsed_ms # Update calls per minute now = time.time() if now - self._last_timing_update > 60: self._calls_per_minute = self._call_count self._last_timing_update = now return response.json() except Exception as e: return self._mock_klines(symbol, limit) except Exception as e: return self._mock_klines(symbol, limit) def get_depth(self, symbol, limit=100): """Fetch order book depth for a symbol.""" self._rate_limit() if not REQUESTS_AVAILABLE: return {'bids': [], 'asks': []} try: url = f"{self.base_url}/fapi/v1/depth" params = {"symbol": symbol, "limit": limit} response = requests.get(url, params=params, timeout=5) data = response.json() # Convert string 
            if 'bids' in data:
                data['bids'] = [[float(p), float(q)] for p, q in data['bids']]
            if 'asks' in data:
                data['asks'] = [[float(p), float(q)] for p, q in data['asks']]
            return data
        except Exception as e:
            print(f"[BinanceClient] get_depth error for {symbol}: {e}")
            return {'bids': [], 'asks': []}

    def get_orderbook(self, symbol, limit=100):
        """Alias for get_depth - fetch order book for a symbol."""
        return self.get_depth(symbol, limit)

    def get_funding_rate(self, symbol):
        """Fetch current funding rate for a perpetual symbol.

        Returns {'fundingRate': float, 'fundingTime': int, 'symbol': str} or
        None on error / empty response.
        """
        self._rate_limit()
        if not REQUESTS_AVAILABLE:
            return None
        try:
            url = f"{self.base_url}/fapi/v1/fundingRate"
            params = {"symbol": symbol, "limit": 1}
            response = requests.get(url, params=params, timeout=5)
            data = response.json()
            if isinstance(data, list) and len(data) > 0:
                return {
                    'fundingRate': float(data[0].get('fundingRate', 0)),
                    'fundingTime': data[0].get('fundingTime', 0),
                    'symbol': data[0].get('symbol', symbol)
                }
            return None
        except Exception as e:
            print(f"[BinanceClient] get_funding_rate error for {symbol}: {e}")
            return None

    def get_open_interest(self, symbol):
        """Fetch current open interest for a symbol; None on error."""
        self._rate_limit()
        if not REQUESTS_AVAILABLE:
            return None
        try:
            url = f"{self.base_url}/fapi/v1/openInterest"
            params = {"symbol": symbol}
            response = requests.get(url, params=params, timeout=5)
            data = response.json()
            return {
                'openInterest': float(data.get('openInterest', 0)),
                'symbol': data.get('symbol', symbol),
                'time': data.get('time', 0)
            }
        except Exception as e:
            print(f"[BinanceClient] get_open_interest error for {symbol}: {e}")
            return None

    def get_onchain_metrics(self, symbol):
        """
        Fetch on-chain metrics from alternative sources.
        Note: Full on-chain requires dedicated APIs (Glassnode, CryptoQuant).
        This provides basic indicators available from public APIs.
        """
        # All values default to None; only proxy metrics derivable from public
        # futures endpoints (OI, funding) get filled in below.
        metrics = {
            'exchange_inflow': None,
            'exchange_outflow': None,
            'whale_alert': None,
            'network_activity': None,
            'oi_change_24h': None
        }
        try:
            # Get open interest change as proxy for futures activity
            oi_data = self.get_open_interest(symbol)
            if oi_data:
                current_oi = oi_data.get('openInterest', 0)
                # We would need historical OI for 24h change
                # For now, store current for comparison
                metrics['oi_change_24h'] = {'current': current_oi, 'change_pct': 0}
            # Try to get funding rate as sentiment indicator
            funding = self.get_funding_rate(symbol)
            if funding:
                metrics['funding_sentiment'] = 'bullish' if funding['fundingRate'] > 0 else 'bearish'
                metrics['funding_rate'] = funding['fundingRate']
            # Note: True on-chain metrics (whale wallets, exchange flows)
            # require paid APIs (Glassnode, CryptoQuant, Santiment)
            # This is a placeholder for integration
        except Exception as e:
            print(f"[OnChain] Error fetching metrics for {symbol}: {e}")
        return metrics

    def analyze_orderbook(self, symbol, depth_limit=100, current_price=None):
        """
        Analyze order book for liquidity, walls, and bid/ask imbalance.
        Returns dict with analysis results.
        """
        orderbook = self.get_orderbook(symbol, depth_limit)
        if not orderbook or not orderbook.get('bids') or not orderbook.get('asks'):
            return None
        bids = orderbook['bids']  # [[price, qty], ...]
        asks = orderbook['asks']  # [[price, qty], ...]
if not current_price and bids and asks: current_price = (bids[0][0] + asks[0][0]) / 2 if not current_price: return None # Calculate total volume in 1% range around price price_range = current_price * 0.01 # 1% range bid_volume = sum(qty for price, qty in bids if price >= current_price - price_range) ask_volume = sum(qty for price, qty in asks if price <= current_price + price_range) # Calculate bid/ask ratio (1.0 = balanced, >1 = more bids, <1 = more asks) total_volume = bid_volume + ask_volume bid_ask_ratio = bid_volume / ask_volume if ask_volume > 0 else 999 # Find walls (large orders > 2x average size in 1% range) avg_bid_size = bid_volume / len([b for b in bids if b[0] >= current_price - price_range]) if bids else 0 avg_ask_size = ask_volume / len([a for a in asks if a[0] <= current_price + price_range]) if asks else 0 bid_walls = [(price, qty) for price, qty in bids if qty > avg_bid_size * 2 and price >= current_price - price_range][:3] ask_walls = [(price, qty) for price, qty in asks if qty > avg_ask_size * 2 and price <= current_price + price_range][:3] # Calculate slippage for $1000 order usd_amount = 1000 bid_slippage = self._calculate_slippage(bids, usd_amount, current_price, 'sell') ask_slippage = self._calculate_slippage(asks, usd_amount, current_price, 'buy') return { 'bid_ask_ratio': bid_ask_ratio, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_volume_1pct': total_volume, 'bid_walls': bid_walls, 'ask_walls': ask_walls, 'bid_slippage_pct': bid_slippage, 'ask_slippage_pct': ask_slippage, 'liquidity_score': self._score_liquidity(total_volume, bid_ask_ratio, bid_slippage, ask_slippage), 'spread_pct': (asks[0][0] - bids[0][0]) / current_price * 100 if bids and asks else 0, 'current_price': current_price } def _calculate_slippage(self, orders, usd_amount, current_price, side): """Calculate slippage % for a given USD amount.""" remaining = usd_amount total_qty = 0 weighted_price = 0 for price, qty in orders: order_value = qty * price if 
order_value >= remaining: fill_qty = remaining / price total_qty += fill_qty weighted_price += price * fill_qty remaining = 0 break else: total_qty += qty weighted_price += price * qty remaining -= order_value if total_qty == 0: return 999 # Can't fill avg_price = weighted_price / total_qty slippage = abs(avg_price - current_price) / current_price * 100 return slippage def _score_liquidity(self, total_volume, bid_ask_ratio, bid_slippage, ask_slippage): """Score liquidity 0-100 based on multiple factors.""" score = 50 # Base score # Volume score (0-30) if total_volume > 1000: # High volume score += 30 elif total_volume > 500: score += 20 elif total_volume > 100: score += 10 # Bid/Ask balance (0-20) if 0.8 <= bid_ask_ratio <= 1.2: score += 20 # Balanced elif 0.5 <= bid_ask_ratio <= 2.0: score += 10 # Slight imbalance # Slippage score (0-30) avg_slippage = (bid_slippage + ask_slippage) / 2 if avg_slippage < 0.05: score += 30 elif avg_slippage < 0.1: score += 20 elif avg_slippage < 0.2: score += 10 else: score -= 20 # Penalty for high slippage return min(100, max(0, score)) def fetch_multi_timeframe_data(self, symbol): """Fetch klines data for multiple timeframes (3m, 5m, 15m, 1h, 2h, 4h) - PARALLEL VERSION.""" timeframes = { '3m': 30, '5m': 30, '15m': 50, '1h': 50, '2h': 30, '4h': 30 } def fetch_one(interval_limit): """Fetch single timeframe - helper for parallel execution.""" interval, limit = interval_limit try: klines = self.get_klines(symbol, interval, limit) if klines and isinstance(klines, list) and len(klines) > 0: prices = [float(k[4]) for k in klines if isinstance(k, (list, tuple)) and len(k) > 4] volumes = [float(k[5]) for k in klines if isinstance(k, (list, tuple)) and len(k) > 5] return interval, { 'prices': prices, 'volumes': volumes, 'klines': klines } return interval, None except Exception as e: print(f"[Multi-Timeframe] Error fetching {symbol} {interval}: {e}") return interval, None # Parallel fetch all timeframes using shared thread pool data = {} 
try: results = BinanceClient._shared_thread_pool.map(fetch_one, timeframes.items()) for interval, result in results: data[interval] = result except Exception as e: print(f"[Multi-Timeframe] Thread pool error: {e}") # Fallback to sequential on error for interval_limit in timeframes.items(): interval, result = fetch_one(interval_limit) data[interval] = result return data def get_ticker_price(self, symbol): self._rate_limit() if not REQUESTS_AVAILABLE: return self._mock_price(symbol) try: url = f"{self.base_url}/fapi/v1/ticker/price" params = {"symbol": symbol} response = requests.get(url, params=params, timeout=3) data = response.json() return float(data.get('price', 0)) except Exception as e: return self._mock_price(symbol) def get_account_balance(self): if not REQUESTS_AVAILABLE or not self.api_key or not self.api_secret: return 10000.0 if self.paper else 0.0 try: timestamp = int(time.time() * 1000) query = f"timestamp={timestamp}" signature = self._get_signature(query) url = f"{self.base_url}/fapi/v2/balance?{query}&signature={signature}" headers = {"X-MBX-APIKEY": self.api_key} response = requests.get(url, headers=headers, timeout=5) data = response.json() for asset in data: if asset['asset'] == 'USDC': return float(asset.get('balance', 0)) return 0.0 except Exception as e: print(f"Balance fetch error: {e}") return 10000.0 if self.paper else 0.0 def place_order(self, symbol, side, quantity, order_type='MARKET', price=None, stop_price=None, reduce_only=False): if self.paper: return {'orderId': str(uuid.uuid4())[:8], 'status': 'FILLED'} if not REQUESTS_AVAILABLE or not self.api_key or not self.api_secret: return None try: timestamp = int(time.time() * 1000) params = { "symbol": symbol, "side": side, "type": order_type, "quantity": quantity, "timestamp": timestamp } if price and order_type in ['LIMIT', 'STOP_LOSS_LIMIT', 'TAKE_PROFIT_LIMIT']: params["price"] = price params["timeInForce"] = "GTC" if stop_price and order_type in ['STOP_MARKET', 'STOP_LOSS', 
'TAKE_PROFIT', 'STOP_LOSS_LIMIT', 'TAKE_PROFIT_LIMIT']: params["stopPrice"] = stop_price if reduce_only: params["reduceOnly"] = "true" query = '&'.join([f"{k}={v}" for k, v in params.items()]) signature = self._get_signature(query) url = f"{self.base_url}/fapi/v1/order?{query}&signature={signature}" headers = {"X-MBX-APIKEY": self.api_key} response = requests.post(url, headers=headers, timeout=5) return response.json() except Exception as e: print(f"Order error: {e}") return None def close_position(self, symbol, side, quantity): return self.place_order(symbol, side, quantity, reduce_only=True) def cancel_order(self, symbol, order_id): if self.paper: return {'status': 'CANCELLED'} if not REQUESTS_AVAILABLE or not self.api_key or not self.api_secret: return None try: timestamp = int(time.time() * 1000) query = f"symbol={symbol}&orderId={order_id}×tamp={timestamp}" signature = self._get_signature(query) url = f"{self.base_url}/fapi/v1/order?{query}&signature={signature}" headers = {"X-MBX-APIKEY": self.api_key} response = requests.delete(url, headers=headers, timeout=5) return response.json() except Exception as e: print(f"Cancel error: {e}") return None def _mock_klines(self, symbol, limit): base = 50000 if 'BTC' in symbol else (2500 if 'ETH' in symbol else 100) klines = [] price = base for i in range(limit): change = (math.sin(i / 10) * 0.002) open_p = price close_p = price * (1 + change) high_p = max(open_p, close_p) * 1.002 low_p = min(open_p, close_p) * 0.998 klines.append([ int(time.time() * 1000) - (limit - i) * 3600000, str(open_p), str(high_p), str(low_p), str(close_p), "1000000", int(time.time() * 1000), "1000000", 100, "600000", "600000", "0" ]) price = close_p return klines def _mock_price(self, symbol): base = 50000 if 'BTC' in symbol else (2500 if 'ETH' in symbol else 100) return base * (1 + (hash(symbol + str(int(time.time()))) % 100 - 50) / 10000) def format_price(price, pair=None): if price is None or price == 0: return "$0.00" if pair: pair_upper = 
pair.upper() # Very low priced coins - need 5-6 decimals to see meaningful changes if any(x in pair_upper for x in ['SHIB']): return f"${price:,.8f}" # Low priced coins - 5-6 decimals for coins under $1 if any(x in pair_upper for x in ['ADA', 'DOGE', 'XLM', 'VET', 'FIL', 'TRX', 'HBAR']): return f"${price:,.5f}" elif any(x in pair_upper for x in ['ARB', 'OP', 'APT', 'NEAR', 'DOT', 'MATIC', 'LINK']): return f"${price:,.4f}" elif any(x in pair_upper for x in ['SOL', 'BNB', 'ETH', 'LTC', 'AVAX']): return f"${price:,.2f}" elif 'BTC' in pair_upper: return f"${price:,.0f}" # Dynamic formatting based on price level if price >= 1000: return f"${price:,.0f}" elif price >= 100: return f"${price:,.2f}" elif price >= 10: return f"${price:,.3f}" elif price >= 1: return f"${price:,.4f}" elif price >= 0.1: return f"${price:,.5f}" else: return f"${price:,.6f}" class SoundManager: EVENTS = [ 'trade_entry', 'trade_exit', 'error', 'setup_found', ] VOLUME_LEVELS = { 'trade_entry': 100, 'trade_exit': 100, 'setup_found': 80, 'error': 60, } def __init__(self, data_dir=DATA_DIR): self.enabled = False # DEFAULT: Sounds OFF self.vibration_enabled = False # DEFAULT: Vibration OFF self.current_theme = 'pro_trader' self.volume_override = 100 self.sound_dir = os.path.join(data_dir, '..', 'ALERTSOUND') if not os.path.exists(self.sound_dir): ext_path = '/storage/emulated/0/ALERTSOUND' if os.path.exists(ext_path): self.sound_dir = ext_path self.sound_cache = {} self.audio_available = False self._init_audio() def _init_audio(self): try: from kivy.core.audio import SoundLoader self.SoundLoader = SoundLoader self.audio_available = True print("[SoundManager] Using Kivy audio") except Exception as e: print(f"[SoundManager] Kivy audio not available: {e}") try: from jnius import autoclass self.android_media = autoclass('android.media.MediaPlayer') self.audio_available = True print("[SoundManager] Using Android MediaPlayer") except: self.android_media = None def get_sound_path(self, event): if not 
self.current_theme: return None theme_dir = os.path.join(self.sound_dir, self.current_theme) for ext in ['.wav', '.mp3', '.ogg', '.m4a']: path = os.path.join(theme_dir, f"{event}{ext}") if os.path.exists(path): return path return None def set_theme(self, theme_name): """Set the current sound theme""" self.current_theme = theme_name print(f"[SoundManager] Theme set to: {theme_name}") return None def play(self, event, force=False): if not self.enabled and not force: return False if event not in self.EVENTS: return False sound_path = self.get_sound_path(event) if not sound_path: return False if self.vibration_enabled: self._vibrate(event) try: if hasattr(self, 'SoundLoader'): sound = self.sound_cache.get(event) if sound is None: sound = self.SoundLoader.load(sound_path) if sound: self.sound_cache[event] = sound if sound: volume = (self.VOLUME_LEVELS.get(event, 50) / 100.0) * (self.volume_override / 100.0) sound.volume = volume sound.play() return True elif self.android_media: player = self.android_media() player.setDataSource(sound_path) player.prepare() player.start() return True except Exception as e: pass return False def _vibrate(self, event): try: from jnius import autoclass PythonActivity = autoclass('org.kivy.android.PythonActivity') Context = autoclass('android.content.Context') Vibrator = autoclass('android.os.Vibrator') activity = PythonActivity.mActivity vibrator = activity.getSystemService(Context.VIBRATOR_SERVICE) if vibrator and vibrator.hasVibrator(): patterns = { 'trade_entry': [0, 300, 100, 300], 'trade_exit': [0, 500], 'error': [0, 100, 50, 100, 50, 100], 'setup_found': [0, 150, 50, 150], } pattern = patterns.get(event, [0, 100]) vibrator.vibrate(pattern, -1) except Exception as e: pass def test_sound(self, event): return self.play(event, force=True) def toggle_enabled(self): self.enabled = not self.enabled return self.enabled def toggle_vibration(self): self.vibration_enabled = not self.vibration_enabled return self.vibration_enabled def 
def calculate_rsi(prices, period=14):
    """Simple-average RSI over the last `period` deltas; 50 when data is short."""
    if len(prices) < period + 1:
        return 50
    deltas = [prices[i] - prices[i - 1] for i in range(1, len(prices))]
    gains = [d if d > 0 else 0 for d in deltas]
    losses = [-d if d < 0 else 0 for d in deltas]
    if len(gains) < period:
        return 50
    avg_gain = sum(gains[-period:]) / period
    avg_loss = sum(losses[-period:]) / period
    if avg_loss == 0:
        return 100 if avg_gain > 0 else 50
    rs = avg_gain / avg_loss
    return max(0, min(100, 100 - (100 / (1 + rs))))


def calculate_ema(prices, period):
    """EMA seeded with the SMA of the first `period` points; plain mean if short."""
    if len(prices) < period:
        return sum(prices) / len(prices) if prices else 0
    k = 2 / (period + 1)
    ema = sum(prices[:period]) / period
    for px in prices[period:]:
        ema += (px - ema) * k
    return ema


def calculate_bollinger_bands(prices, period=20):
    """Classic Bollinger Bands as an (upper, middle, lower) tuple.

    NOTE(review): a later dict-returning definition with the same name exists
    further down this file and shadows this one at import time — confirm which
    version callers actually receive.
    """
    if len(prices) < period:
        return None, None, None
    window = prices[-period:]
    mid = sum(window) / len(window)
    std = math.sqrt(sum((p - mid) ** 2 for p in window) / len(window))
    return mid + (2 * std), mid, mid - (2 * std)


def score_to_grade(score):
    """Map a 0-100 numeric score onto a letter grade (F below 60)."""
    bands = [
        (97, "A+"), (93, "A"), (90, "A-"),
        (87, "B+"), (83, "B"), (80, "B-"),
        (77, "C+"), (73, "C"), (70, "C-"),
        (67, "D+"), (63, "D"), (60, "D-"),
    ]
    for cutoff, grade in bands:
        if score >= cutoff:
            return grade
    return "F"


def grade_to_score(grade):
    """Convert grade letter to numeric score.

    Returns 0 (allow all) if grade is None."""
    if grade is None:
        return 0
    grade_map = {
        "A+": 97, "A": 93, "A-": 90,
        "B+": 87, "B": 83, "B-": 80,
        "C+": 77, "C": 73, "C-": 70,
        "D+": 67, "D": 63, "D-": 60,
        "F": 0,
    }
    return grade_map.get(grade, 0)


GRADE_OPTIONS = ["A+", "A", "A-", "B+", "B", "B-", "C+", "C", "C-", "D+", "D", "D-", "F"]


def reduce_grade(grade, levels=1):
    """Reduce grade by specified levels (e.g., B -> B- -> C+); unknown grades map to F."""
    if grade not in GRADE_OPTIONS:
        return 'F'
    position = GRADE_OPTIONS.index(grade)
    return GRADE_OPTIONS[min(len(GRADE_OPTIONS) - 1, position + levels)]


def calculate_momentum_exhaustion(prices, volumes):
    """Score how stretched the recent move is (0-100) from price, volume and RSI.

    Exhaustion is declared at a score of 25+. Assumes hourly-ish bars where 36
    points ~ 12h — TODO confirm bar interval against callers.
    """
    if len(prices) < 50:
        return {"exhausted": False, "score": 0, "signals": [],
                "change_12h": 0, "momentum_direction": "neutral"}
    current = prices[-1]
    # change_1h / change_4h are computed but currently unused downstream.
    change_1h = (current - prices[-2]) / prices[-2] * 100 if len(prices) > 2 else 0
    change_4h = (current - prices[-12]) / prices[-12] * 100 if len(prices) > 12 else 0
    change_12h = (current - prices[-36]) / prices[-36] * 100 if len(prices) > 36 else 0

    signals = []
    exhaustion_score = 0
    # Size of the 12h move
    if abs(change_12h) > 15:
        signals.append(f"Extended 12h move: {change_12h:+.1f}%")
        exhaustion_score += 20
    elif abs(change_12h) > 10:
        signals.append(f"Strong 12h move: {change_12h:+.1f}%")
        exhaustion_score += 10
    elif abs(change_12h) > 5:
        signals.append(f"Moderate 12h move: {change_12h:+.1f}%")
        exhaustion_score += 5

    # Volume climax: last 6 bars vs the preceding average
    if volumes and len(volumes) >= 30:
        recent_vol = sum(volumes[-6:])
        avg_vol = (sum(volumes[-30:-6]) / 24 if len(volumes) >= 36
                   else sum(volumes[:-6]) / max(1, len(volumes) - 6))
        if avg_vol > 0:
            vol_ratio = recent_vol / avg_vol
            if vol_ratio > 3:
                signals.append("Volume climax - distribution/accumulation")
                exhaustion_score += 15
            elif vol_ratio > 2:
                signals.append("High volume")
                exhaustion_score += 8

    # RSI divergence vs. 12h ago
    rsi_now = calculate_rsi(prices[-15:])
    if len(prices) >= 51:
        rsi_12h_ago = calculate_rsi(prices[-51:-36])
        price_change = (current - prices[-36]) / prices[-36] * 100
        rsi_change = rsi_now - rsi_12h_ago
        if price_change > 5 and rsi_change < -5:
            signals.append("Bearish RSI divergence")
            exhaustion_score += 20
        elif price_change < -5 and rsi_change > 5:
            signals.append("Bullish RSI divergence")
            exhaustion_score += 20

    return {
        "exhausted": exhaustion_score >= 25,
        "score": min(100, exhaustion_score),
        "signals": signals,
        "change_12h": change_12h,
        "momentum_direction": "up" if change_12h > 0 else "down" if change_12h < 0 else "neutral",
    }


def detect_volume_pattern(volumes, prices):
    """Classify recent volume shape via its coefficient of variation."""
    if not volumes or len(volumes) < 20:
        return {"pattern": "unknown", "suspicious": False, "quality": 1.0}
    recent = volumes[-10:]
    mean_vol = sum(volumes) / len(volumes)
    variance = sum((v - mean_vol) ** 2 for v in recent) / len(recent)
    cv = (variance ** 0.5) / mean_vol if mean_vol > 0 else 0
    # Unnaturally flat high volume reads as wash-trading-like uniformity
    if cv < 0.3 and mean_vol > 1000:
        return {"pattern": "suspicious_uniformity", "suspicious": True, "quality": 0.5}
    if cv > 1.5:
        return {"pattern": "natural_volatile", "suspicious": False, "quality": 1.0}
    return {"pattern": "normal", "suspicious": False, "quality": 0.9}


def calculate_atr(prices, period=14):
    """Close-to-close ATR proxy (no high/low data available here)."""
    if len(prices) < period + 1:
        return prices[-1] * 0.02 if prices else 100
    moves = [abs(b - a) for a, b in zip(prices, prices[1:])]
    return sum(moves[-period:]) / period
def calculate_stoch_rsi(prices, rsi_period=14, stoch_period=14, smooth_k=3, smooth_d=3):
    """Calculate Stochastic RSI indicator.

    Returns dict with %K and %D values (0-100 range)."""
    if len(prices) < rsi_period + stoch_period + 5:
        return {'k': 50, 'd': 50, 'valid': False}

    # Rolling RSI series over sliding windows
    rsi_series = []
    for end in range(rsi_period, len(prices)):
        window = prices[end - rsi_period:end]
        ups, downs = [], []
        for j in range(1, len(window)):
            delta = window[j] - window[j - 1]
            ups.append(delta if delta > 0 else 0)
            downs.append(-delta if delta < 0 else 0)
        avg_up = sum(ups) / len(ups) if ups else 0
        avg_down = sum(downs) / len(downs) if downs else 0
        rsi_series.append(100 if avg_down == 0 else 100 - (100 / (1 + avg_up / avg_down)))

    if len(rsi_series) < stoch_period:
        return {'k': 50, 'd': 50, 'valid': False}

    # Stochastic of the RSI series
    raw_k = []
    for end in range(stoch_period, len(rsi_series)):
        window = rsi_series[end - stoch_period:end]
        lo, hi = min(window), max(window)
        raw_k.append(50 if hi - lo == 0 else (rsi_series[end - 1] - lo) / (hi - lo) * 100)

    if len(raw_k) < smooth_k + smooth_d:
        return {'k': 50, 'd': 50, 'valid': False}

    # SMA smoothing of %K, then of %D
    k_smooth = [sum(raw_k[i - smooth_k:i]) / smooth_k for i in range(smooth_k, len(raw_k))]
    d_smooth = [sum(k_smooth[i - smooth_d:i]) / smooth_d for i in range(smooth_d, len(k_smooth))]
    return {'k': k_smooth[-1] if k_smooth else 50,
            'd': d_smooth[-1] if d_smooth else 50,
            'valid': True}


def calculate_macd(prices, fast=12, slow=26, signal=9):
    """Calculate MACD with histogram flip detection."""
    if len(prices) < slow + signal:
        return {'macd': 0, 'signal': 0, 'histogram': 0, 'valid': False}
    macd_line = calculate_ema(prices, fast) - calculate_ema(prices, slow)
    # MACD history, one value per bar beyond the slow period
    history = [calculate_ema(prices[:i + 1], fast) - calculate_ema(prices[:i + 1], slow)
               for i in range(slow, len(prices))]
    signal_line = calculate_ema(history, signal) if len(history) >= signal else macd_line
    histogram = macd_line - signal_line
    # Previous histogram for sign-flip detection
    prev_histogram = 0
    if len(history) >= signal + 2:
        prior = history[:-1]
        prev_histogram = prior[-1] - calculate_ema(prior, signal)
    return {
        'macd': macd_line,
        'signal': signal_line,
        'histogram': histogram,
        'valid': True,
        'flip_positive': histogram > 0 and prev_histogram <= 0,
        'flip_negative': histogram < 0 and prev_histogram >= 0,
    }


def calculate_vwap(prices, volumes):
    """VWAP from consecutive-close midpoints, plus a reclaim-above-VWAP flag."""
    if len(prices) != len(volumes) or len(prices) < 2:
        return {'vwap': prices[-1] if prices else 0, 'valid': False}
    mids = [(prices[i] + prices[i - 1]) / 2 for i in range(1, len(prices))]
    vols = volumes[1:]
    total_vol = sum(vols)
    weighted = sum(m * v for m, v in zip(mids, vols))
    vwap = weighted / total_vol if total_vol > 0 else prices[-1]
    # Two closes below VWAP followed by a close back above = false breakdown
    false_breakdown = (len(prices) >= 5 and prices[-3] < vwap
                       and prices[-2] < vwap and prices[-1] > vwap)
    return {'vwap': vwap, 'valid': True, 'false_breakdown': false_breakdown}


def detect_candle_pattern(prices, volumes=None):
    """Heuristic close-only candle patterns for scalp entries."""
    if len(prices) < 5:
        return {'pattern': 'NONE', 'strength': 0, 'patterns': []}
    last_5 = prices[-5:]
    body = abs(last_5[-1] - last_5[-2])
    span = max(last_5) - min(last_5)
    found = []
    # Bullish engulfing
    if last_5[-1] > last_5[-2] and last_5[-2] < last_5[-3] and last_5[-1] > last_5[-3]:
        found.append('BULLISH_ENGULFING')
    # Bearish engulfing
    if last_5[-1] < last_5[-2] and last_5[-2] > last_5[-3] and last_5[-1] < last_5[-3]:
        found.append('BEARISH_ENGULFING')
    # Morning star
    if len(last_5) >= 3 and last_5[-3] > last_5[-2] and last_5[-1] > last_5[-2] * 1.01:
        if last_5[-2] < min(last_5[-3], last_5[-1]):
            found.append('MORNING_STAR')
    # Shooting star
    if len(last_5) >= 3 and last_5[-3] < last_5[-2] and last_5[-1] < last_5[-2] * 0.99:
        if last_5[-2] > max(last_5[-3], last_5[-1]):
            found.append('SHOOTING_STAR')
    # Hammer
    if body < span * 0.3 and last_5[-1] > last_5[-2] * 1.005:
        found.append('HAMMER')
    return {'pattern': found[0] if found else 'NONE', 'patterns': found, 'strength': len(found)}


def calculate_bollinger_bands(prices, period=20, std_dev=2):
    """
    Calculate Bollinger Bands.
    Returns dict with upper, middle (SMA), lower bands, and squeeze detection.

    NOTE(review): this shadows the earlier tuple-returning function of the same
    name defined above in this file — confirm which one callers expect.
    """
    if len(prices) < period:
        fallback = prices[-1] if prices else 0
        return {'upper': fallback, 'middle': fallback, 'lower': fallback,
                'bandwidth': 0, 'squeeze': False, 'valid': False}

    window = prices[-period:]
    sma = sum(window) / period
    std = (sum((p - sma) ** 2 for p in window) / period) ** 0.5
    upper = sma + (std * std_dev)
    lower = sma - (std * std_dev)
    bandwidth = ((upper - lower) / sma) * 100 if sma > 0 else 0

    # Squeeze = current bandwidth well below the rolling-window average
    if len(prices) >= period * 2:
        historical = []
        for start in range(period, len(prices) - period + 1):
            chunk = prices[start:start + period]
            chunk_sma = sum(chunk) / period
            chunk_std = (sum((p - chunk_sma) ** 2 for p in chunk) / period) ** 0.5
            historical.append(((chunk_sma + chunk_std * std_dev)
                               - (chunk_sma - chunk_std * std_dev)) / chunk_sma * 100)
        avg_bandwidth = sum(historical) / len(historical) if historical else bandwidth
        squeeze = bandwidth < avg_bandwidth * 0.6
    else:
        squeeze = bandwidth < 5  # fallback threshold with limited history

    # %B: position of last close within the bands (0 = lower, 100 = upper)
    current_price = prices[-1]
    percent_b = ((current_price - lower) / (upper - lower)) * 100 if (upper - lower) > 0 else 50

    return {
        'upper': upper,
        'middle': sma,
        'lower': lower,
        'bandwidth': bandwidth,
        'squeeze': squeeze,
        'percent_b': percent_b,
        'valid': True,
    }
def calculate_supertrend(prices, period=10, multiplier=3):
    """
    Calculate Supertrend indicator.
    Returns dict with trend direction, supertrend line, and signals.

    NOTE(review): close-only data — prices[i]/prices[i-1] stand in for
    high/low, so bands are an approximation of the canonical formula.
    """
    if len(prices) < period + 1:
        return {'trend': 'NEUTRAL', 'supertrend': prices[-1] if prices else 0,
                'buy_signal': False, 'sell_signal': False, 'valid': False}

    atr = calculate_atr(prices, period)

    # Basic bands around the midpoint of each consecutive close pair
    upper_band, lower_band = [], []
    for i in range(period, len(prices)):
        mid = (prices[i] + prices[i - 1]) / 2
        upper_band.append(mid + (multiplier * atr))
        lower_band.append(mid - (multiplier * atr))

    st_line, trend = [], []
    for i in range(len(upper_band)):
        px = prices[period + i]
        if i == 0:
            # Initialize from the first band pair
            if px <= upper_band[0]:
                st_line.append(upper_band[0])
                trend.append('DOWN')
            else:
                st_line.append(lower_band[0])
                trend.append('UP')
        elif trend[-1] == 'UP':
            # In uptrend, ratchet the lower band upward
            line = max(lower_band[i], st_line[-1])
            st_line.append(line)
            trend.append('DOWN' if px < line else 'UP')
        else:
            # In downtrend, ratchet the upper band downward
            line = min(upper_band[i], st_line[-1])
            st_line.append(line)
            trend.append('UP' if px > line else 'DOWN')

    return {
        'trend': trend[-1],
        'supertrend': st_line[-1],
        'buy_signal': len(trend) >= 2 and trend[-2] == 'DOWN' and trend[-1] == 'UP',
        'sell_signal': len(trend) >= 2 and trend[-2] == 'UP' and trend[-1] == 'DOWN',
        'valid': True,
    }


def calculate_obv(prices, volumes):
    """
    Calculate On-Balance Volume (OBV).
    Returns dict with OBV value, trend, and divergence detection.
    """
    if len(prices) != len(volumes) or len(prices) < 2:
        return {'obv': 0, 'trend': 'NEUTRAL', 'divergence': None, 'valid': False}

    obv_series = [volumes[0]]  # seed with the first bar's volume
    for i in range(1, len(prices)):
        if prices[i] > prices[i - 1]:
            obv_series.append(obv_series[-1] + volumes[i])
        elif prices[i] < prices[i - 1]:
            obv_series.append(obv_series[-1] - volumes[i])
        else:
            obv_series.append(obv_series[-1])

    # 10-bar trend with a 5% dead zone
    trend = 'NEUTRAL'
    if len(obv_series) >= 10:
        if obv_series[-1] > obv_series[-10] * 1.05:
            trend = 'UP'
        elif obv_series[-1] < obv_series[-10] * 0.95:
            trend = 'DOWN'

    # Price/OBV divergence over the same 10 bars
    divergence = None
    if len(prices) >= 10:
        price_chg = (prices[-1] - prices[-10]) / prices[-10] * 100
        obv_chg = ((obv_series[-1] - obv_series[-10]) / obv_series[-10] * 100
                   if obv_series[-10] != 0 else 0)
        if price_chg > 2 and obv_chg < -2:
            divergence = 'BEARISH'  # price up, OBV down
        elif price_chg < -2 and obv_chg > 2:
            divergence = 'BULLISH'  # price down, OBV up

    return {'obv': obv_series[-1], 'trend': trend, 'divergence': divergence, 'valid': True}


def calculate_volume_profile(prices, volumes, num_bins=12):
    """
    Calculate Volume Profile (price levels with highest volume).
    Returns dict with POC (Point of Control), Value Area, and support/resistance levels.
    """
    if len(prices) != len(volumes) or len(prices) < 20:
        return {'poc': prices[-1] if prices else 0, 'value_area_high': 0,
                'value_area_low': 0, 'valid': False}

    lo, hi = min(prices), max(prices)
    span = hi - lo
    if span == 0:
        return {'poc': prices[-1], 'value_area_high': hi, 'value_area_low': lo, 'valid': True}

    bin_size = span / num_bins
    edges = [lo + i * bin_size for i in range(num_bins + 1)]

    # Accumulate volume per price bin
    vol_bins = [0] * num_bins
    for px, vol in zip(prices, volumes):
        idx = min(int((px - lo) / bin_size), num_bins - 1)
        vol_bins[idx] += vol

    # POC = midpoint of the heaviest bin
    poc_idx = vol_bins.index(max(vol_bins))
    poc = (edges[poc_idx] + edges[poc_idx + 1]) / 2

    # Expand outward from the POC until 70% of volume is covered
    total_volume = sum(vol_bins)
    target = total_volume * 0.70
    area = [poc_idx]
    covered = vol_bins[poc_idx]
    left, right = poc_idx - 1, poc_idx + 1
    while covered < target and (left >= 0 or right < num_bins):
        left_vol = vol_bins[left] if left >= 0 else 0
        right_vol = vol_bins[right] if right < num_bins else 0
        if left_vol >= right_vol and left >= 0:
            area.append(left)
            covered += left_vol
            left -= 1
        elif right < num_bins:
            area.append(right)
            covered += right_vol
            right += 1
        else:
            break

    va_low = edges[min(area)]
    va_high = edges[max(area) + 1]

    # High-volume nodes: bins 50% above the average (support/resistance candidates)
    avg_bin_vol = total_volume / num_bins
    nodes = [(edges[i] + edges[i + 1]) / 2
             for i, vol in enumerate(vol_bins) if vol > avg_bin_vol * 1.5]

    return {
        'poc': poc,
        'value_area_high': va_high,
        'value_area_low': va_low,
        'value_area_volume_pct': (covered / total_volume) * 100,
        'high_volume_nodes': sorted(nodes),
        'valid': True,
    }


def calculate_fibonacci_retracement(high, low, current_price=None):
    """
    Calculate Fibonacci retracement levels.
    Returns dict with retracement levels (0.236, 0.382, 0.5, 0.618, 0.786).
    """
    if high <= low:
        return {'levels': {}, 'valid': False}
    diff = high - low
    levels = {
        '0.0': high,
        '0.236': high - (diff * 0.236),
        '0.382': high - (diff * 0.382),
        '0.5': high - (diff * 0.5),
        '0.618': high - (diff * 0.618),
        '0.786': high - (diff * 0.786),
        '1.0': low,
    }
    # Level closest to the current price, if one was given
    nearest_level = None
    if current_price:
        min_dist = float('inf')
        for level_name, level_price in levels.items():
            dist = abs(current_price - level_price)
            if dist < min_dist:
                min_dist = dist
                nearest_level = level_name
    return {'levels': levels, 'nearest_level': nearest_level, 'valid': True}


def calculate_pivot_points(high, low, close):
    """
    Calculate daily pivot points and support/resistance levels.
    Returns dict with pivot point, supports, and resistances.
    """
    if high <= 0 or low <= 0 or close <= 0:
        return {'pivot': 0, 'valid': False}
    pivot = (high + low + close) / 3  # classic pivot
    return {
        'pivot': pivot,
        's1': (2 * pivot) - high,
        's2': pivot - (high - low),
        's3': low - 2 * (high - pivot),
        'r1': (2 * pivot) - low,
        'r2': pivot + (high - low),
        'r3': high + 2 * (pivot - low),
        'valid': True,
    }
def calculate_adx(prices, period=14):
    """
    Calculate ADX (Average Directional Index) for trend strength.
    Returns dict with ADX, +DI, -DI values.

    ADX > 25: Trending, ADX > 40: Strong trend, ADX < 20: Ranging

    NOTE(review): the Wilder-smoothing loop below never appends (its
    `i < len(adx_values)` guard is false on the first pass), so `adx` is
    effectively the single DX reading — confirm whether full smoothing was
    intended before relying on it.
    """
    if len(prices) < period * 2:
        return {'adx': 25, 'plus_di': 50, 'minus_di': 50, 'trending': False}

    # True range and directional movement from close-only data
    tr_list, plus_dm, minus_dm = [], [], []
    for i in range(1, len(prices)):
        high, low, prev_close = prices[i], prices[i - 1], prices[i - 1]
        tr_list.append(max(high - low, abs(high - prev_close), abs(low - prev_close)))
        up_move = prices[i] - prices[i - 1]
        down_move = prices[i - 1] - prices[i]
        if up_move > down_move and up_move > 0:
            plus_dm.append(up_move)
            minus_dm.append(0)
        elif down_move > up_move and down_move > 0:
            plus_dm.append(0)
            minus_dm.append(down_move)
        else:
            plus_dm.append(0)
            minus_dm.append(0)

    atr = sum(tr_list[-period:]) / period
    plus_di = 100 * sum(plus_dm[-period:]) / (period * atr) if atr > 0 else 0
    minus_di = 100 * sum(minus_dm[-period:]) / (period * atr) if atr > 0 else 0
    dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di) if (plus_di + minus_di) > 0 else 0

    # (See NOTE above: this smoothing loop is currently inert.)
    adx_values = [dx]
    for i in range(period, len(prices) - period):
        if i < len(adx_values):
            adx_values.append((adx_values[-1] * (period - 1) + dx) / period)
    adx = adx_values[-1] if adx_values else dx

    return {
        'adx': min(100, max(0, adx)),
        'plus_di': plus_di,
        'minus_di': minus_di,
        'trending': adx > 25,
        'strong_trend': adx > 40,
        'ranging': adx < 20,
        'bullish': plus_di > minus_di,
        'di_difference': abs(plus_di - minus_di),
    }


def calculate_adx_responsive(prices, volumes=None, fast_period=7, slow_period=14):
    """
    Calculate RESPONSIVE ADX with multiple periods and trend momentum detection.

    Combines a fast (7-period) and slow (14-period) ADX into a 60/40 composite,
    adds ADX momentum (rate of change over 5 bars), a RISING/FALLING/STABLE
    classification, a 0-100 trend-quality score, optional volume confirmation,
    and dynamic trending thresholds.
    """
    if len(prices) < slow_period * 2:
        return {
            'adx': 25, 'plus_di': 50, 'minus_di': 50, 'trending': False,
            'adx_fast': 25, 'adx_slow': 25, 'adx_composite': 25,
            'adx_momentum': 0, 'adx_trend': 'STABLE', 'trend_quality': 50,
        }

    def one_period_adx(series, period):
        """ADX/+DI/-DI for a single period on close-only data."""
        tr_list, plus_dm, minus_dm = [], [], []
        for i in range(1, len(series)):
            high, low, prev_close = series[i], series[i - 1], series[i - 1]
            tr_list.append(max(high - low, abs(high - prev_close), abs(low - prev_close)))
            up_move = series[i] - series[i - 1]
            down_move = series[i - 1] - series[i]
            if up_move > down_move and up_move > 0:
                plus_dm.append(up_move)
                minus_dm.append(0)
            elif down_move > up_move and down_move > 0:
                plus_dm.append(0)
                minus_dm.append(down_move)
            else:
                plus_dm.append(0)
                minus_dm.append(0)
        if len(tr_list) < period:
            return {'adx': 25, 'plus_di': 50, 'minus_di': 50}
        atr = sum(tr_list[-period:]) / period
        plus_di = 100 * sum(plus_dm[-period:]) / (period * atr) if atr > 0 else 0
        minus_di = 100 * sum(minus_dm[-period:]) / (period * atr) if atr > 0 else 0
        dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di) if (plus_di + minus_di) > 0 else 0
        # Wilder smoothing
        adx_smooth = dx
        for _ in range(period, min(len(series) - period, period * 2)):
            adx_smooth = (adx_smooth * (period - 1) + dx) / period
        return {'adx': min(100, max(0, adx_smooth)), 'plus_di': plus_di, 'minus_di': minus_di}

    fast_data = one_period_adx(prices, fast_period)
    slow_data = one_period_adx(prices, slow_period)
    adx_fast = fast_data['adx']
    adx_slow = slow_data['adx']
    # Weight the fast reading more for responsiveness (60/40 split)
    adx_composite = (adx_fast * 0.6) + (adx_slow * 0.4)

    # Momentum: slow ADX now vs. 5 bars ago
    if len(prices) >= slow_period + 5:
        adx_momentum = adx_slow - one_period_adx(prices[:-5], slow_period)['adx']
    else:
        adx_momentum = 0

    if adx_momentum > 2:
        adx_trend = 'RISING'
    elif adx_momentum < -2:
        adx_trend = 'FALLING'
    else:
        adx_trend = 'STABLE'

    # Trend quality: size of the fast DI spread, capped at 100
    di_diff = abs(fast_data['plus_di'] - fast_data['minus_di'])
    trend_quality = min(100, di_diff)

    # Optional volume confirmation: last 3 bars 20% above the 10-bar average
    volume_confirmed = False
    if volumes and len(volumes) >= 10:
        recent_vol = sum(volumes[-3:]) / 3
        avg_vol = sum(volumes[-10:]) / 10
        volume_confirmed = recent_vol > avg_vol * 1.2

    # Dynamic thresholds: easier entry into emerging trends, stricter on fading ones
    base_threshold = 25
    if adx_trend == 'RISING':
        trending_threshold = base_threshold - 3
        strong_threshold = 35
    elif adx_trend == 'FALLING':
        trending_threshold = base_threshold + 3
        strong_threshold = 42
    else:
        trending_threshold = base_threshold
        strong_threshold = 40

    return {
        'adx': adx_composite,  # main ADX value (composite)
        'plus_di': fast_data['plus_di'],
        'minus_di': fast_data['minus_di'],
        'bullish': fast_data['plus_di'] > fast_data['minus_di'],
        'di_difference': di_diff,
        # New responsive metrics
        'adx_fast': adx_fast,
        'adx_slow': adx_slow,
        'adx_composite': adx_composite,
        'adx_momentum': adx_momentum,
        'adx_trend': adx_trend,
        'trend_quality': trend_quality,
        'volume_confirmed': volume_confirmed,
        # Dynamic state
        'trending': adx_composite > trending_threshold,
        'strong_trend': adx_composite > strong_threshold,
        'ranging': adx_composite < 20,
        'emerging_trend': adx_trend == 'RISING' and adx_composite > 20 and adx_composite < 35,
        'fading_trend': adx_trend == 'FALLING' and adx_composite > 30,
    }
def calculate_multi_timeframe_adx(tf_data):
    """
    Calculate ADX across multiple timeframes for comprehensive trend analysis.

    Args:
        tf_data: Dict with timeframe keys ('3m', '5m', '15m', '1h', '2h', '4h')
                 containing price/volume data

    Returns:
        Dict with per-timeframe ADX, a weight-averaged composite ADX, the
        cross-timeframe consensus, a dominant-trend label, a flag for lower
        timeframes diverging from higher ones, and a trading recommendation.
    """
    timeframe_adx = {}
    timeframe_directions = {}
    # Higher timeframes weigh more in the composite
    tf_weights = {'3m': 5, '5m': 10, '15m': 15, '1h': 25, '2h': 30, '4h': 40}

    for tf, data in tf_data.items():
        if data and data.get('prices') and len(data['prices']) >= 30:
            adx_result = calculate_adx_responsive(data['prices'], data.get('volumes'))
            timeframe_adx[tf] = adx_result
            # Directional strength label per timeframe
            if adx_result['trending']:
                if adx_result['bullish']:
                    timeframe_directions[tf] = 'LONG' if adx_result['strong_trend'] else 'WEAK_LONG'
                else:
                    timeframe_directions[tf] = 'SHORT' if adx_result['strong_trend'] else 'WEAK_SHORT'
            else:
                timeframe_directions[tf] = 'NEUTRAL'
        else:
            timeframe_adx[tf] = None
            timeframe_directions[tf] = 'NEUTRAL'

    # Weighted composite ADX over timeframes that produced a reading
    weighted_adx = 0
    total_weight = 0
    for tf, adx_result in timeframe_adx.items():
        if adx_result:
            weight = tf_weights.get(tf, 10)
            weighted_adx += adx_result['adx'] * weight
            total_weight += weight
    weighted_adx = weighted_adx / total_weight if total_weight > 0 else 25

    # Directional consensus counts
    long_count = sum(1 for d in timeframe_directions.values() if d in ['LONG', 'WEAK_LONG'])
    short_count = sum(1 for d in timeframe_directions.values() if d in ['SHORT', 'WEAK_SHORT'])
    neutral_count = sum(1 for d in timeframe_directions.values() if d == 'NEUTRAL')
    total = len(timeframe_directions)

    if long_count >= 4 and short_count <= 1:
        dominant_trend = 'STRONG_UP'
    elif long_count >= 3:
        dominant_trend = 'WEAK_UP'
    elif short_count >= 4 and long_count <= 1:
        dominant_trend = 'STRONG_DOWN'
    elif short_count >= 3:
        dominant_trend = 'WEAK_DOWN'
    else:
        dominant_trend = 'NEUTRAL'

    max_agreement = max(long_count, short_count)
    trend_consensus = (max_agreement / total * 100) if total > 0 else 0

    # Trend shift: lower timeframes leaning against the higher-timeframe bias
    trend_shift_detected = False
    higher_tf_dirs = [timeframe_directions.get(tf) for tf in ['1h', '2h', '4h'] if timeframe_directions.get(tf)]
    lower_tf_dirs = [timeframe_directions.get(tf) for tf in ['3m', '5m', '15m'] if timeframe_directions.get(tf)]
    higher_bias = ('LONG' if sum(1 for d in higher_tf_dirs if 'LONG' in d) > len(higher_tf_dirs) / 2
                   else 'SHORT' if sum(1 for d in higher_tf_dirs if 'SHORT' in d) > len(higher_tf_dirs) / 2
                   else 'NEUTRAL')
    lower_bias = ('LONG' if sum(1 for d in lower_tf_dirs if 'LONG' in d) > len(lower_tf_dirs) / 2
                  else 'SHORT' if sum(1 for d in lower_tf_dirs if 'SHORT' in d) > len(lower_tf_dirs) / 2
                  else 'NEUTRAL')
    if higher_bias != 'NEUTRAL' and lower_bias != 'NEUTRAL' and higher_bias != lower_bias:
        trend_shift_detected = True

    # Recommendation from alignment + shift state
    if dominant_trend == 'STRONG_UP' and not trend_shift_detected:
        recommendation = 'TREND_FOLLOW_LONG'
    elif dominant_trend == 'STRONG_DOWN' and not trend_shift_detected:
        recommendation = 'TREND_FOLLOW_SHORT'
    elif trend_shift_detected and lower_bias == 'LONG':
        recommendation = 'COUNTER_TREND_LONG'
    elif trend_shift_detected and lower_bias == 'SHORT':
        recommendation = 'COUNTER_TREND_SHORT'
    elif dominant_trend in ['WEAK_UP', 'WEAK_DOWN']:
        recommendation = 'WAIT_FOR_CONFIRMATION'
    else:
        recommendation = 'NO_TRADE_RANGING'

    return {
        'timeframe_adx': timeframe_adx,
        'weighted_adx': weighted_adx,
        'trend_consensus': trend_consensus,
        'dominant_trend': dominant_trend,
        'trend_shift_detected': trend_shift_detected,
        'higher_tf_bias': higher_bias,
        'lower_tf_bias': lower_bias,
        'recommendation': recommendation,
        'timeframe_directions': timeframe_directions,
        'long_count': long_count,
        'short_count': short_count,
        'neutral_count': neutral_count,
        'trending': weighted_adx > 25,
        'strong_trend': weighted_adx > 40,
    }


def find_swing_points(prices, lookback=20, tolerance=0.01):
    """
    Find swing highs and lows in price data.
    Returns dict with swing highs, lows, and market structure.

    BUGFIX: the after-window previously included prices[i] itself
    (prices[i:i+lookback]), so the strict > / < comparisons could never
    succeed and no swing point was ever detected. The window now starts at
    i+1, matching the before-window's exclusion of the candidate bar.
    (`tolerance` is accepted for interface compatibility but unused.)
    """
    if len(prices) < lookback * 2:
        return {'swing_highs': [], 'swing_lows': [], 'structure': 'UNKNOWN'}

    swing_highs = []
    swing_lows = []
    for i in range(lookback, len(prices) - lookback):
        window_before = prices[i - lookback:i]
        window_after = prices[i + 1:i + 1 + lookback]  # exclude the candidate bar
        # Swing high: strictly higher than every bar on both sides
        if prices[i] > max(window_before) and prices[i] > max(window_after):
            swing_highs.append({'index': i, 'price': prices[i]})
        # Swing low: strictly lower than every bar on both sides
        if prices[i] < min(window_before) and prices[i] < min(window_after):
            swing_lows.append({'index': i, 'price': prices[i]})

    # Market structure from the last up-to-3 swings on each side
    structure = 'UNKNOWN'
    if len(swing_highs) >= 2 and len(swing_lows) >= 2:
        recent_highs = [h['price'] for h in swing_highs[-3:]]
        recent_lows = [l['price'] for l in swing_lows[-3:]]
        if all(recent_highs[i] > recent_highs[i - 1] for i in range(1, len(recent_highs))) and \
           all(recent_lows[i] > recent_lows[i - 1] for i in range(1, len(recent_lows))):
            structure = 'BULLISH'  # higher highs and higher lows
        elif all(recent_highs[i] < recent_highs[i - 1] for i in range(1, len(recent_highs))) and \
             all(recent_lows[i] < recent_lows[i - 1] for i in range(1, len(recent_lows))):
            structure = 'BEARISH'  # lower highs and lower lows
        else:
            structure = 'RANGING'

    return {
        'swing_highs': swing_highs,
        'swing_lows': swing_lows,
        'structure': structure,
        'recent_high': swing_highs[-1] if swing_highs else None,
        'recent_low': swing_lows[-1] if swing_lows else None,
        'prev_high': swing_highs[-2] if len(swing_highs) > 1 else None,
        'prev_low': swing_lows[-2] if len(swing_lows) > 1 else None,
    }
def detect_liquidity_sweep(prices, volumes=None, lookback=50, wick_threshold=0.003):
    """
    Detect liquidity sweeps (stop hunts) above/below recent highs/lows.

    A sweep is a push beyond the recent extreme (with a 0.1% buffer) that
    closes back inside the range. Returns a dict with sweep detection, the
    implied trade direction, and a wick-based confidence score (0-100).
    `volumes` and `wick_threshold` are accepted but currently unused.
    """
    # Need a reference range plus a handful of recent candles.
    if len(prices) < lookback + 10:
        return {'sweep_detected': False, 'direction': None, 'confidence': 0}

    # Reference range deliberately excludes the 5 most recent prices.
    window = prices[-lookback - 5:-5]
    range_high, range_low = max(window), min(window)
    close_px, break_px = prices[-1], prices[-2]

    # Broke above the recent high, then closed back below it -> fade short.
    swept_high = break_px > range_high * 1.001 and close_px < range_high
    # Broke below the recent low, then closed back above it -> fade long.
    swept_low = break_px < range_low * 0.999 and close_px > range_low

    direction, confidence = None, 0
    if swept_high:
        # Larger rejection wick -> higher confidence, capped at 100.
        wick = (break_px - min(close_px, prices[-3])) / break_px
        confidence = min(100, wick * 10000)
        direction = 'SHORT'
    elif swept_low:
        wick = (max(close_px, prices[-3]) - break_px) / break_px
        confidence = min(100, wick * 10000)
        direction = 'LONG'

    return {
        'sweep_detected': swept_high or swept_low,
        'direction': direction,
        'confidence': confidence,
        'recent_high': range_high,
        'recent_low': range_low,
        'breakout_price': break_px,
        'close_price': close_px
    }
""" if swing_high <= swing_low: return None diff = swing_high - swing_low return { '0.0': swing_high, '0.236': swing_high - diff * 0.236, '0.382': swing_high - diff * 0.382, '0.5': swing_high - diff * 0.5, '0.618': swing_high - diff * 0.618, '0.786': swing_high - diff * 0.786, '1.0': swing_low, '1.618': swing_high - diff * 1.618, # Extension '2.618': swing_high - diff * 2.618 # Extension } def calculate_relative_strength(pair_prices, btc_prices, period=20): """ Calculate relative strength of pair vs BTC. Returns RS ratio and trend direction. """ if len(pair_prices) < period or len(btc_prices) < period: return {'rs_ratio': 1.0, 'leading': False, 'lagging': False} pair_return = (pair_prices[-1] - pair_prices[-period]) / pair_prices[-period] btc_return = (btc_prices[-1] - btc_prices[-period]) / btc_prices[-period] rs_ratio = pair_return / btc_return if btc_return != 0 else 1.0 return { 'rs_ratio': rs_ratio, 'pair_return': pair_return * 100, 'btc_return': btc_return * 100, 'leading': rs_ratio > 1.1, 'lagging': rs_ratio < 0.9, 'neutral': 0.9 <= rs_ratio <= 1.1 } def get_session_characteristics(utc_hour=None): """ Get trading session characteristics for time-of-day adjustments. Returns session info and recommended strategies. 
def get_session_characteristics(utc_hour=None):
    """
    Get trading session characteristics for time-of-day adjustments.

    Maps a UTC hour to a session name, its typical volume/volatility
    profile, and a recommended strategy. Hours outside every window
    (21-23 UTC) report 'UNKNOWN' with medium profiles.
    """
    if utc_hour is None:
        import time
        utc_hour = time.gmtime().tm_hour

    # (name, start, end, volume, volatility); first hit wins below.
    windows = [
        ('ASIA', 0, 8, 'low', 'low'),
        ('LONDON', 8, 16, 'medium', 'medium'),
        ('NY', 13, 21, 'high', 'high'),
    ]

    name, volume, volatility = 'UNKNOWN', 'medium', 'medium'
    if 13 <= utc_hour < 15:
        # London-NY crossover: highest activity of the day, overrides all.
        name, volume, volatility = 'CROSSOVER', 'highest', 'highest'
    else:
        for session, start, end, vol, vlt in windows:
            if start <= utc_hour < end:
                name, volume, volatility = session, vol, vlt
                break

    playbook = {
        'ASIA': 'Mean reversion, smaller size',
        'LONDON': 'Trend establishment',
        'NY': 'Trend continuation, full size',
        'CROSSOVER': 'Breakouts, momentum'
    }

    return {
        'session': name,
        'hour': utc_hour,
        'volume_profile': volume,
        'volatility_profile': volatility,
        'recommendations': playbook.get(name, 'Standard')
    }
{'stochrsi_m2': False, 'stochrsi_m15': False, 'volume': False} confirmations = {'macd': False, 'ema': False, 'vwap': False, 'candle': False} # StochRSI 2M stoch_m2 = calculate_stoch_rsi(m2_prices) detailed_log.append(f"[SCALP_CHECK] StochRSI 2M: K={stoch_m2['k']:.1f}") if stoch_m2['k'] < 20: must_haves['stochrsi_m2'] = True direction = 'LONG' detailed_log.append("[SCALP_CHECK] [OK] StochRSI 2M < 20 (LONG)") elif stoch_m2['k'] > 80: must_haves['stochrsi_m2'] = True direction = 'SHORT' detailed_log.append("[SCALP_CHECK] [OK] StochRSI 2M > 80 (SHORT)") else: detailed_log.append("[SCALP_CHECK] [X] StochRSI 2M not in trigger zone") return {'is_scalp': False, 'score': 0, 'direction': None, 'detailed_log': detailed_log} # StochRSI 15M stoch_m15 = calculate_stoch_rsi(m15_prices) detailed_log.append(f"[SCALP_CHECK] StochRSI 15M: K={stoch_m15['k']:.1f}") if direction == 'LONG': if stoch_m15['k'] < 35: must_haves['stochrsi_m15'] = True checks_passed += 1 detailed_log.append("[SCALP_CHECK] [OK] StochRSI 15M < 35") elif stoch_m15['k'] > 50: detailed_log.append("[SCALP_CHECK] [X][X] INVALIDATION: 15M > 50") return {'is_scalp': False, 'score': 0, 'direction': None, 'invalidated': True, 'detailed_log': detailed_log} else: if stoch_m15['k'] > 65: must_haves['stochrsi_m15'] = True checks_passed += 1 detailed_log.append("[SCALP_CHECK] [OK] StochRSI 15M > 65") elif stoch_m15['k'] < 50: detailed_log.append("[SCALP_CHECK] [X][X] INVALIDATION: 15M < 50") return {'is_scalp': False, 'score': 0, 'direction': None, 'invalidated': True, 'detailed_log': detailed_log} if not must_haves['stochrsi_m15']: return {'is_scalp': False, 'score': 0, 'direction': None, 'detailed_log': detailed_log} # Volume check m2_volumes = m2_data.get('volumes', []) if len(m2_volumes) >= 20: recent_vol = sum(m2_volumes[-3:]) / 3 avg_vol = sum(m2_volumes[-20:]) / 20 vol_ratio = recent_vol / avg_vol if avg_vol > 0 else 0 detailed_log.append(f"[SCALP_CHECK] Volume: {vol_ratio:.2f}x") if vol_ratio > 1.8: 
must_haves['volume'] = True checks_passed += 1 detailed_log.append("[SCALP_CHECK] [OK] Volume > 1.8x") elif vol_ratio < 1.2: detailed_log.append("[SCALP_CHECK] [X][X] INVALIDATION: Volume < 1.2x") return {'is_scalp': False, 'score': 0, 'direction': None, 'invalidated': True, 'detailed_log': detailed_log} # MACD confirmation macd_m2 = calculate_macd(m2_prices) if macd_m2['valid']: if direction == 'LONG' and macd_m2.get('flip_positive'): confirmations['macd'] = True checks_passed += 1 detailed_log.append("[SCALP_CHECK] [OK] MACD flip positive") e