
Logstash fails to send data to Kafka

Environment

  1. CentOS 7.2

  2. Logstash

  3. Kafka 0.9.1

Logstash configuration

demo.conf

input{
    stdin{}
}

output{
    file{
        path => "/data/demo.txt"
    }   

    stdout{
        codec => rubydebug
    }   

    kafka{
        bootstrap_servers => "172.19.170.3:31000"
        message_key => "mifi"
        topic_id => "test01"
    }   
}

Error output

When Logstash pushed data to Kafka, it produced the following errors:

root@Data172-19-170-4:/data# /opt/logstash/bin/logstash agent -f demo.conf 
Settings: Default pipeline workers: 2
Logstash startup completed
aaa
{
       "message" => "aaa",
      "@version" => "1",
    "@timestamp" => "2016-03-08T12:12:06.791Z",
          "host" => "Data172-19-170-4"
}
bb
{
       "message" => "bb",
      "@version" => "1",
    "@timestamp" => "2016-03-08T12:12:09.700Z",
          "host" => "Data172-19-170-4"
}
cc
{
       "message" => "cc",
      "@version" => "1",
    "@timestamp" => "2016-03-08T12:12:11.645Z",
          "host" => "Data172-19-170-4"
}
dd
{
       "message" => "dd",
      "@version" => "1",
    "@timestamp" => "2016-03-08T12:12:13.660Z",
          "host" => "Data172-19-170-4"
}
log4j, [2016-03-08T12:22:07.883]  WARN: org.apache.kafka.common.network.Selector: Error in I/O with Data172-19-170-3/172.19.170.3
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
log4j, [2016-03-08T12:22:10.502]  WARN: org.apache.kafka.common.network.Selector: Error in I/O with Data172-19-170-3/172.19.170.3
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
log4j, [2016-03-08T12:27:04.457]  WARN: org.apache.kafka.common.network.Selector: Error in I/O with Data172-19-170-3/172.19.170.3
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
log4j, [2016-03-08T12:27:04.459]  WARN: org.apache.kafka.common.network.Selector: Error in I/O with Data172-19-170-3/172.19.170.3
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
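The repeated java.io.EOFException means the producer's TCP connection to the broker is being dropped (or the address the broker advertises is not reachable from the Logstash host at all). Before digging into Kafka's configuration, a plain TCP probe from the Logstash host against the address in the stack trace can help narrow this down. A minimal sketch, with the host/port taken from the log above (adjust for your setup):

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# The address the producer is failing against in the log above:
# print(can_connect("172.19.170.3", 31000))
```

Note that the connect itself may well succeed: as it turned out in this case, the failure came from the hostname the broker advertises back in its metadata, not from the bootstrap address itself.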

Solution

After reading the notes in the references, I traced the problem to the Kafka configuration file config/server.properties. Changing the following settings

#host.name=localhost
#advertised.host.name=<hostname routable by clients>

to my own host's IP fixed it:

host.name=172.17.0.6
advertised.host.name=172.17.0.6
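A quick way to double-check the edit is to parse server.properties and confirm that both keys are set and not left commented out. A small sketch (the parsing is deliberately simplistic, and the file path is an assumption about your installation):

```python
def read_props(path):
    """Parse a Java-style .properties file into a dict, skipping comments and blanks."""
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                key, _, value = line.partition("=")
                props[key.strip()] = value.strip()
    return props

# props = read_props("config/server.properties")
# for key in ("host.name", "advertised.host.name"):
#     print(key, "=", props.get(key, "<unset: broker falls back to defaults>"))
```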

The official Kafka documentation explains advertised.host.name like this:

Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for "host.name" if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName()

In my case I had already set host.name but not advertised.host.name, and that alone did not help; the problem was only resolved once I set advertised.host.name as well.

The following passage is from the Apache Kafka FAQ:

Why can't my consumers/producers connect to the brokers?
When a broker starts up, it registers its ip/port in ZK. You need to make sure the registered ip is consistent with what's listed in metadata.broker.list in the producer config. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, sometimes (e.g., in EC2), the returned ip is an internal one and can't be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the "hostname" property in server.properties. In another rare case where the binding host/port is different from the host/port for client connection, you can set advertised.host.name and advertised.port for client connection.

Roughly translated:
When a broker starts, it registers its IP/port in ZooKeeper. You need to make sure the registered IP is consistent with metadata.broker.list in the producer config. By default, the registered IP is obtained via InetAddress.getLocalHost.getHostAddress, which normally returns the host's real IP. In some environments, however (e.g. AWS EC2), it returns an internal IP that cannot be reached from outside, so external clients cannot connect to the broker. The fix is to specify the address explicitly in the configuration file; usually setting host.name is enough. In the rare case where the host/port the broker binds to differs from the host/port clients use to connect, set advertised.host.name and advertised.port for client connections.
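The default the FAQ describes is easy to reproduce: resolving the machine's own hostname to an IP, which on EC2 or Docker hosts often yields a loopback or internal address. A rough Python analogue of that lookup (this mirrors, rather than reuses, Kafka's Java-side InetAddress call):

```python
import socket

def default_registered_address():
    """Approximate InetAddress.getLocalHost().getHostAddress():
    resolve this machine's own hostname to an IPv4 address."""
    try:
        return socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        # Hostname not resolvable at all -- a broker in this state definitely
        # needs host.name / advertised.host.name set explicitly.
        return None

# print(default_registered_address())
# If this prints 127.0.1.1 or an internal address, remote clients cannot connect.
```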

References


Oh, there is more error output below the stdout section... SF hiding scrollbars by default is really annoying...

Could it be that your Kafka host name is misconfigured? There is a thread on Discuss with the same problem: https://discuss.elastic.co/t/error-outputting-data-to-kafka/43192/5
