当前位置: 首页 > news >正文

计算机网络socket编程(5)_TCP网络编程实现echo_server

个人主页:C++忠实粉丝
欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C++忠实粉丝 原创

计算机网络socket编程(5)_TCP网络编程实现echo_server

收录于专栏【计算机网络】
本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌 
 

目录

功能介绍  

InetAddr.hpp

LockGuard.hpp

Log.hpp

Thread.hpp

ThreadPool.hpp

TcpServer.hpp

TcpServerMain.cc

TcpClientMain.cc

效果展示  


功能介绍  

和上回 UDP 网络编程一样, 实现简单的 echo_server, 不过, 这里我们 TCP 网络编程使用了 多线程, 不过大体都差不多~ 

还有就是网络编程代码真的是又多又杂, 有的时候我自己都烦, 没办法网络部分就是这样的, 我最近会尽快更完这个 socket 编程, 提早进入概念部分, 一直编程感觉少了什么~ 还得跟概念结合起来看, 感兴趣的宝子们不要忘记了点赞关注哦! 我现在在网络部分真的待不了一点, 希望我能尽快挣脱网络, 更新数据库 MySQL 的东西吧!  

InetAddr.hpp

这个类封装了 sockaddr_in 结构体,用于简化对 IP 地址和端口的处理。其核心功能是将网络字节序的 sockaddr_in 地址转换为易于操作的主机字节序的 IP 地址字符串和端口号,并提供相关的成员函数来获取这些信息。 

#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>class InetAddr
{
private:void ToHost(const struct sockaddr_in &addr){_port = ntohs(addr.sin_port);// _ip = inet_ntoa(addr.sin_addr);char ip_buf[32];// inet_p to n// p: process// n: net// inet_pton(int af, const char *src, void *dst);// inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}public:InetAddr(const struct sockaddr_in &addr):_addr(addr){ToHost(addr);}InetAddr(){}bool operator == (const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr;
};

私有成员变量

_ip:存储 IP 地址的字符串(如 "192.168.0.1")。

_port:存储端口号。

_addr:存储一个 sockaddr_in 结构体,用于保存 IP 地址和端口。

私有成员函数 ToHost

ToHost 函数的作用是将一个 sockaddr_in 地址结构转换为 InetAddr 类的成员 _ip 和 _port。

ntohs(addr.sin_port):将网络字节顺序的端口号(从 sockaddr_in 中获取)转换为主机字节顺序。网络字节顺序是大端模式,而主机字节顺序通常取决于平台。

inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf)):将 sockaddr_in 中的 IP 地址(以二进制形式存储)转换为点分十进制字符串表示(如 "192.168.0.1")。

这里 inet_ntopntohs 用于处理网络字节序和主机字节序的转换,确保 IP 地址和端口在不同环境下的正确性。

构造函数 InetAddr(const struct sockaddr_in &addr)

该构造函数接受一个 sockaddr_in 类型的参数 addr,并调用 ToHost 方法将其转换为 InetAddr 类内部的 _ip 和 _port。

_addr(addr):将 sockaddr_in 结构体存储在 _addr 中。

默认构造函数 InetAddr()

默认构造函数没有做任何事情。它用于创建一个空的 InetAddr 对象

运算符重载 ==

重载了 == 运算符,用于比较两个 InetAddr 对象是否相等。它通过比较 ip 和 port 字段来判断是否相同。

成员函数 Ip

返回当前 InetAddr 对象的 IP 地址。

成员函数 Port

返回当前 InetAddr 对象的端口号。

成员函数 Addr

返回存储的 sockaddr_in 结构体。sockaddr_in 包含了完整的 IP 地址和端口信息。

成员函数 AddrStr

返回一个格式化的字符串,表示 IP 地址和端口,格式为 "ip:port"(例如 "192.168.0.1:8080")。

LockGuard.hpp

#pragma once#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t *_mutex;
};

构造函数 LockGuard(pthread_mutex_t *mutex)

构造函数接收一个 pthread_mutex_t* 类型的指针作为参数,并在构造时通过 pthread_mutex_lock 来锁定该互斥量。

mutex 参数是指向一个 pthread_mutex_t 类型的指针,这个互斥量将用来保护临界区。

