Python多线程实例详解

B站影视 电影资讯 2025-04-06 12:57 3

摘要:进程:独立内存空间,资源分配的基本单位线程:共享进程资源,CPU调度的基本单位Python 特性:受 GIL(全局解释器锁)限制,适合 I/O 密集型任务

以下是一个详细的 Python 多线程实例教程,包含代码逐行解析和应用场景说明:

一、Python 多线程基础

1. 线程 vs 进程

进程:独立内存空间,资源分配的基本单位线程:共享进程资源,CPU调度的基本单位Python 特性:受 GIL(全局解释器锁)限制,适合 I/O 密集型任务

2. 核心模块

python

import threading

from queue import Queue

import time

二、基础多线程实例

示例1:简单线程创建

python

def print_numbers:

for i in range(5):

time.sleep(1)

print(f"Number: {i}")

def print_letters:

for letter in 'ABCDE':

time.sleep(1)

print(f"Letter: {letter}")

# 创建线程

t1 = threading.Thread(target=print_numbers)

t2 = threading.Thread(target=print_letters)

# 启动线程

t1.start

t2.start

# 等待线程结束

t1.join

t2.join

print("任务完成!")

输出结果:

复制

Number: 0

Letter: A

Number: 1

Letter: B

Number: 2

Letter: C

...(交替输出)

代码解析:

定义两个普通函数作为线程任务创建 Thread 对象并指定 targetstart 启动线程join 等待线程结束

三、线程同步实例

示例2:使用锁防止资源冲突

python

class TicketSystem:

def __init__(self):

self.tickets = 10

self.lock = threading.Lock

def buy_ticket(self, user):

with self.lock: # 自动获取和释放锁

if self.tickets > 0:

print(f"{user} 买到票!剩余票数:{self.tickets}")

self.tickets -= 1

else:

print(f"{user} 票已售罄!")

def user_action(system, name):

time.sleep(0.1) # 模拟网络延迟

system.buy_ticket(name)

# 创建票务系统

system = TicketSystem

# 创建10个购票用户

users = [threading.Thread(target=user_action, args=(system, f"用户-{i}"))

for i in range(15)]

# 启动所有线程

for t in users:

t.start

# 等待所有线程完成

for t in users:

t.join

print("最终剩余票数:", system.tickets)

输出结果:

复制

用户-0 买到票!剩余票数:10

用户-1 买到票!剩余票数:9

...

用户-9 买到票!剩余票数:1

用户-10 票已售罄!

...

最终剩余票数: 0

关键点:

使用 with 语句自动管理锁避免多个线程同时修改共享资源注意锁的粒度控制

四、线程池实践

示例3:使用 concurrent.futures

python

import concurrent.futures

import requests

def download_url(url):

try:

resp = requests.get(url, timeout=3)

return f"{url} 下载完成,状态码:{resp.status_code}"

except Exception as e:

return f"{url} 下载失败:{str(e)}"

urls = [

'https://www.baidu.com',

'https://www.google.com',

'https://www.github.com',

'https://invalid.url'

]

# 创建线程池(最多5个线程)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

# 提交任务

future_to_url = {executor.submit(download_url, url): url for url in urls}

# 获取结果

for future in concurrent.futures.as_completed(future_to_url):

url = future_to_url[future]

try:

data = future.result

except Exception as exc:

print(f'{url} 生成异常: {exc}')

else:

print(data)

输出结果:

复制

https://www.baidu.com 下载完成,状态码:200

https://www.github.com 下载完成,状态码:200

https://invalid.url 下载失败:...

https://www.google.com 下载失败:...

优势:

自动管理线程创建/销毁方便获取任务结果支持最大线程数控制

五、生产者-消费者模型

示例4:使用队列实现

python

复制

import random

def producer(queue, name):

for i in range(3):

item = f"产品-{name}-{i}"

time.sleep(random.uniform(0.1, 0.5))

queue.put(item)

print(f"生产 {item}")

queue.put(None) # 结束信号

def consumer(queue):

while True:

item = queue.get

if item is None:

break

time.sleep(random.uniform(0.2, 0.7))

print(f"消费 {item}")

print("消费者退出")

# 创建队列

q = Queue(5) # 最大容量5

# 创建生产者和消费者

producers = [threading.Thread(target=producer, args=(q, i)) for i in range(2)]

consumers = [threading.Thread(target=consumer, args=(q,)) for _ in range(3)]

# 启动所有线程

for t in producers + consumers:

t.start

# 等待生产者完成

for t in producers:

t.join

# 发送结束信号

for _ in consumers:

q.put(None)

# 等待消费者完成

for t in consumers:

t.join

print("所有任务完成")

输出结果:

复制

生产 产品-0-0

生产 产品-1-0

消费 产品-0-0

生产 产品-0-1

...

消费 产品-1-2

消费者退出

所有任务完成

六、关键知识点总结

线程创建方式

Ø 直接实例化 Thread

Ø 继承 threading.Thread 类

Ø 使用线程池

同步机制

Ø Lock:基础互斥锁

Ø RLock:可重入锁

Ø Semaphore:信号量

Ø Event:事件通知

Ø Condition:条件变量

最佳实践

Ø 优先使用队列进行线程通信

Ø 避免使用全局变量

Ø 使用 with 语句管理锁

Ø 合理设置线程数量

调试技巧

Ø 使用 threading.current_thread.name 查看线程信息

Ø 使用 logging 模块代替 print

Ø 使用 threading.enumerate 查看活动线程

七、适用场景建议

推荐使用

Ø 网络请求密集型(爬虫、API调用)

Ø 文件I/O操作

Ø GUI应用保持响应

Ø 数据库查询批量处理

不推荐使用

Ø CPU密集型计算(推荐使用多进程)

Ø 需要精确时序控制的任务

Ø 对线程安全要求极高的场景

通过以上实例,可以系统掌握 Python 多线程的核心用法。实际开发中需注意:虽然多线程可以提高 I/O 密集型任务的效率,但线程数量并非越多越好,通常建议控制在 CPU 核心数 * 5 左右。

来源:老客数据一点号

相关推荐