当前位置: 首页 > news >正文

Sentinel源码—8.限流算法和设计模式总结二

大纲

1.关于限流的概述

2.高并发下的四大限流算法原理及实现

3.Sentinel使用的设计模式总结

3.Sentinel使用的设计模式总结

(1)责任链模式

(2)监听器模式

(3)适配器模式

(4)模版方法模式

(5)策略模式

(6)观察者模式

(1)责任链模式

一.责任链接口ProcessorSlot

二.责任链接口的抽象实现类

三.责任链的构建

Sentinel的功能都是靠一条链式的ProcessorSlot来完成的,这些ProcessorSlot的初始化以及调用便使用了责任链模式。

一.责任链接口ProcessorSlot

entry()方法相当于AOP的before()方法,也就是入口方法,因此责任链执行时会调用entry()方法。

exit()方法相当于AOP的after()方法,也就是出口方法,因此责任链执行结束时会调用exit()方法。

fireEntry()方法相当于AOP在执行完before()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的entry()方法。

fireExit()方法相当于AOP在执行完exit()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的exit()方法。

//A container of some process and ways of notification when the process is finished.
public interface ProcessorSlot<T> {//Entrance of this slot.//@param context         current Context//@param resourceWrapper current resource//@param param           generics parameter, usually is a com.alibaba.csp.sentinel.node.Node//@param count           tokens needed//@param prioritized     whether the entry is prioritized//@param args            parameters of the original call//@throws Throwable blocked exception or unexpected errorvoid entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable;//Means finish of #entry(Context, ResourceWrapper, Object, int, boolean, Object...).//@param context         current Context//@param resourceWrapper current resource//@param obj             relevant object (e.g. Node)//@param count           tokens needed//@param prioritized     whether the entry is prioritized//@param args            parameters of the original call//@throws Throwable blocked exception or unexpected errorvoid fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable;//Exit of this slot.//@param context         current Context//@param resourceWrapper current resource//@param count           tokens needed//@param args            parameters of the original callvoid exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);//Means finish of #exit(Context, ResourceWrapper, int, Object...).//@param context         current Context//@param resourceWrapper current resource//@param count           tokens needed//@param args            parameters of the original callvoid fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

二.责任链接口的抽象实现类

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {//下一个节点,这里的责任链是一个单向链表,因此next就是当前节点所指向的下一个节点private AbstractLinkedProcessorSlot<?> next = null;//触发执行责任链下一个节点的entry()方法@Overridepublic void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {if (next != null) {next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);}}@SuppressWarnings("unchecked")void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {T t = (T)o;entry(context, resourceWrapper, t, count, prioritized, args);}//触发执行责任链下一个节点的exit()方法@Overridepublic void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {if (next != null) {next.exit(context, resourceWrapper, count, args);}}public AbstractLinkedProcessorSlot<?> getNext() {return next;}public void setNext(AbstractLinkedProcessorSlot<?> next) {this.next = next;}
}

三.责任链的构建

Sentinel在默认情况下会通过DefaultProcessorSlotChain类来实现责任链的构建,当然我们也可以通过SPI机制指定一个自定义的责任链构建类。

//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {//创建一个DefaultProcessorSlotChain对象实例ProcessorSlotChain chain = new DefaultProcessorSlotChain();//通过SPI机制加载责任链的节点ProcessorSlot实现类//然后按照@Spi注解的order属性进行排序并进行实例化//最后将ProcessorSlot实例放到sortedSlotList中List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();//遍历已排好序的ProcessorSlot集合for (ProcessorSlot slot : sortedSlotList) {//安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlotif (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}//调用DefaultProcessorSlotChain.addLast()方法构建单向链表//将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}//返回单向链表return chain;}
}

DefaultProcessorSlotChain构建的责任链如下:

(2)监听器模式

一.监听器接口和具体实现

二.监听器管理器接口和具体实现

三.使用方如何基于这套监听器机制管理规则

Sentinel在加载和配置规则的时候就使用了监听器模式。监听器模式的实现分为三大部分:监听器、监听器管理器、使用方(比如规则管理器)。

一.监听器接口和具体实现

