什么是RPC

1什么是RPC

RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机请求服务,二部需要了解底层网络技术的方式。

为什么要用RPC?

随着业务的发展,一台服务器不能支撑全部的业务程序运行。需要在不同的机器上部署不同的服务A和服务B。服务A和服务B的通信就不能像在一台机器或一个应用内直接调用对应的方法来获取结果了。整个通信的过程需要通过网络实现。这个通信的过程就是RPC。

RPC演进过程

版本1

调用固定的方法,固定参数。参数和结果都没有封装为对象。
Server:服务A生产者,等待被调用。

//通过Socket通信,在两台服务器之间发送二进制字节流。
public class Server
{
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
    	//创建一个服务端
        ServerSocket serverSocket = new ServerSocket(8088);
        while (running){
        	//阻塞等待请求到来
            Socket client = serverSocket.accept();
            //处理请求信息
            process(client);
            client.close();
        }
        serverSocket.close();
    }

    private static void process(Socket client) throws IOException {
        //读数据
        DataInputStream dis = new DataInputStream(client.getInputStream());
        //写数据
        DataOutputStream dos = new DataOutputStream(client.getOutputStream());
        int id = dis.readInt();
        IUserService userService = new IUserServiceImpl();
        User user =userService.findById(id);
        dos.writeInt(id);
        dos.writeUTF(user.getName());
        dos.flush();
    }
}

Client: 服务B,消费者。发起通信

public class Client
{
    public static void main(String[] args) throws Exception {
        Socket socket= new Socket("127.0.0.1",8088);
        //生成字节数组的id
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(123);
        //通过socket发送请求
        socket.getOutputStream().write(baos.toByteArray());
        socket.getOutputStream().flush();
        //阻塞 接收返回结果
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        //接收返回结果 组装对象
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id,name);
        System.out.println(user);
        dis.close();
        dos.close();
        baos.close();
        socket.close();
    }
}

其他三个类:
User

public class User implements Serializable
{
    private static final long serialVersionUID = 6932937264279505096L;
    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public User() {
    }

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}

IUserService:

public interface IUserService
{
    User findById(int id);
}

IUserServiceImpl:

public class IUserServiceImpl implements IUserService
{
    @Override
    public User findById(int id) {
        return new User(id,"张三");
    }
}

版本2

抽象客户端发送请求过程,将创建Socket=》发送请求=》接收结果的过程封装。调用者可以直接发起请求,而不关心Socket的通信过程。server端不需要修改。
新增类:Stub

public class Stub
{
    public  User findById(int id) throws Exception {
        Socket socket= new Socket("127.0.0.1",8088);
        //生成字节数组的id
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(id);
        //通过socket发送请求
        socket.getOutputStream().write(baos.toByteArray());
        socket.getOutputStream().flush();
        //阻塞 接收返回结果
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        //接收返回结果 组装对象
        int aa = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id,name);
//        System.out.println(user);
        dis.close();
        dos.close();
        baos.close();
        socket.close();
        return user;
    }
}

客户端修改:

public class Client
{
    public static void main(String[] args) throws Exception {
        Stub stub=new Stub();
        System.out.println(stub.findById(123));
    }
}

版本3

进一步封装客户端,从Stub类获取IUserService类的代理对象。通过静态代理增强findById方法,通过Socket远程通过信获取查询结果。
Stub类修改:

public class Stub
{
    public static IUserService getStub(){
        InvocationHandler h =new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = new Socket("127.0.0.1",8088);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos);
                //写入数据
                dos.writeInt(12);
                //发送带查询的id
                socket.getOutputStream().write(baos.toByteArray());
                socket.getOutputStream().flush();
                //阻塞等待返回
                DataInputStream dis = new DataInputStream(socket.getInputStream());
                int id = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id,name);
                dis.close();
                dos.close();
                baos.close();
                socket.close();
                return user;
            }
        };
        Object object = Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        return (IUserService)object;
    }
}

版本4

进一步封装Stub类,修改静态代理为动态代理。不再局限于findById一个方法。将方法名称传到远程服务器,通过反射执行方法。
Stub类修改:

