Kafka集群安装


声明:本文转载自https://my.oschina.net/u/2486137/blog/1538655,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

①.kafka需要依赖zk管理,在搭建kafka集群之前需要先搭建zk集群:

https://my.oschina.net/u/2486137/blog/1537389

②.从apache kafka官网下载kafka( 二进制版本)

       注意下载的版本否则会在启动时报错:找不到主类Kafka.kafka.

我这里使用的是2.10版本.

③.配置config/server.properties文件:

    

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements.  See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License.  You may obtain a copy of the License at # #    http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults  ############################# Server Basics #############################  # The id of the broker. This must be set to a unique integer for each broker. #每个Broker在集群中的唯一标识.即使Broker的IP地址发生变化,broker.id只要没变, #则不会影响consumers的消息标识. broker.id=0 #类似于zk的myid,  #是否允许Topic被删除,如果是false,使用管理员工具删除Topic的时候,kafka并不会处理此操作 delete.topic.enable=true  #是否允许自动创建topic,若是false,就需要通过命令创建topic,默认为true,建议设置成false, #并在使用topic之前手动创建. #如果打开此选项(true)则以下2种请求会触发topic的自动创建: #①.producer向某个不存在的topic写入消息 #②.consumer某个不存在的topic读取消息 auto.create.topics.enable =true  ############################# Socket Server Settings ############################# ############################# 下面是服务端网络的相关配置 ############################# # The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: # kafka server使用的协议,主机名及端口格式如下: #     listeners = security_protocol://host_name:port #   EXAMPLE: #参考示例: #     listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092,#这是默认配置,使用PLAINTEXT,端口是9092  # Hostname and port the broker will advertise to producers and consumers. If not set,  # it uses the value for "listeners" if configured.  Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092  # The number of threads handling network requests #broker处理消息的最大线程数,一般情况下不需要去修改 num.network.threads=3  # The number of threads doing disk I/O #broker处理磁盘IO的线程数,数值应该大于你的硬盘数 num.io.threads=8  # The send buffer (SO_SNDBUF) used by the socket server #socket的发送缓冲区,socket的调优参数SO_SNDBUFF,如果是-1就使用操作系统的默认值 socket.send.buffer.bytes=102400  # The receive buffer (SO_RCVBUF) used by the socket server #socket的接受缓冲区,socket的调优参数SO_RCVBUFF,如果是-1就使用操作系统的默认值 socket.receive.buffer.bytes=102400  # The maximum size of a request that the socket server will accept (protection against OOM) #socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 socket.request.max.bytes=104857600   ############################# Log Basics #############################  # A comma seperated list of directories under which to store log files #存储log文件的目录,可以将多个目录通过逗号分隔,形成一个目录列表 log.dirs=/tmp/kafka-logs  # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. #每个topic的分区个数,默认为1,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 num.partitions=3  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. #用来恢复log文件以及关闭时将log数据刷新到磁盘的线程数量,每个目录对应num.recovery.threads.per.data.dir个线程 num.recovery.threads.per.data.dir=1  ############################# Log Flush Policy ############################# ############################# log文件刷盘的相关配置 ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: #    1. Durability: Unflushed data may be lost if you are not using replication. #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis.  # The number of messages to accept before forcing a flush of data to disk #log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作, #但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡. #如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞), #如果此值过小,将会导致"fsync"的次数较多, #这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. #每隔多少个消息触发一次flush操作,将内存中的数据刷新到磁盘 #log.flush.interval.messages=10000  # The maximum amount of time a message can sit in a log before we force a flush #仅仅通过interval来控制消息的磁盘写入时机,是不足的. #此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. #每隔多少毫秒触发一次flush操作,将内存中的数据刷新到磁盘 #log.flush.interval.ms=1000  ############################# Log Retention Policy ############################# ############################# Log 相关的保存策略 ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. #注意:下面有两种配置,一种是基于时间的策略,另一种是基于日志文件大小的策略,两种. #策略同是配置的话,只要满足其中一种,则触发log删除的操作,删除操作总是删除最旧的日志 # The minimum age of a log file to be eligible for deletion #消息在kafka中保存的时间,168小时前的log,可以被删除掉 log.retention.hours=168  # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #当剩余空间低于log.segment.bytes字节,则开始删除log #log.retention.bytes=1073741824  # The maximum size of a log segment file. When this size is reached a new log segment will be created. # segment日志文件大小的上限值,当超过这个值,会创建新的segment日志文件 log.segment.bytes=1073741824  # The interval at which log segments are checked to see if they can be deleted according # to the retention policies #每隔300000ms,logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除 log.retention.check.interval.ms=300000  ############################# Zookeeper ############################# ############################# Zookeeper的相关配置 ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. #kafka依赖的Zookeeper集群地址,可以配置多个Zookeeper地址,使用,隔开 zookeeper.connect=zk1:2181,zk2:2181,zk3:2181  # Timeout in ms for connecting to zookeeper #Zookeeper连接超时的超时时间 zookeeper.connection.timeout.ms=6000   

