Netty笔记09-网络协议设计与解析
文章目录
- 前言
- 一、协议设计
- 1. 数据格式
- 2. 消息长度
- 3. 编码方式
- 4. 错误处理
- 5. 安全性
- 二、协议解析
- 1. 消息分隔
- 2. 粘包与半包处理
- 3. 校验机制
- 三、为什么需要协议?
- 四、redis 协议
- 五、HTTP 协议
- 六、自定义协议要素
- 编解码器
- 💡 什么时候可以加 @Sharable
前言
在网络通信中,设计和解析网络协议是确保数据可靠传输的关键环节。网络协议定义了数据在网络中传输时必须遵循的规则和格式,包括数据的组织方式、传输方式以及如何处理错误和异常情况。
一、协议设计
1. 数据格式
- 消息头:通常包含消息的元数据,如消息长度、消息类型、源地址、目标地址等。
- 消息体:包含实际的数据内容。
- 校验码:可选字段,用于检测传输过程中的数据完整性,如CRC校验码。
2. 消息长度
- 定长消息:所有消息都具有固定的长度,易于解析但不够灵活。
- 变长消息:消息长度可变,需要在消息头中指定消息长度,以便接收方正确解析。
3. 编码方式
- 文本编码:如JSON、XML等,易于阅读和调试,但解析效率较低。
- 二进制编码:如Protocol Buffers、Thrift等,解析效率高,但可读性较差。
4. 错误处理
- 重传机制:当数据包丢失或损坏时,请求重新发送。
- 超时机制:设置合理的超时时间,超过时间未收到确认则重传。
- 错误码:定义错误码以标识不同的错误类型,便于错误处理。
5. 安全性
- 加密:使用SSL/TLS等协议加密传输的数据。
- 认证:确保消息来源的真实性和合法性。
二、协议解析
1. 消息分隔
- 定长分隔:根据固定的长度来区分消息。
- 特殊分隔符:如换行符(\n)、定界符等。
- 消息头:根据消息头中指定的长度来区分消息。
2. 粘包与半包处理
- 粘包:多个消息合并成一个数据包,需要根据消息头或定界符来识别消息边界。
- 半包:一个消息被分割成多个数据包,需要累积接收到的所有片段才能构成完整消息。
3. 校验机制
- CRC校验:接收方计算接收到的消息的CRC值并与消息中携带的CRC值比较,确保数据完整性。
- MD5/SHA哈希:对于大文件传输,使用哈希值校验数据完整性
三、为什么需要协议?
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输 :
下雨天留客天留我不留
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
下雨天留客,天留,我不留
另一种解读
下雨天,留客天,留我不?留
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
0f下雨天留客06天留09我不留
四、redis 协议
使用redis 协议举例
如发送命令:set key value
- redis要求先发送 * 加 数组的长度/个数(redis协议需要将set、key、value放到一个数组中)
- 要求发送$ 加 每个(命令/键值)的长度
- 要求多个部分之间要用回车换行分隔
如:set name zhangsan
*3
$3
set
$4
name
$8
zhangsan
@Slf4j
public class Test01Redis {public static void main(String[] args) {final byte[] LINE = {13, 10};//13.回车 10.换行NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//channelActive:连接建立发送命令@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);//LINE:回车换行buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$4".getBytes());buf.writeBytes(LINE);buf.writeBytes("name".getBytes());buf.writeBytes(LINE);buf.writeBytes("$8".getBytes());buf.writeBytes(LINE);buf.writeBytes("zhangsan".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}//接受redis返回的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("channelRead");System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
运行以上代码后即可在redis中查询到对应的kv数据
五、HTTP 协议
在拿http 协议举例
@Slf4j
public class Test02Http {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());
// ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.debug("{}", msg.getClass());
//
// if (msg instanceof HttpRequest) { // 请求行,请求头
//
// } else if (msg instanceof HttpContent) { //请求体
//
// }
// }
// });//现在只处理HttpRequest类型的消息(选择处理),HttpContent这里会选择跳过ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();//设置响应体长度response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
启动以上代码后发送http请求
六、自定义协议要素
消息内容:
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
//@ChannelHandler.Sharable//表示可以多个channel共享当前Handler
public class MessageCodec extends ByteToMessageCodec<Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {System.out.println("执行MessageCodec.encode()");// 1. 4 字节的魔数(暗号)out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1(自定义约定)out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节的请求序号out.writeInt(msg.getSequenceId());// 无意义,对齐填充(使满足2的n次方倍)out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);//将对象转化为二进制字节数组oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("执行MessageCodec.decode()");int magicNum = in.readInt();//read会改变读指针,get只根据索引找byte version = in.readByte();byte serializerType = in.readByte();//字节的序列化方式byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();//对齐填充int length = in.readInt();//长度byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);if(serializerType == 0){//如果是jdkObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();//注意这里magicNum显示为16909060,因为十六进制的01020304转为是十进制,为16909060log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);//netty约定需要将解码后的结果放到参数中,不然下一个header将无法拿到解码后的结果}}
}
import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;/*** 消息基类*/
@Data
public abstract class Message implements Serializable {private int sequenceId;private int messageType;public abstract int getMessageType();//登录请求消息public static final int LoginRequestMessage = 0;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);}
}
import lombok.Data;
import lombok.ToString;@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {private String username;private String password;public LoginRequestMessage() {}public LoginRequestMessage(String username, String password) {this.username = username;this.password = password;}@Overridepublic int getMessageType() {return LoginRequestMessage;}
}
测试类
public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),//当将s1发送给解码器时,只拿到了前100个字节,此时发现不是一个完整的消息,不会继续传递给下面的handlernew LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec());// encodeLoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");channel.writeOutbound(message);//消息出战时会经过MessageCodec(),此时会运行MessageCodec()的encodeThread.sleep(1000);// decodeByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null, message, buf);//让ByteBuf有数据,模拟入站//模拟半包,将buf切成两个
// ByteBuf s1 = buf.slice(0, 100);
// channel.writeInbound(s1);ByteBuf s1 = buf.slice(0, 100);ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);s1.retain(); //引用计数++ --> 引用计数=2System.out.println("接受s1");channel.writeInbound(s1); // release 1Thread.sleep(2000);System.out.println("接受s2");//注意:这里如果没有s1.retain()会报错:IllegalReferenceCountException: refCnt: 0, decrement: 1//原因是slice()只是逻辑切割,物理上buf、s1、s2是公用一块内存,// 而当运行channel.writeInbound(s1)时,会将s1的引用计数减为0(表示buf、s1、s2公用的内存被释放掉了),// 所以运行channel.writeInbound(s2)时会报错channel.writeInbound(s2);}
}
输出
+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 c6 |................|
|00000010| ac ed 00 05 73 72 00 25 63 6e 2e 69 74 63 61 73 |....sr.%cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 6e 52 |t.message.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 a0 3f 71 |equestMessage.?q|
|00000040| cb 31 45 b5 88 02 00 02 4c 00 08 70 61 73 73 77 |.1E.....L..passw|
|00000050| 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 |ordt..Ljava/lang|
|00000060| 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e |/String;L..usern|
|00000070| 61 6d 65 71 00 7e 00 01 78 72 00 19 63 6e 2e 69 |ameq.~..xr..cn.i|
|00000080| 74 63 61 73 74 2e 6d 65 73 73 61 67 65 2e 4d 65 |tcast.message.Me|
|00000090| 73 73 61 67 65 3d dd 19 a0 bc 07 47 cb 02 00 02 |ssage=.....G....|
|000000a0| 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 |I..messageTypeI.|
|000000b0| 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 |.sequenceIdxp...|
|000000c0| 00 00 00 00 00 74 00 03 31 32 33 74 00 08 7a 68 |.....t..123t..zh|
|000000d0| 61 6e 67 73 61 6e |angsan |
+--------+-------------------------------------------------+----------------+
通过以上打印的日志可以看出:
💡 什么时候可以加 @Sharable
- 当 handler 不保存状态(如多线程情况下半包问题)时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
- 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
在多线程情况下半包问题,A线程接受到的消息只发送了一半,B线程使用了另外一个channel,接受到了一个半包,这时候LengthFieldBasedFrameDecoder就会将A线程和B线程接受的数据拼接在一起。
所以当一个handler只要记录了多次消息之间的状态,就是线程不安全的。不能在多线程下同时使用这个handler(如LengthFieldBasedFrameDecoder)
只有沾包半包处理器需要每次创建新的对象,不能和其他channel共享。
其他的处理器如果没有在多个事件中共享的数据,如果没有则可以和其他channel共享。