分布式服务框架之原理实现

Posted by Tango on October 26, 2017

  RPC的全称为Remote Procedure Call, 他是一种进程间通信框架,允许向调用本地方法一样调用远程服务,对于上层应用来说透明化,屏蔽服务调用过程。目前业界由许多开源框架,例如

Apache Thrift(Facebook开源)  

Avro-RPC(Hadoop子项目)  

Hessian(caucho提供的基于binary-RPC)  

gRPC(google开源)

系统架构

虽然各种开源框架实现细节不同,但其基本原理如下所示:

1.服务端服务发布:

 服务提供者根据配置自动连接服务注册中心地址,并通过xml配置文件将服务发布到注册中心,其中包括IP地址/端口号/以及服务名称/协议/版本号.

2.消费端服务获取

 服务信息获取消费者根据配置自动连接服务注册中心地址,根据服务引用信息获取指定服务的地址等路由信息.

3.服务注册中心推送

服务注册中西根据服务订阅关系,动态向指定消费者推送服务地址信息。

4.消费者调用服务

消费者根据服务名称以及IP/端口等信息发起远程调用。消费端应屏蔽底层网络通信的复杂实现,使得消费端像调用本地方法一样调用远程服务。

5.序列化

将请求参数以及请求方法等信息进行序列化。序列化方式分为xml、json以及二进制形式。

6.反序列化

将序列化信息反序列化为对象。

7.返回结果

 服务端根据消费端传来的参数调用本地服务,并将处理结果返回。


原理实现

注册中心

package register;

import services.bean.ServiceInfo;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * 服务注册中心
 * * 服务提供方注册服务消息
 * * 消费方拉取注册的服务信息
 */
public class RegisterCenter {


    private static final String REGISTER_IP = "127.0.0.1";
    //注册服务监听端口
    private static final int REGISTER_PORT = 4080;
    //拉取服务信息监听端口
    private static final int PULL_PORT = 4081;
    //注册服务处理线程
    private static Executor registerExecutor = Executors.newFixedThreadPool(10);
    //拉取服务信息处理线程
    private static Executor pullExecutor = Executors.newFixedThreadPool(10);
    //注册服务信息
    private static final Map<String, ServiceInfo> serviceMaps = new ConcurrentHashMap<>();

    public void init() {

        System.out.println("----------注册中心启动---------");

        //开启服务端注册服务监听线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("------------注册服务监听线程启动----------------");
                while (true) {
                    ServerSocket socket = null;
                    try {

                        socket = new ServerSocket();
                        socket.bind(new InetSocketAddress(REGISTER_IP, REGISTER_PORT));
                        while (true) {
                            registerExecutor.execute(new RegisterServiceTask(socket.accept()));
                        }
                    } catch (Exception e) {
                        System.out.println("RegisterCenter:" + e.getMessage());
                    }

                }

            }
        }).start();

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                //主线程开启消费端拉取服务信息监听
                System.out.println("------------客户端服务请求监听线程启动----------------");
                ServerSocket socket = null;
                try {

                    socket = new ServerSocket();
                    socket.bind(new InetSocketAddress(REGISTER_IP, PULL_PORT));
                    while (true) {
                        pullExecutor.execute(new PullServiceTask(socket.accept()));
                    }
                } catch (Exception e) {
                    System.out.println("RegisterCenter:" + e.getMessage());
                } finally {
                    System.out.println("-------------注册中心停止-----------------");
                }
            }
        }).start();

    }


    //向消费端推送服务注册信息
    private static class PullServiceTask implements Runnable {

        private Socket socket;

        PullServiceTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            System.out.println("---------消费端服务获取线程启动------------");
            ObjectInputStream objectInputStream = null;
            ObjectOutputStream objectOutputStream = null;

            try {
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

                String serviceName = (String) objectInputStream.readObject();
                if (!serviceMaps.containsKey(serviceName)) {
                    objectOutputStream.writeObject(null);
                    return;
                }
                ServiceInfo serviceInfo = serviceMaps.get(serviceName);
                objectOutputStream.writeObject(serviceInfo);
            } catch (Exception e) {
                System.out.println("PullServiceTask:" + e.getMessage());
            } finally {

                try {
                    if (socket != null) {
                        socket.close();
                        socket = null;
                    }
                    if (objectInputStream != null) {
                        objectInputStream.close();
                    }
                    if (objectOutputStream != null) {
                        objectOutputStream.close();
                    }
                } catch (Exception e) {
                    System.out.println("PullServiceTask:" + e.getMessage());
                }

            }
            System.out.println("---------消费端拉取服务信息线程停止------------");
        }
    }

    //服务提供方注册服务
    private static class RegisterServiceTask implements Runnable {

        private Socket socket;

        RegisterServiceTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            System.out.println("---------服务端注册服务线程启动------------");
            ObjectInputStream objectInputStream = null;
            ObjectOutputStream objectOutputStream = null;

            try {
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                ServiceInfo serviceInfo = (ServiceInfo) objectInputStream.readObject();
                String serviceName = serviceInfo.getName();
                serviceMaps.put(serviceName, serviceInfo);
                System.out.println("注册服务:" + serviceInfo.toString());
            } catch (Exception e) {
                System.out.println("RegisterServiceTask:" + e.getMessage());
            } finally {

                try {
                    if (socket != null) {
                        socket.close();
                        socket = null;
                    }
                    if (objectInputStream != null) {
                        objectInputStream.close();
                    }
                    if (objectOutputStream != null) {
                        objectOutputStream.close();
                    }
                } catch (Exception e) {
                    System.out.println("RegisterServiceTask:" + e.getMessage());
                }

            }
            System.out.println("---------服务端注册服务信息线程停止------------");
        }
    }

}

