首页 » SEO优化 » php中的dyn技巧_实践支配与运用apache kafka框架技能

php中的dyn技巧_实践支配与运用apache kafka框架技能

访客 2024-11-12 0

扫一扫用手机浏览

文章目录 [+]

apache kafka参考

http://kafka.apache.org/documentation.html

php中的dyn技巧_实践支配与运用apache kafka框架技能

行列步队分类:

php中的dyn技巧_实践支配与运用apache kafka框架技能
(图片来自网络侵删)

点对点:

生产者生产发送到queue中,然后消费者从queue中取出并且消费。
这里要把稳:

被消费往后,queue中不再有存储,以是消费者不可能消费到已经被消费的。

Queue支持存在多个消费者,但是对一个而言,只会有一个消费者可以消费。

发布/订阅

生产者(发布)将发布到topic中,同时有多个消费者(订阅)消费该。
和点对点办法不同,发布到topic的会被所有订阅者消费。

kafka行列步队调研

背景先容

kafka是最初由Linkedin公司开拓,利用Scala措辞编写,Kafka是一个分布式、分区的、多副本的、多订阅者的日志系统(分布式MQ系统),可以用于web/nginx日志,搜索日志,监控日志,访问日志等等。

kafka目前支持多种客户端措辞:java,python,c++,php等等。

总体构造:

kafka名词阐明和事情办法:

Producer :生产者,便是向kafka broker发的客户端。

Consumer :消费者,向kafka broker取消息的客户端

Topic :咋们可以理解为一个行列步队。

Consumer Group (CG):这是kafka用来实现一个topic的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。
一个topic可以有多个CG。
topic的会(不是真的,是观点上的)到所有的CG,但每个CG只会把发给该CG中的一个consumer。
如果须要实现广播,只要每个consumer有一个独立的CG就可以了。
要实现单播只要所有的consumer在同一个CG。
用CG还可以将consumer进行自由的分组而不须要多次发送到不同的topic。

Broker :一台kafka做事器便是一个broker。
一个集群由多个broker组成。
一个broker可以容纳多个topic。

Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即做事器)上,一个topic可以分为多个partition,每个partition是一个有序的行列步队。
partition中的每条都会被分配一个有序的id(offset)。
kafka只担保按一个partition中的顺序将发给consumer,不担保一个topic的整体(多个partition间)的顺序。

Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。
例如你想找位于2049的位置,只要找到2048.kafka的文件即可。
当然the first offset便是00000000000.kafka

kafka特性:

通过O(1)的磁盘数据构造供应的持久化,这种构造对付纵然数以TB的存储也能够保持永劫光的稳定性能。

高吞吐量:纵然是非常普通的硬件kafka也可以支持每秒数十万的。

支持同步和异步两种HA

Consumer客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据

消费状态保存在客户端

存储顺序写

数据迁移、扩容对用户透明

支持Hadoop并行数据加载。

支持online和offline的场景。

持久化:通过将数据持久化到硬盘以及replication防止数据丢失。

scale out:无需停机即可扩展机器。

定期删除机制,支持设定partitions的segment file保留韶光。

可靠性一(致性)

kafka(MQ)要实现从producer到consumer之间的可靠的传送和分发。
传统的MQ系统常日都是通过broker和consumer间的确认(ack)机制实现的,并在broker保存分发的状态。

纵然这样同等性也是很难担保的(参考原文)。
kafka的做法是由consumer自己保存状态,也不要任何确认。
这样虽然consumer包袱更重,但实在更灵巧了。

由于不管consumer上任何缘故原由导致须要重新处理,都可以再次从broker得到。

kafak系统扩展性

kafka利用zookeeper来实现动态的集群扩展,不须要变动客户端(producer和consumer)的配置。
broker会在zookeeper注册并保持干系的元数据(topic,partition信息等)更新。

而客户端会在zookeeper上注册干系的watcher。
一旦zookeeper发生变革,客户端能及时感知并作出相应调度。
这样就担保了添加或去除broker时,各broker间仍能自动实现负载均衡。

kafka设计目标

高吞吐量是其核心设计之一。

数据磁盘持久化:不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。

zero-copy:减少IO操作步骤。

支持数据批量发送和拉取。

支持数据压缩。

Topic划分为多个partition,提高并行处理能力。

Producer负载均衡和HA机制

producer根据用户指定的算法,将发送到指定的partition。

存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上。

多个partition须要选取出lead partition,lead partition卖力读写,并由zookeeper卖力fail over。

通过zookeeper管理broker与consumer的动态加入与离开。

Consumer的pull机制

由于kafka broker会持久化数据,broker没有cahce压力,因此,consumer比较适宜采纳pull的办法消费数据,详细特殊如下:

简化kafka设计,降落了难度。

Consumer根据消费能力自主掌握拉取速率。

consumer根据自身情形自主选择消费模式,例如批量,重复消费,从制订partition或位置(offset)开始消费等.

Consumer与topic关系以及机制

实质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.对付Topic中的一条特定的,

只会被订阅此Topic的每个group中的一个consumer消费,此不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费全体Topic.

如果所有的consumer都具有相同的group,这种情形和JMS queue模式很像;将会在consumers之间负载均衡.

如果所有的consumer都具有不同的group,那这便是"发布-订阅";将会广播给所有的消费者.

在kafka中,一个partition中的只会被group中的一个consumer消费(同一时候);每个group中consumer消费相互独立;我们可以认为一个group是一个"订阅"者,

一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的.

kafka只能担保一个partition中的被某个consumer消费时是顺序的.事实上,从Topic角度来说,当有多个partitions时,仍不是全局有序的.

常日情形下,一个group中会包含多个consumer,这样不仅可以提高topic中的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失落效,

那么其消费的partitions将会有其他consumer自动接管.kafka的设计事理决定,对付一个topic,同一个group中不能有多于partitions个数的consumer同时消费,

否则将意味着某些consumer将无法得到.

Producer均衡算法

kafka集群中的任何一个broker,都可以向producer供应metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"

等信息(请参看zookeeper中的节点信息).当producer获取到metadata信心之后, producer将会和Topic下所有partition leader保持socket连接;

由producer直接通过socket发送到broker,中间不会经由任何"路由层".事实上,被路由到哪个partition上,有producer客户端决定.

比如可以采取"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"均衡分发"是必要的.

在producer真个配置文件中,开拓者可以指定partition路由的办法.

Consumer均衡算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的终极目的,是提升topic的并发消费能力.

1) 如果topic1,具有如下partitions: P0,P1,P2,P3

2) 加入group中,有如下consumer: C0,C1

3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3

4) 根据consumer.id排序: C0,C1

5) 打算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i M),P((i + 1) M -1)]

kafka broker集群内broker之间replica机制

kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据到多个server上,任何一个partition有一个leader和多个follower(可以没有);

备份的个数可以通过broker配置文件来设定.leader处理所有的read-write要求,follower须要和leader保持同步.Follower就像一个"consumer",

消费并保存在本地日志中;leader卖力跟踪所有的follower状态,如果follower"掉队"太多或者失落效,leader将会把它从replicas同步列表中删除.

当所有的follower都将一条保存成功,此才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就哀求follower和leader之间必须具有良好的网络环境.

纵然只有一个replicas实例存活,仍旧可以担保的正常发送和吸收,只要zookeeper集群存活即可.(备注:不同于其他分布式存储,比如hbase须要"多数派"存活才行)

kafka剖断一个follower存活与否的条件有2个:

1) follower须要和zookeeper保持良好的链接

2) 它必须能够及时的跟进leader,不能掉队太多.

如果同时知足上述2个条件,那么leader就认为此follower是"生动的".如果一个follower失落效(server失落效)或者掉队太多,

leader将会把它从同步列表中移除[备注:如果此replicas掉队太多,它将会连续从leader中fetch数据,直到足够up-to-date,

然后再次加入到同步列表中;kafka不会改换replicas宿主!由于"同步列表"中replicas须要足够快,这样才能担保producer发布时接管到ACK的延迟较小。

当leader失落效时,需在followers中选取出新的leader,可能此时follower掉队于leader,因此须要选择一个"up-to-date"的follower.kafka中leader选举并没有采取"投票多数派"的算法,

由于这种算法对付"网络稳定性"/"投票参与者数量"等条件有较高的哀求,而且kafka集群的设计,还须要容忍N-1个replicas失落效.对付kafka而言,

每个partition中所有的replicas信息都可以在zookeeper中得到,那么选举leader将是一件非常大略的事情.选择follower时须要兼顾一个问题,

便是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.

在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

在整几个集群中,只要有一个replicas存活,那么此partition都可以连续接管读写操作.

总结:

1) Producer端直接连接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每个partition leader建立socket连接并发送.

2) Broker端利用zookeeper用来注册broker信息,以及监控partition leader存活性.

3) Consumer端利用zookeeper用来注册consumer信息,个中包括consumer消费的partition列表等,同时也用来创造broker列表,并和partition leader建立socket连接,并获取消息.

性能测试

目前我已经在虚拟机上做了性能测试。

测试环境:cpu: 双核 内存 :2GB 硬盘:60GB

测试指标

性能干系解释

结论

堆积压力测试

单个kafka broker节点测试,启动一个kafka broker和Producer,Producer不断向broker发送数据,

直到broker堆积数据为18GB为止(停滞Producer运行)。
启动Consumer,不间断从broker获取数据,

直到全部数据读取完成为止,末了查看Producer==Consumer数据,没有涌现卡去世或broker不相应征象

数据大量堆积不会涌现broker卡去世

或不相应征象

生产者速率

1.200byte/msg,4w/s旁边。
2.1KB/msg,1w/s旁边

性能上是完备知足哀求,其性能紧张由磁盘决定

消费者速率

1.200byte/msg,4w/s旁边。
2.1KB/msg,1w/s旁边

性能上是完备知足哀求,其性能紧张由磁盘决定