//This class holds callback method when SentinelProperty#updateValue(Object) need inform the listener
//监听器接口,负责监听各个配置,包含两个方法:初始化方法以及更新方法
public interface PropertyListener<T> {//Callback method when SentinelProperty#updateValue(Object) need inform the listener.//规则变更时触发的回调方法void configUpdate(T value);//The first time of the value's load.//首次加载规则时触发的回调方法void configLoad(T value);
}//流控规则管理器
public class FlowRuleManager {...//监听器接口的具体实现:流控规则监听器private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {//初始化规则@Overridepublic synchronized void configUpdate(List<FlowRule> value) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);}//规则变更@Overridepublic synchronized void configLoad(List<FlowRule> conf) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);}}
}

二.监听器管理器接口和具体实现

//监听器管理器接口
public interface SentinelProperty<T> {//添加监听者void addListener(PropertyListener<T> listener);//移除监听者void removeListener(PropertyListener<T> listener);//当监听值有变化时,调用此方法进行通知boolean updateValue(T newValue);
}//监听器管理器具体实现
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {//存放每个监听器protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();//要监听的值private T value = null;public DynamicSentinelProperty() {}//添加监听器到集合@Overridepublic void addListener(PropertyListener<T> listener) {listeners.add(listener);//回调监听器的configLoad()方法初始化规则配置listener.configLoad(value);}//移除监听器@Overridepublic void removeListener(PropertyListener<T> listener) {listeners.remove(listener);}//更新值@Overridepublic boolean updateValue(T newValue) {//如果值没变化,直接返回if (isEqual(value, newValue)) {return false;}RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);//如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值value = newValue;for (PropertyListener<T> listener : listeners) {listener.configUpdate(newValue);}return true;}//对比值是否发生了变化private boolean isEqual(T oldValue, T newValue) {if (oldValue == null && newValue == null) {return true;}if (oldValue == null) {return false;}return oldValue.equals(newValue);}//清空监听器集合public void close() {listeners.clear();}
}

三.使用方如何基于这套监听器机制管理规则

