基于Netty自定义RPC

基于Netty自定义RPC

RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主

1.是基于HTTP的restful形式的广义远程调用,以spring could的feign和restTemplate为代表,采用的协议是HTTP的7层 调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。

2.是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输, 序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。

接下来我们主要以第二种的RPC远程调用来自己实现

需求:

​ 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty

步骤

创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

1.公共模块

包目录结构如下:

image.png

首先,在公共模块中添加netty的maven依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>

提供者(服务端)及消费者(客户端)工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值

接口的定义

/**
 * @author 振帅
 * @create 2021/1/13 2:10
 * @description: IUserService 
 * 一个普通的接口,参数是支持序列化的String类型,返回值同理
 */
public interface IUserService {
    public String sayHello(String msg);
}

2.提供者的实现(服务端)

包目录结构如下:

ServerBoot :启动类,启动服务

UserServiceHandler:自定义的业务处理器

UserServiceImpl:公共模块接口的实现

pom.xml文件需要引入公共模块

<dependencies>
    <dependency>
        <groupId>com.lagou</groupId>
        <artifactId>rpc_common</artifactId>
        <version>1.0-SNAPSHOT</version>    
    </dependency>
</dependencies>

2.1接口的实现

/**
 * @author 振帅
 * @create 2021/1/13 2:12
 * @description: UserServiceImpl
 */
public class UserServiceImpl implements IUserService {

    //将来客户端要远程调用的方法
    public String sayHello(String msg) {
        System.out.println("==>" + msg);
        return "服务器返回数据 :" + msg;
    }

    //创建一个方法启动服务器
    public static void startServer(String ip, int port) throws InterruptedException {

        //1.创建两个线程池对象
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        //2.创建服务端的启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //3.配置启动引导对象
        serverBootstrap.group(bossGroup,workGroup)
                //设置通道为NIO
                .channel(NioServerSocketChannel.class)
                //创建监听channel
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //获取管道对象
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        //给管道对象pipLine 设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //把我们自定义ChannelHandler添加到通道中
                        pipeline.addLast(new UserServiceHandler());
                    }
                });

        //4.绑定端口
        serverBootstrap.bind(port).sync();
    }
}
netty服务端启动步骤
创建两个线程池对象(NioEventLoopGroup)

1)bossGroup:负责接收用户连接,监听客户端请求

2)workGroup:负责处理用户的io读写操作,处理与客户端的数据通讯

创建启动引导类 设置启动引导类

1)两个线程池添加到组中

2)设置一个通道类型 NIO

3)绑定一个初始化监听

绑定端口 关闭通道
NioEventLoopGroup
 **一个Netty服务端启动时,通常会有两个NioEventLoopGroup:**一个是监听线程组,主要是监听客户端请求,另一个是工作线程组,主要是处理与客户端的数据通讯。 

Netty客户端只有一个NioEventLoopGroup,就是用来处理与服务端通信的线程组。

NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

Channel

image.png

Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。 ChannelHandler,核心处理业务就在这里,用于处理业务请求。 ChannelHandlerContext,用于传输业务数据。 ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
ServerBootstrap

服务端的启动引导对象

客户端的启动引导对象为Bootstrap

ChannelHandler
来源: https://www.cnblogs.com/qdhxh...

image.png

ChannelHandler:核心处理业务

ChannelHandler下主要是两个子接口

ChannelInboundHandler(入栈): 处理输入数据和Channel状态类型改变。

​ 适配器: ChannelInboundHandlerAdapter(适配器设计模式)

​ 常用的: SimpleChannelInboundHandler

ChannelOutboundHandler(出栈): 处理输出数据

​ 适配器: ChannelOutboundHandlerAdapter

每一个Handler都一定会处理出栈或者入栈(可能两者都处理数据),例如对于入栈的Handler可能会继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,

而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理,

并且入栈的消息可以通过泛型来规定。

这里为什么有设配器模式呢?

我们在写自定义Handel时候,很少会直接实现上面两个接口,因为接口中有很多默认方法需要实现,所以这里就采用了设配器模式,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter就是设配器模式的产物,让它去实现上面接口,实现它所有方法。那么你自己写自定义Handel时,只要继承它,就无须重写上面接口的所有方法了。

ChannelInitializer

实现了ChannelHandler创建serverbootstrap的时候会经常用到,它用于对刚刚接收的channel进行初始化

ChannelPipeline

管道对象

ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。它提供了一种高级的截取过滤模式(类似serverlet中的filter功能),让用户可以在ChannelPipeline中完全控制一个事件以及如何处理ChannelHandler与ChannelPipeline的交互。

​ 对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将器pipeline附加到channel中。

//在ChannelPipeline的第一个位置添加ChannelHandler
addFirst(...)   
//在ChannelPipeline的末尾添加ChannelHandler
addLast(...)   

网络只能传输字节数据,

netty发送或接收消息后,必须将消息数据从一种形式转化为另一种。

接收消息后,需要将消息从字节码转成java对象(由某种解码器解码);

发送消息前,需要将java对象转成字节(由某种类型的编码器进行编码)。

//给管道对象pipLine 设置编码解码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//把我们自定义ChannelHandler添加到通道中
pipeline.addLast(new UserServiceHandler());

2.2自定义ChannelHandler

继承ChannelInboundHandlerAdapter 重写channelRead方法,当客户端读取数据时,该方法会被调用

/**
 * @author 振帅
 * @create 2021/1/13 2:27
 * @description: UserServiceHandler 自定义的业务处理器
 */
