当前位置: 首页 > news >正文

大量数据分批次处理+concurrent.futures.ThreadPoolExecutor多线程处理文件

背景:当数据量很大时,需要多线程调用接口获取数据,但又不想一次性把全部原始数据加载进列表,这时可以把分批加载数据与多线程结合起来使用。

代码实现:

        批量加载:

def load_data_in_batches(file_path, batch_size):
    """Yield the records stored in ``file_path`` as lists of at most ``batch_size`` items.

    Two layouts are supported, chosen by the file name:

    * ``*.json`` -- a single JSON document holding the whole sequence of
      records.  It has to be parsed in one go, so it is loaded fully and then
      sliced into batches.
    * anything else -- JSON Lines (one JSON value per line).  The file is read
      lazily, so only the batch being built is held in memory.  Blank lines
      are skipped, which also makes an empty file yield nothing instead of
      failing inside ``json.loads``.

    Args:
        file_path: Path of the input file (a ``str``).
        batch_size: Maximum number of records per yielded batch; must be >= 1.

    Yields:
        Lists of decoded records; every list except possibly the last one has
        exactly ``batch_size`` items.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1 (raised when iteration
            starts, because this function is a generator).
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    if file_path.endswith(".json"):
        # JSON text is UTF-8 by specification; do not depend on the platform default.
        with open(file_path, "r", encoding="utf-8") as file:
            data_list = json.load(file)
        for start in range(0, len(data_list), batch_size):
            yield data_list[start:start + batch_size]
        return

    batch = []
    with open(file_path, "r", encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                continue
            batch.append(json.loads(line))
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        # Trailing partial batch.
        yield batch


def prepare_batch_input_data(batch, model, temperature=None):
    """Convert raw records into the request payloads sent to the model endpoint.

    Only needed when the raw data has to be reshaped before it is sent;
    otherwise the batch can be passed on unchanged.

    Args:
        batch: Iterable of records; each one is read as
            ``record["messages"][0]["content"]``.
        model: Value stored under the ``"model"`` key of every payload.
        temperature: Optional sampling temperature; the ``"temperature"`` key
            is only added when this is not ``None``.

    Returns:
        A list with one payload dict per input record, in input order.
    """
    input_data_list = []
    for data in batch:
        query_content = data["messages"][0]["content"]
        input_data = {
            "prompt": [{"role": "user", "content": query_content}],
            "label": "",
            "model": model,
        }
        if temperature is not None:
            input_data["temperature"] = temperature
        input_data_list.append(input_data)
    return input_data_list

        多线程调用接口:

import logging
# NOTE(review): the snippets in this file also reference json, time, copy,
# requests, concurrent.futures and tqdm; none of them is imported anywhere in
# the visible text, so they must be imported before the code can run.

# Module-level fallback logger, used by SSEClient when no logger is passed in.
default_logger = logging.getLogger(__name__)
class SSEClient:
    """Send one JSON request and collect the text of the server-sent-event reply.

    Args:
        data: JSON-serialisable request body.
        auth_token: Value sent in the ``Authorization`` header.
        logger: Logger used for progress messages; the module-level
            ``default_logger`` is used when omitted.
        url: Endpoint that receives the POST request.  The original signature
            had this parameter commented out although the constructor body and
            ``invoke_model`` both use it; it is appended last so existing
            positional arguments keep their meaning.
    """

    def __init__(self, data, auth_token="", logger=None, url=""):
        import uuid  # local import: only needed to label this client's log records

        # Previously self.logger was left unset whenever a logger was supplied.
        self.logger = default_logger if logger is None else logger
        self.url = url
        self.headers = {
            "Authorization": auth_token,
            "Content-Type": "application/json",
        }
        self.data = data
        # Identifier attached to every log record of this client.
        self.session_id = uuid.uuid4().hex
        self.send_time = 0  # time the request was sent
        self.create_time = time.time()  # time this object was created
        self.first_message_time = 0  # time the first stream line was received
        self.first_server_response = 0  # NOTE(review): never updated by this class
        self.message = ""
        self.update_logger_context()

    def update_logger_context(self):
        """Wrap the logger so every record carries ``session_id`` and ``task_id``."""
        self.logger = logging.LoggerAdapter(
            self.logger,
            {"session_id": self.session_id, "task_id": self.session_id},
        )

    def _collect_stream_text(self, lines, show_details=False):
        """Return the concatenated ``data:`` payloads of an iterable of SSE lines.

        A blank line terminates an event.  Within an event, ``data:`` values
        are appended verbatim (no leading-space stripping) after turning the
        two-character sequence ``\\n`` into a real newline.  An ``event:
        finish`` field ends processing: the remaining fields of that event and
        all later lines are ignored.
        """
        event_type = ""
        event_lines = []
        chunks = []
        for line in lines:
            if not self.first_message_time:
                # Record when the first line of the stream arrived.
                self.first_message_time = time.time()
                self.logger.info(
                    "First server message response at %s", self.first_message_time
                )
            if line:
                event_lines.append(line)
                continue

            # Blank line: the buffered event is complete.
            text = ""
            finished = False
            for field_line in event_lines:
                if field_line.startswith("event:"):
                    event_type = field_line[len("event:"):].strip()
                    if event_type == "finish":
                        finished = True
                        break
                elif field_line.startswith("data:"):
                    text += field_line[len("data:"):].replace("\\n", "\n")
            if show_details:
                print(f"Event Type: {event_type}, Text: {text}")
            chunks.append(text)
            event_lines = []
            if finished:
                # The original ``break`` only left the inner loop and kept
                # reading the stream after the finish event.
                break
        return "".join(chunks)

    def get_result(self, show_details=False):
        """POST the request and return a summary dict of the streamed reply.

        Args:
            show_details: When true, print the response headers and every
                parsed event.

        Returns:
            ``{"status": "failed", "message": <decoded JSON body>}`` when the
            server answers with a JSON document instead of a stream; otherwise
            a dict with ``status == "success"``, the collected ``response``
            text, a deep copy of the request ``data`` and timing fields.
            ``get_first_message_time`` is ``None`` when no stream line was
            received.

        Raises:
            ValueError: If no URL was configured.
        """
        if not self.url:
            raise ValueError("SSEClient requires a non-empty url")

        self.send_time = time.time()  # time the request was sent
        # NOTE(review): no timeout is passed to requests.post, so a stalled
        # connection blocks this call indefinitely.
        with requests.post(
            self.url,
            headers=self.headers,
            json=self.data,
            stream=True,
        ) as response:
            content_type = response.headers.get("Content-Type") or ""
            # Compare the media type only, so "application/json; charset=utf-8"
            # is recognised as well.
            if content_type.split(";")[0].strip().lower() == "application/json":
                return {"status": "failed", "message": response.json()}
            if show_details:
                print("headers is ", response.headers)
            if "charset" not in content_type.lower():
                # Without a declared charset requests falls back to ISO-8859-1
                # for text/* bodies (or yields bytes), which garbles non-ASCII
                # text; event streams are UTF-8.
                response.encoding = "utf-8"
            self.message = self._collect_stream_text(
                response.iter_lines(decode_unicode=True), show_details
            )

        if self.first_message_time:
            first_message_delay = round(self.first_message_time - self.send_time, 2)
        else:
            first_message_delay = None
        result = {
            "response": self.message,
            "data": copy.deepcopy(self.data),
            "send_time": self.send_time,
            "create_time": self.create_time,
            "first_message_time": self.first_message_time,
            "get_first_message_time": first_message_delay,
            "duration": round(time.time() - self.send_time, 2),
            "status": "success",
        }
        self.logger.info("Final result is \n%s", json.dumps(result))
        return result


def invoke_model(data, save_path=None):
    """Run one request through ``SSEClient`` and return its result dict.

    Args:
        data: Request payload.  When it contains a ``"url"`` key, that value
            is used as the endpoint; otherwise the client's default is used.
        save_path: Optional file to which the result is appended as one JSON
            line.  Concurrent calls that share a path may interleave their
            output, so prefer writing the returned values from one thread.

    Returns:
        The dict produced by ``SSEClient.get_result``.
    """
    if "url" in data:
        result = SSEClient(data, url=data["url"]).get_result()
    else:
        result = SSEClient(data).get_result()

    if save_path is not None:
        # The result is a dict, so it has to be serialised before writing.
        with open(save_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(result, ensure_ascii=False) + "\n")
    return result


def parallel_execution(demo_json_list, n_jobs=4):
    """Call ``invoke_model`` for every payload using a thread pool.

    Args:
        demo_json_list: Sequence of request payloads.
        n_jobs: Upper bound on the number of worker threads.

    Returns:
        A list aligned with ``demo_json_list``: the result of each call, or
        ``None`` where the call raised or did not finish within 120 seconds of
        being waited on.  An empty input returns ``[]``.
    """
    if not demo_json_list:
        # ThreadPoolExecutor rejects max_workers=0.
        return []

    n_jobs = max(1, min(n_jobs, len(demo_json_list)))
    results = [None] * len(demo_json_list)
    print(len(demo_json_list))
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
        # Map each future to the index of its payload in demo_json_list.
        future_to_idx = {
            executor.submit(invoke_model, demo): idx
            for idx, demo in enumerate(demo_json_list)
        }
        for future, idx in tqdm(future_to_idx.items(), desc="Processing"):
            try:
                results[idx] = future.result(timeout=120)
            except Exception as exc:
                # Best effort: report the failure and leave this slot as None.
                print(f"Request {idx} failed: {exc!r}")
                results[idx] = None
    return results

联合使用:

def main(test_data_list, batch_size, model="", n_jobs=4, save_path=None):
    """Process every dataset batch by batch, calling the model in parallel.

    Args:
        test_data_list: Iterable of dataset descriptions; each is a dict with
            a required ``"file_path"`` and an optional ``"temperature"``.
        batch_size: Number of records loaded and dispatched at a time.
        model: Model name placed in every request payload.
        n_jobs: Maximum number of worker threads per batch.
        save_path: Optional file to which each non-``None`` response is
            appended as one JSON line.  Responses are discarded when omitted.
    """
    import json  # local import: only needed when responses are written out

    for test_data in test_data_list:
        file_path = test_data["file_path"]
        temperature = test_data.get("temperature")
        # Load and process the data in batches.
        for batch in load_data_in_batches(file_path, batch_size):
            input_data_list = prepare_batch_input_data(batch, model, temperature)
            response_list = parallel_execution(input_data_list, n_jobs=n_jobs)
            if save_path is None or not response_list:
                continue
            # Written from this thread, after the batch has completed.
            with open(save_path, "a", encoding="utf-8") as out:
                out.writelines(
                    json.dumps(response, ensure_ascii=False) + "\n"
                    for response in response_list
                    if response is not None
                )


# One entry per dataset; fill in the real file paths before running.
data_list = [
    {"file_path": "", "temperature": 0.1},
    {"file_path": "", "temperature": 0.2},
]

if __name__ == "__main__":
    main(data_list, 10)


http://www.mrgr.cn/news/35751.html

相关文章:

  • 【LeetCode每日一题】——LCP 51.烹饪料理
  • 扫雷老年版2.0无猜模式
  • postman下载安装和导入导出脚本一键执行
  • 代码随想录算法训练营第29天|134. 加油站、135. 分发糖果、860.柠檬水找零、406.根据身高重建队列
  • 【机器学习】过拟合与欠拟合——如何优化模型性能
  • 【活动】人工智能时代,程序员如何保持核心竞争力?需要掌握哪些技能?
  • 在多态的方法调用中为什么会出现“左边编译左边运行”的现象?多态创建的对象到底是谁属于父类还是子类?通过深扒集合remove方法调用理解其原理
  • CAPL—on signal到底该怎么玩?
  • 消息队列与Kafka集群
  • 海信智能电视的使用心得
  • 旺店通ERP集成金蝶KIS(金蝶KIS主供应链)
  • CSS 实现文本溢出省略号显示,含单行与多行文本溢出
  • ComfyUI - 使用 ComfyUI 部署与测试 FLUX.1 图像生成模型 教程
  • 2024年9月24日---关于MyBatis框架(3)
  • 猫头虎分享:Python库 Falcon 的简介、安装、用法详解入门教程
  • 【好书推荐】《架构真意:企业级应用架构设计方法论与实践》
  • 苍穹外卖学习笔记(十三)
  • [51单片机] 简单介绍 (一)
  • 你知道怎么合理设置线程池参数吗?
  • 关于数据中心基础设施绿色建维服务认证的介绍