茄子的个人空间

python多进程示例

字数统计: 255阅读时长: 1 min
2025/03/08
loading
  1. 导入依赖的包
1
from concurrent.futures import ProcessPoolExecutor, as_completed
  1. 定义处理内容的函数,该函数要是全局函数,不能放在函数或者类里面
    函数的参数只有一个,可以用元组包装起来,然后再解包
1
2
3
4
5
6
7
8
def process_record(args):
text = args[0]["text"]
CONTENT = args[1]
extract_info = inference(text, model="deepseek-chat", CONTENT=CONTENT)
return {
"text": text,
"extract_info": extract_info
}
  1. 使用多进程处理数据
    在 with 下面一共有两个for,第一个for负责将数据和处理函数放在一个 futures 中, 第二个for负责处理和返回处理结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
for d in data:
args = (d, CONTENT)
futures.append(executor.submit(process_record, args))

# 使用 as_completed 迭代返回已经完成的任务
# tqdm 用于显示进度条
for idx, future in enumerate(tqdm.tqdm(as_completed(futures), total=len(futures), desc="Extracting info")):
result = future.result()
new_data.append(result)

# 每完成 100 个样本,保存一次临时文件
if (idx + 1) % 500 == 0:
with open(save_path, "w", encoding="utf-8") as f:
json.dump(new_data, f, indent=4, ensure_ascii=False)
print(f"Extracted information from {idx+1} medical records, and saved to {save_path}")

enjoy !

CATALOG