### app/modules/likes/plugin.py ###
from .routes import router


class Plugin:
    """Plugin entry point that mounts the social (likes/comments) routes."""

    def register(self, app, event_bus):
        """Attach this module's router to *app*.

        ``event_bus`` is accepted as part of the plugin interface but is not
        used by this module.
        """
        app.include_router(router)


### app/modules/likes/routes.py ###
import asyncio

from fastapi import APIRouter

from .schemas import LikeRequest, CommentRequest
from .service import SocialService
from app.shared.utils import success_response, error_response

router = APIRouter(prefix="/api/v1/social", tags=["Social"])
service = SocialService()


async def _run_blocking(func, *args):
    """Run a synchronous callable in the default executor and await its result.

    The service methods are synchronous (they open a psycopg2 connection and
    call the engine); invoking them directly inside an ``async def`` handler
    would stall the event loop for the duration of the call.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


@router.post("/like")
async def like_post(req: LikeRequest):
    """Like the post at ``req.target_url`` and wrap the outcome in a response.

    Returns ``success_response`` when the service reports success, otherwise
    ``error_response`` with code ``LIKE_FAILED``; any exception is reported
    with code ``LIKE_ERROR``.
    """
    try:
        result = await _run_blocking(
            service.process_like, req.target_url, req.account_id
        )
        if result.get("success"):
            return success_response(result, "Like delivered")
        return error_response(result.get("error", "Like failed"), "LIKE_FAILED")
    except Exception as e:
        return error_response(str(e), "LIKE_ERROR")


@router.post("/comment")
async def comment_post(req: CommentRequest):
    """Comment on the post at ``req.target_url`` and wrap the outcome.

    Returns ``success_response`` when the service reports success, otherwise
    ``error_response`` with code ``COMMENT_FAILED``; any exception is reported
    with code ``COMMENT_ERROR``.
    """
    try:
        result = await _run_blocking(
            service.process_comment,
            req.target_url,
            req.comment_texts,
            req.account_id,
        )
        if result.get("success"):
            return success_response(result, "Comment delivered")
        return error_response(result.get("error", "Comment failed"), "COMMENT_FAILED")
    except Exception as e:
        return error_response(str(e), "COMMENT_ERROR")


### app/modules/likes/service.py ###
"""Likes & Comments Service — completely isolated from Story View"""
import json
from typing import Optional, Tuple

from app.core.logger import get_logger
from app.core.database import db_execute, db_fetchrow
from app.engine.instagram_actions import InstagramActions

logger = get_logger(__name__)


class SocialService:
    """Social actions (likes + comments) using InstagramActions engine"""

    @staticmethod
    def _load_account(account_id: Optional[int] = None) -> Optional[dict]:
        """Load one account's cookies and proxy from the DB (synchronously).

        Args:
            account_id: Row id to load. When falsy (``None`` or ``0``) the
                first active account whose ``cookie_valid`` is NULL or true
                is selected instead, ordered by id.

        Returns:
            ``{"id", "username", "cookies", "proxy"}`` or ``None`` when no
            row matches.
        """
        import psycopg2
        from app.core.config import settings

        conn = psycopg2.connect(
            host=settings.DB_HOST,
            database=settings.DB_NAME,
            user=settings.DB_USER,
            password=settings.DB_PASSWORD,
        )
        # try/finally so a failing query cannot leak the cursor or connection.
        try:
            cur = conn.cursor()
            try:
                if account_id:
                    cur.execute(
                        "SELECT id, username, cookies, proxy FROM accounts WHERE id=%s",
                        (account_id,),
                    )
                else:
                    cur.execute(
                        "SELECT id, username, cookies, proxy FROM accounts WHERE status='active' AND (cookie_valid IS NULL OR cookie_valid = true) ORDER BY id LIMIT 1"
                    )
                row = cur.fetchone()
            finally:
                cur.close()
        finally:
            conn.close()

        if not row:
            return None
        # The cookies column may already be decoded to a dict by the driver;
        # anything else is parsed as JSON text.
        cookies = row[2] if isinstance(row[2], dict) else json.loads(row[2])
        return {"id": row[0], "username": row[1], "cookies": cookies, "proxy": row[3]}

    def _open_session(
        self, target_url: str, account_id: Optional[int] = None
    ) -> Tuple[Optional[InstagramActions], Optional[dict], Optional[dict]]:
        """Load an account, build the engine and resolve the target post.

        Shared preamble of :meth:`process_like` and :meth:`process_comment`.

        Returns:
            ``(engine, details, None)`` on success, or ``(None, None, error)``
            where *error* is the ``{"success": False, "error": ...}`` dict the
            caller should return unchanged.
        """
        account = self._load_account(account_id)
        if not account:
            return None, None, {"success": False, "error": "No active account available"}

        engine = InstagramActions(cookies=account["cookies"], proxy=account["proxy"])
        if not engine.fb_dtsg:
            return None, None, {"success": False, "error": "Failed to initialize session tokens"}

        shortcode = engine.extract_shortcode(target_url)
        if not shortcode:
            return None, None, {
                "success": False,
                "error": "Invalid post URL - could not extract shortcode",
            }

        details = engine.get_post_details(shortcode)
        if not details or not details.get("media_id"):
            return None, None, {
                "success": False,
                "error": f"Could not get media_id for shortcode {shortcode}",
            }

        return engine, details, None

    def process_like(self, target_url: str, account_id: Optional[int] = None) -> dict:
        """Deliver a like to a post/reel URL.

        Returns the engine's result dict, or a ``{"success": False, "error"}``
        dict when the account, session or post could not be resolved.
        """
        engine, details, error = self._open_session(target_url, account_id)
        if error:
            return error

        result = engine.like_post(details["media_id"])
        logger.info(f"Like result: {result}")
        return result

    def process_comment(
        self, target_url: str, comment_texts: list, account_id: Optional[int] = None
    ) -> dict:
        """Deliver a comment to a post/reel URL.

        The post is resolved first purely as validation; the comment itself is
        sent through ``engine.comment_by_url`` using the original URL.

        Returns the engine's result dict, or a ``{"success": False, "error"}``
        dict when the account, session or post could not be resolved.
        """
        engine, _details, error = self._open_session(target_url, account_id)
        if error:
            return error

        result = engine.comment_by_url(target_url, comment_texts)
        logger.info(f"Comment result: {result}")
        return result


### app/modules/likes/tasks.py ###
"""Social order processing tasks (sync - for Celery)"""
from app.workers.celery_app import celery_app
from app.modules.likes.service import SocialService


def _normalize_comment_texts(comment_texts) -> list:
    """Return the comment texts as a non-empty list.

    A missing or empty value falls back to ``["Nice!"]``; a single non-list
    value is wrapped in a list; a non-empty list is returned unchanged.
    """
    if not comment_texts:
        return ["Nice!"]
    if isinstance(comment_texts, list):
        return comment_texts
    return [comment_texts]


@celery_app.task(name="app.modules.likes.tasks.process_social_order")
def process_social_order(order_id: int):
    """Process a like/comment order synchronously and update DB.

    Args:
        order_id: Primary key (``orders.id``) of the order to process.

    Returns:
        A dict with ``status`` of ``"completed"``, ``"partial"``, ``"failed"``,
        ``"skipped"`` (order is not a like/comment order) or ``"error"``
        (order missing or an exception occurred), plus ``order_id`` and,
        where applicable, ``delivered`` or ``error``.
    """
    from app.core.logger import get_logger

    logger = get_logger(__name__)

    # Initialised up front so the error and cleanup paths can tell whether the
    # connection was ever opened.
    conn = None
    cur = None
    try:
        # Use sync psycopg2 (Celery-safe)
        import psycopg2
        from app.core.config import settings

        conn = psycopg2.connect(
            host=settings.DB_HOST,
            database=settings.DB_NAME,
            user=settings.DB_USER,
            password=settings.DB_PASSWORD,
        )
        cur = conn.cursor()

        # Fetch order
        cur.execute(
            "SELECT id, order_id, service_type, target_url, comment_texts, requested_views FROM orders WHERE id = %s",
            (order_id,),
        )
        row = cur.fetchone()
        if not row:
            logger.error(f"Order {order_id} not found")
            return {"status": "error", "order_id": order_id, "error": "Order not found"}

        order_id_db, order_id_pub, service_type, target_url, comment_texts, requested = row
        requested = requested or 1

        # Skip non-social orders
        if service_type not in ("like", "comment"):
            return {"status": "skipped", "order_id": order_id_pub}

        svc = SocialService()
        delivered = 0
        fail_reason = None
        # Loop-invariant, so computed once (only used for comment orders).
        texts = _normalize_comment_texts(comment_texts)

        # NOTE(review): every attempt passes account_id=None, so the service
        # selects the same first active account each time — confirm whether
        # account rotation is intended here.
        for _ in range(requested):
            if service_type == "like":
                res = svc.process_like(target_url, account_id=None)
            else:
                res = svc.process_comment(target_url, texts, account_id=None)

            if res and res.get("success"):
                delivered += 1
            else:
                # Only the most recent failure reason is kept.
                fail_reason = res.get("error", "Unknown error") if res else "No response"

        # Determine final status
        if delivered >= requested:
            status = "completed"
        elif delivered > 0:
            status = "partial"
        else:
            status = "failed"

        # Update order in DB
        cur.execute(
            "UPDATE orders SET status = %s, delivered_views = %s, fail_reason = %s WHERE id = %s",
            (status, delivered, fail_reason, order_id_db),
        )
        conn.commit()

        logger.info(
            f"Social order {order_id_pub} processed: {delivered}/{requested} {service_type} → {status}"
        )
        return {"status": status, "delivered": delivered, "order_id": order_id_pub}

    except Exception as e:
        logger.error(f"Social task error for order {order_id}: {e}")
        # Best-effort attempt to mark the order as failed; only possible when
        # the connection was actually established.
        if conn is not None:
            try:
                conn.rollback()
                fail_cur = conn.cursor()
                try:
                    fail_cur.execute(
                        "UPDATE orders SET status = 'failed', fail_reason = %s WHERE id = %s",
                        (str(e), order_id),
                    )
                    conn.commit()
                finally:
                    fail_cur.close()
            except Exception as mark_err:
                logger.error(f"Could not mark order {order_id} as failed: {mark_err}")
        return {"status": "error", "order_id": order_id, "error": str(e)}

    finally:
        # Single cleanup point for every return path above.
        for resource in (cur, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_err:
                logger.warning(f"Error closing DB resource for order {order_id}: {close_err}")


### app/modules/likes/schemas.py ###
from pydantic import BaseModel, Field
from typing import Optional, Literal


class LikeRequest(BaseModel):
    """Request body for the like endpoint."""

    target_url: str = Field(..., description="Instagram post/reel URL")
    account_id: Optional[int] = None
    # Validated to 1..100; the like route in routes.py does not read it.
    count: int = Field(default=1, ge=1, le=100)


class CommentRequest(BaseModel):
    """Request body for the comment endpoint."""

    target_url: str = Field(..., description="Instagram post/reel URL")
    comment_texts: list = Field(..., description="List of comment texts")
    account_id: Optional[int] = None