RPC框架原理及从零实现系列文章(二):11个类实现简单RPC框架


声明:本文转载自https://my.oschina.net/u/3768288/blog/1607067,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

项目1.0版本源码

github:MeiZhuoRPC


上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题

读懂本篇需要的基本知识 若尚未清晰请自行了解后再阅读本文

  • java动态代理
  • netty框架的基本使用
  • spring的基本配置

最终项目的使用如下

/**  *调用端代码及spring配置  */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"}) public class Client {      @Test     public void start(){         Service service= (Service) RPC.call(Service.class);         System.out.println("测试Integer,Double类型传参与返回String对象:"+service.stringMethodIntegerArgsTest(233,666.66));         //输出string233666.66     }  }  /**  *Service抽象及其实现  *调用与实现端共同依赖Service  */ public interface Service {     String stringMethodIntegerArgsTest(Integer a,Double b); } /**  * ServiceImpl实现端对接口的具体实现 */ public class ServiceImpl implements Service {     @Override     public String stringMethodIntegerArgsTest(Integer a, Double b) {         return "String"+a+b;     } } 

1.0版本分3个包

  • Client 调用端
  • Server 实现端
  • Core 核心方法

首先看这句代码

调用端只需如此调用 定义接口 传入接口类类型 后面调用的接口内的方法 全部是由实现端实现

Service service= (Service) RPC.call(Service.class); 

这句的作用其实就是生成调用端的动态代理

/**      * 暴露调用端使用的静态方法 为抽象接口生成动态代理对象      * TODO 考虑后面优化不在使用时仍需强转      * @param cls 抽象接口的类类型      * @return 接口生成的动态代理对象      */     public static Object call(Class cls){         RPCProxyHandler handler=new RPCProxyHandler();         Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler);         return proxyObj;     } 

RPCProxyHandler为动态代理的方法被调用后的回调方法 每个方法被调用时都会执行这个invoke

