@linwbai
2018-08-03T16:09:21.000000Z
字数 3716
阅读 940
eureka
EurekaBootStrap#contextInitialized >> initEurekaServerContext >> DefaultEurekaServerContext#initialize >> PeerEurekaNodes#start >> updatePeerEurekaNodes >> createPeerEurekaNode >> new PeerEurekaNode >> TaskDispatchers#createBatchingTaskDispatcher >> TaskExecutors#batchExecutors >> new BatchWorkerRunnable >> run >> getWork、ReplicationTaskProcessor#process、AcceptorExecutor#reprocess
// resolvePeerUrls() returns the serviceUrls of all nodes in the cluster, excluding this node itself.
updatePeerEurekaNodes(resolvePeerUrls());
// The same refresh wrapped as a Runnable so the executor below can run it repeatedly.
Runnable peersUpdateTask = () -> {
updatePeerEurekaNodes(resolvePeerUrls());
};
// The remaining arguments are elided ("..") in this excerpt.
taskExecutor.scheduleWithFixedDelay(peersUpdateTask,..,..,..);
updatePeerEurekaNodes方法会将新解析出的serviceUrl与原来(即上次更新时)的peerEurekaNodeUrls进行比较:对不再出现在新列表中的(异常)节点,调用PeerEurekaNode#shutDown将其移除;对新增的节点,调用this#createPeerEurekaNode创建一个新的PeerEurekaNode。
创建PeerEurekaNode节点的过程中会创建一个AcceptorRunner守护线程
/**
 * Creates a holder that keeps a task together with its id, its expiryTime and
 * the moment the holder itself was created.
 */
TaskHolder(ID id, T task, long expiryTime) {
this.id = id;
// Described in the original note as the "delayed execution time".
// NOTE(review): the field name suggests an expiry deadline — confirm against the callers.
this.expiryTime = expiryTime;
this.task = task;
// Time at which this holder object was created
this.submitTimestamp = System.currentTimeMillis();
}
/**
 * Queues and maps used by the acceptor loop:
 * reprocessQueue  - queue of tasks to be processed again
 * acceptorQueue   - queue holding the tasks that were received
 * pendingTasks    - map of tasks that have not been executed yet
 * batchWorkQueue  - queue of batches of tasks to execute
 * processingOrder - queue that records the order in which tasks are processed
 */
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// Moves the TaskHolders from reprocessQueue and acceptorQueue into processingOrder.
// A holder coming from reprocessQueue is placed at the head of processingOrder when its
// expiryTime is later than the current time and it is not already in pendingTasks.
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
// Once the previous schedule time has passed, compute the next one from the traffic shaper's delay.
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
// Hand out work only when the schedule time has been reached.
if (scheduleTime <= now) {
// Transfers tasks from the processingOrder queue into the batchWorkQueue queue
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Nothing is done here in this excerpt; the loop condition re-checks isShutdown.
} catch (Throwable e) {
// Nothing is done here in this excerpt, so the loop continues with the next iteration.
}
}
// Check whether the conditions for processing a batch of tasks are met
if (hasEnoughTasksForNextBatch()) {
...
}
// The processing-order queue is empty, so there is nothing to batch
if (processingOrder.isEmpty()) {
return false;
}
// If the number of not-yet-executed tasks >= the maximum buffer size, return true.
// Per the original note the default is 10000, set by the maxElementsInPeerReplicationPool parameter.
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
// If the first TaskHolder in the ordered queue has existed for at least maxBatchingDelay,
// the batching condition is met. Per the original note maxBatchingDelay is fixed at 500
// (PeerEurekaNode.MAX_BATCHING_DELAY_MS).
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
return delay >= maxBatchingDelay;
List<TaskHolder>
// Worker loop: keeps fetching and processing batches until shutdown is flagged.
while (!isShutdown.get()) {
// Fetch a List<TaskHolder> from batchWorkQueue
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// Run the replication logic: an HTTP request to the other node's peerreplication/batch/ endpoint
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
// On failure, put the holders into the reprocessQueue queue
taskDispatcher.reprocess(holders, result);
break;
}
...
}
// getWork method
// workQueue here is the batchWorkQueue
// Poll with a 1-second timeout, retrying while nothing was returned and shutdown has not been flagged.
do {
result = workQueue.poll(1, TimeUnit.SECONDS);
} while (!isShutdown.get() && result == null);
// ReplicationInstance.java — field list only; the rest of the class is not shown in this excerpt.
private String appName;
private String providerId;
private String id;
private Long lastDirtyTimestamp;
private String overriddenStatus;
private String status;
private InstanceInfo instanceInfo;
// NOTE(review): presumably the operation type that the dispatch switch branches on via getAction() — confirm.
private Action action;
// The dispatch method handles a ReplicationInstance differently depending on its action type
switch (instanceInfo.getAction()) {
case Register:
handleRegister(instanceInfo, applicationResource);
break;
case Heartbeat:
handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
break;
case Cancel:
handleCancel(resource);
break;
case StatusUpdate:
handleStatusUpdate(instanceInfo, resource);
break;
case DeleteStatusOverride:
handleDeleteStatusOverride(instanceInfo, resource);
break;
}
我们看一下比较复杂的Heartbeat处理:
java