
How do I get the offset of the last message in Kafka?

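I don't produce answers; I'm just playing courier for Stack Overflow this once. I happened to be looking up a Kafka problem on Stack Overflow today and came across this along the way.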

On finding the offset to start reading from, the Kafka 0.8 SimpleConsumer example says:

Kafka includes two constants to help,
kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data
in the logs and starts streaming from there,
kafka.api.OffsetRequest.LatestTime() will only stream new messages.
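
To make the two constants concrete, here is a minimal sketch in Java that does not call Kafka at all. ToyPartitionLog and its resolve method are hypothetical names invented for illustration. The values -2 and -1 are the sentinels that EarliestTime() and LatestTime() return in the 0.8 API. The sketch models one partition as a range of retained offsets and shows which offset each sentinel resolves to.

```java
/** Toy model of one partition's log. NOT a Kafka class; hypothetical, for illustration only. */
final class ToyPartitionLog {
    // Same sentinel values as kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    private final long logStartOffset; // offset of the oldest retained message
    private final long logEndOffset;   // offset the NEXT message will receive

    ToyPartitionLog(long logStartOffset, long logEndOffset) {
        if (logStartOffset < 0 || logEndOffset < logStartOffset) {
            throw new IllegalArgumentException("invalid offset range");
        }
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
    }

    /** Resolves a sentinel to the offset a consumer would start reading from. */
    long resolve(long whichTime) {
        if (whichTime == EARLIEST_TIME) {
            return logStartOffset; // replay everything still retained
        }
        if (whichTime == LATEST_TIME) {
            return logEndOffset;   // skip existing data, see only new messages
        }
        throw new IllegalArgumentException("this sketch only handles the two sentinels");
    }

    public static void main(String[] args) {
        // A partition that currently retains offsets 100 through 249.
        ToyPartitionLog log = new ToyPartitionLog(100L, 250L);
        System.out.println("earliest -> " + log.resolve(EARLIEST_TIME));
        System.out.println("latest   -> " + log.resolve(LATEST_TIME));
    }
}
```

Note that the "latest" position points one past the last stored message, which is why a consumer starting there sees only new messages.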

The same page also has example code for managing offsets on the consumer side:

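    // Imports needed by this method (the method itself must live inside a class):
    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    // whichTime is EarliestTime(), LatestTime(), or a timestamp in milliseconds.
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Ask the broker for at most one offset at or before whichTime.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0; // caution: indistinguishable from a valid offset of 0
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }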
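
One detail matters for the question in the title: with LatestTime() the value returned is the offset the next message will receive, so the last message already stored sits one below it. Below is a minimal sketch of that arithmetic, assuming you have already fetched both the earliest and the latest offset with a function like getLastOffset above. LastMessageOffset and its of method are hypothetical helper names, not part of Kafka.

```java
import java.util.OptionalLong;

/** Hypothetical helper, not part of Kafka: derives the last stored message's offset. */
final class LastMessageOffset {
    private LastMessageOffset() {}

    /**
     * @param earliestOffset value fetched with EarliestTime() (oldest retained offset)
     * @param latestOffset   value fetched with LatestTime() (offset of the next message)
     * @return the offset of the last stored message, or empty if the partition holds none
     */
    static OptionalLong of(long earliestOffset, long latestOffset) {
        if (earliestOffset < 0 || latestOffset < earliestOffset) {
            throw new IllegalArgumentException("invalid offset range");
        }
        if (latestOffset == earliestOffset) {
            return OptionalLong.empty(); // nothing retained, so there is no last message
        }
        return OptionalLong.of(latestOffset - 1);
    }
}
```

Returning an OptionalLong rather than a plain long is a design choice in this sketch: it keeps the empty-partition case from being mistaken for a real offset.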

Hey, did you get your problem solved in the end? I'd really like to know. Could you tell me?