public class UserServiceHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端读取数据时,该方法会被调用
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //注意 客户端将来发送请求的时候会传递一个参数:UserService#sayHello#are you ok
        //1.判断当前的请求是否符合规则
        if (msg.toString().startsWith("UserService")) {
            //2.如果符合规则,调用实现类获取一个result
            UserServiceImpl userService = new UserServiceImpl();
            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            //3.把调用实现类的方法获取的结果写到客户端
            ctx.writeAndFlush(result);
        }

    }
}

ChannelHandlerContext

上下文对象 存储handler信息 写操作

我们先约定客户端传递参数的格式为:UserService#sayHello# + msg;

所以这里需要 msg.toString().substring(msg.toString().lastIndexOf("#") + 1)来获取msg的信息。

2.3服务端的启动

/**
 *启动类
 */
public class ServerBoot {
    public static void main(String[] args) throws InterruptedException {
       //启动服务器
        UserServiceImpl.startServer("127.0.0.1",8998);
    }
}

3.消费者的实现(客户端)

包目录结构如下:

ConsumerBoot :启动类,启动服务

RPCConsumer:消费者

UserClientHandler:自定义Handler

3.1RPCConsumer的实现

主要有以下几个步骤

创建一个线程池对象 -- 它要处理我们自定义事件 声明一个自定义事件处理器 UserClientHandler 编写方法,初始化客户端(创建连接池 bootStrap 设置bootStrap 连接服务器) 编写一个方法,使用jdk动态代理创建对象
/**
 * 消费者
 */
public class RPCConsumer {

    /**
     * 1.创建一个线程池对象 -- 它要处理我们自定义事件
     */
    private static ExecutorService executorService =
            //线程池线程数以当前CPU核数为准
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


    /**
     * 2.声明一个自定义事件处理器 UserClientHandler
     */
    private static UserClientHandler userClientHandler;


    /**
     * 3.编写方法,初始化客户端(创建连接池 bootStrap 设置bootStrap 连接服务器)
     */
    public static void initClient() throws InterruptedException {
        // 1).初始化UserClientHandler
        userClientHandler = new UserClientHandler();
        // 2).创建连接池对象
        NioEventLoopGroup group = new NioEventLoopGroup();
        // 3).创建客户端的引导对象
        Bootstrap bootstrap = new Bootstrap();
        // 4).配置引导对象
        bootstrap.group(group)
                //设置通道为NIO
                .channel(NioSocketChannel.class)
                //设置请求协议为TCP
                .option(ChannelOption.TCP_NODELAY,true)
                //监听channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //获取管道
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //添加自定义事件处理器
                        pipeline.addLast(userClientHandler);
                    }
                });
        // 5).连接服务器
        bootstrap.connect("127.0.0.1",8998).sync();
    }

    /**
     * 4.编写一个方法,使用jdk动态代理创建对象
     */
    public static Object createProxy(Class<?> serviceClass, final String providerParam) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serviceClass},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //1.初始化客户端client
                        if (userClientHandler ==null) {
                            initClient();
                        }

                        //2.给UserClientHandler 设置param参数
                        userClientHandler.setParam(providerParam + args[0]);

                        //3.使用线程池,开启一个线程处理call() 写操作 并返回结果
                        Object result = executorService.submit(userClientHandler).get();
                        //4.return 结果
                        return result;
                    }
                });
    }

}

Executors

主要用于提供线程池相关的操作,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

//创建固定数目线程的线程池。
public static ExecutorService newFiexedThreadPool(int Threads) 

Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

ExecutorService

Runtime.getRuntime().availableProcessors()

线程池线程数以当前CPU核数为准

Runtime.getRuntime().availableProcessors()方法询问jvm,jvm去问操作系统,操作系统去问硬件 。

3.2初始化客户端

和服务端的类似,不过客户端只需要创建一个连接池对象

客户端的引导对象是BootStrap,而服务端是ServerBootstrap

option主要是针对boss线程组,child主要是针对worker线程组

option / handler / attr 方法

option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;对于客户端的SocketChannel,主要是用来处理 业务操作;

attr: 设置通道的属性;

option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

  对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

  因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

3.3UserClientHandler自定义事件处理器

public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    //1.定义成员变量
    private ChannelHandlerContext context;//事件处理器上下文对象(存储handler信息 写操作)
    private String result;//记录服务器返回的数据
    private String param;//记录将要返回给服务器的数据

    //2.实现channelActive 客户端和服务器连接时,该方法就自动执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //初始化ChannelHandlerContext
        this.context = ctx;
    }

    //3.实现channelRead 当我们读到服务器数据时,该方法自动执行
    //synchronized 同步
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //将读到的服务器的数据msg设置为成员变量的值
        this.result = msg.toString();
        //唤醒写操作
        notify();
    }

    //4.将客户端的数据写到服务器
    public synchronized Object call() throws Exception {
        //context给服务器写数据
        context.writeAndFlush(param);
        wait();
        return result;
    }

    //5.设置参数的方法
    public void setParam(String  param){
        this.param = param;
    }

}

3.4客户端的启动

public class ConsumerBoot {

    //定义参数
    private static final String PROVIDER_NAME = "UserService#sayHello#";

    public static void main(String[] args) throws InterruptedException {
        //1.创建代理对象
       IUserService service  = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);

       //2循环给服务器写数据
        while (true) {
            String result = service.sayHello("are you ok !!");
            System.out.println(result);
            Thread.sleep(2000);
        }
    }
}