上篇文章介绍到的 Dify+RAGFLow 的协同使用文章里,提到了一个泵类设备预测性维护智能系统。后来陆续有人私信咨询实施细节,这篇做个统一的介绍。
Dify+RAGFlow:1+1>2的混合架构,详细教程+实施案例
项目定位是,利用 Dify 的工作流编排能力和 RAGFlow 的知识库组件,结合模拟的设备传感器数据 (IoT) 和企业资源数据 (CMMS, MES, ERP),构建一个针对离心式冷却液泵的预测性维护系统原型。
注:为遵守保密协议,本文及相关代码在原项目基础上使用了模拟数据和接口进行演示。所展示的工作流设计、节点交互等与实际项目逻辑相似,仅供各位参考。
以下,enjoy:
1、项目背景
作者早年从事金融信贷领域时,在长珠三角走访过大大小小几百家工厂,其中以机械加工、注塑行业为主,但不管什么细分行业,设备的稳定运行对企业来说都是日常经营的头等大事。尤其是像 CNC 加工中心使用的冷却液泵这类关键辅助设备,意外停机会导致生产中断,直接影响订单排查计划。
传统的定期维护 (Preventive Maintenance, PM) 往往过于保守或未能及时发现潜在问题,而事后维修 (Corrective Maintenance, CM) 成本高昂。这篇要介绍的预测性维护 (Predictive Maintenance, PdM) 主要目的是通过监测离心泵设备状态,在故障实际发生前安排维护,最大限度地减少停机时间、降低维护成本、提高设备利用率。
复制泵类型: 离心式冷却液泵 (Centrifugal Coolant Pump) 泵型号 : CoolFlow Pro 5000 设备 ID: PUMP-CNC-001 使用场景: 安装在一台中型 CNC 立式加工中心 (Machine ID: VMC-03) 上,用于循环供给水基切削液,冷却刀具和工件。该机床主要加工钢材和铝合金零件,实行两班倒工作制 (16小时/天)。泵已运行约 18 个月。 关键监测参数: 振动 (mm/s RMS), 温度 (°C), 出口压力 (Bar), 流量 (L/min)。
2、数据构成
参照真实的业务场景,我构建了一套模拟 API (mock_api.py) 和相应的 JSON 数据文件,代表不同系统的数据源:
1、IoT 数据 (`IOT.json` & `/api/pump/status`): * 包含泵的实时传感器读数,如时间戳 (`timestamp`)、轴向/径向振动 (`vibration_axial`/`radial`)、电机/轴承温度 (`temperature_motor`/`bearing`)、出口压力 (`pressure_outlet`)、流量 (`flow_rate`)。 * 模拟了数据随时间变化,包含正常和逐渐异常的状态。 2、MES 数据 (`MES.json` & `/api/mes/pump/operation`): * 提供泵的运行日志,关联的设备 (`associated_machineId`),运行小时数 (`operating_hours`),处理的工单 (`jobs_processed`) 等。 * 有助于了解泵的工作负载和上下文。 3、CMMS 数据 (`CMMS.json` & `/api/cmms/pump/history`): * 包含历史维护记录 (`maintenance_records`):工单号、日期、类型、描述、更换部件、停机时间等。 * 包含相关故障信息 (`related_failures_info`):其他泵(或本机历史)的故障案例,包括症状、故障模式、根本原因、纠正措施。这是重要的知识来源。 4、ERP 数据 (`ERP.json` & `/api/erp/spare_parts`): * 提供备件信息:零件 ID (`partId`)、名称 (`name`)、描述、适用泵型号、供应商、库存水平 (`stock_level`)、单价 (`unit_price`)、采购提前期 (`lead_time_days`)。 * 用于生成维护建议时考虑备件可用性。 模拟维护完成后,实际执行情况的反馈数据,用于评估预测准确性和闭环优化。
3、工作流整体思路
由 RAGFlow 负责处理知识库,工作流基于 Dify 构建,核心思路是:
状态监测 -> 异常判断 -> 深度分析与建议 -> 报告生成。
获取状态: 通过 GET_IOT_DATA 节点调用模拟 API 获取指定 pumpId 的最新传感器数据。
解析数据: 使用 CODE 节点解析返回的 JSON 字符串,提取关键指标,并判断是否需要维护 (needs_maintenance 标志)。
条件分流: IF/ELSE 节点根据 needs_maintenance 标志决定流程走向。
复制ELSE (正常): 流向 ANSWER 节点,输出简单的状态正常信息。复制
IF (异常): 启动预测性维护流程。
信息整合 (IF 分支):
复制调用 GET_CMMS_DATA 和 GET_MES_DATA 获取历史维护和运行数据。复制
调用 KNOWLEDGE_RETRIEVAL 节点,结合当前异常指标查询知识库 (离心泵设备手册 pdf、历史维修案例 excel)。
故障预测 (IF 分支): 调用 FAULT_PREDICTION(LLM 节点),将整合后的信息(实时数据、历史数据、知识库信息)提供给 LLM,要求其分析可能的故障模式并预测需要更换的关键备件 ID。
备件 ID 提取 (IF 分支): 使用 EXTRACT_PART_ID (CODE 节点) 从 FAULT_PREDICTION 的文本输出中解析出 required_partId。
ERP 查询 (IF 分支): 调用 GET_ERP_DATA (HTTP 节点),使用 extracted_part_id 作为参数查询具体备件信息 。
ERP 数据解析 (IF 分支): 使用 PARSE_ERP_DATA(CODE 节点) 解析 GET_ERP_DATA 返回的 JSON 字符串为列表。
生成维护建议 (IF 分支): 调用 SUGGESTIONS(LLM 节点),将故障预测结果和解析后的 ERP 备件信息列表提供给 LLM,要求生成详细的维护步骤、备件清单和时间建议。
最终输出: 通过 ANSWER 2 节点展示维护建议或最终报告。
4、关键节点介绍
**START:** 定义工作流入口参数 (`pumpId`)。 **GET\_... (HTTP Request):** 与模拟 API 交互,获取各类数据。 **CODE (解析 IoT):** 解析 IoT JSON,提取关键指标,**计算 `needs_maintenance` 标志**。 **IF/ELSE:** 根据 `needs_maintenance` 标志进行流程分支。 **KNOWLEDGE_RETRIEVAL:** (可选) 连接外部知识库进行 RAG 查询。 **FAULT_PREDICTION (LLM):** 基于多源信息进行故障诊断与预测。 **EXTRACT_PART_ID (CODE):** 从 LLM 输出中解析结构化信息 (Part ID)。 **PARSE_ERP_DATA (CODE):** 解析 ERP 返回的 JSON 列表字符串。 **SUGGESTIONS (LLM):** 结合故障预测和备件信息生成维护建议。 **ANSWER / ANSWER 2:** 输出最终结果。
5、主要报错与解决过程参考
基于我前期在上手熟悉 Dify 工作流搭建中踩过的坑,结合这个项目做个对照说明,各位可以大致做个参考:
复制1. HTTP 请求:URL 与参数混淆 * **节点:** `GET_IOT_DATA` * **报错:** `Invalid non-printable ASCII character...` * **原因:** 同时在 URL 字段和 PARAMS 字段中定义了查询参数。 * **解决:** URL 只写基础路径,参数在 PARAMS 中定义 (推荐: `{{start.pumpId}}`)。 2. HTTP 请求:网络访问 & CORS * **节点:** 所有 HTTP 请求节点 * **报错:** Dify 404,但 API 日志显示 200 OK。 * **原因:** API 服务监听地址限制;缺少 CORS 配置。 * **解决:** * API (`mock_api.py`): 启动时 `host='0.0.0.0'`。 * API (`mock_api.py`): 安装 `flask-cors` 并启用 `CORS(app)`。 3. 数据处理:JSON 字符串解析 * **节点:** `CODE` (解析 IoT), `IF/ELSE`, `ANSWER` * **报错:** `IF/ELSE` 条件不生效;`ANSWER` 直接输出占位符。 * **原因:** HTTP 节点返回的 `body` 是 JSON **字符串**,而非结构化对象。 * **解决:** 在 HTTP 节点后加 `CODE` 节点,用 `json.loads()` 解析 `body` 字符串,输出解析后的对象/列表供后续节点使用。 4. IF/ELSE 节点:数值比较类型问题 * **节点:** `IF/ELSE` * **报错:** 数值比较条件 (`> 6.0`) 不生效。 * **原因:** `IF/ELSE` 可能将变量视为字符串进行比较。 * **解决:** 将比较逻辑移入 `CODE` 节点,输出简单标志位 (如 `"1"`/`"0"` 字符串),`IF/ELSE` 只判断此标志位。 5. CODE 节点:Python 语法 & 类型错误 * **节点:** 所有 `CODE` 节点 * **报错:** `IndentationError`, `NameError`, `Output variable 'xxx' must be a [Type]`。 * **原因:** 缩进错误;变量未定义;Dify 输出类型配置与代码返回不符。 * **解决:** 严格检查 Python 缩进;确保 `return` 前所有变量已定义赋值;确保 Dify 中输出变量的**名称和类型**与代码 `return` **完全匹配** (e.g., Part ID 是 String)。 6. TEMPLATE 节点:Jinja2 语法 & 数据错误 * **节点:** `TEMPLATE` * **报错:** `jinja2.exceptions.TemplateSyntaxError`。 * **原因:** 括号错误 (`{{{` vs `{{`, `}} }`); 控制结构括号错误 (`{{%` vs `{%`); 变量名不匹配;在未解析数据上访问属性。 * **解决:** 修正 Jinja2 语法;确保模板变量名与输入变量名**完全一致**;对列表/字典数据,**先用 CODE 解析**再传入模板,并使用 `{% for %}` 访问循环变量属性 (`{{ part.name }}`)。
6、从模拟到生产切换注意事项
看完上述内容,如果你计划将基于 mock_api.py 测试成功的工作流切换到对接真实的生产环境,需要注意替换数据源接入点,并适配数据格式。
主要步骤如下:
复制1. 获取真实 API 信息:** 获取真实的 IoT、CMMS、MES、ERP 系统的 API 详细信息: * **URL 端点 (Endpoint):** 每个系统提供数据的具体网址。 * **请求方法 (Method):** GET, POST, PUT, DELETE 等。 * **认证方式 (Authentication):** 如何验证 Dify 的访问权限?可能是 API Key (放在 Header 或 Query Param)、OAuth 2.0 Token (放在 Header)、用户名密码等。 * **请求参数/体 (Parameters/Body):** 如何指定要查询的设备 ID、时间范围、零件 ID 等?是在 URL 参数中,还是在 POST 请求的 Body 中?Body 的格式是 JSON、XML 还是其他? * **响应格式 (Response Format):** 真实系统返回的数据结构是怎样的?与模拟的 JSON 结构是否一致? 2. 修改 Dify HTTP Request 节点: * 定位工作流中所有调用模拟 API 的 HTTP Request 节点 (`GET_IOT_DATA`, `GET_CMMS_DATA`, `GET_MES_DATA`, `GET_ERP_DATA`)。 * **更新 URL:** 将节点的 URL 字段替换为真实 API 的端点。 * **调整方法:** 根据真实 API 要求修改请求方法 (GET/POST 等)。 * **配置认证:** 在节点的 `Headers` 或 `Params` 部分添加必要的认证信息 (如 `Authorization: Bearer <token>`, `X-API-Key: <key>`)。 * **适配参数/体:** 根据真实 API 要求修改 `Params` 或 `Body` 的内容和格式。 3. 适配数据解析 (关键步骤❗️): * 真实 API 返回的数据结构**极有可能**与模拟 JSON 不同。 * 因此,所有紧跟在 HTTP Request 节点之后的 `CODE` 节点(如解析 IoT 的 `CODE` 节点, `PARSE_ERP_DATA` 节点,以及可能需要为 CMMS/MES 新增的解析节点)**必须进行修改**。 * 需要根据真实 API 返回的 JSON (或其他格式) 结构,调整 Python 代码中 `json.loads()` 之后访问数据的方式(字典键名、列表索引、嵌套结构等),以确保存储到 Dify 输出变量中的是后续节点期望的数据。 * **这是最容易出错的环节,需要仔细比对真实 API 的响应并耐心调试** `CODE` **节点的 Python 代码。 4. 端到端测试:** 修改完成后,使用真实的设备 ID 或测试 ID 进行完整的端到端测试,验证: * 是否能成功从所有真实系统中获取数据。 * 数据解析是否正确。 * LLM 节点接收到的上下文是否符合预期。 * 整个流程是否能按预期运行。
网络连通性: 确保运行 Dify 的环境能够访问到企业内网的真实系统 API 地址。
权限管理: 确保 Dify 使用的 API Key 或凭证具有访问所需数据的正确权限。
数据格式差异: 重点关注真实 API 与模拟 API 在数据结构上的差异,并适配解析逻辑。
性能与频率: 真实 API 可能有调用频率限制或响应时间较长,需要考虑工作流的性能和可能的超时设置。
错误处理: 针对真实 API 可能出现的各种错误(网络错误、认证失败、无效参数、系统内部错误等),在 Dify 工作流中增加更健壮的错误处理逻辑。
7、后续拓展方向
增加报告模板 (TEMPLATE): 引入并调试 TEMPLATE 节点,利用 Jinja2 生成结构更清晰、格式更专业的 HTML、Markdown、PDF 报告 (需要额外工具集成)。
定时触发与批量处理: 增加定时任务触发器 (如 Dify 的计划任务或外部调度系统调用 API),定期检查设备状态,而不是仅通过手动输入 pumpId 触发。
集成到业务系统:
复制告警通知: 在检测到异常时,通过 HTTP 请求节点调用企业微信、钉钉、邮件等的 API 发送告警通知。复制
工单系统集成: 将 `SUGGESTIONS` 生成的维护建议通过 API 推送到 CMMS 系统,自动创建维护工单。
API 暴露: 将整个 Dify 工作流封装为 API,供其他业务系统(如设备监控大屏、生产管理系统)调用。
闭环反馈机制: 实现 /api/cmms/report/update 接口,接收维护完成后的实际结果。设计新的 Dify 工作流或节点来处理这些反馈,分析预测准确性,并将分析结果存入数据库或知识库,用于持续优化模型或 Prompt。
更复杂的故障预测模型: FAULT_PREDICTION 可以替换为更专业的机器学习模型服务接口,或者使用更强大的 LLM 并优化 Prompt 以进行更精确的故障定位和剩余寿命预测 (RUL)。
知识库增强: 持续丰富 RAGFlow (MinerU、 Mistral OCR等工具的测评后续会专门写一篇) 中的内容,加入更多设备类型、故障案例和解决方案。
用户交互优化: 利用 Dify 的 Agent 或 Web App 能力,创建更友好的用户界面,允许用户查询特定设备状态、历史记录或手动触发分析。
8、项目资料包
有需要项目源码的盆友可以付费后查看下方百度云盘链接(包含Code节点代码)