10、Dubbo源码解析:dubbo服务的完整调用过程

本文深入剖析了Apache Dubbo分布式RPC框架的完整服务调用过程,详细解读了服务消费方发起请求、编码、提供方解码、调用服务、返回结果及消费方接收结果六个阶段的调用栈和核心源码,帮助你彻底理解Dubbo的底层实现机制。

参考文献dubbo官网-dubbo服务的完整调用过程

感受学习业内优秀开源分布式框架的底层rpc实现。

调用过程大致可以分为六个阶段,这里只贴出服务调用各个阶段的调用栈进行备忘,详细源码分析请点击原文链接进行阅读

1服务消费方(dubbo-consumer)发布请求
调用栈

proxy0#sayHello(String)
  > InvokerInvocationHandler#invoke(Object, Method, Object[])
    > MockClusterInvoker#invoke(Invocation)
      > AbstractClusterInvoker#invoke(Invocation)
        > FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          > Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            > ListenerInvokerWrapper#invoke(Invocation) 
              > AbstractInvoker#invoke(Invocation) 
                > DubboInvoker#doInvoke(Invocation)
                  > ReferenceCountExchangeClient#request(Object, int)
                    > HeaderExchangeClient#request(Object, int)
                      > HeaderExchangeChannel#request(Object, int)
                        > AbstractPeer#send(Object)
                          > AbstractClient#send(Object, boolean)
                            > NettyChannel#send(Object, boolean)
                              > NioClientSocketChannel#write(Object)

2请求(request)编码
dubbo数据包
 
请求头

偏移量(Bit)字段取值
0 ~ 7魔数高位0xda00
8 ~ 15魔数低位0xbb
16数据包类型0 - Response, 1 - Request
17调用方式仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18事件标识0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23序列化器编号2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31状态20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE …
32 ~ 95请求编号共8字节,运行时生成
96 ~ 127消息体长度运行时计算

调用栈

ExchangeCodec#encode(Channel channel, ChannelBuffer buffer, Object msg)
	->ExchangeCodec#encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
		->DubboCodec#encodeRequestData(Channel channel, ObjectOutput out, Object data, String version)

3服务提供方(dubbo-provider)解码请求
调用栈

ExchangeCodec#decode(Channel channel, ChannelBuffer buffer)
	->ExchangeCodec#decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
		->DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
			->DecodeableRpcInvocation#decode

4服务提供方调用服务
提供方处理请求的线程模型
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。

调用栈

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  > AbstractPeer#received(Channel, Object)
    > MultiMessageHandler#received(Channel, Object)
      > HeartbeatHandler#received(Channel, Object)
        > AllChannelHandler#received(Channel, Object)
          > ExecutorService#execute(Runnable)    // 由线程池执行后续的调用逻辑

在不同的子线程里进行实际的服务调用,整个调用栈如下

ChannelEventRunnable#run()
  > DecodeHandler#received(Channel, Object)
    > HeaderExchangeHandler#received(Channel, Object)
      > HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        > DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          > Filter#invoke(Invoker, Invocation)
            > AbstractProxyInvoker#invoke(Invocation)
              > Wrapper0#invokeMethod(Object, String, Class[], Object[])
                > DemoServiceImpl#sayHello(String)

5服务提供方返回调用结果(response)
服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。

调用栈

ExchangeCodec#(Channel channel, ChannelBuffer buffer, Object msg)
	->ExchangeCodec#encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
		->DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data, String version)

6服务消费方接收调用结果
服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。

响应数据解码-调用栈

DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
	->DecodeableRpcResult#DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
		->DecodeableRpcResult#decode()
			->DecodeableRpcResult#decode(Channel channel, InputStream input)

解码接收到的数据的线程与rpc请求最初发起的线程必定不是同一个线程,所以最后要解决的问题就是如何将调用结果传递给用户线程。dubbo设计了一个类似于 java.util.concurrent.Future的ResponseFuture(具体实现类为DefaultFuture)。使用ReentrantLock进行线程通讯。在用户线程发起请求时会调用condition.awit()对用户线程进行阻塞,在接收到响应结果后反序列化并塞入DefaultFuture的response字段,此时调用condition.signal()唤醒用户线程,用户线程便可拿到序列化后的结果。更具体的实现推荐直接阅读源码,这里只给出调用栈。

序列化结果传递-调用栈

HeaderExchangeHandler#received(Channel channel, Object message)
	->HeaderExchangeHandler#handleResponse(Channel channel, Response response)
		->DefaultFuture#received(Channel channel, Response response)
			->DefaultFuture#doReceived(Response res)

一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
 

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: