分类目录归档:Dubbo源码分析

Dubbo的客户端负载均衡关键过程

String applicationConfig = "consumer-applicationContext.xml";
ApplicationContext context = new ClassPathXmlApplicationContext(applicationConfig);
UserInfoService userInfoService = (UserInfoService) context.getBean("userInfoService");
//负载均衡的处理是一个典型的懒加载模式,只有在第一次调用接口时,才进行负载均衡的处理。
System.out.println(userInfoService.sayHello("zhangsan"));

1.在进入sayHello的调用时,堆栈信息和帧代码如下


其中Constants.LOADBALANCE_KEY的值是“loadbalance”,而Constants.DEFAULT_LOADBALANCE的值是“random”,每个接口(reference)均可以灵活配置一个均衡方式,默认不配置的情况下都是random的。

不配置loadbalance时,是random的模式。
<dubbo:reference id="userInfoService" interface="com.kxtry.dubbo.service.UserInfoService" loadbalance="leastactive"></dubbo:reference>

2.假如当前的loadbalance设置了leastactive时,它是如何把名字和类LeastActiveLoadBalance的实例关联起来呢?
3.首先它在com.alibaba.dubbo.rpc.cluster.loadbalance目录下实现了4种均衡方式,如下:

ConsistentHashLoadBalance:一致哈希
LeastActiveLoadBalance:最久不使用
RandomLoadBalance:随机
RoundRobinLoadBalance:顺序循环

注:上1点中的loadbalance返回的则是上述的其中一个均衡策略类实例。
4.其它dubbo实现了一套类似spi的服务加载机制如下:

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

5.每个均衡策略类均实现了select接口,通过该接口从多个可用的后端服务中返回其中一个服务。

<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

Dubbo客户端的RPC构造关键过程

1.Dubbo的客户端请求配置和调用代码如下:

    <dubbo:reference id="userInfoService" interface="com.kxtry.dubbo.service.UserInfoService"></dubbo:reference>
    <dubbo:reference id="boyInfoService" interface="com.kxtry.dubbo.service.BoyInfoService"></dubbo:reference>
    <dubbo:reference id="girlInfoService" interface="com.kxtry.dubbo.service.GirlInfoService"></dubbo:reference>
        String applicationConfig = "consumer-applicationContext.xml";
        ApplicationContext context = new ClassPathXmlApplicationContext(applicationConfig);

注:在上一文章,曾介绍xml表的命名空间间解析过程,而dubbo:service是服务端配置,而dubbo:reference是客户端配置。
2.构造beanmap列表时,每一行dubbo::refence对应一个ReferenceBean配置。Reference会创建三个对象,如下:
FailoverClusterInvoker:失败转移使用,如服务器宕机等,是从com.alibaba.dubbo.common.Node接口派生的。
MockClusterInvoker:如其名,接口集群化,同一个接口,存在多个服务器为其提供服务支持时,需要一个均衡策略地访问这些服务器。
InvokerInvocationHandler:这个接口是真正的动态代理,它是从InvocationHandler中派生出来的,Failover和MockCluster都是从Node接口派生,用于管理集群的。
3.如下调用代码

UserInfoService userInfoService = (UserInfoService) context.getBean("userInfoService");
System.out.println(userInfoService.sayHello("zhangsan"));


4.如下代码,通过元数据RpcInvocation打包函数名和函数参数,然后通过RPC通信把请求发送到服务器端。

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

Dubbo的xml命名空间解析关键

在Spring中的applicationContext.xml经常看到如下标签

<dubbo:service ref="userInfoService" interface="com.kxtry.dubbo.service.UserInfoService" />
<dubbo:protocol name="dubbo" port="20880"></dubbo:protocol>
<dubbo:registry address="zookeeper://mysql.vmware.com:2181" ></dubbo:registry>
<dubbo:application name="com.kxtry.dubbo.demo"/>

1.Dubbo创建了一个dubbo.xsd文件。
2.并为dubbo.xsd创建了另外两个文件:spring.handlers和spring.schemas(这是Spring的命名解析规范要求),Spring在解析过程中会主动搜索这两资源文件。

3.spring.handlers文件内容如下

http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

4.spring.schemas文件内容如下

http://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

5.在执行下面代码时,会主动触发相关文件解析。

String configure = "provider-applicationContext.xml";
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(configure);



6.具体如下:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
	static {
		Version.checkDuplicate(DubboNamespaceHandler.class);
	}
	public void init() {
	    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }
}

7.上述步骤,已经完成了beanmap列表的建立,但仍需要下述代码,触发Dubbo的建立RPC服务,其中xxxConfig是在xxxBean之前先创建的。

context.start();
context.registerShutdownHook();

8.在6步骤中的的ServiceBean中监听了ApplicationEvent如下:

public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
        	if (isDelay() && ! isExported() && ! isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                会把xxxConfig的内容,初始化ServiceBean里的对象。
                export();
            }
        }
    }

9.serviceBean是多个的,每个service对象,对应一个,故每个对象都监听了ApplicationEvent。

<dubbo:service ref="userInfoService" interface="com.kxtry.dubbo.service.UserInfoService" />
    <dubbo:service ref="boyInfoService" interface="com.kxtry.dubbo.service.BoyInfoService" />
    <dubbo:service ref="girlInfoService" interface="com.kxtry.dubbo.service.GirlInfoService" />
    <bean id="userInfoService" class="com.kxtry.dubbo.service.impl.UserInfoServiceImpl" />
    <bean id="girlInfoService" class="com.kxtry.dubbo.service.impl.GirlInfoServiceImpl" />
    <bean id="boyInfoService" class="com.kxtry.dubbo.service.impl.BoyInfoServiceImpl" />