U2647's blog 一个热爱学习的 Java 程序员,喜欢 Vue,喜欢深度学习 Dubbo Flutter SpringBoot Debug Notes Java LeetCode Python Redis Android DesignPattern mdi-home-outline 首页 mdi-cloud-outline 标签云 mdi-timeline-text-outline 时间轴 mdi-draw-pen 文章总数 62
Dubbo 学习笔记(零) 自己实现一个 RPC 框架 Dubbo 学习笔记(零) 自己实现一个 RPC 框架 Dubbo RPC Git RPC 原理 mdi-cursor-default-click-outline 点击量 62

0. 什么是 RPC 框架

RPC(Remote Procedure Call) 是一种进程间的通信方式。允许像调用本地服务一样调用远程服务。

简单点说,它是一种通信方式,它的功能就是让你像调用本地服务(函数、方法)一样,调用远程服务(函数,方法)。

比如说,我在服务端有一个接口 (MyService.sayHello())。传统的调用方式是,我们暴露一个 Controller 并绑定到对应的 url地址上,然后通过 http 请求,将参数发送给远程服务器,服务器执行结束后,将结果响应给客户端。

而 RPC 调用方式是,我在客户端导入 MyService 的接口,直接用 MyService.sayHello() 去调用。注意:客户端并没有直接的创建该接口的具体实现对象。而是通过 RPC 的通信方式去来与服务端交互。

1. RPC 框架的基本原理

RPC 框架的基本原理是通过 Socket 和对象序列化来实现的。

首先,客户端和服务端通过 Socket 来建立通信,客户端将需要调用的接口 序列化后发送给服务端。

服务端收到数据后将接口反序列化,通过反射的方式执行该接口。然后将执行结果序列化后发送给客户端。

客户端收到数据后,将结果反序列化,得到接口的执行结果。

下面实现了一个最简单、最基础的 RPC 框架

2. 定义服务

我们先定义一个服务端的 Service,

public interface MyService {
    String sayHello(String name);
}

实现类:

public class MyServiceImpl implements MyService {
    @Override
    public String sayHello(String name) {
        return "hello," + name;
    }
}

这个接口就是我们服务端提供的服务。

3. RPC 框架的服务端实现

先在服务端实现 Socket 持续监听客户端发来的数据,收到数据后让 ProducerAgent 去处理。

public class RpcProducer {
    private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void produce(String host, int port) throws Exception {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(host, port));
        try {
            while (true) {
                executor.execute(new ProducerAgent(serverSocket.accept()));
            }
        } finally {
            serverSocket.close();
        }
    }
}

ProducerAgent 的实现:

public class ProducerAgent implements Runnable {
    Socket client = null;

    public ProducerAgent(Socket accept) {
        client = accept;
    }

    @Override
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {

            inputStream = new ObjectInputStream(client.getInputStream());

            String interfaceName = inputStream.readUTF();
            Class<?> service = Class.forName(interfaceName);
            String methodName = inputStream.readUTF();
            Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
            Object[] args = (Object[]) inputStream.readObject();

            Method method = service.getMethod(methodName, paramTypes);
            Object result = method.invoke(service.newInstance(), args);
            outputStream = new ObjectOutputStream(client.getOutputStream());
            outputStream.writeObject(result);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //省略部分代码
        }
    }
}

我们依次读取接口的名称、方法名称、参数类型、参数。然后通过反射的机制执行方法,最后将结果序列化后写入到客户端。

4. RPC 框架的客户端实现

public class LocalAgent<T> {
    public T importer(final Class<?> serviceClass, final InetSocketAddress addr) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
                new Class<?>[]{serviceClass.getInterfaces()[0]},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        ObjectOutputStream outputStream = null;
                        ObjectInputStream inputStream = null;
                        Socket socket = null;
                        try {
                            socket = new Socket();
                            socket.connect(addr);
                            outputStream = new ObjectOutputStream(socket.getOutputStream());

                            outputStream.writeUTF(serviceClass.getName());
                            outputStream.writeUTF(method.getName());
                            outputStream.writeObject(method.getParameterTypes());
                            outputStream.writeObject(args);

                            inputStream = new ObjectInputStream(socket.getInputStream());
                            return inputStream.readObject();
                        } finally {
                            //省略部分代码
                        }
                    }
                });
    }
}

