【C-项目】网盘(一期,线程池版)
【C-项目】网盘(一期,线程池版)
概述
登录服务器后,即可浏览服务器的文件系统。通过命令上传或下载文件。
服务器使用线程池技术
- 创建两个进程,主进程负责接收退出信号,子进程负责管理线程池。
- 子进程中的主线程**(包工头)**:监听客户端请求,线程池中的线程请求,退出管道请求
- 有客户端连接**(来活啦,哪个闲着呢赶紧干活)**,将其封装成任务,放入任务队列,通知线程池中的空闲线程处理任务
- 退出管道就绪**(所有人,准备下班!)**,通知所有线程下班,将退出标志置位1,唤醒所有线程
- 线程池中的线程**(工人)**:
- 阻塞在条件变量上,等待任务到来**(摸鱼摸鱼)**
- 有任务到来,从条件变量上醒来**(老大发消息了,看看是啥东西)**
- 判断退出标志是否被置位,如果是,退出线程**(下班下班)**
- 处理任务**(干干干)**
- 任务结束,清理任务**(完活,继续摸鱼)**
一期功能
客户端可以使用的命令
命令 | 解析 |
---|---|
ls | 显示当前工作目录的所有文件 |
pwd | 显示当前工作目录 |
cd [dir] | 切换工作目录 |
rm [filename] | 删除当前目录下的文件 |
mkdir [dir] | 创建一个新目录 |
puts [filename] | 上传文件 |
gets [filename] | 下载文件 |
启动
启动服务器
1、在服务器的bin
目录下使用Makefile
,生成可执行文件
w@Ubuntu20:bin $ make
2、启动服务器
w@Ubuntu20:bin $ ./server ../conf/server.conf
Makefile
OBJS:=$(wildcard ../src/*.c)
server:$(OBJS)gcc $^ -o $@ -I../include -lpthread
clean:rm server
启动客户端
1、在客户端的bin
的目录下使用Makefile
,生成可执行文件
w@Ubuntu20:bin $ make
2、启动客户端
w@Ubuntu20:bin $ ./client client.conf
Makefile
OBJS:=$(wildcard ../src/*.c)
client:$(OBJS)gcc $^ -o $@ -I../include
clean:rm client
目录设计
服务器
- bin:存放二进制文件
- conf:存放配置文件
- include:存放头文件
- src:存放源文件
w@Ubuntu20:bin $ tree ..
..
├── bin
│ ├── Makefile
│ └── server
├── conf
│ └── server.conf
├── include
│ ├── func.h
│ ├── head.h
│ ├── task_queue.h
│ └── thread_pool.h
└── src├── disk_conf.c├── disk_func.c├── disk_handle.c├── disk_server.c├── epoll_ctl.c├── task_queue.c├── tcp_init.c└── thread_pool.c4 directories, 15 files
客户端
w@Ubuntu20:bin $ tree ..
..
├── bin
│ ├── client
│ └── Makefile
├── conf
│ └── client.conf
├── include
│ ├── func.h
│ └── head.h
└── src├── disk_client.c├── disk_conf.c├── disk_func.c└── tcp_connect.c4 directories, 9 files
配置文件
服务器配置文件 server.conf
存放服务器ip
地址,服务器port
端口,线程池中的线程数量
根据实际情况自行更改
server_ip = 192.168.160.129
server_port = 2000
thread_num = 5
客户端配置文件 client.conf
存放服务器ip
地址,服务器port
端口
根据实际情况自行更改
server_ip = 192.168.160.129
server_port = 2000
传输文件
使用自定义协议传输:先发送数据长度,再发送数据内容
使用类型
//接收发送缓冲区大小
#define BUF_SIZE 1024
//命令中最多有几个参数
#define MAX_WORDNUM 8
//命令一个参数的最大长度
#define MAX_WORDLEN 30//套接字类型
typedef int socket_t;//通讯类型
typedef struct {socket_t _sess_fd ; //对端套接字char _sess_buf[BUF_SIZE] ; //缓冲区
}Session_t, *pSession_t ;//命令类型
typedef struct {int _argc; //命令中有几个参数char _cmd[MAX_WORDNUM][MAX_WORDLEN];
}Cmd_t, *pCmd_t;//传输文件协议:小货车
typedef struct {int _data_len;//货车头,表示数据长度char _data[BUF_SIZE];//火车车厢,表示数据
}Truck_t;
服务器
主流程搭建
- 父进程---------------------->用来退出程序
- 注册
SIGCHLD
信号(信号到来时,通知子进程退出) - 等待回收子进程资源
- 退出程序
- 注册
- 子进程---------------------->管理进程池
- 创建一个服务器套接字,开始监听客户端的连接
- 创建并启动线程池
- 使用
epoll
管理服务器套接字和退出管道 - 监听
epfd
- 如果有新客户连接,将其加入任务队列,通知子线程处理
- 如果收到父进程发来的退出消息,将退出标志置位,通知线程池退出,关闭线程池,销毁线程池,退出程序
工作进程
每个子线程的具体工作
void *child_handle(void *)
参数:任务队列地址(有客户端的套接字)
功能:处理任务(回应客户端的发来的命令)
-
循环运行
-
如果退出标志被置位,退出线程
-
从任务队列中取一个任务
-
循环处理任务(接收客户端发来的命令,直到客户退出)
-
先接受数据长度(如果为0表示客户退出,break)
-
再接收数据到缓冲区
-
从缓冲区中取出命令
-
如果是命令合法就执行,如果不合法就跳过
-
-
命令
ls
-
功能:发送当前目录下所有文件信息
-
函数:
int do_ls(pSession_t ps);
-
输入:通信类型(客户端套接字,缓冲区)
-
实现
- 生成一个当前目录的目录流
- 循环读取目录流中的每一个目录项
- 根据目录项的信息,拼接一条格式化(比如文件类型、文件名、文件大小)后的文件信息
- 发送给客户端(先发信息长度,再发信息)
- 所有信息发完后,再发送一个结束标识符,告诉客户端命令已完成
- 关闭目录流
pwd
-
功能:发送当前目录
-
函数:
int do_pwd(pSession_t ps);
-
输入:通信类型(客户端套接字,缓冲区)
-
实现
- 利用
getcwd
接口获取信息 - 发送给客户端(先发信息长度,再发信息)
- 利用
cd
-
功能:切换工作目录
-
函数:
int do_cd(pSession_t ps, pCmd_t pcmd);
-
输入:通信类型(客户端套接字,缓冲区),命令类型
-
实现
- 得到目标目录,就是命令的第二个参数
- 切换到目标目录
- 回应客户端,将切换后的目录发送过去(先发信息长度,再发信息)
rm
-
功能:删除文件
-
函数:
int do_rm(pSession_t ps, pCmd_t pcmd);
-
输入:通信类型(客户端套接字,缓冲区),命令类型
-
实现
- 得到待删除文件,就是命令的第二个参数
- 在当前目录下查找是否存在
- 如果存在,删除文件,回应客户端删除成功(先发信息长度,再发信息)
- 如果不存在,回应客户端删除失败(先发信息长度,再发信息)
mkdir
-
功能:创建一个新目录
-
函数:
int do_mkdir(pSession_t ps, pCmd_t pcmd);
-
输入:通信类型(客户端套接字,缓冲区),命令类型
-
实现
- 得到新目录名字,就是命令的第二个参数
- 查找此目录是否存在
- 如果存在,回应客户端创建失败(先发信息长度,再发信息)
- 如果不存在,创建目录,回应客户端创建成功(先发信息长度,再发信息)
gets
-
功能:下载文件(服务器是下载,客户端是上传)
-
函数:
int do_gets(pSession_t ps, pCmd_t pcmd);
-
输入:通信类型(客户端套接字,缓冲区),命令类型
-
实现
- 得到文件名字,就是命令的第二个参数
- 先接传输标志,如果是1就继续,如果是0就退出程序
- 下载文件
- 创建一个同名文件
- 循环接收文件内容
- 先接收数据长度(如果为0表示数据传输完毕,退出)
- 根据长度,接收数据
- 将数据写入文件
- 关闭文件
puts
-
功能:上传文件(服务器是上传,客户端是下载)
-
函数:
int do_puts(pSession_t ps, pCmd_t pcmd);
-
输入:通信类型(客户端套接字,缓冲区),命令类型
-
实现
- 得到文件名字,就是命令的第二个参数
- 判断该文件类型,如果是普通文件就发送传输标志1,如果不是就发送传输标志0
- 如果传输标志是0,退出程序
- 上传文件
- 打开待上传文件
- 循环上传文件内容
- 从文件中读取数据到缓冲区(如果返回值为0,表示文件全部读完,break)
- 根据读取的字节数量,发送数据(如果返回值为-1,表示对端断开,return -1)
- 文件发完后,发送结束标识符,通知对端传输完毕
- 关闭文件
客户端
主流程搭建
- 从配置文件中取出:服务器的
ip
地址,port
端口号 - 连接服务器
- 循环从
stdin
中读取命令-
从
stdin
中读取一行数据(如果读到quit
,结束程序) -
将这一行数据,格式化成命令格式
-
判断命令是否合法
- 如果不合法,跳过此次循环
- 如果合法,将命令发送给服务器
-
等待服务器返回数据
-
如果命令是
cd
,pwd
,rm
,mkdir
- 先接数据长度,再接数据,打印命令结果
-
如果命令是
ls
- 循环接收数据
- 先接数据长度(如果为0,表示传输完毕,break)
- 再接数据,打印命令结果
- 循环接收数据
-
如果命令是
gets
- 下载文件
- 先接数据长度,再接数据,打印命令结果
-
如果命令是
puts
- 上传文件
- 先接数据长度,再接数据,打印命令结果
-
-
代码
服务器代码
head.h
#ifndef __HEAD_H__
#define __HEAD_H__#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <sys/stat.h>
#include <fcntl.h>//检查系统调用返回值
#define EXIT_CHECK(ret, num, msg) if (ret == num) {\do {\perror(msg);\exit(-1);\}while(0); }#endif
func.h
#ifndef __FUNC_H__
#define __FUNC_H__//接收发送缓冲区大小
#define BUF_SIZE 1024
//命令中最多有几个参数
#define MAX_WORDNUM 8
//命令一个参数的最大长度
#define MAX_WORDLEN 30//套接字类型
typedef int socket_t;//通讯类型
typedef struct {socket_t _sess_fd ; //对端套接字char _sess_buf[BUF_SIZE] ; //缓冲区
}Session_t, *pSession_t ;//命令类型
typedef struct {int _argc; //命令中有几个参数char _cmd[MAX_WORDNUM][MAX_WORDLEN];
}Cmd_t, *pCmd_t;//传输文件协议:小货车
typedef struct {int _data_len;//货车头,表示数据长度char _data[BUF_SIZE];//火车车厢,表示数据
}Truck_t;void conf_get(char *conf, char *ip, int *port, int *thread_num); //从配置文件中获取参数
socket_t tcp_init(char *ip, int port); //返回一个正在监听的tcp类型的服务器套接字
void *child_handle(void* p); //子线程的具体工作
int upload(int fd_socket, char *filename); //上传文件
int download(int fd_socket, char *filename); //下载文件
int epoll_add(int fd, int epfd); //将fd加入epfd
int epoll_del(int fd, int epfd); //将fd从epfd里删除
void print_cmd(pCmd_t pcmd); //打印命令
#endif
disk_server.c
#include "head.h"
#include "func.h"
#include "thread_pool.h"int exitpipe[2];
//信号处理函数,实现异步退出线程池
//父进程收到信号后,通知子进程处理
void sig_handle(int signum)
{printf("signal [%d] is comming!\n", signum);write(exitpipe[1], &signum, 1);
}int main(int argc, char *argv[])
{//参数:配置文件路径if (2 != argc) {fprintf(stderr, "Args error!\n");return -1;}//父进程用来接收退出信号if (fork()) {close(exitpipe[0]);//注册退出信号signal(SIGCHLD, sig_handle);//回收子进程资源,退出程序wait(NULL);printf("The thread_pool was already closed, all child_thread were exited!\n");exit(0);}//子进程管理各种请求:客户端,线程池,退出管道close(exitpipe[1]);//从配置文件中拿到 服务器ip和port,线程数量char ip[16] = "";int port = 0;int thread_num = 0;conf_get(argv[1], ip, &port, &thread_num);printf("ip:%s, port:%d, thread_num:%d\n", ip, port, thread_num);//建立tcp监听socket_t fd_server = tcp_init(ip, port);EXIT_CHECK(fd_server, -1, "socket_init");printf("DiskServer[ip:%s, port:%d] boot...\n", ip, port);//建立线程池并启动ThreadPool_t pool;init_ThreadPool(&pool, thread_num);boot_ThreadPool(&pool);printf("The thread_pool is already boot!\n");//将服务器套接字,退出管道加入epoll管理int epfd = epoll_create(1);epoll_add(fd_server, epfd);epoll_add(exitpipe[1], epfd);//监听epfdint ready_fd_num = 0;struct epoll_event evs[2];socket_t fd_client;while (1) {ready_fd_num = epoll_wait(epfd, evs, 2, -1);for (int i = 0; i < ready_fd_num; ++i) {if (evs[i].data.fd == fd_server) {//有新客户连接, 加入任务队列,通知子线程fd_client = accept(fd_server, NULL, NULL);pTaskNode_t pNew = (pTaskNode_t)calloc(1, sizeof(TaskNode_t));pNew->_clifd = fd_client;pthread_mutex_lock(&pool._que._mutex);//加锁push_TaskQueue(&pool._que, pNew);//入队pthread_cond_signal(&pool._que._cond);//通知子线程处理pthread_mutex_unlock(&pool._que._mutex);//解锁}else if (evs[i].data.fd == exitpipe[0]) {//父进程发来退出通知//将退出标志置位,唤醒所有线程,让它们退出pool._que._flag = 1;pthread_cond_broadcast(&pool._que._cond);//关闭线程池,回收线程池资源close_ThreadPool(&pool);destory_ThreadPool(&pool);//退出程序exit(0);}}}return 0;
}
disk_conf.c
#include "head.h"//得到一行中'='符号后面的字符串
static void set_arg(char *line, char *arg)
{//将指针偏移到字符=char *ptr = strchr(line, '=');if (NULL == ptr) {fprintf(stderr, "conf_file is error!\n");exit(-1);}//= 后面的字符串strcpy(arg, ptr + 2);
}//从配置文件中获取参数
void conf_get(char *conf, char *ip, int *port, int *thread_num)
{FILE *fp = fopen(conf, "r");char line[128] = "";char buf[128] = "";//得到ipfgets(line, sizeof(line), fp);line[strlen(line) - 1] = '\0';set_arg(line, ip);//得到portmemset(buf, 0, sizeof(buf));memset(line, 0, sizeof(line));fgets(line, sizeof(line), fp);line[strlen(line) - 1] = '\0';set_arg(line, buf);*port = atoi(buf);//得到thead_nummemset(buf, 0, sizeof(buf));memset(line, 0, sizeof(line));fgets(line, sizeof(line), fp);line[strlen(line) - 1] = '\0';set_arg(line, buf);*thread_num = atoi(buf);fclose(fp);
}
disk_handle.c
#include "head.h"
#include "func.h"
#include "task_queue.h"//静态函数的可见域是本文件
//向客户端发送命令结果
static int do_ls(pSession_t ps);
static int do_pwd(pSession_t ps);
static int do_cd(pSession_t ps, pCmd_t pcmd);
static int do_rm(pSession_t ps, pCmd_t pcmd);
static int do_mkdir(pSession_t ps, pCmd_t pcmd);
static int do_puts(pSession_t ps, pCmd_t pcmd);
static int do_gets(pSession_t ps, pCmd_t pcmd);
//将文件类型从int转成char*
static void file_type(mode_t mode, char *type);//子线程的具体工作,回应客户端发来的命令
void *child_handle(void *p)
{//参数是任务队列pTaskQueue_t pQue = (pTaskQueue_t)p;pTaskNode_t pCur = NULL; //定义一个任务节点,用来接收任务pSession_t ps = (pSession_t)calloc(1, sizeof(Session_t)); //定义一个通信类型,用来传递客户端套接字和缓冲区int data_len = -1; //数据长度Cmd_t cmd; //定义一个命令类型,用来接命令int ret = -1;while (1) {//如果退出标志为1,退出线程if (1 == pQue->_flag) {printf("thread exit!\n");free(ps);ps = NULL;pthread_exit(NULL);}//从任务队列中取一个任务pthread_mutex_lock(&pQue->_mutex);if (0 == pQue->_size) {pthread_cond_wait(&pQue->_cond, &pQue->_mutex);//等待任务}get_TaskNode(pQue, &pCur);//得到任务pthread_mutex_unlock(&pQue->_mutex);//处理任务ps->_sess_fd = pCur->_clifd;//设置客户端套接字printf("client connected...\n");while (1) {memset(ps->_sess_buf, 0, BUF_SIZE);memset(&cmd, 0, sizeof(Cmd_t));//先接收数据长度ret = recv(ps->_sess_fd, &data_len, sizeof(data_len), 0);if (0 == ret || 0 == data_len) {//对端已断开close(ps->_sess_fd);printf("client exit!\n");break;//此次任务结束,退出循环}//再接收具体的数据recv(ps->_sess_fd, ps->_sess_buf, data_len, 0);//从接收缓冲区中读取命令memcpy(&cmd, ps->_sess_buf, sizeof(Cmd_t));print_cmd(&cmd);//判断是什么命令if (!strcmp("cd", cmd._cmd[0])) {do_cd(ps, &cmd);}else if (!strcmp("pwd", cmd._cmd[0])) {do_pwd(ps);}else if (!strcmp("ls", cmd._cmd[0])) {do_ls(ps);}else if (!strcmp("rm", cmd._cmd[0])) {do_rm(ps, &cmd);}else if (!strcmp("mkdir", cmd._cmd[0])) {do_mkdir(ps, &cmd);}else if (!strcmp("puts", cmd._cmd[0])) {do_puts(ps, &cmd);}else if (!strcmp("gets", cmd._cmd[0])) {do_gets(ps, &cmd);}else {//非法命令continue;}}}
}//将文件类型从int转成char*
static void file_type(mode_t mode, char *type)
{if (S_ISREG(mode)) {strncpy(type, "-", 1);}else if (S_ISDIR(mode)) {strncpy(type, "d", 1);}else if (S_ISFIFO(mode)) {strncpy(type, "p", 1);}else {strncpy(type, "o", 1);}
}static int do_ls(pSession_t ps)
{//生成目录流DIR *dirp = opendir("./");if (NULL == dirp) {int flag = -1; //目录流打开失败,发送-1作为标志send(ps->_sess_fd, &flag, sizeof(flag), 0);return -1;}else {struct dirent *dir_info;int data_len = -1;//循环读取目录项while ((dir_info = readdir(dirp)) != NULL) {if (!strncmp(".", dir_info->d_name, 1) || !strncmp("..", dir_info->d_name, 2)) {continue;}//获取目录项的信息struct stat stat_info;memset(&stat_info, 0, sizeof(stat_info));stat(dir_info->d_name, &stat_info);//获取文件类型char type[1] = "";file_type(stat_info.st_mode, type);//拼接此文件信息memset(ps->_sess_buf, 0, BUF_SIZE);sprintf(ps->_sess_buf, "%-2s%-20s %10ldB", type, dir_info->d_name, stat_info.st_size);//发送此文件信息data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(data_len), 0);//先发数据长度send(ps->_sess_fd, ps->_sess_buf, data_len, 0);}//此目录文件已读完, 发送0作为标志data_len = 0;send(ps->_sess_fd, &data_len, sizeof(data_len), 0);}closedir(dirp);return 0;
}static int do_pwd(pSession_t ps)
{//获取当前工作目录memset(ps->_sess_buf, 0, BUF_SIZE);getcwd(ps->_sess_buf, BUF_SIZE);//回应客户端int data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, ps->_sess_buf, data_len, 0);return 0;
}static int do_cd(pSession_t ps, pCmd_t pcmd)
{//拿到目标目录char dir[128] = "";strcpy(dir, pcmd->_cmd[1]);//切换目录chdir(dir);//回应客户端getcwd(dir, sizeof(dir));int data_len = strlen(dir);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, dir, data_len, 0);return 0;
}static int do_rm(pSession_t ps, pCmd_t pcmd)
{//查看文件是否存在DIR *dirp = opendir("./");struct dirent *dir_cur;while ((dir_cur = readdir(dirp)) != NULL) {if (!strcmp(dir_cur->d_name, pcmd->_cmd[1])) {break;}}closedir(dirp);if (dir_cur) {//删除文件char cmd[256] = "";sprintf(cmd, "rm -rf %s", pcmd->_cmd[1]);system(cmd);//回应客户端sprintf(ps->_sess_buf, "file [%s] removed success!", pcmd->_cmd[1]);}else {//文件不存在sprintf(ps->_sess_buf, "file [%s] removed failed!", pcmd->_cmd[1]);}int data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, ps->_sess_buf, data_len, 0);return 0;
}static int do_mkdir(pSession_t ps, pCmd_t pcmd)
{//查看新目录是否存在DIR *dirp = opendir("./");struct dirent *dir_cur;while ((dir_cur = readdir(dirp)) != NULL) {if (!strcmp(dir_cur->d_name, pcmd->_cmd[1]) && S_ISDIR(dir_cur->d_type)) {break;}}closedir(dirp);if (NULL == dir_cur) {//创建目录mkdir(pcmd->_cmd[1], 0775);//回应客户端, 创建成功sprintf(ps->_sess_buf, "create dir [%s] succeed!", pcmd->_cmd[1]);}else {//目录已存在, 创建失败sprintf(ps->_sess_buf, "the [%s] is already exist!", pcmd->_cmd[1]);}int data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, ps->_sess_buf, data_len, 0);return 0;
}static int do_puts(pSession_t ps, pCmd_t pcmd)
{//客户端是上传,服务器是下载if (0 == download(ps->_sess_fd, pcmd->_cmd[1])) {sprintf(ps->_sess_buf, "file [%s] upload succeed!", pcmd->_cmd[1]);}else {sprintf(ps->_sess_buf, "file [%s] upload failed!", pcmd->_cmd[1]);}int data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, ps->_sess_buf, data_len, 0);return 0;
}static int do_gets(pSession_t ps, pCmd_t pcmd)
{//客户端是下载,服务器是发送if (0 == upload(ps->_sess_fd, pcmd->_cmd[1])) {sprintf(ps->_sess_buf, "file [%s] download succeed!", pcmd->_cmd[1]);}else {sprintf(ps->_sess_buf, "file [%s] download failed!", pcmd->_cmd[1]);}int data_len = strlen(ps->_sess_buf);send(ps->_sess_fd, &data_len, sizeof(int), 0);send(ps->_sess_fd, ps->_sess_buf, data_len, 0);return 0;
}
disk_func.c
#include "head.h"
#include "func.h"//上传文件
int upload(int fd_socket, char *filename)
{//查看文件类型是否正确DIR *dirp = opendir("./");struct dirent *dir_cur;int flag = -1; //是否传输标志, 0-不传,1-传while ((dir_cur = readdir(dirp)) != NULL) {if (!strcmp(dir_cur->d_name, filename)) {if (S_ISREG(dir_cur->d_type)) {//如果是普通文件, 传输flag = 1;send(fd_socket, &flag, sizeof(int), 0);}else {//如果不是普通文件,不传flag = 0;send(fd_socket, &flag, sizeof(int), 0);}break;}}closedir(dirp);if (0 == flag) {return -1;}//定义一个小货车,用来传输文件Truck_t truck;memset(&truck, 0, sizeof(Truck_t));int ret = -1;//根据文件名打开传输文件int fd_file = open(filename, O_RDONLY);EXIT_CHECK(fd_file, -1, "open");#if 0//发文件大小struct stat file_info;memset(&file_info, 0, sizeof(file_info));fstat(fd_file, &file_info);truck._data_len = sizeof(file_info.st_size);memcpy(truck._data, &file_info.st_size, truck._data_len);ret = send(fd_socket, &truck, sizeof(int) + truck._data_len, 0);EXIT_CHECK(ret, -1, "send_filesize");
#endif//发文件内容while (1) {memset(truck._data, 0, sizeof(truck._data));//读取文件truck._data_len = read(fd_file, truck._data, BUF_SIZE);if (0 == truck._data_len) {//传输完成,通知对端truck._data_len = 0;send(fd_socket, &truck._data_len, sizeof(int), 0);//关闭传输文件close(fd_file);return 0;}//发送ret = send(fd_socket, &truck, sizeof(int) + truck._data_len, 0);printf("send_len: %d\n", ret);if (-1 == ret) {//客户端异常断开,退出printf("client already break!\n");return -1;}}
}//下载文件
int download(int fd_socket, char *filename)
{//先接收传输标志,如果是1就下载,如果是0就退出 int flag = -1; recv(fd_socket, &flag, sizeof(int), 0); if (0 == flag) {return -1; } //打开或创建一个文件int fd_file = open(filename, O_WRONLY | O_CREAT, 0600);EXIT_CHECK(fd_file, -1, "open");#if 0//接收文件大小int filesize = 0;recv(fd_socket, &filesize, sizeof(int), 0);printf("filesize: %d", filesize);
#endif//接收文件内容Truck_t truck;while (1) {memset(&truck, 0, sizeof(truck));//接收数据长度recv(fd_socket, &truck._data_len, sizeof(truck._data_len), 0);if (0 == truck._data_len) {//文件传输完毕break;}//接收数据内容recv(fd_socket, truck._data, truck._data_len, MSG_WAITALL);write(fd_file, truck._data, truck._data_len);}close(fd_file);return 0;
}//打印命令
void print_cmd(pCmd_t pcmd)
{printf("cmd: ");for (int i = 0; i < pcmd->_argc; ++i) {printf("%s ", pcmd->_cmd[i]);}printf("\n");
}
tcp_init.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>#define ERROR_CHECK(ret, num, msg) { if (ret == num) {\perror("msg"); return -1;} }//输入:服务器的ip地址,端口号
//输出:绑定了服务器ip和端口的,正在监听的套接字
int tcp_init(char *ip, int port)
{//生成一个tcp类型的套接字int sfd = socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(sfd, -1, "ser_socket");//将端口号设置为可重用, 不用再等待重启时的TIME_WAIT时间int reuse = 1;setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));//给套接字绑定服务端ip和portstruct sockaddr_in serverAddr;memset(&serverAddr, 0, sizeof(struct sockaddr_in));serverAddr.sin_family = AF_INET;serverAddr.sin_addr.s_addr = inet_addr(ip);serverAddr.sin_port = htons(port);int ret = bind(sfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr));ERROR_CHECK(ret, -1, "ser_bind");//将套接字设为监听模式,并指定最大监听数(全连接队列的大小)ret = listen(sfd, 10); ERROR_CHECK(ret, -1, "ser_listen");/* printf("[ip:%s, port:%d] is listening...\n", ip, port); */return sfd;
}
thread_pool.c
#include "head.h"
#include "func.h"
#include "thread_pool.h"
#include "task_queue.h"//线程清理函数, 传入任务队列的地址
void clean_func(void *p)
{pTaskQueue_t pQue = (pTaskQueue_t)p;pthread_mutex_unlock(&pQue->_mutex);
}//初始化线程池
int init_ThreadPool(pThreadPool_t pPool, int thread_num)
{pPool->_boot = 0;pPool->_thread_num = thread_num;pPool->_pthid = (pthread_t*)calloc(thread_num, sizeof(pthread_t));init_TaskQueue(&pPool->_que);return 0;
}//启动线程池
int boot_ThreadPool(pThreadPool_t pPool)
{if (0 == pPool->_boot) {for (int i = 0; i < pPool->_thread_num; ++i) {pthread_create(pPool->_pthid + i, NULL, child_handle, &pPool->_que);}pPool->_boot = 1;}return 0;
}//关闭线程池
int close_ThreadPool(pThreadPool_t pPool)
{if (1 == pPool->_boot) {for (int i = 0; i < pPool->_thread_num; ++i) {pthread_join(pPool->_pthid[i], NULL);}pPool->_boot = 1;}return 0;
}//销毁线程池资源
int destory_ThreadPool(pThreadPool_t pPool)
{for (int i = 0; i < pPool->_thread_num; ++i) {free(pPool->_pthid + i);}pPool->_boot = 0;return 0;
}
task_queue.c
#include "head.h"
#include "task_queue.h"//初始化任务队列
int init_TaskQueue(pTaskQueue_t pQueue)
{pQueue->_size = 0;pQueue->_pHead = pQueue->_pTail = NULL;pthread_cond_init(&pQueue->_cond, NULL);pthread_mutex_init(&pQueue->_mutex, NULL);pQueue->_flag = 0;return 0;
}//入队
int push_TaskQueue(pTaskQueue_t pQueue, pTaskNode_t pNew)
{if (NULL == pQueue->_pHead) {pQueue->_pHead = pQueue->_pTail = pNew;}else {pQueue->_pTail->_pNext = pNew;pQueue->_pTail = pNew;}++pQueue->_size;return 0;
}//得到队头元素,同时出队
int get_TaskNode(pTaskQueue_t pQueue, pTaskNode_t *ppGet)
{//没有元素if (0 == pQueue->_size) {return -1;}//有元素,取出,更新队头*ppGet = pQueue->_pHead;pQueue->_pHead = pQueue->_pHead->_pNext;//只有一个元素,更新队尾if (1 == pQueue->_size) {pQueue->_pTail = NULL;}//减小队列长度--pQueue->_size;return 0;
}
epoll_ctl.c
#include "head.h"//将fd加入epfd
int epoll_add(int fd, int epfd)
{struct epoll_event event;memset(&event, 0, sizeof(event));event.events = EPOLLIN;event.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);return 0;
}//将fd从epfd中移除
int epoll_del(int fd, int epfd)
{struct epoll_event event;memset(&event, 0, sizeof(event));event.events = EPOLLIN;event.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event);return 0;
}
客户端代码
head.h
#ifndef __HEAD_H__
#define __HEAD_H__#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <sys/stat.h>
#include <fcntl.h>//检查命令行参数个数
#define ARGS_CHECK(argc, num) {if (argc != num) {\fprintf(stderr, "Args error\n");\exit(-1);}}//检查系统调用返回值
#define EXIT_CHECK(ret, num, msg) if (ret == num) {\do {\perror(msg);\exit(-1);\}while(0); }#endif
func.h
#ifndef __FUNC_H__
#define __FUNC_H__//接收发送缓冲区大小
#define BUF_SIZE 1024
//命令中最多有几个参数
#define MAX_WORDNUM 8
//命令一个参数的最大长度
#define MAX_WORDLEN 30//套接字类型
typedef int socket_t;//传输小货车
typedef struct {int _data_len;//货车头,表示数据长度char _data[BUF_SIZE];//货车车厢,表示数据
}Truck_t;//通讯类型
typedef struct {socket_t _sess_fd ; //对端套接字char _sess_buf[BUF_SIZE] ; //发送接收缓冲区
}Session_t, *pSession_t;//命令类型
typedef struct {int _argc; //命令中有几个参数char _cmd[MAX_WORDNUM][MAX_WORDLEN];
}Cmd_t, *pCmd_t;void conf_get(char *conf, char *ip, char *port);//从配置文件中获取参数
int tcp_connect(char *ip, int port);//连接服务器
int upload(int fd_socket, char *filename);//上传文件
int download(int fd_socket, char *filename);//下载文件void init_cmd(char *line, pCmd_t pcmd); //将line分割,存入cmd
void print_cmd(pCmd_t pcmd); //打印命令
int cmd_check(pCmd_t pcmd);//检查命令是否合法
#endif
disk_client.c
#include "head.h"
#include "func.h"int main(int argc, char *argv[])
{//命令行参数:配置文件路径ARGS_CHECK(argc, 2); //拿到服务器ip和portchar ip_server[16] = "";char port_server[5] = "";conf_get(argv[1], ip_server, port_server);//连接服务器socket_t fd_server = tcp_connect(ip_server, atoi(port_server));if (-1 == fd_server) {perror("socket_server");exit(-1);}//从stdin中读取命令,发给服务器char line[128] = "";Cmd_t cmd;Truck_t truck;//小货车,传输数据while (1) {memset(&truck, 0, sizeof(truck));memset(&cmd, 0, sizeof(Cmd_t));memset(line, 0, sizeof(line));//从标准输入中读取命令read(STDIN_FILENO, line, sizeof(line));line[strlen(line) - 1] = '\0';if (!strcmp(line, "quit")) {exit(1); }//格式化这行数据,并存入cmdinit_cmd(line, &cmd);//判断命令是否合法if (-1 == cmd_check(&cmd)) {printf("the cmd is illegal!\n");continue;}print_cmd(&cmd);//将cmd打包放入小货车memcpy(truck._data, &cmd, sizeof(cmd));//将命令发送给服务器truck._data_len = sizeof(Cmd_t);send(fd_server, &truck._data_len, sizeof(int), 0);send(fd_server, truck._data, truck._data_len, 0);//等待服务器返回数据if (!strncmp("cd", cmd._cmd[0], 2)) {system("clear");//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}else if (!strncmp("pwd", cmd._cmd[0], 3)) {system("clear");//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}else if (!strncmp("rm", cmd._cmd[0], 2)) {system("clear");//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}else if (!strncmp("mkdir", cmd._cmd[0], 5)) {system("clear");//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}else if (!strncmp("ls", cmd._cmd[0], 2)) {system("clear");//先接长度,后接数据while (1) {recv(fd_server, &truck._data_len, sizeof(int), 0);/* printf("data_len: %d\n", truck._data_len); */if (0 == truck._data_len) {break;}recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}}else if (!strncmp("puts", cmd._cmd[0], 4)) {system("clear");//上传文件upload(fd_server, cmd._cmd[1]);//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}else if (!strncmp("gets", cmd._cmd[0], 4)) {system("clear");//下载文件download(fd_server, cmd._cmd[1]);//先接长度,后接数据recv(fd_server, &truck._data_len, sizeof(int), 0);recv(fd_server, truck._data, truck._data_len, MSG_WAITALL);printf("%s\n", truck._data);}}return 0;
}
disk_conf.c
#include "head.h"//得到一行中'='符号后面的字符串
static void set_arg(char *line, char *arg)
{//将指针偏移到字符=char *ptr = strchr(line, '=');if (NULL == ptr) {fprintf(stderr, "conf_file is error!\n");exit(-1);}//= 后面的字符串strcpy(arg, ptr + 2);
}//从配置文件中获取参数
void conf_get(char *conf, char *ip, char *port)
{FILE *fp = fopen(conf, "r");EXIT_CHECK(fp, NULL, "fopen");char line[128] = "";//得到ipfgets(line, sizeof(line), fp);line[strlen(line) - 1] = '\0';set_arg(line, ip);//得到portmemset(line, 0, sizeof(line));fgets(line, sizeof(line), fp);line[strlen(line) - 1] = '\0';set_arg(line, port);fclose(fp);
}
disk_func.c
#include "head.h"
#include "func.h"//将line分割,存入cmd
void init_cmd(char *line, pCmd_t pcmd)
{//使用strtok将line拆分char *token;const char s[] = {' ', '\n'};//首次使用strtok时,需传入待分割的字符串和分隔符集合//之后再调用,第一个参数设为NULL,表示从继续上次的位置分割token = strtok(line, s);while (NULL != token) {strcpy(pcmd->_cmd[pcmd->_argc++], token);token = strtok(NULL, s);}
}//打印命令
void print_cmd(pCmd_t pcmd)
{printf("cmd: ");for (int i = 0; i < pcmd->_argc; ++i) {printf("%s ", pcmd->_cmd[i]);}printf("\n");
}//检查命令是否合法
int cmd_check(pCmd_t pcmd)
{char tmp[64] = "";strncpy(tmp, pcmd->_cmd[0], 64);if (!strcmp(tmp, "cd") || !strcmp(tmp, "ls") ||!strcmp(tmp, "pwd") ||!strcmp(tmp, "rm") ||!strcmp(tmp, "puts") ||!strcmp(tmp, "gets") ||!strcmp(tmp, "mkdir")) {return 0;} else {return -1;}
}//上传文件
int upload(int fd_socket, char *filename)
{//查看文件类型是否正确DIR *dirp = opendir("./");struct dirent *dir_cur;int flag = -1; //是否传输标志, 0-不传,1-传while ((dir_cur = readdir(dirp)) != NULL) {if (!strcmp(dir_cur->d_name, filename)) {if (S_ISREG(dir_cur->d_type)) {//如果是普通文件, 传输flag = 1;send(fd_socket, &flag, sizeof(int), 0);}else {//如果不是普通文件,不传flag = 0;send(fd_socket, &flag, sizeof(int), 0);}break;}}closedir(dirp);if (0 == flag) {return -1;}int ret = -1;//定义一个小货车,用来传输文件Truck_t truck;memset(&truck, 0, sizeof(Truck_t));//根据文件名打开传输文件int fd_file = open(filename, O_RDONLY);EXIT_CHECK(fd_file, -1, "open");#if 0//发文件大小struct stat file_info;memset(&file_info, 0, sizeof(file_info));fstat(fd_file, &file_info);truck._data_len = sizeof(file_info.st_size);memcpy(truck._data, &file_info.st_size, truck._data_len);ret = send(fd_socket, &truck, sizeof(int) + truck._data_len, 0);EXIT_CHECK(ret, -1, "send_filesize");
#endif//发文件内容while (1) {memset(truck._data, 0, sizeof(truck._data));//读取文件truck._data_len = read(fd_file, truck._data, BUF_SIZE);if (0 == truck._data_len) {//传输完成,退出循环break;}//发送ret = send(fd_socket, &truck, sizeof(int) + truck._data_len, 0);if (-1 == ret) {//服务器异常断开,退出循环printf("server already break!\n");break;}}//传输完成,通知fdtruck._data_len = 0;send(fd_socket, &truck._data_len, sizeof(int), 0);//关闭传输文件close(fd_file);return 0;
}//下载文件
int download(int fd_socket, char *filename)
{//先接收传输标志,如果是1就下载,如果是0就退出int flag = -1;recv(fd_socket, &flag, sizeof(int), 0);if (0 == flag) {return -1;}//打开或创建一个文件int fd_file = open(filename, O_WRONLY | O_CREAT, 0600);EXIT_CHECK(fd_file, -1, "open");/* //接收文件大小 *//* int filesize = 0; *//* recv(fd_socket, &filesize, sizeof(int), 0); *//* printf("filesize: %d", filesize); *///接收文件内容Truck_t truck;while (1) {memset(&truck, 0, sizeof(truck));//接收数据长度recv(fd_socket, &truck._data_len, sizeof(truck._data_len), 0);if (0 == truck._data_len) {//文件传输完毕break;}//接收数据内容recv(fd_socket, truck._data, truck._data_len, MSG_WAITALL);write(fd_file, truck._data, truck._data_len);}close(fd_file);return 0;
}
tcp_connect.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>#include <sys/types.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <arpa/inet.h>//连接服务器
int tcp_connect(char *ip, int port)
{int fd_server = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serAddr;memset(&serAddr, 0, sizeof(serAddr));serAddr.sin_family = AF_INET;serAddr.sin_addr.s_addr = inet_addr(ip);serAddr.sin_port = htons(port);if (-1 == connect(fd_server, (struct sockaddr*)&serAddr, sizeof(serAddr))) {perror("connect");return -1;}return fd_server;
}
总结
服务器通过线程池处理任务,方便管理,后续均在次基础上升级。