Deep Dive into Kafka: Analyzing the Topic Deletion Process

> All posts are first published on my personal blog, http://blog.mozhu.org. You are welcome to visit!

To delete a topic, run the following command:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test 

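This assumes that ZooKeeper runs on localhost and that the topic to delete is test. What the command actually does is create a node named test (the node name is the topic name) under the ZooKeeper node /admin/delete_topics. (Many blog posts say this node is ephemeral. It is not: it is a persistent node, and the controller deletes it only once the topic has really been deleted.)

After the command runs, the console prints:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

In other words, the delete command does not delete the topic. It only marks it for deletion by adding the node /admin/delete_topics/test in ZooKeeper. The output also reminds us that the delete.topic.enable switch must be turned on beforehand.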

Source code analysis of topic deletion in Kafka

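In Kafka, topic deletion is carried out by the TopicDeletionManager class, which KafkaController holds as deleteTopicManager. When a broker is elected controller of the cluster, onControllerFailover in KafkaController is called, and that method calls deleteTopicManager.start() to start the topic deletion thread. When the broker stops being the controller, onControllerResignation in KafkaController is called, and that method calls deleteTopicManager.shutdown() to stop the topic deletion thread.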

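The onControllerFailover method of KafkaController initializes the partitionStateMachine state machine and registers its event listeners. The listener that matters here watches for changes to the children of the ZooKeeper node /admin/delete_topics.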

  def onControllerFailover() {
    if(isRunning) {
      // ...
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      // ...
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

class PartitionStateMachine{
  def registerListeners() {
    registerTopicChangeListener()
    if(controller.config.deleteTopicEnable)
      // register the listener that watches the children of /admin/delete_topics
      registerDeleteTopicListener()
  }

  private def registerDeleteTopicListener() = {
    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }

  private def deregisterDeleteTopicListener() = {
    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }
}

kafka.controller.PartitionStateMachine.DeleteTopicsListener

DeleteTopicsListener watches the children of the ZooKeeper node /admin/delete_topics. When they change, that is, when a new topic has been marked for deletion, handleChildChange fires and adds the topic to the queue of deleteTopicManager.

  class DeleteTopicsListener() extends IZkChildListener with Logging {
    this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
    val zkUtils = controllerContext.zkUtils

    /**
     * Invoked when a topic is being deleted
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleChildChange(parentPath : String, children : java.util.List[String]) {
      // fired when the children of /admin/delete_topics change, i.e. when a new topic has been marked for deletion
      inLock(controllerContext.controllerLock) {
        var topicsToBeDeleted = {
          import JavaConversions._
          (children: Buffer[String]).toSet
        }
        debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
        // check whether each topic exists; if it no longer does, just delete the /admin/delete_topics/<topic_name> node
        val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
        if(nonExistentTopics.size > 0) {
          warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
          nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
        }
        topicsToBeDeleted --= nonExistentTopics
        if(topicsToBeDeleted.size > 0) {
          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
          // mark topic ineligible for deletion if other state changes are in progress
          // (a preferred replica election or a partition reassignment is currently running on the topic)
          topicsToBeDeleted.foreach { topic =>
            val preferredReplicaElectionInProgress =
              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
            val partitionReassignmentInProgress =
              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
            if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
          }
          // add topic to deletion list (the queue of topics waiting to be deleted)
          controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
        }
      }
    }

    /**
     *
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
    }
  }
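The decision logic of handleChildChange can be sketched as a pure function over plain sets. This is only an illustration under simplifying assumptions: the names DeleteTopicsListenerSketch, Decision and decide are hypothetical and not part of Kafka, and ZooKeeper and the controller context are replaced by ordinary Set[String] arguments.

```scala
object DeleteTopicsListenerSketch {
  // Hypothetical result type, for illustration only.
  final case class Decision(
      staleMarkers: Set[String], // marked topics that no longer exist: only the marker znode is removed
      ineligible: Set[String],   // topics that must wait for an election or reassignment to finish
      enqueued: Set[String])     // topics handed to the deletion manager

  def decide(
      markedForDeletion: Set[String],
      allTopics: Set[String],
      electionInProgress: Set[String],
      reassignmentInProgress: Set[String]): Decision = {
    // Topics that are marked but unknown to the controller.
    val stale = markedForDeletion.filterNot(allTopics.contains)
    val toDelete = markedForDeletion -- stale
    // Topics with other state changes in progress are flagged, but still enqueued.
    val ineligible = toDelete.filter(t => electionInProgress(t) || reassignmentInProgress(t))
    Decision(stale, ineligible, toDelete)
  }
}
```

Note that, as in the listener above, a topic flagged as ineligible is still put on the deletion queue; the flag only postpones the actual deletion.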

TopicDeletionManager

class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
  val controllerContext = controller.controllerContext
  // partition state machine
  val partitionStateMachine = controller.partitionStateMachine
  // replica state machine
  val replicaStateMachine = controller.replicaStateMachine
  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
  val deleteLock = new ReentrantLock()
  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  val deleteTopicsCond = deleteLock.newCondition()
  val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
  var deleteTopicsThread: DeleteTopicsThread = null
  val isDeleteTopicEnabled = controller.config.deleteTopicEnable

  /**
   * Invoked at the end of new controller initiation
   */
  def start() {
    if (isDeleteTopicEnabled) {
      // start the deletion thread only if delete.topic.enable=true
      deleteTopicsThread = new DeleteTopicsThread()
      if (topicsToBeDeleted.size > 0)
        deleteTopicStateChanged.set(true)
      deleteTopicsThread.start()
    }
  }

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
   */
  def shutdown() {
    // Only allow one shutdown to go through
    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
      // Resume the topic deletion so it doesn't block on the condition
      // (the deletion thread may be blocked in awaitTopicDeletionNotification, so wake it up)
      resumeTopicDeletionThread()
      // Await delete topic thread to exit, i.e. wait for doWork to finish
      deleteTopicsThread.awaitShutdown()
      // release resources
      topicsToBeDeleted.clear()
      partitionsToBeDeleted.clear()
      topicsIneligibleForDeletion.clear()
    }
  }
}
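The bodies of awaitTopicDeletionNotification and resumeTopicDeletionThread are not shown in this article. The following is a minimal sketch of how a lock, a condition and an AtomicBoolean flag (the roles played by deleteLock, deleteTopicsCond and deleteTopicStateChanged above) can be combined for this kind of wake-up signalling. The class name DeletionSignal and its method names are made up for illustration, and the real methods may differ in detail.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical helper, not Kafka code.
final class DeletionSignal {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val stateChanged = new AtomicBoolean(false)
  val running = new AtomicBoolean(true)

  // Blocks until resume() has been called (or running is set to false and a signal arrives).
  def awaitNotification(): Unit = {
    lock.lock()
    try {
      // compareAndSet consumes the flag, so one resume() wakes the waiter exactly once.
      while (running.get() && !stateChanged.compareAndSet(true, false))
        cond.await()
    } finally lock.unlock()
  }

  // Sets the flag first, then signals, so a resume() that arrives before the wait is not lost.
  def resume(): Unit = {
    stateChanged.set(true)
    lock.lock()
    try cond.signal()
    finally lock.unlock()
  }
}
```

Keeping the flag separate from the condition is what makes the ordering safe: if resume() runs before the worker starts waiting, the worker sees the flag and does not block at all.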

DeleteTopicsThread

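DeleteTopicsThread extends ShutdownableThread. ShutdownableThread is a thread that runs one method (doWork) in a loop, and it also provides a shutdown method that waits synchronously until the thread has really finished. The code is fairly simple: it uses a CountDownLatch to block the thread that calls shutdown, and wakes the blocked threads once doWork has really stopped running.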

ShutdownableThread

abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
        extends Thread(name) with Logging {
  this.setDaemon(false)
  this.logIdent = "[" + name + "], "
  val isRunning: AtomicBoolean = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  def shutdown() = {
    // set the running flag to false
    initiateShutdown()
    // wait for the task that is currently running to finish
    awaitShutdown()
  }

  def initiateShutdown(): Boolean = {
    if(isRunning.compareAndSet(true, false)) {
      info("Shutting down")
      isRunning.set(false)
      if (isInterruptible)
        interrupt()
      true
    } else
      false
  }

  /**
   * After calling initiateShutdown(), use this API to wait until the shutdown is complete
   */
  def awaitShutdown(): Unit = {
    // wait for the thread to finish running
    shutdownLatch.await()
    info("Shutdown completed")
  }

  /**
   * This method is repeatedly invoked until the thread shuts down or this method throws an exception
   */
  def doWork(): Unit

  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    // count the latch down, waking the threads that wait in awaitShutdown
    shutdownLatch.countDown()
    info("Stopped ")
  }
}
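To see the latch pattern in isolation, here is a stripped-down sketch with a usage example. LoopThread and Ticker are hypothetical names, not Kafka classes. The sketch leaves out logging and interruption, and counts the latch down in a finally block, which is a small deviation from the listing above.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object ShutdownLatchSketch {
  // Simplified stand-in for ShutdownableThread (illustrative only).
  abstract class LoopThread(name: String) extends Thread(name) {
    private val running = new AtomicBoolean(true)
    private val done = new CountDownLatch(1)

    def doWork(): Unit

    override def run(): Unit =
      try { while (running.get()) doWork() }
      finally done.countDown() // release everyone blocked in shutdown()

    // Asks the loop to stop, then blocks until the current doWork() call has returned.
    def shutdown(): Unit = {
      running.set(false)
      done.await()
    }
  }

  // Example worker: counts how many times doWork() has been invoked.
  final class Ticker extends LoopThread("ticker") {
    val ticks = new AtomicInteger(0)
    def doWork(): Unit = { ticks.incrementAndGet(); Thread.sleep(1) }
  }
}
```

A caller would start a Ticker, let it run, and call shutdown(). Once shutdown() returns, the loop has exited, so ticks no longer changes.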

DeleteTopicsThread

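When a topic deletion notification arrives, doWork continues past the wait and handles each queued topic as follows.

If all replicas have finished deleting the topic, it calls completeDeleteTopic to do the final cleanup, which includes deleting the nodes in ZooKeeper and clearing the controller's in-memory state.

Otherwise, if no replica is currently being deleted but some replica is in the ReplicaDeletionIneligible state (an earlier deletion attempt on that replica failed), it marks the topic for a deletion retry.

Finally, if the topic is eligible for deletion and its deletion has not already started, it calls onTopicDeletion to run the actual deletion logic.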

  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
    val zkUtils = controllerContext.zkUtils
    override def doWork() {
      // wait for a topic deletion notification
      awaitTopicDeletionNotification()

      if (!isRunning.get)
        return

      inLock(controllerContext.controllerLock) {
        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

        if(!topicsQueuedForDeletion.isEmpty)
          info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

        topicsQueuedForDeletion.foreach { topic =>
          // if all replicas are marked as deleted successfully, then topic deletion is done
          if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
            // clear up all state for this topic from controller cache and zookeeper
            completeDeleteTopic(topic)
            info("Deletion of topic %s successfully completed".format(topic))
          } else {
            // at least one replica is in the ReplicaDeletionStarted state
            if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
              // ignore since topic deletion is in progress
              val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
              val replicaIds = replicasInDeletionStartedState.map(_.replica)
              val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
              info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
                partitions.mkString(","), topic))
            } else {
              // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
              // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
              // or there is at least one failed replica (which means topic deletion should be retried).
              // So check whether any replica is in ReplicaDeletionIneligible and, if so, retry the deletion.
              if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
                // mark topic for deletion retry
                markTopicForDeletionRetry(topic)
              }
            }
          }
          // Try delete topic if it is eligible for deletion
          // (eligible, and deletion has not already started).
          if(isTopicEligibleForDeletion(topic)) {
            info("Deletion of topic %s (re)started".format(topic))
            // topic deletion will be kicked off
            onTopicDeletion(Set(topic))
          } else if(isTopicIneligibleForDeletion(topic)) {
            info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
          }
        }
      }
    }
  }
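The first if/else chain in doWork is effectively a classification of the topic based on the states of its replicas. A minimal sketch of that classification follows. The object DeletionStepSketch, the Step type and nextStep are hypothetical names introduced here; only the replica state names come from the listing. The sketch does not model the second check (isTopicEligibleForDeletion).

```scala
object DeletionStepSketch {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Hypothetical outcome type, for illustration only.
  sealed trait Step
  case object Complete extends Step       // every replica deleted: completeDeleteTopic
  case object InProgress extends Step     // some replica is still being deleted: just log and wait
  case object Retry extends Step          // a replica failed earlier: markTopicForDeletionRetry
  case object NothingToReset extends Step // none of the above applies

  // Same order of checks as the if/else chain in doWork.
  def nextStep(states: Seq[ReplicaState]): Step =
    if (states.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (states.contains(ReplicaDeletionStarted)) InProgress
    else if (states.contains(ReplicaDeletionIneligible)) Retry
    else NothingToReset
}
```

The order of the checks matters: a topic with one failed replica and one replica still in ReplicaDeletionStarted is treated as in progress, and the retry is considered only after no replica is still being deleted.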

The completeDeleteTopic method

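Once the topic has been deleted, completeDeleteTopic is called to do some cleanup, namely:

  1. Delete the ZooKeeper node /brokers/topics/<topic_name>
  2. Delete the ZooKeeper node /config/topics/<topic_name>
  3. Delete the ZooKeeper node /admin/delete_topics/<topic_name>
  4. Remove the topic's information from memory.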

  private def completeDeleteTopic(topic: String) {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    val zkUtils = controllerContext.zkUtils
    // delete the ZooKeeper node /brokers/topics/<topic_name>
    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
    // delete the ZooKeeper node /config/topics/<topic_name>
    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
    // delete the ZooKeeper node /admin/delete_topics/<topic_name>
    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    // finally remove the topic's information from memory
    controllerContext.removeTopic(topic)
  }
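For reference, the three ZooKeeper paths touched by this cleanup can be written out as plain string builders. The helper names below mirror getTopicPath and getDeleteTopicPath from the listing, but the object TopicZkPathsSketch and its functions are illustrative stand-ins, not Kafka's own utilities.

```scala
object TopicZkPathsSketch {
  // Path layout as described in this article.
  def topicPath(topic: String): String = s"/brokers/topics/$topic"
  def topicConfigPath(topic: String): String = s"/config/topics/$topic"
  def deleteTopicPath(topic: String): String = s"/admin/delete_topics/$topic"

  // The order in which the cleanup removes the znodes, following the listing above.
  def cleanupOrder(topic: String): List[String] =
    List(topicPath(topic), topicConfigPath(topic), deleteTopicPath(topic))
}
```

The deletion marker under /admin/delete_topics is removed last, which presumably means that a cleanup interrupted halfway still leaves the marker in place for a later retry.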

The markTopicForDeletionRetry method

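To retry the deletion, this method resets the topic's failed replicas from the ReplicaDeletionIneligible state back to the OfflineReplica state.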

  private def markTopicForDeletionRetry(topic: String) {
    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
      .format(topic, failedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  }

The onTopicDeletion method

onTopicDeletion eventually calls startReplicaDeletion, which starts deleting all partitions of the topic.

  private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    // send update metadata so that brokers stop serving data for topics to be deleted
    // and are ready for the data to be deleted
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    topics.foreach { topic =>
      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
    }
  }

  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
    startReplicaDeletion(replicasPerPartition)
  }

  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      // (every replica on a dead broker is marked ReplicaDeletionIneligible, i.e. it cannot be deleted now)
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      // (the live replicas are moved to OfflineReplica and the change is sent to their brokers,
      // which then stop sending FetchRequests for this topic to the leader)
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      // send the stop replica request to all these replicas; deleteTopicStopReplicaCallback is invoked when it completes
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if(deadReplicasForTopic.size > 0) {
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
  }

  // when topic deletion starts, a stop replica request is sent to the live brokers; this callback runs when it completes
  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
    val responseMap = stopReplicaResponse.responses.asScala
    val partitionsInError =
      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    inLock(controllerContext.controllerLock) {
      // move all the failed replicas to ReplicaDeletionIneligible
      // (a replica that reported an error is taken out of the set of deletable replicas)
      failReplicaDeletion(replicasInError)
      if (replicasInError.size != responseMap.size) {
        // some replicas could have been successfully deleted
        val deletedReplicas = responseMap.keySet -- partitionsInError
        completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
      }
    }
  }
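The set arithmetic in startReplicaDeletion and the response handling in deleteTopicStopReplicaCallback can be tried out on plain data. The sketch below makes several simplifying assumptions: Replica, classify and splitResponse are hypothetical names, the classification is done for a single topic, and error codes are plain Int values with 0 standing for "no error".

```scala
object ReplicaDeletionSketch {
  // Hypothetical stand-in for PartitionAndReplica.
  final case class Replica(topic: String, partition: Int, broker: Int)

  // Returns (dead replicas, replicas whose deletion should be started or retried).
  def classify(all: Set[Replica], alive: Set[Replica], alreadyDeleted: Set[Replica])
      : (Set[Replica], Set[Replica]) = {
    val dead = all -- alive               // replicas hosted on brokers that are down
    val toDelete = alive -- alreadyDeleted // live replicas not yet deleted successfully
    (dead, toDelete)
  }

  // Splits one broker's stop replica response into (failed partitions, deleted partitions).
  def splitResponse(topLevelError: Int, perPartition: Map[Int, Int]): (Set[Int], Set[Int]) = {
    val failed =
      if (topLevelError != 0) perPartition.keySet // a request-level error fails every partition
      else perPartition.collect { case (p, err) if err != 0 => p }.toSet
    (failed, perPartition.keySet -- failed)
  }
}
```

With three brokers and broker 2 down, classify puts every replica on broker 2 into the dead set, and the topic is then marked ineligible for deletion until those replicas can be handled.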

The topic deletion process in Kafka

Having gone through the source code, let us summarize how Kafka deletes a topic.

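  1. After a Kafka broker is elected controller, it performs the following steps:
     1.1 It registers DeleteTopicsListener to watch for changes to the children of the ZooKeeper node /admin/delete_topics. The delete command simply creates a node under it, named after the topic to delete, which marks that topic as pending deletion.
     1.2 It creates a separate thread, DeleteTopicsThread, to carry out topic deletion.
  2. When DeleteTopicsThread starts, it first blocks in awaitTopicDeletionNotification and waits for a deletion notification, that is, for a new topic to be added to the queue of topics waiting to be deleted.
  3. We run the delete command, which creates the child node <topic_name> under the ZooKeeper node /admin/delete_topics.
  4. DeleteTopicsListener receives the ChildChange event and goes through the following checks in order:
     4.1 It checks whether the topic exists. If it no longer exists, the node /admin/delete_topics/<topic_name> is simply deleted.
     4.2 It checks whether a preferred replica election or a partition reassignment is currently running on the topic. If so, the topic is marked as temporarily ineligible for deletion.
     4.3 It adds the topic to the queue. This wakes the thread blocked in awaitTopicDeletionNotification inside doWork of DeleteTopicsThread, so the deletion thread continues.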

The actual deletion logic run by the deletion thread is:

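  1. It first sends updated metadata to every broker so that they stop serving data for the topic and are ready for its data to be deleted.
  2. It starts deleting all partitions of the topic:
     2.1 It sends a request to all brokers telling them that these partitions are going to be deleted. After receiving it, a broker no longer accepts any client requests for these partitions.
     2.2 It moves every replica of every partition to the OfflineReplica state. The ISR keeps shrinking as a result, and when the leader replica is finally moved to OfflineReplica as well, the leader is updated to -1.
     2.3 It moves all replicas to the ReplicaDeletionStarted state.
     2.4 The replica state machine picks up the state change and sends a StopReplicaRequest to the brokers. On receiving the request, a broker stops all fetcher threads for the partitions, removes them from its cache, and then deletes the underlying log files.
     2.5 It shuts down all idle fetcher threads.
  3. It deletes the ZooKeeper node /brokers/topics/<topic_name>
  4. It deletes the ZooKeeper node /config/topics/<topic_name>
  5. It deletes the ZooKeeper node /admin/delete_topics/<topic_name>
  6. It removes the topic's information from memory.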
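The replica state changes in the steps above can be summarized as a small transition table. This sketch covers only the transitions that this article walks through for live replicas. The real replica state machine has more states and more allowed transitions, and the names ReplicaLifecycleSketch, next and isValidPath are invented for illustration.

```scala
object ReplicaLifecycleSketch {
  // Transitions described in this article; not the complete Kafka state machine.
  val next: Map[String, Set[String]] = Map(
    "OnlineReplica" -> Set("OfflineReplica"),
    "OfflineReplica" -> Set("ReplicaDeletionStarted"),
    "ReplicaDeletionStarted" -> Set("ReplicaDeletionSuccessful", "ReplicaDeletionIneligible"),
    "ReplicaDeletionIneligible" -> Set("OfflineReplica"), // reset by markTopicForDeletionRetry
    "ReplicaDeletionSuccessful" -> Set("NonExistentReplica"))

  // True if every consecutive pair of states in the path is a listed transition.
  def isValidPath(path: List[String]): Boolean =
    path.zip(path.drop(1)).forall { case (from, to) =>
      next.getOrElse(from, Set.empty[String]).contains(to)
    }
}
```

For example, the successful path is OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica, while a failed attempt goes through ReplicaDeletionIneligible and back to OfflineReplica before it is started again.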

Flowchart of topic deletion in Kafka

[Figure: flowchart of the Kafka topic deletion process]

Q&A

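So far we have analyzed the source code of topic deletion in Kafka and summarized the deletion process. Now let us look at some related questions to deepen our understanding of it.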

Q1: Can a topic still be deleted normally when some of its partition replicas are down?

Edit server.properties on each of the three brokers and set delete.topic.enable=true. Start ZooKeeper and the three brokers (broker1, broker2 and broker3), and also start kafka-manager, where:

ZooKeeper listens on port 2181
broker1 listens on port 9091, log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
broker2 listens on port 9092, log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
broker3 listens on port 9093, log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
kafka-manager listens on port 9000; open http://localhost:9000 to view the state of the Kafka cluster

Start the experiment by creating the topic test:

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test 

Then write a few test messages:

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test
111
222
333
444
555
666

Check the path in ZooKeeper:

ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4, 5]

Shut down broker2, then run the command to delete the topic:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test 

Check the path /admin/delete_topics in ZooKeeper:

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[test]

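A few seconds later, only broker2's log directory still contains the folders for topic test. In the log directories of broker1 and broker3, the logs for test (test-0, test-1, test-2, test-3, test-4, test-5) have already been deleted.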

Look at controller.log on broker1:

[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener) [2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener) [2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for 
[test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. 
(kafka.controller.KafkaController) [2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. 
(kafka.controller.KafkaController) [2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController) [2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is  (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is  (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is 
[Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Deletion started for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,245] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionStarted for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,246] DEBUG The stop replica request (delete = true) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 1 is  (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = true) sent to broker 3 is 
[Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 3 is  (kafka.controller.ControllerBrokerRequestBatch) [2017-12-06 12:14:39,247] DEBUG [Topic Deletion Manager 1], Dead Replicas ([Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2]) found for topic test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,248] INFO [Topic Deletion Manager 1], Halted deletion of topics test (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,248] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,279] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,279] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,279] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,279] 
DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 
12:14:39,282] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,282] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,283] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,283] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,295] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,296] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,296] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,296] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, 
[Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,297] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> 
OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,298] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,313] INFO 
[delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,313] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,314] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) 
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,315] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,329] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, 
[Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,0],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,330] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,332] DEBUG [Replica state machine on controller 1]: Are all replicas 
for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,0],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,333] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,333] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], 
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,345] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,345] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], 
Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,346] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,347] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> 
ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,348] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,360] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,361] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,361] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,361] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,361] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> 
ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Deletion for replicas 1 for partition [test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager) [2017-12-06 12:14:39,362] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=1] 
(kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,362] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine) [2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-12-06 12:14:39,363] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) 
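The repeated "Are all replicas for topic test deleted Map(...)" entries are easier to read once they are tallied per broker. The following is a minimal sketch for doing that, not part of Kafka itself: the object name `ReplicaStateTally` and its two helpers are hypothetical, and the regular expression assumes the `[Topic=...,Partition=...,Replica=...] -> State` format printed in the log above.

```scala
object ReplicaStateTally {
  // Matches entries such as "[Topic=test,Partition=1,Replica=2] -> OfflineReplica"
  // (format assumed from the controller log shown above).
  private val Entry =
    """\[Topic=([^,]+),Partition=(\d+),Replica=(\d+)\] -> (\w+)""".r

  /** Returns broker id -> (state name -> number of replicas in that state). */
  def statesByBroker(stateMap: String): Map[Int, Map[String, Int]] =
    Entry.findAllMatchIn(stateMap).toList
      .map(m => m.group(3).toInt -> m.group(4))
      .groupBy(_._1)
      .map { case (broker, entries) =>
        broker -> entries.groupBy(_._2).map { case (state, es) => state -> es.size }
      }

  /** Brokers that still hold a replica in any state other than ReplicaDeletionSuccessful. */
  def pendingBrokers(stateMap: String): Set[Int] =
    statesByBroker(stateMap).collect {
      case (broker, states) if states.keySet != Set("ReplicaDeletionSuccessful") => broker
    }.toSet
}
```

Applied to the last state map in the log, `pendingBrokers` should report only broker 2, whose replicas are still `OfflineReplica`. This matches the log, where the topic keeps being reported as ineligible for deletion while broker 2 is down.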

The controller log shows that every replica of topic test on broker1 and broker3 has been deleted:

Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1]
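Picking these entries out of the controller log by hand is tedious. As a rough sketch (the object `DeletionLogSummary` and its method are hypothetical helpers, not Kafka APIs, and the pattern assumes the message format shown above), the completed replicas can be grouped by broker like this:

```scala
object DeletionLogSummary {
  // Matches "Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3]"
  // (message format assumed from the controller log excerpt above).
  private val Completed =
    """Deletion successfully completed for replicas \[Topic=([^,]+),Partition=(\d+),Replica=(\d+)\]""".r

  /** Returns broker id -> sorted partition ids whose replica deletion completed for `topic`. */
  def completedByBroker(log: String, topic: String): Map[Int, List[Int]] =
    Completed.findAllMatchIn(log).toList
      .filter(_.group(1) == topic)
      .map(m => m.group(3).toInt -> m.group(2).toInt)
      .groupBy(_._1)
      .map { case (broker, pairs) => broker -> pairs.map(_._2).distinct.sorted }
}
```

For the twelve entries above, calling `DeletionLogSummary.completedByBroker(log, "test")` should yield partitions 0 to 5 for both broker 1 and broker 3, and no entry for broker 2.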

The server.log files of broker1 and broker3 confirm that the data files of topic test have been removed (the excerpt below is from broker3):

[2017-12-06 12:14:39,245] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,248] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,249] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,250] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,251] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,252] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,253] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,268] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,272] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5. (kafka.log.LogManager)
[2017-12-06 12:14:39,279] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,290] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,294] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4. (kafka.log.LogManager)
[2017-12-06 12:14:39,296] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,309] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,312] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1. (kafka.log.LogManager)
[2017-12-06 12:14:39,313] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,324] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,328] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2. (kafka.log.LogManager)
[2017-12-06 12:14:39,330] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,341] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,344] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0. (kafka.log.LogManager)
[2017-12-06 12:14:39,346] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,356] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,360] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3. (kafka.log.LogManager)

Next, start broker2 and check its server.log: the data files of topic test have been removed successfully there as well.

[2017-12-06 12:24:05,687] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,691] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,695] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,700] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,706] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,711] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,715] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,742] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,747] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1. (kafka.log.LogManager)
[2017-12-06 12:24:05,750] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,764] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,769] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4. (kafka.log.LogManager)
[2017-12-06 12:24:05,772] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,785] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,789] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2. (kafka.log.LogManager)
[2017-12-06 12:24:05,791] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,803] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,805] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0. (kafka.log.LogManager)
[2017-12-06 12:24:05,808] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,818] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,821] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3. (kafka.log.LogManager)
[2017-12-06 12:24:05,823] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,833] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,836] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5. (kafka.log.LogManager)
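
Rather than reading the log by eye, the check can be automated. The following is only a sketch that assumes the "Deleted log for partition [topic,partition]" message format seen in the server.log output here; the object and method names are made up for illustration and are not part of Kafka.

```scala
import scala.io.Source

object DeletedPartitions {
  // Matches the LogManager message, e.g. "Deleted log for partition [test,5] in ..."
  private val DeletedLog = """Deleted log for partition \[([^,\]]+),(\d+)\]""".r

  /** Partition ids of `topic` whose logs were reported as deleted in `lines`. */
  def deletedPartitions(lines: Iterable[String], topic: String): Set[Int] =
    lines.flatMap { line =>
      DeletedLog.findFirstMatchIn(line).collect {
        case m if m.group(1) == topic => m.group(2).toInt
      }
    }.toSet

  def main(args: Array[String]): Unit = {
    // args(0): path to a broker's server.log, args(1): topic name
    val source = Source.fromFile(args(0))
    try {
      val deleted = deletedPartitions(source.getLines().toList, args(1))
      println("Deleted partitions: " + deleted.toList.sorted.mkString(","))
    } finally source.close()
  }
}
```

For the six-partition topic test used in this experiment, the result would be expected to equal `(0 until 6).toSet` on each broker once deletion has completed.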

Finally, check the path /admin/delete_topics in ZooKeeper: the test child node is gone.

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[]

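Checking the path /brokers/topics/test/partitions in ZooKeeper shows that the topic's node no longer exists either: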

[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics/test/partitions
Node does not exist: /brokers/topics/test/partitions

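Q2: Is it unnecessary to set delete.topic.enable to true on every broker?

In Q1, the topic can also be deleted normally when only broker1 has delete.topic.enable set to true, as long as broker1 is the controller of the whole cluster. To arrange that, start broker1 first and then start broker2 and broker3.

In production, the delete feature is usually switched off to guard against accidental deletion, and unused topics are cleaned up periodically instead. For such a periodic cleanup it is enough to change the configuration of a single broker and then make sure that broker is the cluster controller.

Unfortunately, changing broker1's configuration means restarting it, and during the restart another broker becomes the new controller. The change on broker1 then has no effect, because broker1 is no longer the controller. The workaround is to shut down broker2 and broker3 in turn after broker1 has come back up, so that broker1 regains the controller role. This avoids editing the configuration of every broker one by one.

Once the topics have been cleaned up, remember to set delete.topic.enable back to false.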
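
The procedure depends on knowing which broker currently holds the controller role. As a rough sketch that is not part of the original experiment: the elected controller is recorded in the ZooKeeper node /controller (readable with `get /controller` in the ZooKeeper shell), whose data is a small JSON document with a `brokerid` field. The helper below extracts that id from the node data. The object and method names and the sample timestamp are illustrative assumptions.

```scala
object ControllerCheck {
  // Looks for the "brokerid" field in the JSON stored in /controller.
  private val BrokerId = """brokerid"\s*:\s*(\d+)""".r

  /** Broker id of the current controller, if the znode data contains one. */
  def controllerBrokerId(znodeData: String): Option[Int] =
    BrokerId.findFirstMatchIn(znodeData).map(_.group(1).toInt)

  def main(args: Array[String]): Unit = {
    // Sample data in the shape of the /controller node; the values are made up.
    val sample = """{"version":1,"brokerid":1,"timestamp":"1512533679245"}"""
    controllerBrokerId(sample) match {
      case Some(1) => println("broker1 is the controller; deletion can proceed")
      case Some(id) => println("broker " + id + " is the controller; broker1's setting is not in effect")
      case None => println("no controller information found")
    }
  }
}
```

A regular expression is used here only to keep the sketch free of dependencies; a JSON library would be the more robust choice.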

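Q3: What happens if a producer is still connected to the topic when it is deleted and auto.create.topics.enable=true?

While working through the Q1 example, I had started a client that produced messages to topic test and then forgot to stop it. Since auto.create.topics.enable is on by default, every attempt to delete the topic left test-0 on broker1 behind. I later realized the cause: a client was still connected to the topic, so right after the deletion succeeded the topic was automatically created again, with the default of 1 replica and 1 partition.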
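
One way to avoid this re-creation, offered here as a suggestion rather than something the experiment above tested, is to turn off automatic topic creation on the brokers before deleting (stopping the producer first works as well). A minimal server.properties fragment:

```properties
# server.properties on each broker
# Do not create a topic automatically when a client produces to or fetches metadata for it.
auto.create.topics.enable=false
```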

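Q4: How can a topic be deleted manually, without going through Kafka?

Taking topic test as an example, the steps are:
1. Delete the topic's directories, such as test-0, test-1 and so on, from the Kafka storage directory (the log.dirs setting in server.properties, "/tmp/kafka-logs" by default).
2. Delete the topic's nodes in ZooKeeper:
rmr /brokers/topics/test
rmr /config/topics/test
rmr /admin/delete_topics/test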
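
Step 1 can be scripted. The sketch below is an illustration only: it assumes partition directories are named `<topic>-<partition number>` as in the logs above, and that it is run against each broker's log directory, presumably with that broker stopped (this last point is my assumption, not something stated in the original steps). `TopicDirCleaner` and its methods are hypothetical names. The ZooKeeper cleanup in step 2 is not covered.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import java.util.regex.Pattern

object TopicDirCleaner {
  /** Directories directly under `logDir` whose name is exactly <topic>-<digits>. */
  def topicDirs(logDir: Path, topic: String): List[Path] = {
    val name = Pattern.compile(Pattern.quote(topic) + "-\\d+")
    val stream = Files.newDirectoryStream(logDir)
    try {
      val it = stream.iterator()
      var found = List.empty[Path]
      while (it.hasNext) {
        val p = it.next()
        if (Files.isDirectory(p) && name.matcher(p.getFileName.toString).matches())
          found ::= p
      }
      found
    } finally stream.close()
  }

  /** Deletes a directory tree: files first, then each directory on the way back up. */
  def deleteRecursively(dir: Path): Unit = {
    Files.walkFileTree(dir, new SimpleFileVisitor[Path] {
      override def visitFile(f: Path, attrs: BasicFileAttributes): FileVisitResult = {
        Files.delete(f)
        FileVisitResult.CONTINUE
      }
      override def postVisitDirectory(d: Path, e: IOException): FileVisitResult = {
        if (e != null) throw e
        Files.delete(d)
        FileVisitResult.CONTINUE
      }
    })
    ()
  }

  /** Removes all partition directories of `topic` and returns the paths that were deleted. */
  def deleteTopicDirs(logDir: Path, topic: String): List[Path] = {
    val dirs = topicDirs(logDir, topic)
    dirs.foreach(deleteRecursively)
    dirs
  }
}
```

Matching the full directory name instead of a prefix keeps directories of similarly named topics, for example test2-0, untouched.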

[References]

http://www.cnblogs.com/huxi2b/p/4842695.html

Note: this article is reposted from https://my.oschina.net/mozhu/blog/1585839.