客户端做的事情是与服务端建立 Socket 通信,然后依次写入 接口名、方法名、参数类型、参数。注意:写入顺序一定要与服务端的读取顺序一致

然后接收服务端的执行结果,反序列化为实际类型。

5. 使用 RPC 框架

RPC 框架的客户端和服务端的基本功能已经实现,下面我们使用刚刚实现的 RPC 框架 来调用一下 MyService.sayHello(String name) 这个接口

首先我们使用一个线程来启动服务端:

    private static void startProduct() {
        //通过一个线程启动服务端
        String host = "localhost";
        int port = 8878;

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("服务端启动........");
                    RpcProducer.produce(host, port);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

下面我们使用 刚刚实现的 RPC 框架来调用服务端的接口

    private static void client() {
        LocalAgent<MyService> serviceLocalAgent = new LocalAgent<>();
        MyService myService = serviceLocalAgent.importer(MyServiceImpl.class,
                new InetSocketAddress("localhost",8878));

        System.out.println(myService.sayHello("RPC"));
    }

在客户端,我们传了一个服务端实现类的类型、一个服务端的地址,然后就可以调用服务端的接口了。注意:我们并没有在客户端创建 接口的具体实现对象,而仅仅是把需要调用的 Class 通过 Socket 发送给了服务端。

6. 添加注册中心

考虑一下上面的代码,我们服务端的地址是写死在客户端和服务端的代码里的,如果服务比较多,而且每个服务的地址都不一样,直接写死是很难维护的,所以我们需要一个注册中心来管理每个服务的地址。

下面是一个最简单的注册中心:

public class RegistrationCenter {

    private Map<String, InetSocketAddress> serviceMap = new HashMap<>();

    public void register(String serviceName, String host, int port) {
        serviceMap.put(serviceName, new InetSocketAddress(host, port));
    }

    public InetSocketAddress getService(String serviceName) {
        return serviceMap.get(serviceName);
    }
}

注册中心维护服务名称与服务地址的映射。

我们修改一下服务端的启动代码,当服务启动之后,向注册中心注册服务。

    private static void startProduct(RegistrationCenter center) {
        //通过一个线程启动服务端
        String host = "localhost";
        int port = 8878;

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("服务端启动........");
                    RpcProducer.produce(host, port);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        //向注册中心注册服务
        center.register("MyService.sayHello", host, port);
    }

修改一下客户端的调用方式,调用服务时,根据服务名从注册中心获取服务地址。:

    private static void client(RegistrationCenter center) {
        LocalAgent<MyService> serviceLocalAgent = new LocalAgent<>();
        MyService myService = serviceLocalAgent.importer(MyServiceImpl.class,
                center.getService("MyService.sayHello"));

        System.out.println(myService.sayHello("RPC"));
    }

7. 总结

这就是 RPC 框架的基本原理,通过 Socket 建立通信,通过序列化与反序列实现数据传输,使用反射执行具体的服务。

假设你对 RPC 还是没有什么概念,也没关系,但是你需要记住下面三个概念:

  1. 生产者:或者说是服务端,更具体点就是上面的ProducerAgent类,

    负责解析消费者发送的参数,通过反射调用对应的服务,将结果序列化后发送给消费者。

  2. 消费者:就是我们的客户端,对应LocalAgent

    负责将需要调用的服务信息发送给生产者,将结果反序列化后获得实际的执行结果。

  3. 注册中心:提供服务的注册和发现的功能。对应RegistrationCenter

    责管理服务与地址的映射。

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
我的GitHub 我的LeetCode 我的掘金
Powered by Hexo Powered by three-cards
Copyright © 2017 - {{ new Date().getFullYear() }} 某ICP备xxxxxxxx号