//流控规则管理器
public class FlowRuleManager {//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();//饿汉式单例模式实例化流控规则的监听器对象private static final FlowPropertyListener LISTENER = new FlowPropertyListener();//监听器对象的管理器private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();//当FlowRuleManager类的静态方法首次被调用时,会执行这里的静态代码块(对应类加载的过程)static {//将流控规则监听器注册到监听器管理器中currentProperty.addListener(LISTENER);startMetricTimerListener();}//Load FlowRules, former rules will be replaced.//加载流控规则public static void loadRules(List<FlowRule> rules) {//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRulescurrentProperty.updateValue(rules);}...
}//使用方:通过流控规则管理器FlowRuleManager加载和监听流控规则
public class FlowQpsDemo {private static final String KEY = "abc";private static AtomicInteger pass = new AtomicInteger();private static AtomicInteger block = new AtomicInteger();private static AtomicInteger total = new AtomicInteger();private static volatile boolean stop = false;private static final int threadCount = 32;private static int seconds = 60 + 40;public static void main(String[] args) throws Exception {//初始化QPS的流控规则initFlowQpsRule();//启动线程定时输出信息tick();//first make the system run on a very low condition//模拟QPS为32时的访问场景simulateTraffic();System.out.println("===== begin to do flow control");System.out.println("only 20 requests per second can pass");}private static void initFlowQpsRule() {List<FlowRule> rules = new ArrayList<FlowRule>();FlowRule rule1 = new FlowRule();rule1.setResource(KEY);//设置QPS的限制为20rule1.setCount(20);rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);rule1.setLimitApp("default");rules.add(rule1);//首次调用FlowRuleManager的静态方法会加载FlowRuleManager类执行其静态代码块//加载流控规则FlowRuleManager.loadRules(rules);}private static void simulateTraffic() {for (int i = 0; i < threadCount; i++) {Thread t = new Thread(new RunTask());t.setName("simulate-traffic-Task");t.start();}}private static void tick() {Thread timer = new Thread(new TimerTask());timer.setName("sentinel-timer-task");timer.start();}static class TimerTask implements Runnable {@Overridepublic void run() {long start = System.currentTimeMillis();System.out.println("begin to statistic!!!");long oldTotal = 0;long oldPass = 0;long oldBlock = 0;while (!stop) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}long globalTotal = total.get();long oneSecondTotal = globalTotal - oldTotal;oldTotal = globalTotal;long globalPass = pass.get();long oneSecondPass = globalPass - oldPass;oldPass = globalPass;long globalBlock = block.get();long oneSecondBlock = globalBlock - oldBlock;oldBlock = globalBlock;System.out.println(seconds + " send qps is: " + oneSecondTotal);System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);if (seconds-- <= 0) {stop = true;}}long cost = System.currentTimeMillis() - start;System.out.println("time cost: " + cost + " ms");System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get());System.exit(0);}}static class RunTask implements Runnable {@Overridepublic void run() {while (!stop) {Entry entry = null;try {//调用entry()方法开始规则验证entry = SphU.entry(KEY);//token acquired, means passpass.addAndGet(1);} catch (BlockException e1) {block.incrementAndGet();} catch (Exception e2) {//biz exception} finally {total.incrementAndGet();if (entry != null) {//完成规则验证调用exit()方法entry.exit();}}Random random2 = new Random();try {TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));} catch (InterruptedException e) {// ignore}}}}
}

(3)适配器模式

适配器模式是一种结构型设计模式,它允许将一个类的接口转换为客户端期望的另一个接口。在Sentinel中,使用适配器模式将不同框架和库的接口适配为统一的接口,如SphU类。SphU类提供了统一的入口,用于执行不同的资源保护逻辑。

public class SphU {private static final Object[] OBJECTS0 = new Object[0];private SphU() {            }//Record statistics and perform rule checking for the given resource.//@param name the unique name of the protected resource//@return the Entry of this invocation (used for mark the invocation complete and get context data)public static Entry entry(String name) throws BlockException {//调用CtSph.entry()方法创建一个Entry资源访问对象return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}//Checking all Rules about the protected method.//@param method the protected methodpublic static Entry entry(Method method) throws BlockException {return Env.sph.entry(method, EntryType.OUT, 1, OBJECTS0);}//Checking all Rules about the protected method.//@param method     the protected method//@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)public static Entry entry(Method method, int batchCount) throws BlockException {return Env.sph.entry(method, EntryType.OUT, batchCount, OBJECTS0);}//Record statistics and perform rule checking for the given resource.//@param name       the unique string for the resource//@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)public static Entry entry(String name, int batchCount) throws BlockException {return Env.sph.entry(name, EntryType.OUT, batchCount, OBJECTS0);}//Checking all Rules about the protected method.//@param method      the protected method//@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.public static Entry entry(Method method, EntryType trafficType) throws BlockException {return Env.sph.entry(method, trafficType, 1, OBJECTS0);}//Record statistics and perform rule checking for the given resource.public static Entry entry(String name, EntryType trafficType) throws BlockException {//调用CtSph.entry()方法创建一个Entry资源访问对象return Env.sph.entry(name, trafficType, 1, OBJECTS0);}//Checking all Rules about the protected method.//@param method      the protected method//@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.//@param batchCount  the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)public static Entry entry(Method method, EntryType trafficType, int batchCount) throws BlockException {return Env.sph.entry(method, trafficType, batchCount, OBJECTS0);}//Record statistics and perform rule checking for the given resource.public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException {return Env.sph.entry(name, trafficType, batchCount, OBJECTS0);}//Checking all Rules about the protected method.//@param method      the protected method//@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.//@param batchCount  the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)//@param args        args for parameter flow control or customized slots//@return the Entry of this invocation (used for mark the invocation complete and get context data)public static Entry entry(Method method, EntryType trafficType, int batchCount, Object... args) throws BlockException {return Env.sph.entry(method, trafficType, batchCount, args);}//Record statistics and perform rule checking for the given resource.public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException {return Env.sph.entry(name, trafficType, batchCount, args);}//Record statistics and check all rules of the resource that indicates an async invocation.//@param name the unique name of the protected resourcepublic static AsyncEntry asyncEntry(String name) throws BlockException {return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0);}//Record statistics and check all rules of the resource that indicates an async invocation.//@param name        the unique name for the protected resource//@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.//@return the Entry of this invocation (used for mark the invocation complete and get context data)public static AsyncEntry asyncEntry(String name, EntryType trafficType) throws BlockException {return Env.sph.asyncEntry(name, trafficType, 1, OBJECTS0);}public static AsyncEntry asyncEntry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException {return Env.sph.asyncEntry(name, trafficType, batchCount, args);}//Record statistics and perform rule checking for the given resource. The entry is prioritized.public static Entry entryWithPriority(String name) throws BlockException {return Env.sph.entryWithPriority(name, EntryType.OUT, 1, true);}//Record statistics and perform rule checking for the given resource. The entry is prioritized.public static Entry entryWithPriority(String name, EntryType trafficType) throws BlockException {return Env.sph.entryWithPriority(name, trafficType, 1, true);}//Record statistics and perform rule checking for the given resource.public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);}//Record statistics and perform rule checking for the given resource.public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException {return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);}//Record statistics and perform rule checking for the given resource that indicates an async invocation.public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType) throws BlockException {return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, OBJECTS0);}//Record statistics and perform rule checking for the given resource that indicates an async invocation.public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException {return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, args);}//Record statistics and perform rule checking for the given resource that indicates an async invocation.public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, int batchCount, Object[] args) throws BlockException {return Env.sph.asyncEntryWithType(name, resourceType, trafficType, batchCount, false, args);}
}

(4)模版方法模式

模板方法模式是一种行为型设计模式,它在一个方法中定义一个算法的骨架,将一些步骤延迟到子类中实现。Sentinel便使用了类似模板方法模式来处理熔断策略,但不是严格意义上的模板模式,因为模板方法模式一般会有一个final修饰的模板方法来定义整个流程。例如AbstractCircuitBreaker类定义了熔断策略的基本结构,具体的细节需要继承它并实现对应的方法。

public abstract class AbstractCircuitBreaker implements CircuitBreaker {@Overridepublic boolean tryPass(Context context) {...}//提供抽象方法供子类实现abstract void resetStat();
}//子类
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {@Overrideprotected void resetStat() {stat.currentWindow().value().reset();}
}

(5)策略模式

策略模式是一种行为型设计模式,定义了一系列的算法,并将每个算法封装起来,使它们可以互相替换。Sentinel便在构建流控规则对象时使用了策略模式来设置不同的流控策略。例如TrafficShapingController接口定义了流控策略的方法,具体的实现类负责实现不同的流控策略。

//流控效果接口
public interface TrafficShapingController {//Check whether given resource entry can pass with provided count.//@param node resource node//@param acquireCount count to acquire//@param prioritized whether the request is prioritized//@return true if the resource entry can pass; false if it should be blockedboolean canPass(Node node, int acquireCount, boolean prioritized);//Check whether given resource entry can pass with provided count.//@param node resource node//@param acquireCount count to acquire//@return true if the resource entry can pass; false if it should be blockedboolean canPass(Node node, int acquireCount);
}//流控规则管理器
public class FlowRuleManager {...private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {//初始化规则@Overridepublic synchronized void configUpdate(List<FlowRule> value) {//构建流控规则对象Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);}//规则变更@Overridepublic synchronized void configLoad(List<FlowRule> conf) {//构建流控规则对象Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);}}
}public final class FlowRuleUtil {...public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();if (list == null || list.isEmpty()) {return newRuleMap;}Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();for (FlowRule rule : list) {if (!isValidRule(rule)) {RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);continue;}if (filter != null && !filter.test(rule)) {continue;}if (StringUtil.isBlank(rule.getLimitApp())) {rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}//获取[流控效果]处理器TrafficShapingController rater = generateRater(rule);rule.setRater(rater);//获取资源名K key = groupFunction.apply(rule);if (key == null) {continue;}//获取资源名对应的流控规则列表Set<FlowRule> flowRules = tmpMap.get(key);//将规则放到Map里,和当前资源绑定if (flowRules == null) {//Use hash set here to remove duplicate rules.flowRules = new HashSet<>();tmpMap.put(key, flowRules);}flowRules.add(rule);}Comparator<FlowRule> comparator = new FlowRuleComparator();for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {List<FlowRule> rules = new ArrayList<>(entries.getValue());if (shouldSort) {//Sort the rules.Collections.sort(rules, comparator);}newRuleMap.put(entries.getKey(), rules);}return newRuleMap;}private static TrafficShapingController generateRater(FlowRule rule) {//判断只有当阈值类型为QPS时才生效if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//根据流控效果选择不同的流量整形控制器TrafficShapingControllerswitch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://Warm Up预热模式——冷启动模式return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://排队等待模式return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER://Warm Up + 排队等待模式return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT://快速失败模式——Default默认模式default://Default mode or unknown mode: default traffic shaping controller (fast-reject).}}//默认模式:快速失败用的是DefaultControllerreturn new DefaultController(rule.getCount(), rule.getGrade());}...
}public class FlowRuleChecker {...private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {//选择Node作为限流计算的依据Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器//然后调用TrafficShapingController.canPass()方法对请求进行检查return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}...
}

(6)观察者模式

Sentinel实现熔断功能使用了观察者模式。具体接口是CircuitBreakerStateChangeObserver,它负责感知熔断器状态发生变化后通知到各个观察者。

//1.首先定义观察者接口CircuitBreakerStateChangeObserver
public interface CircuitBreakerStateChangeObserver {void onStateChange(CircuitBreaker oldCircuitBreaker, CircuitBreaker newCircuitBreaker);
}//2.在熔断器事件注册类EventObserverRegistry中:
//定义一个观察者Map(stateChangeObserverMap)用于存放观察者实例,并提供注册、移除和获取全部观察者的方法
public class EventObserverRegistry {private final Map<String, CircuitBreakerStateChangeObserver> stateChangeObserverMap = new HashMap<>();//注册观察者public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) {stateChangeObserverMap.put(name, observer);}//移除观察者public boolean removeStateChangeObserver(String name) {return stateChangeObserverMap.remove(name) != null;}//获取全部观察者public List<CircuitBreakerStateChangeObserver> getStateChangeObservers() {return new ArrayList<>(stateChangeObserverMap.values());}...
}//3.当熔断器状态发生变化时,通知所有已注册的观察者。
//比如在AbstractCircuitBreaker类中的notifyObservers方法中实现:
public abstract class AbstractCircuitBreaker implements CircuitBreaker {private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {observer.onStateChange(prevState, newState, rule, snapshotValue);}}
}


http://www.mrgr.cn/news/99362.html

相关文章:

