博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【RPC】一步一步实现基于netty+zookeeper的RPC框架(二)
阅读量:4078 次
发布时间:2019-05-25

本文共 9522 字,大约阅读时间需要 31 分钟。

上一篇实现了服务注册发现和基本的字符串通信功能,这一篇则是实现我们平常使用RPC框架的使用类来调用的功能。

实现consumer端通过接口类来调用远程服务,主要核心在于使用动态代理和反射,这里就一步一步来实现。

这里贴出github代码地址,想直接看代码的可以直接下载运行:https://github.com/whiteBX/wrpc

首先来看consumer端代码,RPCConsumer完整代码如下:

public class RPCConsumer {
/** * url处理器 */ private UrlHolder urlHolder = new UrlHolder(); /** * netty客户端 */ private NettyClient nettyClient = new NettyClient(); /** * 远程调用 * * @param appCode * @param param * @return */ public String call(String appCode, String param) {
try {
// 从zookeeper获取服务地址 String serverIp = urlHolder.getUrl(appCode); if (serverIp == null) {
System.out.println("远程调用错误:当前无服务提供者"); return "connect error"; } // 连接netty,请求并接收响应 RpcClientNettyHandler clientHandler = new RpcClientNettyHandler(); clientHandler.setParam(param); nettyClient.initClient(serverIp, clientHandler); String result = clientHandler.process(); System.out.println(MessageFormat.format("调用服务器:{0},请求参数:{1},响应参数:{2}", serverIp, param, result)); return result; } catch (Exception e) {
System.out.println("远程服务调用失败:" + e); return "error"; } } /** * 获取代理类 * @param clazz * @param appCode * @return */ public
T getBean(final Class
clazz, final String appCode) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] {
clazz }, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String param = JSON.toJSONString(args[0]); String beanMessage = MessageFormat.format(ConsumerConstant.BEAN_STRING, clazz.getName(), method.getName(), method.getParameterTypes()[0].getName()); return JSON.parseObject(call(appCode, beanMessage.concat(param)), method.getReturnType()); } }); }}

这里getBean方法主要是通过动态代理来获取代理类,传入的就是我们自己的Service的接口类,其中绑定了InvocationHandler来在调用接口类的方法时执行对应的远程调用操作,包括远程调用包装和返回值解析.

接下来看provider相关类,在RPCProvider类中新增服务注册方法,将我们自己的Service注册进系统缓存:

public class RPCProvider {
/** * netty客户端 */ private static NettyClient nettyClient = new NettyClient(); /** * zookeeper客户端 */ private static ZKClient zkClient = new ZKClient(); public void registry(String server, int port) {
// 开启netty监听客户端连接 nettyClient.startServer(port); // 创建zk连接并创建临时节点 ZooKeeper zooKeeper = zkClient.newConnection(ProviderConstant.ZK_CONNECTION_STRING, ProviderConstant.ZK_SESSION_TIME_OUT); String serverIp = server + CommonConstant.COMMOA + port; zkClient.createEphemeralNode(zooKeeper, ProviderConstant.APP_CODE, serverIp.getBytes()); } /** * 注册服务提供者 * @param clazz * @param obj */ public void provide(Class
clazz, Object obj) {
ProviderBeanHolder.regist(clazz.getName(), obj); }}

其中的ProviderBeanHolder类代码如下,主要负责缓存服务注册信息:

public class ProviderBeanHolder {
/** * bean注册缓存 */ private static Map
providerList = new HashMap
(); /** * 注册 * @param clazzName * @param obj */ public static void regist(String clazzName, Object obj) {
providerList.put(clazzName, obj); System.out.println("注册provider:" + clazzName); } /** * 获取 * @param clazzName * @return */ public static Object getBean(String clazzName) {
return providerList.get(clazzName); }}

接下来来看RpcServerNettyHandler类,这个类负责接收客户端请求并处理,这里通过反射来调用服务注册的方法。

public class RpcServerNettyHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("服务端收到请求:" + msg); try {
// 解析出 类名+方法名+请求参数类型(方法签名) String[] splitParam = msg.toString().split(ProviderConstant.DOLLAR_SPLIT); String[] beanMessage = splitParam[0].split(ProviderConstant.SHARP_SPLIT); // 获取注册的服务 Object object = ProviderBeanHolder.getBean(beanMessage[0]); if (object == null) {
System.out.println("服务类未注册:" + beanMessage[0]); } // 通过反射调用服务 Class paramType = Class.forName(beanMessage[2]); Method method = object.getClass().getDeclaredMethod(beanMessage[1], paramType); Object response = method.invoke(object, JSON.parseObject(splitParam[1], paramType)); // 请求响应 ctx.writeAndFlush(JSON.toJSONString(response)); } catch (Exception e) {
System.out.println("服务异常"); } }}

这里主要就是用到了反射相关的知识,实现了服务的调用和响应,到这里相关的代码就实现了,下面来写个service测试一下:

public class HelloRequest {
private int seq; private String content; // 省略getter setter}public class HelloResponse {
private int code; private String message; // 省略getter setter}public interface HelloService {
HelloResponse hello(HelloRequest request);}public class HelloServiceImpl implements HelloService {
public HelloResponse hello(HelloRequest request) {
System.out.println("HelloService收到请求,序列号:" + request.getSeq()); HelloResponse response = new HelloResponse(); response.setCode(200); response.setMessage("success:" + request.getSeq()); return response; }}

