当前位置: 首页 > news >正文

从源码一把聊清楚nacos2.x的事件驱动架构,从迷茫到精通!!

为什么要去聊事件驱动呢,是无聊么,还真不是,它真的能解决很多系统复杂性问题,别急,听我细细道来!!

  随着业务的不断复杂化,可能会渐渐出现屎山代码或者屎山系统的问题,此时你可能会去重构,但是重构的时候往往又缺少一些标准,比如你只是用设计模式优化了一部分,但是其他的同学不一样按照你的风格去扩展,所以此时静下心来想想,能否在软件设计和架构层面提供一种高效、灵活和可扩展的解决方案,去避免屎山代码的出现,此时你可以考虑下事件驱动架构的思想,开冲!!!

文章目录

  • 1. 事件驱动架构的特性:
  • 2. 事件驱动架构有哪些角色:
  • 3. 为什么nacos2要使用事件驱动架构
  • 4. 业务模型
    • 4.1 通知中心NotifyCenter
    • 4.2 事件发布器EventPublisher
    • 4.3 事件订阅者Subscriber
    • 4.4 事件Event
    • 4.5 模型交互架构
  • 5. 事件中心到底干了哪些事
    • 5.1 注册事件发布者
    • 5.2 注册订阅者
    • 5.3 注销事件发布器
    • 5.4 注销订阅者
    • 5.5 发布事件
  • 6. 一个例子讲透事件驱动如何应用
    • 6.1 定义事件
    • 6.2 构建发布订阅模型

1. 事件驱动架构的特性:

  1. 保证系统的灵活性和可扩展性
  • 组件化:在事件驱动架构中,可以把系统拆成独立的发布者和订阅者,并且可以独立的开发测试,并且可以方便添加,去除和替换组件,以应对不断变化的需求。
  • 高扩展:事件驱动架构可以通过添加更多的订阅者来水平扩展系统,减少组件之间的依赖,减少屎山的风险。
  1. 减低系统的耦合度
  • 松耦合:事件的发布者和订阅者通过事件通信,不需要直接引用和依赖,接触耦合性。
  • 异步通信:事件驱动可以采用异步通信,减少组件之间的同步等待时间,这个特性在nacos2.x高性能的关键,后面会细聊。
  1. 提升系统的可维护性
  • 事件处理逻辑清晰:每一个事件都有明确的处理逻辑和处理条件,代码具有高度语义化。
  • 统一的日志和监控:可以在事件驱动架构统一加入日志和监控,便于记录事件发布和处理,提升系统的可观 测性。

2. 事件驱动架构有哪些角色:

事件驱动架构(Event Driven Architecture,EDA)是一种基于事件的异步通信模式,它可以通过发布和订阅事件来实现解耦,在这种架构这下,主要有三个角色:

  1. 发布者只需要把事件发布到事件通道中去。
  2. 订阅者负责对事件的接收或者处理。
  3. 事件在发布订阅架构中,主要表示系统或者应用中变化的抽象描述。

3. 为什么nacos2要使用事件驱动架构

nacos在微服务架构中充当大脑的作用,使用相关广泛,它的性能为什么好?版本迭代过程中,为什么能够快速集成很多功能和外部中间件,都得益于它的事件驱动架构机制,话不多说,主要表现在以下几个方面!!

  • 易于扩展:事件驱动架构使得Nacos能够快速添加新的功能,而不需要对现有的代码进行大量的修改。
  • 高度解耦:事件驱动架构使得服务注册,实例管理等操作与具体的业务逻辑相分离,从而实现了高度解耦。
  • 低延迟:通过事件发布,并且异步执行,可以在nacos做一些业务操作时减少等待时间,提升系统的响应速度。

一波好处讲完了,开整吧!!看看底层是如何构建事件驱动架构!

4. 业务模型

从上面的架构图会看到nacos的事件驱动模型主要有以下几个角色组成:

4.1 通知中心NotifyCenter

