def task(n):
    """Demo task: announce start, block for 1 second, announce completion.

    The 1 s sleep stands in for blocking I/O work so the thread-pool
    overlap is observable in the interleaved output.
    """
    print(f"Task {n} is starting...")
    time.sleep(1)
    print(f"Task {n} is done.")


# Submit 5 tasks to a pool of 3 worker threads. Leaving the `with`
# block implicitly calls executor.shutdown(wait=True), so this waits
# for all submitted tasks to finish.
with ThreadPoolExecutor(max_workers=3) as executor:
    for i in range(5):
        executor.submit(task, i)
3. 使用 multiprocessing 模块
3.1. 创建进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import multiprocessing
import time


def worker(number):
    """Demo worker: announce start, block for 2 seconds, announce completion.

    Runs in a child process, so the sleep stands in for CPU-bound work
    that would otherwise be serialized by the GIL in threads.
    """
    print(f"Worker {number} is starting...")
    time.sleep(2)
    print(f"Worker {number} is done.")


# The __main__ guard is required on platforms that spawn (Windows/macOS):
# child processes re-import this module and must not re-run the launcher.
if __name__ == '__main__':
    processes = []
    # Start all 5 processes first so they run concurrently...
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()
    # ...then wait for every one of them to finish.
    for p in processes:
        p.join()
    print("All workers are done.")
3.2. 使用进程池
1 2 3 4 5 6 7 8 9
from multiprocessing import Pool


def square(n):
    """Return n squared. Must be a top-level function so it is picklable."""
    return n * n


if __name__ == '__main__':
    # Pool.map distributes the 10 calls across 4 worker processes and
    # returns the results in input order.
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4. 异步 I/O：使用 asyncio
4.1. 异步函数和事件循环
1 2 3 4 5 6 7 8
import asyncio


async def main():
    """Demo coroutine: print, await a 2 s async sleep, print again.

    `await asyncio.sleep(2)` yields control to the event loop instead of
    blocking the thread, unlike `time.sleep`.
    """
    print("Main function is starting...")
    await asyncio.sleep(2)
    print("Main function is done.")


# Guarded entry point (consistent with the multiprocessing examples):
# asyncio.run() creates an event loop, runs main() to completion, and
# closes the loop.
if __name__ == '__main__':
    asyncio.run(main())
4.2. 任务并发执行
1 2 3 4 5 6 7 8 9 10 11 12
import asyncio


async def fetch_data(num):
    """Simulate fetching item `num` with a 1 s asynchronous delay."""
    print(f"Fetching data {num}...")
    await asyncio.sleep(1)
    print(f"Data {num} fetched.")


async def main():
    """Run 5 fetches concurrently.

    gather() awaits all coroutines at once, so total wall time is about
    1 second rather than 5.
    """
    tasks = [fetch_data(i) for i in range(5)]
    await asyncio.gather(*tasks)


# Guarded entry point so importing this module does not start the loop.
if __name__ == '__main__':
    asyncio.run(main())
4.3. 异步 I/O 结合网络请求
使用 aiohttp 模块进行异步 HTTP 请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import aiohttp  # third-party: pip install aiohttp
import asyncio


async def fetch(url):
    """GET `url` asynchronously and return the response body as text.

    NOTE(review): creating a ClientSession per request is simple for a
    demo, but production code should share one session across requests.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    """Issue 5 concurrent HTTP requests and report each body's length."""
    urls = ['http://example.com'] * 5
    tasks = [fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)
    for idx, response in enumerate(responses):
        print(f"Response from request {idx+1}: {len(response)} characters.")