首页 > 大数据环境搭建: Apache Storm + Kryo + Kafka 小问题

大数据环境搭建: Apache Storm + Kryo + Kafka 小问题

Apache Storm + AVRO + Kafka 三大Apache家族成员进行大数据平台进行搭建算是比较常见的啦, 最近发现Apache Storm上使用了Kryo作为序列化的工具, 特地研究了一下, 发现它确实不错, 可以支持比较复杂的对象. 尝试着搭建一套实时数据的分析环境, 结果遇到了一个小问题.

前面的流程都是正常的, Kafka ProducerKafka Customer都可以正常工作, Storm 也能解析Kafka的数据, 通过Kryo可以正常的序列/反序列化. 但发现当把反序列化好的对象发送到下个bolt时, 接收的数据现在重叠异常了.

举个栗子: 发送了10条消息到指定的topic, 然后Storm解析出这10条消息, 而后发送给下个bolt进行相应的数据处理, 结果发现, 下游的bolt并没有完全接收到这个10条数据内容(乱序), 它确实是收到10次通讯, 只是里面的数据是重复的, 从第3`4条开始就重复显示最后一条数据的内容.

对了, 这个里使用的是StormTrident API, 在整个流程的上半部分, 即Kryo序列化消息内容, 发送到Kafka, Storm 使用 KryoScheme 反序列化消息内容, 都是正常的, 也没有乱序.

不知道有人关注过这个Kryo的框架不, 记得阿里好像也是在用它的. 求大神赐教一二.


真是无语了, 犯了个低级的错误. 在Kryo反序列化的时候, 设置了一个全局对象, 这个对象在多线程的环境中, 会被重复读写覆盖, 把它换成局部变量就好了. 参考代码如下:

public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {

private static final long serialVersionUID = -4684340809824908270L;

// 全局变量是错误的
//private T event; 

@Override
public void write(Kryo kryo, Output output, T event) {
    event.write(output);
}

@Override
public T read(Kryo kryo, Input input, Class<T> type) {'
    // 换成局部变量就好了
    T event = new T();
    event.read(input);
    return event;
}
}
【热门文章】
【热门文章】