2)kafka在zookeeper中存储构造

1.topic注册信息

/brokers/topics/[topic] :

存储某个topic的partitions所有分配信息

Schema:

{

"version": "版本编号目前固定为数字1",

"partitions": {

"partitionId编号": [

同步副本组brokerId列表

],

"partitionId编号": [

同步副本组brokerId列表

],

.......

}

}

Example:

{

"version": 1,

"partitions": {

"0": [1, 2],

"1": [2, 1],

"2": [1, 2],

}

}

解释:紫赤色为patitions编号,蓝色为同步副本组brokerId列表

2.partition状态信息

/brokers/topics/[topic]/partitions/[0...N] 个中[0..N]表示partition索引号

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:

{

"controller_epoch": 表示kafka集群中的中心掌握器选举次数,

"leader": 表示该partition选举leader的brokerId,

"version": 版本编号默认为1,

"leader_epoch": 该partition leader选举次数,

"isr": [同步副本组brokerId列表]

}

Example:

{

"controller_epoch": 1,

"leader": 2,

"version": 1,

"leader_epoch": 0,

"isr": [2, 1]

}

3. Broker注册信息

/brokers/ids/[0...N]

每个broker的配置文件中都须要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)

Schema:

{

"jmx_port": jmx端口号,

"timestamp": kafka broker初始启动时的韶光戳,

"host": 主机名或ip地址,

"version": 版本编号默认为1,

"port": kafka broker的做事端端口号,由server.properties中参数port确定

}

Example:

{

"jmx_port": 6061,

"timestamp":"1403061899859"

"version": 1,

"host": "192.168.1.148",

"port": 9092

}

4. Controller epoch:

/controller_epoch -> int (epoch)

此值为一个数字,kafka集群中第一个broker第一次启动时为1,往后只要集群中center controller中心掌握器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;

5. Controller注册信息:

/controller -> int (broker id of the controller) 存储center controller中心掌握器所在kafka broker的信息

Schema:

{

"version": 版本编号默认为1,

"brokerid": kafka集群中broker唯一编号,

"timestamp": kafka broker中心掌握器变更时的韶光戳

}

Example:

{

"version": 1,

"brokerid": 3,

"timestamp": "1403061802981"

}

Consumer and Consumer group观点:

a.每个consumer客户端被创建时,会向zookeeper注书籍身的信息;

b.此浸染紧张是为了"负载均衡".

c.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个只发送给个中一个Consumer。

d.Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer;

e.一个Consumer group的多个consumer的所有线程依次有序地消费一个topic的所有partitions,如果Consumer group中所有consumer总线程大于partitions数量,则会涌现空闲情形;

举例解释:

kafka集群中创建一个topic为report-log 4 partitions 索引编号为0,1,2,3

如果目前有三个消费者node:把稳-->一个consumer中一个消费线程可以消费一个或多个partition.

如果每个consumer创建一个consumer thread线程,各个node消费情形如下,node1消费索引编号为0,1分区,node2费索引编号为2,node3费索引编号为3

每个consumer创建2个consumer thread线程,各个node消费情形如下(是从consumer node先后启动状态来确定的),node1消费索引编号为0,1分区;node2费索引编号为2,3;node3为空闲状态

总结:

从以上可知,Consumer Group中各个consumer是根据先后启动的顺序有序消费一个topic的所有partitions的。

如果Consumer Group中所有consumer的总线程数大于partitions数量,则可能consumer thread或consumer会涌现空闲状态。

6. Consumer注册信息:

每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统天生),此id用来标记消费者信息.

/consumers/[groupId]/ids/[consumerIdString]

是一个临时的znode,此节点的值为请看consumerIdString产生规则,即表示此consumer目前所消费的topic + partitions列表.

consumerId产生规则:

StringconsumerUuid = null;

if(config.consumerId!=null && config.consumerId)

consumerUuid = consumerId;

else {

String uuid = UUID.randomUUID()

consumerUuid = "%s-%d-%s".format(

InetAddress.getLocalHost.getHostName, System.currentTimeMillis,

uuid.getMostSignificantBits().toHexString.substring(0,8));

}

String consumerIdString = config.groupId + "_" + consumerUuid;

7. Consumer owner:

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号

当consumer启动时,所触发的操作:

a) 首先进行"Consumer Id注册";

b) 然后在"Consumer id 注册"节点下注册一个watch用来监听当前group中其他consumer的"退出"和"加入";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失落效,那么其他consumer接管partitions).

c) 在"Broker id 注册"节点下,注册一个watch用来监听broker的存活情形;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

8. Consumer offset:

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

用来跟踪每个consumer目前所消费的partition中最大的offset

此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(consumer group)中一个消费者失落效,

重新触发balance,其他consumer可以连续消费.

9. Re-assign partitions

/admin/reassign_partitions

10. Preferred replication election

/admin/preferred_replica_election

11. 删除topics

/admin/delete_topics

Topic配置

/config/topics/[topic_name]

3)kafka log4j配置

kafka日志文件分为5种类型,依次为:controller,kafka-request,server,state-change,log-cleaner,不同类型log数据,写到不同文件中:

4)kafka replication设计机制

概览:

个中一个broker当选举作为全体集群掌握器,他将卖力几个方面事情:

1.管理或领导分区变革.

2.create topic,delete topic

3.replicas(实行操持,partition)

集群掌握器做出决定往后,操作信息或状态将永久注册并存储在zookeeper上,并且也可以通过RPC办法发送新的决定操作broker。

掌握器发布的决定来源真实,他将用于client要求路由和broker的重启或规复状态。

如果有一个新的broker加入或启动。
controller会通过RPC调用发出新的决定。

潜在的优点:

1.当leader发生变革时,更随意马虎集中到一个地方做调试(打消故障)。

2.当leader发生变革时,ZK可以把读取/写状态信息成批广播到其他broker,因此当leader failover的时候会减少broker之间规复的延迟韶光。

3.须要更少的监听器。

4.利用更高效的RPC通信办法,代替在zookeeper中行列步队实现办法。

潜在的缺陷:

须要考虑controller failover

zookeeper中路径列表解释

1.Controller path:存储当前controller信息.

/controller --> {brokerid} (ephemeral; created by controller)

2.Broker path:存储当前所有活着的brokers信息。

/brokers/ids/[broker_id] --> host:port (ephemeral; created by admin)

3.存储一个主题的所有分区副本任务。
对付每一个副本,我们存储的副本指派一个broker ID。

第一个副本是首选的品。
把稳,对付一个给定的分区,在一个broker上有至多一个副本。
因此,broker ID可以作副本标识.

/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}} 此路径被controller或leader修正,当前leader只修正ISR一部分信息。
当更新path须要利用条件同步到zookeeper上。

4.LeaderAndISR path:存储一个分区leader and ISR

/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}} 此路径被controller或leader修正,当前leader只修正ISR一部分信息。
当更新path须要利用条件同步到zookeeper上。

5.分区分配path:当我们重新分配某些分区到不同的brokers时,此path会被利用。
对付每个分区重新分配,他将会存储一个新副本列表和他们相应的brokers信息。

每当某个管理员操作如下命令成功后,且这个分区迁移到目标broker成功后,源broker上的分区会自动删除。

/admin/partitions_add/[topic]/[partition_id] --> {broker_id …} (created by admin) /admin/partitions_remove/[topic]/[partition_id] (created by admin)

kafka中专有词语阐明:

AR(assign replicas):分配副本 ISR(in-sync replicas):在同步中的副本

StopReplicaRequest { request_type_id : int16 // request id version_id : int16 // 当前request的版本 client_id : int32 // this can be the broker id of the controller ack_timeout : int32 // ack相应韶光,单位为毫秒 stopReplicaSet : Set[(topic: String, partitionId: int)) // 须要停滞的分区凑集 } StopReplicaResponse { version_id : int16 // 当前request的版本 responseMap : Map[(topic: String, partitionId: int32) => int16) //error code表 }

5)apache kafka监控系列-监控指标

1、监控目标

1.当系统可能或处于亚康健状态时及时提醒,预防故障发生

2.报警提示 a.短信办法 b.邮件

2、监控内容

2.1 机器监控

Kafka做事器指标

CPU Load

Disk IO

Memory

磁盘log.dirs目录下数据文件大小,要有定时打消策略

2.2 JVM监控

紧张监控JAVA的 GC time(垃圾回收韶光),JAVA的垃圾回收机制对性能的影响比较明显

2.3 Kafka系统监控

1、Kafka总体监控

zookeeper上/XXX/broker/ids目录下节点数量

leader 选举频率

2、Kafka Broker监控

kafka集群中Broker列表,broker运行状况,包括node下线,生动数量

Broker是否供应做事

数据流量 流入速率,流出速率 (message / byte)

ISR 紧缩频率

3、Kafka Controller监控

controller存活数目

4、Kafka Producer监控

producer数量,排队情形

要求相应韶光

QPS/分钟

5、Kafka Consumer监控

consumer行列步队中排队要求数

要求相应韶光

最近一分钟均匀每秒要求数

6、Topic监控

数据量大小;

offset

数据流量 流入速率,流出速率 (message / byte)

3.监控指标

3.1 JVM监控

a.通过JMX获取GC time

b.jvm full gc次数

c.通过jmx监控kafka干系参数

3.2 kafka系统监控

监控数据获取办法

1、生存节点信息可以从zookeeper获取

2、除生存节点 和

a、Broker是否供应做事。

b、Topic数据量大小,

c、Topic的offset 外,其他数据都可以通过JMX获取

6)kafka.common.ConsumerRebalanceFailedException非常办理办法

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries

at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)

涌现以上问题缘故原由剖析:

同一个消费者组(consumer group)有多个consumer先后启动,便是一个消费者组内有多个consumer同时负载消费多个partition数据.

办理办法:

1.配置zk问题(kafka的consumer配置)

zookeeper.session.timeout.ms=5000