_mutex(mutex) 是初始化成员变量 _mutex 的成员初始化列表,它将构造函数的参数 mutex 的值赋给 _mutex 成员变量。

pthread_mutex_lock(_mutex) 调用会尝试锁定互斥量 _mutex。如果互斥量已经被其他线程锁定,当前线程会被阻塞,直到该互斥量变为可用。

析构函数 ~LockGuard()

析构函数负责在 LockGuard 对象生命周期结束时自动解锁互斥量。

当 LockGuard 对象的作用域结束时,析构函数会自动被调用。

pthread_mutex_unlock(_mutex) 会释放锁,即解锁互斥量。这样可以确保即使在发生异常或提前返回的情况下,互斥量也能被正确解锁,从而避免死锁。

成员变量 _mutex

_mutex 是一个指向 pthread_mutex_t 类型的指针,它保存了传递给构造函数的互斥量地址。这个指针将用于在构造和析构中对互斥量进行锁定和解锁操作。

关键特点:

自动锁定:当 LockGuard 对象被创建时,构造函数自动锁定互斥量。

自动解锁:当 LockGuard 对象超出作用域时,析构函数自动解锁互斥量。

简化代码:使用 LockGuard 类可以避免手动调用 pthread_mutex_lock 和 pthread_mutex_unlock,并且保证解锁操作一定会发生。

Log.hpp

#pragma once#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"namespace log_ns
{enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){// 加过滤逻辑 --- TODOLockGuard lockguard(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出来日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;#define LOG(Level, Format, ...)                                        \do                                                                 \{                                                                  \lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \} while (0)
#define EnableScreen()          \do                          \{                           \lg.Enable(SCREEN_TYPE); \} while (0)
#define EnableFILE()          \do                        \{                         \lg.Enable(FILE_TYPE); \} while (0)
};

日志系统, 我们的老演员了, 这里就不再多介绍了~~

Thread.hpp

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>namespace ThreadMoudle
{// 线程要执行的方法,后面我们随时调整// typedef void (*func_t)(ThreadData *td); // 函数指针类型// typedef std::function<void()> func_t;using func_t = std::function<void(const std::string&)>;class Thread{public:void Excute(){_isrunning = true;_func(_name);_isrunning = false;}public:Thread(const std::string &name, func_t func):_name(name), _func(func){}static void *ThreadRoutine(void *args) // 新线程都会执行该方法!{Thread *self = static_cast<Thread*>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if(n != 0) return false;return true;}std::string Status(){if(_isrunning) return "running";else return "sleep";}void Stop(){if(_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数};
} // namespace ThreadModle

线程函数类型 func_t

using func_t = std::function<void(const std::string&)>;

这里使用了 std::function 来定义线程要执行的函数类型。func_t 是一个函数对象类型,它表示接受一个 std::string 类型参数并返回 void 的函数。使用 std::function 的好处是,它可以适配普通函数、lambda 表达式以及成员函数等,使得线程任务的定义更加灵活。

Thread 类

Thread 类是该代码的核心,封装了 POSIX 线程的管理操作,包括线程的创建、执行、停止、等待和状态查询。

成员变量

_name: 线程的名称,用于标识线程。

_tid: 线程标识符 (pthread_t 类型),用于标识线程。

_isrunning: 布尔变量,表示线程是否正在运行。

_func: 线程执行的任务(即回调函数),使用 func_t 类型存储。

构造函数

Thread 类的构造函数接收线程名称 (name) 和线程任务 (func) 作为参数,并初始化相关成员变量。_isrunning 被初始化为 false,表示线程在创建时默认处于非运行状态。

线程执行方法 Excute

Excute 方法是线程执行的主体部分,它首先将 _isrunning 设置为 true,然后执行通过构造函数传入的任务函数 _func。执行完后,将 _isrunning 设置为 false,表示线程已经结束执行。

线程例程 ThreadRoutine

ThreadRoutine 是一个静态方法,它会作为线程的入口函数。当创建线程时,系统会调用这个函数。

在 ThreadRoutine 中,首先通过 static_cast 将传入的 void* 类型的参数转换为 Thread* 类型,这样我们就可以访问到线程的成员变量和方法。

然后调用 self->Excute(),即执行线程实际的工作。

启动线程 Start

