译者 | 李睿
审校 | 重楼
梅西百货公司首席数据工程师Naresh Erukulla是一位勇于迎接挑战的数据工程师,他擅长用简洁明了的概念验证(POC)解决各种问题。最近,Naresh关注到了数据工程师日常工作中普遍遭遇的一个难题,并为此采取行动,为所有批处理和流数据管道设置了警报系统。当错误超过阈值或数据管道出现故障时,可以迅速通过电子邮件向数据工程师发送故障通知,确保问题能够得到及时处理。
一切似乎都在顺利进行中,直到他注意到一个关键数据集无法加载到BigQuery中。在调查了错误日志之后,发现一些“缺少所需数据”提示的消息。当看到用户输入文件中频繁出现的原始数据问题时,他为此感到困惑。
处理数据不一致问题,特别是数据缺失或格式错误,会在分析和运营工作流程的后续环节引发严重的后果。有一个关键的下游报告正是建立在这些输入数据的基础之上。该报告在日常业务中发挥着至关重要的作用,它能够反映出公司在多个领域内的关键指标表现,并且为决策制定提供了不可或缺的数据支持。在这份至关重要的报告中,所有高管级别的利益相关者都依赖这些数据来展示业绩指标、讨论面临的挑战以及规划未来的发展路径。
Erukulla耗费了数小时检查源CSV文件,该文件承载了来自另一个上游应用程序的大量事务数据。准确识别并修正问题行显得至关重要。然而,当他着手处理这些问题时,发现已经错过截止日期,这无疑令利益相关者深感失望。Erukulla也意识到传统数据管道的脆弱性。它们很容易出错,而且往往需要多次人工干预来进行修复,这个过程既耗时又容易出错。
人们是否也遇到过类似的情况?是否花费了大量时间调试数据管道,结果却发现根本原因只是一个简单的格式错误或缺少必填字段?事实上,世界各地的数据工程师每天都在努力应对这些挑战。那么是否有可以构建能够“自我修复”数据管道的方法?这正是Erukulla追求的目标。
自我修复数据管道的工作原理
自我修复数据管道的想法很简单:当数据处理过程中出现错误时,数据管道应该自动检测、分析和纠正错误,而无需人工干预。传统上,解决这些问题需要人工干预,这既耗时又容易出错。
虽然有多种方法可以实现这一点,,但使用人工智能代理是最好的方法,也是数据工程师在未来自我修复故障数据管道并动态自动纠正它们的方法。本文将展示如何使用像GPT-4/DeepSeek R1模型这样的LLM来自修复数据管道的基本实现,其方法是使用LLM对失败记录进行分析并提出建议,并在数据管道运行的过程中应用这些修复措施。所提供的解决方案可以扩展到大型数据管道,并将扩展更多的功能。
以下介绍如何利用OpenAI API在云计算环境中使用GPT-4模型构建一个实用的管道。遵循的基本步骤如下:
- 将源文件上传到谷歌云存储桶(Google Cloud Storage Bucket)。如果没有谷歌云平台的访问权限,则可以使用任何本地或其他云存储。
- 创建数据模型,用于将原始数据提取到BigQuery表中,将错误记录提取到错误表中。
- 从CSV中读取源文件,并从输入数据中识别干净(Clean)数据集和无效记录错误行(Error Rows)数据集。
- 将Clean数据集导入BigQuery,并使用提示将Error Rows数据集传递给LLM。
- 对于每个错误行(Error Rows),OpenAI的GPT API进行分析并提供智能产品ID分配。
- 使用Google BigQuery动态存储和检索产品信息。
- 基于Python的自动化无缝集成。
可以参阅Erukulla在GitHub上的完整代码库。
1.从云存储读取输入数据
数据管道首先读取存储在Cloud Storage中的客户端上传的CSV文件,可以利用云函数(无服务器执行管道步骤)在新文件上传到存储桶时触发。该函数使用谷歌云存储库(google-cloud-storage)读取文件,并将其解析为Pandas DataFrame以供进一步处理。
在将数据传递到下一步之前,可以实施一些数据质量检查。然而,现实世界中的数据问题是动态的,无法预测和编写所有测试用例,这会使代码变得复杂且难以阅读。
在这个用例中,CSV文件包含字段ProductID、Price、name、saleAmount。以下是包含数据的示例文件(ProductID和Price字段中也缺少数据)。
1 # Read CSV from GCS 2 client = storage.Client() 3 bucket = client.bucket(bucket_name) 4 blob = bucket.blob(file_name) 5 data = blob.download_as_text() 6 df = pd.read_csv(io.StringIO(data)) 7
2.将数据导入BigQuery
接下来,数据管道尝试将数据导入到BigQuery中。如果由于模式不匹配、数据类型错误或缺少字段而导致任何行失败,则捕获并记录它们以供进一步分析。这一步骤对于检测底层错误信息至关重要,这些错误信息将用于识别OpenAI API的可能解决方案。
复制1 # Function to clean and preprocess data 2 def clean_data(df): 3 avg_price = get_average_price() 4 5 df["Price"] = df["Price"].fillna(avg_price) 6 7 # Log and remove rows with issues 8 error_rows = df[df["ProductID"].isna()] 9 clean_df = df.dropna(subset=["ProductID"]) 10 11 return clean_df, error_rows 12 13 # Function to query BigQuery for an average price 14 def get_average_price(): 15 client = bigquery.Client() 16 query = f"SELECT AVG(Price) AS avg_price FROM `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.Product_Info`" 17 18 try: 19 df = client.query(query).to_dataframe() 20 avg_price = df["avg_price"][0] 21 print(f"Fetched Average Price: {avg_price}") 22 return avg_price 23 except Exception as e: 24 print(f"Error fetching average price: {e}") 25 return None 26
注意,avg_price = get_average_price()是从BigQuery查询中获取的。
在插入干净的数据集之后如下图所示:
3.使用LLM分析错误
分析错误是整个流程中的关键步骤,这就是采用LLM的神奇之处。失败的记录被发送到GPT-4或DeepSeek R1等LLM进行分析。LLM检查错误并提出更正建议和修正后的记录。
例如,假设日期字段的格式不正确。在这种情况下,LLM可能会建议从字符串到整数转换或从字符串到日期/时间戳转换的正确格式记录,反之亦然。在数据是预期的但发现为空的情况下,根据代码强制执行的规则,带有“平均”(Average)或“默认”(Default)值的缺失值将被修复,以确保数据摄取成功。
通过重试机制实现ChatCompletion请求。
为了确保弹性,利用tenacity实现了重试机制。该函数将错误细节发送给GPT并检索建议的修复程序。在本文的示例中,创建了‘functions’(函数)列表,并使用ChatCompletion Request将其传递给JSON有效负载。
需要注意,‘functions’列表是使用在管道代码中创建的Python函数来修复已知或可能问题的所有函数的列表。GPT分析输入提示符和错误消息,以确定是否调用‘functions’列表中列出的特定函数来修复问题。
因此,GPT的响应提供了指示应该调用哪个函数的结构化数据。GPT不会直接执行函数,而是由数据管道来执行。
复制1 @retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3)) 2 def chat_completion_request(messages, functinotallow=None, model=GPT_MODEL): 3 headers = { 4 "Content-Type": "application/json", 5 "Authorization": "Bearer " + openai.api_key, 6 } 7 json_data = {"model": model, "messages": messages} 8 if functions is not None: 9 json_data.update({"functions": functions}) 10 try: 11 response = requests.post( 12 "https://api.openai.com/v1/chat/completions", 13 headers=headers, 14 jsnotallow=json_data, 15 ) 16 return response.json() 17 except Exception as e: 18 print("Unable to generate ChatCompletion response") 19 print(f"Exception: {e}") 20 return e 21 # Function to send ChatCompletion request to OpenAI API 22 functions = [ 23 { 24 "name": "assign_product_id", 25 "description": "assigning a unique ProductID", 26 "parameters": { 27 "type": "object", 28 "properties": { 29 "ProductID": { 30 "type": "integer", 31 "description": "The product ID to assign." 32 }, 33 } 34 }, 35 } 36 ] 37
assign_product_id是‘functions’列表中列出的函数,GPT可以在需要时调用它。在这个示例中,CSV文件的最后两行缺少ProductID。因此,GPT调用特定的assign_product_id函数来确定ProductID值。
assign_product_id函数从BigQuery中获取最高分配的ProductID,并将其递增以供后续使用。如果它是首次运行,或者BigQuery表中没有可用的数据,它将分配默认的99999作为ProductID。
复制1 def assign_product_id(): 2 client = bigquery.Client() 3 # table_ref = client.dataset(BQ_DATASET_ID).table(BQ_TABLE_ID) 4 5 query = f""" 6 Select max(ProductID) as max_id from `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}` WHERE ProductID < 99999 7 """ 8 df = client 9 try: 10 df = client.query(query).to_dataframe() 11 except Exception as e: 12 print(f"Error fetching max ProductID: {e}") 13 return None 14 return df["max_id"][0] + 1 if not df.empty else 99999 15
4.应用自动更正
数据管道将GPT的建议应用于失败的记录,并重新尝试将它们导入到BigQuery中。如果更正成功,数据将存储在主表中。如果没有,不可修复的记录将被记录到一个单独的错误表中,以供人工检查。
在字段是必需且唯一的情况下,GPT可以从BigQuery获得唯一的ProductID值,并在此值的基础上加1,以确保其唯一性。考虑管道中有多个错误行的情况;每个记录都按照GPT响应提供的修复程序顺序处理,并用建议值更新错误记录。
在以下的代码中,ProductID被从assign_product_id()BigQuery表中获取的值替换。当有多个错误行时,每个错误行都会通过递增ProductID获得一个唯一的数字。
复制1 # Function to send error data to GPT-4 for analysis 2 def analyze_errors_with_gpt(error_rows): 3 if error_rows.empty: 4 return error_rows 5 6 new_product_id = assign_product_id() 7 8 for index, row in error_rows.iterrows(): 9 prompt = f""" 10 Fix the following ProductID by assigning a unique ProductID from the bigquery table Product_Info: 11 {row.to_json()} 12 """ 13 chat_response = chat_completion_request( 14 model=GPT_MODEL, 15 messages=[{"role": "user", "content": prompt}], 16 functions=functions 17 ) 18 19 if chat_response is not None: 20 try: 21 if chat_response["choices"][0]["message"]: 22 response_content = chat_response["choices"][0]["message"] 23 else: 24 print("Chat response content is None") 25 continue 26 except json.JSONDecodeError as e: 27 print(f"Error decoding JSON response: {e}") 28 continue 29 30 if 'function_call' in response_content: 31 if response_content['function_call']['name'] == 'assign_product_id': 32 res = json.loads(response_content['function_call']['arguments']) 33 res['product_id'] = new_product_id 34 error_rows.at[index, "ProductID"] = res['product_id'] 35 new_product_id += 1 # Increment the ProductID for the next row 36 37 print(f"Assigned ProductID: {res['product_id']}") 38 else: 39 print("Function not supported") 40 else: 41 chat.add_prompt('assistant', response_content['content']) 42 else: 43 print("ChatCompletion request failed. Retrying...") 44 45 return error_rows 46
5.将已修改的行导入到BigQuery表中
main函数从谷歌云存储(Google Cloud Storage)读取数据并进行清理,并将有效数据导入到BigQuery中,同时动态修复错误。
复制1 # Main function to execute the pipeline 2 def main(): 3 bucket_name = "self-healing-91" 4 file_name = "query_results.csv" 5 6 # Read CSV from GCS 7 client = storage.Client() 8 bucket = client.bucket(bucket_name) 9 blob = bucket.blob(file_name) 10 data = blob.download_as_text() 11 df = pd.read_csv(io.StringIO(data)) 12 13 # Clean data and identify errors 14 clean_df, error_rows = clean_data(df) 15 16 # Load valid data into BigQuery 17 load_to_bigquery(clean_df, BQ_TABLE_ID) 18 19 # Process errors if any 20 if not error_rows.empty: 21 22 # Analyze errors with GPT-4 23 error_rows = analyze_errors_with_gpt(error_rows) 24 25 load_to_bigquery(error_rows, BQ_TABLE_ID) 26 27 print("Fixed Errors loaded successfully into BigQuery original table.") 28
在修复数据错误之后,需要特别检查第66至68行。从BigQuery表中获取最大值10000 ProductID后,对这些ID逐一进行递增处理。此外,错误行中缺少信息的Price字段被BigQuery表中的Avg(Price)替换。
6.日志记录和监控
在整个过程中,使用云日志(Cloud Logging)记录错误和数据管道的活动。这确保工程师可以监控数据管道的运行状况并排查问题。
采用的工具和技术
以下是用来构建和测试数据管道的关键工具和技术:
- 云存储:用于存储输入的CSV文件。
- 云函数:用于无服务器执行管道步骤。
- BigQuery:用于存储清理过的数据和错误日志。
- GPT-4/DeepSeek R1:用于分析失败记录并提出更正建议。
- 云日志:用于监视和故障排除。
- 云编排器:它用于使用Apache气流编排管道。
面临的挑战
1. LLM集成
将LLM集成到数据管道中颇具挑战性。必须确保API调用是有效的,LLM的响应是准确的。此外,还有成本方面的考虑,由于为LLM配置API对于大型数据集来说可能成本高昂。因此,只需知道必须为该服务设置一个API密钥。
例如,对于OpenAI,必须访问https://platform.openai.com/来注册和生成新的API密钥,并在发送带有提示的API调用时在数据管道中使用它。
2.错误处理
设计一个稳健的错误处理机制具有挑战性。必须考虑各种错误,从模式不匹配到网络问题,并确保数据管道能够优雅地处理它们。数据管道可能会面临许多问题,而且所有问题都不能动态解决,例如文件为空或BigQuery表不存在等问题。
3.可扩展性
随着数据量的增长,必须优化数据管道以实现可扩展性。这涉及到在BigQuery中对数据进行分区,并使用Dataflow进行大规模处理。
4.成本管理
虽然谷歌云平台提供了强大的工具,但使用这些工具需要支付费用。因此必须仔细监控使用情况并优化数据管道,以避免额外的成本。OpenAI API成本是需要仔细监控的另一个因素。
结论和要点
对于数据工程师来说,构建自我修复的数据管道是一个改变游戏规则的方法。它可以减少人工干预,提高效率,保证数据质量。然而,这并不是灵丹妙药。虽然自我修复数据管道可以节省时间,但它们会带来额外的成本,例如LLM API费用和增加的云函数的使用量。因此,权衡这些成本与收益至关重要。
对于自我修复数据管道领域的新手来说,建议从小型项目着手,首先尝试集成大型语言模型(LLM)和处理基本错误,然后再逐步扩展。在这一过程中,定期监控数据管道的性能和成本。使用云监控和云日志之类的工具来识别瓶颈并进行相应的优化。最后,要与数据科学家、分析师和业务利益相关者紧密合作,了解他们的实际需求,并确保当业务需求发生变化时,其数据管道能够持续创造价值。
总之,自我修复的数据管道代表着数据工程的未来。通过利用歌云平台和LLM等工具,可以构建健壮、高效、智能的管道,从而最大限度地减少停机时间并提升生产效率。如果曾经受到脆弱的数据管道的困扰,可以探索和采用这一方法,而前期的努力将带来长期的收益。
原文标题:Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?,作者:Naresh Erukulla