基于TCP的远程服务调用


声明:本文转载自https://my.oschina.net/marvelcode/blog/2135740,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

前言

    上篇,分析了基于HTTP方式的RPC调用。本篇将在上篇的基础上,分析基于TCP方式的RPC调用。代码的整体思路是一致的,可以看作是在上篇功能上的扩展——即通信的方式。

    代码:https://gitee.com/marvelcode/marvelcode-rpc.git

 

源码

    上篇,在服务消费方,提到过一个 ReferenceAgent 的抽象,可看作是远程服务的代理对象。即通过 JDK 动态代理所有接口方法,进而在 invoke 中使用 RestTemplate 发起HTTP请求并获取响应,接口最重要的方法定义如下图:

package com.menghao.rpc.consumer.handle;

import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.util.RequestIdUtils;

import java.lang.reflect.Method;
import java.util.List;

/**
 * <p>Rpc框架消费方代理.<br>
 * <p>执行具体调用,子类实现Http、Tcp方式的Rpc请求<p/>
 * <p>需要保证线程安全(会被并发调用)</p>
 *
 * @author MarvelCode.
 */
public interface ReferenceAgent {

    /**
     * 通过rpc方式调用处理
     *
     * @param method 调用方法
     * @param args   调用参数
     * @return Object 调用结果
     */
    Object invoke(Method method, Object[] args);

}

    同样的,扩展 TCP 方式调用就需要从此处着手,来扩展新的实现,以发送TCP包给服务提供方。其余的,像对象代理赋值,以及ZooKeeper节点的监听,都可以复用。由此也可以看出,面向接口编程的优势:屏蔽实现,以最小化的代价切换、扩展方案。

    那么就来看下子类 TcpReferenceAgent 的实现:

package com.menghao.rpc.consumer.handle.tcp;

import com.menghao.rpc.consumer.balance.LoadBalancer;
import com.menghao.rpc.consumer.balance.RandomLoadBalancer;
import com.menghao.rpc.consumer.handle.ReferenceAgent;
import com.menghao.rpc.consumer.model.ReferenceKey;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.netty.TcpConnectionContainer;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.spring.BeansManager;
import lombok.Getter;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * <p>ReferenceAgent T方式实现.</br>
 * <p>调用原始接口的任意方法会被该类的invoke方法代理:使用Netty发送请求</p>
 * <p>sourceInterface/implCode:唯一标识一个服务</p>
 *
 * @author MarvelCode
 */
public class TcpReferenceAgent implements ReferenceAgent {

    @Getter
    private Class sourceInterface;
    @Getter
    private String implCode;

    private List<String> providerHosts;

    private TcpConnectionContainer tcpConnectionContainer;

    private LoadBalancer defaultBalancer = new RandomLoadBalancer();

    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public TcpReferenceAgent(ReferenceKey referenceKey) {
        this.sourceInterface = referenceKey.getSourceInterface();
        this.implCode = referenceKey.getName();
        this.tcpConnectionContainer = BeansManager.getInstance().getBeanByType(TcpConnectionContainer.class);
        this.providerHosts = new ArrayList<>();
    }

    @Override
    public Object invoke(Method method, Object[] args) {
        // 构造请求参数
        RpcRequest rpcRequest = makeParam(method, args);
        // 负载均衡选取Tcp连接
        TcpConnection tcpConnection = select();
        // 构建调用上下文
        InvocationContext invocationContext = new InvocationContext(tcpConnection, rpcRequest);
        // 执行调用
        invocationContext.execute();
        // 阻塞获取结果
        return invocationContext.get();
    }

    @Override
    public void setProviderHosts(List<String> hosts) {
        lock.writeLock().lock();
        try {
            // 刷新Tcp连接
            refreshConnection(providerHosts, hosts);
            this.providerHosts = hosts;
        } finally {
            lock.writeLock().unlock();
        }
    }


