""" try: from rob_bot_domination import integrate_unified_brain DOMINATION_AVAILABLE = True except: DOMINATION_AVAILABLE = False ================================================================================ ROB-BOT TRADING SYSTEM ================================================================================ VERSION: v52.14.0-MAGIC-ENTRY BUILD: 2026-03-31-03-27 07:25:00 Asia/Shanghai (UTC+8) STATUS: RELEASE - FALLING KNIFE AUTO-MODE PREMIERE: 2 DAYS TO LIVE PATCHES APPLIED: - Manual trade crash fix (NoneType error) - Paper mode starts ACTIVE - Time limits default OFF - UI button state sync - All limits indicator (YES/NO colors) - Market Brain Pro integration CHANGELOG: - FK v2.0: Full auto-entry (no manual button) - FK Toggle in Settings tab with confirmation dialog - Auto-entry 15:30-17:00 Norway when 6/6 checklist green - Max 2 trades/day with risk scaling (1.5% / 0.9%) - 45-minute hard stop per trade (time-based) - Visual countdown timer in UI - Crash recovery from fk_mode_state.json - BTC/ETH whitelist strict enforcement FILE: rob_bot_lite_v50_fk_auto.py SIZE: 557695 bytes LINES: 13820 ================================================================================ """ from kivy.app import App from kivy.uix.boxlayout import BoxLayout from kivy.uix.gridlayout import GridLayout from kivy.uix.scrollview import ScrollView from kivy.uix.label import Label from kivy.uix.button import Button from kivy.uix.togglebutton import ToggleButton from kivy.uix.textinput import TextInput from kivy.uix.popup import Popup from kivy.uix.image import Image, AsyncImage from kivy.uix.spinner import Spinner from kivy.graphics import Color, Rectangle, Line, RoundedRectangle from kivy.clock import Clock, mainthread from kivy.metrics import dp, sp from kivy.core.window import Window from datetime import datetime, timezone, timedelta import json import os import math import threading import uuid import time import hmac import hashlib import re from concurrent.futures import ThreadPoolExecutor, as_completed from enum import Enum try: import requests REQUESTS_AVAILABLE = True except: REQUESTS_AVAILABLE = False # Backtest engine import try: from rob_bot_backtest import run_speed_test, optimize_for_profit_factor BACKTEST_AVAILABLE = True except: BACKTEST_AVAILABLE = False print("[WARNING] Backtest module not available") # Order book analyzer for smart trailing stops try: from orderbook_analyzer import SmartTrailingManager SMART_TRAIL_AVAILABLE = True except: SMART_TRAIL_AVAILABLE = False print("[WARNING] OrderBook analyzer not available") try: from self_healing import integrate_self_healing SELF_HEALING_AVAILABLE = True except: SELF_HEALING_AVAILABLE = False print("[WARNING] Self-healing module not available") # Trading modes import try: from modes import get_falling_knife_mode, get_london_close_fade_mode TRADING_MODES_AVAILABLE = True except Exception as e: TRADING_MODES_AVAILABLE = False print(f"[WARNING] Trading modes not available: {e}") # ============================================================================ # COMPREHENSIVE BOT BEHAVIOR LOGGER # ============================================================================ class BotBehaviorLogger: """Comprehensive logger for tracking all bot decisions and actions.""" def __init__(self, app_ref=None): self.app_ref = app_ref self.behavior_log = [] self.metrics = { 'scans_performed': 0, 'setups_found': 0, 'setups_filtered': 0, 'trades_attempted': 0, 'trades_executed': 0, 'trades_rejected': 0, 'positions_opened': 0, 'positions_closed': 0, 'profit_protection_triggered': 0, 'breakeven_triggered': 0, 'time_exits': 0, 'stop_losses_hit': 0, 'take_profits_hit': 0, } self.session_start = datetime.now() def log_behavior(self, category, action, details=None, level="INFO"): """Log a bot behavior event.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] entry = { 'timestamp': timestamp, 'category': category, 'action': action, 'details': details or {}, 'level': level } self.behavior_log.append(entry) # Update metrics if action in self.metrics: self.metrics[action] += 1 # Print to console detail_str = f" | {details}" if details else "" print(f"[BOT BEHAVIOR] [{timestamp}] [{category}] {action}{detail_str}") # Log to app if available if self.app_ref and hasattr(self.app_ref, 'log_activity'): message = f"[{category}] {action}" if details and isinstance(details, dict): key_info = ', '.join([f"{k}={v}" for k, v in list(details.items())[:3]]) message += f" ({key_info})" self.app_ref.log_activity(message, level) def log_scan(self, pairs_count, setups_found, filter_applied, duration_ms): """Log scan behavior.""" self.log_behavior("SCAN", "scan_completed", { 'pairs': pairs_count, 'setups_found': setups_found, 'filter': filter_applied, 'duration_ms': duration_ms }) def log_setup_evaluation(self, pair, grade, score, rr, passed, reason=None): """Log setup evaluation.""" self.log_behavior("SETUP", "setup_evaluated", { 'pair': pair, 'grade': grade, 'score': score, 'rr': rr, 'passed': passed, 'reason': reason }, "SUCCESS" if passed else "INFO") def log_trade_decision(self, pair, decision, reasons): """Log trade decision.""" self.log_behavior("TRADE", "trade_decision", { 'pair': pair, 'decision': decision, 'reasons': reasons }, "TRADE" if decision == "EXECUTE" else "INFO") def log_position_action(self, pair, action, details=None): """Log position action (open, close, modify).""" self.log_behavior("POSITION", f"position_{action}", { 'pair': pair, **(details or {}) }, "TRADE" if action in ['opened', 'closed'] else "INFO") def log_protection_triggered(self, pair, protection_type, details): """Log profit protection trigger.""" self.log_behavior("PROTECTION", f"{protection_type}_triggered", { 'pair': pair, **details }, "PROFIT") def log_risk_check(self, check_name, passed, details=None): """Log risk management check.""" self.log_behavior("RISK", f"risk_check_{check_name}", { 'passed': passed, **(details or {}) }, "SUCCESS" if passed else "WARN") def get_session_summary(self): """Get summary of current session.""" duration = datetime.now() - self.session_start return { 'session_duration_minutes': duration.total_seconds() / 60, 'total_events': len(self.behavior_log), 'metrics': self.metrics.copy() } def export_log(self, filename=None): """Export behavior log to file.""" if not filename: filename = f"bot_behavior_{self.session_start.strftime('%Y%m%d_%H%M%S')}.json" filepath = f"/storage/emulated/0/_Newest Clawbot main script and log reports/Barebone/logs/{filename}" os.makedirs(os.path.dirname(filepath), exist_ok=True) export_data = { 'session_start': self.session_start.isoformat(), 'session_summary': self.get_session_summary(), 'events': self.behavior_log } with open(filepath, 'w') as f: json.dump(export_data, f, indent=2) print(f"[BOT BEHAVIOR] Log exported to {filepath}") return filepath # Global behavior logger instance behavior_logger = None def get_behavior_logger(app_ref=None): """Get or create the global behavior logger.""" global behavior_logger if behavior_logger is None: behavior_logger = BotBehaviorLogger(app_ref) return behavior_logger # ============================================================================ # BOT-FATHER ARCHITECTURE - SMART DATA MANAGEMENT # ============================================================================ class SmartDataCache: """Intelligent caching with time-based TTL""" def __init__(self): self.cache = {} self.default_ttl = 60 # 60 seconds default def get(self, key, default=None): """Get cached value if not expired""" if key not in self.cache: return default entry = self.cache[key] if time.time() > entry['expiry']: del self.cache[key] return default return entry['value'] def set(self, key, value, ttl=None): """Cache value with TTL""" if ttl is None: ttl = self.default_ttl self.cache[key] = { 'value': value, 'expiry': time.time() + ttl } def is_fresh(self, key): """Check if cached data is still fresh""" if key not in self.cache: return False return time.time() <= self.cache[key]['expiry'] def clear_expired(self): """Remove expired entries""" now = time.time() expired = [k for k, v in self.cache.items() if now > v['expiry']] for k in expired: del self.cache[k] class SetupScout: """ TIER 1: Lightweight screening - finds candidates only Time-aware scanning with smart caching """ def __init__(self, binance_client): self.binance = binance_client self.cache = SmartDataCache() self.last_scan_time = 0 def get_scan_interval(self): """ Dynamic interval based on time and market conditions: - 08:00-17:00 weekdays: 3 minutes (high activity) - 17:00-20:00 weekdays + 08:00-17:00 weekends: 5 minutes - Other times: 10 minutes (low activity) """ now = datetime.now() hour = now.hour weekday = now.weekday() < 5 # Mon-Fri # High activity: 08:00-17:00 weekdays if weekday and 8 <= hour < 17: return 180 # 3 minutes # Medium activity if (weekday and 17 <= hour < 20) or (not weekday and 8 <= hour < 17): return 300 # 5 minutes # Low activity: Overnight, early morning return 600 # 10 minutes def should_scan(self): """Check if it's time for a new scan""" interval = self.get_scan_interval() return (time.time() - self.last_scan_time) >= interval def quick_screen(self, pair): """ Fast 3-check screen (no heavy indicators): 1. Price change from cache 2. Volume check 3. Basic trend (lightweight) """ cache_key_price = f"price_{pair}" cache_key_screen = f"screen_{pair}" # 1. Get current price (cached if recent) cached_price = self.cache.get(cache_key_price) current_price = self._get_price_cached(pair) # Check if price changed significantly if cached_price and current_price: price_change = abs(current_price - cached_price) / cached_price if price_change < 0.005: # Less than 0.5% change # Return cached screen if available cached_screen = self.cache.get(cache_key_screen) if cached_screen: return cached_screen # 2. Volume check (lightweight) volume = self._get_volume_cached(pair) if volume < 750_000: return None # Skip low volume pairs # 3. Quick trend check (uses 15m cached data) trend = self._get_quick_trend(pair) # Build screen result screen_result = { 'pair': pair, 'price': current_price, 'volume': volume, 'trend': trend, 'needs_deep_scan': True, 'price_change_24h': self._get_price_change_cached(pair), 'timestamp': time.time() } # Cache the screen result self.cache.set(cache_key_screen, screen_result, ttl=self.get_scan_interval()) return screen_result def _get_price_cached(self, pair): """Get price with caching""" cache_key = f"price_{pair}" if self.cache.is_fresh(cache_key): return self.cache.get(cache_key) # Fetch fresh try: # Use cached klines if available klines = self.binance.get_klines(pair, '1m', 1) if klines and len(klines) > 0: price = float(klines[-1][4]) self.cache.set(cache_key, price, ttl=30) # 30 second cache return price except Exception as e: print(f"[SCOUT] Error fetching price for {pair}: {e}") return None def _get_volume_cached(self, pair): """Get 24h volume with caching""" cache_key = f"volume_{pair}" if self.cache.is_fresh(cache_key): return self.cache.get(cache_key) # Fetch fresh try: ticker = self.binance.get_ticker_24h(pair) if ticker: volume = float(ticker.get('volume', 0)) * float(ticker.get('weightedAvgPrice', 0)) self.cache.set(cache_key, volume, ttl=300) # 5 minute cache return volume except Exception as e: print(f"[SCOUT] Error fetching volume for {pair}: {e}") return 0 def _get_price_change_cached(self, pair): """Get 24h price change with caching""" cache_key = f"change_{pair}" if self.cache.is_fresh(cache_key): return self.cache.get(cache_key) try: ticker = self.binance.get_ticker_24h(pair) if ticker: change = float(ticker.get('priceChangePercent', 0)) self.cache.set(cache_key, change, ttl=300) return change except Exception as e: print(f"[SCOUT] Error fetching change for {pair}: {e}") return 0 def _get_quick_trend(self, pair): """Quick trend using cached 15m data""" cache_key = f"trend_{pair}" if self.cache.is_fresh(cache_key): return self.cache.get(cache_key) try: klines = self.binance.get_klines(pair, '15m', 20) if klines and len(klines) >= 10: prices = [float(k[4]) for k in klines] ema_fast = sum(prices[-5:]) / 5 ema_slow = sum(prices[-10:]) / 10 trend = 'UP' if ema_fast > ema_slow else 'DOWN' self.cache.set(cache_key, trend, ttl=180) # 3 minute cache return trend except Exception as e: print(f"[SCOUT] Error calculating trend for {pair}: {e}") return 'NEUTRAL' class BotFather: """ TIER 5: BOT-FATHER - Intelligent Trade Monitor Monitors active trades and makes dynamic adjustments """ def __init__(self, position_manager, binance_client, app_ref=None): self.position_manager = position_manager self.binance = binance_client self.app_ref = app_ref self.cache = SmartDataCache() self.last_monitor_time = 0 def monitor_all_positions(self): """Main monitoring loop - adaptive frequency""" now = time.time() for position in self.position_manager.positions: if position['status'] != 'OPEN': continue # Determine monitoring intensity intensity = self._calculate_monitor_intensity(position) interval = self._get_interval_for_intensity(intensity) # Check if it's time to monitor this position pos_id = position['id'] last_check = self.cache.get(f"last_check_{pos_id}", 0) if (now - last_check) < interval: continue # Skip this position for now # Update last check time self.cache.set(f"last_check_{pos_id}", now, ttl=3600) # Gather position data position_data = self._gather_position_data(position) if not position_data: continue # Analyze and decide decisions = self._analyze_and_decide(position, position_data) # Execute decisions for decision in decisions: self._execute_decision(position, decision, position_data) def _calculate_monitor_intensity(self, position): """ How closely should we watch this position? Returns: 'calm', 'active', or 'critical' """ current_price = position.get('current_price', 0) entry = position['entry_price'] tp1 = position['take_profit1'] sl = position['stop_loss'] direction = position['direction'] if current_price == 0 or entry == 0: return 'calm' # Calculate distances if direction == 'LONG': distance_to_tp = abs(tp1 - current_price) / current_price if tp1 else 1 distance_to_sl = abs(current_price - sl) / current_price if sl else 1 profit_pct = (current_price - entry) / entry else: # SHORT distance_to_tp = abs(current_price - tp1) / current_price if tp1 else 1 distance_to_sl = abs(sl - current_price) / sl if sl else 1 profit_pct = (entry - current_price) / entry # Critical: Near SL or big profit/loss (>3%) if distance_to_sl < 0.01 or distance_to_tp < 0.005 or abs(profit_pct) > 0.03: return 'critical' # Active: Within 2% of TP or significant profit (>1.5%) if distance_to_tp < 0.02 or profit_pct > 0.015: return 'active' # Calm: Flat or moving slowly return 'calm' def _get_interval_for_intensity(self, intensity): """Get check interval based on intensity""" intervals = { 'calm': 10, # 10 seconds 'active': 3, # 3 seconds 'critical': 1, # 1 second } return intervals.get(intensity, 5) def _gather_position_data(self, position): """Collect all relevant data for decision making""" pair = position['pair'] try: # Get current price klines = self.binance.get_klines(pair, '1m', 5) if not klines: return None current_price = float(klines[-1][4]) # Calculate unrealized PnL entry = position['entry_price'] qty = position['quantity'] direction = position['direction'] if direction == 'LONG': unrealized = (current_price - entry) / entry * (qty * entry) else: unrealized = (entry - current_price) / entry * (qty * entry) # Calculate indicators indicators = self._calculate_live_indicators(pair) return { 'price': current_price, 'unrealized_pnl': unrealized, 'unrealized_pct': (current_price - entry) / entry * 100 if direction == 'LONG' else (entry - current_price) / entry * 100, 'indicators': indicators, 'time_in_trade': time.time() - position.get('entry_time', time.time()), } except Exception as e: print(f"[BOT-FATHER] Error gathering data for {pair}: {e}") return None def _calculate_live_indicators(self, pair): """Calculate live indicators for a pair""" try: klines = self.binance.get_klines(pair, '15m', 20) if not klines or len(klines) < 14: return {} prices = [float(k[4]) for k in klines] highs = [float(k[2]) for k in klines] lows = [float(k[3]) for k in klines] # ATR atr = self._calculate_atr(highs, lows, prices, 14) # RSI rsi = self._calculate_rsi(prices, 14) return { 'atr_14': atr, 'rsi_14': rsi, 'current_price': prices[-1], } except Exception as e: print(f"[BOT-FATHER] Error calculating indicators for {pair}: {e}") return {} def _calculate_atr(self, highs, lows, closes, period=14): """Calculate Average True Range""" if len(highs) < period + 1: return 0 tr_values = [] for i in range(1, len(highs)): tr1 = highs[i] - lows[i] tr2 = abs(highs[i] - closes[i-1]) tr3 = abs(lows[i] - closes[i-1]) tr_values.append(max(tr1, tr2, tr3)) return sum(tr_values[-period:]) / period if len(tr_values) >= period else 0 def _calculate_rsi(self, prices, period=14): """Calculate RSI""" if len(prices) < period + 1: return 50 deltas = [prices[i] - prices[i-1] for i in range(1, len(prices))] gains = [d if d > 0 else 0 for d in deltas[-period:]] losses = [-d if d < 0 else 0 for d in deltas[-period:]] avg_gain = sum(gains) / period if gains else 0 avg_loss = sum(losses) / period if losses else 0 if avg_loss == 0: return 100 rs = avg_gain / avg_loss return 100 - (100 / (1 + rs)) def _analyze_and_decide(self, position, data): """ BOT-FATHER Decision Engine Returns: List of decision dicts """ decisions = [] pair = position['pair'] # 1. BREAKEVEN CHECK if self._should_move_to_breakeven(position, data): new_sl = self._calculate_breakeven_sl(position) decisions.append({ 'action': 'MOVE_SL_BREAKEVEN', 'new_sl': new_sl, 'reason': f"Profit {data['unrealized_pct']:.1f}% reached trigger" }) # 2. PROFIT LOCK CHECK (dollar-based) elif self._should_lock_profit(position, data): lock_price = self._calculate_profit_lock_price(position, data) if lock_price: decisions.append({ 'action': 'LOCK_PROFIT', 'new_sl': lock_price, 'lock_amount': position.get('profit_lock_amount', 3.0), 'reason': f"Profit ${data['unrealized_pnl']:.0f} reached lock trigger" }) # 3. TRAILING STOP ADJUSTMENT (dynamic) elif self._should_adjust_trailing(position, data): new_trail = self._calculate_dynamic_trail(position, data) decisions.append({ 'action': 'ADJUST_TRAIL', 'new_trail_pct': new_trail, 'reason': f"Volatility changed, new trail: {new_trail:.2%}" }) # 4. TIME EXIT CHECK elif self._should_time_exit(position, data): decisions.append({ 'action': 'CLOSE_POSITION', 'exit_type': 'TIME_EXIT', 'reason': f"Time limit reached: {data['time_in_trade']/3600:.1f}h" }) return decisions def _should_move_to_breakeven(self, position, data): """Check if we should move SL to breakeven""" if position.get('sl_at_breakeven', False): return False profit_pct = data['unrealized_pct'] trigger = getattr(self.app_ref, 'breakeven_trigger_pct', 0.8) if self.app_ref else 0.8 return profit_pct >= trigger def _calculate_breakeven_sl(self, position): """Calculate breakeven stop loss with small profit""" entry = position['entry_price'] direction = position['direction'] profit_pct = getattr(self.app_ref, 'breakeven_profit_pct', 0.2) if self.app_ref else 0.2 if direction == 'LONG': return entry * (1 + profit_pct / 100) else: return entry * (1 - profit_pct / 100) def _should_lock_profit(self, position, data): """Check if we should lock in profit""" if position.get('profit_locked', False): return False unrealized = data['unrealized_pnl'] trigger = getattr(self.app_ref, 'profit_lock_trigger', 6.0) if self.app_ref else 6.0 return unrealized >= trigger def _calculate_profit_lock_price(self, position, data): """Calculate the price level for profit lock""" entry = position['entry_price'] direction = position['direction'] lock_amount = getattr(self.app_ref, 'profit_lock_amount', 3.0) if self.app_ref else 3.0 qty = position['quantity'] if qty <= 0: return None if direction == 'LONG': return entry + (lock_amount / qty) else: return entry - (lock_amount / qty) def _should_adjust_trailing(self, position, data): """Check if trailing stop should be adjusted""" if not position.get('tp1_hit', False): return False # Only trail after TP1 indicators = data.get('indicators', {}) current_atr = indicators.get('atr_14', 0) entry_atr = position.get('entry_atr', current_atr) if entry_atr == 0: return False volatility_change = abs(current_atr - entry_atr) / entry_atr return volatility_change > 0.3 # 30% change in volatility def _calculate_dynamic_trail(self, position, data): """Calculate trailing stop based on current volatility""" indicators = data.get('indicators', {}) atr = indicators.get('atr_14', 0) price = data['price'] if price == 0 or atr == 0: return 0.005 # Default 0.5% # ATR-based trail: 2x ATR trail_pct = (atr * 2) / price # Clamp between 0.3% and 2% return max(0.003, min(0.02, trail_pct)) def _should_time_exit(self, position, data): """Check if time-based exit should trigger""" time_limit_hours = getattr(self.app_ref, 'time_exit_hours', 6) if self.app_ref else 6 time_limit = time_limit_hours * 3600 min_profit = getattr(self.app_ref, 'time_exit_min_profit', 0.3) if self.app_ref else 0.3 if data['time_in_trade'] < time_limit: return False # Only exit if we have at least minimum profit return data['unrealized_pct'] >= min_profit def _execute_decision(self, position, decision, data): """Execute the decided actions""" action = decision['action'] pair = position['pair'] if action == 'MOVE_SL_BREAKEVEN': old_sl = position['stop_loss'] new_sl = decision['new_sl'] position['stop_loss'] = new_sl position['sl_at_breakeven'] = True print(f"[BOT-FATHER] {pair}: SL moved to breakeven ${new_sl:.4f} (was ${old_sl:.4f})") if self.app_ref: self.app_ref.log_activity( f"[b]BOT-FATHER:[/b] {pair} SL moved to breakeven ${new_sl:.4f}", "PROFIT" ) elif action == 'LOCK_PROFIT': new_sl = decision['new_sl'] position['stop_loss'] = new_sl position['profit_locked'] = True lock_amount = decision['lock_amount'] print(f"[BOT-FATHER] {pair}: Profit locked ${lock_amount} at SL ${new_sl:.4f}") if self.app_ref: self.app_ref.log_activity( f"[b]BOT-FATHER:[/b] {pair} profit locked ${lock_amount} at SL ${new_sl:.4f}", "PROFIT" ) elif action == 'ADJUST_TRAIL': new_trail = decision['new_trail_pct'] position['trail_pct'] = new_trail print(f"[BOT-FATHER] {pair}: Trailing stop adjusted to {new_trail:.2%}") if self.app_ref: self.app_ref.log_activity( f"[b]BOT-FATHER:[/b] {pair} trail adjusted to {new_trail:.2%}", "INFO" ) elif action == 'CLOSE_POSITION': exit_type = decision['exit_type'] reason = decision['reason'] print(f"[BOT-FATHER] {pair}: Closing position - {exit_type} - {reason}") # Execute close pnl = self.position_manager.close_position( position['id'], data['price'], exit_type ) if self.app_ref: self.app_ref.log_activity( f"[b]BOT-FATHER:[/b] {pair} closed via {exit_type} | PnL: ${pnl:+.2f}", "TRADE" if pnl > 0 else "WARN" ) # ============================================================================ # END BOT-FATHER ARCHITECTURE # ============================================================================ GOLD = (0.96, 0.73, 0.15, 1) GREEN = (0.2, 0.65, 0.2, 1) RED = (0.95, 0.25, 0.25, 1) BLUE = (0.2, 0.6, 0.9, 1) CYAN = (0.0, 0.8, 0.8, 1) AMBER = (1.0, 0.75, 0.0, 1) WHITE = (1, 1, 1, 1) BLACK = (0, 0, 0, 1) GRAY = (0.5, 0.5, 0.5, 1) DARK_GRAY = (0.25, 0.25, 0.25, 1) DARK_BG = (0.06, 0.07, 0.09, 1) CARD_BG = (0.10, 0.11, 0.14, 1) DARK_GREEN = (0.08, 0.5, 0.08, 1) DARK_RED = (0.5, 0.08, 0.08, 1) LIGHT_BG = (0.15, 0.16, 0.20, 1) ORANGE = (1.0, 0.5, 0.0, 1) PURPLE = (0.7, 0.3, 0.9, 1) MAGENTA = (0.9, 0.2, 0.7, 1) # ============================================================================ # EMOJI FONT SUPPORT - Fix tofu boxes on Android # ============================================================================ def setup_emoji_font(): """Download and setup emoji font to prevent tofu boxes""" try: # Check if we're on Android is_android = os.path.exists('/system/build.prop') # Font path in app directory script_dir = os.path.dirname(os.path.abspath(__file__)) font_dir = os.path.join(script_dir, 'fonts') font_path = os.path.join(font_dir, 'NotoColorEmoji.ttf') # Create fonts directory if needed if not os.path.exists(font_dir): os.makedirs(font_dir) # Check if font exists and has reasonable size (>5MB) font_exists = os.path.exists(font_path) font_size = os.path.getsize(font_path) if font_exists else 0 font_valid = font_exists and font_size > 5000000 # >5MB if font_valid: print(f"[FONT] Emoji font found: {font_path} ({font_size/1024/1024:.1f}MB)") return font_path # Try system emoji fonts first (better compatibility) system_fonts = [ "/system/fonts/NotoColorEmoji.ttf", "/system/fonts/SamsungColorEmoji.ttf", "/system/fonts/NotoColorEmoji.flags.ttf", "/system/fonts/NotoColorEmojiLegacy.ttf", ] for sys_font in system_fonts: if os.path.exists(sys_font): print(f"[FONT] Using system emoji font: {sys_font}") return sys_font # Download font if not valid if not font_valid: print(f"[FONT] Downloading emoji font...") try: import urllib.request # Use a reliable mirror urls = [ "https://github.com/googlefonts/noto-emoji/raw/main/fonts/NotoColorEmoji.ttf", "https://raw.githubusercontent.com/googlefonts/noto-emoji/main/fonts/NotoColorEmoji.ttf", ] for url in urls: try: print(f"[FONT] Trying: {url}") urllib.request.urlretrieve(url, font_path) if os.path.exists(font_path) and os.path.getsize(font_path) > 5000000: print(f"[FONT] Emoji font downloaded: {font_path}") return font_path except Exception as e: print(f"[FONT] Download failed from {url}: {e}") continue except Exception as e: print(f"[FONT] Download error: {e}") return None except Exception as e: print(f"[FONT] Setup error: {e}") return None # Setup emoji font on startup EMOJI_FONT = setup_emoji_font() if EMOJI_FONT: print(f"[FONT] Using emoji font: {EMOJI_FONT}") else: print("[FONT] No emoji font available - using text fallbacks") # Always use text fallbacks on Android - more reliable than trying to render emojis USE_EMOJI_FALLBACK = True # Force text mode for reliability # ============================================================================ # EMOJI LABEL HELPER - Prevent tofu boxes # ============================================================================ def EmojiLabel(text="", **kwargs): """Create a Label with text fallbacks instead of emojis - NO emojis, always text""" # Replace ALL emojis with text emoji_map = { '๐Ÿงช': 'TEST:', '๐Ÿง ': 'BRAIN:', '๐Ÿ”': 'SCAN:', 'โšก': 'SPD:', '๐Ÿ“Š': 'CHART:', 'โœ“': 'OK', 'โœ—': 'X', 'โš ': '!', 'โš ๏ธ': '!', '๐Ÿšจ': 'ALERT:', 'โฐ': 'TIME:', '๐Ÿ›‘': 'STOP:', '๐ŸŽฏ': 'TARGET:', '๐Ÿ†': 'WIN:', '๐Ÿ’€': 'FAIL:', '๐Ÿ”ฅ': 'HOT:', '๐Ÿ’ก': 'TIP:', '๐Ÿ’ฐ': '$', '๐Ÿ“ˆ': 'UP:', '๐Ÿ“‰': 'DOWN:', 'โœ…': 'OK', 'โŒ': 'NO', '๐Ÿ”ด': '(R)', '๐ŸŸข': '(G)', '๐ŸŸก': '(Y)', '๐Ÿ”ต': '(B)', 'โฌ†': '^', 'โฌ‡': 'v', '๐Ÿค–': 'BOT:', '๐Ÿ‘': 'EYE:', '๐Ÿ“ก': 'SIG:', '๐Ÿ””': 'BELL:', 'โณ': '...', 'โŒ›': 'WAIT', '๐Ÿ•': 'TM:', 'โš™': 'SET:', '๐Ÿ”ง': 'TOOL:', '๐Ÿ”จ': 'FIX:', '๐Ÿ› ': 'TOOLS:', '๐Ÿš€': 'GO:', '๐Ÿ“Œ': '*', '๐Ÿ“': '*', '๐ŸŽฎ': 'PLY:', '๐ŸŽฒ': 'RND:', '๐ŸŽฐ': 'LUCK:', '๐ŸŽฑ': '[8]', '๐Ÿ': 'END:', '๐Ÿšฉ': 'FLG:', '๐ŸŽŒ': 'FLGS:', '๐Ÿด': 'FLG:', '๐Ÿณ': 'FLG:', '๐ŸŒ': 'WRLD:', '๐ŸŒŽ': 'WRLD:', '๐ŸŒ': 'WRLD:', '๐ŸŒ': 'WEB:', '๐Ÿ“‹': 'LIST:', '๐Ÿ’ผ': 'POS:', '๐Ÿ›ก๏ธ': 'SAFE:', 'โ–ถ๏ธ': '>', 'โฌ†๏ธ': '^', '๐Ÿ—‘๏ธ': 'DEL:', '๐Ÿ”„': 'SYNC:', '๐Ÿ’พ': 'SAVE:', '๐Ÿ“‰': 'DN:', '๐Ÿ“Š': 'BAR:', 'โš™๏ธ': 'SET:', '๐ŸŽฎ': 'GAME:', '๐Ÿ”ฅ': 'HOT:', '๐ŸŽฏ': 'AIM:', '๐Ÿ’€': 'SKULL:', '๐Ÿ’ก': 'IDEA:', '๐Ÿ“ˆ': 'RISE:', '๐Ÿค–': 'AUTO:', '๐Ÿ’ผ': 'JOB:', '๐Ÿ“‹': 'LOG:', '๐Ÿงช': 'LAB:', '๐Ÿง ': 'MIND:', '๐Ÿ”': 'FIND:', 'โšก': 'FAST:', '๐Ÿ—‘๏ธ': 'TRASH:', '๐Ÿ’พ': 'DISK:', '๐Ÿ”„': 'REF:', 'โ–ถ๏ธ': 'PLAY:', '๐Ÿ“Š': 'DATA:', '๐Ÿ’ฐ': 'CASH:', 'โš™๏ธ': 'CFG:', '๐Ÿ”ฅ': 'FIRE:', '๐ŸŽฏ': 'GOAL:', '๐Ÿ›ก๏ธ': 'GUARD:', '๐Ÿ’ผ': 'BAG:', '๐Ÿ“–': 'BOOK:', '๐Ÿ’ต': '$:', 'โœ‰๏ธ': 'MSG:', '๐Ÿ“ง': 'EMAIL:', '๐Ÿ“ฑ': 'MOB:', '๐Ÿ”’': 'LOCK:', '๐Ÿ”“': 'OPEN:', '๐Ÿ”‘': 'KEY:', 'โšก': 'PWR:', '๐ŸŒŸ': 'STAR:', 'โญ': '*', 'โœจ': 'SPARK:', '๐Ÿ’ซ': 'DIZZY:', '๐ŸŽต': 'MUSIC:', '๐ŸŽถ': 'NOTES:', '๐Ÿ”Š': 'LOUD:', '๐Ÿ”‡': 'MUTE:', '๐Ÿ“ฃ': 'MEGA:', '๐Ÿ“ข': 'SPEAK:', '๐Ÿ’ฌ': 'CHAT:', '๐Ÿ—ฏ๏ธ': 'BUBBLE:', '๐Ÿ’ญ': 'THINK:', '๐Ÿ•': 'CLK:', 'โฑ๏ธ': 'TMR:', 'โฒ๏ธ': 'TIMER:', '๐Ÿ•ฐ๏ธ': 'CLOCK:', '๐ŸŒ™': 'MOON:', 'โ˜€๏ธ': 'SUN:', 'โญ': 'STAR:', 'โ˜๏ธ': 'CLOUD:', 'โšก': 'BOLT:', '๐Ÿ”ฅ': 'FLAME:', 'โ„๏ธ': 'ICE:', } for emoji, fallback in emoji_map.items(): text = text.replace(emoji, fallback) # Use ONE font for everything - Roboto kwargs['font_name'] = '/system/fonts/Roboto.ttf' if os.path.exists('/system/fonts/Roboto.ttf') else 'Roboto' return Label(text=text, **kwargs) # ============================================================================ # IMAGE ICON HELPER - Load PNG icons instead of emojis # ============================================================================ # Cache for loaded images _icon_cache = {} _icons_dir = None def get_icons_dir(): """Get the icons directory path""" global _icons_dir if _icons_dir is None: script_dir = os.path.dirname(os.path.abspath(__file__)) _icons_dir = os.path.join(script_dir, 'images') return _icons_dir def IconImage(icon_name, size=dp(20), **kwargs): """Load a PNG icon as an Image widget""" from kivy.uix.image import Image # Build full path icon_path = os.path.join(get_icons_dir(), f"{icon_name}.png") # Check if icon exists if not os.path.exists(icon_path): # Return empty widget if icon not found from kivy.uix.widget import Widget return Widget(size_hint=(None, None), size=(size, size)) # Create image widget img = Image( source=icon_path, size_hint=(None, None), size=(size, size), allow_stretch=True, keep_ratio=True, **kwargs ) return img def IconButton(icon_name, text="", icon_size=dp(18), **kwargs): """Create a button with icon + text""" from kivy.uix.boxlayout import BoxLayout container = BoxLayout(orientation='horizontal', spacing=dp(4)) # Add icon icon = IconImage(icon_name, size=icon_size) container.add_widget(icon) # Add text if provided if text: from kivy.uix.label import Label lbl = Label(text=text, font_size=sp(10), color=WHITE) container.add_widget(lbl) return container # ============================================================================ # MACRO INDICATORS MODULE - FULLY INTEGRATED # ============================================================================ class MacroDataFetcher: """Fetches DXY and Oil data from Yahoo Finance""" def __init__(self): self.cache = {'dxy': None, 'oil': None} self.cache_time = {'dxy': None, 'oil': None} self.cache_duration = 300 def _is_cached(self, key): if self.cache[key] and self.cache_time[key]: return (datetime.now() - self.cache_time[key]).seconds < self.cache_duration return False def get_dxy(self): if self._is_cached('dxy'): return self.cache['dxy'] try: url = "https://query1.finance.yahoo.com/v8/finance/chart/DX-Y.NYB?interval=5m&range=1d" headers = {'User-Agent': 'Mozilla/5.0'} r = requests.get(url, headers=headers, timeout=10) data = r.json() if data.get('chart', {}).get('result'): meta = data['chart']['result'][0]['meta'] curr = meta.get('regularMarketPrice', 0) prev = meta.get('previousClose', 0) if curr and prev: result = { 'price': round(curr, 2), 'change': round(curr - prev, 2), 'change_pct': round(((curr - prev) / prev) * 100, 2), 'timestamp': datetime.now().isoformat() } self.cache['dxy'] = result self.cache_time['dxy'] = datetime.now() return result except Exception as e: print(f"[MACRO] DXY error: {e}") return self.cache['dxy'] or {'price': 103.5, 'change': 0, 'change_pct': 0} def get_crude_oil(self): if self._is_cached('oil'): return self.cache['oil'] try: url = "https://query1.finance.yahoo.com/v8/finance/chart/CL=F?interval=5m&range=1d" headers = {'User-Agent': 'Mozilla/5.0'} r = requests.get(url, headers=headers, timeout=10) data = r.json() if data.get('chart', {}).get('result'): meta = data['chart']['result'][0]['meta'] curr = meta.get('regularMarketPrice', 0) prev = meta.get('previousClose', 0) if curr and prev: result = { 'price': round(curr, 2), 'change': round(curr - prev, 2), 'change_pct': round(((curr - prev) / prev) * 100, 2), 'timestamp': datetime.now().isoformat() } self.cache['oil'] = result self.cache_time['oil'] = datetime.now() return result except Exception as e: print(f"[MACRO] Oil error: {e}") return self.cache['oil'] or {'price': 78.5, 'change': 0, 'change_pct': 0} def get_bias(self): dxy = self.get_dxy() oil = self.get_crude_oil() score = 0 if dxy['change_pct'] > 0.5: score -= 2 elif dxy['change_pct'] < -0.5: score += 2 if oil['change_pct'] > 2: score += 1 elif oil['change_pct'] < -3: score -= 1 if score >= 2: return 'BULLISH', GREEN if score <= -2: return 'BEARISH', RED return 'NEUTRAL', AMBER return 'NEUTRAL', AMBER # ============================================================================ # PRECISION INDICATOR ENGINE - Professional Daytrading Toolkit # ============================================================================ class IndicatorMonitor: """ Central monitoring hub for all indicator performance. Tracks accuracy, latency, and effectiveness of each indicator. """ def __init__(self, app_ref=None): self.app_ref = app_ref self.metrics = { 'divergence': {'calls': 0, 'correct': 0, 'latency_ms': [], 'last_signal': None}, 'adx_filter': {'calls': 0, 'filtered': 0, 'allowed': 0, 'last_check': None}, 'liquidity_sweep': {'calls': 0, 'valid_sweeps': 0, 'false_sweeps': 0, 'last_sweep': None}, 'volume_profile': {'calls': 0, 'hits_poc': 0, 'hits_vah': 0, 'hits_val': 0}, 'cvd': {'calls': 0, 'bullish_signals': 0, 'bearish_signals': 0}, 'pattern_recognition': {'calls': 0, 'patterns_found': 0, 'successful': 0}, } self.start_time = time.time() self.enabled_monitors = set(['divergence', 'adx_filter', 'liquidity_sweep']) def log(self, indicator, event, data=None): """Log an indicator event with optional data""" timestamp = time.time() if indicator not in self.metrics: self.metrics[indicator] = {'calls': 0} self.metrics[indicator]['calls'] += 1 if event == 'signal': self.metrics[indicator]['last_signal'] = {'time': timestamp, 'data': data} elif event == 'correct': self.metrics[indicator]['correct'] = self.metrics[indicator].get('correct', 0) + 1 elif event == 'filtered': self.metrics[indicator]['filtered'] = self.metrics[indicator].get('filtered', 0) + 1 elif event == 'allowed': self.metrics[indicator]['allowed'] = self.metrics[indicator].get('allowed', 0) + 1 elif event == 'latency': self.metrics[indicator].setdefault('latency_ms', []).append(data) # Keep last 100 latency measurements self.metrics[indicator]['latency_ms'] = self.metrics[indicator]['latency_ms'][-100:] # Log to app activity if significant if self.app_ref and event in ['signal', 'filtered', 'valid_sweep']: if indicator == 'divergence' and event == 'signal': div_type = data.get('type', 'unknown') if data else 'unknown' strength = data.get('strength', 'weak') if data else 'weak' pair = data.get('pair', 'unknown') if data else 'unknown' self.app_ref.log_activity( f"[INDICATOR] {pair}: {div_type.upper()} divergence ({strength})", "INFO" ) elif indicator == 'liquidity_sweep' and event == 'valid_sweep': sweep_type = data.get('type', 'unknown') if data else 'unknown' pair = data.get('pair', 'unknown') if data else 'unknown' self.app_ref.log_activity( f"[INDICATOR] {pair}: {sweep_type.upper()} sweep detected", "INFO" ) def get_accuracy(self, indicator): """Get accuracy percentage for an indicator""" if indicator not in self.metrics: return 0 calls = self.metrics[indicator].get('calls', 0) correct = self.metrics[indicator].get('correct', 0) if calls == 0: return 0 return (correct / calls) * 100 def get_avg_latency(self, indicator): """Get average latency in milliseconds""" if indicator not in self_metrics: return 0 latencies = self.metrics[indicator].get('latency_ms', []) if not latencies: return 0 return sum(latencies) / len(latencies) def get_summary(self): """Get summary of all indicator performance""" return { 'uptime_seconds': time.time() - self.start_time, 'enabled': list(self.enabled_monitors), 'accuracy': {k: self.get_accuracy(k) for k in self.metrics.keys()}, 'metrics': self.metrics } def print_summary(self): """Print summary to console""" print("\n" + "="*60) print("INDICATOR MONITOR SUMMARY") print("="*60) for indicator, data in self.metrics.items(): if data.get('calls', 0) > 0: accuracy = self.get_accuracy(indicator) avg_latency = self.get_avg_latency(indicator) print(f"{indicator:20s}: {data['calls']:4d} calls | {accuracy:5.1f}% accuracy | {avg_latency:4.1f}ms avg") print("="*60) class DivergenceScanner: """ RSI Divergence Detection Engine Detects: - Bullish Divergence: Price Lower Low, RSI Higher Low (buy signal) - Bearish Divergence: Price Higher High, RSI Lower High (sell signal) Also detects Hidden Divergence for trend continuation. """ def __init__(self, monitor=None): self.monitor = monitor self.lookback_periods = [10, 20, 30] # Multiple timeframes self.min_pivot_points = 3 def calculate_rsi(self, prices, period=14): """Calculate RSI for price series""" if len(prices) < period + 1: return [] deltas = [prices[i] - prices[i-1] for i in range(1, len(prices))] rsi_values = [] avg_gain = sum(d for d in deltas[:period] if d > 0) / period avg_loss = sum(-d for d in deltas[:period] if d < 0) / period if avg_loss == 0: rsi_values.append(100) else: rs = avg_gain / avg_loss rsi_values.append(100 - (100 / (1 + rs))) for i in range(period, len(deltas)): delta = deltas[i] gain = delta if delta > 0 else 0 loss = -delta if delta < 0 else 0 avg_gain = (avg_gain * (period - 1) + gain) / period avg_loss = (avg_loss * (period - 1) + loss) / period if avg_loss == 0: rsi_values.append(100) else: rs = avg_gain / avg_loss rsi_values.append(100 - (100 / (1 + rs))) return rsi_values def find_pivot_points(self, series, window=5): """Find local maxima and minima in a series""" pivots_high = [] pivots_low = [] for i in range(window, len(series) - window): # Check for pivot high if all(series[i] >= series[i-j] for j in range(1, window+1)) and \ all(series[i] >= series[i+j] for j in range(1, window+1)): pivots_high.append((i, series[i])) # Check for pivot low if all(series[i] <= series[i-j] for j in range(1, window+1)) and \ all(series[i] <= series[i+j] for j in range(1, window+1)): pivots_low.append((i, series[i])) return pivots_high, pivots_low def detect_divergence(self, prices, pair="unknown"): """ Detect RSI divergence patterns. Returns: dict or None: { 'type': 'bullish' or 'bearish', 'strength': 'weak', 'medium', 'strong', 'price_pivots': [(idx, price), ...], 'rsi_pivots': [(idx, rsi), ...], 'description': str } """ start_time = time.time() if len(prices) < 30: return None rsi = self.calculate_rsi(prices) if len(rsi) < 20: return None # Align RSI with prices (RSI starts at index period) rsi_offset = len(prices) - len(rsi) # Find pivot points price_pivots_high, price_pivots_low = self.find_pivot_points(prices) rsi_pivots_high, rsi_pivots_low = self.find_pivot_points(rsi) divergence = None # Check for Regular Bullish Divergence (Price LL, RSI HL) if len(price_pivots_low) >= 2 and len(rsi_pivots_low) >= 2: # Get last two pivot lows price_low1, price_low2 = price_pivots_low[-2], price_pivots_low[-1] # Find corresponding RSI pivots (closest in time) rsi_low1 = min(rsi_pivots_low, key=lambda x: abs(x[0] - price_low1[0])) rsi_low2 = min(rsi_pivots_low, key=lambda x: abs(x[0] - price_low2[0])) # Check divergence: Price makes lower low, RSI makes higher low if price_low2[1] < price_low1[1] and rsi_low2[1] > rsi_low1[1]: strength = self._calculate_strength( price_low1[1], price_low2[1], rsi_low1[1], rsi_low2[1] ) divergence = { 'type': 'bullish', 'strength': strength, 'price_pivots': [price_low1, price_low2], 'rsi_pivots': [rsi_low1, rsi_low2], 'description': f"Bullish divergence: Price LL ({price_low2[1]:.2f} < {price_low1[1]:.2f}), RSI HL ({rsi_low2[1]:.1f} > {rsi_low1[1]:.1f})" } # Check for Regular Bearish Divergence (Price HH, RSI LH) if not divergence and len(price_pivots_high) >= 2 and len(rsi_pivots_high) >= 2: price_high1, price_high2 = price_pivots_high[-2], price_pivots_high[-1] rsi_high1 = min(rsi_pivots_high, key=lambda x: abs(x[0] - price_high1[0])) rsi_high2 = min(rsi_pivots_high, key=lambda x: abs(x[0] - price_high2[0])) if price_high2[1] > price_high1[1] and rsi_high2[1] < rsi_high1[1]: strength = self._calculate_strength( price_high1[1], price_high2[1], rsi_high1[1], rsi_high2[1], bearish=True ) divergence = { 'type': 'bearish', 'strength': strength, 'price_pivots': [price_high1, price_high2], 'rsi_pivots': [rsi_high1, rsi_high2], 'description': f"Bearish divergence: Price HH ({price_high2[1]:.2f} > {price_high1[1]:.2f}), RSI LH ({rsi_high2[1]:.1f} < {rsi_high1[1]:.1f})" } # Log to monitor if self.monitor: latency_ms = (time.time() - start_time) * 1000 self.monitor.log('divergence', 'latency', latency_ms) if divergence: divergence['pair'] = pair self.monitor.log('divergence', 'signal', divergence) return divergence def _calculate_strength(self, price1, price2, rsi1, rsi2, bearish=False): """Calculate divergence strength based on magnitude""" if bearish: price_change = abs((price2 - price1) / price1) rsi_change = abs((rsi1 - rsi2) / rsi1) if rsi1 != 0 else 0 else: price_change = abs((price1 - price2) / price1) rsi_change = abs((rsi2 - rsi1) / rsi1) if rsi1 != 0 else 0 combined = price_change + rsi_change if combined > 0.15: return 'strong' elif combined > 0.08: return 'medium' else: return 'weak' class ADXFilter: """ ADX Trend Strength Filter ADX < 20: Ranging/Choppy market - AVOID TRADING ADX 20-40: Trending market - TRADE WITH TREND ADX > 40: Strong trend - AGGRESSIVE ENTRIES Also provides +DI / -DI for trend direction. """ def __init__(self, monitor=None): self.monitor = monitor self.adx_threshold_weak = 20 self.adx_threshold_strong = 40 self.adx_period = 14 def calculate_adx(self, highs, lows, closes, period=14): """Calculate ADX, +DI, -DI""" if len(highs) < period * 2 or len(lows) < period * 2 or len(closes) < period * 2: return {'adx': 0, 'plus_di': 0, 'minus_di': 0, 'trend': 'unknown'} # Calculate True Range (TR) tr_list = [] for i in range(1, len(highs)): tr1 = highs[i] - lows[i] tr2 = abs(highs[i] - closes[i-1]) tr3 = abs(lows[i] - closes[i-1]) tr_list.append(max(tr1, tr2, tr3)) # Calculate +DM and -DM plus_dm_list = [] minus_dm_list = [] for i in range(1, len(highs)): plus_dm = highs[i] - highs[i-1] minus_dm = lows[i-1] - lows[i] if plus_dm > minus_dm and plus_dm > 0: plus_dm_list.append(plus_dm) minus_dm_list.append(0) elif minus_dm > plus_dm and minus_dm > 0: plus_dm_list.append(0) minus_dm_list.append(minus_dm) else: plus_dm_list.append(0) minus_dm_list.append(0) # Calculate smoothed averages atr = sum(tr_list[-period:]) / period plus_di_sum = sum(plus_dm_list[-period:]) minus_di_sum = sum(minus_dm_list[-period:]) if atr == 0: return {'adx': 0, 'plus_di': 0, 'minus_di': 0, 'trend': 'flat'} plus_di = 100 * plus_di_sum / atr minus_di = 100 * minus_dm_sum / atr # Calculate DX and ADX dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di) if (plus_di + minus_di) > 0 else 0 adx = dx # Simplified - in real ADX this would be smoothed over period # Determine trend direction if plus_di > minus_di: trend = 'bullish' elif minus_di > plus_di: trend = 'bearish' else: trend = 'flat' return { 'adx': adx, 'plus_di': plus_di, 'minus_di': minus_di, 'trend': trend, 'strength': self._classify_strength(adx) } def _classify_strength(self, adx): """Classify trend strength""" if adx < self.adx_threshold_weak: return 'weak' # Ranging, avoid elif adx < self.adx_threshold_strong: return 'moderate' # Trending, trade with trend else: return 'strong' # Strong trend, aggressive entries def should_trade(self, highs, lows, closes, direction='LONG'): """ Determine if we should trade based on ADX. Returns: dict: { 'trade_allowed': bool, 'reason': str, 'confidence': float # 0-1 based on ADX value } """ start_time = time.time() adx_data = self.calculate_adx(highs, lows, closes) adx = adx_data['adx'] trend = adx_data['trend'] strength = adx_data['strength'] result = { 'trade_allowed': False, 'reason': '', 'confidence': adx / 50 if adx < 50 else 1.0, 'adx_data': adx_data } if strength == 'weak': result['reason'] = f"ADX {adx:.1f} < 20: Ranging market, avoiding trades" if self.monitor: self.monitor.log('adx_filter', 'filtered', {'adx': adx, 'reason': 'ranging'}) elif direction == 'LONG' and trend != 'bullish': result['reason'] = f"ADX {adx:.1f}: Trend is {trend}, not bullish" if self.monitor: self.monitor.log('adx_filter', 'filtered', {'adx': adx, 'reason': 'wrong_direction'}) elif direction == 'SHORT' and trend != 'bearish': result['reason'] = f"ADX {adx:.1f}: Trend is {trend}, not bearish" if self.monitor: self.monitor.log('adx_filter', 'filtered', {'adx': adx, 'reason': 'wrong_direction'}) else: result['trade_allowed'] = True result['reason'] = f"ADX {adx:.1f}: {strength} {trend} trend, trading allowed" if self.monitor: self.monitor.log('adx_filter', 'allowed', {'adx': adx, 'trend': trend}) if self.monitor: latency_ms = (time.time() - start_time) * 1000 self.monitor.log('adx_filter', 'latency', latency_ms) return result class LiquiditySweepDetector: """ Liquidity Sweep Detection Engine Detects smart money manipulation patterns: - Bullish Sweep: Wick below support, close back above (stop hunt + reversal) - Bearish Sweep: Wick above resistance, close back below (stop hunt + reversal) Also tracks breaker blocks (previous support becoming resistance and vice versa). """ def __init__(self, monitor=None): self.monitor = monitor self.recent_sweeps = {} # pair -> list of recent sweeps self.max_sweep_age = 300 # 5 minutes self.sweep_confirmation_bars = 2 # Need 2 bars to confirm def find_key_levels(self, highs, lows, closes, lookback=20): """Find recent support and resistance levels""" if len(highs) < lookback or len(lows) < lookback: return {'support': None, 'resistance': None} recent_highs = highs[-lookback:] recent_lows = lows[-lookback:] # Simple approach: use recent swing points resistance = max(recent_highs[:-3]) if len(recent_highs) > 3 else max(recent_highs) support = min(recent_lows[:-3]) if len(recent_lows) > 3 else min(recent_lows) return { 'support': support, 'resistance': resistance, 'recent_high': highs[-1], 'recent_low': lows[-1] } def detect_sweep(self, pair, highs, lows, closes, opens=None): """ Detect liquidity sweep patterns. Args: pair: Trading pair symbol highs: List of high prices lows: List of low prices closes: List of close prices opens: List of open prices (optional) Returns: dict or None: { 'type': 'bullish_sweep' or 'bearish_sweep', 'strength': 'weak', 'medium', 's