首页 > netty使用mqtt的解码器, 对方总是接收不到消息

netty使用mqtt的解码器, 对方总是接收不到消息

client端的代码如图


  public class Client {
    public final static String HOST = "127.0.0.1";
    public final static int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MqttDecoder());
                            ch.pipeline().addLast(new MqttEncoder());
                            ch.pipeline().addLast(new ClientInBoundHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            future.channel().closeFuture().sync();
        }finally {
            worker.shutdownGracefully();
        }
    }
}

那个具体的ClientInBoundHandler

public class ClientInBoundHandler extends SimpleChannelInboundHandler<MqttMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttConnectMessage connectMessage = new MqttConnectMessage(fixedHeader, null, null);
        ctx.channel().writeAndFlush(connectMessage);

//        ByteBuf byteBuf = Unpooled.copiedBuffer("hello".getBytes());
//        ctx.writeAndFlush(byteBuf);
    }

}
  1. 我现在运行, server端那边什么都收不到. 我给server端的decode打断点, client的encode打断点都没有执行到, 也不知道是不是我断点打错了

  2. 我把上面注释去掉 , server端能报错, 说协议有错误, 我觉得奇怪,为什么这样反而能收到?


channelActive 里的方法换成我这个
System.out.println("channelActive");

     MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_LEAST_ONCE,false,1);
     MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,false);
     MqttConnAckMessage conAck = new MqttConnAckMessage(mqttFixedHeader,variableHeader);
     ctx.channel().write(conAck);
     ctx.channel().flush();
     System.out.println(conAck);

你的mqtt encoder继承的是哪个outbound handler?另外,查查你这个encoder是否声明了泛型,声明了哪个类型。把encoder的代码贴出来看看吧

【热门文章】
【热门文章】