1. When the Kafka client connects, it fails with the following exception:
  Exception in thread "main"
  java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
     at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
     at com.KafkaProducerExample.main(KafkaProducerExample.java:53)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
 
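  Possible causes:
  1. The Java client library version does not match the Kafka server version.
  2. In /kafka_2.11-0.9.0.0/config/server.properties, listeners must be set to the broker's IP address rather than its hostname. The client's local hosts file had no entry mapping that hostname to an IP, so the producer could not connect.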
  
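  The official documentation suggests that setting host.name and advertised.host.name to the IP address is enough to connect by IP, but in testing that configuration did not work. The connection succeeded only after listeners was changed.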
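
  A quick way to check cause 2 from the client machine is to test whether the name the broker advertises can be resolved at all. The following is a minimal sketch using only the JDK. The class name BrokerHostCheck and its helper are hypothetical names chosen for illustration, and the default address is simply the one used in the examples in this note.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerHostCheck {
    /** Returns true if this machine can map the given host name (or IP literal) to an address. */
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            // No DNS record and no hosts-file entry for this name on the client
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the host name the broker advertises as the first argument;
        // the default below is only the address used elsewhere in this note.
        String host = args.length > 0 ? args[0] : "192.168.2.176";
        System.out.println(host + " resolvable from this client: " + canResolve(host));
    }
}
```

  If the broker's hostname is reported as not resolvable, either add it to the client's hosts file or configure listeners with the IP address as described above.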
  3. Check the broker registration stored in ZooKeeper:
  get /brokers/ids/0
  When the registered address is an IP, clients can connect normally.
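
  The check in step 3 can also be done on the text returned by get /brokers/ids/0. The sketch below assumes the znode holds JSON with an "endpoints" array of entries such as PLAINTEXT://host:port; the exact fields may differ between Kafka versions, and the sample strings in main are illustrative rather than captured output. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BrokerRegistrationCheck {
    // Matches quoted endpoint strings such as "PLAINTEXT://192.168.2.176:9092" and captures the host
    private static final Pattern ENDPOINT = Pattern.compile("\"[A-Z_]+://([^:\"]*):(\\d+)\"");
    // Loose IPv4 shape check; it does not validate that each octet is <= 255
    private static final Pattern IPV4 = Pattern.compile("(\\d{1,3}\\.){3}\\d{1,3}");

    /** Extracts the host part of every endpoint found in the znode text. */
    public static List<String> endpointHosts(String znodeJson) {
        List<String> hosts = new ArrayList<>();
        Matcher m = ENDPOINT.matcher(znodeJson);
        while (m.find()) {
            hosts.add(m.group(1));
        }
        return hosts;
    }

    /** Returns true if the host looks like an IPv4 literal rather than a name. */
    public static boolean isIpv4Literal(String host) {
        return IPV4.matcher(host).matches();
    }

    public static void main(String[] args) {
        // Illustrative registrations (assumed shape); "kafka-host" is a made-up hostname
        String byIp = "{\"endpoints\":[\"PLAINTEXT://192.168.2.176:9092\"],\"host\":\"192.168.2.176\",\"port\":9092}";
        String byName = "{\"endpoints\":[\"PLAINTEXT://kafka-host:9092\"],\"host\":\"kafka-host\",\"port\":9092}";
        for (String json : new String[] {byIp, byName}) {
            for (String host : endpointHosts(json)) {
                System.out.println(host + " is an IP literal: " + isIpv4Literal(host));
            }
        }
    }
}
```

  A registration that lists a hostname is not wrong in itself, but every client must then be able to resolve that name.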
  
  4. Java client example (producer)
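  import java.util.Properties;
  import java.util.concurrent.Future;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  public class KafkaProducerExample {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "192.168.2.176:9092");
          props.put("acks", "all");
          props.put("retries", 0);
          props.put("batch.size", 16384);
          props.put("linger.ms", 10);
          props.put("buffer.memory", 33554432);
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

          Producer<String, String> producer = new KafkaProducer<>(props);
          for (int i = 0; i < 100; i++) {
              System.out.println(i);
              Future<RecordMetadata> future = producer.send(
                      new ProducerRecord<String, String>("kafka1", Integer.toString(i), Integer.toString(i)));
              // get() blocks until the send completes, then the record metadata is printed
              System.out.println(future.get());
          }
          producer.close();
      }
  }

  Consumer example (no ZooKeeper connection needs to be configured in order to consume)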
  import java.util.Arrays;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class KafkaConsumerExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "192.168.2.176:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
          consumer.subscribe(Arrays.asList("kafka1"));
          // Poll forever, waiting up to 100 ms per call for new records
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
              }
          }
      }
  }

  Dependency version
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
  </dependency>