Netty 客户端发送消息并同步获取结果
作者
曾文冲
推荐理由
详尽并有图与代码对应的讲解,本月有分享。
UDP Client代码如下:
private NioEventLoopGroup nioEventLoopGroup;
@Autowired
private UdpHandler handler;
@PostConstruct
public void initClient() {
nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.channel(NioDatagramChannel.class);
bootstrap.group(nioEventLoopGroup);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.handler(handler);
// 监听端口
ChannelFuture sync = bootstrap.bind(0).sync();
sync.channel();
}catch (Exception e){
log.error("创建RFID UDP 客户端出现错我" ,e);
nioEventLoopGroup.shutdownGracefully();
}
}
@PreDestroy
public void close(){
if(!ObjectUtils.isEmpty(nioEventLoopGroup)) {
nioEventLoopGroup.shutdownGracefully();
}
}
UdpHandler如下:
@Slf4j
@Component
public class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private NettyResponse response;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
ByteBuf content = datagramPacket.content();
int dataLength = content.readableBytes();
byte[] data = new byte[dataLength];
content.readBytes(data);
// 3.读取数据
String reply = BytesToHexString(Arrays.copyOfRange(data, 0, dataLength), " ");
response = NettyResponse.builder()
.resultData(reply)
.build();
promise.setSuccess();
log.info("RFID 读卡器返回数据:response={}", response);
}
public synchronized ChannelPromise sendCommandPackage(NettyRequest request) {
while (ctx == null) {
try {
TimeUnit.MILLISECONDS.sleep(1);
log.info("等待ChannelHandlerContext实例化");
} catch (InterruptedException e) {
log.error("等待ChannelHandlerContext实例化过程中出错",e);
}
}
promise = ctx.newPromise();
try {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(request.getData()),
new InetSocketAddress(request.getIp(),
request.getPort())))
.sync();
} catch (InterruptedException e) {
log.error("RFID读卡器重启失败,request={}",request,e);
response = NettyResponse.builder().build();
promise.setSuccess();
}
return promise;
}
/**
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.ctx = ctx;
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
} /**
* Gets called if an user event was triggered.
*/
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("d");
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
} @Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
} @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
} public NettyResponse getResponse() {
return response;
} public String BytesToHexString(byte[] b, String Padding) {
return BytesToHexString(b, Padding, 0, b.length);
} public String BytesToHexString(byte[] b, String Padding, int startIndex, int length) {
String RetStr = "";
for (int i = 0; i < length; i++) {
String TmpStr = Integer.toHexString(b[i + startIndex] & 0xFF);
if (TmpStr.length() < 2) {
TmpStr = "0" + TmpStr;
}
RetStr += TmpStr + Padding;
}
if (RetStr.length() > 0) {
RetStr = RetStr.substring(0, RetStr.length() - Padding.length()).toUpperCase();
}
return RetStr;
}
}
UdpHandler#sendCommandPackage 方法主要是向服务端发数据,并返回ChannelPromise对象,如果发生异常,则构建响应NettyResponse对象; 方法中ctx.writeAndFlush(…)向服务端发送Hex数据;若发送成功,服务端响应时会回调channelRead0方法;
2.channelRead0方法主要获取服务端响应的byte数据,最终把Byte数据转换为Hex数据,然后把Hex数据设置到NettyResponse对象中,另外设置promise为SUCCESS;
3.调用udpHandler.sendCommandPackage方法会返回ChannelPromise对象;调用promise.await等待3秒,然后调用udpHandler.getResponse()方法便可以获取到服务端响应的数据了;
同步获取结果的NettyResponse响应对象
@Builder
@Data
public class NettyResponse {
private Boolean success;
/**
* 返回数据:如果成功,则返回包的数据部分为空。
* 『F0H 02H 0EH 00H』
*/
private String resultData;
public Boolean getSuccess() {
if(UDPResultDataConstants.RESULT_DATA.equals(resultData)){
return Boolean.TRUE;
}
return Boolean.FALSE;
}
}
NettyRequest请求封装对象
@Data
public class NettyRequest {
private String ip;
private Integer port;
/**
* 复位读写器
* 功能:读写器复位,相当于断电后,重新上电
* 命令码: 0EH
* 命令参数:无
* 命令包: 『40H 02H 0EH B0H』
* 返回数据:如果成功,则返回包的数据部分为空。
* 『F0H 02H 0EH 00H』
*/
private byte[] data = new byte[]{0x40, 0x02, 0x0E, (byte) 0xB0};;
}
使用方式如下
@Autowired
private UdpHandler udpHandler;
@GetMapping("reboot")
public Object reboot(Long deviceId){
//获取设备对应的ip和端口
NettyRequest nettyRequest = new NettyRequest();
nettyRequest.setIp("192.168.158.70");
nettyRequest.setPort(1969);
ChannelPromise promise = udpHandler.sendCommandPackage(nettyRequest);
try {
promise.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
NettyResponse result = udpHandler.getResponse();
return new ObjectRestResponse<>().data(result);
}