④.查看启动日志:

[2017-09-16 19:22:12,567] INFO KafkaConfig values:          advertised.host.name = null         advertised.listeners = null         advertised.port = null         authorizer.class.name =          auto.create.topics.enable = true         auto.leader.rebalance.enable = true         background.threads = 10         broker.id = 3         broker.id.generation.enable = true         broker.rack = null         compression.type = producer         connections.max.idle.ms = 600000         controlled.shutdown.enable = true         controlled.shutdown.max.retries = 3         controlled.shutdown.retry.backoff.ms = 5000         controller.socket.timeout.ms = 30000         default.replication.factor = 1         delete.topic.enable = false         fetch.purgatory.purge.interval.requests = 1000         group.max.session.timeout.ms = 300000         group.min.session.timeout.ms = 6000         host.name =          inter.broker.protocol.version = 0.10.1-IV2         leader.imbalance.check.interval.seconds = 300         leader.imbalance.per.broker.percentage = 10         listeners = PLAINTEXT://k1:9092         log.cleaner.backoff.ms = 15000         log.cleaner.dedupe.buffer.size = 134217728         log.cleaner.delete.retention.ms = 86400000         log.cleaner.enable = true         log.cleaner.io.buffer.load.factor = 0.9         log.cleaner.io.buffer.size = 524288         log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308         log.cleaner.min.cleanable.ratio = 0.5         log.cleaner.min.compaction.lag.ms = 0         log.cleaner.threads = 1         log.cleanup.policy = [delete]         log.dir = /tmp/kafka-logs         log.dirs = /usr/local/kafka_2.10/kafka-logs         log.flush.interval.messages = 9223372036854775807         log.flush.interval.ms = null         log.flush.offset.checkpoint.interval.ms = 60000         log.flush.scheduler.interval.ms = 9223372036854775807         log.index.interval.bytes = 4096         log.index.size.max.bytes = 10485760         log.message.format.version = 0.10.1-IV2         log.message.timestamp.difference.max.ms = 9223372036854775807         log.message.timestamp.type = CreateTime         log.preallocate = false         log.retention.bytes = -1         log.retention.check.interval.ms = 300000         log.retention.hours = 168         log.retention.minutes = null         log.retention.ms = null         log.roll.hours = 168         log.roll.jitter.hours = 0         log.roll.jitter.ms = null         log.roll.ms = null         log.segment.bytes = 1073741824         log.segment.delete.delay.ms = 60000         max.connections.per.ip = 2147483647         max.connections.per.ip.overrides =          message.max.bytes = 1000012         metric.reporters = []         metrics.num.samples = 2         metrics.sample.window.ms = 30000         min.insync.replicas = 1         num.io.threads = 8         num.network.threads = 3         num.partitions = 3         num.recovery.threads.per.data.dir = 1         num.replica.fetchers = 1         offset.metadata.max.bytes = 4096         offsets.commit.required.acks = -1         offsets.commit.timeout.ms = 5000         offsets.load.buffer.size = 5242880         offsets.retention.check.interval.ms = 600000         offsets.retention.minutes = 1440         offsets.topic.compression.codec = 0         offsets.topic.num.partitions = 50         offsets.topic.replication.factor = 3         offsets.topic.segment.bytes = 104857600         port = 9092         principal.builder.class = class          org.apache.kafka.common.security.auth.DefaultPrincipalBuilder         producer.purgatory.purge.interval.requests = 1000         ssl.keystore.password = null         ssl.keystore.type = JKS         ssl.protocol = TLS         ssl.provider = null         ssl.secure.random.implementation = null         ssl.trustmanager.algorithm = PKIX         ssl.truststore.location = null         ssl.truststore.password = null         ssl.truststore.type = JKS         unclean.leader.election.enable = true         zookeeper.connect = zk1:2181,zk2:2181,zk3:2181         zookeeper.connection.timeout.ms = 6000         zookeeper.session.timeout.ms = 6000         zookeeper.set.acl = false         zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig) [2017-09-16 19:22:12,910] INFO starting (kafka.server.KafkaServer) [2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledReques tReaper) [2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequ estReaper)          queued.max.requests = 500         quota.consumer.default = 9223372036854775807         quota.producer.default = 9223372036854775807         quota.window.num = 11         quota.window.size.seconds = 1         replica.fetch.backoff.ms = 1000         replica.fetch.max.bytes = 1048576         replica.fetch.min.bytes = 1         replica.fetch.response.max.bytes = 10485760         replica.fetch.wait.max.ms = 500         replica.high.watermark.checkpoint.interval.ms = 5000         replica.lag.time.max.ms = 10000         replica.socket.receive.buffer.bytes = 65536         replica.socket.timeout.ms = 30000         replication.quota.window.num = 11         replication.quota.window.size.seconds = 1         request.timeout.ms = 30000         reserved.broker.max.id = 1000         sasl.enabled.mechanisms = [GSSAPI]         sasl.kerberos.kinit.cmd = /usr/bin/kinit         sasl.kerberos.min.time.before.relogin = 60000         sasl.kerberos.principal.to.local.rules = [DEFAULT]         sasl.kerberos.service.name = null         sasl.kerberos.ticket.renew.jitter = 0.05         sasl.kerberos.ticket.renew.window.factor = 0.8         sasl.mechanism.inter.broker.protocol = GSSAPI         security.inter.broker.protocol = PLAINTEXT         socket.receive.buffer.bytes = 102400         socket.request.max.bytes = 104857600         socket.send.buffer.bytes = 102400         ssl.cipher.suites = null         ssl.client.auth = none         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]         ssl.endpoint.identification.algorithm = null         ssl.key.password = null         ssl.keymanager.algorithm = SunX509         ssl.keystore.location = null  [2017-09-16 19:22:13,241] INFO Connecting to zookeeper on zk1:2181,zk2:2181,zk3:2181 (kafka .server.KafkaServer) [2017-09-16 19:22:15,475] INFO Cluster ID = YkyEXTiPR62G5jdo1v6rKQ (kafka.server.KafkaServer) [2017-09-16 19:22:15,570] INFO Log directory '/usr/local/kafka_2.10/kafka-logs' not found, creating it. (kafka.log.LogMan ager) [2017-09-16 19:22:15,708] INFO Loading logs. (kafka.log.LogManager) [2017-09-16 19:22:15,723] INFO Logs loading complete in 15 ms. (kafka.log.LogManager) [2017-09-16 19:22:21,676] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2017-09-16 19:22:21,844] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManage r) [2017-09-16 19:22:21,850] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka. server.BrokerMetadataCheckpoint) [2017-09-16 19:22:22,028] INFO Awaiting socket connections on k3:9092. (kafka.network.Acceptor) [2017-09-16 19:22:22,032] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer) [2017-09-16 19:22:22,081] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationRe aper) [2017-09-16 19:22:22,092] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationRe aper) [2017-09-16 19:22:22,174] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationRe aper) [2017-09-16 19:22:22,181] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationRe aper) [2017-09-16 19:22:22,186] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationRe aper) [2017-09-16 19:22:22,218] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator) [2017-09-16 19:22:22,220] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator) [2017-09-16 19:22:22,233] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 8 milliseconds. (kafka. coordinator.GroupMetadataManager) [2017-09-16 19:22:22,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2017-09-16 19:22:22,992] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2017-09-16 19:22:23,087] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2017-09-16 19:22:23,090] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(192.168.1 .137,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2017-09-16 19:22:23,092] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka. server.BrokerMetadataCheckpoint) [2017-09-16 19:22:23,498] INFO [Kafka Server 3], started (kafka.server.KafkaServer) [2017-09-16 19:32:22,220] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka. coordinator.GroupMetadataManager)  

