PySpark3.4.4_基于StreamingContext实现网络字节流中英文分词词频累加统计结果保存到数据库中
实验目的
开发streamingContext程序,统计实时中英文网络字节流数据,实现中英文累计分词统计,并将统计结果持久化保存到关系型数据库.
本案例特色
综合运用了PySpark Streaming进行实时数据处理、pymysql实现MySQL数据库基于数据库连接池交互、configparser读取配置文件,以及广播变量优化集群间数据传输。通过设置检查点、批处理插入和错误重试机制增强了系统的稳健性。日志记录和状态管理确保了应用的可维护性和性能优化。软件工程上,采用模块化设计、异常处理和配置分离提升了代码质量和开发效率。
实验步骤
1. 开发datasourcesocket.py,模拟实时socket字节流发送程序
2. 开发自定义日志管理模块myLogger.py,记录程序日志
3. 开发数据库连接池模块DBUtils.py,高效与关系型数据库MySQL交互访问
4. 开发中英文混合数据流分词模块TextProcessor.py,实现字节流过滤及中英文分词
5. 开发SocketWordCountSFDBPool.py实现基于pyspark streaming的实时数据分词统计并将结果保存到数据库中
整体项目的结构如下
(pyspark2024-py3.9) (base) pblh123@LeginR7:~/PycharmProjects/pyspark2024$ tree
.
├── datas
│ ├── checkpoint
│ ├── stopwords
│ │ ├── baidu_stopwords.txt
│ │ ├── cn_all_stopwords.txt
│ │ ├── cn_stopwords.txt
│ │ ├── hit_stopwords.txt
│ │ └── scu_stopwords.txt
└── src├── charpter7│ ├── NetworkWordCountStatuefulText.py│ ├── networkWordCountStatufulDB.py│ ├── SockertWordCountStataFullDB.py│ ├── SocketWordCountSFDBPool.py│ └── windowedNetwordWC.py├── __init__.py├── __pycache__│ └── __init__.cpython-39.pyc└── utils├── configparse.ini├── DBUtils.py├── __init__.py├── myLogger.py└── TextProcessor.py
1. 开发datasourcesocket.py,模拟实时socket字节流发送程序
代码功能:实现了一个能够持续向多个客户端发送随机中英文短语、数字及特殊字符的TCP服务器,具有日志记录、异常处理和资源清理等功能。
代码如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
==================================================================Creation Date: 2024/12/7$ 21:34$Author: John <pblh123@126.com>Remarks: $创建时间: 2024/12/7$ 21:34$作 者: 李先生 <pblh123@126.com>备 注: $
=================================================================="""import random
import socket
import threading
import time
from src.utils.myLogger import *class DataSourceSocket:def __init__(self, host='localhost', port=9999):self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server.bind((host, port))self.server.listen(5) # 设置监听队列长度为5logger.info("Server started, waiting for connections...")def send_data(self, conn, addr):data_container = self._prepare_data()try:while True:random_item = random.choice(data_container)logger.info(f"Sending to {addr}: {random_item}")conn.sendall(random_item.encode('utf-8') + b'\n')time.sleep(0.1) # 控制发送间隔时间except Exception as e:logger.error(f"Client {addr} disconnected unexpectedly: {e}")finally:conn.close()logger.info(f"Connection with {addr} closed.")def _prepare_data(self):data_container = []chinese_data = ["你好,世界", "今天天气真好", "学习是一件快乐的事", "分享知识,传递快乐","探索未知的世界", "坚持就是胜利", "努力不懈,梦想终会实现", "失败乃成功之母","平凡造就非凡", "相信自己,你是最棒的", "I like Spark", "I like Flink","I like Hadoop", "大数据分析", "机器学习", "深度学习", "人工智能", "云计算","分布式系统", "区块链技术", "网络安全", "物联网应用", "Python编程", "Java开发","C++语言", "JavaScript框架", "React Native移动开发", "Vue.js前端开发","Docker容器化", "Kubernetes集群管理", "Git版本控制", "Agile敏捷开发","DevOps文化", "持续集成与部署", "性能优化技巧", "数据库设计原则","算法与数据结构", "操作系统原理", "计算机网络基础", "软件工程实践","项目管理技能", "团队协作精神", "创新思维培养", "职业发展规划"]for i in range(100):data_container.append(f"Random number: {random.randint(0, 1000)}")data_container.append(f"Special chars: !@#$%^&*()_+{i}")data_container.extend(chinese_data)return data_containerdef accept_connections(self):try:while True:conn, addr = self.server.accept()logger.info(f"Connected by {addr}")threading.Thread(target=self.send_data, args=(conn, addr), daemon=True).start()except Exception as e:logger.error(f"An error occurred while accepting connections: {e}")def start(self):try:self.accept_connections()except KeyboardInterrupt:print("\nShutting down the server.")finally:self.server.close()logger.info("Server socket closed.")if __name__ == "__main__":log_directory = "logs/sparkstreaming"log_filename = "DataSourceSocket.log"log_level = logging.DEBUG # 可以根据需要调整日志级别# 初始化日志设置logger = setup_logging(log_directory, log_filename, log_level)# 启动数据源套接字服务data_source_socket = DataSourceSocket()data_source_socket.start()
本代码实现了一个基于Python的TCP服务器,其主要功能和特点如下:
- 服务器初始化:
- 通过
DataSourceSocket
类创建一个TCP服务器实例。 - 服务器默认绑定到
localhost
的9999
端口上,但可以通过构造函数参数指定不同的主机和端口。 - 服务器设置监听队列长度为5,意味着最多可以有5个未处理的连接请求等待处理。
- 通过
- 数据准备:
- 在
_prepare_data
方法中,服务器准备了一个包含中英文短语和随机数字及特殊字符的列表作为数据源。 - 数据源包括中文语句(如"你好,世界")、英文技术术语(如"I like Spark")、随机生成的数字(如"Random number: 42")和包含特殊字符的字符串(如"Special chars: !@#$%^&*()_+0")。
- 在
- 接受连接:
- 服务器通过
accept_connections
方法不断接受客户端的连接请求。 - 一旦有新的连接建立,服务器会记录连接的客户端地址,并启动一个新的线程来处理这个连接的数据发送。
- 服务器通过
- 数据发送:
- 对于每个已连接的客户端,服务器会随机选择数据源中的一项,将其编码为UTF-8格式后发送给客户端。
- 每条消息发送后会等待0.1秒,然后发送下一条消息,形成一个连续的数据流。
- 如果客户端断开连接或出现其他异常,服务器会记录错误信息并关闭与该客户端的连接。
- 日志记录:
- 服务器使用自定义的日志系统(通过
src.utils.myLogger
模块中的setup_logging
函数初始化)来记录服务器运行过程中的关键信息,如启动、连接建立、数据发送、错误和连接关闭等。 - 日志文件默认存储在
logs/sparkstreaming
目录下的DataSourceSocket.log
文件中,日志级别可以根据需要调整。
- 服务器使用自定义的日志系统(通过
- 服务器启动与关闭:
- 服务器通过调用
start
方法来启动,该方法会不断接受新的连接直到接收到键盘中断信号(如Ctrl+C)。 - 收到中断信号后,服务器会记录一条关闭信息,关闭服务器套接字,并结束程序。
- 服务器通过调用
2. 开发自定义日志管理模块myLogger.py,记录程序日志
代码功能:定义了一个名为 setup_logging
的函数,用于配置日志记录系统,并将日志同时输出到控制台和文件中。此外,它还支持日志文件的滚动,即当日志文件达到一定大小时,会创建一个新的日志文件来继续记录,同时保留一定数量的旧日志文件作为备份。
代码如下
# coding=utf8
import os
import logging
from logging.handlers import RotatingFileHandlerdef setup_logging(log_dir, log_filename, log_level=None):"""设置日志记录,同时输出到控制台和文件。如果日志目录不存在则创建,支持日志滚动,日志文件名自定义。参数:log_dir (str): 日志文件所在的目录。log_filename (str): 日志文件的名称(不包括路径)。log_level (int or str, optional): 日志级别,默认为logging.INFO。"""logger = logging.getLogger()# 如果已经存在处理器,则不再添加,避免重复日志if not logger.handlers:# 设置日志级别if log_level is not None:logger.setLevel(log_level)else:logger.setLevel(logging.INFO) # 设置最低日志级别# 创建格式化器formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')# 创建控制台处理器console_handler = logging.StreamHandler()console_handler.setFormatter(formatter)logger.addHandler(console_handler)# 检查并创建日志目录(如果不存在)if not os.path.exists(log_dir):os.makedirs(log_dir)# 构造日志文件路径log_file_path = os.path.join(log_dir, log_filename)# 创建文件处理器,支持日志滚动file_handler = RotatingFileHandler(log_file_path,maxBytes=1024 * 1024 * 1024, # 每个日志文件的最大字节数为1GBbackupCount=5, # 最多保留5个备份日志文件encoding='utf-8',delay=False # 确保日志即时写入)file_handler.setFormatter(formatter)logger.addHandler(file_handler)logging.info(f"Logging initialized with log file at {log_file_path}")return logger# 示例用法
if __name__ == "__main__":log_directory = "logs/logdir" # 自定义日志目录log_file_name = "app.log" # 自定义日志文件名log_level = logging.DEBUG # 自定义日志级别# 初始化日志设置,并获取日志记录器logger = setup_logging(log_directory, log_file_name, log_level)# 使用日志记录器记录日志信息logger.info("This is an info message.")logger.error("This is an error message.")logger.debug("This is a debug message.")
以下是代码功能的详细总结:
- 日志配置:
setup_logging
函数接收三个参数:日志目录log_dir
、日志文件名log_filename
和可选的日志级别log_level
。- 如果未指定
log_level
,则默认使用logging.INFO
级别。
- 日志级别设置:
- 根据传入的
log_level
参数(如果提供)或默认值,设置日志记录器的日志级别。
- 根据传入的
- 日志格式化:
- 创建一个
Formatter
对象,用于定义日志消息的格式。格式为时间戳、日志级别和日志消息。
- 创建一个
- 控制台日志处理器:
- 创建一个
StreamHandler
对象,用于将日志消息输出到控制台。 - 将格式化器应用到控制台处理器上。
- 将控制台处理器添加到日志记录器上。
- 创建一个
- 文件日志处理器:
- 检查并创建日志目录(如果不存在)。
- 构造日志文件的完整路径。
- 创建一个
RotatingFileHandler
对象,用于将日志消息输出到文件,并支持日志滚动。- 设置每个日志文件的最大大小为 1GB。
- 设置最多保留 5 个备份日志文件。
- 设置日志文件编码为 UTF-8。
- 确保日志即时写入文件(
delay=False
)。
- 将格式化器应用到文件处理器上。
- 将文件处理器添加到日志记录器上。
- 日志初始化确认:
- 记录一条信息日志,确认日志系统已初始化,并显示日志文件的路径。
- 示例用法:
- 在脚本的
__main__
部分,提供示例用法,展示如何调用setup_logging
函数来初始化日志设置,并使用返回的日志记录器记录不同级别的日志消息。
- 在脚本的
3. 开发数据库连接池模块DBUtils.py,高效与关系型数据库MySQL交互访问
代码功能:定义了一个名为 SimpleDBPool
的数据库连接池类,旨在管理数据库连接的创建、获取、释放和关闭,以提高数据库操作的效率和性能。
代码如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
==================================================================Creation Date: 2024/12/7$ 21:34$Author: John <pblh123@126.com>Remarks: $创建时间: 2024/12/7$ 21:34$作 者: 李先生 <pblh123@126.com>备 注: $
=================================================================="""import pymysql
import threading
from queue import Queue, Full, Empty
import logging"""
定义数据库数据池用法
"""class SimpleDBPool:def __init__(self, creator, maxconnections=5, mincached=2, maxcached=None, maxshared=0, blocking=False, **db_config):"""初始化数据库连接池。参数:creator: 数据库连接创建者,如pymysql.connect。maxconnections (int): 连接池中最大连接数,默认为5。mincached (int): 初始化时至少创建的空闲连接数,默认为2。maxcached (int or None): 链接池中最多闲置的连接,默认为None(无限制)。maxshared (int): 链接池中最多共享的连接数量,默认为0(不支持共享连接)。blocking (bool): 如果没有可用连接,是否阻塞等待,默认为False。db_config (dict): 数据库配置参数。"""self.creator = creatorself.maxconnections = maxconnectionsself.mincached = mincachedself.maxcached = maxcached if maxcached is not None else maxconnectionsself.maxshared = maxsharedself.blocking = blockingself.db_config = db_configself._lock = threading.Lock()self._idle_queue = Queue(maxsize=self.maxcached)self._active_connections = set()# 初始化最小空闲连接数for _ in range(mincached):try:conn = self._create_connection()self._idle_queue.put(conn)except Exception as e:logging.error(f"Failed to create initial connections: {e}")breakdef _create_connection(self):"""创建一个新的数据库连接"""return self.creator(**self.db_config)def get_connection(self):"""从连接池获取一个数据库连接"""conn = Noneacquired = Falsewhile not acquired:try:conn = self._idle_queue.get_nowait()acquired = Trueexcept Empty:if len(self._active_connections) < self.maxconnections:conn = self._create_connection()acquired = Trueelif self.blocking:try:conn = self._idle_queue.get(timeout=1) # 等待直到有空闲连接或超时acquired = Trueexcept Empty:passelse:raise Exception("No available database connections.")with self._lock:self._active_connections.add(conn)return conndef release_connection(self, conn):"""释放一个数据库连接回连接池"""with self._lock:if conn in self._active_connections:self._active_connections.remove(conn)try:if self._idle_queue.qsize() < self.maxcached:self._idle_queue.put_nowait(conn)else:conn.close() # 如果队列已满,则关闭连接except Full:conn.close()def close_all_connections(self):"""关闭所有连接"""while not self._idle_queue.empty():conn = self._idle_queue.get_nowait()conn.close()for conn in list(self._active_connections):conn.close()with self._lock:self._active_connections.remove(conn)# 使用示例
if __name__ == "__main__":# 示例数据库配置db_config = {'host': 'localhost','port': 3306,'user': '你的数据库账号','password': '你的数据库密码','db': 'mydb','charset': 'utf8mb4'}# 创建日志记录器logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)# 创建数据库连接池db_pool = SimpleDBPool(pymysql.connect , **db_config)try:# 获取连接conn = db_pool.get_connection()cursor = conn.cursor()# 执行查询(这里只是一个示例)cursor.execute("SELECT VERSION()")result = cursor.fetchone()print(f"Database version: {result}")# 关闭游标cursor.close()# 释放连接db_pool.release_connection(conn)except Exception as e:logger.error(f"Error occurred: {e}")finally:# 关闭所有连接(在实际应用中通常不需要这样做)db_pool.close_all_connections()
以下是代码功能的详细总结:
- 类初始化 (
__init__
方法):- 接收多个参数来配置连接池,包括数据库连接创建者(如
pymysql.connect
)、最大连接数、最小空闲连接数、最大空闲连接数、最大共享连接数(本例中不支持共享连接,因此默认为0)、是否阻塞等待可用连接,以及数据库配置参数。 - 初始化内部数据结构,包括一个用于存储空闲连接的队列(
_idle_queue
)和一个用于跟踪活动连接的集合(_active_connections
)。 - 根据最小空闲连接数的要求,创建并初始化一定数量的空闲连接。
- 接收多个参数来配置连接池,包括数据库连接创建者(如
- 创建连接 (
_create_connection
方法):- 使用提供的数据库配置参数创建一个新的数据库连接。
- 获取连接 (
get_connection
方法):- 尝试从空闲连接队列中获取一个连接。
- 如果队列为空且未达到最大连接数,则创建一个新的连接。
- 如果达到最大连接数且设置为阻塞等待,则尝试等待直到有空闲连接或超时。
- 如果以上条件都不满足,则抛出异常表示没有可用的数据库连接。
- 将获取到的连接标记为活动连接。
- 释放连接 (
release_connection
方法):- 将一个活动连接释放回连接池。
- 如果空闲连接队列未满,则将连接放入队列中;否则,关闭连接。
- 关闭所有连接 (
close_all_connections
方法):- 关闭所有空闲和活动连接,并清空内部数据结构。
- 使用示例:
- 配置数据库连接参数。
- 创建日志记录器。
- 创建
SimpleDBPool
实例。 - 从连接池中获取连接,执行查询操作,并打印数据库版本。
- 关闭游标和释放连接。
- 在
finally
块中关闭所有连接(注意:在实际应用中,通常不需要在每次操作后都关闭所有连接,这里只是为了演示如何关闭)。
通过该代码,开发者可以轻松地创建和管理一个数据库连接池,从而优化数据库操作的资源使用和性能。连接池通过重用现有的数据库连接,减少了连接和断开连接的开销,提高了应用程序的响应速度和吞吐量。
4. 开发中英文混合数据流分词模块TextProcessor.py,实现字节流过滤及中英文分词
代码功能:提供了一个基于Python和Spark的文本处理工具,支持中英文分词、停用词过滤、清理文本,移除非字母数字字符、汉字、英文标点符号以外的特殊字符。
功能,适用于文本挖掘、自然语言处理等应用场景。
代码如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
==================================================================Creation Date: 2024/12/7$ 21:34$Author: John <pblh123@126.com>Remarks: $创建时间: 2024/12/7$ 21:34$作 者: 李先生 <pblh123@126.com>备 注: $
=================================================================="""import configparser
import os
import jieba
import re
from functools import lru_cache
from pyspark import SparkContext
import sys# 提前编译正则表达式以提高性能
# 编译一个正则表达式模式,用于匹配中文字符
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')# 编译一个正则表达式模式,用于匹配英文字符、数字和下划线
english_pattern = re.compile(r'[a-zA-Z0-9_]+')class TextProcessor:def __init__(self, stopwords_path=None, spark_context=None):"""初始化 TextProcessor 实例并加载停用词列表。参数:stopwords_path (str): 停用词文件或文件夹的路径。spark_context (SparkContext): 可选参数,如果提供了 SparkContext,则会广播停用词列表。"""self.stopwords = self.load_stopwords(stopwords_path)if spark_context and self.stopwords:self.broadcast_stopwords = spark_context.broadcast(self.stopwords)else:self.broadcast_stopwords = Nonedef clean_text(self, text):"""清理文本,移除非字母数字字符、汉字、英文标点符号以外的特殊字符。参数:text (str): 要清理的原始文本字符串。返回:str: 清理后的文本字符串。"""if not isinstance(text, str):raise TypeError("Input must be a string")try:text = self.remove_html_tags(text)text = self.remove_urls(text)text = self.remove_non_alphanumeric(text)except re.error as e:print(f"正则表达式错误: {e}", file=sys.stderr)except Exception as e:print(f"发生未知错误: {e}", file=sys.stderr)return text.strip()@staticmethoddef remove_html_tags(text):return re.sub(r'<[^>]+>', '', text)@staticmethoddef remove_urls(text):return re.sub(r'http[s]?://\S+', '', text)@staticmethoddef remove_non_alphanumeric(text):general_pattern = re.compile(r'[^\w\s\u4e00-\u9fff]', flags=re.UNICODE)return general_pattern.sub('', text)@staticmethod@lru_cache(maxsize=100)def load_stopwords(file_path):"""从指定文件或文件夹中加载停用词列表。参数:file_path (str): 停用词文件或文件夹的路径。返回:frozenset: 包含停用词的集合。"""stopwords = set()try:if os.path.isfile(file_path):with open(file_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)elif os.path.isdir(file_path):for filename in os.listdir(file_path):file_full_path = os.path.join(file_path, filename)if os.path.isfile(file_full_path):with open(file_full_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)else:raise ValueError(f"The path {file_path} is neither a file nor a directory.")except Exception as e:print(f"An unexpected error occurred: {e}", file=sys.stderr)return frozenset(stopwords) # 返回不可变集合def split_words(self, text):"""对给定文本进行中英文分词,去除停用词,并返回分词结果。参数:text (str): 要分词的文本字符串。返回:list: 分词后并过滤掉停用词的单词列表。"""stopwords = self.broadcast_stopwords.value if self.broadcast_stopwords else self.stopwordsclean_text_str = self.clean_text(text)chinese_segments = chinese_pattern.findall(clean_text_str)english_segments = english_pattern.findall(clean_text_str)chinese_words = [word for segment in chinese_segmentsfor word in jieba.lcut(segment)if word not in stopwords]english_words = [word for word in english_segmentsif word not in stopwords]combined_words = chinese_words + english_wordsreturn combined_wordsdef read_config(config_file):config = configparser.ConfigParser()config.read(config_file)return config# 示例使用:
if __name__ == "__main__":# 读取配置文件config_file = '/home/pblh123/PycharmProjects/pyspark2024/src/utils/configparse.ini'config = read_config(config_file)# 设置环境变量os.environ['JAVA_HOME'] = config.get('Environment', 'JAVA_HOME')os.environ['SPARK_HOME'] = config.get('Environment', 'SPARK_HOME')sc = SparkContext(appName="TextProcessorExample")processor = TextProcessor(stopwords_path='/home/pblh123/PycharmProjects/pyspark2024/datas/stopwords', spark_context=sc)sample_text = """这是一个测试文本,它包括了英文 words 和 中文词语。2024-12-07 16:39:27,380 - INFO - Sending to ('127.0.0.1', 39848): Special chars: !@#$%^&*()_+132024-12-07 16:39:27,682 - INFO - Sending to ('127.0.0.1', 39848): Random number: 948"""result = processor.split_words(sample_text)print(result)
该代码定义了一个文本处理工具,主要功能如下:
-
配置读取:通过
configparser
模块读取配置文件,用于获取环境变量(如JAVA_HOME
和SPARK_HOME
)的设置。 -
环境设置:根据读取的配置文件,设置
JAVA_HOME
和SPARK_HOME
环境变量,这对于运行Apache Spark是必要的。 -
文本处理:定义了一个
TextProcessor
类,用于处理文本数据。主要功能包括:- 加载停用词:从指定文件或文件夹中加载停用词列表,支持单个文件或多个文件在一个文件夹中。加载的停用词存储在
frozenset
中,以提高性能和保证不可变性。 - 清理文本:移除HTML标签、URL、非字母数字字符(除了汉字和英文标点符号以外的特殊字符)等,以净化文本数据。
- 中英文分词:利用正则表达式和
jieba
分词库,分别对中文和英文进行分词处理。 - 去除停用词:在分词结果中去除之前加载的停用词。
- 支持Spark:如果提供了
SparkContext
,则可以将停用词列表广播到Spark集群的所有节点上,以便在分布式处理中使用。
- 加载停用词:从指定文件或文件夹中加载停用词列表,支持单个文件或多个文件在一个文件夹中。加载的停用词存储在
-
正则表达式优化:通过提前编译正则表达式模式(用于匹配中文字符和英文字符、数字及下划线),提高文本处理的性能。
-
缓存优化:利用
functools.lru_cache
装饰器对load_stopwords
方法进行缓存,减少重复加载停用词列表的开销,特别是当多次实例化TextProcessor
时。 -
示例使用:在
__main__
块中,通过读取配置文件、设置环境变量、初始化SparkContext
和TextProcessor
实例,对一个示例文本进行分词处理,并打印结果。
总的来说,这段代码提供了一个基于Python和Spark的文本处理工具,支持中英文分词、停用词过滤、文本清理等功能,适用于文本挖掘、自然语言处理等应用场景。
5. 开发SocketWordCountSFDBPool.py实现基于pyspark streaming的实时数据分词统计并将结果保存到数据库中
代码功能:代码是一个完整的实时文本处理系统,涵盖了从配置读取、Spark Streaming初始化、数据处理到数据库存储的整个过程。通过使用Spark Streaming进行实时数据处理,结合数据库连接池和批处理技术,实现了高效、可靠的文本流处理系统。此外,代码还包含了详细的错误处理和日志记录机制,有助于系统的维护和故障排查
代码如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
==================================================================File Name: SocketWordCountSFDBPool.py$Creation Date: 2024/12/7$ 22:14$Author: John <pblh123@126.com>Remarks: $文件名称: SocketWordCountSFDBPool.py$创建时间: 2024/12/7$ 22:14$作 者: 李先生 <pblh123@126.com>备 注: $
=================================================================="""from pyspark.streaming import StreamingContext
from src.utils.DBUtils import SimpleDBPool # 使用数据库连接池
from src.utils.TextProcessor import TextProcessor
from src.utils.myLogger import setup_logging
from pyspark import SparkContextimport os
import sys
import configparser
import pymysql
from pymysql.constants import ER
import time# 读取配置文件
def read_config(config_file):"""读取配置文件:param config_file: 配置文件路径:return: 配置对象"""config = configparser.ConfigParser()config.read(config_file)return config# 获取数据库配置
def get_db_config(config):"""获取数据库配置:param config: 配置对象:return: 数据库配置字典"""db_section = 'Database'return {'host': config.get(db_section, 'DB_HOST'),'port': int(config.get(db_section, 'DB_PORT')),'user': config.get(db_section, 'DB_USER'),'password': config.get(db_section, 'DB_PASSWORD'),'db': config.get(db_section, 'DB_NAME')}# 创建数据库连接池
def create_db_pool(db_config):"""创建数据库连接池:param db_config: 数据库配置字典:return: 数据库连接池对象"""pool = SimpleDBPool(creator=pymysql.connect,**db_config)return pool# 主要的Spark Streaming处理函数
def sparkstreamingnetworkcount(logger, config):"""主要的Spark Streaming处理函数:param logger: 日志记录器:param config: 配置对象"""global sc, ssc, lines# 设置环境变量os.environ['JAVA_HOME'] = config.get('Environment', 'JAVA_HOME')os.environ['SPARK_HOME'] = config.get('Environment', 'SPARK_HOME')# 获取当前文件名并去掉扩展名file_name = os.path.splitext(os.path.basename(__file__))[0]sc = SparkContext(appName=file_name)ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))# 创建 TextProcessor 实例并广播停用词列表processor = TextProcessor(stopwords_path='/home/pblh123/PycharmProjects/pyspark2024/datas/stopwords',spark_context=sc)broadcast_stopwords = sc.broadcast(processor.stopwords) # 只广播停用词列表# 定义处理每个分区的函数def process_partition(iter):stopwords = broadcast_stopwords.value # 获取广播的停用词列表local_processor = TextProcessor() # 创建一个没有参数的 TextProcessor 实例local_processor.stopwords = stopwords # 手动设置停用词属性for record in iter:words = local_processor.split_words(record) # 使用 TextProcessor 进行分词和过滤for word in words:yield (word, 1)# 定义更新函数 updateFunc,用于维护状态def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)# 设置检查点目录ssc.checkpoint(sys.argv[3])# 对接收到的每一行进行处理counts = lines.flatMap(lambda line: process_partition([line])) \.reduceByKey(lambda x, y: x + y) \.updateStateByKey(updateFunc)# 修改后的 dbfunc 函数def dbfunc(records):"""将词频记录插入到数据库中,如果单词已存在,则更新计数。参数:- records: 词频记录的列表,每个记录是一个包含(word, count)的元组。"""# 在函数内部创建数据库连接池或直接创建连接db_config = get_db_config(config)pool = create_db_pool(db_config)# SQL语句用于插入或更新词频记录sql = "INSERT INTO wordcount (word, count) VALUES (%s, %s) ON DUPLICATE KEY UPDATE count=count+VALUES(count)"# 批处理大小、最大重试次数和重试延迟batch_size = 50max_retries = 5retry_delay = 2 # seconds# 尝试多次执行数据库操作,以处理潜在的死锁或重复条目问题for attempt in range(max_retries):conn = Nonecursor = Nonetry:# 使用 get_connection 方法代替 connection 方法conn = pool.get_connection()cursor = conn.cursor()batch = []# 遍历记录,构建批处理列表,并执行批处理插入/更新操作for item in records:if isinstance(item, tuple) and len(item) == 2:batch.append(item)if len(batch) >= batch_size:execute_batch(cursor, sql, batch)batch = []if batch:execute_batch(cursor, sql, batch)conn.commit()breakexcept pymysql.err.OperationalError as e:# 处理特定的数据库操作错误,如重复条目或死锁if e.args[0] in [ER.DUP_ENTRY, ER.LOCK_DEADLOCK]:logger.warning(f"Attempt {attempt + 1}: Detected deadlock or duplicate entry, rolling back and retrying in {retry_delay} seconds...")if conn:conn.rollback()time.sleep(retry_delay)else:logger.error(f"Database error: {e}")raise eexcept pymysql.err.InterfaceError as e:# 处理接口错误,如连接丢失logger.error(f"Interface error: {e}, attempting to reconnect...")time.sleep(retry_delay)except Exception as e:# 处理其他异常logger.error(f"Unexpected error: {e}")raise efinally:# 确保关闭游标和释放数据库连接if cursor:cursor.close()if conn:pool.release_connection(conn) # 确保释放连接回连接池def execute_batch(cursor, sql, batch):"""执行数据库批处理操作。参数:- cursor: 数据库游标。- sql: 要执行的SQL语句。- batch: 记录的批处理列表。"""try:cursor.executemany(sql, batch)except pymysql.err.OperationalError as e:# 在批处理执行期间处理操作错误,特别是死锁情况if e.args[0] == ER.LOCK_DEADLOCK:logger.warning("Deadlock detected during batch execution, skipping this batch.")returnraise edef func(rdd):"""处理每个分区的词频记录,并调用dbfunc函数将记录插入数据库。参数:- time: 时间戳,表示当前的处理时间。- rdd: 包含词频记录的RDD。"""# 重新分区RDD以优化处理,并对每个分区应用dbfunc函数repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(dbfunc)# 对counts DStream的每个RDD应用func函数,并打印结果counts.foreachRDD(func)counts.pprint()# 启动StreamingContext并等待终止ssc.start()ssc.awaitTermination()if __name__ == "__main__":# 确保命令行参数数量正确if len(sys.argv) != 4:print("Usage: networkcount.py <hostname> <port> <checkpoint>", file=sys.stderr)exit(-1)# 读取配置文件config_file = '/home/pblh123/PycharmProjects/pyspark2024/src/utils/configparse.ini'config = read_config(config_file)# 设置日志log_dir = config.get('Logging', 'LOG_DIR')log_filename = "socketwcdb.log"logger = setup_logging(log_dir, log_filename)# 调用主函数开始处理sparkstreamingnetworkcount(logger, config)
代码总结
该代码是一个使用PySpark Streaming处理网络文本流数据,并将处理结果存储到数据库中的程序。以下是详细的总结:
1. 依赖和配置读取
- 依赖库:代码使用了
pyspark.streaming
、pymysql
等库,分别用于实时数据处理和数据库操作。 - 配置读取:通过
configparser
库读取配置文件(如数据库连接信息、环境变量等)。
2. 配置和初始化
- 数据库配置:通过
get_db_config
函数从配置文件中提取数据库连接信息。 - 数据库连接池:使用自定义的
SimpleDBPool
类(可能是基于pymysql
的封装)创建数据库连接池,以优化数据库连接管理。 - 日志设置:通过
setup_logging
函数设置日志记录器,用于记录程序运行过程中的信息。
3. Spark Streaming 初始化
- 环境变量:设置
JAVA_HOME
和SPARK_HOME
环境变量。 - SparkContext和StreamingContext:初始化
SparkContext
和StreamingContext
,分别用于Spark的上下文管理和实时流处理。 - 网络文本流:通过
ssc.socketTextStream
从指定的主机和端口接收文本流数据。
4. 数据处理
- 文本处理:使用
TextProcessor
类(自定义)进行分词和停用词过滤。停用词列表通过Spark的广播变量机制进行分发,以减少内存开销。 - 词频统计:对处理后的文本进行词频统计,使用
flatMap
、reduceByKey
和updateStateByKey
等Spark Streaming的转换操作。 - 检查点:设置检查点目录,用于容错和状态恢复。
5. 数据库操作
- 词频记录存储:定义
dbfunc
函数,将词频记录批量插入或更新到数据库中。该函数使用数据库连接池,并包含重试机制以处理潜在的死锁或重复条目问题。 - 批处理:为了提高效率,
dbfunc
函数内部实现了批处理逻辑,将记录分组后批量插入数据库。 - 错误处理:对数据库操作中的各种异常(如死锁、连接丢失等)进行了详细的错误处理和日志记录。
6. 主函数和程序入口
- 命令行参数:检查命令行参数数量,确保提供了必要的主机名、端口和检查点目录。
- 程序启动:读取配置文件,设置日志,调用主函数
sparkstreamingnetworkcount
开始处理网络文本流数据。
实验验证
运行datasourcesocket.py
效果如下,需要启动SocketWordCountSFDBPool.py,才能看到实时发送的消息
运行myLogger.py
运行DBUtils.py
运行TextProcessor.py
运行SocketWordCountSFDBPool.py
1. 在数据库进行目标表建立
2. 配置SocketWordCountSFDBPool.py运行参数后在运行
1. 在数据库进行目标表建立
进入你的MySQL数据库
相关数据库建表语句如下:
2. 配置SocketWordCountSFDBPool.py运行参数后在运行
参数配置前
配置参数
找到checkpoint文件夹地址 选中文件夹-》右键-》copy path/reference-》absolute path
/home/pblh123/PycharmProjects/pyspark2024/datas/checkpoint
直接查看因为client客户端设置中文乱码,换一个客户检查就可以看到中文统计结果
欢迎关注、您的支持与回复,是我继续开发更多技术文档分享的动力。欢迎扫描关注: