> All posts are first published on my personal blog, http://blog.mozhu.org. You are welcome to visit!
To delete a topic, run the following command:
.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
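This assumes that ZooKeeper runs on localhost and that the topic to delete is test. What the command actually does is create a child node named test (the node name is the topic name) under the ZooKeeper node /admin/delete_topics. Many blog posts claim this node is ephemeral. It is not: it is a persistent node, and the controller removes it only once the topic has really been deleted.

After the command runs, the console prints:

    Topic test is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.

In other words, the delete command does not delete anything by itself. It only marks the topic for deletion by adding the node /admin/delete_topics/test in ZooKeeper. The message also reminds us that the delete.topic.enable switch must be turned on beforehand.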
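Source Code Analysis of Topic Deletion in Kafka
In Kafka, topic deletion is carried out by the TopicDeletionManager class, which KafkaController holds in its deleteTopicManager field. When a broker is elected controller of the cluster, KafkaController.onControllerFailover is called, and it calls deleteTopicManager.start() to start the topic-deletion thread. When the broker stops being the controller, KafkaController.onControllerResignation is called, and it calls deleteTopicManager.shutdown() to stop that thread.
In KafkaController.onControllerFailover, the partitionStateMachine is initialized and its event listeners are registered. The listener that matters here watches for changes to the children of the ZooKeeper node /admin/delete_topics.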
    def onControllerFailover() {
      if(isRunning) {
        // ...
        partitionStateMachine.registerListeners()
        replicaStateMachine.registerListeners()
        // ...
        deleteTopicManager.start()
      }
      else
        info("Controller has been shut down, aborting startup/failover")
    }
    class PartitionStateMachine {
      def registerListeners() {
        registerTopicChangeListener()
        if(controller.config.deleteTopicEnable)
          // register the listener that watches the children of /admin/delete_topics
          registerDeleteTopicListener()
      }

      private def registerDeleteTopicListener() = {
        zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
      }

      private def deregisterDeleteTopicListener() = {
        zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
      }
    }
kafka.controller.PartitionStateMachine.DeleteTopicsListener
DeleteTopicsListener watches the children of the ZooKeeper node /admin/delete_topics. When a child change occurs, meaning a new topic has been marked for deletion, handleChildChange is triggered and adds that topic to the deleteTopicManager's queue.
    class DeleteTopicsListener() extends IZkChildListener with Logging {
      this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
      val zkUtils = controllerContext.zkUtils

      /**
       * Invoked when a topic is being deleted
       * @throws Exception On any error.
       */
      @throws(classOf[Exception])
      def handleChildChange(parentPath : String, children : java.util.List[String]) {
        // Fired when the children of /admin/delete_topics change,
        // i.e. when a new topic has been marked for deletion
        inLock(controllerContext.controllerLock) {
          var topicsToBeDeleted = {
            import JavaConversions._
            (children: Buffer[String]).toSet
          }
          debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
          // If a topic no longer exists, delete its /admin/delete_topics/<topic_name> node right away
          val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
          if(nonExistentTopics.size > 0) {
            warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
            nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
          }
          topicsToBeDeleted --= nonExistentTopics
          if(topicsToBeDeleted.size > 0) {
            info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
            // mark topic ineligible for deletion if other state changes are in progress
            // (a preferred replica election or a partition reassignment that involves the topic)
            topicsToBeDeleted.foreach { topic =>
              val preferredReplicaElectionInProgress =
                controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
              val partitionReassignmentInProgress =
                controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
              if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
                controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
            }
            // add topic to deletion list
            controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
          }
        }
      }

      /**
       *
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleDataDeleted(dataPath: String) {
      }
    }
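The triage performed by handleChildChange can be isolated as a pure function. The following is only a sketch under stated assumptions: `DeleteListenerSketch`, `Triage` and `triage` are hypothetical names invented for illustration, and plain sets stand in for the controller context and ZooKeeper.

```scala
// Hypothetical sketch, not Kafka code: the same three decisions as
// handleChildChange, expressed over plain immutable sets.
object DeleteListenerSketch {
  final case class Triage(
      nonExistent: Set[String], // marker znodes that would be removed right away
      ineligible: Set[String],  // queued, but deletion is held back for now
      enqueued: Set[String])    // everything handed to the deletion manager

  def triage(children: Set[String],
             allTopics: Set[String],
             electing: Set[String],
             reassigning: Set[String]): Triage = {
    // Topics that are marked for deletion but unknown to the controller
    val nonExistent = children.filterNot(allTopics.contains)
    val enqueued = children -- nonExistent
    // Topics with an election or reassignment in progress are queued but held back
    val ineligible = enqueued.filter(t => electing(t) || reassigning(t))
    Triage(nonExistent, ineligible, enqueued)
  }
}
```

Note that an ineligible topic is still enqueued; it is only prevented from starting deletion until the other operation finishes.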
TopicDeletionManager
    class TopicDeletionManager(controller: KafkaController,
                               initialTopicsToBeDeleted: Set[String] = Set.empty,
                               initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
      this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
      val controllerContext = controller.controllerContext
      // partition state machine
      val partitionStateMachine = controller.partitionStateMachine
      // replica state machine
      val replicaStateMachine = controller.replicaStateMachine
      val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
      val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
      val deleteLock = new ReentrantLock()
      val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
        (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
      val deleteTopicsCond = deleteLock.newCondition()
      val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
      var deleteTopicsThread: DeleteTopicsThread = null
      val isDeleteTopicEnabled = controller.config.deleteTopicEnable

      /**
       * Invoked at the end of new controller initiation
       */
      def start() {
        if (isDeleteTopicEnabled) {
          // start the deletion thread only if delete.topic.enable=true
          deleteTopicsThread = new DeleteTopicsThread()
          if (topicsToBeDeleted.size > 0)
            deleteTopicStateChanged.set(true)
          deleteTopicsThread.start()
        }
      }

      /**
       * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
       */
      def shutdown() {
        // Only allow one shutdown to go through
        if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
          // Resume the topic deletion so it doesn't block on the condition
          // (the thread may be blocked in awaitTopicDeletionNotification, so wake it up)
          resumeTopicDeletionThread()
          // Await delete topic thread to exit, i.e. wait for doWork to finish
          deleteTopicsThread.awaitShutdown()
          // release resources
          topicsToBeDeleted.clear()
          partitionsToBeDeleted.clear()
          topicsIneligibleForDeletion.clear()
        }
      }
    }
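The fields deleteLock, deleteTopicsCond and deleteTopicStateChanged suggest a classic lock, condition and flag handshake between the listener and the deletion thread. The bodies of awaitTopicDeletionNotification and resumeTopicDeletionThread are not quoted in this post, so the following is a minimal sketch of how such a handshake can be written, not the Kafka implementation. `DeletionSignal` and its methods `await`, `resume` and `stop` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-alone class; only the field names mirror TopicDeletionManager.
class DeletionSignal {
  private val deleteLock = new ReentrantLock()
  private val deleteTopicsCond = deleteLock.newCondition()
  private val deleteTopicStateChanged = new AtomicBoolean(false)
  private val running = new AtomicBoolean(true)

  // Blocks until resume() has been called since the last wake-up, or until stop().
  def await(): Unit = {
    deleteLock.lock()
    try {
      // The loop guards against spurious wake-ups; the flag is consumed atomically.
      while (running.get() && !deleteTopicStateChanged.compareAndSet(true, false))
        deleteTopicsCond.await()
    } finally deleteLock.unlock()
  }

  // Records that there is work to do, then wakes the waiting thread.
  def resume(): Unit = {
    deleteTopicStateChanged.set(true)
    deleteLock.lock()
    try deleteTopicsCond.signal() finally deleteLock.unlock()
  }

  // Asks the waiter to give up, then wakes it so it can observe the change.
  def stop(): Unit = { running.set(false); resume() }
}
```

Setting the flag before signalling means a notification that arrives while the worker is busy is not lost: the next call to `await` returns immediately.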
DeleteTopicsThread
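DeleteTopicsThread extends ShutdownableThread. ShutdownableThread is a thread that runs one method (doWork) in a loop, and it also provides a shutdown method that waits synchronously until the thread has really finished. The code is fairly simple: a CountDownLatch blocks the thread that calls shutdown, and once the doWork loop has really ended, the latch is released and the blocked threads wake up.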
ShutdownableThread
    abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
            extends Thread(name) with Logging {
      this.setDaemon(false)
      this.logIdent = "[" + name + "], "
      val isRunning: AtomicBoolean = new AtomicBoolean(true)
      private val shutdownLatch = new CountDownLatch(1)

      def shutdown() = {
        // set the running flag to false
        initiateShutdown()
        // wait for the task in progress to finish
        awaitShutdown()
      }

      def initiateShutdown(): Boolean = {
        if(isRunning.compareAndSet(true, false)) {
          info("Shutting down")
          isRunning.set(false)
          if (isInterruptible)
            interrupt()
          true
        } else
          false
      }

      /**
       * After calling initiateShutdown(), use this API to wait until the shutdown is complete
       */
      def awaitShutdown(): Unit = {
        // wait for the thread to finish running
        shutdownLatch.await()
        info("Shutdown completed")
      }

      /**
       * This method is repeatedly invoked until the thread shuts down or this method throws an exception
       */
      def doWork(): Unit

      override def run(): Unit = {
        info("Starting ")
        try{
          while(isRunning.get()){
            doWork()
          }
        } catch{
          case e: Throwable =>
            if(isRunning.get())
              error("Error due to ", e)
        }
        // count the latch down, waking the threads blocked in awaitShutdown
        shutdownLatch.countDown()
        info("Stopped ")
      }
    }
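To see the latch-based shutdown in isolation, here is a minimal runnable sketch of the same pattern. `LoopingWorker` is a hypothetical class written for this illustration, not Kafka's; unlike the code above it has no logging or interrupt handling, and it counts the latch down in a `finally` block.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal worker illustrating the pattern, not Kafka's class.
class LoopingWorker(work: () => Unit) extends Thread("looping-worker") {
  private val running = new AtomicBoolean(true)
  private val done = new CountDownLatch(1)

  override def run(): Unit = {
    try {
      while (running.get()) work() // repeat until asked to stop
    } finally done.countDown()     // release anyone blocked in shutdown()
  }

  // Must be called after start(); otherwise the latch is never released.
  def shutdown(): Unit = {
    running.set(false) // ask the loop to stop after the current iteration
    done.await()       // block until run() has left the loop
  }
}
```

A caller would typically write `val w = new LoopingWorker(() => Thread.sleep(10)); w.start(); w.shutdown()`. Once `shutdown()` returns, no further iteration of `work` can begin.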
DeleteTopicsThread
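When a topic-deletion notification arrives, doWork continues past the wait and, for each queued topic, does the following:
- If all replicas have finished deleting the topic, it calls completeDeleteTopic for the final cleanup, which removes the nodes in ZooKeeper and clears the controller's in-memory state.
- Otherwise, if no replica is still in the middle of deletion and some replica is in the ReplicaDeletionIneligible state (for example because the replica was dead or its deletion failed), it marks the topic for a deletion retry.
- If the topic is eligible for deletion and deletion has not already started, it calls onTopicDeletion to run the actual deletion logic.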
    class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
      val zkUtils = controllerContext.zkUtils

      override def doWork() {
        // wait for a topic-deletion notification
        awaitTopicDeletionNotification()

        if (!isRunning.get)
          return

        inLock(controllerContext.controllerLock) {
          val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

          if(!topicsQueuedForDeletion.isEmpty)
            info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

          topicsQueuedForDeletion.foreach { topic =>
            // if all replicas are marked as deleted successfully, then topic deletion is done
            if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
              // clear up all state for this topic from controller cache and zookeeper
              completeDeleteTopic(topic)
              info("Deletion of topic %s successfully completed".format(topic))
            } else {
              // at least one replica is in the ReplicaDeletionStarted state
              if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
                // ignore since topic deletion is in progress
                val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
                val replicaIds = replicasInDeletionStartedState.map(_.replica)
                val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
                info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
                  partitions.mkString(","), topic))
              } else {
                // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
                // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
                // or there is at least one failed replica (which means topic deletion should be retried).
                // So check whether any replica is ReplicaDeletionIneligible and, if so, schedule a retry.
                if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
                  // mark topic for deletion retry
                  markTopicForDeletionRetry(topic)
                }
              }
            }
            // Try delete topic if it is eligible for deletion
            // (it can be deleted and its deletion has not started yet).
            if(isTopicEligibleForDeletion(topic)) {
              info("Deletion of topic %s (re)started".format(topic))
              // topic deletion will be kicked off
              onTopicDeletion(Set(topic))
            } else if(isTopicIneligibleForDeletion(topic)) {
              info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
            }
          }
        }
      }
    }
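The first if/else chain in doWork is a decision over the replica states of one topic. The sketch below restates it as a pure function so the branch order is easy to see. `DoWorkSketch`, `Step` and `firstBranch` are hypothetical names, and only the replica states that matter for this decision are modelled; the separate eligibility check that follows in doWork is left out.

```scala
// Hypothetical sketch of the branch order in doWork, not Kafka code.
object DoWorkSketch {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  sealed trait Step
  case object Complete extends Step     // every replica deleted: run the final cleanup
  case object InProgress extends Step   // some replica is still deleting: just log
  case object MarkForRetry extends Step // some replica failed: reset it and retry
  case object NothingYet extends Step   // deletion has not been started for the topic

  def firstBranch(states: Iterable[ReplicaState]): Step =
    if (states.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (states.exists(_ == ReplicaDeletionStarted)) InProgress
    else if (states.exists(_ == ReplicaDeletionIneligible)) MarkForRetry
    else NothingYet
}
```

The order matters: a failed replica does not trigger a retry while any other replica is still in ReplicaDeletionStarted, which matches the "in progress" log lines seen later in the experiment.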
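The completeDeleteTopic method
Once the topic has been deleted from all replicas, completeDeleteTopic is called to clean up:
- delete the ZooKeeper node /brokers/topics/<topic_name>
- delete the ZooKeeper node /config/topics/<topic_name>
- delete the ZooKeeper node /admin/delete_topics/<topic_name>
- remove the topic's information from the controller's memory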
    private def completeDeleteTopic(topic: String) {
      // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
      // firing before the new topic listener when a deleted topic gets auto created
      partitionStateMachine.deregisterPartitionChangeListener(topic)
      val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      // controller will remove this replica from the state machine as well as its partition assignment cache
      replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
      val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
      // move respective partition to OfflinePartition and NonExistentPartition state
      partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
      partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
      topicsToBeDeleted -= topic
      partitionsToBeDeleted.retain(_.topic != topic)
      val zkUtils = controllerContext.zkUtils
      // delete the ZooKeeper node /brokers/topics/<topic_name>
      zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
      // delete the ZooKeeper node /config/topics/<topic_name>
      zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
      // delete the ZooKeeper node /admin/delete_topics/<topic_name>
      zkUtils.zkClient.delete(getDeleteTopicPath(topic))
      // finally, remove the topic from the controller's in-memory state
      controllerContext.removeTopic(topic)
    }
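The ZooKeeper side of this cleanup can be illustrated with an in-memory model. This is a sketch under loud assumptions: a plain `Set[String]` of paths stands in for the ZooKeeper tree, and `CleanupSketch`, `deleteRecursive` and `complete` are hypothetical helpers, not zkClient calls.

```scala
// Hypothetical in-memory model; a Set of paths stands in for the ZooKeeper tree.
object CleanupSketch {
  // Removes a node and all of its descendants, like a recursive delete.
  def deleteRecursive(znodes: Set[String], path: String): Set[String] =
    znodes.filterNot(p => p == path || p.startsWith(path + "/"))

  // Applies the three deletions in the order used by completeDeleteTopic.
  def complete(znodes: Set[String], topic: String): Set[String] = {
    val afterTopic = deleteRecursive(znodes, s"/brokers/topics/$topic")
    val afterConfig = deleteRecursive(afterTopic, s"/config/topics/$topic")
    afterConfig - s"/admin/delete_topics/$topic" // the marker node has no children
  }
}
```

Matching on `path + "/"` rather than on the bare prefix keeps a topic named tester intact when test is deleted.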
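The markTopicForDeletionRetry method
Resets the topic's failed replicas from ReplicaDeletionIneligible to the OfflineReplica state so that deletion can be retried.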
    private def markTopicForDeletionRetry(topic: String) {
      // reset replica states from ReplicaDeletionIneligible to OfflineReplica
      val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
      info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
        .format(topic, failedReplicas.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
    }
The onTopicDeletion method
onTopicDeletion eventually calls startReplicaDeletion, which starts deleting all partitions of the topic.
    private def onTopicDeletion(topics: Set[String]) {
      info("Topic deletion callback for %s".format(topics.mkString(",")))
      // send update metadata so that brokers stop serving data for topics to be deleted
      val partitions = topics.flatMap(controllerContext.partitionsForTopic)
      // the brokers receive the new metadata, stop serving these partitions and get ready to delete the data
      controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
      val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
      topics.foreach { topic =>
        onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
      }
    }

    private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
      info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
      val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
      startReplicaDeletion(replicasPerPartition)
    }

    private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
      replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
        var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
        val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
        val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
        val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
        // move dead replicas directly to failed state
        // (every replica on a dead broker is marked ReplicaDeletionIneligible)
        replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
        // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
        // (live replicas are moved to OfflineReplica and their brokers are told, so they stop fetching this topic from the leader)
        replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
        debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
        // send a stop-replica request to every live replica; deleteTopicStopReplicaCallback runs when the response arrives
        controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
          new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
        if(deadReplicasForTopic.size > 0) {
          debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
          markTopicIneligibleForDeletion(Set(topic))
        }
      }
    }
    // When topic deletion starts, a stop-replica request is sent to the live brokers;
    // this method is the callback invoked when a response to that request arrives
    private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
      val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
      debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
      val responseMap = stopReplicaResponse.responses.asScala
      val partitionsInError =
        if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
        else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
      inLock(controllerContext.controllerLock) {
        // move all the failed replicas to ReplicaDeletionIneligible
        // (a replica that reported an error is taken out of the set of deletable replicas)
        failReplicaDeletion(replicasInError)
        if (replicasInError.size != responseMap.size) {
          // some replicas could have been successfully deleted
          val deletedReplicas = responseMap.keySet -- partitionsInError
          completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
        }
      }
    }
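The way the callback partitions a response into failed and deleted replicas can be shown with plain collections. This is a hedged sketch: `StopReplicaSplitSketch` and `split` are hypothetical names, partitions are reduced to integer ids, and error codes are plain `Int` values where 0 is assumed to mean "no error".

```scala
// Hypothetical sketch of the error/success split, not Kafka code.
object StopReplicaSplitSketch {
  val NoError = 0 // assumption: 0 plays the role of Errors.NONE.code

  // topLevelError: error code of the whole response
  // responses: partition id -> per-partition error code
  // Returns (partitions in error, partitions deleted successfully).
  def split(topLevelError: Int, responses: Map[Int, Int]): (Set[Int], Set[Int]) = {
    val failed: Set[Int] =
      if (topLevelError != NoError) responses.keySet // whole response failed
      else responses.collect { case (p, err) if err != NoError => p }.toSet
    (failed, responses.keySet -- failed)
  }
}
```

A top-level error therefore fails every partition in the response, even those whose own error code is 0.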
The Topic Deletion Process in Kafka
Having gone through the source code, let us summarize how Kafka deletes a topic.
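1. After a Kafka broker is elected controller, it performs the following steps:
   1.1 It registers DeleteTopicsListener to watch the children of the ZooKeeper node /admin/delete_topics. The delete command simply creates a child under this node, named after the topic, to mark that topic for deletion.
   1.2 It creates a dedicated thread, DeleteTopicsThread, to carry out topic deletion.
2. When DeleteTopicsThread starts, it first blocks in awaitTopicDeletionNotification and waits for a deletion notification, that is, for a new topic to be added to the queue of topics to delete.
3. We run the delete command, which creates the child node <topic_name> under the ZooKeeper node /admin/delete_topics.
4. DeleteTopicsListener receives the ChildChange event and goes through the following checks in order:
   4.1 It checks whether the topic exists. If it no longer exists, the node /admin/delete_topics/<topic_name> is deleted right away.
   4.2 It checks whether the topic is currently undergoing a preferred replica election or a partition reassignment. If so, the topic is marked as temporarily ineligible for deletion.
   4.3 It adds the topic to the queue. This wakes up the thread blocked in awaitTopicDeletionNotification inside DeleteTopicsThread.doWork, so the deletion thread continues.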
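The actual logic that the deletion thread runs is:
1. It first sends updated metadata to every broker so that they stop serving data for the topic and get ready to delete it.
2. It starts deleting all partitions of the topic:
   2.1 It sends a request to all brokers telling them that these partitions are about to be deleted. After receiving it, a broker no longer accepts client requests for these partitions.
   2.2 It moves all replicas of each partition to the OfflineReplica state, so the ISR keeps shrinking. When the leader replica is finally moved to OfflineReplica as well, the leader is updated to -1.
   2.3 It moves all replicas to the ReplicaDeletionStarted state.
   2.4 The replica state machine picks up the state change and sends a StopReplicaRequest to the brokers. On receiving it, a broker stops all fetcher threads, removes its caches, and then deletes the underlying log files.
   2.5 It shuts down all idle fetcher threads.
3. It deletes the ZooKeeper node /brokers/topics/<topic_name>.
4. It deletes the ZooKeeper node /config/topics/<topic_name>.
5. It deletes the ZooKeeper node /admin/delete_topics/<topic_name>.
6. It removes the topic's information from the controller's memory.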
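The replica state changes that appear in the TopicDeletionManager code quoted above can be collected into a small transition table. This is only a sketch of the transitions exercised by the deletion path as shown in this post. It is not the complete ReplicaStateMachine, whose full set of valid transitions is not quoted here, and `ReplicaDeletionFlowSketch`, `next` and `isValidPath` are hypothetical names.

```scala
// Hypothetical sketch: only the transitions used by the deletion code in this post.
object ReplicaDeletionFlowSketch {
  sealed trait State
  case object OnlineReplica extends State
  case object OfflineReplica extends State
  case object ReplicaDeletionStarted extends State
  case object ReplicaDeletionSuccessful extends State
  case object ReplicaDeletionIneligible extends State
  case object NonExistentReplica extends State

  val next: Map[State, Set[State]] = Map(
    (OnlineReplica, Set[State](OfflineReplica)),                  // startReplicaDeletion
    (OfflineReplica, Set[State](ReplicaDeletionStarted)),         // startReplicaDeletion
    (ReplicaDeletionStarted,
      Set[State](ReplicaDeletionSuccessful, ReplicaDeletionIneligible)), // stop-replica callback
    (ReplicaDeletionIneligible, Set[State](OfflineReplica)),      // markTopicForDeletionRetry
    (ReplicaDeletionSuccessful, Set[State](NonExistentReplica)),  // completeDeleteTopic
    (NonExistentReplica, Set.empty[State]))

  // True if every consecutive pair of states in the path is an allowed transition.
  def isValidPath(path: List[State]): Boolean =
    path.zip(path.drop(1)).forall { case (a, b) => next(a).contains(b) }
}
```

For example, the happy path OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica is accepted, while jumping from OnlineReplica straight to ReplicaDeletionStarted is rejected.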
Flowchart of Topic Deletion in Kafka

Q&A
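We have analyzed the source code for topic deletion and summarized the process. Now let us look at a few related questions to deepen our understanding of it.
Q1: Can a topic be deleted normally when some of its replicas are down?
Edit server.properties on all three brokers and set delete.topic.enable=true on each of them. Then start ZooKeeper, the three brokers (broker1, broker2, broker3) and kafka-manager, where:
- ZooKeeper listens on port 2181
- broker1 listens on port 9091 and its log directory is D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
- broker2 listens on port 9092 and its log directory is D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
- broker3 listens on port 9093 and its log directory is D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
- kafka-manager listens on port 9000; open http://localhost:9000 to inspect the Kafka cluster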
Start the experiment by creating the topic test:
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test
Write a few test messages:

    .\kafka-console-producer.bat --broker-list localhost:9092 --topic test
    111
    222
    333
    444
    555
    666

Inspect the path in ZooKeeper:

    ls /brokers/topics/test/partitions
    [0, 1, 2, 3, 4, 5]

Shut down broker2 and run the delete command:
.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
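Inspect the path /admin/delete_topics in ZooKeeper:

    [zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
    [test]

A few seconds later, only broker2's log directory still contains the folders of topic test, while the test partition logs (test-0, test-1, test-2, test-3, test-4, test-5) have already been deleted from the log directories of broker1 and broker3.
Now look at controller.log on broker1: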
[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener) [2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener) [2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for 
[test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. 
(kafka.controller.KafkaController) [2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. 
(kafka.controller.KafkaController) [2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is 
[Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Deletion started for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,245] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionStarted for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,246] DEBUG The stop replica request (delete = true) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = true) sent to broker 3 is 
[Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG [Topic Deletion Manager 1], Dead Replicas ([Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2]) found for topic test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,248] INFO [Topic Deletion Manager 1], Halted deletion of topics test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,248] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,279] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,279] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,279] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,279] 
DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 
12:14:39,282] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,282] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,283] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,283] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,296] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,296] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,296] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, 
[Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,297] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> 
OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,313] INFO 
[delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,314] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) 
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, 
[Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,0],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,332] DEBUG [Replica state machine on controller 1]: Are all replicas 
for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,0],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,333] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,333] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], 
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,345] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], 
Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> 
ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,360] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,361] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,361] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,361] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,361] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> 
ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Deletion for replicas 1 for partition [test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=1] 
(kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,363] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
At this point every replica hosted on broker 1 and broker 3 has been deleted, for all partitions:
Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1]
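The "Are all replicas for topic test deleted" lines in the controller log are essentially a map from replica to state. As a rough model of what that check implies, the sketch below parses one such line and reports which brokers still hold replicas that have not reached ReplicaDeletionSuccessful. The names ReplicaKey, parseStates, allDeleted and pendingBrokers are hypothetical helpers for illustration, not part of Kafka.

```scala
object ReplicaDeletionSummary {
  // Hypothetical model of one key in the controller's replica-state map.
  final case class ReplicaKey(topic: String, partition: Int, broker: Int)

  // Matches entries such as "[Topic=test,Partition=1,Replica=2] -> OfflineReplica".
  private val Entry =
    """\[Topic=([^,\]]+),Partition=(\d+),Replica=(\d+)\] -> (\w+)""".r

  /** Extracts every replica -> state pair found in a controller log line. */
  def parseStates(logLine: String): Map[ReplicaKey, String] =
    Entry.findAllMatchIn(logLine).map { m =>
      ReplicaKey(m.group(1), m.group(2).toInt, m.group(3).toInt) -> m.group(4)
    }.toMap

  /** True only when every replica has reached ReplicaDeletionSuccessful. */
  def allDeleted(states: Map[ReplicaKey, String]): Boolean =
    states.nonEmpty && states.values.forall(_ == "ReplicaDeletionSuccessful")

  /** Brokers that still hold at least one replica that is not yet deleted. */
  def pendingBrokers(states: Map[ReplicaKey, String]): Set[Int] =
    states.collect {
      case (key, state) if state != "ReplicaDeletionSuccessful" => key.broker
    }.toSet

  def main(args: Array[String]): Unit = {
    // Shortened sample in the same shape as the controller log above.
    val line = "Are all replicas for topic test deleted Map(" +
      "[Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, " +
      "[Topic=test,Partition=0,Replica=2] -> OfflineReplica, " +
      "[Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful)"
    val states = parseStates(line)
    println(s"all deleted: ${allDeleted(states)}")                       // all deleted: false
    println(s"pending brokers: ${pendingBrokers(states).mkString(",")}") // pending brokers: 2
  }
}
```

Applied to the last state map in the log above, this would report broker 2 as the only pending broker, which matches the observation that the topic cannot be fully removed while broker 2 is down.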
The server.log files of broker 1 and broker 3 confirm that the data files of topic test were removed successfully (the excerpt below is from broker 3):
[2017-12-06 12:14:39,245] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,248] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,249] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,250] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,251] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,252] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,253] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,268] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,272] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5. (kafka.log.LogManager)
[2017-12-06 12:14:39,279] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,290] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,294] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4. (kafka.log.LogManager)
[2017-12-06 12:14:39,296] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,309] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,312] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1. (kafka.log.LogManager)
[2017-12-06 12:14:39,313] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,324] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,328] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2. (kafka.log.LogManager)
[2017-12-06 12:14:39,330] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,341] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,344] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0. (kafka.log.LogManager)
[2017-12-06 12:14:39,346] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,356] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,360] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3. (kafka.log.LogManager)
Next, start broker 2. Its server.log shows that the data files of topic test are removed there as well:
[2017-12-06 12:24:05,687] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,691] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,695] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,700] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,706] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,711] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,715] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,742] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,747] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1. (kafka.log.LogManager)
[2017-12-06 12:24:05,750] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,764] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,769] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4. (kafka.log.LogManager)
[2017-12-06 12:24:05,772] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,785] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,789] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2. (kafka.log.LogManager)
[2017-12-06 12:24:05,791] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,803] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,805] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0. (kafka.log.LogManager)
[2017-12-06 12:24:05,808] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,818] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,821] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3. (kafka.log.LogManager)
[2017-12-06 12:24:05,823] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,833] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,836] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5. (kafka.log.LogManager)
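Reading through a long server.log by eye is error-prone, so the check can be scripted. The following is a minimal sketch, assuming the "Deleted log for partition [topic,partition]" message format shown in the log of this Kafka version; the object and method names are hypothetical, and the sample lines are abbreviated versions of the entries above.

```scala
object DeletedPartitions {
  // Matches entries such as: Deleted log for partition [test,3] in ...
  // (assumes the message format seen in this post's server.log)
  private val Deleted = """Deleted log for partition \[([^,\]]+),(\d+)\]""".r

  /** Partition ids of `topic` that the given log text reports as deleted. */
  def deletedPartitions(logText: String, topic: String): Set[Int] =
    Deleted.findAllMatchIn(logText)
      .filter(_.group(1) == topic)
      .map(_.group(2).toInt)
      .toSet

  def main(args: Array[String]): Unit = {
    // Abbreviated sample lines; the directory part is shortened for readability.
    val sample = Seq(
      "[2017-12-06 12:24:05,747] INFO Deleted log for partition [test,1] in <dir>. (kafka.log.LogManager)",
      "[2017-12-06 12:24:05,769] INFO Deleted log for partition [test,4] in <dir>. (kafka.log.LogManager)",
      "[2017-12-06 12:24:05,772] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)"
    ).mkString("\n")

    // Only the two "Deleted log" entries count; the fetcher entry is ignored.
    println(deletedPartitions(sample, "test").toSeq.sorted.mkString(","))
  }
}
```

Comparing the returned set with the partitions the broker is expected to host shows at a glance whether any partition directory was left behind.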
Finally, check the zookeeper path /admin/delete_topics: the test child node is gone.
[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[]
Check the zookeeper path /brokers/topics/test/partitions as well: the node no longer exists.
[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics/test/partitions
Node does not exist: /brokers/topics/test/partitions
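Q2: Is it unnecessary to set delete.topic.enable to true on every broker?
In Q1, if we set delete.topic.enable to true only on broker1, the topic can still be deleted normally, as long as broker1 is the controller of the whole cluster. To arrange that, start broker1 first, then broker2 and broker3.
In production, to guard against accidental deletion, we usually turn the delete feature off and clean up unused topics periodically. During such a cleanup it is enough to change the configuration of a single broker and then make sure that broker is the cluster controller.
Unfortunately, changing broker1's configuration means restarting it, and while it is down another broker becomes the new controller. The change on broker1 then has no effect, because broker1 is no longer the controller. The workaround is to shut down broker2 and broker3 in turn once broker1 is back up, so that broker1 regains the controller role. This avoids editing the configuration of every broker one by one.
After the topics have been cleaned up, remember to set delete.topic.enable back to false.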
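Before relying on a single broker's setting, it helps to confirm which broker currently holds the controller role. Kafka records this in the /controller znode as a small JSON document with a brokerid field. The sketch below is a minimal illustration, not Kafka code: it only parses such a payload after it has been read (for example with get /controller in the zookeeper shell). The object and method names are hypothetical, and the sample timestamp is made up.

```scala
object ControllerCheck {
  // Matches the "brokerid" field in the /controller znode payload.
  private val BrokerId = """"brokerid"\s*:\s*(\d+)""".r

  /** Extracts the controller's broker id from the znode data, if present. */
  def controllerId(znodeData: String): Option[Int] =
    BrokerId.findFirstMatchIn(znodeData).map(_.group(1).toInt)

  /** True only when the given broker currently holds the controller role. */
  def isController(znodeData: String, brokerId: Int): Boolean =
    controllerId(znodeData).contains(brokerId)

  def main(args: Array[String]): Unit = {
    // Sample payload; the timestamp value is invented for illustration.
    val sample = """{"version":1,"brokerid":1,"timestamp":"1512533679296"}"""
    println(controllerId(sample))    // prints Some(1)
    println(isController(sample, 1)) // prints true
  }
}
```

If the check returns false for broker1, the other brokers still need to be bounced as described above before the delete command is issued.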
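Q3: What happens if a producer is still connected to the topic when it is deleted, and auto.create.topics.enable=true?
While working through the Q1 example, I had started a client that produced messages to topic test and then forgot to stop it. Since auto.create.topics.enable is on by default, every time I tried to delete the topic I found that test-0 on broker1 could not be removed.
I later realized that a client was still connected to the topic: right after the deletion succeeded, the topic was created again automatically, with the default of 1 replica and 1 partition.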
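The sequence is easier to see in a toy model. The sketch below is not Kafka code; it is a hypothetical in-memory registry (ToyCluster, TopicSpec and the method names are all invented) that mimics one behavior only: a metadata request for a missing topic creates it with 1 partition and 1 replica when auto-creation is on.

```scala
object AutoCreateDemo {
  final case class TopicSpec(partitions: Int, replicationFactor: Int)

  /** Toy stand-in for a cluster's topic registry; not Kafka code. */
  final class ToyCluster(autoCreate: Boolean) {
    private var topics = Map.empty[String, TopicSpec]

    def create(name: String, spec: TopicSpec): Unit = topics += (name -> spec)

    def delete(name: String): Unit = topics -= name

    /** A producer asking for metadata; re-creates a missing topic if autoCreate is on. */
    def metadataFor(name: String): Option[TopicSpec] = {
      if (autoCreate && !topics.contains(name))
        topics += (name -> TopicSpec(partitions = 1, replicationFactor = 1))
      topics.get(name)
    }
  }

  def main(args: Array[String]): Unit = {
    val cluster = new ToyCluster(autoCreate = true)
    cluster.create("test", TopicSpec(partitions = 6, replicationFactor = 2))
    cluster.delete("test")
    // The still-running producer asks for metadata and the topic comes back.
    println(cluster.metadataFor("test")) // prints Some(TopicSpec(1,1))
  }
}
```

With autoCreate set to false the same request returns None, which is why stopping the producers (or disabling auto-creation) before deleting avoids the leftover test-0 directory.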
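Q4: How to delete a topic manually, without going through kafka
Taking topic test as an example, the steps are:
1. Under the kafka storage directory (the log.dirs setting in server.properties, default "/tmp/kafka-logs"), delete the directories that belong to the topic, such as test-0, test-1 and so on.
2. Delete the topic's nodes in zookeeper:
rmr /brokers/topics/test
rmr /config/topics/test
rmr /admin/delete_topics/test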
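Because a mistake here removes data, it can be worth listing the targets before deleting anything. The following is a minimal dry-run sketch, assuming partition directories are named <topic>-<partition> as in the logs above; the object and method names are hypothetical. It only prints what the two steps would touch and deletes nothing.

```scala
import java.io.File
import java.util.regex.Pattern

object ManualTopicCleanup {
  /** Partition directories of `topic` directly under `logDir`, e.g. test-0, test-1. */
  def partitionDirs(logDir: File, topic: String): Seq[File] = {
    // Quote the topic so characters such as '.' are matched literally,
    // and require a numeric suffix so "test" does not match "test-extra-0".
    val pattern = Pattern.compile(Pattern.quote(topic) + "-\\d+")
    Option(logDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
      .filter(f => f.isDirectory && pattern.matcher(f.getName).matches())
      .sortBy(_.getName)
  }

  /** The zookeeper nodes listed in step 2 for the given topic. */
  def zkPaths(topic: String): Seq[String] = Seq(
    s"/brokers/topics/$topic",
    s"/config/topics/$topic",
    s"/admin/delete_topics/$topic"
  )

  def main(args: Array[String]): Unit = {
    // Falls back to the default log.dirs value when no argument is given.
    val logDir = new File(args.headOption.getOrElse("/tmp/kafka-logs"))
    partitionDirs(logDir, "test").foreach(d => println(s"would delete directory: $d"))
    zkPaths("test").foreach(p => println(s"would run: rmr $p"))
  }
}
```

The numeric-suffix match is the important design choice: a plain prefix match on "test" would also pick up directories of unrelated topics whose names merely start with the same word.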
[References]
http://www.cnblogs.com/huxi2b/p/4842695.html