    private TcpConnection select() {
        lock.readLock().lock();
        try {
            if (providerHosts == null || providerHosts.size() == 0) {
                throw new InvokeException("There are currently no service providers available");
            }
            // 负载均衡
            String host = defaultBalancer.select(providerHosts);
            String[] info = host.split(":");
            return tcpConnectionContainer.get(info[0], Integer.valueOf(info[1]));
        } finally {
            lock.readLock().unlock();
        }
    }

    private void refreshConnection(List<String> lastHost, List<String> nowHost) {
        Set<String> commonHost = new HashSet<>(lastHost);
        // 当前存活机器与上次存活机器交集
        commonHost.retainAll(nowHost);
        Set<String> lostHost = new HashSet<>(lastHost);
        Set<String> addHost = new HashSet<>(nowHost);
        // 当前存活机器与交集的差集,得出新增的机器
        addHost.removeAll(commonHost);
        // 上次存活机器与交集的差集,得出下线的机器
        lostHost.removeAll(commonHost);
        // 下线的机器,将关闭并移除Tcp连接
        for (String host : lostHost) {
            String[] info = host.split(":");
            tcpConnectionContainer.remove(info[0], Integer.valueOf(info[1]));
        }
        // 新增的机器,将新建Tcp连接
        for (String host : addHost) {
            String[] info = host.split(":");
            tcpConnectionContainer.register(info[0], Integer.valueOf(info[1]));
        }
    }

}

    从逻辑上看,分为构造参数、挑选请求机器信息、构造调用上下文、执行,返回结果五步。其中构造参数借助了 JDK8 特性,将 RpcRequest请求实体的构造,放在了接口层实现。其余的步骤引入了几个新的模型对象:

  • TcpConnection:通过 Netty 实现的,服务消费方和服务提供方的长连接。针对一台远程服务器,该连接全局唯一。
    • 举个例子,服务A和服务B部署在相同的两台机器上,那么消费方将持有两条Tcp长连接(针对服务部署的机器,而非服务),调用服务A和B将复用这两条连接。
  • TcpConnectionContainer:上述连接的容器,管理着连接的生命周期。服务的启停会触发连接的打开、关闭。
  • InvocationContext:一次调用的上下文环境(伴随调用的生命周期)。存储了请求数据及响应结果,重要的职责在于回写结果。

    介绍完 TcpReferenceAgent ,我就以连接创建——发起请求——请求处理——响应处理这四步来分析如何实现。

 

连接创建

    首先,来看看连接的创建过程吧。我的实现采用了 Netty 作为客户端与服务端通信的基础。没Netty基础的小伙伴需要自行补下课了。

    public TcpConnection(String ip, int port) throws InterruptedException {
        this.close = new AtomicBoolean(true);
        TcpClient tcpClient = BeansManager.getInstance().getBeanByType(TcpClient.class);
        ChannelFuture future = tcpClient.connect(ip, port);
        this.channel = future.channel();
        this.close.set(false);
    }
package com.menghao.rpc.netty;

import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.netty.in.TcpInboundHandler;
import com.menghao.rpc.netty.in.TcpMessageDecoder;
import com.menghao.rpc.netty.out.TcpMessageEncoder;
import com.menghao.rpc.provider.model.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.List;

/**
 * <p>借助Netty实现TCP客户端.<br>
 *
 * @author MarvelCode.
 */
public class TcpClient {

    private static EventLoopGroup workerGroup;

    private Bootstrap bootstrap;

    private int connectTimeout;

    private int maxFrameLength;

    private int readIdle;

    private int writIdle;

    private List<TcpMessageHandler> messageHandlers;

    public TcpClient(int connectTimeout, int maxFrameLength, int readIdle, int writIdle, List<TcpMessageHandler> messageHandlers) {
        this.connectTimeout = connectTimeout;
        this.maxFrameLength = maxFrameLength;
        this.readIdle = readIdle;
        this.writIdle = writIdle;
        this.messageHandlers = messageHandlers;
    }