测试代码如下:

// ProviderTest相关代码    public static void main(String[] args) throws InterruptedException {
RPCProvider provider = new RPCProvider(); provider.registry("127.0.0.1", 8091); provider.registry("127.0.0.1", 8092); provider.registry("127.0.0.1", 8093); provider.registry("127.0.0.1", 8094); provider.registry("127.0.0.1", 8095); provider.provide(HelloService.class, new HelloServiceImpl()); Thread.sleep(Long.MAX_VALUE); } // ConsumerTest相关代码 public static void main(String[] args) {
RPCConsumer consumer = new RPCConsumer(); HelloService helloService = consumer.getBean(HelloService.class, ConsumerConstant.APP_CODE); int i = 0; while (true) {
HelloRequest request = new HelloRequest(); request.setSeq(i++); HelloResponse helloResponse = helloService.hello(request); System.out.println("客户端收到响应:" + JSON.toJSONString(helloResponse)); Thread.sleep(Long.MAX_VALUE); } }

执行结果如下:

// 服务端日志zookeeper连接成功临时节点创建成功:/registry/100000/0000000025zookeeper连接成功临时节点创建成功:/registry/100000/0000000026zookeeper连接成功临时节点创建成功:/registry/100000/0000000027zookeeper连接成功临时节点创建成功:/registry/100000/0000000028zookeeper连接成功临时节点创建成功:/registry/100000/0000000029注册provider:org.white.wrpc.hello.service.HelloService服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":0}服务端收到请求,序列号:0服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":1}服务端收到请求,序列号:1服务端收到请求:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":2}服务端收到请求,序列号:2// 客户端日志zookeeper连接成功调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":0},响应参数:{
"code":200,"message":"success:0"}客户端收到响应:{
"code":200,"message":"success:0"}调用服务器:127.0.0.1,8094,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":1},响应参数:{
"code":200,"message":"success:1"}客户端收到响应:{
"code":200,"message":"success:1"}调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":2},响应参数:{
"code":200,"message":"success:2"}客户端收到响应:{
"code":200,"message":"success:2"}

此时修改Provider类里的端口,重新启动4个服务,可以看到客户端有如下日志:

zookeeper连接成功zookeeper连接成功zookeeper连接成功zookeeper连接成功调用服务器:127.0.0.1,8095,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":29},响应参数:{
"code":200,"message":"success:29"}客户端收到响应:{
"code":200,"message":"success:29"}调用服务器:127.0.0.1,8091,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":30},响应参数:{
"code":200,"message":"success:30"}客户端收到响应:{
"code":200,"message":"success:30"}调用服务器:127.0.0.1,8097,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":31},响应参数:{
"code":200,"message":"success:31"}客户端收到响应:{
"code":200,"message":"success:31"}调用服务器:127.0.0.1,8092,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":32},响应参数:{
"code":200,"message":"success:32"}客户端收到响应:{
"code":200,"message":"success:32"}调用服务器:127.0.0.1,8099,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":33},响应参数:{
"code":200,"message":"success:33"}客户端收到响应:{
"code":200,"message":"success:33"}调用服务器:127.0.0.1,8096,请求参数:org.white.wrpc.hello.service.HelloService#hello#org.white.wrpc.hello.model.request.HelloRequest${
"seq":34},响应参数:{
"code":200,"message":"success:34"}

可以看到,新增了服务,客户端什么都不用改,就能连接到新的服务,此时关闭新启的服务,会发现客户端会访问剩余的服务,不会出现任何问题。

到这里就实现了RPC框架的自己注册bean并通过接口调用.

后续将一步一步实现负载均衡/调用链路Trace记录/限流等功能,欢迎持续关注!

转载地址:http://ohsni.baihongyu.com/

你可能感兴趣的文章
关于fwrite写入文件后打开查看是乱码的问题
查看>>
用结构体指针前必须要用malloc,不然会出现段错误
查看>>
Linux系统中的美
查看>>
一些实战项目(linux应用层编程,多线程编程,网络编程)
查看>>
我觉得专注于去学东西就好了,与世无争。
查看>>
原来k8s docker是用go语言写的,和现在所讲的go是一个东西!
查看>>
STM32CubeMX 真的不要太好用
查看>>
STM32CubeMX介绍、下载与安装
查看>>
电机和桨叶要搭配选择
查看>>
现在发现如果无人机的电机不同,浆可能是不能混用的。
查看>>
不要买铝合金机架的无人机,不耐摔,易变形弯曲。
查看>>
ACfly也是基于FreeRTOS的
查看>>
F330装GPS的位置
查看>>
STM32时钟系统
查看>>
我想先用三个或者五个激光测距做无人机的室内定位和避障
查看>>
pixhawk也可以用Airsim仿真
查看>>
《无人机电机与电调技术》可以看看
查看>>
我发现七月在线的GAAS课程基本都讲到了
查看>>
电机堵转
查看>>
一个真正好的无人机应该是需要自己慢慢去调参的,别人的默认参数是可以飞但是可能达不到perfect的效果。
查看>>