@zero1036
2019-03-21T07:31:32.000000Z
字数 4700
阅读 4892
Java-其他库 架构
private Map cache = new ConcurenthashMap();Object getFromCache(String Key){Object value = cache.get(key);if(value == null){value = getFromSource(key);cache.put(key, value);}}
多线程并发访问if(value == null),只有第一条线程加载数据,后续线程才能命中缓存;反之,如果getFromSource()方法请求资源有阻塞,所有流量都会通过value == null的判断冲击请求资源,缓存根本没有起到保护资源的作用;解决方案:加载数据时,需要加锁,避免多线程同时装载;
针对以上问题,可以通过加锁避免并发装载数据,对于同一份需要装载的数据,需要加锁避免多个线程同时装载。一个线程在等待装载数据,其他线程应该等待它完成装载。数据装载完成之后,等待中的其他工作线程应该直接使用新装载的数据
private Map cache = new ConcurenthashMap();Object getFromCache(String Key){lock(this) { // or syncObject value = cache.get(key);if(value == null){value = getFromSource(key);cache.put(key, value);}}}
LoadingCache解决方案:
@Testpublic void testMultiThread() throws InterruptedException {LoadingCache<String, String> cache = CacheBuilder.newBuilder()//缓存项在创建后,在给定时间内没有被读/写访问,则清除。.expireAfterAccess(100, TimeUnit.MILLISECONDS).build(new CacheLoader<String, String>() {@Overridepublic String load(String key) throws Exception {System.out.println("loading from CacheLoader datasource ");Thread.sleep(500);return "target value";}});//并发装载资源,线程自动挂起等待Thread thread1 = new Thread(() -> getAndReload(cache));Thread thread2 = new Thread(() -> getAndReload(cache));thread1.start();thread2.start();Thread.sleep(2000);//output ://loading from CacheLoader datasource//Thread-1:target value//Thread-2:target value}
断崖式下滑问题:
缓存的数据更新逻辑和数据在缓存中的存在被绑死
只有当缓存数据被清理时,才有机会更新数据
而数据被清理时,请求拿不到旧数据被迫等待,造成停顿
解决方案:始终只有一条线程更新缓存数据,而其他线程不阻塞不等待,直接获取缓存数据。缓存过期策略调整为不过期,而是进程主动更新缓存。
loadingCache解决方案:
@Testpublic void testMultiThreadAndReloadAsync() throws InterruptedException {LoadingCache<String, String> cache = CacheBuilder.newBuilder().refreshAfterWrite(100, TimeUnit.MILLISECONDS).removalListener(new RemovalListener<String, String>() {@Overridepublic void onRemoval(RemovalNotification<String, String> removalNotification) {System.out.println(Thread.currentThread().getName() + "-remove key:" + removalNotification.getKey());System.out.println(Thread.currentThread().getName() + "-remove value:" + removalNotification.getValue());}}).build(new CacheLoader<String, String>() {@Overridepublic String load(String key) throws InterruptedException {System.out.println(Thread.currentThread().getName() + "start loading");value++;String output = String.valueOf(value);Thread.sleep(1000L);System.out.println(Thread.currentThread().getName() + "load from db:" + output);return output;}});//此外需要注意一个点,这里的定时并不是真正意义上的定时。Guava cache的刷新需要依靠用户请求线程,让该线程去进行load方法的调用,所以如果一直没有用户尝试获取该缓存值,则该缓存也并不会刷新。for (int i = 0; i < 5; i++) {Thread thread3 = new Thread(() -> {while (true) {try {Thread.sleep((long) (Math.random() * 1000));getAndReload(cache);} catch (InterruptedException ex) {ex.printStackTrace();}}});thread3.start();}Thread.sleep(800000L);}private void getAndReload(LoadingCache<String, String> cache) {try {String result = cache.get("key");System.out.println(Thread.currentThread().getName() + ":get from cache:" + result);} catch (ExecutionException ex) {ex.printStackTrace();}}
未完待续
V get(K k): 内部调用getOrLoad(K key)方法,缓存中有对应的值则返回,没有则使用CacheLoader load方法getOrLoad(K key)方法为线程安全方法,内部加锁V getIfPresent(Object key):缓存中有对应的值则返回,没有则返回NULL注意:理论上,expire的机制是只要给定时间内一直有读或写的访问,本地缓存就不会过期
通过定时刷新可以让缓存项保持可用,但请注意:缓存项只有在被检索时才会真正刷新,
即只有刷新间隔时间到了你再去get(key)才会重新去执行Loading否则就算刷新间隔时间到了也不会执行loading操作。因此,如你在缓存上同时声明expireAfterWrite和refreshAfterWrite,缓存并不会因为刷新盲目地定时重置如果缓存项没有被检索,那刷新就不会真的发生,缓存项在过期时间后也变得可以回收。
@Testpublic void testLoadingCacheExpireAfterAccess2() throws InterruptedException, ExecutionException {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").build();ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);int expireAfterAccess = 300;int refreshAfterWrite = 200;LoadingCache<String, String> cache = CacheBuilder.newBuilder()//缓存项在创建后,在给定时间内没有被读/写访问,则清除。.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS).refreshAfterWrite(refreshAfterWrite, TimeUnit.MILLISECONDS).recordStats().build(new CacheLoader<String, String>() {@Overridepublic String load(String key) throws Exception {System.out.println("loading from CacheLoader datasource ");return "target value";}});Future submit = executorService.submit(() -> {while (true) {String value = cache.get("key");System.out.println(value);System.out.println(cache.stats());try {//线程挂起时长超过300毫秒,缓存会过期,从cacheLoader中加载//反之,若少于300毫秒,缓存永远不会过期TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e1) {Thread.currentThread().interrupt();}}});Thread.sleep(2000);submit.cancel(true);Thread.sleep(500);String value = cache.get("key");System.out.println("last:" + value);System.out.println("last:" + cache.stats());}