[关闭]
@changedi 2015-11-24T13:48:01.000000Z 字数 16489 阅读 8219

HSF工作原理——源码浅析

Java 框架


1. 简介

Hsf本身是一个服务框架,支持发布订阅模式的服务调用。服务发布者(provider)开发服务接口和实现,并把服务发布到配置中心;服务消费者(consumer)配置服务接口,调用消费服务。Hsf具体操作时如何进行的呢?

2. 服务开发

2.1 provider

发布者开发一个服务实现,只需要进行如下的配置即可

  1. <bean class="com.taobao.hsf.app.spring.util.HSFSpringProviderBean" init-method="init">
  2. <property name="serviceInterface" value="{your_interface}"/>
  3. <property name="target" ref="{your_implementation_class}"/>
  4. <property name="serviceVersion" value="{your_service_version}"/>
  5. <property name="clientTimeout" value="500"/>
  6. </bean>

HSFSpringProviderBean组合了HSFApiProviderBean,bean加载时即new了HSFApiProviderBean的实例。服务初始化时,调用HSFSpringProviderBean的init方法,里面检测并初始化一些配置,然后将服务发布出去。发布具体是由ProcessComponent调用publish方法。具体示例如下:
HSFSpringProviderBean.java:

  1. public void init() throws Exception {
  2. // 避免被初始化多次
  3. if (!providerBean.getInited().compareAndSet(false, true)) {
  4. return;
  5. }
  6. LoggerInit.initHSFLog();
  7. AppInfoUtils.initAppName(providerBean.getMetadata());
  8. SpasInit.initSpas();
  9. providerBean.checkConfig();
  10. publishIfNotInSpringContainer();
  11. }
  12. private void publishIfNotInSpringContainer() {
  13. if (!isInSpringContainer) {
  14. LOGGER.warn("[SpringProviderBean]不是在Spring容器中创建, 不推荐使用");
  15. providerBean.publish();
  16. }
  17. }

HSFApiProviderBean.java:

  1. public void publish() {
  2. // 防止一个服务被发布多次
  3. if (!isPublished.compareAndSet(false, true)) {
  4. return;
  5. }
  6. try {
  7. boolean pub = true;
  8. if (unitService != null) {
  9. pub = unitService.beforePublish(metadata);
  10. }
  11. if (pub) {
  12. HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);
  13. }
  14. } catch (Exception e) {
  15. LOGGER.error("", "接口[" + metadata.getInterfaceName() + "]版本[" + metadata.getVersion() + "]发布为HSF服务失败", e);
  16. throw new RuntimeException(e);
  17. }
  18. }

ProcessComponent.java:

  1. @Override
  2. public void publish(ServiceMetadata metadata) throws HSFException {
  3. try {
  4. rpcProtocolService.registerProvider(metadata);
  5. } catch (HSFException e) {
  6. LOGGER.error("", "RPC协议:方式发布HSF服务时出现错误,请确认服务:" + metadata.getUniqueName() + "的rpc属性的配置!");
  7. throw e;
  8. }
  9. for (ProcessHookService hookService : hookServices) {
  10. hookService.prePublish(metadata);
  11. }
  12. // 检查是否需要延迟发布服务
  13. if (metadata.isReadyToPublish()) {
  14. if (!metadata.getGroup().equalsIgnoreCase(metadata.getDefaultGroup())) {
  15. metadataService.unregister(metadata);
  16. try {
  17. Thread.sleep(100);
  18. } catch (InterruptedException e) {
  19. LOGGER.error("", "发布时注销错误:" + e.getMessage());
  20. }
  21. }
  22. metadataService.publish(metadata);
  23. LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]发布为HSF服务成功!");
  24. } else {
  25. LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]服务使用了延迟发布功能,服务未发布!");
  26. }
  27. for (ProcessHookService hookService : hookServices) {
  28. hookService.afterPublish(metadata);
  29. }
  30. metadataInfoStoreService.store(metadata);
  31. }

具体看看发布的步骤:

  1. 首先把对应的信息注册,rpcProtocolService.registerProvider(metadata);
  2. 第二步做发布前的prePublish
  3. 接下来发布,metadataService.publish(metadata);
  4. 最后做发布后的afterPublish
  5. 把meta信息存起来保证一天只发布一次。

