Netty同步获取返回结果

本文主要介绍netty client发送请求后,如何阻塞获取结果的一种方法


前言

众所周知,Netty是异步的、基于事件驱动的网络应用框架,它以高性能、高并发著称。基于事件驱动,简单点说就是 Netty 会根据客户端的连接请求、读、写等事件 做出相应的响应。

但实际使用场景中避免不了需要同步获取server返回值的情况,经过小小的修改后可以实现阻塞式获取response的效果


一、netty使用方式

pom:

<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.72.Final</version>
</dependency>

Server端:

@Slf4j
public class NettyServer {

    //启动netty服务端
    public static void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final NettyServerHandler nettyServerHandler = new NettyServerHandler();
        final NettyServerHandler2 nettyServerHandler2 = new NettyServerHandler2();
        final NettyOutBoundHandler nettyOutBoundHandler = new NettyOutBoundHandler();
        final NettyOutBoundHandler2 nettyOutBoundHandler2 = new NettyOutBoundHandler2();
        try {
            //创建服务端的启动对象,并使用链式编程来设置参数
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置一直保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {//设置一个通道测试对象
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            //给pipeline设置通道处理器
                            ch.pipeline()
                                    .addLast(new MyMessageDecoder())
                                    .addLast(new MyMessageEncoder())
                                    //多handler,out在前倒叙,in在后正序
//                                    .addLast(nettyOutBoundHandler2)
//                                    .addLast(nettyOutBoundHandler)
                                    .addLast(nettyServerHandler);
//                                    .addLast(nettyServerHandler2);
                            log.info(ch.remoteAddress() + "已经连接上");
                        }
                    });//给 workerGroup 的EventLoop对应的管道设置处理器
            //启动服务器,并绑定端口并且同步
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

            //给 channelFuture 注册监听器,监听关心的事件,异步的时候使用
            channelFuture.addListener((future) -> {
                if (future.isSuccess()) {
                    System.out.println("监听端口" + port + "成功.......");
                } else {
                    System.out.println("监听端口" + port + "失败......");
                }
            });
            //对关闭通道进行监听,监听到通道关闭后,往下执行
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server端中断,{}",e.getMessage(),e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

serverHandler:

@Slf4j
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    public static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        AttributeKey<String> attributeKey = AttributeKey.valueOf("clientId");
        String s = ctx.channel().attr(attributeKey).get();
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        log.debug("channel active..........");
        log.debug("remote address is " + socketAddress.toString());
        log.debug("client channel id is " + ctx.channel().id());
        clientMap.put(socketAddress.toString(), ctx);
        super.channelActive(ctx);
    }

    /**
     * 触发场景
     * 1.客户端发送关闭帧
     * 2.客户端结束进程
     * 3.服务端主动调用channel.close()
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("client:{}断开连接", ctx.channel().id());
        clientMap.remove(ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            String s = (String) msg;
            log.debug("handler 1 received " + s);
            ObjectMapper objectMapper = ApplicationContextHolder.getObjectMapper();
            RequestInfo requestInfo = objectMapper.readValue(s, RequestInfo.class);
            //确认是rpc调用才往下执行
            if (requestInfo != null && "#rpc#".equals(requestInfo.getProtocol())) {
                //反射调用实现类的方法
                String name = requestInfo.getClassName();
                Object service = InitServiceConfig.serviceMap.get(name);
                Method method = service.getClass().getDeclaredMethod(requestInfo.getMethodName(), requestInfo.getParamTypes());
                Object result = method.invoke(service, requestInfo.getParams());
                String response = objectMapper.writeValueAsString(new ResponseInfo(requestInfo.getUuid(),result));
                ctx.writeAndFlush(response);
                log.info("server端返回:" + response);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            ctx.writeAndFlush("");
        }
//        ctx.fireChannelRead("handler1 is complete!");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
        ctx.close();
    }


}

RequestInfo、ResponseInfo

@Getter
@Setter
public class RequestInfo {
    private String uuid = UUID.randomUUID().toString();
    /**
     * 调用类名
     */
    private String className;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     *参数类型
     */
    private Class<?>[] paramTypes;
    /**
     *参数列表
     */
    private Object[] params;
    /**
     * 自定义rpc协议
     */
    private String protocol="#rpc#";


}

