Caffeine

月伴飞鱼 2024-06-23 15:20:26
框架相关
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

相较于Guava Cache,Caffeine在整体设计理念、实现策略以及接口定义等方面都基本继承了前辈的优秀特性。

作为新时代背景下的后来者,Caffeine做了很多细节层面的优化,比如:

  • 基础数据结构层面优化 借助JAVA8对ConcurrentHashMap底层由链表切换为红黑树、以及废弃分段锁逻辑的优化,提升了Hash冲突时的查询效率以及并发场景下的处理性能。
  • 数据驱逐(淘汰)策略的优化 通过使用改良后的W-TinyLFU算法,提供了更佳的热点数据留存效果,提供了近乎完美的热点数据命中率,以及更低消耗的过程维护。
  • 异步并行能力的全面支持 完美适配JAVA8之后的并行编程场景,可以提供更为优雅的并行编码体验与并发效率。

容器创建

Cache<Integer, String> cache = Caffeine.newBuilder().build();
方法 含义说明
build() 构建一个手动回源的Cache对象
build(CacheLoader) 构建一个支持使用给定CacheLoader对象进行自动回源操作的LoadingCache对象
buildAsync() 构建一个支持异步操作的异步缓存对象
buildAsync(CacheLoader) 使用给定的CacheLoader对象构建一个支持异步操作的缓存对象
buildAsync(AsyncCacheLoader) 与buildAsync(CacheLoader)相似,区别点仅在于传入的参数类型不一样。

为了便于异步场景中处理,可以通过buildAsync()构建一个手动回源数据加载的缓存对象:

public static void main(String[] args) {
    AsyncCache<String, User> asyncCache = Caffeine.newBuilder()
    .buildAsync();
    User user = asyncCache.get("123", s -> {
        System.out.println("异步callable thread:" + Thread.currentThread().getId());
        return userDao.getUser(s);
    }).join();
}

为了支持异步场景中的自动异步回源,可以通过buildAsync(CacheLoader)或者buildAsync(AsyncCacheLoader)来实现:

public static void main(String[] args) throws Exception{
    AsyncLoadingCache<String, User> asyncLoadingCache =
            Caffeine.newBuilder().maximumSize(1000L).buildAsync(key -> userDao.getUser(key));
    User user = asyncLoadingCache.get("123").join();
}

在创建缓存对象的同时,可以指定此缓存对象的一些处理策略,比如容量限制、比如过期策略等等。

方法 含义说明
initialCapacity 待创建的缓存容器的初始容量大小(记录条数
maximumSize 指定此缓存容器的最大容量(最大缓存记录条数)
maximumWeight 指定此缓存容器的最大容量(最大比重值),需结合weighter方可体现出效果
expireAfterWrite 设定过期策略,按照数据写入时间进行计算
expireAfterAccess 设定过期策略,按照数据最后访问时间来计算
expireAfter 基于个性化定制的逻辑来实现过期处理(可以定制基于新增读取更新等场景的过期策略,甚至支持为不同记录指定不同过期时间
weighter 入参为一个函数式接口,用于指定每条存入的缓存数据的权重占比情况。这个需要与maximumWeight结合使用
refreshAfterWrite 缓存写入到缓存之后
recordStats 设定开启此容器的数据加载与缓存命中情况统计
public static void main(String[] args) {
    AsyncLoadingCache<String, User> asyncLoadingCache = CaffeinenewBuilder()
            .initialCapacity(1000) // 指定初始容量
            .maximumSize(10000L) // 指定最大容量
            .expireAfterWrite(30L, TimeUnit.MINUTES) // 指定写入30分钟后过期
            .refreshAfterWrite(1L, TimeUnit.MINUTES) // 指定每隔1分钟刷新下数据内容
            .removalListener((key, value, cause) ->
                    System.out.println(key + "移除,原因:" + cause)) // 监听记录移除事件
            .recordStats() // 开启缓存操作数据统计
            .buildAsync(key -> userDao.getUser(key)); // 构建异步CacheLoader加载类型的缓存对象
}

缓存操作

方法 含义说明
get 根据key获取指定的缓存值,如果没有则执行回源操作获取
getAll 根据给定的key列表批量获取对应的缓存值,返回一个map格式的结果,没有命中缓存的部分会执行回源操作获取
getIfPresent 不执行回源操作,直接从缓存中尝试获取key对应的缓存值
getAllPresent 不执行回源操作,直接从缓存中尝试获取给定的key列表对应的值,返回查询到的map格式结果, 异步场景不支持此方法
put 向缓存中写入指定的key与value记录
putAll 批量向缓存中写入指定的key-value记录集,异步场景不支持此方法
asMap 将缓存中的数据转换为map格式返回

同步缓存:

public static void main(String[] args) throws Exception {
    LoadingCache<String, String> loadingCache = buildLoadingCache();
    loadingCache.put("key1", "value1");
    String value = loadingCache.get("key1");
    System.out.println(value);
}

异步缓存:

public static void main(String[] args) throws Exception {
    AsyncLoadingCache<String, String> asyncLoadingCache = buildAsyncLoadingCache();
    // 写入缓存记录(value值为异步获取)
    asyncLoadingCache.put("key1", CompletableFuture.supplyAsync(() -> "value1"));
    // 异步方式获取缓存值
    CompletableFuture<String> completableFuture = asyncLoadingCache.get("key1");
    String value = completableFuture.join();
    System.out.println(value);
}

异步策略

Caffeine采用了异步处理的策略,get请求中虽然也会触发淘汰数据的清理操作,但是将清理任务添加到了独立的线程池中进行异步的不会阻塞 get 请求的执行与返回,这样大大缩短了get请求的执行时长,提升了响应性能。

除了对自身的异步处理优化,Caffeine还提供了全套的Async异步处理机制,可以支持业务在异步并行流水线式处理场景中使用以获得更加丝滑的体验。

Caffeine完美的支持了在异步场景下的流水线处理使用场景,回源操作也支持异步的方式来完成。

CompletableFuture并行流水线能力,是JAVA8异步编程领域的一个重大改进。

可以将一系列耗时且无依赖的操作改为并行同步处理,并等待各自处理结果完成后继续进行后续环节的处理,由此来降低阻塞等待时间,进而达到降低请求链路时长的效果。

public static void main(String[] args) throws Exception {
    AsyncLoadingCache<String, String> asyncLoadingCache = buildAsyncLoadingCache();
    // 写入缓存记录(value值为异步获取)
    asyncLoadingCache.put("key1", CompletableFuture.supplyAsync(() -> "value1"));
    // 异步方式获取缓存值
    CompletableFuture<String> completableFuture = asyncLoadingCache.get("key1");
    String value = completableFuture.join();
    System.out.println(value);
}

淘汰算法

算法 弊端说明
FIFO 先进先出策略,属于一种最为简单与原始的策略。如果缓存使用频率较高,会导致缓存数据始终在不停的进进出出,影响性能,且命中率表现也一般。
LRU 最近最久未使用策略,保留最近被访问到的数据,而淘汰最久没有被访问的数据。如果遇到偶尔的批量刷数据情况,很容易将其他缓存内容都挤出内存,带来缓存击穿的风险。
LFU 最近少频率策略,这种根据访问次数进行淘汰,相比而言内存中存储的热点数据命中率会更高些,缺点就是需要维护独立字段用来记录每个元素的访问次数,占用内存空间。

为了保证命中率,一般缓存框架都会选择使用LRU或者LFU策略,很少会有使用FIFO策略进行数据淘汰的。

Caffeine缓存的LFU采用了Count-Min Sketch频率统计算法,由于该LFU的计数器只有4bit大小,所以称为TinyLFU

在TinyLFU算法基础上引入一个基于LRU的Window Cache,这个新的算法叫就叫做W-TinyLFU

W-TinyLFU算法有效的解决了LRU以及LFU存在的弊端,为Caffeine提供了大部分场景下近乎完美命中率表现。

异步淘汰清理机制

Caffeine为了提升读写操作的并发效率而将数据淘汰清理操作改为了异步处理,而异步处理时会有微小的延时。

不管是基于大小、还是基于过期时间或基于引用的数据淘汰策略,由于数据淘汰处理是异步进行的,都会存在短暂不够精确的情况。

数据驱逐机制

基于时间:

Caffine支持基于时间进行数据的淘汰驱逐处理。

这部分的能力与Guava Cache相同,支持根据记录创建时间以及访问时间两个维度进行处理。

数据的过期时间在创建缓存对象的时候进行指定,Caffeine在创建缓存对象的时候提供了3种设定过期策略的方法。

方式 具体说明
expireAfterWrite 基于创建时间进行过期处理
expireAfterAccess 基于最后访问时间进行过期处理
expireAfter 基于个性化定制的逻辑来实现过期处理(可以定制基于新增读取更新等场景的过期策略,甚至支持为不同记录指定不同过期时间)

expireAfterWrite:expireAfterWrite用于指定数据创建之后多久会过期。

当记录被写入缓存之后达到指定的时间之后,就会被过期淘汰(惰性删除,并不会立即从内存中移除,而是在下一次操作的时候触发清理操作)。

Cache<String, User> userCache = 
    Caffeine.newBuilder()
        .expireAfterWrite(1, TimeUnit.SECONDS)
        .build();
userCache.put("123", new User("123", "张三"));

expireAfterAccess:expireAfterAccess用于指定缓存记录多久没有被访问之后就会过期。

这种是基于最后一次访问时间来计算数据是否过期,如果一个数据一直被访问,则其就不会过期。

  • 比较适用于热点数据的存储场景,可以保证较高的缓存命中率。

同样地,数据过期时也不会被立即从内存中移除,而是基于惰性删除机制进行处理。

Cache<String, User> userCache = 
    Caffeine.newBuilder()
        .expireAfterAccess(1, TimeUnit.SECONDS)
        .build();
    userCache.get("123", s -> userDao.getUser(s));

expireAfter:其支持传入一个自定义的Expiry对象,自行实现数据的过期策略,甚至是针对不同的记录来定制不同的过期时间。

Expiry接口中需要实现的三个方法:

方法名称 含义说明
expireAfterCreate 指定一个过期时间,从记录创建的时候开始计时,超过指定的时间之后就过期淘汰,效果类似expireAfterWrite,但是支持更灵活的定制逻辑。
expireAfterUpdate 指定一个过期时间,从记录最后一次被更新的时候开始计时,超过指定的时间之后就过期。每次执行更新操作之后,都会重新计算过期时间。
expireAfterRead 指定一个过期时间,从记录最后一次被访问的时候开始计时,超过指定时间之后就过期。效果类似expireAfterAccess,但是支持更高级的定制逻辑。

比如下面的代码中,定制了expireAfterCreate方法的逻辑,根据缓存key来决定过期时间,如果key以字母A开头则设定1s过期,否则设定2s过期:

public static void main(String[] args) {
    try {
        LoadingCache<String, User> userCache = Caffeine.newBuilder()
                .removalListener((key, value, cause) -> {
                    System.out.println(key + "移除,原因:" + cause);
                })
                .expireAfter(new Expiry<String, User>() {
                    @Override
                    public long expireAfterCreate(@NonNull String key, @NonNullUser value, long currentTime) {
                        if (key.startsWith("A")) {
                            return TimeUnit.SECONDS.toNanos(1);
                        } else {
                            return TimeUnit.SECONDS.toNanos(2);
                        }
                    }
                    @Override
                    public long expireAfterUpdate(@NonNull String key, @NonNullUser value, long currentTime,
                                                  @NonNegative longcurrentDuration) {
                        return Long.MAX_VALUE;
                    }
                    @Override
                    public long expireAfterRead(@NonNull String key, @NonNull Uservalue, long currentTime,
                                                @NonNegative long currentDuration){
                        return Long.MAX_VALUE;
                    }
                })
                .build(key -> userDao.getUser(key));
        userCache.put("123", new User("123", "123"));
        userCache.put("A123", new User("A123", "A123"));
        Thread.sleep(1100L);
        System.out.println(userCache.get("123"));
        System.out.println(userCache.get("A123"));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

除了根据key来定制不同的过期时间,也可以根据value的内容来指定不同的过期时间策略。

也可以同时定制上述三个方法,搭配来实现更复杂的过期策略。

按照这种方式来定时过期时间的时候需要注意一点,如果不需要设定某一维度的过期策略的时候,需要将对应实现方法的返回值设置为一个非常大的数值,比如可以像上述示例代码中一样,指定为Long.MAX_VALUE值。

基于大小:

Caffeine支持针对缓存总体容量大小进行限制,如果容量满的时候,基于W-TinyLFU算法,淘汰最不常被使用的数据,腾出空间给新的记录写入。

Caffeine支持按照Size(记录条数)或者按照Weighter(记录权重)值进行总体容量的限制。

maximumSize:在创建Caffeine缓存对象的时候,可以通过maximumSize来指定允许缓存的最大条数。

Cache<Integer, String> cache = Caffeine.newBuilder()
        .maximumSize(1000L) // 限制最大缓存条数
        .build();

maximumWeight:在创建Caffeine缓存对象的时候,可以通过maximumWeightweighter组合的方式,指定按照权重进行限制缓存总容量。

比如一个字符串value值的缓存场景下,可以根据字符串的长度来计算权重值,最后根据总权重大小来限制容量。

Cache<Integer, String> cache = Caffeine.newBuilder()
        .maximumWeight(1000L) // 限制最大权重值
        .weigher((key, value) -> (String.valueOf(value).length() / 1000) + 1)
        .build();

使用注意点:

如果创建的时候指定了weighter,则必须同时指定maximumWeight值,如果不指定、或者指定了maximumSize,会报错(这一点与Guava Cache一致)。

基于引用:

基于引用回收的策略,核心是利用JVM虚拟机的GC机制来达到数据清理的目的。

当一个对象不再被引用的时候,JVM会选择在适当的时候将其回收。

Caffeine支持三种不同的基于引用的回收方法。

方法 具体说明
weakKeys 采用弱引用方式存储key值内容,当key对象不再被引用的时候,由GC进行回收
weakValues 采用弱引用方式存储value值内容,当value对象不再被引用的时候,由GC进行回收
softValues 采用软引用方式存储value值内容,当内存容量满时基于LRU策略进行回收

weakKeys:默认情况下,创建出一个Caffeine缓存对象并写入key-value映射数据时,key和value都是以强引用的方式存储的。

而使用weakKeys可以指定将缓存中的key值以弱引用(WeakReference)的方式进行存储,这样一来,如果程序运行时没有其它地方使用或者依赖此缓存值的时候,该条记录就可能会被GC回收掉。

 LoadingCache<String,  User> loadingCache = Caffeine.newBuilder()
                .weakKeys()
                .build(key -> userDao.getUser(key));

weakValues:与weakKeys类似,可以在创建缓存对象的时候使用weakValues指定将value值以弱引用的方式存储到缓存中。

这样当这条缓存记录的对象不再被引用依赖的时候,就会被JVM在适当的时候回收释放掉。

 LoadingCache<String,  User> loadingCache = Caffeine.newBuilder()
                .weakValues()
                .build(key -> userDao.getUser(key));

softValues:softValues是指将缓存内容值以软引用的方式存储在缓存容器中,当内存容量满的时候Caffeine会以LRU(least-recently-used,最近最少使用)顺序进行数据淘汰回收。

方式 具体描述
weakValues 弱引用方式存储,一旦不再被引用,则会被GC回收
softValues 软引用方式存储,不会被GC回收,但是在内存容量满的时候,会基于LRU策略数据回收

具体使用的时候,可以在创建缓存对象的时候进行指定基于软引用方式数据淘汰。

 LoadingCache<String,  User> loadingCache = Caffeine.newBuilder()
                .softValues()
                .build(key -> userDao.getUser(key));

同步数据回源

查询缓存、数据回源、数据回填缓存、返回执行结果等一系列操作都是在一个调用线程中同步阻塞完成的。

Callable方式回源

在每次get请求的时候,传入一个Callable函数式接口具体实现,当没有命中缓存的时候,Caffeine框架会执行给定的Callable实现逻辑,去获取真实的数据并且回填到缓存中,然后返回给调用方。

Callable方式的回源填充:

会带来一个问题,就是如果需要获取缓存的地方太多,会导致每个调用的地方都得指定下对应Callable回源方法,调用起来比较麻烦,且对于需要保证回源逻辑统一的场景管控能力不够强势,无法约束所有的调用方使用相同的回源逻辑。

public static void main(String[] args) {
    Cache<String, User> cache = Caffeine.newBuilder().build();
    User user = cache.get("123", s -> userDao.getUser(s));
    System.out.println(user);
}

CacheLoader回源

在创建缓存对象的时候,可以通在build()方法中传入指定的CacheLoader对象的方式来指定回源时默认使用的回源数据加载器,这样当使用方调用get方法获取不到数据的时候,框架就会自动使用给定的CacheLoader对象执行对应的数据加载逻辑。

比如下面的代码中,便在创建缓存对象时指定了当缓存未命中时通过userDao.getUser()方法去DB中执行数据查询操作:

public LoadingCache<String, User> createUserCache() {
    return Caffeine.newBuilder()
            .maximumSize(10000L)
            .build(key -> userDao.getUser(key));
}

CacheLoader更适用于所有回源场景使用的回源策略都固定且统一的情况。

对具体业务使用的时候更加的友好,调用get方法也更加简单,只需要传入带查询的key值即可。

不回源

接口 功能说明
getIfPresent 从内存中查询,如果存在则返回对应值,不存在则返回null
getAllPresent 批量从内存中查询,如果存在则返回存在的键值对,不存在的key则不出现在结果集里
public static void main(String[] args) {
    LoadingCache<String, User> cache = Caffeine.newBuilder().build(userId -> userDao.getUser(userId));
    cache.put("124", new User("124", "张三"));
    User userInfo = cache.getIfPresent("123");
    System.out.println(userInfo);
    Map<String, User> presentUsers =
            cache.getAllPresent(Stream.of("123", "124", "125").collect(Collectors.toList()));
    System.out.println(presentUsers);
}

异步Callable

要想支持异步场景下使用缓存,则创建的时候必须要创建一个异步缓存类型,可以通过buildAsync()方法来构建一个AsyncCache类型缓存对象,进而可以在异步场景下进行使用。

public static void main(String[] args) {
    AsyncCache<String, User> asyncCache = Caffeine.newBuilder().buildAsyn();
    CompletableFuture<User> userCompletableFuture = asyncCache.get("123", s -> userDao.getUser(s));
    System.out.println(userCompletableFuture.join());
}

get方法传入了Callable回源逻辑,然后会开始异步的加载处理操作,并返回了个CompletableFuture类型结果,最后如果需要获取其实际结果的时候,需要等待其异步执行完成然后获取到最终结果(通过上述代码中的join()方法等待并获取结果)。

异步CacheLoader

异步处理的时候,Caffeine也支持直接在创建的时候指定CacheLoader对象,然后生成支持异步回源操作的AsyncLoadingCache缓存对象,然后在使用get方法获取结果的时候,也是返回的CompletableFuture异步封装类型,满足在异步编程场景下的使用。

public static void main(String[] args) {
    try {
        AsyncLoadingCache<String, User> asyncLoadingCache =
                Caffeine.newBuilder().maximumSize(1000L).buildAsync(key -> userDao.getUser(key));
        CompletableFuture<User> userCompletableFuture = asyncLoadingCache.get("123");
        System.out.println(userCompletableFuture.join());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

异步AsyncCacheLoader

在创建的时候给定一个用于回源处理的CacheLoader之外,Caffeine还有一个buildAsync的重载版本,允许传入一个同样是支持异步并行处理的AsyncCacheLoader对象。

public static void main(String[] args) {
    try {
        AsyncLoadingCache<String, User> asyncLoadingCache =
                Caffeine.newBuilder().maximumSize(1000L).buildAsync(
                        (key, executor) -> CompletableFuture.supplyAsync(() -> userDao.getUser(key), executor)
                );
        CompletableFuture<User> userCompletableFuture = asyncLoadingCache.get("123");
        System.out.println(userCompletableFuture.join());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!