【Netty】消息分发处理方式
背景
检索平台,使用长链接的方式与外部系统进行交互; 因平台使用的的自定义二进制交互协议,因此需要针对每个接口请求与响应都要进行编解码, 因此需要一种针对不同消息的分发处理
方案一 注解 + 反射
示例:
/**
* 消息类型注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MessageHandler {//类型,平台使用符号CMD, 为了兼容不同类型,相同处理方式逻辑,这里可设置为数组String type();// 优先级int priority() default 0;
}/**
* 基础处理器
*/
public abstract class AbstractMessageHandler {// 处理消息的抽象方法public abstract void handle(Object message);
}/**
* 具体处理器示例
*/
@MessageHandler(type = "USER")
public class UserMessageHandler extends AbstractMessageHandler {@Overridepublic void handle(Object message) {// 处理用户消息}
}@MessageHandler(type = "ORDER")
public class OrderMessageHandler extends AbstractMessageHandler {@Overridepublic void handle(Object message) {// 处理订单消息}
}/*** 消息分发处理器*/
public class MessageDispatchHandler extends ChannelInboundHandlerAdapter {private final Map<String, AbstractMessageHandler> handlerMap = new HashMap<>();public MessageDispatchHandler() {// 初始化时扫描并注册所有处理器initHandlers();}private void initHandlers() {// 扫描指定包下的所有类Reflections reflections = new Reflections("com.your.package");Set<Class<?>> handlers = reflections.getTypesAnnotatedWith(MessageHandler.class);for (Class<?> handlerClass : handlers) {MessageHandler annotation = handlerClass.getAnnotation(MessageHandler.class);try {AbstractMessageHandler handler = (AbstractMessageHandler) handlerClass.newInstance();handlerMap.put(annotation.type(), handler);} catch (Exception e) {// 异常处理}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取消息类型String type = getMessageType(msg);AbstractMessageHandler handler = handlerMap.get(type);if (handler != null) {handler.handle(msg);} else {// 处理未知类型消息ctx.fireChannelRead(msg);}}
}/**
* 异常处理
*/
public class MessageDispatchHandler extends ChannelInboundHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 统一的异常处理if (cause instanceof MessageHandleException) {// 处理特定异常} else {// 处理其他异常}}
}/**
* 处理器生命周期管理
*/
public abstract class AbstractMessageHandler {public void init() {// 初始化逻辑}public void destroy() {// 销毁逻辑}public abstract void handle(Object message);
}
优势:
- 更容易实现动态的消息类型处理
- 可以更方便地进行统一的消息处理前后的拦截
- 便于实现消息处理的统计、监控等横切功能
问题:
- 反射可能带来性能开销
- 类型判断的逻辑集中在一处,可能会变得复杂
- 不是标准的 Netty Pipeline 模式,不能像 Netty Pipeline 那样灵活地调整处理顺序
方案二、Netty责任链
// 配置 Pipeline
public void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new UserMessageHandler());pipeline.addLast(new OrderMessageHandler());pipeline.addLast(new SystemMessageHandler());
}// 各个 Handler 只处理自己关心的消息类型
public class UserMessageHandler extends SimpleChannelInboundHandler<UserMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, UserMessage msg) {// 处理用户消息}
}public class OrderMessageHandler extends SimpleChannelInboundHandler<OrderMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, OrderMessage msg) {// 处理订单消息}
}
优势:
- 代码结构更清晰 ,每个 Handler 职责单一,只处理特定类型的消息,处理流程清晰可见,易于理解和维护
- 更灵活的消息流转,可以动态决定是否继续传递消息,支持双向传递(Inbound 和 Outbound)
- 线程安全,Netty 的 Pipeline 实现保证了处理器的线程安全
- 避免了手动同步带来的复杂性
- 性能更优,Netty 的 Pipeline 实现经过优化
总结
从性能角度来说
- netty责任链模式性能更好
- 不需要运行时反射
- 类型检查更高效
- 内存占用更小
但是,这并不意味着反射注解就一定不好,在以下场景中,反射注解方案可能更适合:
- 需要非常灵活的消息处理配置
- 需要动态加载/卸载处理器
- 处理器的执行顺序需要频繁调整
- 业务逻辑比网络IO更重,反射带来的性能影响可以忽略
最终的选择应该基于性能要求、业务复杂度、扩展性需求、维护成本;如果没有特殊要求,建议使用原生的Netty原生的责任链模式,因为他不仅性能更好,而且与Netty的整体的设计理念更加契合