服务端

package server;

import services.bean.ServiceInfo;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RpcServer {

    private static final String REGISTER_CENTER_IP = "127.0.0.1";

    private static final int REGISTER_CENTER_PORT = 4080;

    private static Executor serviceExecutor = Executors.newFixedThreadPool(10);

    //注册中心地址
    private static SocketAddress socketAddress = new InetSocketAddress(REGISTER_CENTER_IP, REGISTER_CENTER_PORT);

    //注册服务
    private void registerService(String ip, int port, String serviceName) {

        //连接注册中心
        Socket socket = new Socket();
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;

        try {
            socket.connect(socketAddress);
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            ServiceInfo serviceInfo = new ServiceInfo();
            serviceInfo.setIp(ip);
            serviceInfo.setName(serviceName);
            serviceInfo.setPort(port);
            objectOutputStream.writeObject(serviceInfo);

        } catch (IOException e) {
            System.out.println(e.getMessage());
        } finally {

            try {
                if (socket != null) {
                    socket.close();
                    socket = null;
                }
                if (objectOutputStream != null) {
                    objectOutputStream.close();
                }
                if (objectInputStream != null) {
                    objectInputStream.close();
                }

            } catch (IOException e) {
                System.out.println("RpcServer: " + e.getMessage());
            }

        }

    }

    public void publishService(String ip, int port, String name) {

        //注册中心注册服务
        registerService(ip, port, name);

        //创建服务监听服务端口
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("----------服务端提供服务-----------");
                InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", port);

                try {
                    ServerSocket socket = new ServerSocket();
                    socket.bind(socketAddress);

                    while (true) {
                        serviceExecutor.execute(new ServiceTask(socket.accept()));
                    }

                } catch (IOException e) {
                    System.out.println(e.getMessage());
                }
            }
        }).start();

    }

    //执行service 并将结果返回
    private class ServiceTask implements Runnable {
        private Socket socket;

        ServiceTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            System.out.println("-----------本地执行服务,并返回结果------------");
            try {
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                String serviceName = (String) objectInputStream.readObject();
                String methodName = (String) objectInputStream.readObject();
                Class<?>[] paramType = (Class<?>[]) objectInputStream.readObject();
                Object[] args = (Object[]) objectInputStream.readObject();
                System.out.println(serviceName + " " + methodName + " " + paramType);
                Class service = Class.forName(serviceName);
                Object obj = service.newInstance();
                Method method = service.getDeclaredMethod(methodName, paramType);
                Object resObject = method.invoke(obj, args);
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                System.out.println("执行结果:" + resObject);
                objectOutputStream.writeObject(resObject);

            } catch (Exception e) {
                System.out.println("ServiceTask:" + e.getMessage());
            } finally {
                try {
                    if (socket != null) {
                        socket.close();
                    }
                } catch (IOException e) {
                    System.out.println("ServiceTask:" + e.getMessage());
                }
            }

        }
    }
}