public class Stub
{
    public static IUserService getStub(){
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = new Socket("127.0.0.1",8088);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                //输出请求参数
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                //接收返回结果
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                User user = (User)ois.readObject();
                return user;
            }
        };
        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        return (IUserService)o;
    }
}

Server修改:

public class Server
{
    private static boolean running = true;

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket= new ServerSocket(8088);
        while (running){
            Socket accept = serverSocket.accept();
            process(accept);
            accept.close();
        }
        serverSocket.close();
    }

    /**
     * 处理接收到的参数
     * @param socket
     * @throws Exception
     */
    private static void process(Socket socket) throws Exception
    {
        ObjectInputStream ois =new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

        String methodName = ois.readUTF();
        Class[] parameterTypes = (Class[])ois.readObject();
        Object[] parameters = (Object[]) ois.readObject();
        // 暂时写死类 下个版本灵活取用
        IUserService iUserService = new IUserServiceImpl();
        //获取方法
        Method method = iUserService.getClass().getMethod(methodName,parameterTypes);
        User user = (User)method.invoke(iUserService,parameters);
        oos.writeObject(user);
    }
}

Client修改:

public class Client
{
    public static void main(String[] args) {
        IUserService iUserService = Stub.getStub();
        User user = iUserService.findById(12);
        System.out.println(user);
    }
}

版本5

进一步封装客户端,不再局限于单个类。通过入参的不同可以调用任意支持远程调用的类的方法。
Stub修改:

public class Stub
{
    static Object getStub(Class c)  {
        InvocationHandler h =new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = new Socket("127.0.0.1",8088);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                oos.writeUTF(c.getName());
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                //接收返回结果
                return  ois.readObject();
            }
        };
        Object o = Proxy.newProxyInstance(c.getClassLoader(),new Class[]{c},h);
        return o;
    }
}

Server修改:

public class Server
{
    private static boolean running = true;
    //类注册集合
    static Map<String,Class> map = new HashMap();
    static {
        map.put(IUserService.class.getName(), IUserServiceImpl.class);
        map.put(ICatService.class.getName(), ICatServiceImpl.class);
    }
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8088);
        while (running){
            Socket socket = serverSocket.accept();
            process(socket);
            socket.close();
        }
        serverSocket.close();
    }

    private static void process(Socket socket) throws Exception {
        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

        String className = ois.readUTF();
        String methodName = ois.readUTF();
        Class[] parameterTypes = (Class[])ois.readObject();
        Object[] parameters = (Object[])ois.readObject();

        Class service = map.get(className);
        Method method = service.getMethod(methodName,parameterTypes);
        Object o = method.invoke(service.newInstance(), parameters);
        oos.writeObject(o);
        oos.flush();
        oos.close();
        ois.close();
    }
}

Client修改:

public class Client
{
    public static void main(String[] args) {
        IUserService iUserService = (IUserService)Stub.getStub(IUserService.class);
        System.out.println(iUserService.findById(222));
        ICatService iCatService = (ICatService)Stub.getStub(ICatService.class);
        System.out.println(iCatService.getById(456));
    }
}

Stub修改:

public class Stub
{
    static Object getStub(Class c)  {
        InvocationHandler h =new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = new Socket("127.0.0.1",8088);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                oos.writeUTF(c.getName());
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                //接收返回结果
                return  ois.readObject();
            }
        };
        Object o = Proxy.newProxyInstance(c.getClassLoader(),new Class[]{c},h);
        return o;
    }
}

新增三个类:
Cat:

public class Cat implements Serializable
{
    private static final long serialVersionUID = 5158052846575741589L;
    private Integer in;

    private String name;

    public Integer getIn() {
        return in;
    }

    public void setIn(Integer in) {
        this.in = in;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Cat() {
    }

    public Cat(Integer in, String name) {
        this.in = in;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Cat{" +
                "in=" + in +
                ", name='" + name + '\'' +
                '}';
    }
}

ICatService

public interface ICatService
{
    Cat getById(Integer id);
}

ICatServiceImpl

public class ICatServiceImpl implements ICatService {
    @Override
    public Cat getById(Integer id) {
        return new Cat(id,"tom");
    }
}

马士兵老师RPC公开课笔记

上一篇:RPC模式


下一篇:Windows RPC 初体验