zookeeper.connection.timeout.ms=10000

zookeeper.sync.time.ms=2000

在利用高等API过程中,一样平常涌现这个问题是zookeeper.sync.time.ms韶光间隔配置过短,不用除有其他缘故原由引起,但笔者碰着一样平常是这个缘故原由。

给大家阐明一下缘故原由:一个消费者组中(consumer数量

如果大家理解上面阐明,下面就更随意马虎了,当consumer调用Rebalance时,它是按照韶光间隔和最大次数采纳失落败重试原则,每当获取partitions失落败后会重试获取。
举个例子,如果某个公司有个会议,B部门在某个韶光段预订该会议室,但是韶光到了去会议室看时,创造A部门还在利用。
这时B部门只有等待了,每隔一段韶光去讯问一下。
如果韶光过于频繁,则会议室一贯会处于占用状态,如果韶光间隔设置长点,可能去个2次,A部门就让出来了。

同理,当新consumer加入重新触发rebalance时,已有(old)的consumer会重新打算并开释占用partitions,但是会花费一定处理韶光,此时新(new)consumer去抢占该partitions很有可能就会失落败。
我们假设设置足够old consumer开释资源的韶光,就不会涌现这个问题。

zookeeper.sync.time.ms韶光设置过短就会导致old consumer还没有来得及开释资源,new consumer重试失落败多次到达阀值就退出了。

zookeeper.sync.time.ms设置韶光阀值,要考虑网络环境,做事器性能等成分在内综合衡量。

kafka zk节点存储,请参考:kafka在zookeeper中存储构造

7)kafak安装与利用

kafak安装与利用

1.序言

学习kafka的根本是先把kafka系统支配起来,然后大略的利用它,从直不雅观上觉得它,然后逐步的深入理解它。

本文先容了kafka支配方法,包括配置,安装和大略的利用。

2.kafka下载和安装

kafka版本一贯在更新,且每次更新,变革均比较大,如配置文件有改动,kafka 0.7到0.8.1版本变革很大,包括加入,支持集群内,支持多个数据目录,要求处理改为异步,实现partition动态管理,基于韶光的日志段删除

2.1下载地址:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz。

kafka目录构造

如 图-1

解释:涂黑部分为我自己创建文件夹

目录

解释

bin

操作kafka的可实行脚本,还包含windows下脚本

config

配置文件所在目录

libs

依赖库目录

logs

日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log-cleaner,controller

2.1 安装以及启动kafka

步骤1:

lizhitao@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz

lizhitao@localhost:~$ cd kafka_2.10-0.8.1.1.tgz

步骤2:

配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法)

进入kafka安装工程根目录编辑 vim config/server.properties 修正属性zookeeper.connect=ip:8081,ip2:8082

步骤3:

kafka最为主要三个配置依次为:broker.id、log.dir、zookeeper.connect

kafka server端config/server.properties参数解释和解释如下:

server.properties配置属性解释

根据属性解释完成配置

broker.id = 1

port = 9092

步骤4: 启动做事

cd kafka-0.8.1

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server.properties

[2014-04-16 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

步骤5:创建topic

lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

步骤6:验证topic是否创建成功

lizhitao@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181

步骤7:发送一些验证,在console模式下,启动producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

步骤7:启动一个consumer

lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

3.配置kafka集群模式,须要由多个broker组成

步骤1:

由于须要在同一个目录(config)下配置多个server.properties,操作步骤如下:

lizhitao@localhost:~$ cp config/server.properties config/server-1.properties

lizhitao@localhost:~$ cp config/server.properties config/server-2.properties

步骤2:

须要编辑并设置如下文件属性:

config/server-1.properties:

broker.id=1

port=9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

port=9094

log.dir=/tmp/kafka-logs-2

启动做事

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-1.properties &

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-2.properties &

步骤3:

创建topic

lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

topic created success....

lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3Configs:

Topic: my-replicated-topic Partition: 0Leader: 1Replicas: 1,2,0Isr: 1,2,0

描述topic等分区,同步副本情形

lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1 ReplicationFactor:1Configs:

Topic: test Partition: 0 Leader: 0Replicas: 0Isr: 0

步骤4:作为生产者发送

lizhitao@localhost:~$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1

my test message 2

步骤5:消费topic数据

lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

my test message 1

my test message 2

步骤6:

检讨consumer offset位置

lizhitao@localhost:~$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group Topic Pid Offset logSize Lag Owner

my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0

my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0

8)apache kafka中server.properties配置文件参数解释

每个kafka broker中配置文件server.properties默认必须配置的属性如下:

[java] view plaincopy

broker.id=0

num.network.threads=2

num.io.threads=8

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=2

log.retention.hours=168

log.segment.bytes=536870912

log.retention.check.interval.ms=60000

log.cleaner.enable=false

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=1000000

9)apache kafka的consumer初始化时获取不到

问题

创造一个问题,如果利用的是一个高等的kafka接口 那么默认的情形下如果某个topic没有变革 则consumer消费不到 比如某个生产了2w条,此时producer不再生产,然后其余一个consumer启动,此时拿不到.

缘故原由阐明:

auto.offset.reset:如果zookeeper没有offset值或offset值超出范围。
那么就给个初始的offset。
有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛非常。
默认largest

默认值:auto.offset.reset=largest

10)Kafka Producer处理逻辑

Kafka Producer处理逻辑

Kafka Producer产生数据发送给Kafka Server,详细的分发逻辑及负载均衡逻辑,全部由producer掩护。

Kafka构造图

Kafka Producer默认调用逻辑

默认Partition逻辑

1、没有key时的分发逻辑

每隔 topic.metadata.refresh.interval.ms 的韶光,随机选择一个partition。
这个韶光窗口内的所有记录发送到这个partition。

发送数据出错后也会重新选择一个partition

2、根据key分发

对key求hash,然后对partition数量求模

Utils.abs(key.hashCode) % numPartitions

如何获取Partition的leader信息(元数据)

决定好发送到哪个Partition后,须要明确该Partition的leader是哪台broker才能决定发送到哪里。

详细实现位置

kafka.client.ClientUtils#fetchTopicMetadata

实现方案

1、从broker获取Partition的元数据。
由于Kafka所有broker存有所有的元数据,以是任何一个broker都可以返回所有的元数据

2、broker选取策略:将broker列表随机排序,从首个broker开始访问,如果出错,访问下一个

3、出错处理:出错后向下一个broker要求元数据

把稳

Producer是从broker获取元数据的,并不关心zookeeper。

broker发生变革后,producer获取元数据的功能不能动态变革。

获取元数据时利用的broker列表由producer的配置中的 metadata.broker.list 决定。
该列表中的机器只要有一台正常做事,producer就能获取元数据。

获取元数据后,producer可以写数据到非 metadata.broker.list 列表中的broker

缺点处理

producer的send函数默认没有返回值。
出错处理有EventHandler实现。

DefaultEventHandler的缺点处理如下:

获取出错的数据

等待一个间隔韶光,由配置 retry.backoff.ms 决定这段韶光是非

重新获取元数据

重新发送数据

出错重试次数由配置 message.send.max.retries 决定

所有重试全部失落败时,DefaultEventHandler会抛出非常。
代码如下

11)apache kafka源代码工程环境搭建(IDEA)

1.gradle安装

gradle安装

2.下载apache kafka源代码

apache kafka下载

3.用gradle构建产生IDEA工程文件

先装好idea的scala插件,不然构建时就会自动下载,由于没有海内镜像,速率会很慢。

lizhitao@users-MacBook-Pro:~/Downloads/kafka_2.10-0.8.1$ gradle idea

如果是eclipse工程,实行:gradle eclipse

天生IDEA工程文件如下:

4.项目导入到IDEA工程中

File-->Open

5.IDEA中查看源码工程

6.Kafka启动时,参数设置

配置server.properties

7.log4j.properties文件路径设置

启动kafka server很奇怪,log4j.properties文件找不到,报如下缺点。

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).

log4j:WARN Please initialize the log4j system properly.

只有把log4j.properties放置到src/main/scala路径下,才能找到文件,然后运行程序,精确输出日志信息。
输出如下所示

12)apache kafka监控系列-KafkaOffsetMonitor

概览

最近kafka server做事上线了,基于jmx指标参数也写到zabbix中了,但总以为短缺点什么东西,可视化可操作的界面。
zabbix中数据比较分散,不能集中看全体集议论形。
或者一个cluster中broker列表,自己写web-console比较耗时耗力,用原型工具画了一些管理界面东西,关键自己也不前端方面技能,这方面比较薄弱。
这不开源社区供应了kafka的web管理平台KafkaOffsetMonitor.就迅速拿过来运行。
大家不要焦急,立时娓娓道来。

解释:

这个运用程序来实时监控你kafka做事的consumer以及他们在partition中的offset(偏移)。

你可以浏览当前的消费者组,每个topic的所有partition的消费情形都可以一览无余。
这实在是很有用得,从这里你很快知道每个partition的message是否很快被消费(没有壅塞)。
他能辅导你(kafka producer和consumer)优化代码。

这个web管理平台保留的partition offset和consumer滞后的历史数据,以是你可以很轻易理解这几天consumer消费情形。

KafkaOffsetMonitor功能:

1.从标题都可以看出来,Kafka Offset Monitor,是对consumer消费情形进行监控,并能列出每个consumer offset,滞后数据。

2.消费者组列表

3.每个topic的所有parition列表(topic,pid,offset,logSize,lag,owner)

4.查看topic的历史消费信息.

虽然功能覆盖面不全,但是很实用。

1.下载

github官网下载

KafkaOffsetMonitor

百度云下载(网速快)

百度云KafkaOffsetMonitor下载

解释:百度云下载为修正版本,由于KafkaOffsetMonitor中有些资源文件(css,js)是访问外网的,特殊是有访问google资源,大家都懂的,常常不能访问。
建议下载修正版

2.安装

