当前位置: 首页 > news >正文

Netty笔记09-网络协议设计与解析

文章目录

  • 前言
  • 一、协议设计
    • 1. 数据格式
    • 2. 消息长度
    • 3. 编码方式
    • 4. 错误处理
    • 5. 安全性
  • 二、协议解析
    • 1. 消息分隔
    • 2. 粘包与半包处理
    • 3. 校验机制
  • 三、为什么需要协议?
  • 四、redis 协议
  • 五、HTTP 协议
  • 六、自定义协议要素
    • 编解码器
    • 💡 什么时候可以加 @Sharable


前言

在网络通信中,设计和解析网络协议是确保数据可靠传输的关键环节。网络协议定义了数据在网络中传输时必须遵循的规则和格式,包括数据的组织方式、传输方式以及如何处理错误和异常情况。

一、协议设计

1. 数据格式

  • 消息头:通常包含消息的元数据,如消息长度、消息类型、源地址、目标地址等。
  • 消息体:包含实际的数据内容。
  • 校验码:可选字段,用于检测传输过程中的数据完整性,如CRC校验码。

2. 消息长度

  • 定长消息:所有消息都具有固定的长度,易于解析但不够灵活。
  • 变长消息:消息长度可变,需要在消息头中指定消息长度,以便接收方正确解析。

3. 编码方式

  • 文本编码:如JSON、XML等,易于阅读和调试,但解析效率较低。
  • 二进制编码:如Protocol Buffers、Thrift等,解析效率高,但可读性较差。

4. 错误处理

  • 重传机制:当数据包丢失或损坏时,请求重新发送。
  • 超时机制:设置合理的超时时间,超过时间未收到确认则重传。
  • 错误码:定义错误码以标识不同的错误类型,便于错误处理。

5. 安全性

  • 加密:使用SSL/TLS等协议加密传输的数据。
  • 认证:确保消息来源的真实性和合法性。

二、协议解析

1. 消息分隔

  • 定长分隔:根据固定的长度来区分消息。
  • 特殊分隔符:如换行符(\n)、定界符等。
  • 消息头:根据消息头中指定的长度来区分消息。

2. 粘包与半包处理

  • 粘包:多个消息合并成一个数据包,需要根据消息头或定界符来识别消息边界。
  • 半包:一个消息被分割成多个数据包,需要累积接收到的所有片段才能构成完整消息。

3. 校验机制

  • CRC校验:接收方计算接收到的消息的CRC值并与消息中携带的CRC值比较,确保数据完整性。
  • MD5/SHA哈希:对于大文件传输,使用哈希值校验数据完整性

三、为什么需要协议?

TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输 :

下雨天留客天留我不留

是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读

下雨天留客,天留,我不留

另一种解读

下雨天,留客天,留我不?留

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容

例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

0f下雨天留客06天留09我不留

四、redis 协议

使用redis 协议举例

如发送命令:set key value

  1. redis要求先发送 * 加 数组的长度/个数(redis协议需要将set、key、value放到一个数组中)
  2. 要求发送$ 加 每个(命令/键值)的长度
  3. 要求多个部分之间要用回车换行分隔
    如:set name zhangsan
    *3
    $3
    set
    $4
    name
    $8
    zhangsan
@Slf4j
public class Test01Redis {public static void main(String[] args) {final byte[] LINE = {13, 10};//13.回车 10.换行NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//channelActive:连接建立发送命令@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);//LINE:回车换行buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$4".getBytes());buf.writeBytes(LINE);buf.writeBytes("name".getBytes());buf.writeBytes(LINE);buf.writeBytes("$8".getBytes());buf.writeBytes(LINE);buf.writeBytes("zhangsan".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}//接受redis返回的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("channelRead");System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

运行以上代码后即可在redis中查询到对应的kv数据
在这里插入图片描述

五、HTTP 协议

在拿http 协议举例

@Slf4j
public class Test02Http {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());
//                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//                        @Override
//                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                            log.debug("{}", msg.getClass());
//
//                            if (msg instanceof HttpRequest) { // 请求行,请求头
//
//                            } else if (msg instanceof HttpContent) { //请求体
//
//                            }
//                        }
//                    });//现在只处理HttpRequest类型的消息(选择处理),HttpContent这里会选择跳过ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();//设置响应体长度response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

启动以上代码后发送http请求
在这里插入图片描述

六、自定义协议要素

消息内容:

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
//@ChannelHandler.Sharable//表示可以多个channel共享当前Handler
public class MessageCodec extends ByteToMessageCodec<Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {System.out.println("执行MessageCodec.encode()");// 1. 4 字节的魔数(暗号)out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1(自定义约定)out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节的请求序号out.writeInt(msg.getSequenceId());// 无意义,对齐填充(使满足2的n次方倍)out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);//将对象转化为二进制字节数组oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("执行MessageCodec.decode()");int magicNum = in.readInt();//read会改变读指针,get只根据索引找byte version = in.readByte();byte serializerType = in.readByte();//字节的序列化方式byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();//对齐填充int length = in.readInt();//长度byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);if(serializerType == 0){//如果是jdkObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();//注意这里magicNum显示为16909060,因为十六进制的01020304转为是十进制,为16909060log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);//netty约定需要将解码后的结果放到参数中,不然下一个header将无法拿到解码后的结果}}
}
import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;/*** 消息基类*/
@Data
public abstract class Message implements Serializable {private int sequenceId;private int messageType;public abstract int getMessageType();//登录请求消息public static final int LoginRequestMessage = 0;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);}
}
import lombok.Data;
import lombok.ToString;@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {private String username;private String password;public LoginRequestMessage() {}public LoginRequestMessage(String username, String password) {this.username = username;this.password = password;}@Overridepublic int getMessageType() {return LoginRequestMessage;}
}

测试类

public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),//当将s1发送给解码器时,只拿到了前100个字节,此时发现不是一个完整的消息,不会继续传递给下面的handlernew LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec());// encodeLoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");channel.writeOutbound(message);//消息出战时会经过MessageCodec(),此时会运行MessageCodec()的encodeThread.sleep(1000);// decodeByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null, message, buf);//让ByteBuf有数据,模拟入站//模拟半包,将buf切成两个
//        ByteBuf s1 = buf.slice(0, 100);
//        channel.writeInbound(s1);ByteBuf s1 = buf.slice(0, 100);ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);s1.retain(); //引用计数++  --> 引用计数=2System.out.println("接受s1");channel.writeInbound(s1); // release 1Thread.sleep(2000);System.out.println("接受s2");//注意:这里如果没有s1.retain()会报错:IllegalReferenceCountException: refCnt: 0, decrement: 1//原因是slice()只是逻辑切割,物理上buf、s1、s2是公用一块内存,// 而当运行channel.writeInbound(s1)时,会将s1的引用计数减为0(表示buf、s1、s2公用的内存被释放掉了),// 所以运行channel.writeInbound(s2)时会报错channel.writeInbound(s2);}
}

