[关闭]
@zero1036 2019-03-21T15:31:32.000000Z 字数 4700 阅读 4560

Cache常见应用问题与Guava LoadingCache解决方案

Java-其他库 架构


经典缓存写法

  1. private Map cache = new ConcurenthashMap();
  2. Object getFromCache(String Key){
  3. Object value = cache.get(key);
  4. if(value == null){
  5. value = getFromSource(key);
  6. cache.put(key, value);
  7. }
  8. }

并发问题

多线程并发访问if(value == null),只有第一条线程加载数据,后续线程才能命中缓存;反之,如果getFromSource()方法请求资源有阻塞,所有流量都会通过value == null的判断冲击请求资源,缓存根本没有起到保护资源的作用;解决方案:加载数据时,需要加锁,避免多线程同时装载;

针对以上问题,可以通过加锁避免并发装载数据,对于同一份需要装载的数据,需要加锁避免多个线程同时装载。一个线程在等待装载数据,其他线程应该等待它完成装载。数据装载完成之后,等待中的其他工作线程应该直接使用新装载的数据

  1. private Map cache = new ConcurenthashMap();
  2. Object getFromCache(String Key){
  3. lock(this) { // or sync
  4. Object value = cache.get(key);
  5. if(value == null){
  6. value = getFromSource(key);
  7. cache.put(key, value);
  8. }
  9. }
  10. }

LoadingCache解决方案:

  1. @Test
  2. public void testMultiThread() throws InterruptedException {
  3. LoadingCache<String, String> cache = CacheBuilder.newBuilder()
  4. //缓存项在创建后,在给定时间内没有被读/写访问,则清除。
  5. .expireAfterAccess(100, TimeUnit.MILLISECONDS)
  6. .build(new CacheLoader<String, String>() {
  7. @Override
  8. public String load(String key) throws Exception {
  9. System.out.println("loading from CacheLoader datasource ");
  10. Thread.sleep(500);
  11. return "target value";
  12. }
  13. });
  14. //并发装载资源,线程自动挂起等待
  15. Thread thread1 = new Thread(() -> getAndReload(cache));
  16. Thread thread2 = new Thread(() -> getAndReload(cache));
  17. thread1.start();
  18. thread2.start();
  19. Thread.sleep(2000);
  20. //output :
  21. //loading from CacheLoader datasource
  22. //Thread-1:target value
  23. //Thread-2:target value
  24. }

断崖式下滑问题:
缓存的数据更新逻辑和数据在缓存中的存在被绑死
只有当缓存数据被清理时,才有机会更新数据
而数据被清理时,请求拿不到旧数据被迫等待,造成停顿

解决方案:始终只有一条线程更新缓存数据,而其他线程不阻塞不等待,直接获取缓存数据。缓存过期策略调整为不过期,而是进程主动更新缓存。

loadingCache解决方案:

  1. @Test
  2. public void testMultiThreadAndReloadAsync() throws InterruptedException {
  3. LoadingCache<String, String> cache = CacheBuilder.newBuilder()
  4. .refreshAfterWrite(100, TimeUnit.MILLISECONDS)
  5. .removalListener(new RemovalListener<String, String>() {
  6. @Override
  7. public void onRemoval(RemovalNotification<String, String> removalNotification) {
  8. System.out.println(Thread.currentThread().getName() + "-remove key:" + removalNotification.getKey());
  9. System.out.println(Thread.currentThread().getName() + "-remove value:" + removalNotification.getValue());
  10. }
  11. })
  12. .build(new CacheLoader<String, String>() {
  13. @Override
  14. public String load(String key) throws InterruptedException {
  15. System.out.println(Thread.currentThread().getName() + "start loading");
  16. value++;
  17. String output = String.valueOf(value);
  18. Thread.sleep(1000L);
  19. System.out.println(Thread.currentThread().getName() + "load from db:" + output);
  20. return output;
  21. }
  22. });
  23. //此外需要注意一个点,这里的定时并不是真正意义上的定时。Guava cache的刷新需要依靠用户请求线程,让该线程去进行load方法的调用,所以如果一直没有用户尝试获取该缓存值,则该缓存也并不会刷新。
  24. for (int i = 0; i < 5; i++) {
  25. Thread thread3 = new Thread(() -> {
  26. while (true) {
  27. try {
  28. Thread.sleep((long) (Math.random() * 1000));
  29. getAndReload(cache);
  30. } catch (InterruptedException ex) {
  31. ex.printStackTrace();
  32. }
  33. }
  34. });
  35. thread3.start();
  36. }
  37. Thread.sleep(800000L);
  38. }
  39. private void getAndReload(LoadingCache<String, String> cache) {
  40. try {
  41. String result = cache.get("key");
  42. System.out.println(Thread.currentThread().getName() + ":get from cache:" + result);
  43. } catch (ExecutionException ex) {
  44. ex.printStackTrace();
  45. }
  46. }