KafkaOffsetMonitor运行比较大略,由于所有运行文件,资源文件,jar文件都打包到KafkaOffsetMonitor-assembly-0.2.0.jar了,直接运行就可以,这种办法太棒了。
既不用编译也不用配置,,也不是绝对不配置。

a.新建一个目录kafka-offset-console,然后把jar拷贝到该目录下.

b.新建脚本,由于您可能不是一个kafka集群。
用脚本可以启动多个

lizhitao@users-MacBook-Pro: vim mobile_start_en.sh

#!/bin/bash

java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \

com.quantifind.kafka.offsetapp.OffsetGetterWeb \

--zk 192.168.2.101:2181,192.168.2.102:2182,192.168.2.103:2181/config/mobile/xxx \

--port 8086 \

--refresh 10.seconds \

--retain 7.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &

把稳:/config/mobile/xxx 表示zk的根目录,须要手工创建,也可以不设置

3.运行

lizhitao@users-MacBook-Pro: chmod +x mobile_start_en.sh

lizhitao@users-MacBook-Pro: ./mobile_start_en.sh

6 演示截图:

消费者组列表

topic的所有partiton消费情形列表

kafka正在运行的topic

kafka集群中topic列表

kafka集群中broker列表

13)Kafka Controller设计机制

在kafka集群中,个中一个broker server作为中心掌握器,卖力管理分区和副本状态并实行管理着这些分区的重新分配。
下面解释如何通过中心掌握器操作分区和副本的状态。

名词阐明:

isr:同步副本组

OfflinePartitionLeaderSelector:分区下线后新的领导者选举

OAR:老的分配副本

PartitionStateChange:

其有效状态如下:

NonExistentPartition: 这种状态表明该分区从来没有创建过或曾经创建过后来又删除了。

NewPartition:创建分区后,分区处于NewPartition状态。
在这种状态下,分区副本该当分配给它,但还没有领导者/同步组。

OnlinePartition:一旦一个分区领导者当选出,就会为在线分区状态。

OfflinePartition:如果分区领导者成功选举后,当领导者分区崩溃或挂了,分区状态转变下线分区状态。

其有效的状态转移如下:

NonExistentPartition -> NewPartition

1.群集中心掌握器根据打算规则,从zk中读取分区信息,创建新分区和副本。

NewPartition -> OnlinePartition

1.分配第一个活着的副本作为分区领导者,并且该分区所有副本作为一个同步组,写领导者和同步副本组数据到zk中。

2.对付这个分区,发送LeaderAndIsr要求给每一个副本分区和并发送UpdateMetadata要求到每个活者的broker server。

OnlinePartition,OfflinePartition -> OnlinePartition

1.对付这个分区,须要选择新的领导者和同步副本组,一个副本组要接管LeaderAndIsr要求,末了写领导者和同步副本组信息到zk中。

a.OfflinePartitionLeaderSelector:新领导者=存活副本(最好是在isr);新isr =存活isr如果不是空或恰好为新领导者,否则;正在接管中副本=存活已分配副本。

b.ReassignedPartitionLeaderSelector:新领导者=存活分区重新分配副本;新isr =当前isr;正在接管中副本=重新分配副本

c.PreferredReplicaPartitionLeaderSelector:新领导这=第一次分配副本(如果在isr);新isr =当前isr;接管副本=分配副本

d.ControlledShutdownLeaderSelector:新领导者=当前副本在isr中且没有被关闭,新isr =当前isr -关闭副本;接管副本=存活已分配副本。

2.对付这个分区,发送LeaderAndIsr要求给每一个吸收副本和UpdateMetadata要求到每个broker server

NewPartition,OnlinePartition -> OfflinePartition

1.这只不过标识该分区为下线状态

OfflinePartition -> NonExistentPartition

1.这只不过标识该分区为不存在分区状态

ReplicaStateChange:

有效状态如下:

1.NewReplica:当创建topic或分区重新分配期间副本被创建。
在这种状态下,副本只能成为追随者变更要求状态。

2.OnlineReplica:一旦此分区一个副本启动且部分分配副本,他将处于在线副本状态。
在这种状态下,它可以成为领导者或成为跟随者状态变更要求。

3.OfflineReplica:每当broker server副本宕机或崩溃发生时,如果一个副本崩溃或挂了,它将变为此状态。

4.NonExistentReplica:如果一个副本被删除了,它将变为此状态。

有效状态转移如下:

NonExistentReplica - - > NewReplica

1.利用当前领导者和isr分区发送LeaderAndIsr要求到新副本和UpdateMetadata要求给每一个存活borker

NewReplica - > OnlineReplica

1.添加新的副本到副本列表中

OnlineReplica,OfflineReplica - > OnlineReplica

1.利用当前领导者和isr分区发送LeaderAndIsr要求到新副本和UpdateMetadata要求给每一个存活borker

NewReplica,OnlineReplica - > OfflineReplica

1.发送StopReplicaRequest到相应副本(w / o删除)

2.从isr和发送LeaderAndIsr要求重删除此副本(isr)领导者副本和UpdateMetadata分区每个存活broker。

OfflineReplica - > NonExistentReplica

1.发送StopReplicaRequest到副本(删除)

KafkaController操作:

当新建topic时:

调用方法onNewPartitionCreation

当创建新分区时:

创建新分区列表 -> 调用方法NewPartition

创建所有新分区副本 -> 调用方法NewReplica

新分区在线列表 -> 调用方法OnlinePartition

新分区所有在线副本 -> OnlineReplica

当broker失落败或挂掉时:

当前broker所有领导者分区为下线分区 -> 调用方法OfflinePartition

下线和在线分区列表 -> OnlinePartition (利用下线分区领导者选举)

在broker上所有fail副本 -> OfflineReplica

当broker启动时:

发送UpdateMetadate要求给新启动broker的所有分区。

新启动broker的分区副本-> OnlineReplica

下线和在线分区列表 -> OnlinePartition (利用下线分区领导者选举)

当新的broker启动时,对付所有分区副本,系统会调用方法onPartitionReassignment实行未完成的分区分配。

当分区重新分配时: (OAR: 老的分配副本; RAR:每当重新分配副本会有新的副本组)

用OAR + RAR副本组修正并分配副本列表.

当处于OAR + RAR时,发送LeaderAndIsr要求给每个副本。

副本处于RAR - OAR -> 调用方法NewReplica

等待直到新的副本加入isr中

副本处于RAR -> 调用方法OnlineReplica

设置AR to RAR并写到内存中

send LeaderAndIsr request 给一个潜在领导者 (如果当前领导者不在RAR中)和一个被分配的副本列表(利用RAR) 和相同sir到每个处于RAR的broker中。

replicas in OAR - RAR -> Offline (逼迫这些副本从isr重剔除)

replicas in OAR - RAR -> NonExistentReplica (逼迫这些副本被删除)

在zk上修正重分配副本到RAR中。

在zk上修正 /admin/reassign_partitions路径,并删除此分区

选举领导者后,副本和isr信息变革,以是重新发送更新元数据要求给每一个broker。

例如, if OAR = {1, 2, 3} and RAR = {4,5,6}, 在zk上重分配副本和领导者/is这些值可能经历以下转化。

AR leader/isr

{1,2,3} 1/{1,2,3} (初始化状态)

{1,2,3,4,5,6} 1/{1,2,3} (step 2)

{1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)

{1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)

{1,2,3,4,5,6} 4/{4,5,6} (step 8)

{4,5,6} 4/{4,5,6} (step 10)

把稳,当只有一个地方我们能存储OAR持久化数据,必须用RAR在zk修正AR节点数据,这样,如果掌握器在这一步之前崩溃,我们仍旧可以规复。

当中央掌握器failover时:

replicaStateMachine.startup():

从任何下线副本或上线副本中初始化每个副本

每个副本 -> OnlineReplica (逼迫LeaderAndIsr要求发送到每个副本)

partitionStateMachine.startup():

重新建分区中初始化每个分区, 下线或上线分区

each OfflinePartition and NewPartition -> OnlinePartition (逼迫领导者选举)

规复分区分配

规复领导者选举

当发送首选副本选举时:

影响分区列表 -> 调用方法OnlinePartition (with PreferredReplicaPartitionLeaderSelector)

关闭broker:

在关闭broker中对付每个分区如果是领导者分区 -> 调用方法OnlinePartition (ControlledShutdownPartitionLeaderSelector)

在关闭broker中每个副本是追随者,将发送StopReplica要求 (w/o deletion)

在关闭broker中每个副本是追随者 -> 调用方法OfflineReplica (逼迫从同步副本组中删除副本)

14)Kafka性能测试报告(虚拟机版)

测试方法

在其他虚拟机上利用 Kafka 自带 kafka-producer-perf-test.sh 脚本进行测试 Kafka 写入性能

考试测验利用 kafka-simple-consumer-perf-test.sh 脚本测试 Kafka Consumer 性能,但由于获取到的数据不靠谱,放弃这个测试方法

性能数据

注:Gzip 和 Snappy 的传输速率 MB/S 是通过压缩前数据打算的,压缩后的实际传输量并没有超过百兆网卡上限

单条大小

batch size/条

线程数

压缩办法

传输速率 MB/S

传输速率 Message/S

0~1000 (avg 500)

200

10

不压缩

11.1513 (约为百兆网卡上线)

23369.8916

0~1000 (avg 500)

200

10

Gzip

14.0450

29425.1878

0~1000 (avg 500)

200

10

Snappy

32.2064

67471.7850

0~100(avg 50)

200

10

不压缩

5.3654

111399.5121

0~100(avg 50)

200

10

Gzip

2.6479

54979.4926

0~100(avg 50)

200

10

Snappy

4.4217

91836.6410

0~1800 (avg 900) 仿线上数据量大小

200

10

不压缩

11.0518 (约为百兆网卡上线)

12867.3632

0~1800 (avg 900) 仿线上数据量大小

200

10

Gzip