客户端

package client;

import services.bean.ServiceInfo;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;

public class RpcClient<T> {

    private static final String REGISTER_IP = "127.0.0.1";
    private static final int REGISTER_PORT = 4081;
    private static final SocketAddress REGISTER_SOCKETADDRESS = new InetSocketAddress(REGISTER_IP, REGISTER_PORT);
    private static final Map<String, ServiceInfo> serviceMaps = new HashMap<>();

    //从注册中心拉去服务信息并缓存
    private void pullService(String serviceName) {

        System.out.println("----------从注册中心拉取服务信息----------");
        Socket socket = new Socket();
        ServiceInfo obj = null;
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            socket.connect(REGISTER_SOCKETADDRESS);
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(serviceName);
            Thread.sleep(10);
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            obj = (ServiceInfo) objectInputStream.readObject();
            if (obj == null) {
                return;
            }
            serviceMaps.put(serviceName, obj);

            System.out.println("----------从注册中心拉取服务信息完成----------");
        } catch (Exception e) {
            System.out.println("RpcClient:" + e.getMessage());
        } finally {

            try {
                if (socket != null) {
                    socket.close();
                    socket = null;
                }
                if (objectInputStream != null) {
                    objectInputStream.close();
                }
            } catch (Exception e) {
                System.out.println("RpcClient:" + e.getMessage());
            }


        }


    }

    public Object importer(final Class<?> serviceClass, String service) {

        return Proxy.newProxyInstance(serviceClass.getClassLoader(), serviceClass.getInterfaces(), new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                Socket socket = null;
                ObjectInputStream objectInputStream = null;
                ObjectOutputStream objectOutputStream = null;

                try {

                    if (!serviceMaps.containsKey(service)) {
                        pullService(service);
                    }

                    if (!serviceMaps.containsKey(service)) {
                        return null;
                    }
                    ServiceInfo serviceInfo = (ServiceInfo) serviceMaps.get(service);
                    socket = new Socket();
                    socket.connect(new InetSocketAddress(serviceInfo.getIp(), serviceInfo.getPort()));

                    if (!socket.isConnected()) {
                        //没有连接  则返回null
                        return null;
                    }

                    objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                    objectOutputStream.writeObject(serviceClass.getName());
                    objectOutputStream.writeObject(method.getName());
                    objectOutputStream.writeObject(method.getParameterTypes());
                    objectOutputStream.writeObject(args);

                    objectOutputStream.flush();
                    Thread.sleep(10);
                    objectInputStream = new ObjectInputStream(socket.getInputStream());
                    return objectInputStream.readObject();
                } catch (Exception e) {
                    System.out.println("RpcClient:"+e.getMessage());

                } finally {
                    if (socket != null) {
                        socket.close();
                    }
                    if (objectInputStream != null) {
                        objectInputStream.close();
                    }
                    if (objectInputStream != null) {
                        objectInputStream.close();
                    }

                }
                return null;
            }
        });
    }


}

服务


public interface HelloWorldService {

    String say();
}


public class HelloWorldServiceImpl implements HelloWorldService {

    @Override
    public String say() {
        return "Hello World";
    }
}

测试用例

package test;

import client.RpcClient;
import register.RegisterCenter;
import server.RpcServer;
import services.impl.HelloWorldServiceImpl;
import services.service.HelloWorldService;

public class HelloWorldTest {

    public static void main(String[] args) throws InterruptedException {

        //启动注册中心
        RegisterCenter registerCenter = new RegisterCenter();
        registerCenter.init();
        Thread.sleep(1000);

        //启动服务端,并发布服务
        RpcServer rpcServer = new RpcServer();
        rpcServer.publishService("127.0.0.1",5000,"helloworld.service");

        Thread.sleep(1000);
        RpcClient<HelloWorldService> rpcClient = new RpcClient<>();
        HelloWorldService helloWorldService = (HelloWorldService) rpcClient.importer(HelloWorldServiceImpl.class,"helloworld.service");
        String result =   helloWorldService.say();
        System.out.println(result);

        Thread.sleep(1000);
        System.out.println("---------Test Finished-----------");
    }
}

注:

源代码见GitHub
CSDN博客