/**      * 代理抽象接口调用的方法      * 发送方法信息给服务端 加锁等待服务端返回      * @param proxy      * @param method      * @param args      * @return      * @throws Throwable      */     @Override     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {         RPCRequest request=new RPCRequest();         request.setRequestID(buildRequestID(method.getName()));         request.setClassName(method.getDeclaringClass().getName());//返回表示声明由此 Method 对象表示的方法的类或接口的Class对象         request.setMethodName(method.getName()); //        request.setParameterTypes(method.getParameterTypes());//返回形参类型         request.setParameters(args);//输入的实参         RPCRequestNet.requestLockMap.put(request.getRequestID(),request);         RPCRequestNet.connect().send(request);         //调用用结束后移除对应的condition映射关系         RPCRequestNet.requestLockMap.remove(request.getRequestID());         return request.getResult();//目标方法的返回结果     } 

也就是收集对应调用的接口的信息 然后send给实现端 那么这个requestLockMap又是作何作用的呢

  • 由于我们的网络调用都是异步
  • 但是RPC调用都要做到同步 等待这个远程调用方法完全返回后再继续执行
  • 所以将每个请求的request对象作为对象锁 每个请求发送后加锁 等到网络异步调用返回后再释放所
  • 生成每个请求的ID 这里我用随机数加时间戳
  • 将请求ID和请求对象维护在静态全局的一个map中 实现端通过ID来对应是哪个请求
  • 异步调用返回后 通过ID notify唤醒对应请求对象的线程 netty异步返回的调用 释放对象锁
@Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         String responseJson= (String) msg;         RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);         synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) {             //唤醒在该对象锁上wait的线程             RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID());             request.setResult(response.getResult());             request.notifyAll();         }     } 

接下来是RPCRequestNet.connect().send(request);方法 connect方法其实是单例模式返回RPCRequestNet实例 RPCRequestNet构造方法是使用netty对实现端进行TCP链接 send方法如下

try {             //判断连接是否已完成 只在连接启动时会产生阻塞             if (RPCRequestHandler.channelCtx==null){                 connectlock.lock();                 //挂起等待连接成功                 System.out.println("正在等待连接实现端");                 connectCondition.await();                 connectlock.unlock();             }             //编解码对象为json 发送请求             String requestJson= null;             try {                 requestJson = RPC.requestEncode(request);             } catch (JsonProcessingException e) {                 e.printStackTrace();             }             ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());             RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);             System.out.println("调用"+request.getRequestID()+"已发送");             //挂起等待实现端处理完毕返回 TODO 后续配置超时时间             synchronized (request) {                 //放弃对象锁 并阻塞等待notify                 request.wait();             }             System.out.println("调用"+request.getRequestID()+"接收完毕");         } catch (InterruptedException e) {             e.printStackTrace();         } 

condition和lock同样是为了同步等待异步IO返回用的 send方法基本是编解码json后发送给实现端

调用端基本实现综上所述 代理 发送 同步锁


下面是服务端的使用和实现

/**  *实现端代码及spring配置  */  @RunWith(SpringJUnit4ClassRunner.class)  @ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"})  public class Server {        @Test      public void start(){          //启动spring后才可启动 防止容器尚未加载完毕          RPC.start();      }  } 

出了配置spring之外 实现端就一句 RPC.start() 其实就是启动netty服务器 服务端的处理客户端信息回调如下

@Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {         String requestJson= (String) msg;         System.out.println("receive request:"+requestJson);         RPCRequest request= RPC.requestDeocde(requestJson);         Object result=InvokeServiceUtil.invoke(request);         //netty的write方法并没有直接写入通道(为避免多次唤醒多路复用选择器)         //而是把待发送的消息放到缓冲数组中,flush方法再全部写到通道中 //        ctx.write(resp);         //记得加分隔符 不然客户端一直不会处理         RPCResponse response=new RPCResponse();         response.setRequestID(request.getRequestID());         response.setResult(result);         String respStr=RPC.responseEncode(response);         ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());         ctx.writeAndFlush(responseBuf);     } 

主要是编解码json 反射对应的方法 我们看看反射的工具类

/**      * 反射调用相应实现类并结果      * @param request      * @return      */     public static Object invoke(RPCRequest request){         Object result=null;//内部变量必须赋值 全局变量才不用         //实现类名         String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName());         try {             Class implClass=Class.forName(implClassName);             Object[] parameters=request.getParameters();             int parameterNums=request.getParameters().length;             Class[] parameterTypes=new Class[parameterNums];             for (int i = 0; i <parameterNums ; i++) {                 parameterTypes[i]=parameters[i].getClass();             }             Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);             Object implObj=implClass.newInstance();             result=method.invoke(implObj,parameters);         } catch (ClassNotFoundException e) {             e.printStackTrace();         } catch (NoSuchMethodException e) {             e.printStackTrace();         } catch (InstantiationException e) {             e.printStackTrace();         } catch (IllegalAccessException e) {             e.printStackTrace();         } catch (InvocationTargetException e) {             e.printStackTrace();         }         return result;     } 

解析Parameters getClass获取他们的类类型 反射调用对应的方法

这里需要注意一个点

  • 本文最初采用Gson处理json 但gson默认会把int类型转为double类型 例如2变为2.0 不适用本场景 我也不想去专门适配
  • 所以换用了jackson
  • 常见json处理框架 反序列化为对象时 int,long等基本类型都会变成他们的包装类Integer Long
  • 所以本例程中 远程调度接口方法的形参不可以使用int等基本类型
  • 否则method.invoke(implObj,parameters);会找不到对应的方法报错
  • 因为parameters已经是包装类了 而method还是int这些基本类 所以找不到对应方法

最后是借助spring配置基础配置 我写了两个类 ServerConfig ClientConfig 作为调用端和服务端的配置 只需在spring中配置这两个bean 并启动IOC容器即可

调用端

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">     <bean class="org.meizhuo.rpc.client.ClientConfig">         <property name="host" value="127.0.0.1"></property>         <property name="port" value="9999"></property>     </bean> </beans> 

实现端

<?xml version="1.0" encoding="UTF-8"?>  <beans xmlns="http://www.springframework.org/schema/beans"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">      <bean class="org.meizhuo.rpc.server.ServerConfig">          <property name="port" value="9999"></property>          <property name="serverImplMap">              <map>                  <!--配置对应的抽象接口及其实现-->                  <entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry>              </map>          </property>      </bean>  </beans> 

最后有个小问题

我们的框架是作为一个依赖包引入的 我们不可能在我们的框架中读取对应的spring xml 这样完全是去了框架的灵活性 那我们怎么在运行过程中获得我们所处于的IOC容器 已获得我们的正确配置信息呢 答案是spring提供的ApplicationContextAware接口

/**  * Created by wephone on 17-12-26.  */ public class ClientConfig implements ApplicationContextAware {      private String host;     private int port;     //调用超时时间     private long overtime;      public String getHost() {         return host;     }      public void setHost(String host) {         this.host = host;     }      public int getPort() {         return port;     }      public void setPort(int port) {         this.port = port;     }      public long getOvertime() {         return overtime;     }      public void setOvertime(long overtime) {         this.overtime = overtime;     }      /**      * 加载Spring配置文件时,如果Spring配置文件中所定义的Bean类      * 如果该类实现了ApplicationContextAware接口      * 那么在加载Spring配置文件时,会自动调用ApplicationContextAware接口中的      * @param applicationContext      * @throws BeansException      */     @Override     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {         RPC.clientContext=applicationContext;     } } 

这样我们在RPC类内部就维护了一个静态IOC容器的context 只需如此获取配置 RPC.getServerConfig().getPort()

 public static ServerConfig getServerConfig(){         return serverContext.getBean(ServerConfig.class);     } 

就这样 这个RPC框架的核心部分 已经讲述完毕了

本例程仅为1.0版本 后续博客中 会加入异常处理 zookeeper支持 负载均衡策略等 博客:zookeeper支持 欢迎持续关注 欢迎star 提issue

本文发表于2018年01月15日 12:35
(c)注:本文转载自https://my.oschina.net/u/3768288/blog/1607067,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1660 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1