17.3944

20261.3717

0~1800 (avg 900) 仿线上数据量大小

200

10

Snappy

31.0658

36174.2150

以下数据为第二天测试数据

0~100(avg 50)

200

10

不压缩

1.8482

38387.7159

0~100(avg 50)

200

10

Gzip

1.3591

28219.0930

0~100(avg 50)

200

10

Snappy

2.0213

41979.7658

0~100(avg 50)

200

50

不压缩

2.0900

43402.7778

0~100(avg 50)

200

50

Gzip

1.4639

30387.7477

0~100(avg 50)

200

50

Snappy

2.0871

43323.8021

0~1000 (avg 500)

200

10

不压缩

9.8287

20594.3530

0~1000 (avg 500)

200

10

Gzip

13.0659

27386.0058

0~1000 (avg 500)

200

10

Snappy

20.1827

42265.4269

0~1000 (avg 500)

200

1

不压缩

7.0980

14885.6041

0~1000 (avg 500)

200

1

Gzip

7.4438

15587.7356

0~1000 (avg 500)

200

1

Snappy

15.3256

32088.3070

测试结论

1、线上的实际message均匀大小略小于1k,在这种情形下(对应 0~1800 的test case),虚拟机可以应对每秒上万条写入要求。
测试环境下,网络带宽是其瓶颈。
通过压缩可以绕过瓶颈,Snappy算法可以处理36000+条要求每秒

2、在利用小数据进行测试时,Kafka每秒可以处理10万条旁边数据,网络和IO都不是瓶颈,解释Kafka在虚拟机上处理写入要求的上限约为10万条每秒。

3、第二天的测试在相同条件下与第一天差距很大(0~100 大小数据,10线程,batch size 200),第二天在不压缩情形下只有第一天的三分之一的处理能力,snappy压缩情形下也只有二分之一处理能力,解释虚拟机的性能不足稳定。

4、生产者线程数比拟,解释在网络和IO及Kafka处理能力没有达到瓶颈时,更多的线程能够增加写入速率,但是增长不明显。

测试推论

1、虚拟机上的Kafka最高也可以处理10万条要求,物理机的处理能力强得多,应该超过10万条每秒的处理能力。
对应线上均匀数据大小靠近1K,处理数据流量能力不会低于100MB/S,靠近千兆网卡上限。
解释物理机上,在碰着网络带宽瓶颈前,Kafka性能应该不会是瓶颈。

2、虚拟机测试是在单topic 单replication 的情形下测试的。
无法确定在多个replication时性能低落情形。
从网上查找看,性能低落不是很明显。

3、从测试看,虚拟机的性能能够承担线上要求。
但虚拟机性能不稳定,须要非常谨慎。

15)apache kafka监控系列-kafka-web-console

Kafka Web Console是kafka的开源web监控程序.

功能先容如下:

brokers列表

连接kafka的zk集群列表

所有topic列表,操作相应topic可以浏览查看相应message生产和消费流量图.

1.下载Kafka Web Console

Kafka Web Console

2.安装sbt

a. centos : yum install sbt

b. ubuntu : apt-get install sbt

3.配置Kafka Web Console

a.增加数据库依赖包(mysql),解压kafka-web-console.tar.gz,进入目录cd kafka-web-console

编辑文件vim build.sbt

增加mysql配置:

......

libraryDependencies ++= Seq( jdbc, cache, "org.squeryl" % "squeryl_2.10" % "0.9.5-6", "com.twitter" % "util-zk_2.10" % "6.11.0", "com.twitter" % "finagle-core_2.10" % "6.15.0", "org.apache.kafka" % "kafka_2.10" % "0.8.1", "org.quartz-scheduler" % "quartz" % "2.2.1", "mysql" % "mysql-connector-java" % "5.1.9" exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri") ) .......

4.配置mysql的jdbc驱动

vim application.conf

增加代码如下:

....... db.default.driver=com.mysql.jdbc.Driver db.default.url="jdbc:mysql://192.168.2.105:3306/mafka?useUnicode=true&characterEncoding=UTF8&connectTimeout=5000&socketTimeout=10000" db.default.user=xxx db.default.password=xxx .......

5.实行sql语句(如下绿色选框所示)

6.编译

lizhitao@localhost:~$ sbt package

打包编译时会从官网高下载很多jar,由于网络缘故原由,以是很慢,须要耐心等待。

把稳:下载的jar是隐蔽的,在cd ~/.ivy2 目录(相应子目录)下可以看到所有jar.

ivy2所有jar包百度云下载

ivy2所有jar包下载

7.运行

lizhitao@localhost:~$ sbt run

8.浏览访问

访问地址: http://ip:9000/

16)apache kafka迁移与扩容工具用法

参考官网site:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool

解释:

当我们对kafka集群扩容时,须要知足2点哀求:

将指定topic迁移到集群内新增的node上。

将topic的指定partition迁移到新增的node上。

1. 迁移topic到新增的node上

如果现在一个kafka集群运行三个broker,broker.id依次为101,102,103,后来由于业务数据溘然暴增,须要新增三个broker,broker.id依次为104,105,106.目的是要把push-token-topic迁移到新增node上。
脚本(json格式)如下所示:

lizhitao@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka --topics-to-move-json-file migration-push-token-topic.json --broker-list "104,105,106" --generate

脚本migration-push-token-topic.json文件内容如下:

{ "topics": [ { "topic": "push-token-topic" } ], "version":1 }

天生分配partitions的json脚本:

Current partition replica assignment {"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[8]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":3,"replicas":[5]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":9,"replicas":[5]},{"topic":"cluster-switch-topic","partition":1,"replicas":[5]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[5]},{"topic":"cluster-switch-topic","partition":2,"replicas":[4]},{"topic":"cluster-switch-topic","partition":0,"replicas":[4]},{"topic":"cluster-switch-topic","partition":6,"replicas":[4]},{"topic":"cluster-switch-topic","partition":8,"replicas":[4]}]}

重新分配parttions的json脚本如下:

migration-topic-cluster-switch-topic.json {"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[5]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":3,"replicas":[4]},{"topic":"cluster-switch-topic","partition":9,"replicas":[4]},{"topic":"cluster-switch-topic","partition":1,"replicas":[4]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[4]},{"topic":"cluster-switch-topic","partition":2,"replicas":[5]},{"topic":"cluster-switch-topic","partition":0,"replicas":[5]},{"topic":"cluster-switch-topic","partition":6,"replicas":[5]},{"topic":"cluster-switch-topic","partition":8,"replicas":[5]}]} lizhitao@localhost:$ bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka01 --reassignment-json-file migration-topic-cluster-switch-topic.json --execute

2.topic修正(replicats-factor)副本个数

lizhitao@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka --reassignment-json-file replicas-update-push-token-topic.json --execute

如果初始时push-token-topic为一个副本,为了提高可用性,须要改为2副本模式。

脚本replicas-push-token-topic.json文件内容如下:

{ "partitions": [ { "topic": "log.mobile_nginx", "partition": 0, "replicas": [101,102,104] }, { "topic": "log.mobile_nginx", "partition": 1, "replicas": [102,103,106] }, { "topic": "xxxx", "partition": 数字, "replicas": [数组] } ], "version":1 }

3.topic的分区扩容用法

a.先扩容分区数量,脚本如下:

例如:push-token-topic初始分区数量为12,目前到增加到15个

lizhitao@localhost:$ ./bin/kafka-topics.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka --alter --partitions 15 --topic push-token-topic

b.设置topic分区副本

lizhitao@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka --reassignment-json-file partitions-extension-push-token-topic.json --execute

脚本partitions-extension-push-token-topic.json文件内容如下:

{ "partitions": [ { "topic": "push-token-topic", "partition": 12, "replicas": [101,102] }, { "topic": "push-token-topic", "partition": 13, "replicas": [103,104] }, { "topic": "push-token-topic", "partition": 14, "replicas": [105,106] } ], "version":1 }

17)kafka LeaderNotAvailableException

常常producer和consumer会包如下非常

LeaderNotAvailableException

缘故原由:

1.个中该分区所在的broker挂了,如果是多副本,该分区所在broker恰好为leader

18)apache kafka jmx监控指标参数

Kafka利用Yammer Metrics来监控server和client指标数据。

JMX监控指标参数列表如下:

参数

Mbean名称

解释

Message in rate

"kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"

所有topic(进出)流量

Byte in rate

"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"

Request rate

"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"

Byte out rate

"kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"

Log flush rate and time

"kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"

# of under replicated partitions (|ISR| < |all replicas|)

"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"

0

Is controller active on broker

"kafka.controller":name="ActiveControllerCount",type="KafkaController"

only one broker in the cluster should have 1

Leader election rate

"kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats"

non-zero when there are broker failures

Unclean leader election rate

"kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats"

0

Partition counts

"kafka.server":name="PartitionCount",type="ReplicaManager"

mostly even across brokers

Leader replica counts

"kafka.server":name="LeaderCount",type="ReplicaManager"

mostly even across brokers

ISR shrink rate

"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"

If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.

ISR expansion rate

"kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"

See above

Max lag in messages btw follower and leader replicas

"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"

副本滞后数量

Lag in messages per follower replica

"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"

副本滞后数量

Requests waiting in the producer purgatory

"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"

Requests waiting in the fetch purgatory

"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"

Request total time

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"

Time the request waiting in the request queue

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"

Time the request being processed at the leader

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"

Time the request waits for the follower

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"

Time to send the response

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"

Number of messages the consumer lags behind the producer by

"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"

参数

Mbean名称

解释

Message in rate

"kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"

所有topic(进出)流量

Byte in rate

"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"

Request rate

"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"

Byte out rate

"kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"

Log flush rate and time

"kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"

# of under replicated partitions (|ISR| < |all replicas|)

"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"

0

Is controller active on broker

"kafka.controller":name="ActiveControllerCount",type="KafkaController"