public class NotifyCenter {//默认事件发布器的队列长度public static int ringBufferSize;//共享事件发布器队列长度public static int shareBufferSize;private static final AtomicBoolean CLOSED = new AtomicBoolean(false);//默认事件发布器工厂private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;//事件发布中心单例private static final NotifyCenter INSTANCE = new NotifyCenter();//共享事件发布器private DefaultSharePublisher sharePublisher;//事件发布器类private static Class<? extends EventPublisher> clazz;//事件发布器集合,用于处理不同事件的发布private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);//默认事件发布器工厂函数DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// Create and init DefaultSharePublisher instance.// 共享的事件发布器,主要去处理慢事件(SlowEvent)INSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}
}

主要成员变量如下:

  1. publisherMap:维护了一个全局的事件处理器列表,用于处理不同类型的事件的发布。
  2. sharePublisher:慢事件发布器,主要处理SlowEvent事件的发布。
  3. DEFAULT_PUBLISHER_FACTORY:默认事件发布器工厂函数,主要用来构建事件发布者,并且初始化发布器里面的事件通道长度。

4.2 事件发布器EventPublisher

EventPublisher是一个抽象类,用于事件的发布,类之间的继承图如下:

上面有4个重要的类如下:

  1. EventPublisher:事件发布器的统一接口。
  2. ShardedEventPublisher:继承EventPublisher,可以添加订阅者的订阅事件类型。
  3. DefaultPublisher:默认的单独事件发布器
  4. DefaultSharePublisher:默认的共享事件发布器
public interface ShardedEventPublisher extends EventPublisher {void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}

那问题来了:此处的DefaultPublisher和DefaultSharePublisher有什么区别呢?

  1. DefaultPublisher在NotifyCenter事件通知中心对于每一种事件类型都会有一个DefaultPublisher,因此有多个,而DefaultSharePublisher全局只有一个,因此DefaultPublisher可以理解为单事件发布者,DefaultSharePublisher可以理解为多事件发布者。
  2. DefaultSharePublisher全局只有一个,所有的事件都会走一个事件队列,所以在通道的消费端需要根据不同事件去调用不同的订阅者,DefaultPublisher针对于每一种事件类型都会有一个通道,因此在通道的消费侧不用关注事件类型。具体的代码如下:
  3. DefaultSharePublisher事件消费端源码:
public class DefaultSharePublisher extends DefaultPublisher implements ShardedEventPublisher {//事件->订阅者集合private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();@Overridepublic void receiveEvent(Event event) {final long currentEventSequence = event.sequence();// get subscriber set based on the slow EventType.final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();// Get for Map, the algorithm is O(1).// 根据慢事件类型获取对应的慢事件订阅者Set<Subscriber> subscribers = subMappings.get(slowEventType);if (null == subscribers) {LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());return;}// Notification single event subscriberfor (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",event.getClass());continue;}// Notify single subscriber for slow event.notifySubscriber(subscriber, event);}}
}

