Kafka Cluster Installation Guide

一、Architecture

# ZooKeeper and Kafka architecture
producer    producer    producer
    |           |           |
    +-----------+-----------+
                |
zookeeper   zookeeper   zookeeper   (ZooKeeper supports failover; at least three nodes are required)
                |
    +-----------+-----------+
    |           |           |
  kafka       kafka       kafka   (brokers)

# Kafka pipeline
            push            pull
producer ---------> broker ---------> consumer
flume               kafka             logstash --> elasticsearch --> kibana

二、Installation

http://kafka.apache.org/downloads.html
# wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
yum -y install java-1.8.0-openjdk.x86_64
tar xzf kafka_2.12-2.2.0.tgz && mv kafka_2.12-2.2.0 /usr/local/kafka && cd /usr/local/kafka
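# Sanity check (the CLI tools gained a --version flag in Kafka 2.0, so this should work on 2.2.0):
java -version
bin/kafka-topics.sh --version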

三、ZooKeeper Ensemble Configuration

# An odd number of ZooKeeper nodes is recommended
# http://cailin.iteye.com/blog/2014486/
# http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
# What the ensemble does: it stores broker metadata redundantly across nodes
# ZooKeeper supports failover; at least three (2n+1) nodes are required
# server.<id>=IP or hostname:quorum port (follower<->leader sync):leader-election port; the client port (2181) is a third, separate port. Don't confuse the three.
# Make sure none of the ports below conflict with each other

######## zookeeper1.properties ########
f=config/zookeeper1.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s1
clientPort=2181
maxClientCnxns=20
# Ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF

######## zookeeper2.properties ########
f=config/zookeeper2.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s2
clientPort=2182
maxClientCnxns=20
# Ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF

######## zookeeper3.properties ########
f=config/zookeeper3.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s3
clientPort=2183
maxClientCnxns=20
# Ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF

# Each instance declares its own id in its dataDir (the x in server.x above is that id)
mkdir -p /data/zookeeper-s1 && echo "1" > /data/zookeeper-s1/myid
mkdir -p /data/zookeeper-s2 && echo "2" > /data/zookeeper-s2/myid
mkdir -p /data/zookeeper-s3 && echo "3" > /data/zookeeper-s3/myid
# rm -fr /data/kafka-logs-s* /data/zookeeper-s*
# How do you tell which ZooKeeper node is the leader? See below, plus the log keywords in section 五.
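# Once the ensemble is running (section 五), query each node with the four-letter "srvr"
# command (enabled by default on the ZooKeeper 3.4.x bundled here; 3.5+ requires
# whitelisting it; assumes nc is installed):
echo srvr | nc 127.0.0.1 2181 | grep Mode   # prints "Mode: leader" or "Mode: follower"
echo srvr | nc 127.0.0.1 2182 | grep Mode
echo srvr | nc 127.0.0.1 2183 | grep Mode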

四、Broker Configuration

# (Tune the replication strategy to your own needs)
# advertised.listeners=PLAINTEXT://your.host.name:9092 — the host:port handed back to producers and consumers

######## server1.properties ########
f=config/server1.properties
[ -f $f ] || cat > $f << EOF
broker.id=101
# delete.topic.enable=true  # enable topic deletion while debugging
listeners=PLAINTEXT://:9091
advertised.listeners=PLAINTEXT://10.0.26.26:9091
log.dirs=/data/kafka-logs-s1
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168  # messages are kept for 7 days by default
# 15G:16106127360; 250G:268435456000; 300G:322122547200
# a partition log exceeding this size is cleaned up automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF

######## server2.properties ########
f=config/server2.properties
[ -f $f ] || cat > $f << EOF
broker.id=102
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://10.0.26.26:9092
log.dirs=/data/kafka-logs-s2
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168  # messages are kept for 7 days by default
# 15G:16106127360; 250G:268435456000; 300G:322122547200
# a partition log exceeding this size is cleaned up automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF

######## server3.properties ########
f=config/server3.properties
[ -f $f ] || cat > $f << EOF
broker.id=103
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://10.0.26.26:9093
log.dirs=/data/kafka-logs-s3
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168  # messages are kept for 7 days by default
# 15G:16106127360; 250G:268435456000; 300G:322122547200
# a partition log exceeding this size is cleaned up automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF

五、Startup

# Start ZooKeeper first (Kafka uses ZooKeeper so you need to first start a ZooKeeper server)
# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# bin/kafka-server-start.sh -daemon config/server.properties

# Omit -daemon to log to the foreground for debugging (-daemon runs in the background)
# Start ZooKeeper
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper1.properties
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper2.properties
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper3.properties

# The stop script stops every local ZooKeeper instance; stopping a single one must be done by hand, e.g.:
# /usr/local/kafka/bin/zookeeper-server-stop.sh
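# A sketch for stopping just one instance: kill the JVM that was started with a given
# properties file (assumes one JVM per config file, as set up above):
ps ax | grep 'zookeeper1\.properties' | grep java | grep -v grep | awk '{print $1}' | xargs kill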

# Watch the log
tail -f logs/zookeeper.out
# When a node logs Follower information like this, it is the leader:
INFO leader and follower are in sync, zxid=0x0
INFO Received NEWLEADER-ACK message from 1
INFO Received NEWLEADER-ACK message from 2
INFO Follower sid: 2
INFO Follower sid: 1
# Keywords seen on non-leader nodes:
INFO Getting a diff from the leader 0x0


# Brokers (tune the JVM heap)
vim /usr/local/kafka/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
# Declare it before the exec line:
# exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
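# After the brokers below are started, a quick check that the heap flags took effect (a sketch):
ps ax | grep kafka.Kafka | grep -o 'Xmx[0-9]*[MG]'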

# Start Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server1.properties
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server2.properties
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server3.properties

# The stop script stops all local brokers; stopping a single one must be done by hand (same approach as the ZooKeeper sketch above)
# ./bin/kafka-server-stop.sh

# Watch the log
tail -f logs/server.log

# Logged on broker 101; indicates this broker is the leader:
INFO New leader is 101
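# Confirm all three brokers registered in ZooKeeper (run inside bin/zookeeper-shell.sh 10.0.26.26:2181, see section 八):
# ls /brokers/ids
# [101, 102, 103]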

六、Common Allocations

# Single machine (saves cost), multiple partitions; decide the partition count based on project requirements
# The example below is a statistics system on a single physical server; 3 partitions are enough for concurrency

# Create a topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --replication-factor 1 --partitions 3 --topic zaza
# List topics
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# Detailed info
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic zaza

七、Testing

# Load balancing and fault tolerance
# There is no load-balancing mechanism between producers and brokers.
# Brokers and consumers use ZooKeeper for load balancing: all brokers and consumers register in ZooKeeper, which stores some of their metadata. When a broker or consumer changes, all the others are notified.

## Producers have no notion of the cluster; you must list multiple brokers explicitly
# bin/kafka-console-producer.sh --broker-list 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic my-replicated-topic

# Create topics (nginx-access/mysql-slow-log/dldl-ios):
# replication-factor is the number of replicas; it cannot exceed the number of brokers (kafka). Two replicas are commonly suggested(?)
# partitions is the partition count; messages are not duplicated across partitions (improves throughput). Partitions may live on different brokers, e.g. /data/kafka-logs/test_topic-0 /data/kafka-logs/test_topic-1
# On a single machine, one replica is enough
bin/kafka-topics.sh --create --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --replication-factor 1 --partitions 3 --topic test_topic
# bin/kafka-topics.sh --delete --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test_topic
# List topics:
bin/kafka-topics.sh --list --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# Describe all topics
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
###############################################################
Topic:test_topic      PartitionCount:3        ReplicationFactor:3     Configs:
Topic: test_topic     Partition: 0    Leader: 103     Replicas: 103,104,101   Isr: 103,104,101
Topic: test_topic     Partition: 1    Leader: 104     Replicas: 104,101,102   Isr: 104,101,102
Topic: test_topic     Partition: 2    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103

# List topics
# bin/kafka-topics.sh --list --zookeeper localhost:2181
# Delete a topic
# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic project_2

# Describe a specific topic
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic
###############################################################
Topic:test_topic      PartitionCount:3        ReplicationFactor:3     Configs:
Topic: test_topic     Partition: 0    Leader: 103     Replicas: 103,104,101   Isr: 103,104,101
Topic: test_topic     Partition: 1    Leader: 104     Replicas: 104,101,102   Isr: 104,101,102
Topic: test_topic     Partition: 2    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103

# Send messages (the producer decides how data is routed):
# Eventual consistency: whichever server a client connects to, it sees the same view; this is ZooKeeper's most important guarantee.
bin/kafka-console-producer.sh --broker-list 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic test_topic
23423
^C
# Receive messages (read the data back):
bin/kafka-console-consumer.sh --bootstrap-server 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic test_topic --from-beginning

# Count messages in a topic
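# One way: sum the latest offsets across partitions with GetOffsetShell (--time -1 = latest):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic test_topic --time -1
# Output is topic:partition:offset, one line per partition; the sum is the total message
# count (assuming retention has not deleted anything yet).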

# Wipe all data
# rm -fr /tmp/hsperfdata_root/ && rm -fr /data/kafka-logs-s* /data/zookeeper-s* && mkdir /data/zookeeper-s1/ && echo "1" > /data/zookeeper-s1/myid && mkdir /data/zookeeper-s2/ && echo "2" > /data/zookeeper-s2/myid
# 
# rm -fr /tmp/hsperfdata_root/ && rm -fr /data/kafka-logs-s* /data/zookeeper-s* && mkdir /data/zookeeper-s3/ && echo "3" > /data/zookeeper-s3/myid && mkdir /data/zookeeper-s4/ && echo "4" > /data/zookeeper-s4/myid
# 
# # After killing a Java process, its per-process data remains
# /tmp/hsperfdata_root/<PID>

八、ZooKeeper Usage

# Connect to a server
./bin/zookeeper-shell.sh 127.0.0.1:2181
# Command help
?
# List the root znodes
ls /

# Before Kafka starts (every child path is called a znode)
ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]
# After Kafka starts (znodes can be ephemeral, like the newly added controller)
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]
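# The controller znode is ephemeral and names the current controller broker (JSON abbreviated here):
get /controller
{"version":1,"brokerid":101,"timestamp":"..."}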

# ZooKeeper not only tracks the service state of the machines in the cluster, it can also elect a "chief" to manage the cluster; this is ZooKeeper's other role, Leader Election.

九、Handling a Full Disk (reduce the replica count)

1、Adjust the replica count

a、Inspect the cluster

bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic

Topic:test_topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test_topic	Partition: 0	Leader: 103	Replicas: 103,101,102	Isr: 102,103,101
	Topic: test_topic	Partition: 1	Leader: 101	Replicas: 101,102,103	Isr: 102,103,101
	Topic: test_topic	Partition: 2	Leader: 102	Replicas: 102,103,101	Isr: 102,101,103

b、Prepare the config file

reassign.json: from the cluster info above, keep one replica per partition, equal to that partition's current Leader

{"version":1, "partitions":[
{"topic":"test_topic", "partition":0, "replicas":[103]},
{"topic":"test_topic", "partition":1, "replicas":[101]},
{"topic":"test_topic", "partition":2, "replicas":[102]}
]}

c、Execute

bin/kafka-reassign-partitions.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --reassignment-json-file reassign.json --execute

# This prints the current assignment; save it for rollback
{"version":1,"partitions":[{"topic":"test_topic","partition":2,"replicas":[102,103,101],"log_dirs":["any","any","any"]},{"topic":"test_topic","partition":1,"replicas":[101,102,103],"log_dirs":["any","any","any"]},{"topic":"test_topic","partition":0,"replicas":[103,101,102],"log_dirs":["any","any","any"]}]}

d、Verify

bin/kafka-reassign-partitions.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --reassignment-json-file reassign.json --verify

# Verification output
Status of partition reassignment: 
Reassignment of partition test_topic-0 completed successfully
Reassignment of partition test_topic-1 completed successfully
Reassignment of partition test_topic-2 completed successfully
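# Re-run the describe from step a to confirm; each partition should now list a single replica:
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic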

2、Adjust the data retention policy

# log.retention.hours=168  # messages are kept for 7 days by default
# 15G:16106127360; 250G:268435456000; 300G:322122547200
# cleanup triggers once a partition log exceeds this size; the limit applies per partition (changing this broker-level setting requires a restart?)
log.retention.bytes=16106127360

# Per-topic override (note the topic-level key is retention.bytes, not log.retention.bytes):
# bin/kafka-configs.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --alter --entity-type topics --entity-name test_topic --add-config retention.bytes=268435456000
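# To check the override afterwards (a sketch using the same tool):
# bin/kafka-configs.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --describe --entity-type topics --entity-name test_topic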

十、Miscellaneous

# GUI client
http://www.kafkatool.com/download.html

# Inspect Kafka node info in ZooKeeper
zookeeper-shell.sh

Kafka Consumer Group ID — lets different kinds of clients receive the data independently.
One of Kafka's design goals is to serve offline and real-time processing at the same time: a stream processor like Storm can consume messages in real time, a batch system like Hadoop can process them offline, and the data can simultaneously be replicated to another data center; it only takes putting the Consumers for these three jobs in different Consumer Groups. LinkedIn runs a simplified deployment model along these lines.
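# A sketch: consumers in different groups each get the full stream independently, while
# consumers sharing a group.id split the partitions between them (the group names here are made up):
bin/kafka-console-consumer.sh --bootstrap-server 10.0.26.26:9091 --topic test_topic --group realtime
bin/kafka-console-consumer.sh --bootstrap-server 10.0.26.26:9091 --topic test_topic --group offline-batch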

# Security
http://www.36dsj.com/archives/29526
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
http://docs.confluent.io/2.0.0/kafka/security.html

# Excellent, very detailed articles
http://www.jianshu.com/p/95f86466a2d4
http://www.jasongj.com/2015/03/10/KafkaColumn1/

# References
ZooKeeper introduction
http://cailin.iteye.com/blog/2014486/
# Good, in-depth
http://orchome.com/kafka/index
http://czj4451.iteye.com/blog/2041096
http://blog.csdn.net/strawbingo/article/details/45484139
http://www.cnblogs.com/itfly8/p/5155983.html
# Real-time gaming case study
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
http://www.it165.net/admin/html/201604/7394.html
# Building a secure Kafka cluster
http://www.36dsj.com/archives/29526

customer: a buyer, usually from the merchant's point of view
consumer: the term used by officials, economists, and statisticians
# ZooKeeper introduction
http://blog.csdn.net/damacheng/article/details/42393837
http://www.cnblogs.com/davidwang456/p/4238536.html
# Broker examples
http://www.tuicool.com/articles/RNbU32B
# ELK + Kafka enterprise log collection platform
http://blog.sctux.com/?p=445
http://blog.sctux.com/?p=451
http://jasonwilder.com/blog/2013/07/16/centralized-logging-architecture/

Troubleshooting guide
http://www.cnblogs.com/Jack47/p/issues-when-setup-kafka-backed-by-kubernetes-on-aws.html

# Common errors
Error:
ERROR Invalid config, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
Cause: the corresponding myid was not set: echo "2" > /data/zookeeper-s2/myid

Kafka design analysis
http://www.infoq.com/cn/articles/kafka-analysis-part-1/
http://www.infoq.com/cn/articles/kafka-analysis-part-2/
http://www.infoq.com/cn/articles/kafka-analysis-part-3/
http://www.infoq.com/cn/articles/kafka-analysis-part-4/
# Author
http://www.jasongj.com/
http://www.jasongj.com/2015/03/10/KafkaColumn1/