only one broker in the cluster should have 1

Leader election rate

"kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats"

non-zero when there are broker failures

Unclean leader election rate

"kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats"

0

Partition counts

"kafka.server":name="PartitionCount",type="ReplicaManager"

mostly even across brokers

Leader replica counts

"kafka.server":name="LeaderCount",type="ReplicaManager"

mostly even across brokers

ISR shrink rate

"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"

If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.

ISR expansion rate

"kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"

See above

Max lag in messages btw follower and leader replicas

"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"

副本滞后数量

Lag in messages per follower replica

"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"

副本滞后数量

Requests waiting in the producer purgatory

"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"

Requests waiting in the fetch purgatory

"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"

Request total time

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"

Time the request waiting in the request queue

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"

Time the request being processed at the leader

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"

Time the request waits for the follower

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"

Time to send the response

"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"

Number of messages the consumer lags behind the producer by

"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"

19)apache kafka性能测试命令利用和构建kafka-perf

本来想用kafka官方供应的工具做性能测试的。
但事与愿违,当我实行官方供应的kafka测试脚本,却报错没有找到ProducerPerformance,后来浏览一些代码文件,才创造没有把perf性能测试程序打包到kafka_2.x.0-0.8.x.x.jar发行版本中。

现在来教您如何打包做测试。

1.准备事情:

安装gradle

2.下载kafka源代码

kafka-0.8.1源代码

3.编译kafka-perf_2.x-0.8.1.x.jar

编译把稳事变:默认情形下是编译为2.8.0版本,也可以指定版本编译。
目前编译高版本的kafka-perf(2.8.0以上版本)是由问题的,由于build.gradle配置参数有问题(版天职歧,会报如下缺点,版本不兼容缺点),如果要构建高版本kafka-perf多版本修正内容如下:

下载build.gradle 更换掉kafka-0.8.1.1-src根目录下文件即可

编译构建实行命令:

gradle jar 默认天生2.8.0版本的kafka和kafka-perf的jar gradle jar_core_2_8_0 天生2.8.0版本的kafka的jar gradle jar_core_2_8_2 天生2.8.2版本的kafka的jar gradle jar_core_2_9_1 天生2.9.1版本的kafka的jar gradle jar_core_2_9_2 天生2.9.2版本的kafka的jar gradle jar_core_2_10_1 天生2.10.1版本的kafka的jar gradle perf:jar 天生2.8.0版本的kafka和kafka-perf的jar gradle perf_2_9_1 天生2.9.1版本的kafka和kafka-perf的jar gradle perf_2_10_1 天生2.10.1版本的kafka和kafka-perf的jar gradle -PscalaVersion=2.8.0 jar 编译scala 2.8.0版本编译所有jar gradle -PscalaVersion=2.8.2 jar 编译scala 2.8.2版本编译所有jar gradle -PscalaVersion=2.9.1 jar 编译scala 2.9.1版本编译所有jar gradle -PscalaVersion=2.10.1 jar 编译scala 2.10.1版本编译所有jar

如果不想编译jar,可以直接下载:kafka-perf_2.x.x-0.8.1.jar

lizhitao@users-MacBook-Pro:~/mt_wp/tmp$ cd kafka-0.8.1.1-src lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$gradle jar lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$gradle perf:jar BUILD SUCCESSFUL Total time: 54.41 secs

编译jar包目录如下:

a. kafka_2.x-0.8.1.1.jar

kafka-0.8.1.1-src/core/build

b.kafka-perf_2.x-0.8.1.x.jar

kafka-0.8.1.1-src/perf/build/libs

kafka多版本jar:

4. kafka性能测试命令用法:

4.1 创建topic

bin/kafka-topics.sh --zookeeper 192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka02 --create --topic test-rep-one --partitions 6 --replication-factor 1

4.2 kafka-producer-perf-test.sh中参数解释:

messages 生产者发送总的数量 message-size 每条大小 batch-size 每次批量发送的数量 topics 生产者发送的topic threads 生产者利用几个线程同时发送 broker-list 安装kafka做事的机器ip:port列表 producer-num-retries 一个失落败发送重试次数 request-timeout-ms 一个要求发送超时时间

4.3 bin/kafka-consumer-perf-test.sh中参数解释:

zookeeperzk 配置 messages 消费者消费总数量 topic 消费者须要消费的topic threads 消费者利用几个线程同时消费 group 消费者组名称 socket-buffer-sizesocket 缓冲大小 fetch-size 每次向kafka broker要求消费大小 consumer.timeout.ms 消费者去kafka broker拿去一条超时时间

4.4 生产者发送数据:

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ bin/kafka-producer-perf-test.sh --messages 5000000 --message-size 5000 --batch-size 5000 --topics test-rep-one --threads 8 --broker-list mobile-esb03:9092,mobile-esb04:9092,mobile-esb05:9092

4.5 消费者消费数据

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ bin/kafka-consumer-perf-test.sh --zookeeper 192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka02 --messages 50000000 --topic test-rep-one --threads 1

20)apache kafka源码构建打包

准备事情:

安装gradle

1.构建kafka的jar并运行

打包kafka-0.8.1.1下所有jar,包括core,perf,clients等。

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle jar

2.构建源代码jar

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle srcJar

3.运行序列化测试

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle -Dtest.single=RequestResponseSerializationTest core:test

4.gradle任务列表

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle tasks

5.构建所有jar,包括tasks中各个版本jar

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle jarAll

6.指定构建jar包版本

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle -PscalaVersion=2.10.1 jar

7.发布文件到maven仓库

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle uploadArchivesAll

编辑文件~/.gradle/gradle.properties,增加如下内容:

mavenUrl= mavenUsername= mavenPassword= signing.keyId= signing.password= signing.secretKeyRingFile=

21)Apache kafka客户端开拓-java

1.依赖包

org.apache.kafka

kafka_2.10

0.8.1

2.producer程序开拓例子

2.1 producer参数解释

#指定kafka节点列表,用于获取metadata,不必全部指定 metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092 # 指定分区处理类。
默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区 #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner # 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。
压缩后中会有头来指明压缩类型,故在消费者端解压是透明的无需指定。
compression.codec=none # 指定序列化处理类(mafka client API调用解释-->3.序列化约定wiki),默认为kafka.serializer.DefaultEncoder,即byte[] serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder # serializer.class=kafka.serializer.DefaultEncoder # serializer.class=kafka.serializer.StringEncoder # 如果要压缩,这里指定哪些topic要压缩,默认empty,表示不压缩。
#compressed.topics=

########### request ack ############### # producer吸收ack的机遇.默认为0. # 0: producer不会等待broker发送ack # 1: 当leader吸收到之后发送ack # 2: 当所有的follower都同步成功后发送ack. request.required.acks=0 # 在向producer发送ack之前,broker许可等待的最大韶光 # 如果超时,broker将会向producer发送一个error ACK.意味着上一次由于某种 # 缘故原由未能成功(比如follower未能同步成功) request.timeout.ms=10000 ########## end #####################

# 同步还是异步发送,默认“sync”表同步,"async"表异步。
异步可以提高发送吞吐量, # 也意味着将会在本地buffer中,并应时批量发送,但是也可能导致丢失未发送过去的 producer.type=sync ############## 异步发送 (以下四个异步参数可选) #################### # 在async模式下,当message被缓存的韶光超过此值后,将会批量发送给broker,默认为5000ms # 此值和batch.num.messages协同事情. queue.buffering.max.ms = 5000 # 在async模式下,producer端许可buffer的最大量 # 无论如何,producer都无法尽快的将发送给broker,从而导致在producer端大量沉积 # 此时,如果的条数达到阀值,将会导致producer端壅塞或者被抛弃,默认为10000 queue.buffering.max.messages=20000 # 如果是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 # 当在producer端沉积的条数达到"queue.buffering.max.meesages"后 # 壅塞一定韶光后,行列步队仍旧没有enqueue(producer仍旧没有发送出任何) # 此时producer可以连续壅塞或者将抛弃,此timeout值用于掌握"壅塞"的韶光 # -1: 无壅塞超时限定,不会被抛弃 # 0:立即清空行列步队,被抛弃 queue.enqueue.timeout.ms=-1 ################ end ###############

# 当producer吸收到error ACK,或者没有吸收到ACK时,许可重发的次数 # 由于broker并没有完全的机制来避免重复,以是当网络非常时(比如ACK丢失) # 有可能导致broker吸收到重复的,默认值为3. message.send.max.retries=3 # producer刷新topic metada的韶光间隔,producer须要知道partition leader的位置,以及当前topic的情形 # 因此producer须要一个机制来获取最新的metadata,当producer碰着特定缺点时,将会立即刷新 # (比如topic失落效,partition丢失,leader失落效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000

2.1 指定关键字key,发送到指定partitions

解释:如果须要实现自定义partitions发送,须要实现Partitioner接口

3.consumer程序开拓例子

3.1 consumer参数解释

# zookeeper连接做事器地址,此处为线下测试环境配置(kafka做事-->kafka broker集群线上支配环境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session过期韶光,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其他消费者要等该指定时间才能检讨到并且触发重新负载均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。
把稳offset更新时基于time而不是每次得到的。
一旦在更新zookeeper发生非常并重启,将可能拿到已拿到过的 zookeeper.sync.time.ms=2000

#指定消费组 group.id=xxx # 当consumer消费一定量的之后,将会自动向zookeeper提交offset信息 # 把稳offset信息并不是每消费一次就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true auto.commit.enable=true # 自动更新韶光。
默认60 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,可以设定,也可以有系统天生,紧张用来跟踪消费情形,便于不雅观察 conusmer.id=xxx # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 # 的consumer上,如果一个consumer得到了某个partition的消费权限,那么它将会向zk注册 # "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有开释此节点, # 此值用于掌握,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的chunk # 每次feth将得到多条,此值为总大小,提升此值,将会花费更多的consumer端内存 fetch.min.bytes=6553600 # 当的尺寸不敷时,server壅塞的韶光,如果超时,将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360