输出

         +-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 c6 |................|
|00000010| ac ed 00 05 73 72 00 25 63 6e 2e 69 74 63 61 73 |....sr.%cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 6e 52 |t.message.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 a0 3f 71 |equestMessage.?q|
|00000040| cb 31 45 b5 88 02 00 02 4c 00 08 70 61 73 73 77 |.1E.....L..passw|
|00000050| 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 |ordt..Ljava/lang|
|00000060| 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e |/String;L..usern|
|00000070| 61 6d 65 71 00 7e 00 01 78 72 00 19 63 6e 2e 69 |ameq.~..xr..cn.i|
|00000080| 74 63 61 73 74 2e 6d 65 73 73 61 67 65 2e 4d 65 |tcast.message.Me|
|00000090| 73 73 61 67 65 3d dd 19 a0 bc 07 47 cb 02 00 02 |ssage=.....G....|
|000000a0| 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 |I..messageTypeI.|
|000000b0| 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 |.sequenceIdxp...|
|000000c0| 00 00 00 00 00 74 00 03 31 32 33 74 00 08 7a 68 |.....t..123t..zh|
|000000d0| 61 6e 67 73 61 6e                               |angsan          |
+--------+-------------------------------------------------+----------------+

通过以上打印的日志可以看出:
在这里插入图片描述

💡 什么时候可以加 @Sharable

  • 当 handler 不保存状态(如多线程情况下半包问题)时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类

在多线程情况下半包问题,A线程接受到的消息只发送了一半,B线程使用了另外一个channel,接受到了一个半包,这时候LengthFieldBasedFrameDecoder就会将A线程和B线程接受的数据拼接在一起。
所以当一个handler只要记录了多次消息之间的状态,就是线程不安全的。不能在多线程下同时使用这个handler(如LengthFieldBasedFrameDecoder)

只有沾包半包处理器需要每次创建新的对象,不能和其他channel共享。
其他的处理器如果没有在多个事件中共享的数据,如果没有则可以和其他channel共享。



http://www.mrgr.cn/news/28946.html

相关文章:

  • 【Git版本控制器--1】Git的基本操作--本地仓库
  • STM32之两种驱动 “旋转编码器“ 方式
  • WPF系列九:图形控件EllipseGeometry
  • 【Block总结】掩码窗口自注意力 (M-WSA)
  • 单细胞组学大模型(8)--- scGenePT,scGPT和GenePT的结合,实验数据和文本数据的交融模型
  • Sui Move:基本概览一
  • vue3 表单校验规则封装
  • 【docker学习笔记】docker概念和命令
  • 我的5周年创作纪念日,不忘初心,方得始终。
  • CI/CD持续集成和持续交付(git工具、gitlab代码仓库、jenkins)
  • Vue3项目开发——新闻发布管理系统(七)
  • Koa安装和应用
  • RocksDB系列一:基本概念
  • 超全网络安全面试题汇总(2024版)
  • list从0到1的突破
  • 精选评测!分享5款AI写论文最好用的软件排名
  • Get包中的根组件
  • 驱动器磁盘未格式化恢复实战
  • c语言指针4
  • (十五)、把自己的镜像推送到 DockerHub
  • UE(C++API)
  • [Redis] Redis中的set和zset类型
  • Oracle 19c异常恢复—ORA-01209/ORA-65088---惜分飞
  • 代码随想录:动态规划4-5
  • 安徽省建设工程企业资质管理新动向
  • 阿里OSS对象存储服务,实现图片上传回显