    static {
        // 当jvm关闭的时候执行addShutdownHook添加的钩子方法
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                workerGroup.shutdownGracefully();
            }
        });
    }

    public void initBootstrap() {
        bootstrap = new Bootstrap();
        workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("netty-server-io", true));
        int ct = connectTimeout > 0 ? this.connectTimeout : 5000;
        bootstrap.group(workerGroup)
                // 连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ct)
                // 是否使用Nagle的算法以尽可能发送大块数据
                .option(ChannelOption.TCP_NODELAY, true)
                // 是否启动心跳保活机制(长连接)
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 是否允许一个地址重复绑定
                .option(ChannelOption.SO_REUSEADDR, true)
                // 基于内存池的缓冲区重用机制
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // 客户端需要序列化 rpcRequest、反序列化 rpcResponse
                        ch.pipeline().addLast(new TcpMessageEncoder())
                                .addLast(new TcpMessageDecoder(maxFrameLength, RpcResponse.class))
                                .addLast(new IdleStateHandler(readIdle, writIdle, 0))
                                .addLast(new TcpInboundHandler(messageHandlers));

                    }
                });
    }

    public ChannelFuture connect(String ip, int port) throws InterruptedException {
        return bootstrap.connect(ip, port).sync();
    }
}

    其中 TcpClient 的初始化时机,借助了 Spring Bean生命周期的 init-method

        @Bean(initMethod = "initBootstrap")
        public TcpClient tcpClient() {
            List<TcpMessageHandler> messageHandlers = new ArrayList<>(2);
            messageHandlers.add(new RpcRequestMsgHandler());
            messageHandlers.add(new RpcResponseMsgHandler());
            return new TcpClient(tcpProperties.getConnectTimeout(),
                    tcpProperties.getMaxFrameLength(),
                    tcpProperties.getReadIdle(),
                    tcpProperties.getWritIdle(),
                    messageHandlers);
        }

    在进行了一些参数的设置之后,就可以调用 connect 方法来创建一条指定ip、端口的连接。最终会使用 io.netty.channel.Channel 对象来进行数据的传输通信。

    其中 ChannelOption.SO_KEEPALIVE 参数设置为 true,来保持长连接。之后会往 ChannelPipeline 中添加 ChannelHandler 处理类。这里包含了信息的编解码Handler、心跳检测Handler,TcpInboundHandler 是对解码后的 RpcRequest/RpcResponse 数据进行处理。

    请求创建好之后,接下来就该发起请求了。    

 

发起请求

    请求的主要工作集中在了 InvocationContext 这个类中,同时这个类还包含了响应数据的回写。利用锁机制,通过响应 result 是否为空来判断一次调用是否已完成,在调用 get 方法时,调用未完成前会一直阻塞,直到 result 结果被回写。

    如果响应的 RpcResponse 是正常响应的结果,就返回;如果是异常,则抛出 :

package com.menghao.rpc.consumer.handle.tcp;

import com.menghao.rpc.consumer.model.Future;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.spring.BeansManager;
import lombok.Getter;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>一次调用请求上下文.<br>
 * <p>存放请求及结果,伴随一次调用的生命周期</p>
 *
 * @author MarvelCode.
 */
public class InvocationContext implements Future {

    private InvocationContextContainer invocationContextContainer;

    @Getter
    private RpcRequest rpcRequest;

    private TcpConnection tcpConnection;

    /** 异步调用锁,在结果返回时解锁 */
    private Lock lock = new ReentrantLock();
    private Condition doneCondition = lock.newCondition();

    /** 预防调用无返回值的判断 */
    private static final Object NULL_OBJECT = new Object();

    private Object result;