过期策略

  1. 缓存未设置过期时间,两种缓存时间过期机制:1)、不再读写的指定时间后删除;2)、指定写入一定时间后删除;3)、指定绝对时间后过期
  2. 业务保护、动态过期策略

业务保护

未完待续

loadingCache

get()与getIfPresent()区别

  1. V get(K k): 内部调用getOrLoad(K key)方法,缓存中有对应的值则返回,没有则使用CacheLoader load方法getOrLoad(K key)方法为线程安全方法,内部加锁
  2. V getIfPresent(Object key):缓存中有对应的值则返回,没有则返回NULL

expireAfterAccess(expireAfterWrite)与refreshAfterWrite

  1. expireAfterAccess:缓存项在创建后,在给定时间内没有被读/写访问,则清除
  2. expireAfterWrite:缓存项在创建后,在给定时间内没有写访问,则清除
  3. refreshAfterWrite:缓存项在创建后,定时从cacheLoader检索data并刷新缓存

注意:理论上,expire的机制是只要给定时间内一直有读或写的访问,本地缓存就不会过期
通过定时刷新可以让缓存项保持可用,但请注意:缓存项只有在被检索时才会真正刷新,
即只有刷新间隔时间到了你再去get(key)才会重新去执行Loading否则就算刷新间隔时间到了也不会执行loading操作。因此,如你在缓存上同时声明expireAfterWrite和refreshAfterWrite,缓存并不会因为刷新盲目地定时重置如果缓存项没有被检索,那刷新就不会真的发生,缓存项在过期时间后也变得可以回收。

  1. @Test
  2. public void testLoadingCacheExpireAfterAccess2() throws InterruptedException, ExecutionException {
  3. ThreadFactory threadFactory = new ThreadFactoryBuilder()
  4. .setNameFormat("thread-%d")
  5. .build();
  6. ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
  7. int expireAfterAccess = 300;
  8. int refreshAfterWrite = 200;
  9. LoadingCache<String, String> cache = CacheBuilder.newBuilder()
  10. //缓存项在创建后,在给定时间内没有被读/写访问,则清除。
  11. .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
  12. .refreshAfterWrite(refreshAfterWrite, TimeUnit.MILLISECONDS)
  13. .recordStats()
  14. .build(new CacheLoader<String, String>() {
  15. @Override
  16. public String load(String key) throws Exception {
  17. System.out.println("loading from CacheLoader datasource ");
  18. return "target value";
  19. }
  20. });
  21. Future submit = executorService
  22. .submit(() -> {
  23. while (true) {
  24. String value = cache.get("key");
  25. System.out.println(value);
  26. System.out.println(cache.stats());
  27. try {
  28. //线程挂起时长超过300毫秒,缓存会过期,从cacheLoader中加载
  29. //反之,若少于300毫秒,缓存永远不会过期
  30. TimeUnit.MILLISECONDS.sleep(50);
  31. } catch (InterruptedException e1) {
  32. Thread.currentThread().interrupt();
  33. }
  34. }
  35. });
  36. Thread.sleep(2000);
  37. submit.cancel(true);
  38. Thread.sleep(500);
  39. String value = cache.get("key");
  40. System.out.println("last:" + value);
  41. System.out.println("last:" + cache.stats());
  42. }

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注