Fetching Results Synchronously with Netty
This article describes one way for a Netty client to block and wait for the result after sending a request.
Preface
As is well known, Netty is an asynchronous, event-driven network application framework famous for high performance and high concurrency. "Event-driven" simply means that Netty reacts to events such as client connections, reads, and writes.
In real-world usage, however, you sometimes cannot avoid needing the server's return value synchronously. With a small modification, a blocking "send and wait for the response" call can be implemented.
1. Using Netty
pom:
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.72.Final</version>
</dependency>
Server side:
@Slf4j
public class NettyServer {

    // Start the Netty server
    public static void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final NettyServerHandler nettyServerHandler = new NettyServerHandler();
        final NettyServerHandler2 nettyServerHandler2 = new NettyServerHandler2();
        final NettyOutBoundHandler nettyOutBoundHandler = new NettyOutBoundHandler();
        final NettyOutBoundHandler2 nettyOutBoundHandler2 = new NettyOutBoundHandler2();
        try {
            // Create the server bootstrap and configure it fluently
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) // set the two event-loop groups
                    .channel(NioServerSocketChannel.class) // use NioServerSocketChannel as the server channel implementation
                    .option(ChannelOption.SO_BACKLOG, 128) // size of the pending-connection queue
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep connections alive
                    .childHandler(new ChannelInitializer<SocketChannel>() { // initializer for accepted channels
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // add handlers to the channel's pipeline
                            ch.pipeline()
                                    .addLast(new MyMessageDecoder())
                                    .addLast(new MyMessageEncoder())
                                    // with multiple handlers, inbound handlers fire in the order added,
                                    // outbound handlers in reverse order of addition
                                    // .addLast(nettyOutBoundHandler2)
                                    // .addLast(nettyOutBoundHandler)
                                    .addLast(nettyServerHandler);
                                    // .addLast(nettyServerHandler2);
                            log.info(ch.remoteAddress() + " connected");
                        }
                    }); // set the handler chain for workerGroup's event loops
            // Start the server: bind the port and wait synchronously for binding to complete
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // Register a listener on channelFuture for the events we care about (useful when binding asynchronously)
            channelFuture.addListener((future) -> {
                if (future.isSuccess()) {
                    System.out.println("Listening on port " + port + " succeeded.......");
                } else {
                    System.out.println("Listening on port " + port + " failed......");
                }
            });
            // Block here until the server channel is closed, then continue below
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server interrupted, {}", e.getMessage(), e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
serverHandler:
@Slf4j
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    public static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        AttributeKey<String> attributeKey = AttributeKey.valueOf("clientId");
        String s = ctx.channel().attr(attributeKey).get();
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        log.debug("channel active..........");
        log.debug("remote address is " + socketAddress.toString());
        log.debug("client channel id is " + ctx.channel().id());
        clientMap.put(socketAddress.toString(), ctx);
        super.channelActive(ctx);
    }

    /**
     * Triggered when:
     * 1. the client sends a close frame
     * 2. the client process terminates
     * 3. the server itself calls channel.close()
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("client {} disconnected", ctx.channel().id());
        clientMap.remove(ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            String s = (String) msg;
            log.debug("handler 1 received " + s);
            ObjectMapper objectMapper = ApplicationContextHolder.getObjectMapper();
            RequestInfo requestInfo = objectMapper.readValue(s, RequestInfo.class);
            // only proceed if this really is an RPC call
            if (requestInfo != null && "#rpc#".equals(requestInfo.getProtocol())) {
                // reflectively invoke the target service's method
                String name = requestInfo.getClassName();
                Object service = InitServiceConfig.serviceMap.get(name);
                Method method = service.getClass().getDeclaredMethod(requestInfo.getMethodName(), requestInfo.getParamTypes());
                Object result = method.invoke(service, requestInfo.getParams());
                String response = objectMapper.writeValueAsString(new ResponseInfo(requestInfo.getUuid(), result));
                ctx.writeAndFlush(response);
                log.info("server returned: " + response);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            ctx.writeAndFlush("");
        }
        // ctx.fireChannelRead("handler1 is complete!");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
        ctx.close();
    }
}
RequestInfo and ResponseInfo:
@Getter
@Setter
public class RequestInfo {

    private String uuid = UUID.randomUUID().toString();

    /**
     * name of the class to invoke
     */
    private String className;

    /**
     * method name
     */
    private String methodName;

    /**
     * parameter types
     */
    private Class<?>[] paramTypes;

    /**
     * parameter values
     */
    private Object[] params;

    /**
     * custom rpc protocol marker
     */
    private String protocol = "#rpc#";
}
---------------------------
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ResponseInfo {

    private String requestId;
    private Object result;
}
InitServiceConfig is simply a utility class holding all of the server's service instances, so that the handler can look a service up by name and invoke the requested method reflectively.
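The article does not show InitServiceConfig itself. As a rough, hypothetical sketch of what such a registry might look like (only the `serviceMap` field is actually referenced by the handler above; `ServiceRegistry`, `dispatch`, and `EchoService` are illustrative names, not from the original code), the reflective dispatch boils down to:

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a service registry in the spirit of InitServiceConfig:
// map a service name to an instance, then dispatch a request by looking the
// instance up and invoking the named method reflectively.
public class ServiceRegistry {
    public static final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    public static Object dispatch(String serviceName, String methodName,
                                  Class<?>[] paramTypes, Object[] params) throws Exception {
        Object service = serviceMap.get(serviceName);
        Method method = service.getClass().getDeclaredMethod(methodName, paramTypes);
        // relax access checks in case the implementing class is not public
        method.setAccessible(true);
        return method.invoke(service, params);
    }
}

// A trivial service used to exercise the registry.
class EchoService {
    public String echo(String input) {
        return "echo:" + input;
    }
}
```

In a real application the map would be populated at startup, e.g. from the Spring context, keyed by interface or bean name.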
Client side:
@Slf4j
@Configuration
public class ClientInitConfig implements CommandLineRunner {

    public static NettyClientHandler nettyClientHandler;
    public static AnotherHandler anotherHandler;

    @Async
    @Override
    public void run(String... args) {
        nettyClientHandler = new NettyClientHandler();
        anotherHandler = new AnotherHandler();
        // the client only needs a single event-loop group
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            // create the client bootstrap (Bootstrap, not ServerBootstrap)
            Bootstrap bootstrap = new Bootstrap();
            // configure it
            bootstrap.group(group) // set the event-loop group
                    .channel(NioSocketChannel.class) // set the client channel implementation (instantiated via reflection)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            AttributeKey<String> attributeKey = AttributeKey.valueOf("clientId");
                            ch.attr(attributeKey).set("client001");
                            log.debug("client channel id is " + ch.id());
                            ch.pipeline()
                                    .addLast(new MyMessageDecoder())
                                    .addLast(new MyMessageEncoder())
                                    .addLast(nettyClientHandler); // add our own handler
                                    // .addLast(anotherHandler);
                            log.debug("client channel initialized.....");
                        }
                    });
            log.info("client is ready!");
            // connect to the server
            final ChannelFuture channelFuture = bootstrap.connect(ClientBootStrap.getHost(), ClientBootStrap.getPort()).sync();
            // block until the channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("client connection terminated, {}", e.getMessage(), e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
clientHandler:
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    public static ChannelHandlerContext context;

    private final Map<String, BlockedQueue<ResponseInfo>> queueMap = new ConcurrentHashMap<>();

    /**
     * When the channel becomes active, save the context so other methods can use it.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.debug("client channel is active..........");
        context = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("client shutdown.......");
        super.channelInactive(ctx);
    }

    // When the server responds, put the message into the matching per-request queue;
    // this wakes up the thread that is waiting for that result
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("received message from server: " + msg);
        try {
            ResponseInfo responseInfo = ApplicationContextHolder.getObjectMapper().readValue(msg.toString(), ResponseInfo.class);
            BlockedQueue<ResponseInfo> queue = queueMap.get(responseInfo.getRequestId());
            // guard against a response whose request has already been cleaned up
            if (queue != null) {
                queue.add(responseInfo);
            }
        } catch (JsonProcessingException e) {
            log.error(e.getMessage(), e);
        }
    }

    // exception handling
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }

    public String send(RequestInfo requestInfo) {
        try {
            // simulate a slow request, to verify that it blocks only its own caller
            if ("user001".equals(requestInfo.getParams()[0])) {
                Thread.sleep(10000);
            }
            ObjectMapper objectMapper = ApplicationContextHolder.getObjectMapper();
            String s = objectMapper.writeValueAsString(requestInfo);
            // register the per-request queue before writing, so the response can never arrive first
            queueMap.putIfAbsent(requestInfo.getUuid(), new BlockedQueue<>());
            context.writeAndFlush(s);
            log.info("client sent: " + s);
            Object result = getRpcResponse(requestInfo.getUuid()).getResult();
            return objectMapper.writeValueAsString(result);
        } catch (InterruptedException | JsonProcessingException e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    public ResponseInfo getRpcResponse(String requestId) {
        try {
            return queueMap.get(requestId).poll();
        } finally {
            // clean up the mapping once the result has been taken
            queueMap.remove(requestId);
        }
    }
}
BlockedQueue:
public class BlockedQueue<T> {

    private final List<T> data = new ArrayList<>(1);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void add(T t) {
        lock.lock();
        try {
            data.add(t);
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public T poll() {
        lock.lock();
        try {
            // re-check in a loop to guard against spurious wakeups
            while (data.isEmpty()) {
                condition.await();
            }
            return data.remove(0);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can observe it
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return null;
    }
}
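Since BlockedQueue holds at most one element and supports a blocking take, the JDK's ArrayBlockingQueue with capacity 1 provides the same semantics out of the box, and its timed poll avoids blocking forever if a response is lost. A minimal sketch (this class is an illustrative alternative, not part of the original article):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// A drop-in alternative to the hand-rolled BlockedQueue: a capacity-1
// ArrayBlockingQueue gives the same "one response per request" blocking
// semantics, plus a timeout so a lost response cannot block the caller forever.
public class ResponseSlot<T> {
    private final BlockingQueue<T> slot = new ArrayBlockingQueue<>(1);

    public void add(T t) {
        // non-blocking offer: never stalls the Netty IO thread,
        // silently drops a duplicate response instead of blocking
        slot.offer(t);
    }

    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        // returns null when the timeout elapses without a response
        return slot.poll(timeout, unit);
    }
}
```

The timed poll is worth having in production: with the article's BlockedQueue, a request whose response never arrives (server error path writes "" which fails JSON parsing) would block its caller indefinitely.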
2. How it works
Narrowing the blocking scope
Each request is assigned a UUID and registered in a map whose value is a custom BlockedQueue; the queue emulates a blocking take of the data, so each thread blocks on the smallest possible scope.
After sending a request, the client uses the requestId to look up its own BlockedQueue in the map. Internally the queue blocks with a ReentrantLock plus a Condition, and since every request gets a distinct BlockedQueue, the locks never contend with one another.
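This correlation logic can be exercised without Netty at all: register one queue per request id, complete the requests out of order from another thread, and confirm each waiter still receives its own response. A self-contained sketch, using the JDK's BlockingQueue in place of the article's BlockedQueue (`Correlator` and its method names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the per-request correlation map: each request id owns its
// own capacity-1 queue, so responses arriving in any order wake up exactly
// the thread that is waiting for that id.
public class Correlator {
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    // called by the requesting thread before the request is written
    public void register(String requestId) {
        pending.putIfAbsent(requestId, new ArrayBlockingQueue<>(1));
    }

    // called by the IO thread when a response arrives
    public void complete(String requestId, String response) {
        BlockingQueue<String> q = pending.get(requestId);
        if (q != null) {
            q.offer(response);
        }
    }

    // called by the requesting thread; blocks until its own response arrives
    public String await(String requestId) throws InterruptedException {
        try {
            return pending.get(requestId).take();
        } finally {
            pending.remove(requestId);
        }
    }
}
```

Note that responses completed in reverse order still reach the right waiters, which is exactly why a single shared lock is unnecessary.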
Summary
Some articles put a single ReentrantLock in the clientHandler and acquire it in send. That severely hurts concurrency, since only one thread can send at a time; it is not a good approach.
Others use FutureTask together with a thread pool, submitting a new task per request. Perhaps I configured it incorrectly, but I never got the hang of that approach and could not obtain results synchronously with it.
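For that future-based variant, a CompletableFuture per request achieves the same goal without submitting any extra task: the IO thread simply completes the future when the response arrives, unblocking exactly the caller that is waiting on get(). A hedged sketch, not code from the original article (`FutureCorrelator` is an illustrative name):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: replace the per-request BlockedQueue with a CompletableFuture.
// send() would register the future and block on get(); the IO thread calls
// complete() in channelRead, which wakes up exactly that caller.
public class FutureCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // called by the requesting thread before the request is written
    public CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // called by the IO thread when a response arrives
    public void complete(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

The caller would then use `future.get(timeout, unit)` instead of BlockedQueue.poll(), gaining a built-in timeout and exception propagation (via completeExceptionally) for free.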
The approach above is offered for reference only; discussion is welcome.