⑤.创建一个topic

 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-1 --partitions 3 --replication-factor 3 --config max.message.bytes=64000 --config flush.messages=1 

⑥.查看topic信息:

可以看到主题,分区,副本等一些信息

[root@localhost kafka_2.10]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1 Topic:test-1	PartitionCount:3	ReplicationFactor:3	Configs:max.message.bytes=64000,flush.messages=1 	Topic: test-1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3 	Topic: test-1	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1 	Topic: test-1	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2 

⑦.java client代码:

生产者:

	List<ProducerInterceptor<Integer,String>> interceptors = new ArrayList<ProducerInterceptor<Integer,String>>(); 		interceptors.add(new KafkaProducerInterceptor()); 		Properties props = new Properties(); 		props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST); 		props.put("key.serializer", IntegerSerializer.class); 		props.put("value.serializer", StringSerializer.class); 		props.put("compression.type", "gzip"); 		@SuppressWarnings("resource")         KafkaProducer<Integer, String> producer  = new KafkaProducer<Integer, String>(props); 		String content = ""; 		for(int i =0;i<100;i++){ 			content = "hello:"+(i+1); 			ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>( 					"test-1", i, content); 			producer.send(record, new KafkaHandle()); 			System.out.println("async message:" + content); 		}
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/E:/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/E:/repository/org/slf4j/slf4j-log4j12/1.7.1/slf4j-log4j12-1.7.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] 722  [main] INFO  o.a.k.c.p.ProducerConfig - ProducerConfig values:  	interceptor.classes = null 	request.timeout.ms = 30000 	ssl.truststore.password = null 	retry.backoff.ms = 100 	buffer.memory = 33554432 	batch.size = 16384 	ssl.keymanager.algorithm = SunX509 	receive.buffer.bytes = 32768 	ssl.key.password = null 	ssl.cipher.suites = null 	sasl.kerberos.ticket.renew.jitter = 0.05 	sasl.kerberos.service.name = null 	ssl.provider = null 	max.in.flight.requests.per.connection = 5 	sasl.kerberos.ticket.renew.window.factor = 0.8 	sasl.mechanism = GSSAPI 	bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092] 	client.id =  	max.request.size = 1048576 	acks = 1 	linger.ms = 0 	sasl.kerberos.kinit.cmd = /usr/bin/kinit 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 	metadata.fetch.timeout.ms = 60000 	ssl.endpoint.identification.algorithm = null 	ssl.keystore.location = null 	value.serializer = class org.apache.kafka.common.serialization.StringSerializer 	ssl.truststore.location = null 	ssl.keystore.password = null 	block.on.buffer.full = false 	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer 	metrics.sample.window.ms = 30000 	security.protocol = PLAINTEXT 	metadata.max.age.ms = 300000 	ssl.protocol = TLS 	sasl.kerberos.min.time.before.relogin = 60000 	timeout.ms = 30000 	connections.max.idle.ms = 540000 	ssl.trustmanager.algorithm = PKIX 	metric.reporters = [] 	ssl.truststore.type = JKS 	compression.type = gzip 	retries = 0 	max.block.ms = 60000 	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 	send.buffer.bytes = 131072 	reconnect.backoff.ms = 50 	metrics.num.samples = 2 	ssl.keystore.type = JKS  794  [main] INFO  o.a.k.c.p.ProducerConfig - ProducerConfig values:  	interceptor.classes = null 	request.timeout.ms = 30000 	ssl.truststore.password = null 	retry.backoff.ms = 100 	buffer.memory = 33554432 	batch.size = 16384 	ssl.keymanager.algorithm = SunX509 	receive.buffer.bytes = 32768 	ssl.key.password = null 	ssl.cipher.suites = null 	sasl.kerberos.ticket.renew.jitter = 0.05 	sasl.kerberos.service.name = null 	ssl.provider = null 	max.in.flight.requests.per.connection = 5 	sasl.kerberos.ticket.renew.window.factor = 0.8 	sasl.mechanism = GSSAPI 	bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092] 	client.id = producer-1 	max.request.size = 1048576 	acks = 1 	linger.ms = 0 	sasl.kerberos.kinit.cmd = /usr/bin/kinit 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 	metadata.fetch.timeout.ms = 60000 	ssl.endpoint.identification.algorithm = null 	ssl.keystore.location = null 	value.serializer = class org.apache.kafka.common.serialization.StringSerializer 	ssl.truststore.location = null 	ssl.keystore.password = null 	block.on.buffer.full = false 	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer 	metrics.sample.window.ms = 30000 	security.protocol = PLAINTEXT 	metadata.max.age.ms = 300000 	ssl.protocol = TLS 	sasl.kerberos.min.time.before.relogin = 60000 	timeout.ms = 30000 	connections.max.idle.ms = 540000 	ssl.trustmanager.algorithm = PKIX 	metric.reporters = [] 	ssl.truststore.type = JKS 	compression.type = gzip 	retries = 0 	max.block.ms = 60000 	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 	send.buffer.bytes = 131072 	reconnect.backoff.ms = 50 	metrics.num.samples = 2 	ssl.keystore.type = JKS  798  [main] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.10.0.1 798  [main] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5 async message:hello:1 async message:hello:2 async message:hello:3 async message:hello:4 async message:hello:5 async message:hello:6 async message:hello:7 async message:hello:8 async message:hello:9 async message:hello:10 async message:hello:11 async message:hello:12 async message:hello:13 async message:hello:14 async message:hello:15 async message:hello:16 async message:hello:17 async message:hello:18 async message:hello:19 async message:hello:20 async message:hello:21 async message:hello:22 async message:hello:23 async message:hello:24 async message:hello:25 async message:hello:26 async message:hello:27 async message:hello:28 async message:hello:29 async message:hello:30 async message:hello:31 async message:hello:32 async message:hello:33 async message:hello:34 async message:hello:35 async message:hello:36 async message:hello:37 async message:hello:38 async message:hello:39 async message:hello:40 async message:hello:41 async message:hello:42 async message:hello:43 async message:hello:44 async message:hello:45 async message:hello:46 async message:hello:47 async message:hello:48 async message:hello:49 async message:hello:50 async message:hello:51 async message:hello:52 async message:hello:53 async message:hello:54 async message:hello:55 async message:hello:56 async message:hello:57 async message:hello:58 async message:hello:59 async message:hello:60 async message:hello:61 async message:hello:62 async message:hello:63 async message:hello:64 async message:hello:65 async message:hello:66 async message:hello:67 async message:hello:68 async message:hello:69 async message:hello:70 async message:hello:71 async message:hello:72 async message:hello:73 async message:hello:74 async message:hello:75 async message:hello:76 async message:hello:77 async message:hello:78 async message:hello:79 async message:hello:80 async message:hello:81 async message:hello:82 async message:hello:83 async message:hello:84 async message:hello:85 async message:hello:86 async message:hello:87 async message:hello:88 async message:hello:89 async message:hello:90 async message:hello:91 async message:hello:92 async message:hello:93 async message:hello:94 async message:hello:95 async message:hello:96 async message:hello:97 async message:hello:98 async message:hello:99 async message:hello:100 

