首页 > kafka 消费者多线程阻塞问题

kafka 消费者多线程阻塞问题

用kafka的high level api会造成消费者线程的堵塞

我是在消费的时候将消息先批量缓存到buffer里,然后集中处理
可是会出现一个问题,当生产的消息不足够buffer时,消费者就阻塞了,不会执行后面进行buffer的处理

我想设定一个时间,比如堵塞5s后,自动将没有满的buffer消息进行处理,这个要怎么解决

例如:
buffer允许100条消息
buffer积累到100条进行一次处理
可是生产者只生产了50条消息,这50条消息接收过来的时候不够100条的buffer
这样buffer就阻塞了
如何能在阻塞一段时间后,能够把50条不满的消息处理了?

kafka版本:0.8.1


while (it.hasNext()) {
    //获取消息

    //消息消息,放入buffer(当缓存的消息达不满buffer时,是不进行处理的,到这里就堵塞了)
    
    //处理消息 
}
【热门文章】
【热门文章】