摘要:进程:独立内存空间,资源分配的基本单位线程:共享进程资源,CPU调度的基本单位Python 特性:受 GIL(全局解释器锁)限制,适合 I/O 密集型任务
以下是一个详细的 Python 多线程实例教程,包含代码逐行解析和应用场景说明:
一、Python 多线程基础
1. 线程 vs 进程
进程:独立内存空间,资源分配的基本单位线程:共享进程资源,CPU调度的基本单位Python 特性:受 GIL(全局解释器锁)限制,适合 I/O 密集型任务2. 核心模块
python
import threading
from queue import Queue
import time
二、基础多线程实例
示例1:简单线程创建
python
def print_numbers:
for i in range(5):
time.sleep(1)
print(f"Number: {i}")
def print_letters:
for letter in 'ABCDE':
time.sleep(1)
print(f"Letter: {letter}")
# 创建线程
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
# 启动线程
t1.start
t2.start
# 等待线程结束
t1.join
t2.join
print("任务完成!")
输出结果:
复制
Number: 0
Letter: A
Number: 1
Letter: B
Number: 2
Letter: C
...(交替输出)
代码解析:
定义两个普通函数作为线程任务创建 Thread 对象并指定 targetstart 启动线程join 等待线程结束三、线程同步实例
示例2:使用锁防止资源冲突
python
class TicketSystem:
def __init__(self):
self.tickets = 10
self.lock = threading.Lock
def buy_ticket(self, user):
with self.lock: # 自动获取和释放锁
if self.tickets > 0:
print(f"{user} 买到票!剩余票数:{self.tickets}")
self.tickets -= 1
else:
print(f"{user} 票已售罄!")
def user_action(system, name):
time.sleep(0.1) # 模拟网络延迟
system.buy_ticket(name)
# 创建票务系统
system = TicketSystem
# 创建10个购票用户
users = [threading.Thread(target=user_action, args=(system, f"用户-{i}"))
for i in range(15)]
# 启动所有线程
for t in users:
t.start
# 等待所有线程完成
for t in users:
t.join
print("最终剩余票数:", system.tickets)
输出结果:
复制
用户-0 买到票!剩余票数:10
用户-1 买到票!剩余票数:9
...
用户-9 买到票!剩余票数:1
用户-10 票已售罄!
...
最终剩余票数: 0
关键点:
使用 with 语句自动管理锁避免多个线程同时修改共享资源注意锁的粒度控制四、线程池实践
示例3:使用 concurrent.futures
python
import concurrent.futures
import requests
def download_url(url):
try:
resp = requests.get(url, timeout=3)
return f"{url} 下载完成,状态码:{resp.status_code}"
except Exception as e:
return f"{url} 下载失败:{str(e)}"
urls = [
'https://www.baidu.com',
'https://www.google.com',
'https://www.github.com',
'https://invalid.url'
]
# 创建线程池(最多5个线程)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
future_to_url = {executor.submit(download_url, url): url for url in urls}
# 获取结果
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result
except Exception as exc:
print(f'{url} 生成异常: {exc}')
else:
print(data)
输出结果:
复制
https://www.baidu.com 下载完成,状态码:200
https://www.github.com 下载完成,状态码:200
https://invalid.url 下载失败:...
https://www.google.com 下载失败:...
优势:
自动管理线程创建/销毁方便获取任务结果支持最大线程数控制五、生产者-消费者模型
示例4:使用队列实现
python
复制
import random
def producer(queue, name):
for i in range(3):
item = f"产品-{name}-{i}"
time.sleep(random.uniform(0.1, 0.5))
queue.put(item)
print(f"生产 {item}")
queue.put(None) # 结束信号
def consumer(queue):
while True:
item = queue.get
if item is None:
break
time.sleep(random.uniform(0.2, 0.7))
print(f"消费 {item}")
print("消费者退出")
# 创建队列
q = Queue(5) # 最大容量5
# 创建生产者和消费者
producers = [threading.Thread(target=producer, args=(q, i)) for i in range(2)]
consumers = [threading.Thread(target=consumer, args=(q,)) for _ in range(3)]
# 启动所有线程
for t in producers + consumers:
t.start
# 等待生产者完成
for t in producers:
t.join
# 发送结束信号
for _ in consumers:
q.put(None)
# 等待消费者完成
for t in consumers:
t.join
print("所有任务完成")
输出结果:
复制
生产 产品-0-0
生产 产品-1-0
消费 产品-0-0
生产 产品-0-1
...
消费 产品-1-2
消费者退出
所有任务完成
六、关键知识点总结
线程创建方式:Ø 直接实例化 Thread
Ø 继承 threading.Thread 类
Ø 使用线程池
同步机制:Ø Lock:基础互斥锁
Ø RLock:可重入锁
Ø Semaphore:信号量
Ø Event:事件通知
Ø Condition:条件变量
最佳实践:Ø 优先使用队列进行线程通信
Ø 避免使用全局变量
Ø 使用 with 语句管理锁
Ø 合理设置线程数量
调试技巧:Ø 使用 threading.current_thread.name 查看线程信息
Ø 使用 logging 模块代替 print
Ø 使用 threading.enumerate 查看活动线程
七、适用场景建议
推荐使用:Ø 网络请求密集型(爬虫、API调用)
Ø 文件I/O操作
Ø GUI应用保持响应
Ø 数据库查询批量处理
不推荐使用:Ø CPU密集型计算(推荐使用多进程)
Ø 需要精确时序控制的任务
Ø 对线程安全要求极高的场景
通过以上实例,可以系统掌握 Python 多线程的核心用法。实际开发中需注意:虽然多线程可以提高 I/O 密集型任务的效率,但线程数量并非越多越好,通常建议控制在 CPU 核心数 * 5 左右。
来源:老客数据一点号