@changedi
2015-11-24T13:48:01.000000Z
字数 16489
阅读 8197
Java
框架
Hsf本身是一个服务框架,支持发布订阅模式的服务调用。服务发布者(provider)开发服务接口和实现,并把服务发布到配置中心;服务消费者(consumer)配置服务接口,调用消费服务。Hsf具体操作时如何进行的呢?
发布者开发一个服务实现,只需要进行如下的配置即可
<bean class="com.taobao.hsf.app.spring.util.HSFSpringProviderBean" init-method="init">
<property name="serviceInterface" value="{your_interface}"/>
<property name="target" ref="{your_implementation_class}"/>
<property name="serviceVersion" value="{your_service_version}"/>
<property name="clientTimeout" value="500"/>
</bean>
HSFSpringProviderBean组合了HSFApiProviderBean,bean加载时即new了HSFApiProviderBean的实例。服务初始化时,调用HSFSpringProviderBean的init方法,里面检测并初始化一些配置,然后将服务发布出去。发布具体是由ProcessComponent调用publish方法。具体示例如下:
HSFSpringProviderBean.java:
public void init() throws Exception {
// 避免被初始化多次
if (!providerBean.getInited().compareAndSet(false, true)) {
return;
}
LoggerInit.initHSFLog();
AppInfoUtils.initAppName(providerBean.getMetadata());
SpasInit.initSpas();
providerBean.checkConfig();
publishIfNotInSpringContainer();
}
private void publishIfNotInSpringContainer() {
if (!isInSpringContainer) {
LOGGER.warn("[SpringProviderBean]不是在Spring容器中创建, 不推荐使用");
providerBean.publish();
}
}
HSFApiProviderBean.java:
public void publish() {
// 防止一个服务被发布多次
if (!isPublished.compareAndSet(false, true)) {
return;
}
try {
boolean pub = true;
if (unitService != null) {
pub = unitService.beforePublish(metadata);
}
if (pub) {
HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);
}
} catch (Exception e) {
LOGGER.error("", "接口[" + metadata.getInterfaceName() + "]版本[" + metadata.getVersion() + "]发布为HSF服务失败", e);
throw new RuntimeException(e);
}
}
ProcessComponent.java:
@Override
public void publish(ServiceMetadata metadata) throws HSFException {
try {
rpcProtocolService.registerProvider(metadata);
} catch (HSFException e) {
LOGGER.error("", "RPC协议:方式发布HSF服务时出现错误,请确认服务:" + metadata.getUniqueName() + "的rpc属性的配置!");
throw e;
}
for (ProcessHookService hookService : hookServices) {
hookService.prePublish(metadata);
}
// 检查是否需要延迟发布服务
if (metadata.isReadyToPublish()) {
if (!metadata.getGroup().equalsIgnoreCase(metadata.getDefaultGroup())) {
metadataService.unregister(metadata);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("", "发布时注销错误:" + e.getMessage());
}
}
metadataService.publish(metadata);
LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]发布为HSF服务成功!");
} else {
LOGGER.info("接口[" + metadata.getUniqueName() + "]组别[" + metadata.getGroup() + "]服务使用了延迟发布功能,服务未发布!");
}
for (ProcessHookService hookService : hookServices) {
hookService.afterPublish(metadata);
}
metadataInfoStoreService.store(metadata);
}
具体看看发布的步骤:
metadataService.publish(metadata);
注册的代码如下:
public void registerProvider(ServiceMetadata metadata) throws HSFException {
// 仅启动一次HSF SERVER
if (isProviderStarted.compareAndSet(false, true)) {
try {
providerServer.startHSFServer();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
for (ProviderServiceModel serviceModel : ApplicationModel.instance().allProvidedServices()) {
serviceModel.getMetadata().fireMetadataBeforeChanged();
metadataService.unregister(serviceModel.getMetadata());
// HSFServiceContainer.getInstance(MetaSupportService.class).unregister(
// serviceModel.getMetadata());
}
Thread.sleep(HSFServiceContainer.getInstance(ConfigurationService.class)
.getshutdownHookWaitTime());
providerServer.stopHSFServer();
} catch (Exception e) {
LoggerInit.LOGGER.warn("Exception happens during stop server:", e);
}
}
});
} catch (Exception e) {
throw new HSFException(LoggerHelper.getErrorCodeStr("hsf", "HSF-0016", "环境问题", "启动HSF SERVER失败."), e);
}
}
// 分配线程池
int corePoolSize = metadata.getCorePoolSize();
int maxPoolSize = metadata.getMaxPoolSize();
if (corePoolSize > 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize) {
providerServer.allocThreadPool(metadata.getUniqueName(), corePoolSize, maxPoolSize);
}
// 注册对象到HSF Server上
providerServer.addMetadata(metadata.getUniqueName(), metadata, metadata.getTarget());
// providerServer.addWorker(metadata.getUniqueName(), metadata.getTarget());
}
具体我们来看一下,首先要启动providerServer,这个是什么呢?providerServer本质是一个NettyServer,在HSFProviderServer类里持有一个NettyServer的引用server,providerServer.startHSFServer();就是在启动这个NettyServer,并绑定端口。接着代码里给虚拟机关闭加了个hook,当虚拟机关闭时,会unregister服务,同时关掉providerServer。添加了hook后,为服务分配一个线程池,最后把服务的名字、meta和实现类都加入server。当然所谓的加入server,其实就是把服务名称和方法名称都加入到一个Map里,这就注册号了,方便后续消费的时候快速启动。
至此,provider的服务就启动并且注册好了。等下,这不就只是provider本地起了一个服务吗?那不需要远程注册?消费者如何知道这些服务呢?对了,这只是把服务在本地启动并注册,还缺少发布这个动作,发布是需要metadataService.publish。这里的代码如下:(以发布到configServer为例)
@Override
public void publish(ServiceMetadata metadata) {
String serviceUniqueName = metadata.getUniqueName() + metadata.getConfigStyle();
synchronized (lock) {
// 考虑多注册的情况
List<String> centers = metadata.getConfigserverCenter();
if (centers != null && centers.size() > 0) {
for (String center : centers) {
Map<String, Publisher<String>> centerPublishers = publishers.get(center);
if (centerPublishers == null) {
centerPublishers = new HashMap<String, Publisher<String>>();
publishers.put(center, centerPublishers);
}
if (httpPublish()) {
Map<String, Publisher<String>> centerHttpPublishers = httpPublishers.get(center);
if (centerHttpPublishers == null) {
centerHttpPublishers = new HashMap<String, Publisher<String>>();
httpPublishers.put(center, centerHttpPublishers);
}
}
// let Dubbo style to exist two group as issue #219
if (metadatas.add(metadata)) {
Publisher<String> publisher = doPublish(metadata, center);
centerPublishers.put(serviceUniqueName, publisher);
if (httpPublish()) {
Publisher<String> httpPublisher = doHttpPublish(metadata, center);
httpPublishers.get(center).put(serviceUniqueName, httpPublisher);
}
}
}
} else {
Map<String, Publisher<String>> centerPublishers = publishers.get(DEFAULT);
if (centerPublishers == null) {
centerPublishers = new HashMap<String, Publisher<String>>();
publishers.put(DEFAULT, centerPublishers);
}
if (httpPublish()) {
Map<String, Publisher<String>> centerHttpPublishers = httpPublishers.get(DEFAULT);
if (centerHttpPublishers == null) {
centerHttpPublishers = new HashMap<String, Publisher<String>>();
httpPublishers.put(DEFAULT, centerHttpPublishers);
}
}
if (!centerPublishers.containsKey(serviceUniqueName)) {
metadatas.add(metadata);
Publisher<String> publisher = doPublish(metadata);
centerPublishers.put(serviceUniqueName, publisher);
if (httpPublish()) {
Publisher<String> httpPublisher = doHttpPublish(metadata);
httpPublishers.get(DEFAULT).put(serviceUniqueName, httpPublisher);
}
}
}
}
}
发布动作主要干的事情就是连接到configServer,然后send一个package过去,告知服务的信息。当然HSF做了大量的服务连接的准备和善后工作,这里就不贴代码了。这里的例子是老版本的HSF连接configserver的做法,兼容dubbo后连接zk的代码干净了很多,直接zkClient创建目录完事。
这样provider的事情就说完了,可能中间很多代码的配置没有说清楚,后面consumer的时候会讲到。
说完了注册发布服务(provider的事),再来看看订阅消费服务(consumer的事)。
如果有个客户要调用你的服务,那他要做的就是在自己的应用中做如下配置(应用使用spring配置例子):
<bean id="myService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean">
<property name="interfaceName">
<value>{your_interface}</value>
</property>
<property name="version">
<value>{your_service_version}</value>
</property>
<property name="group">
<value>{your_group}</value>
</property>
</bean>
这样,在他的应用代码里,只要注入这个bean,比如:
@Autowired
{your_interface} myService;
Public void test(){
myService.doSth();
}
就可以使用了。
service被消费时的过程很明确,因为发布/订阅模型嘛,第一步总要去订阅,也就是去连接了。当然首先做的是初始化,在HSFSpringConsumerBean的init阶段,会把引用的HSFApiConsumerBean做bean的初始化,初始化过程就是set注入各种配置属性,当然最主要的就是上面列出的几个配置。与provider不同的是consumer不需要check什么,直接init。代码如下:
/**
* 初始化
*
* @throws Exception
*/
public void init() throws Exception {
// 避免被初始化多次
if (!inited.compareAndSet(false, true)) {
LOGGER.warn(LoggerHelper.getErrorCodeStr("hsf", "HSF-0020", "业务问题", "HSF服务:" + metadata.getUniqueName()
+ " 重复初始化!"));
return;
}
LoggerInit.initHSFLog();
AppInfoUtils.initAppName(metadata);
if (HSFServiceTargetUtil.isGeneric(metadata.getGeneric()) && metadata.getIfClazz() == null) {
metadata.setIfClazz(GenericService.class);
} else if (metadata.getIfClazz() == null) {
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("ConsumerBean中指定的接口类不存在[");
errorMsg.append(metadata.getInterfaceName()).append("].");
throw new IllegalArgumentException(errorMsg.toString());
}
if (asyncallMethods != null) {
for (String desc : asyncallMethods) {
this.parseAsyncFunc(desc);
}
}
metadata.initUniqueName();
ProcessService processService = HSFServiceContainer.getInstance(ProcessService.class);
try {
metadata.setTarget(processService.consume(metadata));
LOGGER.warn("成功生成对接口为[" + metadata.getInterfaceName() + "]版本为[" + metadata.getVersion() + "]的HSF服务调用的代理!");
} catch (Exception e) {
LOGGER.error("", "生成对接口为[" + metadata.getInterfaceName() + "]版本为[" + metadata.getVersion()
+ "]的HSF服务调用的代理失败", e);
// since 2007,一旦初始化异常就抛出
throw e;
}
int waitTime = metadata.getMaxWaitTimeForCsAddress();
if (waitTime > 0) {
try {
metadata.getCsAddressCountDownLatch().await(waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
}
}
其中比较重要的过程就是metadata.setTarget(processService.consume(metadata));
,ServiceMetadata是个什么东西,这个在provider的时候没有讲,这里提一下,这个是HSF服务的元数据信息,包括了服务整个发布订阅过程中涉及到的各种数据元描述,包括比如:版本号、分组、服务接口名称、异步调用标记、单元化信息、动态代理时的一些方法和接口信息等等。基本贯穿在服务整个发布和调用的各种环节中。
先继续跟这个setTarget方法,target就是服务要调用的目标类,是个Object,说到底你远程服务调用要一个实例的呀。那processService.consume(metadata)就来生成这个Object。这里就又引入了ProcessService,这个在provider也引入过,这是HSF服务发布与消费总控流程,它的publish方法负责对外发布HSF服务,负责将服务调用地址注册到服务配置中心(configServer)。而consume方法就是负责生成调用远程HSF服务的代理。此代理的效果为生成ServiceMetadata中指定的interface的代理,调用时可将代理转型为服务接口,并进行直接的对象调用。代理将完成对于远程HSF的调用。
跟进到代码里看看:
@Override
public Object consume(ServiceMetadata metadata) throws HSFException {
// 首先从缓存中查找服务实例
if (ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()) != null) {
return ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
}
for (ProcessHookService hookService : hookServices) {
hookService.preConsume(metadata);
}
// 生成调用远程HSF服务的代理
List<Class<?>> interfaces = new ArrayList<Class<?>>(3);
if (metadata.getIfClazz() != null) {
interfaces.add(metadata.getIfClazz());
}
if (metadata.isSupportEcho()) {
interfaces.add(com.taobao.hsf.remoting.service.EchoService.class);
}
// 默认都支持Generic
if (!com.taobao.hsf.remoting.service.GenericService.class.equals(metadata.getIfClazz())) {
interfaces.add(com.taobao.hsf.remoting.service.GenericService.class);
}
Class<?>[] interfacesArray = new Class<?>[interfaces.size()];
interfaces.toArray(interfacesArray);
HSFServiceProxy proxy = new HSFServiceProxy(metadata, interfacesArray,
!HSFConstants.PROXY_STYLE_JAVASSIST.equalsIgnoreCase(metadata.getProxyStyle()));
Object proxyObj = proxy.getInstance();
// 订阅服务信息
metadataService.subscribe(metadata);
for (ProcessHookService hookService : hookServices) {
hookService.afterConsume(metadata);
}
metadataInfoStoreService.store(metadata);
return proxyObj;
}
抛开缓存和前后的hook,其实生成代理就两句话:
HSFServiceProxy proxy = new HSFServiceProxy(metadata, interfacesArray,
!HSFConstants.PROXY_STYLE_JAVASSIST.equalsIgnoreCase(metadata.getProxyStyle()));
Object proxyObj = proxy.getInstance();
而这个HSFServiceProxy的构造主要做的也就一件事:
this.instance = Proxy.newProxyInstance(serviceConsumerMetadata.getIfClazz().getClassLoader(), classes,this);
instance是HSFServiceProxy的一个Object类型的域,而HSFServiceProxy本身是实现InvocationHandler的,毕竟动态代理嘛。除了这件主要做的事情——生成instance以外,还有一个事情就是把这个instance的方法存起来:
ApplicationModel.instance().initConsumerService(metadata.getUniqueName(),
new ConsumerServiceModel(metadata, this.instance, isJava));
this.serviceModel = ApplicationModel.instance().getConsumedServiceModel(metadata.getUniqueName());
这个initConsumerService方法就是把代理对象里的所有方法拿到并存到一个map放到ConsumerServiceModel里。ConsumerServiceModel就是一个持有这个instance和methods的类。补充说明一下,这些methods是存到一个Map里的,ConsumerMethodModel是对method的一些封装,包含了methodName、method对象、parameter对象等信息。
生成了代理的对象后,具体代理调用的逻辑在哪里进行呢?就是实现InvocationHandler要override的方法嘛:invoke。就是这样一个逻辑:
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
...
ConsumerMethodModel methodModel = serviceModel.getMethodModel(method);
return this.trueInvoke(methodModel, args);
...
}
从刚才说过的ConsumerServiceModel里找到对应的ConsumerMethodModel,然后调用它:
private Object trueInvoke(ConsumerMethodModel methodModel, Object[] args) throws HSFException, Throwable {
AtomicInteger maxPoolSize = serviceConsumerMetadata.getCurConsumerMaxPoolSize();
if (maxPoolSize == null) {
return rpcProtocolService.invokeWithMethodObject(methodModel, args);
} else {
int currentSize = maxPoolSize.decrementAndGet();
try {
if (currentSize < 0) {
String errorMsg = MessageFormat.format(
"消费端线程池已满,service[{0}],consumerMaxPoolSize[{1}]",
new Object[] { serviceConsumerMetadata.getUniqueName(),
serviceConsumerMetadata.getConsumerMaxPoolSize() });
LOGGER.warn(errorMsg);
throw new RuntimeException(new HSFException(errorMsg));
} else {
return rpcProtocolService.invokeWithMethodObject(methodModel, args);
}
} finally {
maxPoolSize.incrementAndGet();
}
}
}
很清楚了,代理的调用就是
rpcProtocolService.invokeWithMethodObject(methodModel, args)
里面的rpcProtocolService是
HSFServiceContainer.getInstance(RPCProtocolTemplateService.class)
这样的单例。RPCProtocolTemplateComponent是RPCProtocolTemplateService的一个实现,也是唯一的实现,具体做的事情就是实现RPC协议调用时的共同部分,例如HSFRequest的组装,地址路由的获取、检查,监测信息的埋点,日志的处理等最后转由各RPC协议的具体实现完成远程调用。这个类在provider的时候就已经提到过,有个方法是registerProvider。在consumer阶段,就是invokeWithMethodObject方法了。这个方法里核心的代码是invoke0方法,具体逻辑如下:
// 组装HSFRequest
final HSFRequest request = new HSFRequest();
request.setTargetServiceUniqueName(serviceUniqueName);
request.setMethodName(methodName);
request.setMethodArgSigs(parameterTypes);
request.setMethodArgs(args);
request.setReturnClass(methodModel.getReturnClass());
首先拼装好request,然后就是寻址,寻址的代码在我看来是HSF里比较复杂的代码,逻辑各种兼容,毕竟这是大事,其他的目标比较直接,而这里的分支会特别多,(我就偷个懒,不去罗列各种分支情况了),我们就按默认的寻址方法来,找到目标地址:
remotingURL = selectAddress(targetUnit, metadata, serviceUniqueName, methodName,
parameterTypes, args);
具体跟踪selectAddress方法调用栈,最底层是:
List<String> addresses = this.addressPool.getArgsAddresses(serviceUniqueName, methodName, paramTypeStrs, args);
这里会列出所有的服务地址,真实的调用地址从个list选出。这个地址pool存了这些地址,什么时候存的呢?这个pool对应的类是AddressPool,被AddressService持有,在AddressService实例化的时候就被new出来,而构造逻辑里有这样一段代码:
public AddressPool(String unitName) {
this.unitName = unitName;
ExecutorService addressAndRuleExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(
HSFThreadNameSpace.getUnitThreadPoolName(unitName)));
addressAndRuleExecutor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (signalBell.poll(100, TimeUnit.SECONDS) != null) {
for (AddressBucket addressBucket : pool.values()) {
addressBucket.refreshAddress();
}
}
} catch (Throwable e) {
LoggerInit.LOGGER.error("", "[address pool] Refresh ", e);
} finally {
// 先去掉这个逻辑,使地址推送能及时生效
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// }
}
}
}
});
}
这下明白了,pool启动的时候就起了一个线程去监听一个信号队列,收到信号的时候就让pool里的AddressBucket刷新地址。
最后发起调用:
appResponse = rpcService.invoke(request, methodModel, remotingURL);
如此,一个HSF的远程调用例子就从生产发布到订阅消费讲完了。
等等,好像consume还漏了个事情,你怎么知道rpc去连哪个地址,刚才AddressService那还没讲清楚呢。对了,回到最早的consume方法,好像生成代理Object然后实现invoke方法后,就忘了还有subscribe的事情了,看看
// 订阅服务信息
metadataService.subscribe(metadata);
这句话,一个大事还没讲到呢。provider生产阶段在产生本地服务后,要把服务注册到远程的配置中心ConfigServer。同样的道理,consumer构造好代理bean后,要订阅ConfigServer的服务。要不然刚才提到的远程地址就是空的呀。subscribe方法描述了整个订阅的过程。跟踪subscribe方法的最终结果如下:
@Override
public void subscribe(final ServiceMetadata metadata, final AddressSubscribeListener listener) {
...
final String group = metadata.getGroup();
final String cs_subscriberId = SUBSCRIBER_PREFIX + serviceUniqueName;
SubscriberRegistration cs_registration = new SubscriberRegistration(cs_subscriberId, uniqueName);
cs_registration.setGroup(group);
Subscriber subscriber = SubscriberRegistrar.register(cs_registration);
subscribers.put(serviceUniqueName, subscriber);
subscriber.setDataObserver(new SubscriberDataObserver() {
@Override
public void handleData(String dataId, final List<Object> datas) {
List<String> urls = new ArrayList<String>();
for (Object url : datas) {
if (((String) url).startsWith("dubbo://")) {
urls.add(((String) url).substring(8));
} else if (((String) url).startsWith("hsf://")) {
urls.add(((String) url).substring(6));
} else {
urls.add((String) url);
}
}
...
if (!urls.isEmpty()) {
List<String> formattedUrls = new ArrayList<String>();
for (Object serviceUrl : urls) {
if (!StringUtils.isBlank((String) serviceUrl)) {
formattedUrls.add(HSFServiceTargetUtil.formatTargetURL((String) serviceUrl));
}
}
listener.processAddress(metadata, formattedUrls);
} else {
...
}
...
}
});
}
这里的主要逻辑是Subscriber subscriber = SubscriberRegistrar.register(cs_registration),干的事情就是通过注册信息拿到订阅者这样一个对象——subscriber。接着添加一个observer,做listener.processAddress(metadata, formattedUrls)这样的事情,那listener还是metadataService,回去看代码:
@Override
public void processAddress(ServiceMetadata metadata, List<String> urls) {
if ((null != urls && !urls.isEmpty())) {
String serviceUniqueName = metadata.getUniqueName();
addressService.setServiceAddresses(serviceUniqueName, urls);
processUnitConfig(metadata, urls);
if (unitAddressService != null) {
unitAddressService.setServiceAddresses(serviceUniqueName, urls);
}
}
}
这样结论明显了,订阅完了ConfigServer后,把observer观察到的地址set到addressService里,如此就回答了上面的远程服务地址的问题。
so far,一个完整的HSF的发布订阅过程就整理出来了,当然这只是HSF众多支持特性中的一例,还有很多其他的方法去发布订阅,同时HSF也在缓存优化、连接优化、性能监控等多方面有代码加强,这篇文章就不涉及了。
服务框架是一个说起来很轻,做起来很重的东西,涉及到客户端配置、远程连接、配置中心、服务路由、容灾多方面的事情。不是简单的说反射调用
能解决的。对于HSF,是我在淘宝第一个接触的系统,也是目前阿里巴巴集团RPC框架的标准,效果是拿得出的,但是代码的优化,我个人认为还有很多可做的事情。