1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| from flask import Flask, render_template, request, jsonify import requests import sqlite3 import datetime import os import urllib.parse import re
app = Flask(__name__)
# --- 配置中心 --- DB_PATH = "/app/data/chat_history.db" KNOWLEDGE_DIR = "/app/data/knowledge/" # 这个文档是专门用来放内部文档的。 PRIMARY_MODEL = "llama3:70b" BACKUP_MODEL = "qwen2.5:7b" OLLAMA_API = "http://ollama-svc:11434/api/generate"
def init_db(): with sqlite3.connect(DB_PATH) as conn: conn.execute('''CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY AUTOINCREMENT, prompt TEXT, response TEXT, model TEXT, time TEXT)''') conn.execute("CREATE INDEX IF NOT EXISTS idx_prompt ON history(prompt)")
# RAG 检索逻辑 def get_relevant_context_v2(user_query, top_n=2): """ 全目录动态扫描:支持万级文件,自动计算相关性评分 """ context = "" if not os.path.exists(KNOWLEDGE_DIR): print(f" [DEBUG] 路径不存在: {KNOWLEDGE_DIR}") return context
# 预处理用户提问:转小写并提取关键词 query_words = set(re.findall(r'\w+', user_query.lower())) # 比如我问的是“TNS抽帧预案”,这里会拆成“TNS”、“抽帧”、“预案”这些关键词 if not query_words: return context
scored_docs = [] try: for filename in os.listdir(KNOWLEDGE_DIR): if filename.endswith(".txt") or filename.endswith(".md"): file_path = os.path.join(KNOWLEDGE_DIR, filename) # 遍历目录下所有文件,在这里死磕 with open(file_path, 'r', encoding='utf-8') as f: content = f.read() content_lower = content.lower() file_lower = filename.lower() # 评分逻辑:文件名命中权重为 5,内容命中权重为 1 score = sum(5 for word in query_words if word in file_lower) # 每当题目提到了关键词,就加1分 score += sum(1 for word in query_words if word in content_lower) # 每当文章内容提到了关键词,就加1分,分数越高说明这个文件是需要的东西。 if score > 0: scored_docs.append({ "filename": filename, "content": content[:1500], # 限制长度,防止模型上下文溢出 "score": score }) except Exception as e: print(f" [DEBUG] 检索异常: {e}")
# 按分数降序排列,取最相关的 top_n 个文档 scored_docs.sort(key=lambda x: x['score'], reverse=True) # 然后对分数进行排序。 selected_docs = scored_docs[:top_n]
if selected_docs: context = "\n".join([f"--- 参考文档: {d['filename']} ---\n{d['content']}" for d in selected_docs]) # 因为AI上下文有限,就给他top N的文档然后弄成一个字符串,AI通过这个字符串组织语言回答你。 print(f" [DEBUG] RAG 命中 {len(selected_docs)} 个文档,最高分: {selected_docs[0]['score']}") return context
@app.route('/') def index(): return render_template('index.html')
@app.route('/aigpt_api') def aigpt_api(): raw_prompt = request.args.get('prompt', '') user_prompt = urllib.parse.unquote(raw_prompt) if not user_prompt: return jsonify({"response": "内容不能为空"})
# 调用 V2 自动化检索 knowledge_context = get_relevant_context_v2(user_prompt) # 构造增强 Prompt if knowledge_context: final_prompt = ( f"你是一位资深 SRE 专家。请严格参考以下内部文档回答用户问题。\n\n" f"【内部参考信息】:\n{knowledge_context}\n\n" f"【用户问题】: {user_prompt}\n" f"请用中文详细回答。" ) else: # 如果没搜到,就按通用 AI 回答 final_prompt = f"你是一位资深 SRE 专家,请用中文回答:{user_prompt}"
# 模型请求逻辑(包含降级策略) final_response = "" used_model = "" for model in [PRIMARY_MODEL, BACKUP_MODEL]: try: print(f" [DEBUG] 尝试调用模型: {model}") r = requests.post(OLLAMA_API, json={ "model": model, "prompt": final_prompt, "stream": False, "keep_alive": -1 }, timeout=(5, 300)) if r.status_code == 200: used_model = model final_response = r.json().get('response', '') break except Exception as e: print(f" [DEBUG] 模型 {model} 异常: {e}") continue
if not used_model: return jsonify({"response": "后端推理服务连接失败"}), 500
# 结果持久化 try: with sqlite3.connect(DB_PATH) as conn: conn.execute("INSERT INTO history (prompt, response, model, time) VALUES (?, ?, ?, ?)", (user_prompt, final_response, used_model, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) except: pass
# 加上前端标识 tag = "🚀 [70B+RAG]" if used_model == PRIMARY_MODEL else "💡 [Qwen]" return jsonify({"response": f"{tag}\n\n{final_response}"})
@app.route('/search') def search(): query = request.args.get('q', '') with sqlite3.connect(DB_PATH) as conn: cursor = conn.execute("SELECT prompt, response, time FROM history WHERE prompt LIKE ? OR response LIKE ? ORDER BY id DESC", (f'%{query}%', f'%{query}%')) results = [{"prompt": r[0], "response": r[1], "time": r[2]} for r in cursor.fetchall()] return jsonify(results)
if __name__ == '__main__': init_db() app.run(host='0.0.0.0', port=5000)
|