Sending a message from a Netty client and getting the result synchronously

Author

曾文冲

Why it is recommended

A detailed walkthrough with diagrams matched to the code; shared this month.

The UDP client code is as follows:


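    private NioEventLoopGroup nioEventLoopGroup;

    @Autowired
    private UdpHandler handler;

    @PostConstruct
    public void initClient() {
        nioEventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioDatagramChannel.class)
                    // Bootstrap.handler(...) keeps only the last handler it is given,
                    // so both handlers are installed through one ChannelInitializer
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO), handler);
                        }
                    });
            // Bind to port 0 so that the OS assigns a free local port
            bootstrap.bind(0).sync();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
            }
            log.error("Failed to create the RFID UDP client", e);
            nioEventLoopGroup.shutdownGracefully();
        }
    }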

    @PreDestroy
    public void close() {
        // Release the event loop threads when the Spring context shuts down
        if (nioEventLoopGroup != null) {
            nioEventLoopGroup.shutdownGracefully();
        }
    }
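Binding to port 0, as initClient does, asks the operating system to choose a free local port. The sketch below shows the same behaviour with the JDK's own DatagramSocket rather than Netty; the class and method names are hypothetical and are not part of the original project.

```java
import java.io.UncheckedIOException;
import java.net.DatagramSocket;
import java.net.SocketException;

class EphemeralPortDemo {

    // Binds a UDP socket to port 0 and returns the port the OS actually assigned.
    static int bindToAnyFreePort() {
        try (DatagramSocket socket = new DatagramSocket(0)) {
            return socket.getLocalPort();
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The printed value differs from run to run
        System.out.println("OS-assigned local port: " + bindToAnyFreePort());
    }
}
```

A client that only sends requests and reads replies does not need a fixed local port, because the device answers to whatever source port the request came from.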

The UdpHandler is as follows:


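import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
// Netty's DatagramPacket, not java.net.DatagramPacket
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    // Written on the event loop and read from caller threads, hence volatile
    private volatile ChannelHandlerContext ctx;
    private volatile ChannelPromise promise;
    private volatile NettyResponse response;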

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        // Copy the readable bytes out of the datagram
        ByteBuf content = datagramPacket.content();
        byte[] data = new byte[content.readableBytes()];
        content.readBytes(data);
        // Convert the reply to a space-separated hex string
        String reply = BytesToHexString(data, " ");
        response = NettyResponse.builder()
                .resultData(reply)
                .build();
        log.info("RFID reader replied: response={}", response);
        // A datagram can arrive before any request was sent, or after the promise
        // was already completed, so guard against both cases
        ChannelPromise current = promise;
        if (current != null) {
            current.trySuccess();
        }
    }

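    public synchronized ChannelPromise sendCommandPackage(NettyRequest request) {
        // channelActive sets ctx once the channel is bound; poll until then
        while (ctx == null) {
            try {
                TimeUnit.MILLISECONDS.sleep(1);
                log.info("Waiting for the ChannelHandlerContext to be initialized");
            } catch (InterruptedException e) {
                log.error("Interrupted while waiting for the ChannelHandlerContext", e);
                // Restore the flag and stop waiting instead of looping forever
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted before the channel became active", e);
            }
        }
        // Clear the previous reply so that a timeout is not mistaken for a fresh response
        response = null;
        promise = ctx.newPromise();
        try {
            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(request.getData()),
                    new InetSocketAddress(request.getIp(), request.getPort())))
                    .sync();
        } catch (InterruptedException e) {
            log.error("Failed to send the reboot command to the RFID reader, request={}", request, e);
            Thread.currentThread().interrupt();
            // Hand the caller an empty response rather than leaving it waiting
            response = NettyResponse.builder().build();
            promise.trySuccess();
        }
        return promise;
    }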

    /**
     * Stores the context once the channel becomes active so that
     * sendCommandPackage can write through it.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * Gets called if a user event was triggered.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Forward the event to the next handler in the pipeline
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    public NettyResponse getResponse() {
        return response;
    }

    public String BytesToHexString(byte[] b, String Padding) {
        return BytesToHexString(b, Padding, 0, b.length);
    }

    // Renders length bytes starting at startIndex as two-digit upper-case hex,
    // separated by Padding
    public String BytesToHexString(byte[] b, String Padding, int startIndex, int length) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; i++) {
            if (i > 0) {
                sb.append(Padding);
            }
            String hex = Integer.toHexString(b[i + startIndex] & 0xFF);
            if (hex.length() < 2) {
                sb.append('0');
            }
            sb.append(hex);
        }
        return sb.toString().toUpperCase();
    }
}
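The hex conversion at the end of the handler can be checked in isolation. The following is a minimal standalone sketch of the same idea, not the handler's own method; HexDemo and toHex are hypothetical names, and the sample bytes are the reply packet quoted in the NettyResponse comment.

```java
class HexDemo {

    private static final String DIGITS = "0123456789ABCDEF";

    // Renders each byte as two upper-case hex digits, joined by the separator.
    static String toHex(byte[] bytes, String separator) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(separator);
            }
            int value = bytes[i] & 0xFF; // treat the signed byte as 0..255
            sb.append(DIGITS.charAt(value >>> 4));
            sb.append(DIGITS.charAt(value & 0x0F));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] reply = {(byte) 0xF0, 0x02, 0x0E, 0x00};
        System.out.println(toHex(reply, " ")); // prints "F0 02 0E 00"
    }
}
```

Masking with 0xFF matters because Java bytes are signed: without it, 0xF0 would be treated as a negative number.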

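1. UdpHandler#sendCommandPackage sends data to the server and returns a ChannelPromise. If sending fails with an exception, it builds an empty NettyResponse instead. Inside the method, ctx.writeAndFlush(…) sends the hex command bytes to the server; once the send has succeeded, Netty calls back channelRead0 when the server replies.

2. channelRead0 reads the bytes of the server's reply, converts them to a hex string, stores that string in a NettyResponse, and marks the promise as successful.

3. The caller receives the ChannelPromise from udpHandler.sendCommandPackage, calls promise.await to wait for up to 3 seconds, and then calls udpHandler.getResponse() to obtain the server's reply.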
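The three steps above amount to "register something to wait on, send, then block with a timeout". As a rough illustration only, the sketch below reproduces that flow with the JDK's CompletableFuture standing in for Netty's ChannelPromise and a background thread standing in for the device. Every name in it (SyncReplyDemo, send, onReply, awaitReply) is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncReplyDemo {

    // The future that the next reply will complete; stands in for the handler's promise.
    private volatile CompletableFuture<String> pending;

    // Step 1: register a fresh future before "sending" the command.
    CompletableFuture<String> send(String command) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending = future;
        // A real client would write the command to the channel here.
        return future;
    }

    // Step 2: called when a reply arrives; completes the waiting future with the data.
    void onReply(String hexReply) {
        CompletableFuture<String> future = pending;
        if (future != null) {
            future.complete(hexReply);
        }
    }

    // Step 3: block for at most timeoutMillis; null means no reply arrived in time.
    static String awaitReply(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        SyncReplyDemo demo = new SyncReplyDemo();
        CompletableFuture<String> future = demo.send("40 02 0E B0");
        // Simulate the device answering from another thread
        new Thread(() -> demo.onReply("F0 02 0E 00")).start();
        System.out.println(awaitReply(future, 3000));
    }
}
```

One difference from the handler is deliberate: here the reply travels inside the future itself instead of a separate response field, so a timed-out call cannot pick up the reply of an earlier request.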

The NettyResponse object that carries the synchronously obtained result


@Builder
@Data
public class NettyResponse {

    private Boolean success;

    /**
     * Returned data: on success, the data section of the reply packet is empty.
     *     『F0H  02H  0EH  00H』
     */
    private String resultData;

    // Derived from resultData; replaces the getter Lombok would generate for success
    public Boolean getSuccess() {
        return UDPResultDataConstants.RESULT_DATA.equals(resultData);
    }
}

The NettyRequest object that wraps a request


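@Data
public class NettyRequest {

    private String ip;

    private Integer port;

    /**
     * Reset the reader
     *      Function: resets the reader, equivalent to powering it off and on again
     *      Command code: 0EH
     *      Command parameters: none
     *      Command packet: 『40H  02H  0EH  B0H』
     *      Returned data: on success, the data section of the reply packet is empty.
     *      『F0H  02H  0EH  00H』
     */
    private byte[] data = new byte[]{0x40, 0x02, 0x0E, (byte) 0xB0};
}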
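The source does not say how the last byte of each packet is computed. In both packets quoted above, however, the four bytes add up to a multiple of 256 (0x40 + 0x02 + 0x0E + 0xB0 = 0x100 and 0xF0 + 0x02 + 0x0E + 0x00 = 0x100), which is consistent with a two's-complement checksum. Assuming that rule, which should be confirmed against the reader's protocol manual, the trailing byte could be derived as in this sketch; ChecksumDemo and checksum are hypothetical names.

```java
class ChecksumDemo {

    // Returns the byte that makes the sum of all packet bytes equal to 0 modulo 256.
    // Assumption: the device uses this rule; the article itself does not state it.
    static byte checksum(byte[] body) {
        int sum = 0;
        for (byte b : body) {
            sum += b & 0xFF; // treat the signed byte as 0..255
        }
        return (byte) -sum;
    }

    public static void main(String[] args) {
        byte[] command = {0x40, 0x02, 0x0E};
        // Matches the B0H that ends the reset command packet
        System.out.println(Integer.toHexString(checksum(command) & 0xFF).toUpperCase()); // prints "B0"
    }
}
```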

Usage is as follows:


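    @Autowired
    private UdpHandler udpHandler;

    @GetMapping("reboot")
    public Object reboot(Long deviceId) {
        // Look up the IP and port for the device (hardcoded in this example)
        NettyRequest nettyRequest = new NettyRequest();
        nettyRequest.setIp("192.168.158.70");
        nettyRequest.setPort(1969);
        ChannelPromise promise = udpHandler.sendCommandPackage(nettyRequest);
        NettyResponse result = null;
        try {
            // await returns false if no reply arrived within 3 seconds,
            // in which case the result stays null
            if (promise.await(3, TimeUnit.SECONDS)) {
                result = udpHandler.getResponse();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of only printing the stack trace
            Thread.currentThread().interrupt();
        }
        return new ObjectRestResponse<>().data(result);
    }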