注册的代码如下:

  1. public void registerProvider(ServiceMetadata metadata) throws HSFException {
  2. // 仅启动一次HSF SERVER
  3. if (isProviderStarted.compareAndSet(false, true)) {
  4. try {
  5. providerServer.startHSFServer();
  6. Runtime.getRuntime().addShutdownHook(new Thread() {
  7. public void run() {
  8. try {
  9. for (ProviderServiceModel serviceModel : ApplicationModel.instance().allProvidedServices()) {
  10. serviceModel.getMetadata().fireMetadataBeforeChanged();
  11. metadataService.unregister(serviceModel.getMetadata());
  12. // HSFServiceContainer.getInstance(MetaSupportService.class).unregister(
  13. // serviceModel.getMetadata());
  14. }
  15. Thread.sleep(HSFServiceContainer.getInstance(ConfigurationService.class)
  16. .getshutdownHookWaitTime());
  17. providerServer.stopHSFServer();
  18. } catch (Exception e) {
  19. LoggerInit.LOGGER.warn("Exception happens during stop server:", e);
  20. }
  21. }
  22. });
  23. } catch (Exception e) {
  24. throw new HSFException(LoggerHelper.getErrorCodeStr("hsf", "HSF-0016", "环境问题", "启动HSF SERVER失败."), e);
  25. }
  26. }
  27. // 分配线程池
  28. int corePoolSize = metadata.getCorePoolSize();
  29. int maxPoolSize = metadata.getMaxPoolSize();
  30. if (corePoolSize > 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize) {
  31. providerServer.allocThreadPool(metadata.getUniqueName(), corePoolSize, maxPoolSize);
  32. }
  33. // 注册对象到HSF Server上
  34. providerServer.addMetadata(metadata.getUniqueName(), metadata, metadata.getTarget());
  35. // providerServer.addWorker(metadata.getUniqueName(), metadata.getTarget());
  36. }

具体我们来看一下,首先要启动providerServer,这个是什么呢?providerServer本质是一个NettyServer,在HSFProviderServer类里持有一个NettyServer的引用server,providerServer.startHSFServer();就是在启动这个NettyServer,并绑定端口。接着代码里给虚拟机关闭加了个hook,当虚拟机关闭时,会unregister服务,同时关掉providerServer。添加了hook后,为服务分配一个线程池,最后把服务的名字、meta和实现类都加入server。当然所谓的加入server,其实就是把服务名称和方法名称都加入到一个Map里,这就注册号了,方便后续消费的时候快速启动。

至此,provider的服务就启动并且注册好了。等下,这不就只是provider本地起了一个服务吗?那不需要远程注册?消费者如何知道这些服务呢?对了,这只是把服务在本地启动并注册,还缺少发布这个动作,发布是需要metadataService.publish。这里的代码如下:(以发布到configServer为例)

  1. @Override
  2. public void publish(ServiceMetadata metadata) {
  3. String serviceUniqueName = metadata.getUniqueName() + metadata.getConfigStyle();
  4. synchronized (lock) {
  5. // 考虑多注册的情况
  6. List<String> centers = metadata.getConfigserverCenter();
  7. if (centers != null && centers.size() > 0) {
  8. for (String center : centers) {
  9. Map<String, Publisher<String>> centerPublishers = publishers.get(center);
  10. if (centerPublishers == null) {
  11. centerPublishers = new HashMap<String, Publisher<String>>();
  12. publishers.put(center, centerPublishers);
  13. }
  14. if (httpPublish()) {
  15. Map<String, Publisher<String>> centerHttpPublishers = httpPublishers.get(center);
  16. if (centerHttpPublishers == null) {
  17. centerHttpPublishers = new HashMap<String, Publisher<String>>();
  18. httpPublishers.put(center, centerHttpPublishers);
  19. }
  20. }
  21. // let Dubbo style to exist two group as issue #219
  22. if (metadatas.add(metadata)) {
  23. Publisher<String> publisher = doPublish(metadata, center);
  24. centerPublishers.put(serviceUniqueName, publisher);
  25. if (httpPublish()) {
  26. Publisher<String> httpPublisher = doHttpPublish(metadata, center);
  27. httpPublishers.get(center).put(serviceUniqueName, httpPublisher);
  28. }
  29. }
  30. }
  31. } else {
  32. Map<String, Publisher<String>> centerPublishers = publishers.get(DEFAULT);
  33. if (centerPublishers == null) {
  34. centerPublishers = new HashMap<String, Publisher<String>>();
  35. publishers.put(DEFAULT, centerPublishers);
  36. }
  37. if (httpPublish()) {
  38. Map<String, Publisher<String>> centerHttpPublishers = httpPublishers.get(DEFAULT);
  39. if (centerHttpPublishers == null) {
  40. centerHttpPublishers = new HashMap<String, Publisher<String>>();
  41. httpPublishers.put(DEFAULT, centerHttpPublishers);
  42. }
  43. }
  44. if (!centerPublishers.containsKey(serviceUniqueName)) {
  45. metadatas.add(metadata);
  46. Publisher<String> publisher = doPublish(metadata);
  47. centerPublishers.put(serviceUniqueName, publisher);
  48. if (httpPublish()) {
  49. Publisher<String> httpPublisher = doHttpPublish(metadata);
  50. httpPublishers.get(DEFAULT).put(serviceUniqueName, httpPublisher);
  51. }
  52. }
  53. }
  54. }
  55. }

