Basic environment
CentOS 7.2
Logstash
Kafka 0.9.1
Logstash configuration
demo.conf:
input {
  stdin {}
}
output {
  file {
    path => "/data/demo.txt"
  }
  stdout {
    codec => rubydebug
  }
  kafka {
    bootstrap_servers => "172.19.170.3:31000"
    message_key => "mifi"
    topic_id => "test01"
  }
}
Error messages
Logstash fails when writing data to Kafka; the errors are as follows:
root@Data172-19-170-4:/data# /opt/logstash/bin/logstash agent -f demo.conf
Settings: Default pipeline workers: 2
Logstash startup completed
aaa
{
"message" => "aaa",
"@version" => "1",
"@timestamp" => "2016-03-08T12:12:06.791Z",
"host" => "Data172-19-170-4"
}
bb
{
"message" => "bb",
"@version" => "1",
"@timestamp" => "2016-03-08T12:12:09.700Z",
"host" => "Data172-19-170-4"
}
cc
{
"message" => "cc",
"@version" => "1",
"@timestamp" => "2016-03-08T12:12:11.645Z",
"host" => "Data172-19-170-4"
}
dd
{
"message" => "dd",
"@version" => "1",
"@timestamp" => "2016-03-08T12:12:13.660Z",
"host" => "Data172-19-170-4"
}
log4j, [2016-03-08T12:22:07.883] WARN: org.apache.kafka.common.network.Selector: Error in I/O with Data172-19-170-3/172.19.170.3
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
(the same WARN / java.io.EOFException stack trace repeats several more times)
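The EOFException above simply means the broker closed the TCP connection while the producer was waiting to read a response. A minimal Python sketch of the same mechanics, where a local stub stands in for a misconfigured broker (this is an illustration of the network behavior, not Kafka itself):

```python
import socket
import threading

# Stand-in for a misconfigured broker: accept the connection, then hang up.
def broker_stub(server_sock):
    conn, _ = server_sock.accept()
    conn.close()

server = socket.socket()
server.bind(("127.0.0.1", 0))   # any free port
server.listen(1)
port = server.getsockname()[1]
threading.Thread(target=broker_stub, args=(server,), daemon=True).start()

client = socket.socket()
client.connect(("127.0.0.1", port))
data = client.recv(4096)        # peer closed -> recv returns b"" (end of stream)
print(data == b"")              # the Java NIO client surfaces this condition as EOFException
client.close()
server.close()
```

The client connects successfully, so this is not a firewall problem; the broker accepts and then drops the connection, which is why the producer sees end-of-stream rather than "connection refused".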
Problem solved
After reading the notes in the references below, I traced the problem to Kafka's config file config/server.properties. Changing the following settings
#host.name=localhost
#advertised.host.name=<hostname routable by clients>
to the host's own IP fixed it:
host.name=172.17.0.6
advertised.host.name=172.17.0.6
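A quick sanity check after changing the config is to confirm that the advertised address is actually reachable from the client host with a plain TCP connect. A small sketch (the host and port are the ones from this post; adjust to your setup):

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. the broker address Logstash points at in this post:
# can_connect("172.19.170.3", 31000)
```

If this returns False from the Logstash machine, the problem is reachability of the advertised address, not Logstash itself.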
The official Kafka documentation explains advertised.host.name as follows:
Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for "host.name" if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName()
In my case, I had already set host.name but not advertised.host.name, and that was not enough; only after setting advertised.host.name as well was the problem solved.
The following passage is taken from the Apache Kafka FAQ:
Why can't my consumers/producers connect to the brokers?
When a broker starts up, it registers its ip/port in ZK. You need to make sure the registered ip is consistent with what's listed in metadata.broker.list in the producer config. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, sometimes (e.g., in EC2), the returned ip is an internal one and can't be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the "hostname" property in server.properties. In another rare case where the binding host/port is different from the host/port for client connection, you can set advertised.host.name and advertised.port for client connection.
In short: when the broker starts, it registers its ip/port in ZooKeeper, and you need to make sure the registered ip/port matches the metadata.broker.list address that producers use. By default, the registered ip comes from InetAddress.getLocalHost.getHostAddress, but in some environments (e.g. AWS EC2) this returns a private ip rather than the public one, so external clients cannot reach the broker. The fix is to set the address explicitly in the config file; usually host.name is enough. In the rare case where the ip/host the broker binds to differs from the one clients connect with, advertised.host.name and advertised.port are the client-facing settings.
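What the broker actually registered can be inspected under /brokers/ids/<broker-id> in ZooKeeper (for example with the zookeeper-shell.sh tool shipped with Kafka). The registration is a small JSON blob; here is a sketch of checking it against the address clients are configured with. The JSON payload below is a made-up sample in the 0.9-era format, not output from this cluster:

```python
import json

# Hypothetical payload of /brokers/ids/0, as a 0.9-era broker would write it.
registration = '{"jmx_port":-1,"host":"172.17.0.6","version":2,"port":31000}'

broker = json.loads(registration)
client_bootstrap = ("172.17.0.6", 31000)   # what bootstrap_servers / metadata.broker.list says

# The registered host:port must match what clients dial, or they get I/O errors.
matches = (broker["host"], broker["port"]) == client_bootstrap
print(matches)   # True
```

If the registered host is something like localhost or a Docker-internal address, that is exactly the mismatch advertised.host.name exists to fix.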
References
Discussion on Stack Overflow
Discussion on the official Logstash forum
Parameter documentation
Oh, there is more error output below the stdout section... it's really annoying that sf hides scrollbars by default...
Could it be that your Kafka hostname setting is wrong? There's a discussion on Discuss with the same problem as yours: https://discuss.elastic.co/t/error-outputting-data-to-kafka/43192/5