`
m635674608
  • 浏览: 4935369 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

基于TCP的RPC简单实现

    博客分类:
  • java
 
阅读更多

所谓RPC就是远程方法调用(Remote  Process Call ),简单的来说就是通过MQ,TCP,HTTP或者自己写的网络协议来传输我要调用对方的什么接口,对方处理之后再把结果返回给我.就这么简单的一个过程.在一个大型的项目之后基本上各模块都是分开的,以提供服务的方式进行相互调用.如果能够提供智能负载均衡,可选择的java对象编码解码协议,网络传输协议,服务监控,服务版本控制等很多功能的话就是一个SOA架构了.

 

前两天实现了一个基于java Socket 实现的阻塞的RPC.其原理非常简单

  1. 客户端用一个TransportMessage类去包装需要调用的接口,调用的方法,调用方法的参数类型,调用方法的参数值.

  2. 客户端用Socet连接服务端,序列化TransportMessage,传输给服务端.

  3. 服务端循环接收请求,一旦受到请求就起一个线程扔到线程池去执行,执行的内容就是反序列化TransportMessage类,在servicePool池中获取接口实现类,通过调用方法参数类型数组获取Method对象.然后通过method.invoke去调用方法.

  4. 服务器端序列化结果,然后通过socket传输给客户端.

  5. 客户端收到结果,反序列化结果对象.

 

具体代码实现,(为了节省篇幅,setter,getter就不放进来了):

1.远程调用信息封装   TransportMessage.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * @author Lubby
 * @date 2015年4月22日 下午1:06:18
 * 远程调用信息封装.
 * 包括    1.调用接口名称  (包名+接口名)   2.调用方法名  3.调用参数Class类型数组  4.调用接口的参数数组
 */
public class TransportMessage implements Serializable {
    //包名+接口名称  如com.lubby.rpc.service.MathService.
    private String interfaceName;
    //调用方法名   如 getSum
    private String methodName;
    //参数类型 按照接口参数顺序  getSum(int a, int b, String name)方法就是int.class int.class String.class的数组
    private Class[] paramsTypes;
    //参数 按照接口参数顺序 getSum(int a, int b, String name)方法就是 1,3,"Tom"的数组
    private Object[] parameters;
 
    public TransportMessage() {
        super();
        // TODO Auto-generated constructor stub
    }
 
    public TransportMessage(String interfaceName, String methodName,
            Class[] paramsTypes, Object[] parameters) {
        super();
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramsTypes = paramsTypes;
        this.parameters = parameters;
    }
 
}

2.客户端调用远程方法类 RPCClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class RPCClient {
    // 服务端地址
    private String serverAddress;
    // 服务端端口
    private int serverPort;
    // 线程池大小
    private int threadPoolSize = 10;
    // 线程池
    private ExecutorService executorService = null;
 
    public RPCClient() {
        super();
        // TODO Auto-generated constructor stub
    }
 
    /**
     * @param serverAddress
     *            TPC服务地址
     * @param serverPort
     *            TPC服务端口
     
     */
    public RPCClient(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
        executorService = Executors.newFixedThreadPool(threadPoolSize);
    }
 
    /**
     * 同步的请求和接收结果
     
     * @param transportMessage
     * @return
     */
    public Object sendAndReceive(TransportMessage transportMessage) {
        Object result = null;
        Socket socket = null;
        try {
             socket = new Socket(serverAddress, serverPort);
              
             //反序列化 TransportMessage对象
            ObjectOutputStream objectOutpusStream = new ObjectOutputStream(
                    socket.getOutputStream());
            objectOutpusStream.writeObject(transportMessage);
 
            ObjectInputStream objectInputStream = new ObjectInputStream(
                    socket.getInputStream());
            //阻塞等待读取结果并反序列化结果对象
            result = objectInputStream.readObject();
            socket.close();
        catch (UnknownHostException e) {
            e.printStackTrace();
        catch (IOException e) {
            e.printStackTrace();
        catch (ClassNotFoundException e) {
            e.printStackTrace();
        }finally{
            try {
                //最后关闭socket
                socket.close();
            catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
}

3.服务器处理类 RPCServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class RPCServer {
    private int threadSize = 10;
    private ExecutorService threadPool;
    private Map<String, Object> servicePool;
    private int port = 4321;
 
    public RPCServer() {
        super();
        synchronized (this) {
            threadPool = Executors.newFixedThreadPool(this.threadSize);
        }
    }
 
    /**
     
     * @param threadSize
     *            内部处理线程池大小
     * @param port
     *            当前TPC服务的端口号
     
     */
 
    public RPCServer(int threadSize, int port) {
        this.threadSize = threadSize;
        this.port = port;
        synchronized (this) {
            threadPool = Executors.newFixedThreadPool(this.threadSize);
        }
    }
 
    /**
     
     
     * @param servicePool
     *            装有service对象的Map, Key为全限定接口名,Value为接口实现类对象
     * @param threadSize
     *            内部处理线程池大小
     * @param port
     *            当前TPC服务的端口号
     
     */
    public RPCServer(Map<String, Object> servicePool, int threadSize, int port) {
        this.threadSize = threadSize;
        this.servicePool = servicePool;
        this.port = port;
        synchronized (this) {
            threadPool = Executors.newFixedThreadPool(this.threadSize);
        }
    }
 
    /**
     * RPC服务端处理函数 监听指定TPC端口,每次有请求过来的时候调用服务,放入线程池中处理.
     
     * @throws IOException
     */
    public void service() throws IOException {
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            Socket receiveSocket = serverSocket.accept();
            final Socket socket = receiveSocket;
            threadPool.execute(new Runnable() {
 
                public void run() {
                    try {
                        process(socket);
 
                    catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    catch (NoSuchMethodException e) {
                        e.printStackTrace();
                    catch (SecurityException e) {
                        e.printStackTrace();
                    catch (IllegalAccessException e) {
                        e.printStackTrace();
                    catch (IllegalArgumentException e) {
                        e.printStackTrace();
                    catch (InvocationTargetException e) {
                        e.printStackTrace();
                    catch (InstantiationException e) {
                        e.printStackTrace();
                    catch (IOException e) {
                        e.printStackTrace();
                    finally {
                        try {
                            socket.close();
                        catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
 
                }
            });
        }
 
    }
 
    /**
     * 调用服务 通过TCP Socket返回结果对象
     
     * @param receiveSocket
     *            请求Socket
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws SecurityException
     * @throws IllegalAccessException
     * @throws IllegalArgumentException
     * @throws InvocationTargetException
     * @throws InstantiationException
     */
    private void process(Socket receiveSocket) throws IOException,
            ClassNotFoundException, NoSuchMethodException, SecurityException,
            IllegalAccessException, IllegalArgumentException,
            InvocationTargetException, InstantiationException {
 
        /*
         * try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO
         * Auto-generated catch block e.printStackTrace(); }
         */
        ObjectInputStream objectinputStream = new ObjectInputStream(
                receiveSocket.getInputStream());
        TransportMessage message = (TransportMessage) objectinputStream
                .readObject();
 
        // 调用服务
        Object result = call(message);
 
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(
                receiveSocket.getOutputStream());
        objectOutputStream.writeObject(result);
        objectinputStream.close();
        objectOutputStream.close();
    }
 
    /**
     * 服务处理函数 通过包名+接口名在servicePool中找到对应服务 通过调用方法参数类型数组获取Method对象
     * 通过Method.invoke(对象,参数)调用对应服务
     
     * @return
     * @throws ClassNotFoundException
     * @throws SecurityException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws IllegalArgumentException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private Object call(TransportMessage message)
            throws ClassNotFoundException, NoSuchMethodException,
            SecurityException, IllegalAccessException,
            IllegalArgumentException, InvocationTargetException,
            InstantiationException {
        if (servicePool == null) {
            synchronized (this) {
                servicePool = new HashMap<String, Object>();
            }
        }
        String interfaceName = message.getInterfaceName();
        Object service = servicePool.get(interfaceName);
        Class<?> serviceClass = Class.forName(interfaceName);
        // 检查servicePool中对象,若没有着生产对象
        if (service == null) {
            synchronized (this) {
                service = serviceClass.newInstance();
                servicePool.put(interfaceName, service);
            }
        }
        Method method = serviceClass.getMethod(message.getMethodName(),
                message.getParamsTypes());
        Object result = method.invoke(service, message.getParameters());
        return result;
 
    }
}

4.为了方便测试写了个接口和其实现类 MathService 和 MathServiceImpl

1
2
3
public interface MathService {
    public int getSum(int a, int b, String name);
}
1
2
3
4
5
6
7
public class MathServiceImpl implements MathService {
    public int getSum(int a, int b, String name) {
        System.out.println(name);
        return a + b;
    }
 
}

5.服务器端测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ServerTest {
     
    public static void main(String[] args){
        Map<String,Object> servicePool = new  HashMap<String, Object>();
        servicePool.put("com.lubby.rpc.service.MathService"new MathServiceImpl());
        RPCServer server = new RPCServer(servicePool,44321);
        try {
            server.service();
        catch (IOException e) {
            e.printStackTrace();
        }
         
    }
 
}

6.客户端测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ClientTest {
    public static void main(String[] args) {
        String serverAddress = "127.0.0.1";
        int serverPort = 4321;
         
        final RPCClient client = new RPCClient(serverAddress, serverPort);
        final TransportMessage transportMessage = buildTransportMessage();
         
        for (int i = 0; i < 1000; i++) {
            final int waitTime = i * 10;
            new Thread(new Runnable() {
                public void run() {
                    Object result = client.sendAndReceive(transportMessage);
                    System.out.println(result);
                }
            }).start();
        }
    }
 
    private static TransportMessage buildTransportMessage() {
 
        String interfaceName = "com.lubby.rpc.service.MathService";
        Class[] paramsTypes = { int.classint.class, String.class };
        Object[] parameters = { 13"Lubby" };
        String methodName = "getSum";
 
        TransportMessage transportMessage = new TransportMessage(interfaceName,
                methodName, paramsTypes, parameters);
 
        return transportMessage;
    }
 
}

 

7.并发问题

由于ServerSocket是阻塞的,所以在ServerSocket.accept()方法同一时刻只能有一个线程进入,虽然之后的处理都另起一个线程,但是有瓶颈的,

我在用400个线程并发连接服务端的时候基本没问题,但是500个线程并发连接服务端的时候就会有部分线程连接不到服务器端.后面看下NIO回头用NIO来改一下.

 

v

http://my.oschina.net/u/2250599/blog/406474

分享到:
评论

相关推荐

    基于TCP协议的WCF调用简单程序

    基于TCP协议的WCF调用简单程序,里面包含服务构建、自定义宿主托管程序,和客户端调用三个项目,WCF以TCP协议通信。

    简单手写tcp-rpc.zip

    # 手撕RPC实现基于TCP的仿Dubbo实现 &gt;还记得第一次听到这词是在别人的面试视频里,简单了解了一下只知道是远程调用。 &gt;万万没想到我的第一次面试的第一个问题就是与此相关,希望认真准备每一次面试,及时查漏补缺...

    手撕RPC实现基于TCP的仿Dubbo简单实现

    文章目录手撕RPC实现基于TCP的仿Dubbo实现方法调用效果实现分模块写接口通过代理写好了客户端写服务端,并联调rpc代码实现myRpcrpc-clientrpc-interfacerpc-server源码 手撕RPC实现基于TCP的仿Dubbo实现 还记得第一...

    基于TCP协议的二进制RPC通信协议的Java实现源码+项目说明.zip

    基于TCP协议的二进制RPC通信协议的Java实现源码+项目说明.zip 一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性 - 完全...

    Jprotobuf-rpc-socket:Protobuf RPC是一种基于TCP协议的二进制RPC通信协议的Java实现

    Protobuf RPC是一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性。 Features: 完全支持POJO方式发布,使用非常简单 内置...

    tocol:提供TCP复用的RPC调用

    TCP连接使用策略是基于TCP连接最近使用时间来判断的 关于Protocol 目前实现的协议有java自带的二进制协议和hessian协议 协议是可扩展的 关于传输协议 该框架使用自定义协议,头四个字节表示数总长度,第五个字节表示...

    smite:基于ZMQ的简单的类RPC消息库

    基于 ZMQ 的简单的类似 RPC 的消息传递库。 有关更多示例,请查看包含集成测试的 tests.py 文件。 基本示例 import smite host = '127.0.0.1' port = 3000 def echo ( text ): return text servant = smite . ...

    NettyRpc:一个基于Netty,ZooKeeper和Spring的简单RPC框架

    基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 ZooKeeper的服务注册表/发现支持 高可用性,负载平衡和故障转移 支持不同的负载均衡策略 支持异步/同步调用 支持不同版本的服务 支持...

    TCP-IP技术大全

    IP和相关协议 第9章 IP协议家族 77 9.1 TCP/IP模型 77 9.1.1 解剖TCP/IP模型 78 9.1.2 协议组件 78 9.2 理解网际协议(IP) 79 9.2.1 IPv4结构 79 9.2.2 IP做什么 80 9.3 理解传输控制协议...

    EasyRpc:EasyRpc是一个基于Netty,ZooKeeper和ProtoStuff的简单,高性能,易于使用的RPC框架。

    EasyRpc EasyRpc是基于Netty,ZooKeeper和ProtoStuff开发的一个简单易用,便于学习的RPC框架。 1特性简单易用;注释完善,方便学习;低延迟,基于Netty 4;解决TCP粘包/拆包问题;支持非双向的同步/异步调用;基于...

    TCP/IP教程TCP/IP基础

    目 录 译者序 前言 第一部分 TCP/IP基础 第1章 开放式通信模型简介 1 1.1 开放式网络的发展 1 1.1.1 通信处理层次化 2 1.1.2 OSI参考模型 3 1.1.3 模型的使用 5 1.2 TCP/IP参考模型 7 ...23.5 简单TCP/IP服务 250 ...

    TCP/IP技术大全

    23.5 简单TCP/IP服务 250 23.6 远程访问服务(RAS) 250 23.7 DHCP服务器 252 23.7.1 安装DHCP服务器服务 252 23.7.2 控制DHCP服务器服务 253 23.7.3 压缩DHCP数据库 253 23.7.4 管理DHCP 254 23.8 使用Microsoft DNS ...

    IO_deep_learning_notes.zip

    203 全手写基于Netty的RPC框架 provider端简单dispatcher实现RPC调用全流程 地址 207 全手写基于Netty的RPC框架 简单重构框架分层及RPC传输的本质及有无状态的RPC区别 地址 216 自定义HTTP协议解析和HTTPserver...

    javasocketserver源码-protobuf-socket-rpc:使用tcp/ip套接字的Java和Pythonprotobufr

    实现,适用于想要简单实现其 protobuf rpc 服务的人。 看: 下载/安装 Java 你可以在downloads/protobuf-socket-rpc-2.0.jar找到用Java 1.6编译的jar,或者你也可以下载源码直接使用。 您还需要在类路径中使用 ...

    JSONRPC:JSON-RPC Swift程序包

    JSONRPC是一个小的Swift软件包,用于轻松实现基于TCP和Unix域套接字的JSON-RPC客户端和服务器。 我正在实现它供我自己使用,但同时也将其置于公共领域供其他人使用。 这是一项正在进行的工作,目前有一些限制(我...

    netty-stroll:RPC基础通信框架

    轻量RPC通信 一个完备的RPC框架在实现远程调用的基础上...基于Netty实现简易独立的HttpServer,作为接收的网关层;同时发布基础包提供作为RPC Client端的基础向下游发起调用 /* * 通过注解和继承 指定path和输入输出类

    Hprose 全名是高性能远程对象服务引擎.rar

    简单的说,RPC 就是从一台机器(客户端)上通过参数传递的方式调用另一台机器(服务器)上的一个函数或方法(可以统称为服务)并得到返回的结果。 RPC 会隐藏底层的通讯细节(不需要直接处理 Socket 通讯或 Http ...

    TCP/IP技术大全(中文PDF非扫描版)

    第六部分 实现TCP/IP 第20章 一般配置问题 211 20.1 安装网卡 211 20.1.1 网卡 211 20.1.2 资源配置 212 20.1.3 安装适配器软件 213 20.1.4 重定向器和API 214 20.1.5 服务 214 20.1.6 NIC接口 215 20.2 网络和传输...

    介绍 Golang 简单服务框架,提供如下功能 高性能TCP网络通信服务端&客户端.rar

    rpcx是一个类似阿里巴巴 Dubbo 和微博 Motan 的分布式的RPC服务框架,基于Golang net/rpc实现。 谈起分布式的RPC框架,比较出名的是阿里巴巴的dubbo,包括由当当网维护的dubbox。 不知道dubbo在阿里的内部竞争中败给...

Global site tag (gtag.js) - Google Analytics