AI在线 AI在线

实战攻略:使用KubeMQ简化多LLM集成流程

作者:核子可乐
2025-03-18 08:00
译者 | 核子可乐审校 | 重楼将多个大语言模型集成至应用程序当中往往是项艰巨的挑战,各类不同API及通信协议的协同处理,以及如何确保请求路由的复杂性难题往往令人望而生畏。 好在可以使用消息代理与路由机制更优雅地解决此类问题,在解决痛点的同时实现多个关键优势。 本文将向大家介绍具体操作步骤。

译者 | 核子可乐

审校 | 重楼

将多个大语言模型集成至应用程序当中往往是项艰巨的挑战,各类不同API及通信协议的协同处理,以及如何确保请求路由的复杂性难题往往令人望而生畏。

实战攻略:使用KubeMQ简化多LLM集成流程

好在可以使用消息代理与路由机制更优雅地解决此类问题,在解决痛点的同时实现多个关键优势。

本文将向大家介绍具体操作步骤。这里以KubeMQ为例,配合代码示例来指导大家逐步建立一套可与OpenAI及Anthropic Claude交互的路由体系。

使用消息代理作为大模型路由工具的主要优势

1. 简化集成

通过使用消息代理作为路由机制,我们可以将不同大模型API交互所涉及的复杂性抽象出来,从而简化客户端代码并降低出错几率。

2. 多模型用例

消息代理能够实现多模型或专门用于不同任务的模型间的通信(如一个模型用于摘要,另一模型用于情绪分析)。其可以确保请求被有效路由至适当模型,使得应用程序能够利用各模型的优势且无需额外开销。

3. 批处理与大规模推理

对于需要批处理或大规模推理任务的应用程序,消息代理通过在大模型繁忙或不可用时建立请求队列,从而实现异步处理。这将确保不会丢失任何数据或请求,即使是在繁重的工作负载下也能提供可靠的处理响应。

4. 冗余与回退保证

对于特别关注正常运行时间的用例,消息代理可确保无缝回退至替代环境。例如,如果与提供OpenAI模型的某家云服务商发生连接失败,KubeMQ可自动切换至另一服务商。这样的冗余设计保证AI不间断操作,有助于增强服务可靠性与客户满意度。

5. 处理高流量应用程序

消息代理能够将传入的请求分发至多个大模型实例或副本,防止过载并确保平衡运行。这种负载均衡设计对于高流量应用程序至关重要,可使其在不影响性能的前提下有效扩展。

使用KubeMQ建立大模型路由机制:集成OpenAI与Claude

现在,我们将分步了解如何使用KubeMQ设置能够与OpenAI和Anthropic Claude交互的路由机制。

全部示例代码均保存在KubeMQ的GitHub repo当中(https://github.com/kubemq-io/kubemq-llm-router)。

准备工作

在开始之前,请确保你已安装以下内容:

  • Python 3.7或更高版本。
  • 本地安装Docker。
  • 拥有有效的OpenAI和Anthropic API密钥。
  • KubeMQ令牌(可从KubeMQ官网处获取)。
  • kubemq-cq Python包:
复制
Plain Text
pip install kubemq-cq
  • .env文件中包含你的AIP密钥:
复制
Plain Text
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

设置KubeMQ

首先,我们需要确保KubeMQ能够正常运行。这里使用Docker进行部署:

复制
Shell
docker run -d --rm \
 -p 8080:8080 \
 -p 50000:50000 \
 -p 9090:9090 \
 -e KUBEMQ_TOKEN="your_token" \
 kubemq/kubemq-community:latest

端口说明:

  • 8080 – 公开KubeMQ REST API
  • 50000 – 打开 gRPC端口以进行实施意见-服务器通信
  • 9090 – 公开KubeMQ REST网关

注意: 将 your_token部分替换为你的真实KubeMQ令牌。

创建大模型路由服务器

大模型路由将充当客户端与大模型之间的中介,负责监听特定渠道的查询并将其路由至适当的大模型。

server.py

复制
Python
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
 def __init__(self):
 self.openai_llm = ChatOpenAI(
 api_key=os.getenv("OPENAI_API_KEY"),
 model_name="gpt-3.5-turbo"
 )
 self.claude_llm = Anthropic(
 api_key=os.getenv("ANTHROPIC_API_KEY"),
 model="claude-3"
 )
 self.client = Client(address="localhost:50000")

 def handle_openai_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.openai_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))

 def handle_claude_query(self, request: QueryMessageReceived):
 try:
 message = request.body.decode('utf-8')
 result = self.claude_llm(message)
 response = QueryResponseMessage(
 query_received=request,
 is_executed=True,
 body=result.encode('utf-8')
 )
 self.client.send_response_message(response)
 except Exception as e:
 self.client.send_response_message(QueryResponseMessage(
 query_received=request,
 is_executed=False,
 error=str(e)
 ))

 def run(self):
 def on_error(err: str):
 print(f"Error: {err}")

 def subscribe_openai():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="openai_requests",
 on_receive_query_callback=self.handle_openai_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )

 def subscribe_claude():
 self.client.subscribe_to_queries(
 subscription=QueriesSubscription(
 channel="claude_requests",
 on_receive_query_callback=self.handle_claude_query,
 on_error_callback=on_error,
 ),
 cancel=CancellationToken()
 )

 threading.Thread(target=subscribe_openai).start()
 threading.Thread(target=subscribe_claude).start()

 print("LLM Router running on channels: openai_requests, claude_requests")
 try:
 while True:
 time.sleep(1)
 except KeyboardInterrupt:
 print("Shutting down...")