Start 方法通过 pthread_create 创建一个新的线程。pthread_create 会接受线程标识符 _tid、线程属性(这里是 nullptr,即默认属性)、线程入口函数(这里是 ThreadRoutine)以及线程传递的参数(这里是 this,即当前对象)。

如果线程创建成功,返回 true;否则返回 false。

查询线程状态 Status

Status 方法返回当前线程的状态。如果线程正在执行(_isrunning == true),返回 "running",否则返回 "sleep"。

停止线程 Stop

Stop 方法用于停止正在运行的线程。它调用 pthread_cancel 来请求终止指定线程 _tid。然后将 _isrunning 设置为 false,表示线程已停止。

等待线程完成 Join

Join 方法用于等待线程执行完毕。它通过 pthread_join 阻塞当前线程,直到指定线程 _tid 执行完毕。

获取线程名称 Name

Name 方法返回线程的名称。

ThreadPool.hpp

这段代码实现了一个 线程池 模式,提供了多线程处理任务的能力,能够管理多个线程执行任务,并且通过线程池单例模式(Singleton)来确保只会创建一个线程池实例。 

#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"using namespace ThreadMoudle;
using namespace log_ns;static const int gdefaultnum = 10;void test()
{while (true){std::cout << "hello world" << std::endl;sleep(1);}
}template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void Wakeup(){pthread_cond_signal(&_cond);}void WakeupAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(const std::string &name) // this{while (true){// 取任务LockQueue();while (IsEmpty() && _isrunning){_sleep_thread_num++;LOG(INFO, "%s thread sleep begin!\n", name.c_str());Sleep();LOG(INFO, "%s thread wakeup!\n", name.c_str());_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning){UnlockQueue();LOG(INFO, "%s thread quit\n", name.c_str());break;}// 有任务T t = _task_queue.front();_task_queue.pop();UnlockQueue();// 处理任务t(); // 处理任务,此处不用/不能在临界区中处理// std::cout << name << ": " << t.result() << std::endl;// LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());}}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());}}void Start(){_isrunning = true;for (auto &thread : _threads){LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());thread.Start();}}ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &) = delete;void operator=(const ThreadPool<T> &) = delete;public:void Stop(){LockQueue();_isrunning = false;WakeupAll();UnlockQueue();LOG(INFO, "Thread Pool Stop Success!\n");}// 如果是多线程获取单例呢?static ThreadPool<T> *GetInstance(){if (_tp == nullptr){LockGuard lockguard(&_sig_mutex);if (_tp == nullptr){LOG(INFO, "create threadpool\n");// thread-1 thread-2 thread-3...._tp = new ThreadPool<T>();_tp->Init();_tp->Start();}else{LOG(INFO, "get threadpool\n");}}return _tp;}void Equeue(const T &in){LockQueue();if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0)Wakeup();}UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num;pthread_mutex_t _mutex;pthread_cond_t _cond;// 单例模式// volatile static ThreadPool<T> *_tp;static ThreadPool<T> *_tp;static pthread_mutex_t _sig_mutex;
};template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

类定义:ThreadPool<T>

ThreadPool 类是一个模板类,能够处理类型为 T 的任务。ThreadPool 负责管理一组线程,这些线程会从任务队列中取出任务并执行。

成员变量:

线程池大小:_thread_num 表示线程池中线程的数量。

线程队列:_threads 用来存储所有创建的线程。

任务队列:_task_queue 是一个 std::queue,存储待处理的任务。

运行状态:_isrunning 标志线程池是否正在运行。

空闲线程数:_sleep_thread_num 表示当前空闲的线程数量。

同步机制:

_mutex:用于保护任务队列的互斥锁。

_cond:用于线程同步的条件变量,确保线程池在没有任务时可以进入等待状态。

单例模式:

_tp:一个静态指针,用于存储线程池的唯一实例。

_sig_mutex:一个静态互斥锁,用于控制对 _tp 的访问,确保线程池的单例实现是线程安全的。

成员函数:

(1) 同步操作:

LockQueue() 和 UnlockQueue():这些是保护任务队列的互斥锁方法,确保在访问任务队列时线程是同步的,避免并发冲突。

Wakeup() 和 WakeupAll():分别是唤醒一个或所有线程的函数。当有任务加入时,如果某些线程在等待任务,这些函数可以用来通知线程继续工作。