发布动作主要干的事情就是连接到configServer,然后send一个package过去,告知服务的信息。当然HSF做了大量的服务连接的准备和善后工作,这里就不贴代码了。这里的例子是老版本的HSF连接configserver的做法,兼容dubbo后连接zk的代码干净了很多,直接zkClient创建目录完事。

2.2 服务consumer

这样provider的事情就说完了,可能中间很多代码的配置没有说清楚,后面consumer的时候会讲到。

说完了注册发布服务(provider的事),再来看看订阅消费服务(consumer的事)。

如果有个客户要调用你的服务,那他要做的就是在自己的应用中做如下配置(应用使用spring配置例子):

  1. <bean id="myService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean">
  2. <property name="interfaceName">
  3. <value>{your_interface}</value>
  4. </property>
  5. <property name="version">
  6. <value>{your_service_version}</value>
  7. </property>
  8. <property name="group">
  9. <value>{your_group}</value>
  10. </property>
  11. </bean>

这样,在他的应用代码里,只要注入这个bean,比如:

  1. @Autowired
  2. {your_interface} myService;
  3. Public void test(){
  4. myService.doSth();
  5. }

就可以使用了。

service被消费时的过程很明确,因为发布/订阅模型嘛,第一步总要去订阅,也就是去连接了。当然首先做的是初始化,在HSFSpringConsumerBean的init阶段,会把引用的HSFApiConsumerBean做bean的初始化,初始化过程就是set注入各种配置属性,当然最主要的就是上面列出的几个配置。与provider不同的是consumer不需要check什么,直接init。代码如下:

  1. /**
  2. * 初始化
  3. *
  4. * @throws Exception
  5. */
  6. public void init() throws Exception {
  7. // 避免被初始化多次
  8. if (!inited.compareAndSet(false, true)) {
  9. LOGGER.warn(LoggerHelper.getErrorCodeStr("hsf", "HSF-0020", "业务问题", "HSF服务:" + metadata.getUniqueName()
  10. + " 重复初始化!"));
  11. return;
  12. }
  13. LoggerInit.initHSFLog();
  14. AppInfoUtils.initAppName(metadata);
  15. if (HSFServiceTargetUtil.isGeneric(metadata.getGeneric()) && metadata.getIfClazz() == null) {
  16. metadata.setIfClazz(GenericService.class);
  17. } else if (metadata.getIfClazz() == null) {
  18. StringBuilder errorMsg = new StringBuilder();
  19. errorMsg.append("ConsumerBean中指定的接口类不存在[");
  20. errorMsg.append(metadata.getInterfaceName()).append("].");
  21. throw new IllegalArgumentException(errorMsg.toString());
  22. }
  23. if (asyncallMethods != null) {
  24. for (String desc : asyncallMethods) {
  25. this.parseAsyncFunc(desc);
  26. }
  27. }
  28. metadata.initUniqueName();
  29. ProcessService processService = HSFServiceContainer.getInstance(ProcessService.class);
  30. try {
  31. metadata.setTarget(processService.consume(metadata));
  32. LOGGER.warn("成功生成对接口为[" + metadata.getInterfaceName() + "]版本为[" + metadata.getVersion() + "]的HSF服务调用的代理!");
  33. } catch (Exception e) {
  34. LOGGER.error("", "生成对接口为[" + metadata.getInterfaceName() + "]版本为[" + metadata.getVersion()
  35. + "]的HSF服务调用的代理失败", e);
  36. // since 2007,一旦初始化异常就抛出
  37. throw e;
  38. }
  39. int waitTime = metadata.getMaxWaitTimeForCsAddress();
  40. if (waitTime > 0) {
  41. try {
  42. metadata.getCsAddressCountDownLatch().await(waitTime, TimeUnit.MILLISECONDS);
  43. } catch (InterruptedException e) {
  44. // ignore
  45. }
  46. }
  47. }