# 如果zookeeper没有offset值或offset值超出范围。
那么就给个初始的offset。
有smallest、largest、 # anything可选,分别表示给当前最小的offset、当前最大的offset、抛非常。
默认largest auto.offset.reset=smallest # 指定序列化处理类(mafka client API调用解释-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder

3.2 多线程并行消费topic

ConsumerTest类

总结:

kafka消费者api分为high api和low api,目前上述demo是都是利用kafka high api,高等api不用关心掩护消费状态信息和负载均衡,系统会根据配置参数,

定期flush offset到zk上,如果有多个consumer且每个consumer创建了多个线程,高等api会根据zk上注册consumer信息,进行自动负载均衡操作。

把稳事变:

1.高等api将会内部实现持久化每个分区末了读到的的offset,数据保存在zookeeper中的消费组名中(如/consumers/push-token-group/offsets/push-token/2。

个中push-token-group是消费组,push-token是topic,末了一个2表示第3个分区),每间隔一个(默认1000ms)韶光更新一次offset,

那么可能在重启消费者时拿到重复的。
此外,当分区leader发生变更时也可能拿到重复的。
因此在关闭消费者时最好等待一定韶光(10s)然后再shutdown()

2.消费组名是一个全局的信息,要把稳在新的消费者启动之前旧的消费者要关闭。
如果新的进程启动并且消费组名相同,kafka会添加这个进程到可用消费线程组中用来消费

topic和触发重新分配负载均衡,那么同一个分区的就有可能发送到不同的进程中。

3.如果消费者组中所有consumer的总线程数量大于分区数,一部分线程或某些consumer可能无法读取消息或处于空闲状态。

4.如果分区数多于线程数(如果消费组中运行者多个消费者,则线程数为消费者组内所有消费者线程总和),一部分线程会读取到多个分区的

5.如果一个线程消费多个分区,那么吸收到的是不能担保顺序的。

备注:可用zookeeper web ui工具管理查看zk目录树数据: xxx/consumers/push-token-group/owners/push-token/2个中

push-token-group为消费组,push-token为topic,2为分区3.查看里面的内容如:

push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示该分区被该标示的线程所实行。

producer性能优化:异步化,批量发送,详细浏览上述参数解释。
consumer性能优化:如果是高吞吐量数据,设置每次拿取消息(fetch.min.bytes)大些,

拿取消息频繁(fetch.wait.max.ms)些(或韶光间隔短些),如果是低延时哀求,则设置韶光韶光间隔小,每次从kafka broker拿取消息只管即便小些。

22) kafka broker内部架构

下面先容kafka broker的紧张子模块,帮助您更好地学习并理解kafka源代码和架构。

如下先容几个子模块:

Kafka API layer

LogManager and Log

ReplicaManager

ZookeeperConsumerConnector

service Schedule

如下是系统几个模块如何组成到一起架构图:

23)apache kafka源码剖析走读-kafka整体构造剖析

kafka源代码工程目录构造如下图:

下面只对core目录构做作解释,其他都是测试类或java客户端代码

admin --管理员模块,操作和管理topic,paritions干系,包含create,delete topic,扩展patitions

Api --该模块紧张卖力组装数据,组装2种类型数据,

1.读取或解码客户端发送的二进制数据.

2.编码log数据,组装为须要发送的数据。

client --该模块比较大略,就一个类,Producer读取kafka broker元数据信息,

topic和partitions,以及leader

cluster --该模块包含几个实体类,Broker,Cluster,Partition,Replica,阐明他们之间关系:Cluster由多个broker组成,一个Broker包含多个partition,一个 topic的所有partitions分布在不同broker的中,一个Replica包含多个Partition。

common --通用模块,只包含非常类和缺点验证

consumer --consumer处理模块,卖力所有客户端消费者数据和逻辑处理

contoroller --卖力中心掌握器选举,partition的leader选举,副本分配,副本重新分配,

partition和replica扩容。

javaapi --供应java的producer和consumer接口api

log --kafka文件系统,卖力处理和存储所有kafka的topic数据。

message --封装kafka的ByteBufferMessageSet

metrics --内部状态的监控模块

network --网络事宜处理模块,卖力处理和吸收客户端连接

producer --producer实现模块,包括同步和异步发送。

serializer --序列化或反序列化当前

kafka --kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。

tools --一看这便是工具模块,包含内容比较多:

a.导出对应consumer的offset值.

b.导出LogSegments信息,当前topic的log写的位置信息.

c.导出zk上所有consumer的offset值.

d.修正注册在zk的consumer的offset值.

f.producer和consumer的利用例子.

utils --Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。

1.kafka启动类:kafka.scala

kafka为kafka broker的main启动类,其紧张浸染为加载配置,启动report做事(内部状态的监控),注册开释资源的钩子,以及门面入口类。

kafka类代码如下:

KafkaServer部分紧张代码如下:

24)apache kafka源码剖析走读-Producer剖析

producer的发送办法阐发

Kafka供应了Producer类作为java producer的api,该类有sync和async两种发送办法。

sync架构图

async架构图

调用流程如下:

代码流程如下:

Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer、DefaultEventHandler。
在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有创建Producer、EventHandler和ProducerPool,ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。

调用producer.send方法流程:

当运用程序调用producer.send方法时,其内部实在调的是eventhandler.handle(message)方法,eventHandler会首先序列化该,

eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()

调用逻辑阐明:当客户端运用程序调用producer发送messages时(既可以发送单条,也可以发送List多条),调用eventhandler.serialize首先序列化所有,序列化操浸染户可以自定义实现Encoder接口,下一步调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送数据,SyncProducer包装了nio网络操作信息。

Producer的sync与async发送处理,大家看以上架构图一览无余。

partitionAndCollate方法详细浸染:获取所有partitions的leader所在leaderBrokerId(便是在该partiionid的leader分布在哪个broker上),

创建一个HashMap>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送作准备事情。

名称阐明:partKey:分区关键字,当客户端运用程序实现Partitioner接口时,传入参数key为分区关键字,根据key和numPartitions,返回分区(partitions)索引。
记住partitions分区索引是从0开始的。

Producer平滑扩容机制

如果开拓过producer客户端代码,会知道metadata.broker.list参数,它的含义是kafak broker的ip和port列表,producer初始化时,就连接这几个broker,这时大家会有疑问,producer支持kafka cluster新增broker节点?它又没有监听zk broker节点或从zk中获取broker信息,答案是肯定的,producer可以支持平滑扩容broker,他是通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。
等待后续运用程序调用。

DefaultEventHandler类中初始化实例化BrokerPartitionInfo类,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代码如下:

当我们启动kafka broker后,并且大量producer和consumer时,常常会报如下非常信息。

root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166

笔者也是常常很永劫光看源码剖析,才明白了为什么ProducerConfig配置信息里面并不哀求利用者供应完全的kafka集群的broker信息,而是任选一个或几个即可。
由于他会通过您选择的broker和topics信息而获取最新的所有的broker信息。

值得理解的是用于发送TopicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回ProducerPool,而是直接Close.

重难点理解:

刷新metadata并不仅在第一次初始化时做。
为了能适应kafka broker运行中由于各种缘故原由挂掉、paritition改变等变革,

eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。

这里有三点须要强调:

客户端调用send, 才会新建SyncProducer,只有调用send才会去定期刷新metadata

在每次取metadata时,kafka会新建一个SyncProducer去取metadata,逻辑处理完后再close。

根据当前SyncProducer(一个Broker的连接)取得的最新的完全的metadata,刷新ProducerPool中到broker的连接.

每10分钟的刷新会直接重新把到每个broker的socket连接重修,意味着在这之后的第一个要求会有几百毫秒的延迟。
如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失落败时,才会重新刷新。
Kafka的集群中如果某个partition所在的broker挂了,可以检讨缺点后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。

解释:每个SyncProducer实例化工具会建立一个socket连接

特殊把稳:

在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo连续实行,在其末端,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

在ProducerPool中,SyncProducer的数目是由该topic的partition数目掌握的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。
每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。

如果不存在,则直接创建新的。

25)apache kafka性能优化架构剖析

Apache kafka性能优化架构剖析

运用程序优化:数据压缩

consumer offset默认情形下是定时批量更新topics的partitions offset值

26)apache kafka源码剖析走读-server端网络架构剖析

笔者本日禀析一下kafka网络架构,俗话说人无好的胫骨,就没有好的身体,建筑没有踏实可靠的构造框架,就不会耸立不倒。
同样的做事端程序没有好的网络架构,其性能就会受到极大影响,其他方面再怎么优化,也会受限于此,那kafka网络架构是若何的呢,它不是用的现今盛行的netty,mina的高性能网络架构,而是自己基于java nio开拓的。

kafka网络架构图如下:

27)apache kafka源码剖析走读-ZookeeperConsumerConnector剖析

1.ZookeeperConsumer架构

ZookeeperConsumer类中consumer运行过程架构图:

图1

过程剖析:

ConsumerGroupExample类

2.消费者线程(consumer thread),行列步队,拉取线程(fetch thread)三者之间关系

每一个topic至少须要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread>==partitions数量,否则会有consumer thread空闲。

部分代码示例如下:

ConsumerConnector consumer

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

Map topicCountMap = new HashMap();

topicCountMap.put("test-string-topic", new Integer(1)); //value表示consumer thread线程数量

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

详细解释一下三者关系:

(1).topic的partitions分布规则

paritions是安装kafka brokerId有序分配的。

例如现在有三个node安装了kafka broker做事端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情形为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。

创建consumer thread

consumer thread数量与BlockingQueue逐一对应。

a.当consumer thread count=1时

此时有一个blockingQueue1,三个fetch thread线程,该topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。
3个fetch thread线程去拉取消息数据,终极放到blockingQueue1中,等待consumer thread来消费。

