Dubbo源码分析(一):消费端


声明:本文转载自https://my.oschina.net/jzgycq/blog/1584578,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

通观全部Dubbo代码,有两个很重要的对象就是Invoker和Exporter,Dubbo会根据用户配置的协议调用不同协议的Invoker,再通过ReferenceFonfig将Invoker的引用关联到Reference的ref属性上提供给消费端调用。当用户调用一个Service接口的一个方法后由于Dubbo使用javassist动态代理,会调用Invoker的Invoke方法从而初始化一个RPC调用访问请求访问服务端的Service返回结果。下面我们就从Comsumer端开始逐步解析这个框架。

    Dubbo首先使用com.alibaba.dubbo.config.spring.schema.NamespaceHandler注册解析器,当spring解析xml配置文件时就会调用这些解析器生成对应的BeanDefinition交给spring管理:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {      static {         Version.checkDuplicate(DubboNamespaceHandler.class);     }      public void init() {         registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));         registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));         registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));         registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));         registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));         registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));         registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));         registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));         registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));         registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));     }  }

    Spring在初始化IOC容器时会利用这里注册的BeanDefinitionParser的parse方法获取对应的ReferenceBean的BeanDefinition实例,由于ReferenceBean实现了InitializingBean接口,在设置了bean的所有属性后会调用afterPropertiesSet方法:

public void afterPropertiesSet() throws Exception {     if (getConsumer() == null) {         Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);         if (consumerConfigMap != null && consumerConfigMap.size() > 0) {             ConsumerConfig consumerConfig = null;             for (ConsumerConfig config : consumerConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (consumerConfig != null) {                         throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);                     }                     consumerConfig = config;                 }             }             if (consumerConfig != null) {                 setConsumer(consumerConfig);             }         }     }     if (getApplication() == null             && (getConsumer() == null || getConsumer().getApplication() == null)) {         Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);         if (applicationConfigMap != null && applicationConfigMap.size() > 0) {             ApplicationConfig applicationConfig = null;             for (ApplicationConfig config : applicationConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (applicationConfig != null) {                         throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);                     }                     applicationConfig = config;                 }             }             if (applicationConfig != null) {                 setApplication(applicationConfig);             }         }     }     if (getModule() == null             && (getConsumer() == null || getConsumer().getModule() == null)) {         Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);         if (moduleConfigMap != null && moduleConfigMap.size() > 0) {             ModuleConfig moduleConfig = null;             for (ModuleConfig config : moduleConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (moduleConfig != null) {                         throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);                     }                     moduleConfig = config;                 }             }             if (moduleConfig != null) {                 setModule(moduleConfig);             }         }     }     if ((getRegistries() == null || getRegistries().size() == 0)             && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().size() == 0)             && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {         Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);         if (registryConfigMap != null && registryConfigMap.size() > 0) {             List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();             for (RegistryConfig config : registryConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     registryConfigs.add(config);                 }             }             if (registryConfigs != null && registryConfigs.size() > 0) {                 super.setRegistries(registryConfigs);             }         }     }     if (getMonitor() == null             && (getConsumer() == null || getConsumer().getMonitor() == null)             && (getApplication() == null || getApplication().getMonitor() == null)) {         Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);         if (monitorConfigMap != null && monitorConfigMap.size() > 0) {             MonitorConfig monitorConfig = null;             for (MonitorConfig config : monitorConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (monitorConfig != null) {                         throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);                     }                     monitorConfig = config;                 }             }             if (monitorConfig != null) {                 setMonitor(monitorConfig);             }         }     }     Boolean b = isInit();     if (b == null && getConsumer() != null) {         b = getConsumer().isInit();     }     if (b != null && b.booleanValue()) {         getObject();     } }

      这步其实是Reference确认生成Invoker所需要的组件是否已经准备好,都准备好后我们进入生成Invoker的部分。这里的getObject会调用父类ReferenceConfig的init方法完成组装:

