百川2-13B-4bits开源大模型实战对接企业微信机器人实现内部AI问答通知流1. 项目背景与价值想象一下这个场景你的团队正在讨论一个技术方案有人提出了一个关于数据库索引优化的问题。大家七嘴八舌讨论了半天还是没有定论。这时候如果有一个AI助手能立刻给出专业建议并且把答案自动推送到群里是不是能大大提升效率这就是我们今天要做的——把百川2-13B大模型接入企业微信机器人打造一个团队内部的智能问答通知系统。为什么选择百川2-13B-4bits你可能听说过很多大模型但部署起来要么显存要求太高要么速度太慢。百川2-13B的4bits量化版本完美解决了这个问题显存占用低只需要约10GB显存消费级显卡就能跑性能损失小相比原版性能只下降1-2个百分点几乎感觉不到中英双语支持无论是中文技术文档还是英文代码都能很好处理商用友好可以申请商用不用担心版权问题更重要的是我们已经有了现成的WebUI界面现在要做的就是让它从“只能网页访问”变成“随时随地可用”的团队助手。2. 整体架构设计在开始动手之前我们先看看整个系统是怎么工作的┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ │ │ │ │ │ 企业微信群聊 │────▶│ 企业微信机器人 │────▶│ 百川API服务 │ │ │ │ │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ │ │ │ │ │ 用户提问消息 │ │ 消息解析转发 │ │ AI生成回答 │ │ │ │ │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ │ │ │ └────────────────────────┼────────────────────────┘ │ ▼ ┌─────────────────┐ │ │ │ 回答推送回群 │ │ │ └─────────────────┘核心流程很简单用户在群里机器人提问机器人收到消息转发给百川API百川模型生成回答机器人把回答推送到群里听起来是不是挺简单的接下来我们一步步实现它。3. 准备工作3.1 检查现有环境首先确保你的百川WebUI服务已经正常运行。打开终端运行检查脚本cd /root/baichuan2-13b-webui/ ./check.sh你应该看到类似这样的输出✅ 所有检查通过 项目运行正常可以正常使用。如果服务没有运行先启动它supervisorctl start baichuan-webui3.2 创建企业微信机器人这一步需要在企业微信里操作打开企业微信进入你要添加机器人的群聊点击右上角群设置→添加机器人新建一个机器人设置好名字和头像复制Webhook地址这个地址长这样https://qyapi.weixin.qq.com/cgi-bin/webhook/send?keyxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx重要这个地址要保存好后面会用到测试机器人是否正常可选 在群里机器人发个“测试”看看机器人能不能回复3.3 安装必要的Python包我们需要安装几个Python库来处理HTTP请求和JSON数据pip install requests flask如果你用的是虚拟环境记得先激活环境# 如果有虚拟环境的话 source /path/to/your/venv/bin/activate pip install requests flask4. 搭建API桥接服务现在我们来创建一个简单的Flask应用作为企业微信机器人和百川模型之间的桥梁。4.1 创建项目目录结构mkdir -p /root/wechat-bot cd /root/wechat-bot创建以下文件wechat-bot/ ├── app.py # 主程序 ├── config.py # 配置文件 ├── baichuan_client.py # 百川客户端 ├── wechat_handler.py # 企业微信处理器 └── requirements.txt # 依赖包4.2 编写配置文件先创建config.py这里存放所有的配置信息# config.py import os class Config: # 企业微信机器人配置 WECHAT_WEBHOOK_URL os.getenv( WECHAT_WEBHOOK_URL, https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key你的机器人key ) # 百川API配置 BAICHUAN_API_URL os.getenv( BAICHUAN_API_URL, http://127.0.0.1:7860/api/chat ) # 服务器配置 HOST os.getenv(HOST, 0.0.0.0) PORT int(os.getenv(PORT, 5000)) # 模型参数配置 MODEL_CONFIG { temperature: 0.7, # 温度参数控制随机性 top_p: 0.9, # 核采样参数 max_tokens: 512, # 最大生成长度 stream: False # 是否流式输出 } # 安全配置 ALLOWED_IPS [127.0.0.1] # 允许访问的IP生产环境需要配置 # 日志配置 LOG_LEVEL INFO LOG_FILE /root/wechat-bot/logs/app.log config Config()重要提醒记得把WECHAT_WEBHOOK_URL替换成你刚才复制的真实地址。4.3 编写百川客户端创建baichuan_client.py负责和百川模型通信# baichuan_client.py import requests import json import logging from typing import Dict, Any, Optional from config import config logger logging.getLogger(__name__) class BaichuanClient: 百川模型客户端 def __init__(self): self.api_url config.BAICHUAN_API_URL self.timeout 30 # 30秒超时 def chat(self, message: str, history: list None) - Dict[str, Any]: 发送消息给百川模型 Args: message: 用户消息 history: 对话历史格式为 [{role: user, content: ...}, ...] Returns: 包含回答的字典 if history is None: history [] # 构建请求数据 payload { message: message, history: history, **config.MODEL_CONFIG } try: logger.info(f发送请求到百川API: {message[:50]}...) response requests.post( self.api_url, jsonpayload, timeoutself.timeout ) response.raise_for_status() # 检查HTTP错误 result response.json() if response in result: logger.info(f收到百川回复: {result[response][:50]}...) return { success: True, response: result[response], history: result.get(history, history [ {role: user, content: message}, {role: assistant, content: result[response]} ]) } else: logger.error(f百川API返回格式错误: {result}) return { success: False, error: API返回格式错误, response: 抱歉AI服务暂时不可用请稍后重试。 } except requests.exceptions.Timeout: logger.error(百川API请求超时) return { success: False, error: 请求超时, response: 思考时间有点长请稍后重试或简化问题。 } except requests.exceptions.RequestException as e: logger.error(f百川API请求失败: {str(e)}) return { success: False, error: str(e), response: AI服务连接失败请检查服务状态。 } except Exception as e: logger.error(f处理百川响应时出错: {str(e)}) return { success: False, error: str(e), response: 处理回答时出现错误请稍后重试。 } def test_connection(self) - bool: 测试与百川服务的连接 try: # 发送一个简单的测试消息 test_result self.chat(你好) return test_result[success] except Exception as e: logger.error(f连接测试失败: {str(e)}) return False # 创建全局客户端实例 baichuan_client BaichuanClient()4.4 编写企业微信处理器创建wechat_handler.py处理企业微信的消息和回复# wechat_handler.py import requests import json import logging from typing import Dict, Any, List from config import config logger logging.getLogger(__name__) class WeChatHandler: 企业微信消息处理器 def __init__(self): self.webhook_url config.WECHAT_WEBHOOK_URL def send_text_message(self, content: str, mentioned_list: List[str] None) - bool: 发送文本消息到企业微信群 Args: content: 消息内容 mentioned_list: 的用户列表None表示不任何人 Returns: 是否发送成功 payload { msgtype: text, text: { content: content } } # 添加功能 if mentioned_list: payload[text][mentioned_list] mentioned_list try: logger.info(f发送消息到企业微信: {content[:50]}...) response requests.post( self.webhook_url, jsonpayload, timeout10 ) response.raise_for_status() result response.json() if result.get(errcode) 0: logger.info(消息发送成功) return True else: logger.error(f企业微信返回错误: {result}) return False except requests.exceptions.RequestException as e: logger.error(f发送消息失败: {str(e)}) return False def send_markdown_message(self, content: str) - bool: 发送Markdown格式消息 Args: content: Markdown格式内容 Returns: 是否发送成功 payload { msgtype: markdown, markdown: { content: content } } try: response requests.post( self.webhook_url, jsonpayload, timeout10 ) response.raise_for_status() return response.json().get(errcode) 0 except requests.exceptions.RequestException as e: logger.error(f发送Markdown消息失败: {str(e)}) return False def parse_user_message(self, data: Dict[str, Any]) - Dict[str, Any]: 解析企业微信推送的消息 Args: data: 企业微信推送的JSON数据 Returns: 解析后的消息信息 try: msg_type data.get(msgtype, ) if msg_type text: return { type: text, content: data.get(text, {}).get(content, ), sender: data.get(sender, ), room_id: data.get(roomid, ), msg_id: data.get(msgid, ) } else: logger.warning(f不支持的消息类型: {msg_type}) return {} except Exception as e: logger.error(f解析消息失败: {str(e)}) return {} def format_ai_response(self, response: str, question: str None) - str: 格式化AI回复使其在企业微信中显示更友好 Args: response: AI原始回复 question: 原始问题可选 Returns: 格式化后的回复 # 如果回复是代码用Markdown代码块包裹 if in response or any(keyword in response.lower() for keyword in [def , class , import , function , const , var ]): formatted f**AI回答**\n\n\n{response}\n else: # 普通文本回复 formatted f**AI回答**\n\n{response} # 如果提供了问题可以一起显示 if question: formatted f**问题** {question}\n\n{formatted} return formatted # 创建全局处理器实例 wechat_handler WeChatHandler()4.5 编写主程序创建app.py这是整个服务的入口# app.py from flask import Flask, request, jsonify import logging from datetime import datetime import hashlib import json import os from config import config from baichuan_client import baichuan_client from wechat_handler import wechat_handler # 配置日志 logging.basicConfig( levelgetattr(logging, config.LOG_LEVEL), format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(config.LOG_FILE), logging.StreamHandler() ] ) logger logging.getLogger(__name__) app Flask(__name__) # 存储对话历史简单内存存储生产环境建议用Redis conversation_history {} def get_conversation_key(room_id: str, user_id: str) - str: 生成对话历史存储的key return f{room_id}_{user_id} app.route(/webhook, methods[POST]) def wechat_webhook(): 企业微信机器人Webhook接口 try: # 获取请求数据 data request.get_json() if not data: logger.warning(收到空请求) return jsonify({errcode: 400, errmsg: 无效的请求}) logger.info(f收到企业微信消息: {json.dumps(data, ensure_asciiFalse)[:200]}...) # 解析消息 message_info wechat_handler.parse_user_message(data) if not message_info: return jsonify({errcode: 0, errmsg: ok}) # 提取信息 content message_info.get(content, ).strip() room_id message_info.get(room_id, ) user_id message_info.get(sender, ) # 检查是否是机器人的消息 # 企业微信机器人消息默认就是机器人的所以不需要特别检查 # 但我们可以检查消息是否包含特定触发词可选 trigger_keywords [AI, ai, AI, 提问, 请问] # 如果消息太短或者是空消息忽略 if len(content) 2: logger.info(消息太短忽略) return jsonify({errcode: 0, errmsg: ok}) # 获取对话历史 conv_key get_conversation_key(room_id, user_id) history conversation_history.get(conv_key, []) # 限制历史记录长度最近10轮对话 if len(history) 20: # 10轮对话每轮2条消息 history history[-20:] logger.info(f用户提问: {content[:50]}...) # 调用百川模型 ai_response baichuan_client.chat(content, history) if ai_response[success]: response_text ai_response[response] # 更新对话历史 conversation_history[conv_key] ai_response.get(history, history [ {role: user, content: content}, {role: assistant, content: response_text} ]) # 格式化回复 formatted_response wechat_handler.format_ai_response(response_text, content) # 发送回复到企业微信 send_success wechat_handler.send_text_message(formatted_response) if send_success: logger.info(f成功回复用户: {user_id}) else: logger.error(f发送回复失败: {user_id}) else: # 如果AI服务出错发送错误提示 error_msg fAI服务暂时不可用: {ai_response.get(error, 未知错误)} wechat_handler.send_text_message(error_msg) logger.error(fAI服务错误: {ai_response.get(error)}) # 企业微信要求返回成功响应 return jsonify({errcode: 0, errmsg: ok}) except Exception as e: logger.error(f处理Webhook请求时出错: {str(e)}, exc_infoTrue) return jsonify({errcode: 500, errmsg: 服务器内部错误}) app.route(/health, methods[GET]) def health_check(): 健康检查接口 # 检查百川服务连接 baichuan_ok baichuan_client.test_connection() # 检查企业微信连接简单测试 wechat_ok wechat_handler.send_text_message(健康检查测试, mentioned_list[]) status { timestamp: datetime.now().isoformat(), baichuan_service: healthy if baichuan_ok else unhealthy, wechat_service: healthy if wechat_ok else unhealthy, conversation_count: len(conversation_history) } return jsonify(status) app.route(/clear_history, methods[POST]) def clear_history(): 清空对话历史管理接口 global conversation_history count len(conversation_history) conversation_history {} logger.info(f已清空对话历史共{count}个对话) return jsonify({errcode: 0, errmsg: f已清空{count}个对话历史}) app.route(/history, methods[GET]) def get_history(): 获取对话历史统计管理接口 stats { total_conversations: len(conversation_history), sample_conversations: list(conversation_history.keys())[:5] if conversation_history else [] } return jsonify(stats) if __name__ __main__: # 创建日志目录 os.makedirs(os.path.dirname(config.LOG_FILE), exist_okTrue) logger.info(f启动企业微信机器人服务监听 {config.HOST}:{config.PORT}) logger.info(f百川API地址: {config.BAICHUAN_API_URL}) # 测试连接 if baichuan_client.test_connection(): logger.info(百川服务连接正常) else: logger.warning(百川服务连接失败请检查服务状态) app.run( hostconfig.HOST, portconfig.PORT, debugFalse # 生产环境设为False )4.6 创建依赖文件创建requirements.txtFlask2.3.3 requests2.31.05. 部署与配置5.1 启动服务现在我们可以启动服务了cd /root/wechat-bot # 安装依赖 pip install -r requirements.txt # 启动服务前台运行测试用 python app.py你应该看到类似这样的输出2024-01-15 10:30:00 - app - INFO - 启动企业微信机器人服务监听 0.0.0.0:5000 2024-01-15 10:30:00 - app - INFO - 百川API地址: http://127.0.0.1:7860/api/chat 2024-01-15 10:30:00 - app - INFO - 百川服务连接正常 * Serving Flask app app * Debug mode: off * Running on all addresses (0.0.0.0) * Running on http://127.0.0.1:50005.2 配置企业微信机器人Webhook现在需要告诉企业微信把消息发送到我们的服务获取服务器公网IP如果你有公网IPcurl ifconfig.me配置企业微信机器人Webhook打开企业微信机器人设置找到接收消息配置设置回调URL为http://你的服务器IP:5000/webhook保存配置注意如果你没有公网IP可以使用内网穿透工具如ngrok、frp等将本地服务暴露到公网。5.3 配置Supervisor生产环境为了让服务在后台稳定运行我们配置Supervisor创建配置文件/etc/supervisor/conf.d/wechat-bot.conf[program:wechat-bot] command/usr/bin/python3 /root/wechat-bot/app.py directory/root/wechat-bot userroot autostarttrue autorestarttrue startsecs10 startretries3 stdout_logfile/root/wechat-bot/logs/supervisor.log stdout_logfile_maxbytes10MB stdout_logfile_backups10 stderr_logfile/root/wechat-bot/logs/supervisor_error.log stderr_logfile_maxbytes10MB stderr_logfile_backups10 environmentPYTHONPATH/root/wechat-bot然后重新加载Supervisor配置supervisorctl reread supervisorctl update supervisorctl start wechat-bot检查服务状态supervisorctl status wechat-bot应该看到wechat-bot RUNNING pid 12345, uptime 0:00:106. 测试与使用6.1 测试服务连通性首先测试健康检查接口curl http://127.0.0.1:5000/health应该返回类似这样的JSON{ timestamp: 2024-01-15T10:35:00.123456, baichuan_service: healthy, wechat_service: healthy, conversation_count: 0 }6.2 在企业微信中测试现在去企业微信群里测试机器人提问AI助手 什么是Python的装饰器等待回复通常3-10秒 机器人应该会回复**AI回答** 装饰器是Python中一个非常重要的特性它允许你在不修改原函数代码的情况下为函数添加新的功能...测试连续对话AI助手 能举个例子吗机器人会记得之前的对话给出相关的例子。6.3 测试不同场景代码生成测试AI助手 帮我写一个Python函数计算斐波那契数列技术问题解答AI助手 解释一下RESTful API的设计原则文档总结AI助手 总结一下微服务架构的优缺点7. 高级功能扩展7.1 添加命令功能我们可以让机器人支持一些特殊命令。修改app.py中的消息处理逻辑# 在 wechat_webhook 函数中添加命令处理 def wechat_webhook(): # ... 之前的代码 ... content message_info.get(content, ).strip() # 检查是否是命令 if content.startswith(/): return handle_command(content, room_id, user_id) # ... 原来的处理逻辑 ... def handle_command(command: str, room_id: str, user_id: str): 处理特殊命令 cmd command.lower().strip() if cmd /help: help_text 可用命令 /help - 显示帮助信息 /clear - 清空当前对话历史 /status - 查看服务状态 /history - 查看对话历史统计 wechat_handler.send_text_message(help_text) elif cmd /clear: conv_key get_conversation_key(room_id, user_id) if conv_key in conversation_history: del conversation_history[conv_key] wechat_handler.send_text_message(已清空你的对话历史) else: wechat_handler.send_text_message(没有找到你的对话历史) elif cmd /status: # 调用健康检查 import requests try: resp requests.get(fhttp://{config.HOST}:{config.PORT}/health, timeout5) status resp.json() status_msg f服务状态\n百川服务{status[baichuan_service]}\n微信服务{status[wechat_service]}\n对话数{status[conversation_count]} wechat_handler.send_text_message(status_msg) except: wechat_handler.send_text_message(无法获取服务状态) else: wechat_handler.send_text_message(f未知命令: {command}\n输入 /help 查看可用命令) return jsonify({errcode: 0, errmsg: ok})7.2 添加限流功能防止被滥用添加简单的限流# 在 app.py 开头添加 from collections import defaultdict from datetime import datetime, timedelta # 限流配置 RATE_LIMIT { max_requests: 10, # 每分钟最大请求数 window_minutes: 1 } # 存储请求记录 request_history defaultdict(list) def check_rate_limit(user_id: str) - bool: 检查用户是否超过频率限制 now datetime.now() window_start now - timedelta(minutesRATE_LIMIT[window_minutes]) # 清理过期记录 valid_requests [ req_time for req_time in request_history[user_id] if req_time window_start ] request_history[user_id] valid_requests # 检查是否超限 if len(valid_requests) RATE_LIMIT[max_requests]: return False # 记录本次请求 request_history[user_id].append(now) return True # 在 wechat_webhook 函数中添加限流检查 def wechat_webhook(): # ... 之前的代码 ... user_id message_info.get(sender, ) # 检查频率限制 if not check_rate_limit(user_id): wechat_handler.send_text_message(请求过于频繁请稍后再试) return jsonify({errcode: 0, errmsg: ok}) # ... 原来的处理逻辑 ...7.3 添加消息队列可选如果并发量较大可以考虑使用消息队列。这里以Redis为例# 安装Redis客户端 # pip install redis import redis import json from threading import Thread # 初始化Redis连接 redis_client redis.Redis(hostlocalhost, port6379, db0) def process_message_async(message_data: dict): 异步处理消息 # 这里放入原来的消息处理逻辑 # 使用Redis队列实现异步 # 将消息放入队列 redis_client.rpush(ai_requests, json.dumps(message_data)) # 启动处理线程 Thread(targetprocess_queue, daemonTrue).start() def process_queue(): 处理消息队列 while True: # 从队列获取消息 message_json redis_client.blpop(ai_requests, timeout30) if message_json: message_data json.loads(message_json[1]) # 处理消息... # 调用百川API发送回复等8. 监控与维护8.1 查看服务日志# 查看应用日志 tail -f /root/wechat-bot/logs/app.log # 查看Supervisor日志 tail -f /root/wechat-bot/logs/supervisor.log # 查看错误日志 tail -f /root/wechat-bot/logs/supervisor_error.log8.2 监控服务状态创建一个监控脚本/root/wechat-bot/monitor.sh#!/bin/bash # 监控脚本 SERVICE_NAMEwechat-bot LOG_FILE/root/wechat-bot/logs/monitor.log HEALTH_URLhttp://127.0.0.1:5000/health log() { echo [$(date %Y-%m-%d %H:%M:%S)] $1 $LOG_FILE } check_service() { # 检查进程 if supervisorctl status $SERVICE_NAME | grep -q RUNNING; then log 服务 $SERVICE_NAME 运行正常 # 检查健康接口 if curl -s --max-time 5 $HEALTH_URL /dev/null; then log 健康检查通过 return 0 else log 健康检查失败 return 1 fi else log 服务 $SERVICE_NAME 未运行 return 2 fi } restart_service() { log 尝试重启服务 $SERVICE_NAME supervisorctl restart $SERVICE_NAME sleep 5 if check_service; then log 服务重启成功 else log 服务重启失败需要人工干预 # 可以在这里添加邮件或微信通知 fi } # 主循环 while true; do if ! check_service; then restart_service fi sleep 60 # 每分钟检查一次 done给脚本执行权限并运行chmod x /root/wechat-bot/monitor.sh nohup /root/wechat-bot/monitor.sh /dev/null 21 8.3 性能优化建议如果你的团队使用频繁可以考虑以下优化启用流式输出 修改百川API调用使用流式输出让用户看到生成过程。添加缓存 对常见问题添加缓存减少模型调用。使用GPU内存优化 如果显存不足可以调整模型参数MODEL_CONFIG { max_tokens: 256, # 减少生成长度 temperature: 0.5, # 降低随机性 }负载均衡 如果用户量很大可以部署多个实例使用Nginx做负载均衡。9. 总结通过这个项目我们成功将百川2-13B大模型接入了企业微信打造了一个团队内部的AI问答助手。让我们回顾一下关键点9.1 项目成果低成本部署使用4bits量化版本只需要10GB显存普通显卡就能跑无缝集成与企业微信深度集成团队成员在群里就能直接使用智能对话支持多轮对话能记住上下文回答专业问题稳定可靠使用Supervisor管理自动重启7x24小时服务易于扩展模块化设计可以轻松添加新功能9.2 实际效果在实际使用中这个系统能够快速回答技术问题数据库优化、代码调试、架构设计等辅助文档编写API文档、技术方案、会议纪要等代码审查助手检查代码问题提出改进建议学习辅导工具解释复杂概念提供学习资源9.3 部署建议对于不同规模的团队我有以下建议小团队10人直接使用本文的方案单台服务器部署定期备份对话历史中型团队10-50人添加用户认证和权限控制实现消息队列提高并发处理能力添加使用统计和监控大型团队50人使用负载均衡部署多个实例添加Redis缓存提高响应速度实现完整的日志分析和审计功能9.4 遇到的坑和解决方案在实施过程中你可能会遇到这些问题企业微信回调超时企业微信要求5秒内响应如果AI生成时间过长可以先返回正在思考然后用异步方式推送结果。显存不足如果同时多人使用导致显存不足可以限制并发数使用更小的模型添加排队机制回答质量不稳定调整模型参数MODEL_CONFIG { temperature: 0.3, # 更稳定的回答 top_p: 0.8, max_tokens: 384, # 更简洁的回答 }敏感内容过滤在企业微信中可能需要添加内容过滤def filter_sensitive_content(text: str) - str: # 添加你的过滤逻辑 sensitive_keywords [...] # 敏感词列表 for keyword in sensitive_keywords: if keyword in text: return 抱歉这个问题我无法回答。 return text9.5 下一步优化方向如果你想让这个系统更强大可以考虑多模型支持除了百川还可以接入其他模型让用户选择文件处理支持上传图片、文档让AI分析文件内容知识库集成连接公司内部文档回答更准确语音交互支持语音输入和输出数据分析统计使用情况了解团队最常问的问题类型这个项目的核心价值在于它把强大的AI能力带到了团队日常沟通的最前线。不再是需要打开网页、登录系统的额外操作而是在工作流中无缝集成。当技术讨论遇到瓶颈时一下AI助手几秒钟就能得到专业建议这种体验的改变是革命性的。最重要的是整个方案都是开源的你可以完全掌控根据团队需求定制。从今天开始让你的团队沟通更智能、更高效吧获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。