消费者:

ExecutorService fixedPool = Executors.newFixedThreadPool(3); 		fixedPool.execute(new Runnable() { 			public void run() { 				Properties props = new Properties(); 				props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST); 				props.put("group.id", KafkaCfg.GROUP_ID); 				props.put("zookeeper.session.timeout.ms", "60000"); 				props.put("zookeeper.sync.time.ms", "200"); 				props.put("enable.auto.commit", "true"); // 自动commit 				props.put("auto.commit.interval.ms", "1000"); 				//latest, earliest, none 				props.put("auto.offset.reset", "earliest"); 				props.put("key.deserializer", IntegerDeserializer.class); 				props.put("value.deserializer", StringDeserializer.class); 				KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props); 				consumer.subscribe(Arrays.asList(KafkaCfg.TOPIC, KafkaCfg.TOPIC2)); // 可消费多个topic,组成一个list 				while (true) { 					ConsumerRecords<Integer, String> records = consumer.poll(100); 					for (ConsumerRecord<Integer, String> record : records) { 						System.out.println("record:"+new Gson().toJson(record)); 					} 				} 				 			} 		});
record:{"topic":"test-1","partition":0,"offset":17,"timestamp":1505629339505,"timestampType":"CREATE_TIME","checksum":3084842117,"serializedKeySize":4,"serializedValueSize":7,"key":1,"value":"hello:2"} record:{"topic":"test-1","partition":0,"offset":18,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2036504617,"serializedKeySize":4,"serializedValueSize":7,"key":7,"value":"hello:8"} record:{"topic":"test-1","partition":0,"offset":19,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2096183246,"serializedKeySize":4,"serializedValueSize":7,"key":8,"value":"hello:9"} record:{"topic":"test-1","partition":0,"offset":20,"timestamp":1505629339524,"timestampType":"CREATE_TIME","checksum":1567468433,"serializedKeySize":4,"serializedValueSize":8,"key":14,"value":"hello:15"} record:{"topic":"test-1","partition":0,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":2250809392,"serializedKeySize":4,"serializedValueSize":8,"key":15,"value":"hello:16"} record:{"topic":"test-1","partition":0,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":795944797,"serializedKeySize":4,"serializedValueSize":8,"key":17,"value":"hello:18"} record:{"topic":"test-1","partition":0,"offset":23,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":1596880373,"serializedKeySize":4,"serializedValueSize":8,"key":21,"value":"hello:22"} record:{"topic":"test-1","partition":0,"offset":24,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":2549012433,"serializedKeySize":4,"serializedValueSize":8,"key":26,"value":"hello:27"} record:{"topic":"test-1","partition":0,"offset":25,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3946489373,"serializedKeySize":4,"serializedValueSize":8,"key":30,"value":"hello:31"} record:{"topic":"test-1","partition":0,"offset":26,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":4171966126,"serializedKeySize":4,"serializedValueSize":8,"key":32,"value":"hello:33"} record:{"topic":"test-1","partition":0,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3143199368,"serializedKeySize":4,"serializedValueSize":8,"key":33,"value":"hello:34"} record:{"topic":"test-1","partition":0,"offset":28,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":889962223,"serializedKeySize":4,"serializedValueSize":8,"key":35,"value":"hello:36"} record:{"topic":"test-1","partition":0,"offset":29,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":658609139,"serializedKeySize":4,"serializedValueSize":8,"key":38,"value":"hello:39"} record:{"topic":"test-1","partition":0,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":1769068338,"serializedKeySize":4,"serializedValueSize":8,"key":42,"value":"hello:43"} record:{"topic":"test-1","partition":0,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":3207409220,"serializedKeySize":4,"serializedValueSize":8,"key":44,"value":"hello:45"} record:{"topic":"test-1","partition":2,"offset":17,"timestamp":1505629339518,"timestampType":"CREATE_TIME","checksum":3419956930,"serializedKeySize":4,"serializedValueSize":7,"key":2,"value":"hello:3"} record:{"topic":"test-1","partition":2,"offset":18,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2857189508,"serializedKeySize":4,"serializedValueSize":7,"key":5,"value":"hello:6"} record:{"topic":"test-1","partition":2,"offset":19,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2866062050,"serializedKeySize":4,"serializedValueSize":7,"key":6,"value":"hello:7"} record:{"topic":"test-1","partition":2,"offset":20,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":577748521,"serializedKeySize":4,"serializedValueSize":8,"key":12,"value":"hello:13"} record:{"topic":"test-1","partition":2,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1649992521,"serializedKeySize":4,"serializedValueSize":8,"key":16,"value":"hello:17"} record:{"topic":"test-1","partition":0,"offset":32,"timestamp":1505629339533,"timestampType":"CREATE_TIME","checksum":2322283505,"serializedKeySize":4,"serializedValueSize":8,"key":47,"value":"hello:48"} record:{"topic":"test-1","partition":0,"offset":33,"timestamp":1505629339535,"timestampType":"CREATE_TIME","checksum":2329901557,"serializedKeySize":4,"serializedValueSize":8,"key":48,"value":"hello:49"} record:{"topic":"test-1","partition":2,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":3854334725,"serializedKeySize":4,"serializedValueSize":8,"key":18,"value":"hello:19"} record:{"topic":"test-1","partition":2,"offset":23,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1792756199,"serializedKeySize":4,"serializedValueSize":8,"key":19,"value":"hello:20"} record:{"topic":"test-1","partition":2,"offset":24,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2514692525,"serializedKeySize":4,"serializedValueSize":8,"key":22,"value":"hello:23"} record:{"topic":"test-1","partition":2,"offset":25,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1562610569,"serializedKeySize":4,"serializedValueSize":8,"key":25,"value":"hello:26"} record:{"topic":"test-1","partition":2,"offset":26,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3501401355,"serializedKeySize":4,"serializedValueSize":8,"key":28,"value":"hello:29"} record:{"topic":"test-1","partition":2,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":2946838050,"serializedKeySize":4,"serializedValueSize":8,"key":31,"value":"hello:32"} record:{"topic":"test-1","partition":2,"offset":28,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":2695171007,"serializedKeySize":4,"serializedValueSize":8,"key":36,"value":"hello:37"} record:{"topic":"test-1","partition":2,"offset":29,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":3877831509,"serializedKeySize":4,"serializedValueSize":8,"key":40,"value":"hello:41"} record:{"topic":"test-1","partition":2,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":2747042666,"serializedKeySize":4,"serializedValueSize":8,"key":41,"value":"hello:42"} record:{"topic":"test-1","partition":2,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":4222789243,"serializedKeySize":4,"serializedValueSize":8,"key":45,"value":"hello:46"} record:{"topic":"test-1","partition":2,"offset":32,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":830470691,"serializedKeySize":4,"serializedValueSize":8,"key":46,"value":"hello:47"} record:{"topic":"test-1","partition":1,"offset":19,"timestamp":1505629339461,"timestampType":"CREATE_TIME","checksum":27654439,"serializedKeySize":4,"serializedValueSize":7,"key":0,"value":"hello:1"} record:{"topic":"test-1","partition":1,"offset":20,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2877195336,"serializedKeySize":4,"serializedValueSize":7,"key":3,"value":"hello:4"} record:{"topic":"test-1","partition":1,"offset":21,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2833341777,"serializedKeySize":4,"serializedValueSize":7,"key":4,"value":"hello:5"} record:{"topic":"test-1","partition":1,"offset":22,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":1116560893,"serializedKeySize":4,"serializedValueSize":8,"key":9,"value":"hello:10"} record:{"topic":"test-1","partition":1,"offset":23,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2285896101,"serializedKeySize":4,"serializedValueSize":8,"key":10,"value":"hello:11"} record:{"topic":"test-1","partition":1,"offset":24,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":672893159,"serializedKeySize":4,"serializedValueSize":8,"key":11,"value":"hello:12"} record:{"topic":"test-1","partition":1,"offset":25,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":1637741071,"serializedKeySize":4,"serializedValueSize":8,"key":13,"value":"hello:14"} record:{"topic":"test-1","partition":2,"offset":33,"timestamp":1505629339543,"timestampType":"CREATE_TIME","checksum":3620398696,"serializedKeySize":4,"serializedValueSize":8,"key":51,"value":"hello:52"} record:{"topic":"test-1","partition":2,"offset":34,"timestamp":1505629339545,"timestampType":"CREATE_TIME","checksum":242342934,"serializedKeySize":4,"serializedValueSize":8,"key":52,"value":"hello:53"} record:{"topic":"test-1","partition":2,"offset":35,"timestamp":1505629339547,"timestampType":"CREATE_TIME","checksum":2840039757,"serializedKeySize":4,"serializedValueSize":8,"key":53,"value":"hello:54"} record:{"topic":"test-1","partition":1,"offset":26,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":464649674,"serializedKeySize":4,"serializedValueSize":8,"key":20,"value":"hello:21"} record:{"topic":"test-1","partition":1,"offset":27,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":3591464331,"serializedKeySize":4,"serializedValueSize":8,"key":23,"value":"hello:24"} record:{"topic":"test-1","partition":1,"offset":28,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2254864424,"serializedKeySize":4,"serializedValueSize":8,"key":24,"value":"hello:25"} record:{"topic":"test-1","partition":1,"offset":29,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3670479813,"serializedKeySize":4,"serializedValueSize":8,"key":27,"value":"hello:28"} record:{"topic":"test-1","partition":1,"offset":30,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1843557739,"serializedKeySize":4,"serializedValueSize":8,"key":29,"value":"hello:30"} record:{"topic":"test-1","partition":1,"offset":31,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":1905538768,"serializedKeySize":4,"serializedValueSize":8,"key":34,"value":"hello:35"} record:{"topic":"test-1","partition":1,"offset":32,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3985428395,"serializedKeySize":4,"serializedValueSize":8,"key":37,"value":"hello:38"} record:{"topic":"test-1","partition":1,"offset":33,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3427427349,"serializedKeySize":4,"serializedValueSize":8,"key":39,"value":"hello:40"} record:{"topic":"test-1","partition":1,"offset":34,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":713267988,"serializedKeySize":4,"serializedValueSize":8,"key":43,"value":"hello:44"} record:{"topic":"test-1","partition":1,"offset":35,"timestamp":1505629339536,"timestampType":"CREATE_TIME","checksum":813675607,"serializedKeySize":4,"serializedValueSize":8,"key":49,"value":"hello:50"} record:{"topic":"test-1","partition":1,"offset":36,"timestamp":1505629339541,"timestampType":"CREATE_TIME","checksum":2006019882,"serializedKeySize":4,"serializedValueSize":8,"key":50,"value":"hello:51"} 

 

 

 

本文发表于2017年09月18日 08:37
(c)注:本文转载自https://my.oschina.net/u/2486137/blog/1538655,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1561 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1