public synchronized T get() {     if (destroyed) {         throw new IllegalStateException("Already destroyed!");     }     if (ref == null) {         init();     }     return ref; }
private void init() {     if (initialized) {         return;     }     initialized = true;     if (interfaceName == null || interfaceName.length() == 0) {         throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");     }     // 获取消费者全局配置     checkDefault();     appendProperties(this);     if (getGeneric() == null && getConsumer() != null) {         setGeneric(getConsumer().getGeneric());     }     if (ProtocolUtils.isGeneric(getGeneric())) {         interfaceClass = GenericService.class;     } else {         try {             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()                     .getContextClassLoader());         } catch (ClassNotFoundException e) {             throw new IllegalStateException(e.getMessage(), e);         }         checkInterfaceAndMethods(interfaceClass, methods);     }     String resolve = System.getProperty(interfaceName);     String resolveFile = null;     if (resolve == null || resolve.length() == 0) {         resolveFile = System.getProperty("dubbo.resolve.file");         if (resolveFile == null || resolveFile.length() == 0) {             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");             if (userResolveFile.exists()) {                 resolveFile = userResolveFile.getAbsolutePath();             }         }         if (resolveFile != null && resolveFile.length() > 0) {             Properties properties = new Properties();             FileInputStream fis = null;             try {                 fis = new FileInputStream(new File(resolveFile));                 properties.load(fis);             } catch (IOException e) {                 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);             } finally {                 try {                     if (null != fis) fis.close();                 } catch (IOException e) {                     logger.warn(e.getMessage(), e);                 }             }             resolve = properties.getProperty(interfaceName);         }     }     if (resolve != null && resolve.length() > 0) {         url = resolve;         if (logger.isWarnEnabled()) {             if (resolveFile != null && resolveFile.length() > 0) {                 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");             } else {                 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");             }         }     }     if (consumer != null) {         if (application == null) {             application = consumer.getApplication();         }         if (module == null) {             module = consumer.getModule();         }         if (registries == null) {             registries = consumer.getRegistries();         }         if (monitor == null) {             monitor = consumer.getMonitor();         }     }     if (module != null) {         if (registries == null) {             registries = module.getRegistries();         }         if (monitor == null) {             monitor = module.getMonitor();         }     }     if (application != null) {         if (registries == null) {             registries = application.getRegistries();         }         if (monitor == null) {             monitor = application.getMonitor();         }     }     checkApplication();     checkStubAndMock(interfaceClass);     Map<String, String> map = new HashMap<String, String>();     Map<Object, Object> attributes = new HashMap<Object, Object>();     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);     map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));     if (ConfigUtils.getPid() > 0) {         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));     }     if (!isGeneric()) {         String revision = Version.getVersion(interfaceClass, version);         if (revision != null && revision.length() > 0) {             map.put("revision", revision);         }          String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();         if (methods.length == 0) {             logger.warn("NO method found in service interface " + interfaceClass.getName());             map.put("methods", Constants.ANY_VALUE);         } else {             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));         }     }     map.put(Constants.INTERFACE_KEY, interfaceName);     appendParameters(map, application);     appendParameters(map, module);     appendParameters(map, consumer, Constants.DEFAULT_KEY);     appendParameters(map, this);     String prifix = StringUtils.getServiceKey(map);     if (methods != null && methods.size() > 0) {         for (MethodConfig method : methods) {             appendParameters(map, method, method.getName());             String retryKey = method.getName() + ".retry";             if (map.containsKey(retryKey)) {                 String retryValue = map.remove(retryKey);                 if ("false".equals(retryValue)) {                     map.put(method.getName() + ".retries", "0");                 }             }             appendAttributes(attributes, method, prifix + "." + method.getName());             checkAndConvertImplicitConfig(method, map, attributes);         }     }      //     String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);     if (hostToRegistry == null || hostToRegistry.length() == 0) {         hostToRegistry = NetUtils.getLocalHost();     } else if (isInvalidLocalHost(hostToRegistry)) {         throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);     }     map.put(Constants.REGISTER_IP_KEY, hostToRegistry);      //attributes通过系统context进行存储.     StaticContext.getSystemContext().putAll(attributes);     ref = createProxy(map); }
private T createProxy(Map<String, String> map) {     URL tmpUrl = new URL("temp", "localhost", 0, map);     final boolean isJvmRefer;     if (isInjvm() == null) {         if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用             isJvmRefer = false;         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {             //默认情况下如果本地有服务暴露,则引用本地服务.             isJvmRefer = true;         } else {             isJvmRefer = false;         }     } else {         isJvmRefer = isInjvm().booleanValue();     }      if (isJvmRefer) {         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);         invoker = refprotocol.refer(interfaceClass, url);         if (logger.isInfoEnabled()) {             logger.info("Using injvm service " + interfaceClass.getName());         }     } else {         if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);             if (us != null && us.length > 0) {                 for (String u : us) {                     URL url = URL.valueOf(u);                     if (url.getPath() == null || url.getPath().length() == 0) {                         url = url.setPath(interfaceName);                     }                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));                     } else {                         urls.add(ClusterUtils.mergeUrl(url, map));                     }                 }             }         } else { // 通过注册中心配置拼装URL             List<URL> us = loadRegistries(false);             if (us != null && us.size() > 0) {                 for (URL u : us) {                     URL monitorUrl = loadMonitor(u);                     if (monitorUrl != null) {                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));                     }                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));                 }             }             if (urls == null || urls.size() == 0) {                 throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");             }         }          if (urls.size() == 1) {             invoker = refprotocol.refer(interfaceClass, urls.get(0));         } else {             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();             URL registryURL = null;             for (URL url : urls) {                 invokers.add(refprotocol.refer(interfaceClass, url));                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {                     registryURL = url; // 用了最后一个registry url                 }             }             if (registryURL != null) { // 有 注册中心协议的URL                 // 对有注册中心的Cluster 只用 AvailableCluster                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);                 invoker = cluster.join(new StaticDirectory(u, invokers));             } else { // 不是 注册中心的URL                 invoker = cluster.join(new StaticDirectory(invokers));             }         }     }      Boolean c = check;     if (c == null && consumer != null) {         c = consumer.isCheck();     }     if (c == null) {         c = true; // default true     }     if (c && !invoker.isAvailable()) {         throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());     }     if (logger.isInfoEnabled()) {         logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());     }     // 创建服务代理     return (T) proxyFactory.getProxy(invoker); }

    至此Reference在关联了所有application、module、consumer、registry、monitor、service、protocol后调用对应Protocol类的refer方法生成InvokerProxy。当用户调用service时dubbo会通过InvokerProxy调用Invoker的invoke的方法向服务端发起请求。客户端就这样完成了自己的初始化。

本文发表于2017年12月04日 18:39
(c)注:本文转载自https://my.oschina.net/jzgycq/blog/1584578,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2078 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1