很多时候,出于开发和测试的需要,或者希望Kafka以嵌入式的方式运行,这时就需要通过编程的方式启动Kafka。 Kafka依赖于Zookeeper,所以在启动Kafka之前需要先启动Zookeeper。
Zookeeper服务启动:
Zookeeper有两种启动方式,一是ZooKeeperServerMain
,二是QuorumPeerMain
,分别对应独立方式启动和集群方式启动,在我们这个场景中,应该使用ZooKeeperServerMain
方式,对应的代码片段如下:
QuorumPeerConfig config = new QuorumPeerConfig(); InputStream is = KafkaTest.class.getResourceAsStream("/zookeeper.properties"); Properties p = new Properties(); p.load(is); config.parseProperties(p); ServerConfig serverconfig = new ServerConfig(); serverconfig.readFrom(config); new ZooKeeperServerMain().runFromConfig(serverconfig);
其中zookeeper.properties
为zookeeper的配置文件,可以避免不必要的麻烦,可以从kafka的发布包中提取。 这段代码会启动一个zookeeper服务,因此在执行runFromConfig
方法后当前线程会阻塞。
Kafka服务启动:
这个相对简单,代码片段如下:
InputStream is = KafkaTest.class.getResourceAsStream("/server.properties"); Properties p = new Properties(); p.load(is); is.close(); KafkaServerStartable kafkaServerStartable = KafkaServerStartable.fromProps(p); kafkaServerStartable.startup(); kafkaServerStartable.awaitShutdown();
因为依赖关系的原因,应该在启动完Zookeeper之后再启动Kafka。 对应的pom.xml文件如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> </dependency>
如果要落地到具体的工程中,需要使用多线程的方式来启动。