Python Concurrent Crawlers
Creating crawler threads from a function
from threading import Thread

def func(name):
    for i in range(100):
        print(f"{name} completed task {i}")

if __name__ == '__main__':
    t1 = Thread(target=func, args=('老杨',))  # args must be a tuple, hence the trailing comma
    t2 = Thread(target=func, args=('老李',))
    t3 = Thread(target=func, args=('老孙',))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print("Main thread finished")
Creating crawler threads with a Thread subclass
from threading import Thread
from time import sleep

class MyThread(Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()  # initialize the parent Thread class
        self.name = name  # note: this overrides Thread's built-in name attribute

    def run(self):
        for i in range(100):
            print(f"{self.name} completed job {i}")
            sleep(0.5)

if __name__ == '__main__':
    t1 = MyThread('老杨')
    t2 = MyThread('老孙')
    t3 = MyThread('老李')
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print("Main thread finished")
Thread pools
from concurrent.futures import ThreadPoolExecutor

def func(name):
    for i in range(10):
        print(name, i)

if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:  # create a pool of 10 threads
        for i in range(100):
            t.submit(func, f"周杰伦{i}")
This code uses ThreadPoolExecutor to create a thread pool so that multiple tasks can run in parallel. Specifically:

- Creating the pool: ThreadPoolExecutor(10) creates a pool that manages up to 10 threads at once.
- Submitting tasks: t.submit(func, f"周杰伦{i}") is called 100 times in the loop (i runs from 0 to 99). Each call submits a task that runs func with a string argument of the form "周杰伦{i}" (e.g. "周杰伦0", "周杰伦1", ..., "周杰伦99").
- Concurrent execution: ThreadPoolExecutor runs these tasks concurrently across the 10 available threads. Since there are 100 tasks in total, the pool reuses its threads, so at most 10 tasks are running at any given moment.
However, tasks submitted this way can contend for shared resources. Later we will use the producer-consumer pattern to make sure no item of work is handled twice; a preview sketch follows.
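As a preview, here is a minimal producer-consumer sketch built on the standard library's thread-safe queue.Queue; the producer hands out each task exactly once (the task names here are made up for illustration):

from queue import Queue
from threading import Thread

q = Queue()

def producer():
    for i in range(10):
        q.put(f"task{i}")  # each task goes onto the queue exactly once
    q.put(None)            # sentinel: tells the consumer to stop

def consumer():
    while True:
        item = q.get()     # Queue.get() is thread-safe, so no task is taken twice
        if item is None:
            break
        print("processing", item)

if __name__ == '__main__':
    t1 = Thread(target=producer)
    t2 = Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()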
How do we get the return values back?
Thread pool return values: option 1
import time
from concurrent.futures import ThreadPoolExecutor

def func(name, t):
    time.sleep(t)
    return name

def fn(res):
    print(res.result())

if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:
        t.submit(func, '周杰伦', 3).add_done_callback(fn)
        t.submit(func, '周一', 2).add_done_callback(fn)
        t.submit(func, '周二', 1).add_done_callback(fn)
In this code, t.submit(func, '周二', 1) returns a Future object, which represents the eventual result of running func asynchronously. The add_done_callback(fn) method registers a callback fn that is invoked when the Future completes.

When fn is called, it receives a single argument: the completed Future object, which carries the result of func, any exception raised, and so on.

In other words, add_done_callback(fn) hands that Future object to fn, and inside fn you can pull out whatever you need from it:

- If func succeeded, future.result() returns the result.
- If func raised, future.exception() returns the exception.
Note that each callback fires as soon as its own task finishes, so the order in which the callbacks run, and therefore the order the results come back, is not deterministic.
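If you want results in completion order without wiring up callbacks, the standard library's concurrent.futures.as_completed is an alternative: it yields each Future as soon as it finishes. A minimal sketch reusing func from above:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def func(name, t):
    time.sleep(t)
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(10) as pool:
        futures = [pool.submit(func, name, t)
                   for name, t in [('周杰伦', 3), ('周一', 2), ('周二', 1)]]
        for fut in as_completed(futures):  # yields each Future as it finishes
            print(fut.result())            # 周二 first (1s), then 周一 (2s), then 周杰伦 (3s)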
Thread pool return values: option 2
import time
from concurrent.futures import ThreadPoolExecutor

def func(name, t):
    time.sleep(t)
    print("I am", name)
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:
        result = t.map(func, ['周杰伦', '老李', '小王'], [2, 1, 3])
        for i in result:
            print(i)
map returns a generator, and the results come back in the same order the tasks were handed out.
We need to learn to read function signatures
This is map's signature; learn to read these yourself. fn is the function to call, and *iterables means any number of iterable objects, so not only lists work here: tuples and any other iterable do too. Its docstring, "Returns an iterator equivalent to map(fn, iter)", tells us it returns an iterable, so we can collect the return values with a for loop, as the sketch below shows.
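For reference, Executor.map in CPython is declared roughly as def map(self, fn, *iterables, timeout=None, chunksize=1). A small runnable sketch, reusing func from above, showing that any iterable works, not just lists:

import time
from concurrent.futures import ThreadPoolExecutor

def func(name, t):
    time.sleep(t)
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(10) as t:
        # *iterables: tuples are iterables too, so they work just like lists
        result = t.map(func, ('周杰伦', '老李'), (2, 1))
        for name in result:  # map returns an iterator; loop over it to collect results
            print(name)      # prints 周杰伦 then 老李 -- submission order, not completion order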
Thread pool: a practical example
The target site is the Beijing Xinfadi wholesale market's price listings page (新发地-价格行情).
import json
import requests
from concurrent.futures import ThreadPoolExecutor
import threading

headers = {
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "Origin": "http://www.xinfadi.com.cn",
    "Referer": "http://www.xinfadi.com.cn/priceDetail.html",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
    "X-Requested-With": "XMLHttpRequest"
}
url = "http://www.xinfadi.com.cn/getPriceData.html"

# thread lock to serialize file writes
file_lock = threading.Lock()

def get_ip():
    # placeholder: insert your own proxy here, e.g. {"http": "http://ip:port"};
    # returning None makes requests connect directly without a proxy
    return None

def get_data(current):
    data = {
        "limit": "20",
        "current": str(current),
        "pubDateStartTime": "2023/01/01",
        "pubDateEndTime": "2023/12/31",  # end of the date range
        "prodPcatid": "",
        "prodCatid": "",
        "prodName": ""
    }
    err_number = 0
    while True:
        try:
            proxy = get_ip()  # insert your own proxy IP here
            response = requests.post(url, headers=headers, data=data, proxies=proxy)
            response.raise_for_status()  # raise if the request failed
            data_list = json.loads(response.text)['list']
            break  # success: stop retrying
        except requests.exceptions.RequestException as e:
            err_number += 1
            if err_number == 10:
                print(f"request {current}: more than 10 attempts")
                print(f"request {current} failed: {e}")
                return
        except json.JSONDecodeError as e:
            err_number += 1
            if err_number == 10:
                print(f"request {current}: more than 10 attempts")
                print(f"request {current}: JSON decoding failed: {e}")
                return
        except KeyError as e:
            err_number += 1
            if err_number == 10:
                print(f"request {current}: more than 10 attempts")
                print(f"request {current}: unexpected data format: {e}")
                return

    for item in data_list:
        prodName = item.get("prodName", "")
        highPrice = item.get("highPrice", "")
        lowPrice = item.get("lowPrice", "")
        avgPrice = item.get("avgPrice", "")
        # take the lock so only one thread writes to the file at a time
        with file_lock:
            with open('data.csv', mode='a', encoding='utf-8') as f:
                f.write(f'{current}, {prodName}, {lowPrice}, {avgPrice}, {highPrice}\n')
    print(f"page {current} scraped")

if __name__ == '__main__':
    # initialize (truncate) data.csv and write the header row
    with open('data.csv', mode='w', encoding='utf-8') as f:
        f.write("日期, 产品名称, 最低价, 平均价, 最高价\n")
    with ThreadPoolExecutor(max_workers=10) as t:  # adjust the pool size as needed
        for day in range(1, 40):
            t.submit(get_data, day)
A quick note on with ThreadPoolExecutor(max_workers=10) as t: and the for loop beneath it. Each submitted task fetches a different page of data, so the fetches themselves never conflict. Writing the results is another story: several threads may try to write to the file at the same time, which could interleave or duplicate rows. That is why the code takes a lock around the write, guaranteeing that only one thread writes to the file at any moment.
Multiprocessing
Multiprocessing works much the same as multithreading; for a fuller treatment, see my article on concurrent programs in my Python collection. A minimal sketch follows.
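As a taste of how close the two APIs are, here is the first example of this post rewritten with multiprocessing.Process; only the import and the class name change:

from multiprocessing import Process

def func(name):
    for i in range(100):
        print(f"{name} completed task {i}")

if __name__ == '__main__':  # required: on Windows, child processes re-import this module
    p1 = Process(target=func, args=('老杨',))
    p2 = Process(target=func, args=('老李',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Main process finished")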
Using multithreading and multiprocessing together
[Example] Scraping images from Duitang (堆糖)
import json
import time
import requests
from multiprocessing import Process, Queue
import os
from concurrent.futures import ThreadPoolExecutor


class getUrl(object):
    """Producer: scrapes image URLs and puts them on the queue."""

    def __init__(self, Queue):
        self.url = "https://www.duitang.com/napi/blogv2/list/by_search/"
        self.headers = {
            "Accept": "text/plain, */*; q=0.01",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "Connection": "keep-alive",
            "Referer": "https://www.duitang.com/search/?kw=%E6%90%9E%E7%AC%91%E8%A1%A8%E6%83%85%E5%8C%85&type=feed",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0",
            "X-Requested-With": "XMLHttpRequest",
            "sec-ch-ua": "\"Chromium\";v=\"134\", \"Not:A-Brand\";v=\"24\", \"Microsoft Edge\";v=\"134\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\""
        }
        self.cookies = {}
        self.queue = Queue

    def get_ip(self):
        """Request a proxy IP; plug in your own provider here."""
        pass

    def get_url(self, sum):
        print(f"scraping page {sum / 24}")
        params = {
            "kw": "搞笑表情包",
            "after_id": str(sum),
            "type": "feed",
            "include_fields": "top_comments,is_root,source_link,item,buyable,root_id,status,like_count,like_id,sender,album,reply_count,favorite_blog_id",
            "_type": "",
            "_": str(int(time.time() * 1000))
        }
        err_number = 0
        while True:
            try:
                proxy = self.get_ip()
                response = requests.get(self.url, headers=self.headers, cookies=self.cookies,
                                        params=params, proxies=proxy, timeout=60)
                response.encoding = 'utf-8'
                data_list = json.loads(response.text)['data']['object_list']
                for data in data_list:
                    img_url = data['photo']['path']  # the image URL
                    self.queue.put(img_url)  # push each scraped image URL onto the queue
                break
            except Exception as e:
                print(f"getUrl ran into a problem: {e}")
                if err_number == 5:
                    print(f"page {sum / 24}: too many failed requests, skipping")
                    break
                err_number += 1

    def run(self):
        """Entry point for the producer process."""
        with ThreadPoolExecutor(5) as T:  # thread pool with five threads
            for page in range(0, 10):
                T.submit(self.get_url, page * 24)
        self.queue.put('OK')  # sentinel: tell the consumer there is no more work


class savePhoto(object):
    """Consumer: takes image URLs off the queue and downloads them."""

    def __init__(self, Queue):
        self.queue = Queue  # the shared process queue
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "cache-control": "max-age=0",
            "if-modified-since": "Fri, 26 Jul 2024 12:38:40 GMT",
            "if-none-match": "\"2cb739af98e219ca4681ca8316ac7265\"",
            "priority": "u=0, i",
            "sec-ch-ua": "\"Chromium\";v=\"134\", \"Not:A-Brand\";v=\"24\", \"Microsoft Edge\";v=\"134\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
        }
        self.cookies = {}

    def get_ip(self):
        """Request a proxy IP; plug in your own provider here."""
        pass

    def save_photo(self, url):
        """Download one image and write it to disk."""
        title = url.split('/')[-1]
        err_number = 0
        while True:
            try:
                proxy = self.get_ip()
                response = requests.get(url=url, headers=self.headers, cookies=self.cookies,
                                        proxies=proxy, timeout=60)
                with open('./img/' + title, mode='wb') as f:
                    f.write(response.content)
                print(f"{url} downloaded")
                break
            except Exception as e:
                print(f"savePhoto ran into a problem: {e}")
                if err_number == 5:
                    print(f"{url}: too many failed requests")
                    return 0
                err_number += 1

    def run(self):
        """Entry point for the consumer process."""
        with ThreadPoolExecutor(5) as T:
            while 1:
                url = self.queue.get()
                if url == 'OK':  # sentinel received: all URLs have been produced
                    break
                T.submit(self.save_photo, url)


if __name__ == '__main__':
    os.makedirs('./img', exist_ok=True)  # make sure the output directory exists
    start = time.perf_counter()
    Q = Queue()  # process-safe queue shared by producer and consumer
    gu = getUrl(Q)
    sp = savePhoto(Q)
    print("starting processes")
    p1 = Process(target=gu.run)
    p2 = Process(target=sp.run)
    p1.start()
    print("process p1 started")
    p2.start()
    print("process p2 started")
    p1.join()
    p2.join()
    end = time.perf_counter()
    print(f"elapsed: {end - start} seconds")