千问3.5-2B实战教程:用Flask封装为微服务,支持Webhook回调与异步任务队列

张开发
2026/4/16 22:52:16 15 分钟阅读

分享文章

千问3.5-2B实战教程:用Flask封装为微服务,支持Webhook回调与异步任务队列
千问3.5-2B实战教程用Flask封装为微服务支持Webhook回调与异步任务队列1. 项目背景与目标千问3.5-2B是Qwen系列的小型视觉语言模型能够理解图片内容并生成文本响应。虽然官方提供了网页交互界面但在实际业务场景中我们通常需要将其集成到现有系统中。本教程将教你如何用Flask将模型封装为RESTful API微服务实现Webhook回调机制处理异步任务使用Redis构建任务队列提高并发能力添加基础认证和请求限流等生产级功能完成本教程后你将获得一个可投入生产的视觉理解微服务能够处理高并发请求并支持异步回调通知。2. 环境准备与快速部署2.1 基础环境要求Python 3.8Redis 5.0CUDA 11.7 (如需GPU加速)至少8GB空闲内存2.2 安装依赖# 创建虚拟环境 python -m venv venv source venv/bin/activate # 安装核心依赖 pip install flask redis celery qwen-vl # 可选安装GPU加速组件 pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu1172.3 快速启动服务# 启动Redis服务 redis-server --daemonize yes # 启动Celery worker celery -A app.celery worker --loglevelinfo # 启动Flask应用 python app.py3. 核心代码实现3.1 Flask应用骨架创建app.py文件from flask import Flask, request, jsonify from werkzeug.security import generate_password_hash, check_password_hash from functools import wraps import redis from celery import Celery app Flask(__name__) app.config[SECRET_KEY] your-secret-key-here # Redis配置 redis_client redis.StrictRedis(hostlocalhost, port6379, db0) # Celery配置 app.config[CELERY_BROKER_URL] redis://localhost:6379/0 app.config[CELERY_RESULT_BACKEND] redis://localhost:6379/0 celery Celery(app.name, brokerapp.config[CELERY_BROKER_URL]) celery.conf.update(app.config) # 加载千问3.5-2B模型 from qwen_vl import QWenVL model QWenVL(devicecuda if torch.cuda.is_available() else cpu)3.2 实现基础认证# 模拟用户数据库 users { admin: generate_password_hash(strongpassword) } def token_required(f): wraps(f) def decorated(*args, **kwargs): auth request.headers.get(Authorization) if not auth or not auth.startswith(Bearer ): return jsonify({error: Missing or invalid token}), 401 token auth.split( )[1] if not check_password_hash(users[admin], token): return jsonify({error: Invalid token}), 401 return f(*args, **kwargs) return decorated3.3 同步API端点app.route(/api/v1/sync_predict, methods[POST]) token_required def sync_predict(): # 限流检查 client_ip request.remote_addr if redis_client.get(frate_limit:{client_ip}): return jsonify({error: Too many requests}), 429 redis_client.setex(frate_limit:{client_ip}, 60, 1) # 获取请求数据 image_file request.files.get(image) prompt request.form.get(prompt, 请描述这张图片) # 调用模型 try: response model.generate(image_file, prompt) return jsonify({ status: success, result: response }) except Exception as e: return jsonify({error: str(e)}), 5003.4 异步任务处理创建tasks.py文件from app import celery, model import requests celery.task(bindTrue) def async_predict_task(self, image_url, prompt, callback_url): try: # 下载图片 response requests.get(image_url) image_data response.content # 调用模型 result model.generate(image_data, prompt) # 回调通知 if callback_url: requests.post(callback_url, json{ task_id: self.request.id, status: completed, result: result }) return result except Exception as e: if callback_url: requests.post(callback_url, json{ task_id: self.request.id, status: failed, error: str(e) }) raise3.5 异步API端点app.route(/api/v1/async_predict, methods[POST]) token_required def async_predict(): data request.json image_url data.get(image_url) prompt data.get(prompt, 请描述这张图片) callback_url data.get(callback_url) if not image_url: return jsonify({error: image_url is required}), 400 # 提交异步任务 task async_predict_task.apply_async( args[image_url, prompt, callback_url] ) return jsonify({ status: queued, task_id: task.id, check_status_url: f/api/v1/task_status/{task.id} }), 2023.6 任务状态查询app.route(/api/v1/task_status/task_id, methods[GET]) token_required def task_status(task_id): task async_predict_task.AsyncResult(task_id) if task.state PENDING: response { state: task.state, status: Pending... } elif task.state ! FAILURE: response { state: task.state, result: task.result } else: response { state: task.state, error: str(task.info) } return jsonify(response)4. 生产环境部署建议4.1 使用Gunicorn运行Flask应用pip install gunicorn gunicorn -w 4 -b :5000 app:app4.2 配置Supervisor管理进程创建/etc/supervisor/conf.d/qwen_vl.conf:[program:qwen_vl_api] command/path/to/venv/bin/gunicorn -w 4 -b :5000 app:app directory/path/to/your/project userwww-data autostarttrue autorestarttrue stderr_logfile/var/log/qwen_vl/api.err.log stdout_logfile/var/log/qwen_vl/api.out.log [program:qwen_vl_worker] command/path/to/venv/bin/celery -A app.celery worker --loglevelinfo directory/path/to/your/project userwww-data autostarttrue autorestarttrue stderr_logfile/var/log/qwen_vl/worker.err.log stdout_logfile/var/log/qwen_vl/worker.out.log4.3 Nginx反向代理配置server { listen 80; server_name yourdomain.com; location / { proxy_pass http://localhost:5000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } location /static/ { alias /path/to/your/project/static/; } }5. 性能优化技巧5.1 模型预热在服务启动时预先加载模型# 在app.py中添加 app.before_first_request def warm_up_model(): # 使用小图片进行预热 dummy_image Image.new(RGB, (100, 100), colorwhite) model.generate(dummy_image, 这是一张测试图片)5.2 请求批处理对于高并发场景可以实现批处理APIapp.route(/api/v1/batch_predict, methods[POST]) token_required def batch_predict(): tasks request.json.get(tasks, []) if not tasks or len(tasks) 10: # 限制最大批处理数量 return jsonify({error: Invalid batch size}), 400 results [] for task in tasks: image_url task.get(image_url) prompt task.get(prompt, 请描述这张图片) try: response requests.get(image_url) result model.generate(response.content, prompt) results.append({ status: success, result: result }) except Exception as e: results.append({ status: failed, error: str(e) }) return jsonify({results: results})5.3 结果缓存使用Redis缓存常见请求的结果from hashlib import md5 def get_cache_key(image_url, prompt): key f{image_url}:{prompt} return md5(key.encode()).hexdigest() app.route(/api/v1/cached_predict, methods[POST]) token_required def cached_predict(): data request.json image_url data.get(image_url) prompt data.get(prompt, 请描述这张图片) cache_key get_cache_key(image_url, prompt) cached_result redis_client.get(cache_key) if cached_result: return jsonify({ status: success, result: cached_result.decode(), cached: True }) # 无缓存则处理请求 response requests.get(image_url) result model.generate(response.content, prompt) # 缓存结果1小时过期 redis_client.setex(cache_key, 3600, result) return jsonify({ status: success, result: result, cached: False })6. 总结与扩展建议6.1 项目回顾通过本教程我们实现了基于Flask的RESTful API封装Webhook回调的异步任务处理CeleryRedis的分布式任务队列生产级的安全与性能优化6.2 扩展方向模型微调针对特定领域数据微调模型多模型支持添加模型路由和负载均衡监控系统集成Prometheus监控指标自动扩缩容基于队列长度自动调整worker数量6.3 最佳实践建议为不同客户端设置不同的速率限制实现请求签名验证防止API滥用定期清理Redis中的过期任务数据为长时间运行的任务添加心跳检测获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

更多文章