从源码一把聊清楚nacos2.x的事件驱动架构,从迷茫到精通!!
为什么要去聊事件驱动呢,是无聊么,还真不是,它真的能解决很多系统复杂性问题,别急,听我细细道来!!
随着业务的不断复杂化,可能会渐渐出现屎山代码或者屎山系统的问题,此时你可能会去重构,但是重构的时候往往又缺少一些标准,比如你只是用设计模式优化了一部分,但是其他的同学不一样按照你的风格去扩展,所以此时静下心来想想,能否在软件设计和架构层面提供一种高效、灵活和可扩展的解决方案,去避免屎山代码的出现,此时你可以考虑下事件驱动架构的思想,开冲!!!
文章目录
- 1. 事件驱动架构的特性:
- 2. 事件驱动架构有哪些角色:
- 3. 为什么nacos2要使用事件驱动架构
- 4. 业务模型
- 4.1 通知中心NotifyCenter
- 4.2 事件发布器EventPublisher
- 4.3 事件订阅者Subscriber
- 4.4 事件Event
- 4.5 模型交互架构
- 5. 事件中心到底干了哪些事
- 5.1 注册事件发布者
- 5.2 注册订阅者
- 5.3 注销事件发布器
- 5.4 注销订阅者
- 5.5 发布事件
- 6. 一个例子讲透事件驱动如何应用
- 6.1 定义事件
- 6.2 构建发布订阅模型
1. 事件驱动架构的特性:
- 保证系统的灵活性和可扩展性
- 组件化:在事件驱动架构中,可以把系统拆成独立的发布者和订阅者,并且可以独立的开发测试,并且可以方便添加,去除和替换组件,以应对不断变化的需求。
- 高扩展:事件驱动架构可以通过添加更多的订阅者来水平扩展系统,减少组件之间的依赖,减少屎山的风险。
- 减低系统的耦合度
- 松耦合:事件的发布者和订阅者通过事件通信,不需要直接引用和依赖,接触耦合性。
- 异步通信:事件驱动可以采用异步通信,减少组件之间的同步等待时间,这个特性在nacos2.x高性能的关键,后面会细聊。
- 提升系统的可维护性
- 事件处理逻辑清晰:每一个事件都有明确的处理逻辑和处理条件,代码具有高度语义化。
- 统一的日志和监控:可以在事件驱动架构统一加入日志和监控,便于记录事件发布和处理,提升系统的可观 测性。
2. 事件驱动架构有哪些角色:
事件驱动架构(Event Driven Architecture,EDA)是一种基于事件的异步通信模式,它可以通过发布和订阅事件来实现解耦,在这种架构这下,主要有三个角色:
- 发布者只需要把事件发布到事件通道中去。
- 订阅者负责对事件的接收或者处理。
- 事件在发布订阅架构中,主要表示系统或者应用中变化的抽象描述。
3. 为什么nacos2要使用事件驱动架构
nacos在微服务架构中充当大脑的作用,使用相关广泛,它的性能为什么好?版本迭代过程中,为什么能够快速集成很多功能和外部中间件,都得益于它的事件驱动架构机制,话不多说,主要表现在以下几个方面!!
- 易于扩展:事件驱动架构使得Nacos能够快速添加新的功能,而不需要对现有的代码进行大量的修改。
- 高度解耦:事件驱动架构使得服务注册,实例管理等操作与具体的业务逻辑相分离,从而实现了高度解耦。
- 低延迟:通过事件发布,并且异步执行,可以在nacos做一些业务操作时减少等待时间,提升系统的响应速度。
一波好处讲完了,开整吧!!看看底层是如何构建事件驱动架构!
4. 业务模型
从上面的架构图会看到nacos的事件驱动模型主要有以下几个角色组成:
4.1 通知中心NotifyCenter
public class NotifyCenter {//默认事件发布器的队列长度public static int ringBufferSize;//共享事件发布器队列长度public static int shareBufferSize;private static final AtomicBoolean CLOSED = new AtomicBoolean(false);//默认事件发布器工厂private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;//事件发布中心单例private static final NotifyCenter INSTANCE = new NotifyCenter();//共享事件发布器private DefaultSharePublisher sharePublisher;//事件发布器类private static Class<? extends EventPublisher> clazz;//事件发布器集合,用于处理不同事件的发布private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);//默认事件发布器工厂函数DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// Create and init DefaultSharePublisher instance.// 共享的事件发布器,主要去处理慢事件(SlowEvent)INSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}
}
主要成员变量如下:
- publisherMap:维护了一个全局的事件处理器列表,用于处理不同类型的事件的发布。
- sharePublisher:慢事件发布器,主要处理SlowEvent事件的发布。
- DEFAULT_PUBLISHER_FACTORY:默认事件发布器工厂函数,主要用来构建事件发布者,并且初始化发布器里面的事件通道长度。
4.2 事件发布器EventPublisher
EventPublisher是一个抽象类,用于事件的发布,类之间的继承图如下:
上面有4个重要的类如下:
- EventPublisher:事件发布器的统一接口。
- ShardedEventPublisher:继承EventPublisher,可以添加订阅者的订阅事件类型。
- DefaultPublisher:默认的单独事件发布器
- DefaultSharePublisher:默认的共享事件发布器
public interface ShardedEventPublisher extends EventPublisher {void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}
那问题来了:此处的DefaultPublisher和DefaultSharePublisher有什么区别呢?
- DefaultPublisher在NotifyCenter事件通知中心对于每一种事件类型都会有一个DefaultPublisher,因此有多个,而DefaultSharePublisher全局只有一个,因此DefaultPublisher可以理解为单事件发布者,DefaultSharePublisher可以理解为多事件发布者。
- DefaultSharePublisher全局只有一个,所有的事件都会走一个事件队列,所以在通道的消费端需要根据不同事件去调用不同的订阅者,DefaultPublisher针对于每一种事件类型都会有一个通道,因此在通道的消费侧不用关注事件类型。具体的代码如下:
- DefaultSharePublisher事件消费端源码:
public class DefaultSharePublisher extends DefaultPublisher implements ShardedEventPublisher {//事件->订阅者集合private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();@Overridepublic void receiveEvent(Event event) {final long currentEventSequence = event.sequence();// get subscriber set based on the slow EventType.final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();// Get for Map, the algorithm is O(1).// 根据慢事件类型获取对应的慢事件订阅者Set<Subscriber> subscribers = subMappings.get(slowEventType);if (null == subscribers) {LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());return;}// Notification single event subscriberfor (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",event.getClass());continue;}// Notify single subscriber for slow event.notifySubscriber(subscriber, event);}}
}
receiveEvent是怎么被调用呢?继续看
public void run() {openEventHandler();}void openEventHandler() {try {// This variable is defined to resolve the problem which message overstock in the queue.int waitTimes = 60;// To ensure that messages are not lost, enable EventHandler when// waiting for the first Subscriber to registerwhile (!shutdown && !hasSubscriber() && waitTimes > 0) {ThreadUtils.sleep(1000L);waitTimes--;}while (!shutdown) {//从事件通道中获取事件final Event event = queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : ", ex);}}
这个run方法会在线程启动时会被执行,会从源码看到会一直从时间通过获取事件,在调用事件处理方法,这个queue就是事件通道,在nacos就是一个阻塞队列BlockingQueue。
4.3 事件订阅者Subscriber
当从事件通道中拿到消息是如何消费呢?那我们就看看Subscriber的类继承图:
- Subscriber定义了订阅者基本功能,主要包括
- onEvent(T event):当事件发生时,触发的回调。
- subscribeType():订阅者定义的事件类型
- executor():事件的回调是同步还是异步
- ignoreExpireEvent():是否忽略事件过期时间
- SmartSubscriber相对于Subscriber而言,可以监听多个事件类型。
- subscribeTypes() 监听的事件类型集合
- ClientServiceIndexesManager 客户端服务注册监听器
public class ClientServiceIndexesManager extends SmartSubscriber {//订阅者定义事件@Overridepublic List<Class<? extends Event>> subscribeTypes() {List<Class<? extends Event>> result = new LinkedList<>();result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);result.add(ClientEvent.ClientDisconnectEvent.class);return result;}//事件回调@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}}
由于我们现在只关注基础架构,具体的业务会放在后面再去详细了解。
4.4 事件Event
在事件抽象类中定义了一个事件的序列号,它是自增的。用于区分事件执行的前后顺序。它是由DefaultPublisher来处理。
public abstract class Event implements Serializable {private static final long serialVersionUID = -3731383194964997493L;private static final AtomicLong SEQUENCE = new AtomicLong(0);//事件序列号private final long sequence = SEQUENCE.getAndIncrement();public long sequence() {return sequence;}public String scope() {return null;}public boolean isPluginEvent() {return false;}
}
4.5 模型交互架构
在聊完这些模型之后,会发现是独立的点,那发布事件是如何运行呢?一张图总结下:
当发布事件后,会根据事件是否为慢事件进入不同的发布者队列里面。
DefaultPublisher:是一个事件类型一个队列,因此消费速度快。
DefaultSharePublisher:多个事件一个队列,需要在执行订阅者回调的时候,会根据事件类型去寻找订阅者,因此消费能力慢,所以DefaultSharePublisher主要负责慢事件的订阅消费。
5. 事件中心到底干了哪些事
5.1 注册事件发布者
根据事件类型,创建发布者,如果是慢事件,为DefaultSharePublisher,如果不是,则为默认的DefaultPublisher。
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {//判断事件是否为慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}//如果不是慢事件final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.// 判断事件类型对应的发布者是否存在,如果不存在就新建,存在就忽略MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);}
你会发现:若是SlowEvent则直接返回了,则创建DefaultSharePublisher就返回了,为啥呢?结合之前聊过的业务模型,因为DefaultSharePublisher全局只有一个事件队列,在静态初始化就已经创建完了,这个队列的订阅者需要根据不同的时间类型做对应的操作,而DefaultPublisher一个事件类型对应一个队列。
5.2 注册订阅者
根据订阅事件的类型,绑定到不同类型的事件发布器中
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// If you want to listen to multiple events, you do it separately,// based on subclass's subscribeTypes method return list, it can register to publisher.// 多事件订阅者注册if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.//根据它的事件类型来决定采用哪种Publisher,多事件订阅者由多事件发布者调度if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {//注册到多事件发布者中INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {//注册到单事件发布者中// For case, producer: defaultPublisher -> consumer: subscriber.addSubscriber(consumer, subscribeType, factory);}}return;}
5.3 注销事件发布器
public static void deregisterPublisher(final Class<? extends Event> eventType) {final String topic = ClassUtils.getCanonicalName(eventType);//从事件发布队列移除EventPublisher publisher = INSTANCE.publisherMap.remove(topic);try {//调用钩子方法publisher.shutdown();} catch (Throwable ex) {LOGGER.error("There was an exception when publisher shutdown : ", ex);}
}@Override
public void shutdown() {//标记关闭this.shutdown = true;//清空事件队列this.queue.clear();
}
5.4 注销订阅者
下面为注册的方向操作
public static void deregisterSubscriber(final Subscriber consumer) {if (consumer instanceof SmartSubscriber) {//若是多事件调用订阅者for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {//若是慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {//从多事件发布者中移除INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);} else {//从单事件发布者中移除removeSubscriber(consumer, subscribeType);}}return;}//若是单事件订阅者final Class<? extends Event> subscribeType = consumer.subscribeType();//判断是否是慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);return;}//调用移除方法if (removeSubscriber(consumer, subscribeType)) {return;}throw new NoSuchElementException("The subscriber has no event publisher");}
5.5 发布事件
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {//假如为慢事件if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {//共享事件发布器处理return INSTANCE.sharePublisher.publish(event);}//常规事件final String topic = ClassUtils.getCanonicalName(eventType);//根据事件类型创建对应事件发布器EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}
6. 一个例子讲透事件驱动如何应用
兄弟们,下面这个例子可以把nacos2.x源码下来,用下面的例子把事件驱动相关组件串联起来跑一下,一定会理解更加深刻。
6.1 定义事件
创建两个事件Test1Event和Test2Event继承Event。
private static class Test1Event extends Event {@Overridepublic long sequence() {return System.currentTimeMillis();}}private static class Test2Event extends Event {@Overridepublic long sequence() {return System.currentTimeMillis();}}
6.2 构建发布订阅模型
public void testPublisher() throws Exception {//创建事件订阅发布器NotifyCenter.registerToPublisher(Test1Event.class, 8);NotifyCenter.registerToPublisher(Test2Event.class, 8);final CountDownLatch latch = new CountDownLatch(2);//创建事件的订阅者NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if(event instanceof Test1Event){System.out.println("Test1Event");}else {System.out.println("Test2Event");}latch.countDown();}@Overridepublic Class<? extends Event> subscribeType() {return Test1Event.class;}});NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if(event instanceof Test1Event){System.out.println("Test1Event");}else {System.out.println("Test2Event");}latch.countDown();}@Overridepublic Class<? extends Event> subscribeType() {return Test2Event.class;}});//发布事件Test1EventNotifyCenter.publishEvent(new Test1Event());Thread.sleep(1000);//发布事件Test1EventNotifyCenter.publishEvent(new Test2Event());latch.await();}
执行结果如下:
Test1Event的订阅者先执行,Test2Event的订阅者后执行,正好就是事件的发布顺序。
从上面的例子可以发现我们只是往事件中心注册了发布者和订阅者,并没有直接让发布者和订阅者进行直接引用,这正是事件驱动在系统解耦上的魅力!!
在之后的nacos源码深度分析中,会聊到事件驱动在整个中间件是如何进行应用的
后续更精彩!!!