  • 【MATLAB第117期】#源码分享 | 基于MATLAB的SSM状态空间模型多元时间序列预测方法(多输入单输出)
  • win10中打开python的交互模式
  • 【白雪讲堂】[特殊字符]内容战略地图|GEO优化框架下的内容全景布局
  • Windows 同步-互锁变量访问
  • Java从入门到“放弃”(精通)之旅——抽象类和接口⑨
  • 发布一个npm包,更新包,删除包
  • redis数据类型-位图bitmap
  • 【Linux】线程ID、线程管理、与线程互斥
  • cnn 吴恩达 笔记 锚框+yolo 图像分割 +反卷积 unet
  • [密码学实战]详解gmssl库与第三方工具兼容性问题及解决方案
  • [密码学基础]GB与GM国密标准深度解析:定位、差异与协同发展
  • [密码学基础]密码学发展简史:从古典艺术到量子安全的演进
  • [密码学基础]GMT 0029-2014签名验签服务器技术规范深度解析
  • [密码学基础]国密算法深度解析:中国密码标准的自主化之路
  • Redis专题
  • [密码学基础]GMT 0002-2012 SM4分组密码算法 技术规范深度解析
  • NLP高频面试题(四十九)——大模型RAG常见面试题解析
  • [安全实战]逆向工程核心名词详解
  • Three.js 场景编辑器 (Vue3 + TypeScript 实现)
  • HTML 初识