当前位置: 首页 > news >正文

Caffeine 手动策略缓存 put() 方法源码解析

BoundedLocalManualCache put() 方法源码解析

先看一下BoundedLocalManualCache的类图

BoundedLocalManualCache

com.github.benmanes.caffeine.cache.BoundedLocalCache中定义的BoundedLocalManualCache静态内部类。

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable

实现了LocalManualCache接口,这个接口提供了Cache接口的骨架实现,以最简的方式去实现一个LocalCache

详细查看LocalManualCache接口里定义的内容,代码也不多,直接贴到内容里:

interface LocalManualCache<K, V> extends Cache<K, V> {/** Returns the backing {@link LocalCache} data store. */LocalCache<K, V> cache();@Overridedefault long estimatedSize() {return cache().estimatedSize();}@Overridedefault void cleanUp() {cache().cleanUp();}@Overridedefault @Nullable V getIfPresent(Object key) {return cache().getIfPresent(key, /* recordStats */ true);}@Overridedefault @Nullable V get(K key, Function<? super K, ? extends V> mappingFunction) {return cache().computeIfAbsent(key, mappingFunction);}@Overridedefault Map<K, V> getAllPresent(Iterable<?> keys) {return cache().getAllPresent(keys);}@Overridedefault Map<K, V> getAll(Iterable<? extends K> keys,Function<Iterable<? extends K>, Map<K, V>> mappingFunction) {requireNonNull(mappingFunction);Set<K> keysToLoad = new LinkedHashSet<>();Map<K, V> found = cache().getAllPresent(keys);Map<K, V> result = new LinkedHashMap<>(found.size());for (K key : keys) {V value = found.get(key);if (value == null) {keysToLoad.add(key);}result.put(key, value);}if (keysToLoad.isEmpty()) {return found;}bulkLoad(keysToLoad, result, mappingFunction);return Collections.unmodifiableMap(result);}/*** Performs a non-blocking bulk load of the missing keys. Any missing entry that materializes* during the load are replaced when the loaded entries are inserted into the cache.*/default void bulkLoad(Set<K> keysToLoad, Map<K, V> result,Function<Iterable<? extends @NonNull K>, @NonNull Map<K, V>> mappingFunction) {boolean success = false;long startTime = cache().statsTicker().read();try {Map<K, V> loaded = mappingFunction.apply(keysToLoad);loaded.forEach((key, value) ->cache().put(key, value, /* notifyWriter */ false));for (K key : keysToLoad) {V value = loaded.get(key);if (value == null) {result.remove(key);} else {result.put(key, value);}}success = !loaded.isEmpty();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new CompletionException(e);} finally {long loadTime = cache().statsTicker().read() - startTime;if (success) {cache().statsCounter().recordLoadSuccess(loadTime);} else {cache().statsCounter().recordLoadFailure(loadTime);}}}@Overridedefault void put(K key, V value) {cache().put(key, value);}@Overridedefault void putAll(Map<? extends K, ? extends V> map) {cache().putAll(map);}@Overridedefault void invalidate(Object key) {cache().remove(key);}@Overridedefault void invalidateAll(Iterable<?> keys) {cache().invalidateAll(keys);}@Overridedefault void invalidateAll() {cache().clear();}@Overridedefault CacheStats stats() {return cache().statsCounter().snapshot();}@Overridedefault ConcurrentMap<K, V> asMap() {return cache();}
}

可以看到,CacheLoader接口定义了loadloadAllputputAllinvalidateinvalidateAllstatsasMap等方法,做一个简单实现。这些方法提供了缓存的基本操作,如加载缓存、添加缓存、移除缓存、获取缓存统计信息等。