Sleep():如果任务队列为空,线程会调用这个函数进入等待状态,直到有新的任务被加入到队列中。

IsEmpty():检查任务队列是否为空。

(2) 任务处理:

HandlerTask():这是线程执行的主要任务。每个线程会不断地从任务队列中获取任务并执行,直到线程池被停止。

线程首先会尝试从任务队列中取出任务。

如果队列为空且线程池仍然在运行,线程将会休眠,直到有新任务到来。

如果线程池已停止并且队列为空,线程将退出。

如果队列非空,线程会执行任务。

(3) 初始化和启动:

Init():为线程池中的每个线程创建一个 Thread 对象,并绑定任务处理函数 HandlerTask(),然后将线程添加到 _threads 向量中。

Start():启动所有线程。

(4) 停止:

Stop():停止线程池,首先设置 _isrunning = false,然后唤醒所有处于等待状态的线程。

(5) 单例实现:

GetInstance():这是一个线程安全的单例实现,使用双重检查锁定(Double-Checked Locking)来确保线程池实例 _tp 只会被创建一次。_sig_mutex 用于同步对 _tp 的访问。

(6) 任务队列:

Equeue():将任务 in 添加到任务队列中。任务加入后,如果有空闲线程,某些线程会被唤醒来处理这些任务。

(7) 析构函数:

pthread_mutex_destroy(&_mutex) 和 pthread_cond_destroy(&_cond) 用于销毁互斥锁和条件变量,释放相关资源。

线程池的工作流程:

初始化和启动线程池:

通过 ThreadPool<T>::GetInstance() 获取线程池的唯一实例(如果还没有创建)。

调用 ThreadPool::Start() 启动线程池中的所有线程,每个线程执行 HandlerTask() 函数。

任务处理:

任务通过 ThreadPool::Equeue() 被加入到任务队列 _task_queue 中。

线程在 HandlerTask() 中从队列中取任务并执行。

线程阻塞与唤醒:

如果队列为空,线程会调用 Sleep() 进入等待状态,直到有新任务被添加。

当任务被加入队列时,如果有空闲线程,它们会被唤醒执行任务。

停止线程池:

调用 ThreadPool::Stop() 停止线程池,设置 _isrunning = false,并唤醒所有等待中的线程。每个线程在执行完当前任务后退出。

单例模式的线程安全性分析:

ThreadPool<T>::GetInstance() 中使用了双重检查锁定(Double-Checked Locking)来实现线程安全的单例模式:

第一次检查 _tp == nullptr 是为了减少锁的竞争。

第二次检查 _tp == nullptr 是为了确保在锁定后 _tp 仍然没有被创建(避免其他线程已经创建了 ThreadPool 实例)。

LockGuard 用于确保在操作 _tp 时,访问是线程安全的。

TcpServer.hpp

这个 TcpServer 类是一个简单的 TCP 服务器实现,它能够接受客户端的连接,并处理客户端发送的消息。它使用线程池来处理每个客户端的连接,避免了多进程和单线程模型的缺点。 