    InvocationContext(TcpConnection tcpConnection, RpcRequest rpcRequest) {
        this.tcpConnection = tcpConnection;
        this.rpcRequest = rpcRequest;
        invocationContextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class);
    }

    public void execute() {
        invocationContextContainer.add(this);
        tcpConnection.write(rpcRequest);
    }

    @Override
    public Object get() {
        if (!isDone()) {
            try {
                lock.lock();
                if (!isDone()) {
                    doneCondition.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
        if (result instanceof Throwable) {
            throw new InvokeException((Throwable) result);
        }
        return result == NULL_OBJECT ? null : result;
    }

    @Override
    public boolean isDone() {
        return result != null;
    }

    public void notifyCompleted(RpcResponse rpcResponse) {
        lock.lock();
        try {
            if (rpcResponse.getThrowable() != null) {
                setResult(rpcResponse.getThrowable());
            }
            setResult(rpcResponse.getResult());
            doneCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private void setResult(Object value) {
        lock.lock();
        try {
            result = (value == null) ? NULL_OBJECT : value;
            doneCondition.signalAll();
        } finally {
            lock.unlock();
        }
        invocationContextContainer.remove(rpcRequest.getId());
    }
}

    这里的 execute 方法,首先将自身(InvocationContext放置到容器中,因为通过管道写完数据流后,不会立马读取到响应流,所以需要有一个唯一标识(标识请求-响应的对应关系),这样在反序列化得到响应数据时,就可以找到对应请求的 InvocationContext 来回写数据了。

    这里生成唯一标识的规则可自定义,即保证一段时间内不会出现重复的即可(并发量越大,越需要考量唯一标识生成的唯一性)。

    当执行了 tcpConnection.write(rpcRequest) 这行代码时,会进而调用 channel.writeAndFlush(rpcRequest),将数据写入管道:

package com.menghao.rpc.netty.out;

import com.menghao.rpc.serialize.ObjectOutput;
import com.menghao.rpc.serialize.hessian.HessianObjectOutput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.OutputStream;

/**
 * <p>Tcp信息编码类.</br>
 *
 * @author MarvelCode
 */
public class TcpMessageEncoder extends MessageToByteEncoder<Object> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        OutputStream outputStream = new ByteBufOutputStream(byteBuf);
        // 预占,回写包长度
        byteBuf.writeInt(0);
        ObjectOutput objectOutput = new HessianObjectOutput(outputStream);
        objectOutput.writeObject(o);
        objectOutput.flush();
        // 写包长度
        byteBuf.setInt(0, byteBuf.writerIndex() - 4);
    }
}

    可以看到,将对象序列化写入缓冲区前,会先预写一个整形(4字节)的“0”进去,在写入数据流之后,再使用“写指针索引”减去整形的4字节,就得到了数据包的长度。

    这样做的目的,就是利用“消息分割”的方式,解决了Tcp半包、粘包的问题。类似 Http协议的 content-length。

 

请求处理

    既然有序列化,自然有反序列化的处理:

package com.menghao.rpc.netty.in;

import com.menghao.rpc.serialize.ObjectInput;
import com.menghao.rpc.serialize.hessian.HessianObjectInput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.io.InputStream;

/**
 * <p>Tcp信息解码类.</br>
 *
 * @author MarvelCode
 */
public class TcpMessageDecoder extends LengthFieldBasedFrameDecoder {


    private Class<?> covertClass;

    /**
     * @param maxFrameLength 解码时,处理每个帧数据的最大长度
     *                       0 该帧数据中,存放该帧数据的长度的数据的起始位置
     *                       4 记录该帧数据长度的字段本身的长度
     *                       0 修改帧数据长度字段中定义的值,可以为负数
     *                       4 解析的时候需要跳过的字节数
     */
    public TcpMessageDecoder(int maxFrameLength, Class<?> covertClass) {
        super(maxFrameLength, 0, 4, 0, 4);
        this.covertClass = covertClass;
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
        if (byteBuf == null) {
            return null;
        }
        try {
            InputStream inputStream = new ByteBufInputStream(byteBuf);
            ObjectInput objectInput = new HessianObjectInput(inputStream);
            return objectInput.readObject(covertClass);
        } finally {
            byteBuf.release();
        }
    }
}

    这里通过集成 Netty 提供的 LengthFieldBasedFrameDecoder ,来实现对Tcp包数据部分的截取工作。

    由于这里实现的Tcp协议是我自定义的,所以我很清楚开头使用了几字节来记录数据长度,应该越过几字节才是真正数据的部分。因此配置好参数(即构造器指定的0、4、0、4),进而调用 super.decode(ctx, in),就可以获取到数据流了,然后使用 Hessian 反序列化就获取到了 RpcRequest 对象。

    由于 Netty 的处理链使用的是“责任链”设计模式,上述解码类获取到的 RpcRequest 对象会作为下个处理器的入参:

package com.menghao.rpc.netty.in;

import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;

/**
 * <p>Netty上行处理Handler.</br>
 * <p>该层已经可以获取到反序列化后的对象</p>
 * <p>根据对象为 RpcRequest、RpcResponse,执行不同的处理逻辑</p>
 *
 * @author MarvelCode
 */
public class TcpInboundHandler extends SimpleChannelInboundHandler<Object> {

    private List<TcpMessageHandler> messageHandlers;

    public TcpInboundHandler(List<TcpMessageHandler> messageHandlers) {
        this.messageHandlers = messageHandlers;
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        if (o == null) {
            return;
        }
        for (TcpMessageHandler tcpMessageHandler : messageHandlers) {
            Channel channel = channelHandlerContext.channel();
            if (tcpMessageHandler.support(o.getClass())) {
                tcpMessageHandler.handler(new TcpConnection(channel), o);
            }
        }
    }
}

    由于服务提供方、服务消费方都会共用 TcpInboundHandler ,但提供方只会处理 RpcRequest,消费方只会处理 RpcResponse,所以就定义了 TcpMessageHandler 来定制化处理。是不是能看到 SpringMVC-HttpMessageConverter<T> 的影子?

package com.menghao.rpc.netty;

import com.menghao.rpc.netty.model.TcpConnection;

import java.io.Serializable;

/**
 * <p>Tcp消息处理Handler.</br>
 *
 * @author MarvelCode
 */
public interface TcpMessageHandler<T> {

    /**
     * 判断该Handler是否支持指定类型的处理
     *
     * @param supportClass 支持处理的类型
     * @return 是否支持该处理
     */
    boolean support(Class<?> supportClass);

    /**
     * 处理读取到的数据
     *
     * @param connection 使用Netty Channel封装的连接
     * @param data       将要处理的数据
     */
    void handler(TcpConnection connection, T data);
}

    该接口的两种实现,分别对应 RpcRequest 和 RpcResponse 类型数据的处理。这里先来看下 RpcRequestMsgHandler

package com.menghao.rpc.provider.handle.tcp;

import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.spring.BeansManager;

/**
 * <p>RpcRequest处理类.</br>
 * <p>根据请求信息,定位服务,在 ExecutionExecutor中反射调用</p>
 *
 * @author MarvelCode
 */
public class RpcRequestMsgHandler implements TcpMessageHandler<RpcRequest> {

    @Override
    public boolean support(Class<?> supportClass) {
        return RpcRequest.class.isAssignableFrom(supportClass);
    }

    @Override
    public void handler(TcpConnection connection, RpcRequest data) {
        ExecutionExecutor executor = BeansManager.getInstance().getBeanByType(ExecutionExecutor.class);
        executor.execute(new ExecutionExecutor.ExecutionTask(
                new ExecutionContext(data, connection)
        ));
    }
}
package com.menghao.rpc.provider.handle.tcp;

import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.provider.model.ProviderKey;
import com.menghao.rpc.provider.regisiter.ProviderRepository;
import com.menghao.rpc.spring.BeansManager;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.concurrent.*;

/**
 * <p>Rpc调用的执行器.</br>
 * <p>线程池执行</p>
 *
 * @author MarvelCode
 */
public class ExecutionExecutor {

    private ThreadPoolExecutor threadPoolExecutor;
    private int queueLimit;
    private static final String THREAD_PREFIX = "Executor-Execution-Task";

    public ExecutionExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueLimit) {
        this.queueLimit = queueLimit;
        threadPoolExecutor = new ExecutionTaskThreadPoolExecutor(corePoolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new ExecutionTaskBlockQueue(),
                new NamedThreadFactory(THREAD_PREFIX, true));
    }


    public void execute(ExecutionTask task) {
        int queueSize = threadPoolExecutor.getQueue().size();
        // TODO 队列溢出处理
        if (queueSize > queueLimit) {

        }
        threadPoolExecutor.execute(task);
    }

    public static class ExecutionTask implements Runnable {

        private ExecutionContext executionContext;

        private ProviderRepository providerRepository;

        ExecutionTask(ExecutionContext executionContext) {
            this.executionContext = executionContext;
            this.providerRepository = BeansManager.getInstance().getBeanByType(ProviderRepository.class);
        }

        @Override
        public void run() {
            RpcRequest rpcRequest = executionContext.getRequest();
            ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode());
            Object instance = providerRepository.getProvider(providerKey);
            if (instance == null) {
                // 无对应的服务单例,抛异常
                executionContext.writeException(new InvokeException(
                        MessageFormat.format("service {0} not found", providerKey)));
                return;
            }
            try {
                Object result = invoke(instance, rpcRequest);
                executionContext.writeResult(result);
            } catch (Exception e) {
                executionContext.writeException(new InvokeException(e));
            }
        }

        private Object invoke(Object instance, RpcRequest rpcRequest) {
            Method method = ReflectionUtils.findMethod(instance.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType());
            return ReflectionUtils.invokeMethod(method, instance, rpcRequest.getArgs());
        }
    }

    private class ExecutionTaskThreadPoolExecutor extends ThreadPoolExecutor {


        ExecutionTaskThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        /**
         * TODO 执行前置操作(可扩展,计数同一时刻某方法/某服务并发量)
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
        }

        /**
         * TODO 执行后置操作(可扩展,计数同一时刻某方法/某服务并发量)
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
        }
    }

    /**
     * TODO 自定义任务队列
     */
    private class ExecutionTaskBlockQueue extends LinkedBlockingQueue<Runnable> {

    }
}

    服务的调用逻辑,同上篇Http方式一样,都是使用“反射调用”的方式。不同点在于,采用了线程池执行,可以应付更高的并发请求。

    这里的 ExecutionContext 是服务提供方的执行上下文,不同于服务消费方的 InvocationContext,别搞混了~

    其中 executionContext.writeResult(result),同样使用了 TcpMessageEncoder 进行序列化。

 

响应处理

    服务提供方将响应数据通过管道传输给消费方,服务消费方再借助 TcpMessageDecoder 对数据流反序列化获取到 RpcResponse,同样进入 TcpInboundHandler 处理,这些代码之前都分析过了,直接来看 RpcResponse 对应的处理类即可:

package com.menghao.rpc.consumer.handle.tcp;

import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.spring.BeansManager;

/**
 * <p>RpcResponse处理类.</br>
 * <p>分析相应结果,调用 InvocationContext回写结果</p>
 *
 * @author MarvelCode
 */
public class RpcResponseMsgHandler implements TcpMessageHandler<RpcResponse> {

    @Override
    public boolean support(Class<?> supportClass) {
        return RpcResponse.class.isAssignableFrom(supportClass);
    }

    @Override
    public void handler(TcpConnection connection, RpcResponse data) {
        InvocationContextContainer contextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class);
        InvocationContext invocationContext = contextContainer.get(data.getId());
        if (invocationContext != null) {
            invocationContext.notifyCompleted(data);
            contextContainer.remove(data.getId());
        }
    }
}

    嗯,没错。逻辑很简单,通过“唯一标识”找到对应的 InvocationContext,然后调用 invocationContext.notifyCompleted(data) 以通知调用完成,最后从 InvocationContextContatiner 中移除已经完成的调用上下文。

    到此整个基于 Tcp 方式的远程服务调用就分析结束了。

 

总结

    到此,我用两篇博文,分别对基于HTTP、TCP两种方式的远程服务调用进行了简要分析。后期会逐渐对代码进行完善,各位如果发现代码有任何的缺陷,或对代码思路有更好的建议,都可以跟我讨论。

本文发表于2018年09月24日 13:00
(c)注:本文转载自https://my.oschina.net/marvelcode/blog/2135740,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1556 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1