Manual Cache 源码

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable {private static final long serialVersionUID = 1;final BoundedLocalCache<K, V> cache;final boolean isWeighted;@Nullable Policy<K, V> policy;BoundedLocalManualCache(Caffeine<K, V> builder) {this(builder, null);}BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);isWeighted = builder.isWeighted();}@Overridepublic BoundedLocalCache<K, V> cache() {return cache;}@Overridepublic Policy<K, V> policy() {return (policy == null)? (policy = new BoundedPolicy<>(cache, Function.identity(), isWeighted)): policy;}@SuppressWarnings("UnusedVariable")private void readObject(ObjectInputStream stream) throws InvalidObjectException {throw new InvalidObjectException("Proxy required");}Object writeReplace() {return makeSerializationProxy(cache, isWeighted);}}

定义了一个BoundedLocalCache属性,还有权重的标志位isWeighted,以及一个Policy属性。BoundedLocalManualCache的构造方法中,调用了LocalCacheFactory.newBoundedLocalCache方法,创建了一个BoundedLocalCache对象,并赋值给cache属性。policy属性则是在policy()方法中创建的。policy 是一个BoundedPolicy对象,它实现了Policy接口,用于管理缓存策略。BoundedPolicy源码紧接着就在BoundedLocalManualCache下面,这里就不贴出来了。

static final class BoundedPolicy<K, V> implements Policy<K, V>,里具体定义了了BoundedLocalCache的缓存策略,比如缓存大小,缓存权重,缓存过期时间等。

接下来我们看BoundedLocalCacheput方法

手动使用调用cache.put(k, v);会调用put(key, value, expiry(), /* notifyWriter */ true, /* onlyIfAbsent */ false);
具体的参数解释如下:

  • key:要放入缓存的键。
  • value:要放入缓存的值。
  • expiry:缓存的过期时间,默认为Duration.ZERO,表示永不过期。
  • notifyWriter:是否通知写入者,默认为true
  • onlyIfAbsent:是否只在缓存中不存在该键时才放入,默认为false

put 方法源码如下:

@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean notifyWriter, boolean onlyIfAbsent) {requireNonNull(key);requireNonNull(value);Node<K, V> node = null;long now = expirationTicker().read();int newWeight = weigher.weigh(key, value);for (;;) {// 获取 prior 节点Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));if (prior == null) {// 如果不存在 prior 节点,则创建新的节点if (node == null) {node = nodeFactory.newNode(key, keyReferenceQueue(),value, valueReferenceQueue(), newWeight, now);setVariableTime(node, expireAfterCreate(key, value, expiry, now));}// notifyWriter 为 true 且存在Writer时,通知Writerif (notifyWriter && hasWriter()) {Node<K, V> computed = node;prior = data.computeIfAbsent(node.getKeyReference(), k -> {writer.write(key, value);return computed;});//    如果存在 prior 节点,调用 afterWrite 方法if (prior == node) {afterWrite(new AddTask(node, newWeight));return null;// 如果onlyIfAbsent 为 true。代表只在缓存中不存在该键时才放入缓存} else if (onlyIfAbsent) {V currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 notifyWriter 为 false,直接放入缓存} else {prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}}} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 prior != null,则说明该节点已经存在,则尝试获取锁V oldValue;long varTime;int oldWeight;boolean expired = false;boolean mayUpdate = true;boolean exceedsTolerance = false;synchronized (prior) {if (!prior.isAlive()) {continue;}oldValue = prior.getValue();oldWeight = prior.getWeight();// 如果 oldValue == null,通过 expireAfterCreate 方法计算过期时间,并删除key对应的值if (oldValue == null) {varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, null, RemovalCause.COLLECTED);// 返回prior是否过期,true,则删除key对应的值} else if (hasExpired(prior, now)) {expired = true;varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, oldValue, RemovalCause.EXPIRED);// 如果 onlyIfAbsent 为 true,则不更新key对应的值,返回新的过期时间} else if (onlyIfAbsent) {mayUpdate = false;varTime = expireAfterRead(prior, key, value, expiry, now);} else {varTime = expireAfterUpdate(prior, key, value, expiry, now);}// notifyWriter 为true,如果过期或者更新了值,则通知Writerif (notifyWriter && (expired || (mayUpdate && (value != oldValue)))) {writer.write(key, value);}// 如果mayUpdate为true,计算过期时间是否超出容忍度if (mayUpdate) {exceedsTolerance =(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)|| (expiresVariable()&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);setWriteTime(prior, now);prior.setWeight(newWeight);prior.setValue(value, valueReferenceQueue());}// 设置访问时间和过期时间setVariableTime(prior, varTime);setAccessTime(prior, now);}// 如果在创建缓存时设置了移除监听器,则通知移除监听器if (hasRemovalListener()) {if (expired) {notifyRemoval(key, oldValue, RemovalCause.EXPIRED);} else if (oldValue == null) {notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);} else if (mayUpdate && (value != oldValue)) {notifyRemoval(key, oldValue, RemovalCause.REPLACED);}}// 更新权重,判断是不是第一写入,如果是,调用afterWrite方法int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;if ((oldValue == null) || (weightedDifference != 0) || expired) {afterWrite(new UpdateTask(prior, weightedDifference));// 判断 onlyIfAbsent 是否为 true,以及是否超过容忍度,如果超过容忍度,调用afterWrite方法} else if (!onlyIfAbsent && exceedsTolerance) {afterWrite(new UpdateTask(prior, weightedDifference));} else {if (mayUpdate) {setWriteTime(prior, now);}//执行 afterRead 方法afterRead(prior, now, /* recordHit */ false);}return expired ? null : oldValue;}}

案例中通过 cache.put(k,v)调用方法,走到这个方法中,因为是第一次尝试储存key和value,所以代码中声明的 node = null,获取的prior = nullif (prior == null),创建新节点,设置创建后过期时间。notifyWriter=truehasWriter=false,执行else中方法

          prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}

putIfAbsent 方法:由于data中不存在我们的key,value,返回 null,调用 afterWrite() 方法,将任务放入writeBuffer中,调用scheduleAfterWrite()方法

  void afterWrite(Runnable task) {for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {if (writeBuffer.offer(task)) {scheduleAfterWrite();return;}scheduleDrainBuffers();}

scheduleAfterWrite()方法:

  void scheduleAfterWrite() {for (;;) {switch (drainStatus()) {case IDLE:casDrainStatus(IDLE, REQUIRED);scheduleDrainBuffers();return;case REQUIRED:scheduleDrainBuffers();return;case PROCESSING_TO_IDLE:if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {return;}continue;case PROCESSING_TO_REQUIRED:return;default:throw new IllegalStateException();}}}

看到这我其实是有点蒙了,因为笔者的异步编程基础薄弱,只看方法名字做一个不负责任的猜想,写入后安排异步任务,条件符合执行清理计划,会继续调用 scheduleDrainBuffers() 方法

scheduleDrainBuffers() 方法:

void scheduleDrainBuffers() {if (drainStatus() >= PROCESSING_TO_IDLE) {return;}if (evictionLock.tryLock()) {try {int drainStatus = drainStatus();if (drainStatus >= PROCESSING_TO_IDLE) {return;}lazySetDrainStatus(PROCESSING_TO_IDLE);executor.execute(drainBuffersTask);} catch (Throwable t) {logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);maintenance(/* ignored */ null);} finally {evictionLock.unlock();}}}

drainStatus() 就是返回这条件的值,如果大于等于 PROCESSING_TO_IDLE 就直接返回,否则执行 tryLock() 方法,如果成功,则执行 executor.execute(drainBuffersTask); 方法,否则执行 maintenance() 方法,这个方法就是执行清理任务的方法。

传进来的drainBuffersTask是一个PerformCleanupTask,这个类实现了Runnable接口,重写了run()方法,这个方法就是执行清理任务的方法。

    @Overridepublic void run() {BoundedLocalCache<?, ?> cache = reference.get();if (cache != null) {cache.performCleanUp(/* ignored */ null);}}

继续看performCleanUp()方法:

  void performCleanUp(@Nullable Runnable task) {evictionLock.lock();try {maintenance(task);} finally {evictionLock.unlock();}if ((drainStatus() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {scheduleDrainBuffers();}}

可以看到,这里也是调用了maintenance()方法,然后判断drainStatus()是否等于REQUIRED,如果等于,则调用scheduleDrainBuffers()方法。

@GuardedBy("evictionLock")void maintenance(@Nullable Runnable task) {lazySetDrainStatus(PROCESSING_TO_IDLE);try {drainReadBuffer();drainWriteBuffer();if (task != null) {task.run();}drainKeyReferences();drainValueReferences();expireEntries();evictEntries();climb();} finally {if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {lazySetDrainStatus(REQUIRED);}}}

maintenance() 是实际的清理方法,它首先将drainStatus()设置为PROCESSING_TO_IDLE,然后调用drainReadBuffer()drainWriteBuffer()drainKeyReferences()drainValueReferences()expireEntries()evictEntries()climb()等方法,清理读写缓冲区、过期条目、驱逐条目等。

到这里,afterWrite()基本就执行完了,写入一次(key,value),都会去判断是否需要清理,如果需要清理,就异步调用maintenance()方法进行清理。

如果是给已经存在的key设置值,put方法执行到最后会调用 afterRead()方法

  void afterRead(Node<K, V> node, long now, boolean recordHit) {if (recordHit) {statsCounter().recordHits(1);}boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);if (shouldDrainBuffers(delayable)) {scheduleDrainBuffers();}refreshIfNeeded(node, now);}

afterRead()方法会记录命中次数,然后判断是否需要延迟写入缓冲区,如果需要延迟写入缓冲区,则将节点放入读取缓冲区,如果读取缓冲区已满,则调用scheduleDrainBuffers()方法异步清理缓冲区,最后调用refreshIfNeeded()方法异步刷新节点。

refreshIfNeeded()方法会根据节点的过期时间、访问时间、更新时间等判断是否需要刷新节点,如果需要刷新节点,则调用refresh()方法刷新节点。

本例中没有设置过期时间,直接返回。

总结

本文算是比较详细的把put()方法执行流程分析了一遍,通过分析put()方法,我们可以了解到Caffeine缓存的基本原理,以及如何使用Caffeine缓存,学习如何自己实现一个本地缓存的 put()方法,怎样执行一个异步的清理任务,怎样判断是否需要清理,怎样异步刷新节点等等。

笔者也是一个小菜鸟,刚开始看一些源码,可能有些地方理解的不对,欢迎指正,谢谢!

希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言讨论。


http://www.mrgr.cn/news/63950.html

相关文章:

  • Spring Boot框架下的信息学科平台系统架构设计
  • 信息学科平台系统开发:Spring Boot实用指南
  • 搭建Apache web服务器实例
  • 魔乐社区(Modelers)多机多卡训练实践
  • 在VS中安装chatGPT
  • HTB:Cicada[WriteUP]
  • Copilot功能
  • 单例模式的五种实现方式及优缺点
  • 从0开始学统计-什么是Z-score
  • 【国产MCU系列】-GD32F4开发环境搭建(基于Embedded Builder)
  • 自动化测试工具Ranorex Studio(十九)-其他编辑选项
  • HTML 基础标签——分组标签 <div>、<span> 和基础语义容器
  • magic-api简单使用六:删除接口(支持路径传参)
  • 从实验室到生活:超分子水凝胶湿电发电机的应用之路
  • 【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割!
  • 学生党百元预算如何选到高性价比头戴耳机?四款百元热门耳机推荐
  • 国密SM2 非对称加解密前后端工具
  • 在 openEuler 22.03 服务器上搭建 web 服务教程
  • 100种算法【Python版】第34篇——PageRank算法
  • 构建高效信息学科平台:Spring Boot实践
  • 小区搜索和SSB简介
  • 华为机试HJ17 坐标移动
  • 基于java+SpringBoot+Vue的美容院管理系统设计与实现
  • 【华为HCIP实战课程三十】中间到中间系统协议IS-IS路由渗透及TAG标识详解,网络工程师
  • 【第几小】
  • 华为 HCIP-Datacom H12-821 题库 (40)