#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"using namespace log_ns;enum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERR
};const static int gport = 8888;
const static int gsock = -1;
const static int gblcklog = 8;using task_t = std::function<void()>;class TcpServer
{
public:TcpServer(uint16_t port = gport): _port(port),_listensockfd(gsock),_isrunning(false){}void InitServer(){// 1. 创建socket_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_listensockfd < 0){LOG(FATAL, "socket create error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket create success, sockfd: %d\n", _listensockfd); // 3struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;// 2. bind sockfd 和 Socket addrif (::bind(_listensockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");// 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接if (::listen(_listensockfd, gblcklog) < 0){LOG(FATAL, "listen error\n");exit(LISTEN_ERR);}LOG(INFO, "listen success\n");}class ThreadData{public:int _sockfd;TcpServer *_self;InetAddr _addr;public:ThreadData(int sockfd, TcpServer *self, const InetAddr &addr):_sockfd(sockfd), _self(self), _addr(addr){}};void Loop(){// signal(SIGCHLD, SIG_IGN);_isrunning = true;while (_isrunning){struct sockaddr_in client;socklen_t len = sizeof(client);// 4. 获取新连接int sockfd = ::accept(_listensockfd, (struct sockaddr *)&client, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");continue;}InetAddr addr(client);LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", addr.AddrStr().c_str(), sockfd);// version 0 --- 不靠谱版本// Service(sockfd, addr);// version 1 --- 多进程版本// pid_t id = fork();// if (id == 0)// {//     // child//     ::close(_listensockfd); // 建议!//     if(fork() > 0) exit(0);//     Service(sockfd, addr);//     exit(0);// }// // father// ::close(sockfd);// int n = waitpid(id, nullptr, 0);// if (n > 0)// {//     LOG(INFO, "wait child success.\n");// }// version 2 ---- 多线程版本 --- 不能关闭fd了,也不需要了// pthread_t tid;// ThreadData *td = new ThreadData(sockfd, this, addr);// pthread_create(&tid, nullptr, Execute, td); // 新线程进行分离// version 3 ---- 线程池版本 int sockfd, InetAddr addrtask_t t = std::bind(&TcpServer::Service, this, sockfd, addr);ThreadPool<task_t>::GetInstance()->Equeue(t);}_isrunning = false;}static void *Execute(void *args){pthread_detach(pthread_self());ThreadData *td = static_cast<ThreadData *>(args);td->_self->Service(td->_sockfd, td->_addr);delete td;return nullptr;}void Service(int sockfd, InetAddr addr){// 长服务while (true){char inbuffer[1024]; // 当做字符串ssize_t n = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;LOG(INFO, "get message from client %s, message: %s\n", addr.AddrStr().c_str(), inbuffer);std::string echo_string = "[server echo] #";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0){LOG(INFO, "client %s quit\n", addr.AddrStr().c_str());break;}else{LOG(ERROR, "read error: %s\n", addr.AddrStr().c_str());break;}}::close(sockfd);}~TcpServer() {}private:uint16_t _port;int _listensockfd;bool _isrunning;
};

1. 类结构和成员变量

成员变量:

_port:服务器监听的端口号,默认为 8888。

_listensockfd:服务器的监听套接字描述符。

_isrunning:标志服务器是否在运行,控制服务器的生命周期。

枚举常量:

SOCKET_ERROR:表示创建套接字失败时的错误码。

BIND_ERROR:表示绑定地址失败时的错误码。

LISTEN_ERR:表示监听失败时的错误码。

常量:

gport:默认的监听端口号。

gsock:默认的套接字标识符,表示未初始化的套接字。

gblcklog:用于 listen() 调用的 backlog 参数,指定操作系统允许的最大连接数。

task_t:使用 std::function<void()> 定义的任务类型,代表线程池中将要执行的任务。这里用来封装客户端连接的处理工作。

2. TcpServer 类的主要函数

TcpServer::InitServer()

该函数用于初始化服务器,完成以下工作:

创建套接字:

使用 ::socket() 创建一个 TCP 套接字。

如果创建失败,输出错误日志并退出。

绑定地址:

使用 ::bind() 将套接字与指定的地址和端口绑定。

如果绑定失败,输出错误日志并退出。

监听连接:

使用 ::listen() 将套接字设为监听状态,等待客户端连接。

如果监听失败,输出错误日志并退出。

TcpServer::Loop()

该函数是服务器的主循环,它负责接受客户端的连接并将连接交给线程池处理:

在循环中,调用 ::accept() 接受来自客户端的连接请求。

如果连接成功,创建一个 InetAddr 对象以保存客户端的 IP 地址和端口信息。

将处理任务(客户端连接的处理)封装为一个 task_t,然后将任务提交给线程池。

TcpServer::Service()

这是处理客户端请求的核心函数:

该函数会从客户端套接字中读取数据(通过 ::read())。

如果读取成功,则将客户端发送的消息打印出来,并以 "server echo" 开头将数据返回给客户端。

如果读取到的数据为空(客户端关闭连接),则输出日志并关闭套接字。

如果发生读取错误,则输出错误日志并关闭套接字。

TcpServer::Execute()

这是一个静态函数,用于在线程池中执行处理任务。该函数被线程池中的线程调用,处理客户端请求:

它首先将线程标记为分离状态,以便线程结束后自动释放资源。

然后它调用 Service() 来处理客户端请求。

处理完成后,销毁 ThreadData 对象。

3. 线程池的使用

服务器使用线程池来处理每个客户端的连接。每当有新连接时,创建一个 task_t(封装了 TcpServer::Service() 方法),然后将任务提交给线程池:

这样线程池中的线程会并行处理这些任务,避免了每次都创建新线程的开销。

4. ThreadData 类

ThreadData 类用于封装每个客户端连接的相关信息:

sockfd:客户端的套接字。

self:指向当前 TcpServer 对象的指针。

addr:客户端的地址信息。

该类主要用于在线程中传递参数,它的生命周期由线程池中的线程控制。

5. 错误处理

在整个过程中,如果发生错误(如创建套接字失败、绑定失败、监听失败等),会通过 LOG() 打印详细错误信息,并通过 exit() 终止程序。可以在实际应用中根据需求改进错误处理逻辑(如重新尝试,或者返回错误状态)。

6. 程序退出

服务器的退出主要由 Loop() 中的 _isrunning 控制。当前 _isrunning 为 false 时,Loop() 会退出。程序会继续执行后续的清理工作,并最终终止。

TcpServerMain.cc

#include "TcpServer.hpp"#include <memory>// ./tcpserver 8888
int main(int argc, char *argv[])
{if(argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}

TcpClientMain.cc

#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>// ./tcpclient server-ip server-port
int main(int argc, char *argv[])
{if (argc != 3){std::cerr << "Usage: " << argv[0] << " server-ip server-port" << std::endl;exit(0);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);// 1. 创建socketint sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){std::cerr << "create socket error" << std::endl;exit(1);}// 注意:不需要显示的bind,但是一定要有自己的IP和port,所以需要隐式的bind,OS会自动bind sockfd,用自己的IP和随机端口号// 什么时候进行自动bind?If the connection or binding succeedsstruct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);::inet_pton(AF_INET, serverip.c_str(), &server.sin_addr);int n = ::connect(sockfd, (struct sockaddr *)&server, sizeof(server));if (n < 0){std::cerr << "connect socket error" << std::endl;exit(2);}while(true){std::string message;std::cout << "Enter #";std::getline(std::cin, message);write(sockfd, message.c_str(), message.size());char echo_buffer[1024];n = read(sockfd, echo_buffer, sizeof(echo_buffer));if(n > 0){echo_buffer[n] = 0;std::cout << echo_buffer << std::endl;}else{break;}}::close(sockfd);return 0;
}

效果展示  

虽然做的有点粗糙, 但是完成的还不错!

下一章还是 TCP socket 编程, 实现命令处理的功能, 处理从客户端接收到的命令,检查这些命令的安全性,并执行这些命令。好了, 篇幅已经很长了, 我们下期在见~


http://www.mrgr.cn/news/77893.html

相关文章:

  • 【贵州省】乡镇界arcgis格式shp数据乡镇名称和编码内容下载测评
  • HarmonyOS(ArkUI框架介绍)
  • 如何让QPS提升20倍
  • (五)ROS通信编程——参数服务器
  • Java 将RTF文档转换为Word、PDF、HTML、图片
  • JS进阶--JS听到了缄默的回响
  • 现代密码学
  • 一文学习Android系统核心服务ServiceManager
  • VMware ubuntu创建共享文件夹与Windows互传文件
  • 分词器的概念(通俗易懂版)
  • CPU命名那些事
  • SQL进阶技巧:如何分析互逆记录?| 相互关注为例分析
  • 动态规划算法--01背包问题详细讲解步骤
  • 【排序算法 python实现】
  • 【大数据分析机器学习】分布式机器学习
  • C++ For Hot100
  • 机器学习周志华学习笔记-第6章<支持向量机>
  • 【C语言】连接陷阱探秘(3):形参、实参与返回值
  • flux的权重版本
  • Ubuntu下安装Qt
  • 【C++知识总结1】c++第一篇,简单了解一下命名空间是什么
  • Linux: C语言解析域名
  • 使用猴子补丁对pytorch的分布式接口进行插桩
  • 鸿蒙进阶篇-状态管理之@Prop@Link
  • 机器学习周志华学习笔记-第4章<决策树>
  • Android Framework WMS面试题及参考答案