并发的几种方式

多进程

多线程

协程

python 的并发限制: GIL

Threading 基础

基本语法

import threading

def func(*args):
    """Placeholder task that runs inside the worker thread."""


# Create the thread: `target` is the callable to run and `args` is the
# tuple of positional arguments forwarded to it (example values here).
t = threading.Thread(target=func, args=(1, 2))

# Start the thread.
t.start()

# Block the calling thread until `t` has finished.
t.join()

多线程爬虫示例

import requests
import threading

def craw(url: str) -> None:
    """Download ``url`` and print the URL with the length of the response text.

    Args:
        url: Address of the page to fetch.
    """
    # The imported module is `requests`; the original `request.get` raised
    # NameError on every call.
    resp = requests.get(url)
    print(url, len(resp.text))

def multi_thread_craw(urls: list[str]) -> None:
    """Crawl every URL in ``urls`` concurrently, using one thread per URL.

    All threads are started before any of them is joined, so the downloads
    overlap; the call returns once every thread has finished.

    Args:
        urls: Addresses to pass to ``craw``, one per thread.
    """
    workers = [threading.Thread(target=craw, args=(u,)) for u in urls]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

if __name__ == "__main__":
    # Crawl 50 pages that differ only in the `a` query parameter.
    page_urls = [f"https://example.com/?a={i}" for i in range(50)]
    multi_thread_craw(page_urls)

生产-消费模式

结构图解

img

使用 queue.Queue 进行线程间通信

该数据结构常用于多线程的线程安全的数据通信

通过阻塞来实现线程安全,保证数据一致性

import queue

# Create the queue object that the threads will share.
q = queue.Queue()

# Producer side: enqueue an item (`item` is a placeholder for whatever
# value is being handed over).
q.put(item)

# Consumer side: remove and return the next item. With default arguments
# this call blocks while the queue is empty.
item = q.get()

爬虫示例

import requests
import threading
import queue
import time

def craw(url: str):
    """Fetch ``url`` with ``requests.get`` and return the response object."""
    return requests.get(url)

def get_links(html: str) -> list:
    """Extract the post links from an HTML page.

    Args:
        html: Markup of the page to scan.

    Returns:
        A list of ``(href, text)`` tuples, one for each ``<a>`` element
        whose class is ``post-item-title``.
    """
    # Imported here because the snippet's import list omits it; without this
    # the name BeautifulSoup is undefined and every call raises NameError.
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "html.parser")
    anchors = soup.find_all("a", class_="post-item-title")
    return [(anchor["href"], anchor.get_text()) for anchor in anchors]

# 生产者任务
def do_craw(url_queue:queue.Queue, html_queue:queue.Queue) -> None:
    """Producer loop: download queued URLs and pass the HTML on.

    Repeatedly takes a URL from ``url_queue``, fetches it with ``craw`` and
    puts the response text on ``html_queue``, pausing one second after each
    page. The loop has no exit condition, so this function never returns.
    """
    while True:
        page = craw(url_queue.get())
        html_queue.put(page.text)
        time.sleep(1)

# 消费者任务
def do_parse(html_queue:queue.Queue, fout) -> None:
    """Consumer loop: parse queued HTML and write the extracted links.

    Repeatedly takes a page from ``html_queue``, runs ``get_links`` on it and
    writes each result to ``fout`` as one line, pausing one second after each
    page. The loop has no exit condition, so this function never returns.
    """
    while True:
        links = get_links(html_queue.get())
        for link in links:
            fout.write(f"{link!s}\n")
        time.sleep(1)

if __name__ == "__main__":
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in [f"https://example.com/?a={i}" for i in range(10)]:
        url_queue.put(url)
    # Start the producer threads.
    for i in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{i}")
        t.start()
    # Start the consumer threads.
    # The file must be opened for writing: the default mode is read-only, in
    # which every fout.write() call in the consumers fails. The worker loops
    # never end and the file is never closed, so line buffering is used to
    # push each result line to disk as soon as it is written.
    fout = open("data.txt", "w", encoding="utf-8", buffering=1)
    for i in range(3):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{i}")
        t.start()

线程安全与线程池

误用多线程导致的经典 bug

下面是一个使用多线程的模拟银行

import threading
import time

class Account:
    """Bank account that holds a single mutable balance."""

    def __init__(self, money):
        # Current balance; read and modified directly by callers.
        self.money = money

# 取钱
def draw(value, account):
    """Withdraw ``value`` from ``account`` without any locking.

    The balance check and the deduction are separated by a sleep, so two
    threads can both pass the check before either one deducts. That race is
    what this example demonstrates, so it is intentionally left unprotected.
    """
    if account.money < value:
        print("余额不足")
    else:
        time.sleep(0.1)  # Simulate some I/O between the check and the update.
        account.money -= value
        print(f"已取出, 剩余: {account.money}")

if __name__ == "__main__":
    account = Account(600)
    # Two concurrent withdrawals of 500 from the same 600 balance.
    withdrawals = [
        threading.Thread(target=draw, args=(500, account))
        for _ in range(2)
    ]
    for worker in withdrawals:
        worker.start()

img

可以看到出现 -400 的余额,明明已经做了 account.money < value 的判定,还是会导致 bug

使用锁

threading.Lock 用于给代码加“锁”:线程 A 获得锁之后,在锁被释放前,线程 B 执行到加锁处会被阻塞,无法进入被保护的代码

lock 有两种使用方式

第一种:try-finally 模式

import threading

# The class is `threading.Lock`; `threading.lock` does not exist.
lock = threading.Lock()

lock.acquire()  # Take the lock; blocks while another thread holds it.
try:
    # Critical section: do something with the shared state.
    pass
finally:
    lock.release()  # Always release, even if the critical section raised.

第二种:with 模式

import threading

# The class is `threading.Lock`; `threading.lock` does not exist.
lock = threading.Lock()

# The lock is acquired on entry and released automatically on exit,
# including when the body raises.
with lock:
    # Critical section: do something with the shared state.
    pass

线程池