receiveEvent是怎么被调用呢?继续看

    public void run() {openEventHandler();}void openEventHandler() {try {// This variable is defined to resolve the problem which message overstock in the queue.int waitTimes = 60;// To ensure that messages are not lost, enable EventHandler when// waiting for the first Subscriber to registerwhile (!shutdown && !hasSubscriber() && waitTimes > 0) {ThreadUtils.sleep(1000L);waitTimes--;}while (!shutdown) {//从事件通道中获取事件final Event event = queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : ", ex);}}

这个run方法会在线程启动时会被执行,会从源码看到会一直从时间通过获取事件,在调用事件处理方法,这个queue就是事件通道,在nacos就是一个阻塞队列BlockingQueue。

4.3 事件订阅者Subscriber

当从事件通道中拿到消息是如何消费呢?那我们就看看Subscriber的类继承图:

  1. Subscriber定义了订阅者基本功能,主要包括
  • onEvent(T event):当事件发生时,触发的回调。
  • subscribeType():订阅者定义的事件类型
  • executor():事件的回调是同步还是异步
  • ignoreExpireEvent():是否忽略事件过期时间
  1. SmartSubscriber相对于Subscriber而言,可以监听多个事件类型。
  • subscribeTypes() 监听的事件类型集合
  1. ClientServiceIndexesManager 客户端服务注册监听器
public class ClientServiceIndexesManager extends SmartSubscriber {//订阅者定义事件@Overridepublic List<Class<? extends Event>> subscribeTypes() {List<Class<? extends Event>> result = new LinkedList<>();result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);result.add(ClientEvent.ClientDisconnectEvent.class);return result;}//事件回调@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}}

由于我们现在只关注基础架构,具体的业务会放在后面再去详细了解。

4.4 事件Event

在事件抽象类中定义了一个事件的序列号,它是自增的。用于区分事件执行的前后顺序。它是由DefaultPublisher来处理。

public abstract class Event implements Serializable {private static final long serialVersionUID = -3731383194964997493L;private static final AtomicLong SEQUENCE = new AtomicLong(0);//事件序列号private final long sequence = SEQUENCE.getAndIncrement();public long sequence() {return sequence;}public String scope() {return null;}public boolean isPluginEvent() {return false;}
}

4.5 模型交互架构

在聊完这些模型之后,会发现是独立的点,那发布事件是如何运行呢?一张图总结下:

当发布事件后,会根据事件是否为慢事件进入不同的发布者队列里面。

DefaultPublisher:是一个事件类型一个队列,因此消费速度快。

DefaultSharePublisher:多个事件一个队列,需要在执行订阅者回调的时候,会根据事件类型去寻找订阅者,因此消费能力慢,所以DefaultSharePublisher主要负责慢事件的订阅消费。

5. 事件中心到底干了哪些事

5.1 注册事件发布者

根据事件类型,创建发布者,如果是慢事件,为DefaultSharePublisher,如果不是,则为默认的DefaultPublisher。

    public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {//判断事件是否为慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}//如果不是慢事件final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.// 判断事件类型对应的发布者是否存在,如果不存在就新建,存在就忽略MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);}

你会发现:若是SlowEvent则直接返回了,则创建DefaultSharePublisher就返回了,为啥呢?结合之前聊过的业务模型,因为DefaultSharePublisher全局只有一个事件队列,在静态初始化就已经创建完了,这个队列的订阅者需要根据不同的时间类型做对应的操作,而DefaultPublisher一个事件类型对应一个队列。

5.2 注册订阅者

根据订阅事件的类型,绑定到不同类型的事件发布器中

public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// If you want to listen to multiple events, you do it separately,// based on subclass's subscribeTypes method return list, it can register to publisher.// 多事件订阅者注册if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.//根据它的事件类型来决定采用哪种Publisher,多事件订阅者由多事件发布者调度if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {//注册到多事件发布者中INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {//注册到单事件发布者中// For case, producer: defaultPublisher -> consumer: subscriber.addSubscriber(consumer, subscribeType, factory);}}return;}

5.3 注销事件发布器

public static void deregisterPublisher(final Class<? extends Event> eventType) {final String topic = ClassUtils.getCanonicalName(eventType);//从事件发布队列移除EventPublisher publisher = INSTANCE.publisherMap.remove(topic);try {//调用钩子方法publisher.shutdown();} catch (Throwable ex) {LOGGER.error("There was an exception when publisher shutdown : ", ex);}
}@Override
public void shutdown() {//标记关闭this.shutdown = true;//清空事件队列this.queue.clear();
}

5.4 注销订阅者

下面为注册的方向操作

    public static void deregisterSubscriber(final Subscriber consumer) {if (consumer instanceof SmartSubscriber) {//若是多事件调用订阅者for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {//若是慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {//从多事件发布者中移除INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);} else {//从单事件发布者中移除removeSubscriber(consumer, subscribeType);}}return;}//若是单事件订阅者final Class<? extends Event> subscribeType = consumer.subscribeType();//判断是否是慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);return;}//调用移除方法if (removeSubscriber(consumer, subscribeType)) {return;}throw new NoSuchElementException("The subscriber has no event publisher");}

5.5 发布事件

    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {//假如为慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {//共享事件发布器处理return INSTANCE.sharePublisher.publish(event);}//常规事件final String topic = ClassUtils.getCanonicalName(eventType);//根据事件类型创建对应事件发布器EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}

6. 一个例子讲透事件驱动如何应用

在这里插入图片描述

兄弟们,下面这个例子可以把nacos2.x源码下来,用下面的例子把事件驱动相关组件串联起来跑一下,一定会理解更加深刻。

6.1 定义事件

创建两个事件Test1Event和Test2Event继承Event。

private static class Test1Event extends Event {@Overridepublic long sequence() {return System.currentTimeMillis();}}private static class Test2Event extends Event {@Overridepublic long sequence() {return System.currentTimeMillis();}}

6.2 构建发布订阅模型

public void testPublisher() throws Exception {//创建事件订阅发布器NotifyCenter.registerToPublisher(Test1Event.class, 8);NotifyCenter.registerToPublisher(Test2Event.class, 8);final CountDownLatch latch = new CountDownLatch(2);//创建事件的订阅者NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if(event instanceof Test1Event){System.out.println("Test1Event");}else {System.out.println("Test2Event");}latch.countDown();}@Overridepublic Class<? extends Event> subscribeType() {return Test1Event.class;}});NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if(event instanceof Test1Event){System.out.println("Test1Event");}else {System.out.println("Test2Event");}latch.countDown();}@Overridepublic Class<? extends Event> subscribeType() {return Test2Event.class;}});//发布事件Test1EventNotifyCenter.publishEvent(new Test1Event());Thread.sleep(1000);//发布事件Test1EventNotifyCenter.publishEvent(new Test2Event());latch.await();}

执行结果如下:

Test1Event的订阅者先执行,Test2Event的订阅者后执行,正好就是事件的发布顺序。

从上面的例子可以发现我们只是往事件中心注册了发布者和订阅者,并没有直接让发布者和订阅者进行直接引用,这正是事件驱动在系统解耦上的魅力!!

在之后的nacos源码深度分析中,会聊到事件驱动在整个中间件是如何进行应用的

后续更精彩!!!


http://www.mrgr.cn/news/75200.html

相关文章:

  • 创新培养:汽车零部件图像分割
  • 关于我重生到21世纪学C语言这件事——指针详解(1)
  • 力扣 二叉树的直径-543
  • Docker 篇-Docker 详细安装、了解和使用 Docker 核心功能(数据卷、自定义镜像 Dockerfile、网络)
  • Javaweb—Ajax与jQuery请求
  • Python并发编程入门:使用concurrent.futures与asyncio
  • 【easily-openJCL】要尝试下用 显卡 做数据对称加密吗?
  • Netty之EventLoop自定义任务
  • 自动驾驶系列—从数据采集到存储:解密自动驾驶传感器数据采集盒子的关键技术
  • Ubuntu 的 ROS 操作系统 turtlebot3 导航仿真
  • 输出1~100内的所有偶数C++
  • SpringSecurity入门
  • ubuntu连接orangepi-zero-2w桌面的几种方法
  • 深入浅出C#编程语言
  • 速盾:高防 CDN 的缓存机制是什么?
  • 优选算法 - 3 ( 位运算 模拟 分治 11000 字详解 )
  • docker .vhdx文件压缩
  • LeetCode297.二叉树的序列化和反序列化
  • 搜维尔科技:SenseGlove触觉反馈手套开箱+场景测试
  • IDL脚手架遇到的cwgo问题
  • 黑马智数Day8
  • 机器学习 ---模型评估、选择与验证(1)
  • cache中命中率和平均访问时间
  • RK3568平台开发系列讲解(platform虚拟总线驱动篇)platform总线模型
  • Shell脚本的使用
  • CPLD架构