---------------------------
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ResponseInfo {
    private String requestId;
    private Object result;
}

InitServiceConfig就是一个持有一个server端所有service的工具类,实现反射调用对应service方法

client端

@Slf4j
@Configuration
public class ClientInitConfig implements CommandLineRunner {

    public static NettyClientHandler nettyClientHandler;
    public static AnotherHandler anotherHandler;

    @Async
    @Override
    public void run(String... args) {
        nettyClientHandler = new NettyClientHandler();
        anotherHandler = new AnotherHandler();
        //客户端需要一个事件循环组就可以
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            //创建客户端的启动对象 bootstrap ,不是 serverBootStrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) //设置客户端通道的实现数 (反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            AttributeKey<String> attributeKey = AttributeKey.valueOf("clientId");
                            ch.attr(attributeKey).set("client001");
                            log.debug("client channel id is " + ch.id());
                            ch.pipeline()
                                    .addLast(new MyMessageDecoder())
                                    .addLast(new MyMessageEncoder())
                                    .addLast(nettyClientHandler);//加入自己的处理器
//                                    .addLast(anotherHandler);
                            log.debug("client channel初始化.....");
                        }
                    });
            log.info("client is ready!");
            //连接服务器
            final ChannelFuture channelFuture = bootstrap.connect(ClientBootStrap.getHost(), ClientBootStrap.getPort()).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("client连接终止,{}", e.getMessage(), e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

clientHandler:

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    public static ChannelHandlerContext context;

    private final Map<String, BlockedQueue<ResponseInfo>> queueMap = new ConcurrentHashMap<>();

    /**
     * 通道连接时,就将上下文保存下来,因为这样其他函数也可以用
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.debug("client channel is active..........");
        context = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("client shutdown.......");
        super.channelInactive(ctx);
    }

    //当服务端返回消息时,将消息复制到类变量中,然后唤醒正在等待结果的线程,返回结果
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("收到服务端发送的消息:" + msg);
        try {
            ResponseInfo responseInfo = ApplicationContextHolder.getObjectMapper().readValue(msg.toString(), ResponseInfo.class);
            queueMap.get(responseInfo.getRequestId()).add(responseInfo);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage(), e);
        }
    }

    //异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }

    public String send(RequestInfo requestInfo) {
        try {
            //测试单个请求阻塞
            if ("user001".equals(requestInfo.getParams()[0])) {
                Thread.sleep(10000);
            }
            ObjectMapper objectMapper = ApplicationContextHolder.getObjectMapper();
            String s = objectMapper.writeValueAsString(requestInfo);
            queueMap.putIfAbsent(requestInfo.getUuid(), new BlockedQueue<>());
            context.writeAndFlush(s);
            log.info("client发出数据:" + s);
            Object result = getRpcResponse(requestInfo.getUuid()).getResult();
            return objectMapper.writeValueAsString(result);
        } catch (InterruptedException | JsonProcessingException e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    public ResponseInfo getRpcResponse(String requestId) {
        try {
            return queueMap.get(requestId).poll();
        } finally {
            queueMap.remove(requestId);
        }
    }
}

BlockedQueue:

public class BlockedQueue<T> {
    private final List<T> data = new ArrayList<>(1);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void add(T t) {
        lock.lock();
        try {
            data.add(t);
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public T poll() {
        lock.lock();
        try {
            while (data.isEmpty()) {
                condition.await();
            }
            return data.remove(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return null;
    }

}

二、原理

细化阻塞对象

主要是每个请求类赋一个uuid,然后放到map中,value为自定义的BlockedQueue对象,模拟阻塞获取data的方法,让线程阻塞在最小的地方;
client端发送完请求后用该requestId到map中拿BlockedQueue对象,内部使用ReentrantLock+Condition方式实现阻塞,这样每一个请求拿到的BlockedQueue各不相同,lock互不影响;

总结

之前看到有的文章在clientHandler中就使用ReentrantLock,在send时lock,这种方式会极大的影响并发性能,导致同一时刻只有一个线程可以send,并不可取;
也有使用FutureTask配合线程池的方式,submit一个新线程,可能是我配置的方式不对,没有get到该方式的要领,导致无法同步获取结果;

以上方式仅供参考,欢迎大家讨论