如何使用Python WebDriver爬取ChatGPT内容(完整教程)
大背景
虽然我们能用网页版chatGPT来聊天、写文章,但是我们采集大量的内容,就得不断地手动输入提问来获取答案,并且将结果复制到数据库来保存。如果整个过程能使用程序来做自然要节省很多的人力,精力和时间。
Python webdirver 模拟浏览器的方式来实现,刚好能实现以上功能。
另外之所以不选择API 是因为以下原因:
-
普通开发者(国内)获取API KEY 是有困难的,需要海外手机号 + 信用卡等一系列条件,但是如果只是网页端,我们仅仅需要登录或者未登录的方式就可以直接聊天
-
网页端无需调整各项参数,可以直接交互获取内容,而且内容质量更高!
当然,如果你有条件用API 或者直接通过 wss交互获取内容的,到这儿可以直接结束了。
完整源码在文章末尾哦!
基本环境要求
准备自己的梯子
ChatGpt属于海外项目,国内的小伙伴,翻墙得找好自己的梯子(代理),这里自己有啥用啥即可。
安装Python 环境
Python要求:3.10 +
安装 Python Selenium
pip install selenium
其他扩展库说明:取决于自己电脑缺什么就安装什么。后面完整代码会提供完整的 requirements.txt
浏览器
这里以Chrome浏览器为准。也就是你本地必须要安装Chrome浏览器,并且获取其安装路径。
比如:C:\Program Files\Google\Chrome\Application\chrome.exe
操作系统
这里以 windows 作为开发环境。
开发工具
自己什么顺手用什么。
这里讲个程序界的笑话:传说级别的开发者据说用的记事本来开发。
实现过程
这里只介绍主要的代码
浏览器控制
使用程序实现浏览器的控制,这包括浏览器的打开,关闭,以及代理配置。
创建 browser_manage.py
import subprocess, datetime
import os, signal, psutildef run_cmd(port=9200):cmd = [# chrome浏览器路径。 必须为首个参数'C:/Program Files/Google/Chrome/Application/chrome.exe',# 【必要】设置浏览器端口'--remote-debugging-port=%s' % port,# 【必要】设置浏览器数据存储路径'--user-data-dir=D:/data',# 隐藏一些弹窗之类的信息'--hide-crash-restore-bubble',# 设置浏览器分辨率。如果要跑多个浏览器可以将每个浏览器设置小一些'--force-device-scale-factor=1',# 假设代理地址为 http://127.0.0.1:10809'--proxy-server=http://127.0.0.1:10809',# 默认打开一个空白页面'about:blank']process = subprocess.Popen(cmd)# 返回pid 用于关闭浏览器杀死进程return process.piddef kill_process(parent_pid):try:# 获取父进程parent = psutil.Process(parent_pid)# 获取父进程的所有子进程(包括孙子进程等)children = parent.children(recursive=True)# 创建一个包含父进程PID的列表pids_to_kill = [parent_pid]# 将所有子进程的PID添加到列表中pids_to_kill.extend(child.pid for child in children)# 遍历列表,对每个PID发送SIGKILL信号for pid in pids_to_kill:try:os.kill(pid, signal.SIGILL)except PermissionError:# 忽略权限错误,可能我们没有权限杀死某个进程print("close browser PermissionError")passexcept ProcessLookupError:# 忽略进程查找错误,进程可能已经自然死亡print("close browser ProcessLookupError")passexcept (psutil.NoSuchProcess, PermissionError):# 忽略错误,如果进程不存在或者没有权限print("close browser PermissionError1")passreturn Truedef open_browser():"""打开浏览器"""# 打开指定端口的浏览器pid = run_cmd(9200)def close_borwser():"""关闭浏览器"""# pid 为打开浏览器获取到的进程idkill_process(pid)# 执行open_browser() 打开浏览器,执行 close_borwser() 关闭浏览器
初始化selenium
创建爬虫脚本 spider.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from fake_useragent import UserAgent
import subprocess, datetimechrome_options = Options()
ua = UserAgent()
# 浏览器端口信息,取决于启动浏览器设置的端口
browser_host = 9200
browser_host= "127.0.0.1"random_ua = ua.random
chrome_options.add_argument(f'user-agent={random_ua}')# 设置要连接的浏览器端口信息
chrome_options.add_experimental_option("debuggerAddress","%s:%s" % (browser_host, browser_port))
driver = webdriver.Chrome(options=chrome_options)
进入到目标页面
文件:spider.py
# 页面加载等待:最多10s
driver.implicitly_wait(10)
driver.get("https://chat.openai.com/")
页面等待除了以上的方案也可以用其他方法:
# 等待某个元素可见
try:element = WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.XPATH, "//h1")))print(element.text)
finally:driver.quit()# 或者直接
time.sleep(10)
发起询问
发起新的提问
每次提问都应该是基于一个新窗口来提问。如果你的问题需要上下文的基础来回答,可以直接跳过这里。
新询问输入框可以根据 button data-testid="create-new-chat-button" 来定位。
# 定位到新聊天元素
new_chat_dom = driver.find_element(By.CSS_SELECTOR, '[data-testid="create-new-chat-button"]')
# 发起点击,进入新界面
new_chat_dom.click()
定位到输入框
输入框使用的是div contentediable作为文本域。其id为 prompt-textarea
# 因为元素提供了id,则直接通过id获取最方便
textarea_dom = driver.find_element(By.ID, "prompt-textarea")
创建询问队列
因为我们的目标是自动化对问题列表发起询问,而不是一次性询问。所以需要创建一个问题队列。问题队列可以来源于数据库或者队列文件。这里为了演示,直接创建一个list
ask_list = ["请用Python写一个平均等分list的方案","请写一个关于小猪佩奇的笑话,要求:小猪佩奇可能不是猪,而是河马, 100字","称赞一个女生长得漂亮,如何不直接称赞也能看出来在形容她漂亮"
]
输入问题
这里我们按行来输入:一次输入一行。
for msg in ask_list:# 将字符串按换行分割开ask_msg_arr = msg.split('\n')for msg_line in ask_msg_arr:textarea_dom.send_keys(ask_msg_item)# 发送textarea_dom.send_keys(Keys.ENTER)# TODO 这里是后续获取数据,存储到数据库环节
等待数据响应
可以根据回答结束后,出现的交互按钮来确认是否回答完毕
idx = 0
while True:idx = idx + 1# 请求超时if idx > 180:breaktime.sleep(1)try:driver.find_element(By.CSS_SELECTOR, '[data-testid="bad-response-turn-action-button"]') breakexcept:continue
也可以使用以下方法来校验:
# 设置最大等待时间
wait = WebDriverWait(driver, 180)
# 等待直到元素出现
element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[data-testid="bad-response-turn-action-button"]')))
获取响应数据
通过 class="markdown" 来获取数据。
idx = 0
while True:idx = idx + 1# 请求超时if idx > 180:breaktime.sleep(1)try:html_dom = driver.find_element(By.CSS_SELECTOR, '.markdown') breakexcept:continuecontent = html_dom .get_attribute('innerHTML')
数据清洗
具体的清洗规则,看具体的业务需求。大多数清洗,需要将一些总结类的语句删除,不合法的返回内容删除。这里不提供相应的清洗的方案。
至此,一个完整的数据链就已经完结。
完整源码
browser.py
import subprocess, datetime
import os, signal, psutildef run_cmd(port=9200):cmd = [# chrome浏览器路径。'C:/Program Files/Google/Chrome/Application/chrome.exe',# 【必要】设置浏览器端口'--remote-debugging-port=%s' % port,# 【必要】设置浏览器数据存储路径'--user-data-dir=E:/browser_data',# 隐藏一些弹窗之类的信息'--hide-crash-restore-bubble',# 设置浏览器分辨率。如果要跑多个浏览器可以将每个浏览器设置小一些'--force-device-scale-factor=1',# 假设代理地址为 http://127.0.0.1:10809'--proxy-server=http://127.0.0.1:10809',# 默认打开一个空白页面'about:blank']process = subprocess.Popen(cmd)# 返回pid 用于关闭浏览器杀死进程return process.piddef kill_process(parent_pid):try:# 获取父进程parent = psutil.Process(parent_pid)# 获取父进程的所有子进程(包括孙子进程等)children = parent.children(recursive=True)# 创建一个包含父进程PID的列表pids_to_kill = [parent_pid]# 将所有子进程的PID添加到列表中pids_to_kill.extend(child.pid for child in children)# 遍历列表,对每个PID发送SIGKILL信号for pid in pids_to_kill:try:os.kill(pid, signal.SIGILL)except PermissionError:# 忽略权限错误,可能我们没有权限杀死某个进程print("close browser PermissionError")passexcept ProcessLookupError:# 忽略进程查找错误,进程可能已经自然死亡print("close browser ProcessLookupError")passexcept (psutil.NoSuchProcess, PermissionError):# 忽略错误,如果进程不存在或者没有权限print("close browser PermissionError1")passreturn Truedef open_browser():"""打开浏览器"""# 打开指定端口的浏览器pid = run_cmd(9200)return piddef close_browser(pid):"""关闭浏览器"""# pid 为打开浏览器获取到的进程idkill_process(pid)pid = open_browser()
print(pid)# close_browser(25996)
spider.py
import time
import tracebackfrom selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.keys import Keys
from fake_useragent import UserAgent
import subprocess, datetime
import oschrome_options = Options()
ua = UserAgent()
# 浏览器端口信息,取决于启动浏览器设置的端口
browser_port = 9200
browser_host = "127.0.0.1"random_ua = ua.random
chrome_options.add_argument(f'user-agent={random_ua}')# 设置要连接的浏览器端口信息
chrome_options.add_experimental_option("debuggerAddress", "%s:%s" % (browser_host, browser_port))
driver = webdriver.Chrome(options=chrome_options)def open_gpt_page():driver.implicitly_wait(10)driver.get("https://chat.openai.com/")def new_chat():"""该方案当前只适用于已经登录到chatgpt的页面。不适用非的登录页面"""# 定位到新聊天元素new_chat_dom = driver.find_element(By.CSS_SELECTOR, '[data-testid="create-new-chat-button"]')# 发起点击,进入新界面new_chat_dom.click()def get_input_box():return driver.find_element(By.ID, "prompt-textarea")def get_ask_list():return ["请用Python写一个平均等分list的方案","请写一个关于小猪佩奇的笑话,要求:小猪佩奇可能不是猪,而是河马, 100字","称赞一个女生长得漂亮,如何不直接称赞也能看出来在形容她漂亮"]def get_response():# 等待响应完成idx = 0while True:idx = idx + 1# 请求超时if idx > 180:breaktime.sleep(1)try:driver.find_element(By.CSS_SELECTOR, '[data-testid="bad-response-turn-action-button"]')breakexcept:continue# 获取请求结果idx = 0content = Nonewhile True:idx = idx + 1# 请求超时if idx > 180:breaktime.sleep(1)try:html_dom = driver.find_element(By.CSS_SELECTOR, '.markdown')content = html_dom.get_attribute('innerHTML')breakexcept:continuereturn contentdef simulator(ask_msg):# 进入输入框并且点击print("进入输入框并且点击")textarea_dom = get_input_box()textarea_dom.click()# 发送ask_msg_arr = ask_msg.split('\n')for msg_line in ask_msg_arr:textarea_dom.send_keys(msg_line)textarea_dom.send_keys(Keys.ENTER)# 获取响应content = get_response()# 保存数据到文件root_path = "./data"os.makedirs(root_path, exist_ok=True)full_path = f"{root_path}/%s.text" % int(time.time())with open(full_path, 'w', encoding='utf-8') as file:file.write(content)print("save success")def start():open_gpt_page()print("页面加载完毕")time.sleep(3)ask_list = get_ask_list()for ask_msg in ask_list:time.sleep(3)print("进入新的聊天")new_chat()print("current msg %s" % ask_msg)for retry in range(0,3):try:simulator(ask_msg)breakexcept:print(traceback.format_exc())time.sleep(10)passstart()
requirements.txt
fake_useragent==1.2.1
psutil==5.9.5
selenium==4.26.1
获取扩展源码
扩展源码是基于 chatgpt聊天页面,通过多个浏览器并行数据爬取,包含以下功能:
-
基于数据库创建问题队列
-
创建多个浏览器窗口多线程并行运行,提高产量和效率
-
解决人工校验-自动化进行人工校验
-
为多个浏览器配置不同的代理方案
-
代理配置,为浏览器自动化分配代理,
-
通过web端进行浏览器管理,代理管理,代理分配等
需要以上扩展源码,QQ:1186969412