如何通过 Python 实现一个消息队列,为在线客服系统与海外运营的APP对接
我在业余时间开发了一款自己的独立产品:升讯威在线客服与营销系统。陆陆续续开发了几年,从一开始的偶有用户尝试,到如今线上环境和私有化部署均有了越来越多的稳定用户。而我收到的用户需求也越来越多,产品化的需求,个性化的需求都有。这段时间收到一个海外 APP 的对接需求,需要我将客服系统的消息以队列的形式转发到对方的业务服务器上。
对方有两个核心需求:
[*]访客上线的时候,要通知对方的业务系统,业务系统根据访客的身份信息,推送个性化的欢迎词。
[*]访客完成下单的时候,要能推送一个下单成功的通知,并且包含订单信息和链接。
根据这两个需求,那就需要实现由客服系统到业务系统的消息队列推送,以及通过 Open Api 开放接口,以队列的形式接收对方业务系统的消息。
<hr>什么是消息队列,以及使用消息队列的好处这些基础知识,这里就不再赘述,本文重点讲一讲如何用 python 实现一个消息队列。
要用 Python 实现一个消息队列,你可以使用内置的 queue 模块来创建一个简单的队列,或者使用第三方库如 RabbitMQ、Redis 或者 Kafka 来实现更复杂的分布式消息队列。
如何通过 python 实现消息队列
1. 使用 Python 内置的 queue.Queue(适用于单机应用)
queue.Queue 提供了线程安全的队列操作,适合在多线程应用中使用。
import queueimport threadingimport time# 创建一个先进先出(FIFO)队列msg_queue = queue.Queue()# 生产者线程def producer(): for i in range(5): time.sleep(1)# 模拟一些处理 msg = f"消息{i}" msg_queue.put(msg)# 将消息放入队列 print(f"生产者放入:{msg}")# 消费者线程def consumer(): while True: msg = msg_queue.get()# 从队列获取消息 if msg is None:# 终止条件 break print(f"消费者处理:{msg}") msg_queue.task_done()# 标记任务已完成# 创建生产者和消费者线程producer_thread = threading.Thread(target=producer)consumer_thread = threading.Thread(target=consumer)# 启动线程producer_thread.start()consumer_thread.start()# 等待生产者线程完成producer_thread.join()# 向消费者线程发送终止信号msg_queue.put(None)# 等待消费者线程完成consumer_thread.join()2. 使用 Redis(适用于分布式应用)
Redis 是一个高效的内存数据存储,可以用作分布式消息队列。你可以使用 redis-py 库与 Redis 进行交互。
pip install redisimport redisimport time# 创建 Redis 连接r = redis.StrictRedis(host='localhost', port=6379, db=0)# 生产者:将消息放入队列def producer(): for i in range(5): time.sleep(1)# 模拟一些处理 msg = f"消息{i}" r.lpush('msg_queue', msg)# 将消息推送到队列 print(f"生产者放入:{msg}")# 消费者:从队列中获取消息def consumer(): while True: msg = r.brpop('msg_queue').decode('utf-8')# 从队列中获取消息 print(f"消费者处理:{msg}")# 启动生产者和消费者producer()consumer()3. 使用 RabbitMQ(适用于更复杂的消息传递)
RabbitMQ 是一个强大的消息代理系统,支持多种消息队列模式。如果需要高度可靠和高性能的消息队列,可以使用 pika 库连接 RabbitMQ。
pip install pikaimport pika# 连接到 RabbitMQconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 声明一个队列channel.queue_declare(queue='msg_queue')# 生产者:发送消息def producer(): for i in range(5): msg = f"消息{i}" channel.basic_publish(exchange='', routing_key='msg_queue', body=msg) print(f"生产者发送:{msg}")# 消费者:接收并处理消息def consumer(ch, method, properties, body): print(f"消费者处理:{body.decode('utf-8')}")# 启动消费者channel.basic_consume(queue='msg_queue', on_message_callback=consumer, auto_ack=True)# 启动生产者和消费者producer()print('等待消息...')channel.start_consuming()选择合适的实现
[*]如果你是单机应用并且需要线程安全的队列,使用 queue.Queue。
[*]如果你的应用是分布式的,使用 Redis 或 RabbitMQ 更加合适,它们提供了高可用性、消息持久化和可靠的消息传递机制。
如何把 http 请求放在队列中执行
将 HTTP 请求放入队列并异步执行的场景通常用于处理高并发、后台任务、延迟任务等情况。你可以使用消息队列系统(如 queue.Queue、Redis 或 RabbitMQ)来将 HTTP 请求放入队列,消费队列中的任务并执行相应的 HTTP 请求。
这里我会展示几种不同的实现方式,供你参考。
1. 使用 queue.Queue 和 requests 库
你可以将 HTTP 请求封装为任务,并将其放入队列中,然后使用多个消费者线程异步处理队列中的请求。
import queueimport threadingimport timeimport requests# 创建一个队列task_queue = queue.Queue()# HTTP 请求任务处理函数def handle_request(): while True: url = task_queue.get()# 从队列中获取任务 if url is None:# 终止条件 break try: response = requests.get(url) print(f"请求 {url} 的响应状态: {response.status_code}") except Exception as e: print(f"请求 {url} 失败: {e}") task_queue.task_done()# 标记任务完成# 生产者:将 HTTP 请求放入队列def producer(): urls = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3" ] for url in urls: print(f"将 URL {url} 放入队列") task_queue.put(url) time.sleep(1)# 模拟任务产生的延迟# 创建多个消费者线程consumer_threads = []for i in range(3): t = threading.Thread(target=handle_request) t.start() consumer_threads.append(t)# 启动生产者线程producer_thread = threading.Thread(target=producer)producer_thread.start()# 等待生产者线程完成producer_thread.join()# 向消费者线程发送终止信号for _ in range(3): task_queue.put(None)# 等待消费者线程完成for t in consumer_threads: t.join()2. 使用 Redis 和 requests 库
Redis 可以作为一个分布式的消息队列,适用于分布式系统中将 HTTP 请求放入队列并异步执行。你可以使用 Redis 的列表数据结构(lpush、brpop)来实现。
import redisimport requestsimport time# 创建 Redis 连接r = redis.StrictRedis(host='localhost', port=6379, db=0)# 生产者:将 HTTP 请求放入队列def producer(): urls = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3" ] for url in urls: print(f"将 URL {url} 放入 Redis 队列") r.lpush('task_queue', url) time.sleep(1)# 模拟任务产生的延迟# 消费者:从队列中获取请求并执行def consumer(): while True: url = r.brpop('task_queue').decode('utf-8')# 从队列中获取任务 try: response = requests.get(url) print(f"请求 {url} 的响应状态: {response.status_code}") except Exception as e: print(f"请求 {url} 失败: {e}")# 启动生产者和消费者producer_thread = threading.Thread(target=producer)consumer_thread = threading.Thread(target=consumer)producer_thread.start()consumer_thread.start()# 等待生产者线程完成producer_thread.join()# 由于 Redis 队列会一直阻塞等待任务,可以根据需要添加退出逻辑3. 使用 RabbitMQ 和 requests 库
RabbitMQ 提供了强大的消息队列机制,适合用于大规模的消息传递。你可以创建一个任务队列,将 HTTP 请求放入队列中,并通过消费者处理队列中的请求。
import pikaimport requestsimport time# 连接到 RabbitMQconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 声明队列channel.queue_declare(queue='http_requests')# 生产者:将 HTTP 请求放入队列def producer(): urls = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3" ] for url in urls: print(f"将 URL {url} 放入 RabbitMQ 队列") channel.basic_publish(exchange='', routing_key='http_requests', body=url) time.sleep(1)# 模拟任务产生的延迟# 消费者:处理 HTTP 请求def consumer(ch, method, properties, body): url = body.decode('utf-8') try: response = requests.get(url) print(f"请求 {url} 的响应状态: {response.status_code}") except Exception as e: print(f"请求 {url} 失败: {e}")# 启动消费者channel.basic_consume(queue='http_requests', on_message_callback=consumer, auto_ack=True)# 启动生产者producer_thread = threading.Thread(target=producer)producer_thread.start()# 启动消费者并等待消息print('等待消费者处理 HTTP 请求...')producer_thread.join()channel.start_consuming()4. 使用 Celery 异步任务队列
Celery 是一个强大的异步任务队列,适用于分布式任务执行。通过 Celery,你可以把 HTTP 请求封装为任务,放入队列中进行异步执行。
首先,你需要安装 Celery 和 requests:
pip install celery requests然后在 celery.py 中配置 Celery:
from celery import Celeryimport requestsapp = Celery('http_requests', broker='redis://localhost:6379/0')@app.taskdef fetch_url(url): try: response = requests.get(url) print(f"请求 {url} 的响应状态: {response.status_code}") except Exception as e: print(f"请求 {url} 失败: {e}")然后在主程序中提交任务:
from celery import Celeryfrom celery.py import fetch_url# 添加任务到队列fetch_url.apply_async(args=["https://jsonplaceholder.typicode.com/posts/1"])fetch_url.apply_async(args=["https://jsonplaceholder.typicode.com/posts/2"])fetch_url.apply_async(args=["https://jsonplaceholder.typicode.com/posts/3"])启动 Celery Worker:
celery -A celery worker --loglevel=info总结
[*]queue.Queue:适用于单机和多线程环境,可以通过队列异步执行 HTTP 请求。
[*]Redis:适用于分布式环境,将 HTTP 请求放入 Redis 队列,多个消费者异步执行。
[*]RabbitMQ:适合高并发任务和消息传递的分布式环境,使用队列来管理 HTTP 请求。
[*]Celery:适用于大规模异步任务队列的场景,可以使用 Redis 或其他消息中间件作为代理。
简介下这个 .net 开发的小系统
https://kf.shengxunwei.com/
升讯威在线客服与营销系统是一款客服软件,但更重要的是一款营销利器。
[*]可以追踪正在访问网站或使用 APP 的所有访客,收集他们的浏览情况,使客服能够主动出击,施展话术,促进成单。
[*]可嵌入网站、手机 APP、公众号、或者通过 URL 地址直接联系客服。
[*]支持访客信息互通,可传输访客标识、名称和其它任意信息到客服系统,与您的业务系统对接。
[*]可全天候 7 × 24 小时挂机运行,网络中断,拔掉网线,手机飞行模式,不掉线不丢消息,欢迎实测。
希望能够打造: 开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。
钟意的话请给个赞支持一下吧,谢谢~
页:
[1]