手寫rpc實現詳解

一、手寫rpc框架

RPC(Remote Procedure Call,遠程過程調用),是一種用於客戶端和服務器端進行通信的協議。通過RPC可以實現跨語言、跨平台、不同機器之間的服務調用,讓遠程的服務器和本地的客戶端就像本地調用一樣方便地進行通信。手寫rpc框架的實現,可以讓我們更加深入地了解RPC背後的原理,同時也能夠提高我們的編程能力。

下面,我們將從手寫rpc框架的搭建和實現兩個方面,展開對手寫rpc的詳細闡述。

二、手寫rpc框架搭建

1、搭建服務端

在搭建服務端時,我們需要考慮如何處理請求、如何暴露服務、如何啟動服務等問題。

private Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

public void start() throws IOException {
    ServerSocket serverSocket = new ServerSocket(port);
    while (true) {
        Socket socket = serverSocket.accept();
        executor.execute(new RequestHandlerRunnable(socket, service));
    }
}

private static class RequestHandlerRunnable implements Runnable {
    private final Socket socket;
    private final Object service;

    private RequestHandlerRunnable(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        try {
            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
            try {
                String methodName = input.readUTF();
                Class[] parameterTypes = (Class[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();

                Method method = service.getClass().getMethod(methodName, parameterTypes);
                Object result = method.invoke(service, arguments);

                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
            } catch (Throwable t) {
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(t);
            } finally {
                input.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、搭建客戶端

在搭建客戶端時,我們需要考慮如何處理請求、如何發送請求、如何接收返回值等問題。

public class RpcClient {
    private final String host;
    private final int port;

    public RpcClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object call(String methodName, Class[] parameterTypes, Object[] arguments) throws Throwable {
        Socket socket = new Socket(host, port);
        try {
            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
            output.writeUTF(methodName);
            output.writeObject(parameterTypes);
            output.writeObject(arguments);

            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
            Object result = input.readObject();

            if (result instanceof Throwable) {
                throw (Throwable) result;
            }
            return result;
        } finally {
            socket.close();
        }
    }
}

三、手寫rpc框架實現

1、服務註冊與發現

服務註冊和發現是指將提供的服務註冊到註冊中心,並通過查詢註冊中心獲取已註冊服務的IP和端口,從而實現客戶端的遠程調用。

public interface ServiceRegistry {
    /**
     * Register a service to the registry with its host and port
     *
     * @param serviceName  the name of service to register
     * @param serviceHost  the host of the service provider
     * @param servicePort  the port of the service provider
     */
    void register(String serviceName, String serviceHost, int servicePort) throws Exception;

    /**
     * Lookup for available service providers of the {@code serviceName}
     *
     * @param serviceName the name of service that given the service providers provides
     * @return a list of all service providers' {@link ServiceEndPoint} of the given service name
     */
    List lookup(String serviceName) throws Exception;
}

2、動態代理和序列化

客戶端利用動態代理,將需要遠程調用的接口動態生成代碼,實現遠程接口的本地代理。在本地代理方法中,需要將接口、方法、參數等信息序列化並發送到服務端,同時需要接收服務端返回的數據,並進行反序列化,將結果返回給調用方。

public class RpcProxy implements InvocationHandler, Serializable {
    private static final long serialVersionUID = 9130440123381758788L;
    private final Class interfaceClazz;
    private final String serviceName;

    public RpcProxy(Class interfaceClazz, String serviceName) {
        this.interfaceClazz = interfaceClazz;
        this.serviceName = serviceName;
    }

    @SuppressWarnings("unchecked")
    public T getProxy() {
        return (T) Proxy.newProxyInstance(interfaceClazz.getClassLoader(), new Class[] {interfaceClazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl();
        InetSocketAddress serviceAddress = serviceDiscovery.lookup(serviceName);

        try {
            RpcClient rpcClient = new RpcClient(serviceAddress.getAddress().getHostAddress(), serviceAddress.getPort());
            RpcRequest request = new RpcRequest();
            request.setServiceName(serviceName);
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setArguments(args);

            return rpcClient.call(request);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

public class RpcDecoder extends ByteToMessageDecoder {
    private Class genericClass;

    public RpcDecoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}

public class RpcEncoder extends MessageToByteEncoder {
    private Class genericClass;

    public RpcEncoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

以上是手寫rpc框架的主要實現內容,注釋較為詳細,可供參考。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/160603.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-11-21 01:17
下一篇 2024-11-21 01:17

相關推薦

  • 使用RPC研發雲實現分布式服務交互

    本文將基於RPC研發雲,闡述分布式服務交互實現的過程和實現方式。 一、RPC研發雲簡介 RPC研發雲是一種基於分布式架構的服務框架,在處理不同語言之間的通信上變得越來越流行。通過使…

    編程 2025-04-28
  • 神經網絡代碼詳解

    神經網絡作為一種人工智能技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網絡的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網絡模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁盤中。在執行sync之前,所有的文件系統更新將不會立即寫入磁盤,而是先緩存在內存…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25
  • 詳解eclipse設置

    一、安裝與基礎設置 1、下載eclipse並進行安裝。 2、打開eclipse,選擇對應的工作空間路徑。 File -> Switch Workspace -> [選擇…

    編程 2025-04-25
  • nginx與apache應用開發詳解

    一、概述 nginx和apache都是常見的web服務器。nginx是一個高性能的反向代理web服務器,將負載均衡和緩存集成在了一起,可以動靜分離。apache是一個可擴展的web…

    編程 2025-04-25
  • git config user.name的詳解

    一、為什麼要使用git config user.name? git是一個非常流行的分布式版本控制系統,很多程序員都會用到它。在使用git commit提交代碼時,需要記錄commi…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25
  • Python安裝OS庫詳解

    一、OS簡介 OS庫是Python標準庫的一部分,它提供了跨平台的操作系統功能,使得Python可以進行文件操作、進程管理、環境變量讀取等系統級操作。 OS庫中包含了大量的文件和目…

    編程 2025-04-25
  • Java BigDecimal 精度詳解

    一、基礎概念 Java BigDecimal 是一個用於高精度計算的類。普通的 double 或 float 類型只能精確表示有限的數字,而對於需要高精度計算的場景,BigDeci…

    編程 2025-04-25

發表回復

登錄後才能評論