并发的几种方式
多进程
多线程
携程
python 的并发限制: GIL
Treading 基础
基本语法
import threading
def func():...
# 创建线程, 使用 target 指定线程任务, args 指定参数
t = threading.Tread(target=func, args=(xx, xx...))
# 开始线程
t.start()
# 阻塞主线程直到线程完成
t.join()
多线程爬虫示例
import requests
import threading
def craw(url: str) -> None:
resp = request.get(url)
print(url, len(resp.text))
def multi_thread_craw(urls: list[str]) -> None:
threads = []
for url in urls:
threads.append(threading.Thread(target=craw, args=(url,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
multi_thread_craw([
f"https://example.com/?a={i}"
for i in range(50)
])
生产-消费模式
结构图解
使用 queue.Queue 进行线程间通信
该数据结构常用于多线程的线程安全的数据通信
通过阻塞来实现线程安全,保证数据一致性
import queue
q = queue.Queue()
q.put(item)
item = q.get()
爬虫示例
import requests
import threading
import queue
import time
def craw(url: str):
resp = requests.get(url)
return resp
def get_links(html: str) -> list:
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
# 生产者任务
def do_craw(url_queue:queue.Queue, html_queue:queue.Queue) -> None:
while True:
url = url_queue.get()
html = craw(url).text
html_queue.put(html)
time.sleep(1)
# 消费者任务
def do_parse(html_queue:queue.Queue, fout) -> None:
while True:
html = html_queue.get()
results = get_links(html)
for result in results:
fout.write(str(result) + "\n")
time.sleep(1)
if __name__ == "__main__":
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in [f"https://example.com/?a={i}" for i in range(10)]:
url_queue.put(url)
# 启动生产者线程
for i in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{i}")
t.start()
# 启动消费者线程
fout = open("data.txt")
for i in range(3):
t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{i}")
t.start()
线程安全与线程池
误用多线程导致的经典 bug
下面是一个使用多线程的模拟银行
import threading
import time
class Account(object):
def __init__(self, money):
self.money = money
# 取钱
def draw(value, account):
if account.money < value:
print("余额不足")
return
time.sleep(0.1) # 模拟一些 I/O 操作
account.money -= value
print(f"已取出, 剩余: {account.money}")
if __name__ == "__main__":
account = Account(600)
t1 = threading.Thread(target=draw, args=(500, account))
t2 = threading.Thread(target=draw, args=(500, account))
t1.start()
t2.start()
可以看到出现 -400
的余额,明明已经做了 account.money < value
的判定,还是会导致 bug
使用锁
lock 模块用于给代码加“锁”,被线程A加了锁的代码,在锁被释放前,无法被线程B访问
lock 有两种使用方式
第一种:try-finally
模式
import threading
lock = threading.lock()
lock.acquire() # 加锁
try:
# do something
finally:
lock.release() # 释放锁
第二种:with
模式
import threading
lock = threading.lock()
# 锁自动释放
with lock:
# do something