消费者线程,缓冲行列步队,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5,6,7,8,9,10,11

fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11

b. 当consumer thread count=2时

此时有consumerThread1和consumerThread2分别对应2个行列步队blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;消费者线程,缓冲行列步队,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5

consumer thread2

blockingQueue2

6,7,8,9,10,11

fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11

c. 当consumer thread count=4时

消费者线程,缓冲行列步队,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2

consumer thread2

blockingQueue2

3,4,5

consumer thread3

blockingQueue3

6,7,8

consumer thread4

blockingQueue4

9,10,11

fetch thread与partitions分布列表如下

同上

同该当消费线程consumer thread count=n,都是安装上述分布规则来处理的。

3.consumer线程以及行列步队创建逻辑

利用ZookeeperConsumerConnector类创建多线程并行消费测试类,ConsumerGroupExample类初始化,调用createMessageStreams方法,实际是在consume方法处理的逻辑,创建KafkaStream,以及壅塞行列步队(LinkedBlockingQueue),KafkaStream与行列步队个数逐一对应,消费者线程数量决定壅塞行列步队的个数。

registerConsumerInZK()方法:设置消费者组,注册消费者信息consumerIdString到zookeeper上。

consumerIdString产生规则部分代码如下:

consumer初始化逻辑处理:

1.实例化并注册loadBalancerListener监听,ZKRebalancerListener监听consumerIdString状态变革

触发consumer reblance条件如下几个:

ZKRebalancerListener:当/kafka01/consumer/[consumer-group]/ids子节点变革时,会触发

ZKTopicPartitionChangeListener:当该topic的partitions发生变革时,会触发。

val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"

zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)

consumer reblance逻辑

consumer offset更新机制

reblance打算规则:(有待补充)

28)kafka的ZkUtils类的java版本部分代码

29)kafka & mafka client开拓与实践

请单击这里下载(下载网址:http://download.csdn.net/detail/zhongwen7710/8173117)

30) kafka文件系统设计那些事

1.文件系统解释

文件系统一样平常分为系统和用户2种类型,系统级文件系统:ext3,ext4,dfs,ntfs等等,,笔者并不会向大家先容那种纷繁繁芜的分布式或系统级文件系统,而是从kafka架构高性能角度考虑,深入阐发kafka文件系统存储构造设计。

2.kafka文件系统架构

2.1 文件系统数据流

下面用图形表示先容客户端处理几个过程如下:

当建立连接要求时,首先客户端向kafka broker发送连接要求,broker中由Acceptor thread线程吸收并建立连接后,把client的socket以轮询办法转交给相应的processor thread。

当client向broker发送数据要求,由processor thread处理并吸收client数据放到request缓冲区中,以待IO thread进行逻辑处理和打算并把返回result放到response缓冲区中.

接着唤醒processor thread,processor thread抱住response行列步队循环发送所有response数据给client.

2.2 kafka文件系统存储构造

paritions分布规则,kafka集群由多个kafka broker组成,一个topic的partitions会分布在一个或多个broker上,topic的partitions在kafka集群上分配规则为,安装paritions索引编号依次有序分布在broker上,

当partitions数量 > brokers数量,会依次循环再次迭代分配。

partitions命名规则,paritions名称为:topic-name-index, index分区索引编号,从0开始依次递增。

producer,每个producer可以发送msg到topic任意一个或多个partitons。

consumer,同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个只发送给个中一个Consumer.

2.3 kafka的文件系统构造-目录

目前如果kafka集群中只有一个broker,数据文件目录为message-folder,例如笔者创建一个topic名称为:report_push, partitions=4

存储路径和目录规则为:

xxx/message-folder

|--report_push-0

|--report_push-1

|--report_push-2

|--report_push-3

2.4 kafka的文件系统构造-partiton文件存储办法

每个partition(topic-name-index)目录中存储海量msg,那它是怎么存储的呢?文件存储构造是若何?

这么多(海量)是存储在一个大文件中,类似DB那样存储,还是其他办法存储构造呢?笔者后续会像剥洋葱一样,给大家一层一层依次分解并剖析。

数据库和kafka文件系统比较,相信大家都用过数据库,数据库底层文件系统相称繁芜,由于数据库特点,须要按照关键字,id快速查询,修正,删除,日志,回滚等等。

以是数据库文件系统是分页存储的树形构造,须要支持大量随机事物操作。
比较数据库支持查询,事物等等繁芜文件,则kafka行列步队类型文件系统大略多了,kafka文件系统存储特点是,

只须要支持producer和consumer顺序生产和就够了,(msg)生命周期由consumer决定。

partiton文件存储构造剖析,每个partition就像如上图4,一个巨大文件数据被均匀分配到多个文件大小相等的文件中。
即相称于一个大文件被切成很多相等大小的文件段segment file

(数量不一定相等)。
由于每个topic中生命周期由末了一个consumer决定,当某个或些被末了一个consumer(consumer group)后,就可以删除该。
显然易见,

这样做的目的是broker能快速回收磁盘空间,而且小文件也能mmap全部到内存。
紧张目的便是提高磁盘利用率和处理性能。

2.5 kafka的文件系统构造-partiton文件存储segment file组成

读者从2.4节理解到kafka文件系统partition存储办法,下面向大家先容一下partion文件存储中segement file组成构造。
一个商业化行列步队的性能好坏,

其文件系统存储构造设计是衡量一个行列步队做事程序最关键指标之一,他也是行列步队中最核心且最能表示行列步队技能水平的部分。
在本节中我们将走进segment file内部一探究竟。

segment file组成:由2大部分组成,分别为segment data file和segment index file,此2个文件逐一对应,成对涌现.

segment index file索引文件组成构造如下:

00000000000000000000.index 文件名称,文件串大小最大支持2^64bit

每次记录相应log文件记录的相对条数和物理偏移位置位置,共8bytes

4byte 当前segment file offset - last seg file offset记录条数 offset

4byte对应segment file物理偏移地址 position

………

segment data file索引文件组成构造如下:

00000000000000000000.log 文件名称,文件串大小最大支持2^64bit,与index对应

参数解释:

4 byte CRC32:利用crc32算法打算除CRC32这4byte外的buffer。

1 byte “magic":表示数据文件协议版本号

1 byte “attributes":表示标识独立版本,标识压缩类型,编码类型。

key data:可选,可以存储判断或表示这个块的元数据信息。

payload data:体,该体可能会存储多笔记载,内部是按照序号有序存储的。

2.6 kafka文件系统-consumer读取流程

segment index file:

稀疏索引办法,减少索引文件大小,这样可以直接内存操作,稀疏索引只为数据文件的每个存储块设一个键-指针对,它比稠密索引节省了更多的存储空间,但查找给定值的记录需更多的韶光,通过二分查找快速找到segment data file物理位置,如果在index file没有找到data file详细位置,则data file相对位置连续顺序读取查找,直到找到为止。

2.7 kafka的文件系统构造-总体目录构造

同一个topic下有不同分区,每个分区下面会划分为多个(段)文件,只有一个当前文件在写,其他文件只读。
当写满一个文件(写满的意思是达到设定值)则切换文件,新建一个当前文件用来写,老确当前文件切换为只读。
文件的命名以起始偏移量来命名。
看一个例子,假设report_push这个topic下的0-0分区可能有以下这些文件:

• 00000000000000000000.index

• 00000000000000000000.log

• 00000000000000368769.index

• 00000000000000368769.log

• 00000000000000737337.index

• 00000000000000737337.log

• 00000000000001105814.index

• 00000000000001105814.log

………………..

个中 00000000000000000000.index表示最开始的文件,起始偏移量为0.第二个文件00000000000000368769.index的量起始偏移量为368769.同样,第三个文件00000000000000737337.index的起始偏移量为737337.

以起始偏移量命名并排序这些文件,那么当消费者要拉取某个起始偏移量位置的数据变的相称大略,只要根据传上来的offset二分查找文件列表,定位到详细文件,

然后将绝对offset减去文件的起始节点转化为相对offset,即可开始传输数据。
例如,同样以上面的例子为例,假设消费者想抓取从第368969位置开始的数据,则根据368969二分查找,

定位到00000000000000368769.log这个文件(368969在368769和737337之间),根据索引文件二分搜索可以确定读取数据最大大小。

2.8 kafka文件系统–实际效果

基本不会有磁盘读的大量操作,都在内存进行,只有定期磁盘批量写操作。

3.总结

高效文件系统特点

一个大文件分成多个小文件段。

多个小文件段,随意马虎定时打消或删除已经消费完文件,减少磁盘占用。

index全部映射到memory直接操作,避免segment file被交流到磁盘增加IO操作次数。

根据索引信息,可以确定发送response到consumer的最大大小。

索引文件元数据存储用的是相对前一个segment file的offset存储,节省空间大小。

31)kafka的ZookeeperConsumer实现

kafka的ZookeeperConsumer数据获取的步骤如下:

入口ZookeeperConsumerConnector def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])

: Map[String,List[KafkaStream[T]]] 方法

客户端启动后会在消费者注册目录上添加子节点变革的监听ZKRebalancerListener,ZKRebalancerListener实例会在内部创建一个线程,这个线程定时检讨监听的事宜有没有实行(消费者发生变革),如果没有变革则wait1秒钟,当发生了变革就调用 syncedRebalance 方法,去rebalance消费者。

ConsumerIterator的实现可能会造成数据的重复发送(这要看生产者如何生产数据),FetchedDataChunk是一个数据凑集,它内部会包含很多数据块,一个数据块可能包含多条,但同一个数据块中的只有一个offset,以是当一个块有多条数据,处理完部分数据发生非常时,消费者重新去取数据,就会再次取得这个数据块,然后消费过的数据就会被重新消费。

这篇文章转载自田加国:http://www.tianjiaguo.com/system-architecture/kafka/kafka的zookeeperconsumer实现/

标签:

相关文章