其中比较重要的过程就是metadata.setTarget(processService.consume(metadata));,ServiceMetadata是个什么东西,这个在provider的时候没有讲,这里提一下,这个是HSF服务的元数据信息,包括了服务整个发布订阅过程中涉及到的各种数据元描述,包括比如:版本号、分组、服务接口名称、异步调用标记、单元化信息、动态代理时的一些方法和接口信息等等。基本贯穿在服务整个发布和调用的各种环节中。

先继续跟这个setTarget方法,target就是服务要调用的目标类,是个Object,说到底你远程服务调用要一个实例的呀。那processService.consume(metadata)就来生成这个Object。这里就又引入了ProcessService,这个在provider也引入过,这是HSF服务发布与消费总控流程,它的publish方法负责对外发布HSF服务,负责将服务调用地址注册到服务配置中心(configServer)。而consume方法就是负责生成调用远程HSF服务的代理。此代理的效果为生成ServiceMetadata中指定的interface的代理,调用时可将代理转型为服务接口,并进行直接的对象调用。代理将完成对于远程HSF的调用。

跟进到代码里看看:

  1. @Override
  2. public Object consume(ServiceMetadata metadata) throws HSFException {
  3. // 首先从缓存中查找服务实例
  4. if (ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()) != null) {
  5. return ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
  6. }
  7. for (ProcessHookService hookService : hookServices) {
  8. hookService.preConsume(metadata);
  9. }
  10. // 生成调用远程HSF服务的代理
  11. List<Class<?>> interfaces = new ArrayList<Class<?>>(3);
  12. if (metadata.getIfClazz() != null) {
  13. interfaces.add(metadata.getIfClazz());
  14. }
  15. if (metadata.isSupportEcho()) {
  16. interfaces.add(com.taobao.hsf.remoting.service.EchoService.class);
  17. }
  18. // 默认都支持Generic
  19. if (!com.taobao.hsf.remoting.service.GenericService.class.equals(metadata.getIfClazz())) {
  20. interfaces.add(com.taobao.hsf.remoting.service.GenericService.class);
  21. }
  22. Class<?>[] interfacesArray = new Class<?>[interfaces.size()];
  23. interfaces.toArray(interfacesArray);
  24. HSFServiceProxy proxy = new HSFServiceProxy(metadata, interfacesArray,
  25. !HSFConstants.PROXY_STYLE_JAVASSIST.equalsIgnoreCase(metadata.getProxyStyle()));
  26. Object proxyObj = proxy.getInstance();
  27. // 订阅服务信息
  28. metadataService.subscribe(metadata);
  29. for (ProcessHookService hookService : hookServices) {
  30. hookService.afterConsume(metadata);
  31. }
  32. metadataInfoStoreService.store(metadata);
  33. return proxyObj;
  34. }

抛开缓存和前后的hook,其实生成代理就两句话:

  1. HSFServiceProxy proxy = new HSFServiceProxy(metadata, interfacesArray,
  2. !HSFConstants.PROXY_STYLE_JAVASSIST.equalsIgnoreCase(metadata.getProxyStyle()));
  3. Object proxyObj = proxy.getInstance();

而这个HSFServiceProxy的构造主要做的也就一件事:

  1. this.instance = Proxy.newProxyInstance(serviceConsumerMetadata.getIfClazz().getClassLoader(), classes,this);

instance是HSFServiceProxy的一个Object类型的域,而HSFServiceProxy本身是实现InvocationHandler的,毕竟动态代理嘛。除了这件主要做的事情——生成instance以外,还有一个事情就是把这个instance的方法存起来:

  1. ApplicationModel.instance().initConsumerService(metadata.getUniqueName(),
  2. new ConsumerServiceModel(metadata, this.instance, isJava));
  3. this.serviceModel = ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName());

这个initConsumerService方法就是把代理对象里的所有方法拿到并存到一个map放到ConsumerServiceModel里。ConsumerServiceModel就是一个持有这个instance和methods的类。补充说明一下,这些methods是存到一个Map里的,ConsumerMethodModel是对method的一些封装,包含了methodName、method对象、parameter对象等信息。

