# ai_heuristic_core.py
"""Build regulatory-compliance prompts and submit them to a text-generation endpoint."""

from typing import Dict, List, Optional

try:
    import requests
except ImportError:
    # Keep prompt building and orchestration importable without the HTTP
    # client; perform_audit() reports the missing dependency as a failure
    # string, the same way it reports every other transport problem.
    requests = None


class AIHeuristicCore:
    """V3 Auditor: Specialized in Regulatory Compliance and Code Citations."""

    # Defaults kept from the original hard-coded values.
    # NOTE(review): port 11434 with /api/generate looks like a local Ollama
    # server - confirm against the deployment.
    DEFAULT_API_URL = "http://localhost:11434/api/generate"
    DEFAULT_TIMEOUT = 30

    def __init__(
        self,
        model: str = "qwen2:7b",
        api_url: str = DEFAULT_API_URL,
        timeout: float = DEFAULT_TIMEOUT,
    ):
        """Store the model name, endpoint URL and request timeout (seconds)."""
        self.model = model
        self.api_url = api_url
        self.timeout = timeout

    def generate_compliance_prompt(
        self, violations: List[str], max_dcr: float, codes: List[str]
    ) -> str:
        """Synthesizes DCR violations into a formal regulatory audit.

        Args:
            violations: Violation descriptions; only the first five are
                included in the prompt.
            max_dcr: Peak demand-capacity ratio, rendered with two decimals.
            codes: Regulatory code identifiers; duplicates are dropped and
                first-seen order is kept.

        Returns:
            The prompt text to send to the model.
        """
        # A peak DCR above 1.0 is non-compliant on its own: the task text
        # below tells the model such a design is unbuildable, so the status
        # line must not contradict it when no reason strings were supplied.
        non_compliant = bool(violations) or max_dcr > 1.0
        status = "NON-COMPLIANT" if non_compliant else "COMPLIANT"

        # dict.fromkeys de-duplicates while preserving insertion order; a set
        # would reorder the codes differently from run to run.
        active_codes = ", ".join(dict.fromkeys(codes))

        return (
            f"Role: Senior Regulatory Engineer. System Status: {status}.\n"
            f"Regulatory Codes Active: {active_codes}\n"
            f"Peak Demand-Capacity Ratio (DCR): {max_dcr:.2f}\n"
            f"Specific Violations Found:\n" + "\n".join(violations[:5]) + "\n\n"
            "Task: Provide a high-density compliance directive. Cite the safety factors "
            "required by the standards. If DCR > 1.0, specify that the design is "
            "unbuildable under current law. Use professional nomenclature like "
            "'Limit State Design' and 'Load Path Redundancy'."
        )

    def perform_audit(self, prompt: str) -> str:
        """POST the prompt to the configured endpoint and return the reply text.

        Never raises: a non-200 status, a transport error, a malformed body
        or a missing ``requests`` package is reported as a diagnostic string.

        Args:
            prompt: Prompt text, typically from generate_compliance_prompt().

        Returns:
            The stripped ``"response"`` field of the JSON reply, or a
            diagnostic message describing why no reply is available.
        """
        if requests is None:
            return "Regulatory Heuristic Failure: the 'requests' package is not installed"

        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            # Low temperature for near-deterministic output ("ultra-precise").
            # NOTE(review): num_predict presumably caps the generated length.
            "options": {"temperature": 0.1, "num_predict": 150},
        }
        try:
            response = requests.post(self.api_url, json=payload, timeout=self.timeout)
            if response.status_code == 200:
                return response.json().get("response", "Audit trace lost.").strip()
            return f"Compliance Link Offline: Error {response.status_code}"
        except Exception as exc:
            # Deliberately broad: this is the boundary where every failure
            # (connection, timeout, bad JSON) becomes text for the caller.
            return f"Regulatory Heuristic Failure: {exc}"


class AuditManager:
    """Orchestrates the data flow from Compliance to AI."""

    def __init__(self, core: Optional[AIHeuristicCore] = None):
        """Use the given core, or create a default AIHeuristicCore."""
        self.core = core if core is not None else AIHeuristicCore()

    def request_compliance_critique(self, report_data: List[Dict]) -> str:
        """Aggregate per-entry compliance results and request an audit.

        Each entry must provide ``'reasons'`` (iterable of strings),
        ``'dcr_stress'``, ``'dcr_deflection'`` and ``'code'``; a missing key
        raises ``KeyError``.

        Args:
            report_data: Per-entry compliance results.

        Returns:
            The audit text from the core, or a fixed message when
            ``report_data`` is empty.
        """
        if not report_data:
            return "No structural data available for audit."

        violations: List[str] = []
        dcrs = []
        codes = []
        for entry in report_data:
            violations.extend(entry["reasons"])
            # The governing ratio of an entry is the worse of its two checks.
            dcrs.append(max(entry["dcr_stress"], entry["dcr_deflection"]))
            codes.append(entry["code"])

        prompt = self.core.generate_compliance_prompt(violations, max(dcrs), codes)
        return self.core.perform_audit(prompt)