@zero1036
2019-03-21T15:31:32.000000Z
字数 4700
阅读 4568
Java-其他库
架构
private Map cache = new ConcurenthashMap();
Object getFromCache(String Key){
Object value = cache.get(key);
if(value == null){
value = getFromSource(key);
cache.put(key, value);
}
}
多线程并发访问if(value == null)
,只有第一条线程加载数据,后续线程才能命中缓存;反之,如果getFromSource()
方法请求资源有阻塞,所有流量都会通过value == null
的判断冲击请求资源,缓存根本没有起到保护资源的作用;解决方案:加载数据时,需要加锁,避免多线程同时装载;
针对以上问题,可以通过加锁避免并发装载数据,对于同一份需要装载的数据,需要加锁避免多个线程同时装载。一个线程在等待装载数据,其他线程应该等待它完成装载。数据装载完成之后,等待中的其他工作线程应该直接使用新装载的数据
private Map cache = new ConcurenthashMap();
Object getFromCache(String Key){
lock(this) { // or sync
Object value = cache.get(key);
if(value == null){
value = getFromSource(key);
cache.put(key, value);
}
}
}
LoadingCache解决方案:
@Test
public void testMultiThread() throws InterruptedException {
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
//缓存项在创建后,在给定时间内没有被读/写访问,则清除。
.expireAfterAccess(100, TimeUnit.MILLISECONDS)
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("loading from CacheLoader datasource ");
Thread.sleep(500);
return "target value";
}
});
//并发装载资源,线程自动挂起等待
Thread thread1 = new Thread(() -> getAndReload(cache));
Thread thread2 = new Thread(() -> getAndReload(cache));
thread1.start();
thread2.start();
Thread.sleep(2000);
//output :
//loading from CacheLoader datasource
//Thread-1:target value
//Thread-2:target value
}
断崖式下滑问题:
缓存的数据更新逻辑和数据在缓存中的存在被绑死
只有当缓存数据被清理时,才有机会更新数据
而数据被清理时,请求拿不到旧数据被迫等待,造成停顿
解决方案:始终只有一条线程更新缓存数据,而其他线程不阻塞不等待,直接获取缓存数据。缓存过期策略调整为不过期,而是进程主动更新缓存。
loadingCache解决方案:
@Test
public void testMultiThreadAndReloadAsync() throws InterruptedException {
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
.refreshAfterWrite(100, TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<String, String>() {
@Override
public void onRemoval(RemovalNotification<String, String> removalNotification) {
System.out.println(Thread.currentThread().getName() + "-remove key:" + removalNotification.getKey());
System.out.println(Thread.currentThread().getName() + "-remove value:" + removalNotification.getValue());
}
})
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "start loading");
value++;
String output = String.valueOf(value);
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + "load from db:" + output);
return output;
}
});
//此外需要注意一个点,这里的定时并不是真正意义上的定时。Guava cache的刷新需要依靠用户请求线程,让该线程去进行load方法的调用,所以如果一直没有用户尝试获取该缓存值,则该缓存也并不会刷新。
for (int i = 0; i < 5; i++) {
Thread thread3 = new Thread(() -> {
while (true) {
try {
Thread.sleep((long) (Math.random() * 1000));
getAndReload(cache);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
thread3.start();
}
Thread.sleep(800000L);
}
private void getAndReload(LoadingCache<String, String> cache) {
try {
String result = cache.get("key");
System.out.println(Thread.currentThread().getName() + ":get from cache:" + result);
} catch (ExecutionException ex) {
ex.printStackTrace();
}
}
未完待续
V get(K k)
: 内部调用getOrLoad(K key)方法,缓存中有对应的值则返回,没有则使用CacheLoader load方法getOrLoad(K key)方法为线程安全方法,内部加锁V getIfPresent(Object key)
:缓存中有对应的值则返回,没有则返回NULL注意:理论上,expire的机制是只要给定时间内一直有读或写的访问,本地缓存就不会过期
通过定时刷新可以让缓存项保持可用,但请注意:缓存项只有在被检索时才会真正刷新,
即只有刷新间隔时间到了你再去get(key)才会重新去执行Loading否则就算刷新间隔时间到了也不会执行loading操作。因此,如你在缓存上同时声明expireAfterWrite和refreshAfterWrite,缓存并不会因为刷新盲目地定时重置如果缓存项没有被检索,那刷新就不会真的发生,缓存项在过期时间后也变得可以回收。
@Test
public void testLoadingCacheExpireAfterAccess2() throws InterruptedException, ExecutionException {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("thread-%d")
.build();
ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
int expireAfterAccess = 300;
int refreshAfterWrite = 200;
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
//缓存项在创建后,在给定时间内没有被读/写访问,则清除。
.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
.refreshAfterWrite(refreshAfterWrite, TimeUnit.MILLISECONDS)
.recordStats()
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("loading from CacheLoader datasource ");
return "target value";
}
});
Future submit = executorService
.submit(() -> {
while (true) {
String value = cache.get("key");
System.out.println(value);
System.out.println(cache.stats());
try {
//线程挂起时长超过300毫秒,缓存会过期,从cacheLoader中加载
//反之,若少于300毫秒,缓存永远不会过期
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
});
Thread.sleep(2000);
submit.cancel(true);
Thread.sleep(500);
String value = cache.get("key");
System.out.println("last:" + value);
System.out.println("last:" + cache.stats());
}