生成了代理的对象后,具体代理调用的逻辑在哪里进行呢?就是实现InvocationHandler要override的方法嘛:invoke。就是这样一个逻辑:

  1. @Override
  2. public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
  3. ...
  4. ConsumerMethodModel methodModel = serviceModel.getMethodModel(method);
  5. return this.trueInvoke(methodModel, args);
  6. ...
  7. }

从刚才说过的ConsumerServiceModel里找到对应的ConsumerMethodModel,然后调用它:

  1. private Object trueInvoke(ConsumerMethodModel methodModel, Object[] args) throws HSFException, Throwable {
  2. AtomicInteger maxPoolSize = serviceConsumerMetadata.getCurConsumerMaxPoolSize();
  3. if (maxPoolSize == null) {
  4. return rpcProtocolService.invokeWithMethodObject(methodModel, args);
  5. } else {
  6. int currentSize = maxPoolSize.decrementAndGet();
  7. try {
  8. if (currentSize < 0) {
  9. String errorMsg = MessageFormat.format(
  10. "消费端线程池已满,service[{0}],consumerMaxPoolSize[{1}]",
  11. new Object[] { serviceConsumerMetadata.getUniqueName(),
  12. serviceConsumerMetadata.getConsumerMaxPoolSize() });
  13. LOGGER.warn(errorMsg);
  14. throw new RuntimeException(new HSFException(errorMsg));
  15. } else {
  16. return rpcProtocolService.invokeWithMethodObject(methodModel, args);
  17. }
  18. } finally {
  19. maxPoolSize.incrementAndGet();
  20. }
  21. }
  22. }

很清楚了,代理的调用就是

  1. rpcProtocolService.invokeWithMethodObject(methodModel, args)

里面的rpcProtocolService是

  1. HSFServiceContainer.getInstance(RPCProtocolTemplateService.class)

这样的单例。RPCProtocolTemplateComponent是RPCProtocolTemplateService的一个实现,也是唯一的实现,具体做的事情就是实现RPC协议调用时的共同部分,例如HSFRequest的组装,地址路由的获取、检查,监测信息的埋点,日志的处理等最后转由各RPC协议的具体实现完成远程调用。这个类在provider的时候就已经提到过,有个方法是registerProvider。在consumer阶段,就是invokeWithMethodObject方法了。这个方法里核心的代码是invoke0方法,具体逻辑如下:

  1. // 组装HSFRequest
  2. final HSFRequest request = new HSFRequest();
  3. request.setTargetServiceUniqueName(serviceUniqueName);
  4. request.setMethodName(methodName);
  5. request.setMethodArgSigs(parameterTypes);
  6. request.setMethodArgs(args);
  7. request.setReturnClass(methodModel.getReturnClass());

首先拼装好request,然后就是寻址,寻址的代码在我看来是HSF里比较复杂的代码,逻辑各种兼容,毕竟这是大事,其他的目标比较直接,而这里的分支会特别多,(我就偷个懒,不去罗列各种分支情况了),我们就按默认的寻址方法来,找到目标地址:

  1. remotingURL = selectAddress(targetUnit, metadata, serviceUniqueName, methodName,
  2. parameterTypes, args);

具体跟踪selectAddress方法调用栈,最底层是:

  1. List<String> addresses = this.addressPool.getArgsAddresses(serviceUniqueName, methodName, paramTypeStrs, args);

这里会列出所有的服务地址,真实的调用地址从个list选出。这个地址pool存了这些地址,什么时候存的呢?这个pool对应的类是AddressPool,被AddressService持有,在AddressService实例化的时候就被new出来,而构造逻辑里有这样一段代码:

  1. public AddressPool(String unitName) {
  2. this.unitName = unitName;
  3. ExecutorService addressAndRuleExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(
  4. HSFThreadNameSpace.getUnitThreadPoolName(unitName)));
  5. addressAndRuleExecutor.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. while (true) {
  9. try {
  10. if (signalBell.poll(100, TimeUnit.SECONDS) != null) {
  11. for (AddressBucket addressBucket : pool.values()) {
  12. addressBucket.refreshAddress();
  13. }
  14. }
  15. } catch (Throwable e) {
  16. LoggerInit.LOGGER.error("", "[address pool] Refresh ", e);
  17. } finally {
  18. // 先去掉这个逻辑,使地址推送能及时生效
  19. // try {
  20. // TimeUnit.SECONDS.sleep(3);
  21. // } catch (InterruptedException e) {
  22. // }
  23. }
  24. }
  25. }
  26. });
  27. }