if __name__ == "__main__":
 router = LLMRouter()
 router.run()

说明:

  • 初始化。

A.为API密钥加载环境变量。

B.初始化OpenAI和Anthropic大模型的客户端。

C.设置KubeMQ客户端。

  • 处理查询。

A.handle_openai_query和 handle_claude_query负责解码传入消息,将其传递给相应大模型,而后发回响应。

B.捕捉错误并将 is_executed 标记设置为 False。

  • 订阅。

A.此路由将订阅两个小道:openai_requests 和 claude_requests。

B.使用线程并行处理订阅。

  • 运行服务器。

A.run方法启动订阅并保持服务器运行,直至中断。

开发大模型客户端

客户端向大模型路由发送查询,指定要使用的模型。

client.py

复制
Python
from kubemq.cq import Client, QueryMessage
import json

class LLMClient:
 def __init__(self, address="localhost:50000"):
 self.client = Client(address=address)

 def send_message(self, message: str, model: str) -> dict:
 channel = f"{model}_requests"
 response = self.client.send_query_request(QueryMessage(
 channel=channel,
 body=message.encode('utf-8'),
 timeout_in_seconds=30
 ))
 if response.is_error:
 return {"error": response.error}
 else:
 return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
 client = LLMClient()
 models = ["openai", "claude"]
 message = input("Enter your message: ")
 model = input(f"Choose model ({'/'.join(models)}): ")
 if model in models:
 response = client.send_message(message, model)
 if "error" in response:
 print(f"Error: {response['error']}")
 else:
 print(f"Response: {response['response']}")
 else:
 print("Invalid model selected")

说明:

  • 初始化。

A.设置KubeMQ客户端。

  • 发送消息。

A.send_message 方法根据所选模型构建适当通道。

B.向路由发送查询消息并等待响应。

C.处理错误并解码响应主体。

  • 用户交互。

A.提示用户输入消息并选择模型。

B.从大模型处输出响应。

通过REST发送和接收

对于倾向或需要RESTful通信的服务或客户端,KubeMQ亦可提供REST端点。

通过REST发送请求

端点:

复制
Plain Text
POST http://localhost:9090/send/request

标头:

复制
Plain Text
Content-Type: application/json

实体:

复制
JSON
{
 "RequestTypeData": 2,
 "ClientID": "LLMRouter-sender",
 "Channel": "openai_requests",
 "BodyString": "What is the capital of France?",
 "Timeout": 30000
}

负载细节:

  • RequestTypeData – 指定请求类型(查询为2)。
  • ClientID – 发送请求的客户端标识符。
  • Channel – 与大模型(openai_requests或claude_requests)对应的通道。
  • BodyString – 要发送至大模型的消息。
  • Timeout – 等待响应的时间(单位为毫秒)。

接收响应

响应是一个包含大模型输出或错误消息的JSON对象。

总结

在消息代理(KubeMQ)的帮助下,我们建立起可扩展且高效的路由机制,能够与多个大模型进行交互。此设置允许客户端无缝向不同模型发送查询,并可扩展以引入更多模型或功能。

这种方法的好处包括:

  1. 简化集成。大家可以将与不同大模型API交互与涉及的复杂性抽象出来,简化客户端代码并降低出错几率。
  2. 多模型支持。有效将请求路由至专门用于不同任务的适当模型。
  3. 可靠性。确保在大模型繁忙或不可用时,数据不致丢失。
  4. 冗余。提供后备机制以保持不间断操作。
  5. 可扩展性。通过在多个大模型实例间分配请求以应对高流量需求。
原文标题:Simplifying Multi-LLM Integration With KubeMQ,作者:John Vester

相关标签:

相关资讯

麻省理工科技评论:2025年AI五大趋势

随着人工智能技术的迅猛发展,对其未来“走向”的准确预测变得尤为复杂。 尽管如此,鉴于人工智能正在深刻地改变着各行各业,持续关注并理解其发展趋势对于科技从业者、研究学者以及行业分析师来说至关重要。 2025年,预计人工智能将在众多领域扮演更加核心的角色,推动生产力提升和行业创新。
2/17/2025 11:16:28 AM
佚名

OpenAI重磅剧透:GPT-5“很快推出”,CPO预测年内AI代码自动化将达99%!

近日,OpenAI 首席产品官(CPO)Kevin Weil 在一次备受关注的访谈中透露了多个重磅消息,引发业界广泛关注。 最引人瞩目的是关于备受期待的 GPT-5以及 AI 代码自动化进程的预测。 对于 GPT-5的发布时间,Kevin Weil 虽然没有给出具体日期,但他明确表示:“我不会给你具体时间,但 GPT-5会很快出现。
3/17/2025 11:19:00 AM
AI在线

用Ray观测和监控大语言模型工作负载

译者 | 布加迪审校 | 重楼前言GPT-4、PHI2、BERT和T5等大语言模型(LLM)的出现已彻底改变了自然语言处理,这些模型支持高端应用程序,包括聊天机器人、推荐系统和分析。 然而,LLM中工作负载的规模和复杂性使得保证性能和可靠性成了一大挑战。 在这种情况下,在使用Ray等框架部署工作负载的同时进行监控和观测显得非常必要。
1/27/2025 8:23:41 AM
布加迪