这下明白了,pool启动的时候就起了一个线程去监听一个信号队列,收到信号的时候就让pool里的AddressBucket刷新地址。

最后发起调用:

  1. appResponse = rpcService.invoke(request, methodModel, remotingURL);

如此,一个HSF的远程调用例子就从生产发布到订阅消费讲完了。

等等,好像consume还漏了个事情,你怎么知道rpc去连哪个地址,刚才AddressService那还没讲清楚呢。对了,回到最早的consume方法,好像生成代理Object然后实现invoke方法后,就忘了还有subscribe的事情了,看看

  1. // 订阅服务信息
  2. metadataService.subscribe(metadata);

这句话,一个大事还没讲到呢。provider生产阶段在产生本地服务后,要把服务注册到远程的配置中心ConfigServer。同样的道理,consumer构造好代理bean后,要订阅ConfigServer的服务。要不然刚才提到的远程地址就是空的呀。subscribe方法描述了整个订阅的过程。跟踪subscribe方法的最终结果如下:

  1. @Override
  2. public void subscribe(final ServiceMetadata metadata, final AddressSubscribeListener listener) {
  3. ...
  4. final String group = metadata.getGroup();
  5. final String cs_subscriberId = SUBSCRIBER_PREFIX + serviceUniqueName;
  6. SubscriberRegistration cs_registration = new SubscriberRegistration(cs_subscriberId, uniqueName);
  7. cs_registration.setGroup(group);
  8. Subscriber subscriber = SubscriberRegistrar.register(cs_registration);
  9. subscribers.put(serviceUniqueName, subscriber);
  10. subscriber.setDataObserver(new SubscriberDataObserver() {
  11. @Override
  12. public void handleData(String dataId, final List<Object> datas) {
  13. List<String> urls = new ArrayList<String>();
  14. for (Object url : datas) {
  15. if (((String) url).startsWith("dubbo://")) {
  16. urls.add(((String) url).substring(8));
  17. } else if (((String) url).startsWith("hsf://")) {
  18. urls.add(((String) url).substring(6));
  19. } else {
  20. urls.add((String) url);
  21. }
  22. }
  23. ...
  24. if (!urls.isEmpty()) {
  25. List<String> formattedUrls = new ArrayList<String>();
  26. for (Object serviceUrl : urls) {
  27. if (!StringUtils.isBlank((String) serviceUrl)) {
  28. formattedUrls.add(HSFServiceTargetUtil.formatTargetURL((String) serviceUrl));
  29. }
  30. }
  31. listener.processAddress(metadata, formattedUrls);
  32. } else {
  33. ...
  34. }
  35. ...
  36. }
  37. });
  38. }

这里的主要逻辑是Subscriber subscriber = SubscriberRegistrar.register(cs_registration),干的事情就是通过注册信息拿到订阅者这样一个对象——subscriber。接着添加一个observer,做listener.processAddress(metadata, formattedUrls)这样的事情,那listener还是metadataService,回去看代码:

  1. @Override
  2. public void processAddress(ServiceMetadata metadata, List<String> urls) {
  3. if ((null != urls && !urls.isEmpty())) {
  4. String serviceUniqueName = metadata.getUniqueName();
  5. addressService.setServiceAddresses(serviceUniqueName, urls);
  6. processUnitConfig(metadata, urls);
  7. if (unitAddressService != null) {
  8. unitAddressService.setServiceAddresses(serviceUniqueName, urls);
  9. }
  10. }
  11. }

这样结论明显了,订阅完了ConfigServer后,把observer观察到的地址set到addressService里,如此就回答了上面的远程服务地址的问题。

so far,一个完整的HSF的发布订阅过程就整理出来了,当然这只是HSF众多支持特性中的一例,还有很多其他的方法去发布订阅,同时HSF也在缓存优化、连接优化、性能监控等多方面有代码加强,这篇文章就不涉及了。

3. 总结

服务框架是一个说起来很轻,做起来很重的东西,涉及到客户端配置、远程连接、配置中心、服务路由、容灾多方面的事情。不是简单的说反射调用能解决的。对于HSF,是我在淘宝第一个接触的系统,也是目前阿里巴巴集团RPC框架的标准,效果是拿得出的,但是代码的优化,我个人认为还有很多可做的事情。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注