Kafka Cluster Install Guide (Single-Machine Edition)
1. Architecture
# ZooKeeper and Kafka architecture
producer    producer    producer
    |           |           |
    -------------------------
                |
zookeeper   zookeeper   zookeeper   (ZooKeeper supports failover; at least three nodes are required)
                |
    -------------------------
    |           |           |
  kafka       kafka       kafka   (brokers)
# Kafka data flow
         push          pull
producer -----> broker -----> consumer
# Typical log pipeline
flume --> kafka --> logstash --> elasticsearch --> kibana
2. Installation
http://kafka.apache.org/downloads.html
# wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
yum -y install java-1.8.0-openjdk.x86_64
tar xzf kafka_2.12-2.2.0.tgz && mv kafka_2.12-2.2.0 /usr/local/kafka && cd /usr/local/kafka
3. Configure the ZooKeeper ensemble
# An odd number of ZooKeeper nodes is recommended
# http://cailin.iteye.com/blog/2014486/
# http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
# What the ensemble does: it stores the brokers' metadata redundantly
# ZooKeeper supports failover; at least three (2n+1) nodes are required
# server.<id>=IP-or-hostname:<quorum port, follower-to-leader traffic>:<leader-election port>; the client port (2181) is a third, separate port — do not mix the three up
# Make sure none of the ports below collide
######## zookeeper1.properties ########
f=config/zookeeper1.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s1
clientPort=2181
maxClientCnxns=20
# ZooKeeper ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF
######## zookeeper2.properties ########
f=config/zookeeper2.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s2
clientPort=2182
maxClientCnxns=20
# ZooKeeper ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF
######## zookeeper3.properties ########
f=config/zookeeper3.properties
[ -f $f ] || cat > $f << EOF
dataDir=/data/zookeeper-s3
clientPort=2183
maxClientCnxns=20
# ZooKeeper ensemble settings
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF
# In each ZooKeeper's data directory, declare its id in a myid file (the x in server.x above is that id)
mkdir /data/zookeeper-s1/ && echo "1" > /data/zookeeper-s1/myid
mkdir /data/zookeeper-s2/ && echo "2" > /data/zookeeper-s2/myid
mkdir /data/zookeeper-s3/ && echo "3" > /data/zookeeper-s3/myid
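The three property files plus the myid files above follow a fixed pattern, so they can also be generated in one loop. A sketch — ZK_BASE and ZK_CONF are made-up knobs so the loop can be dry-run in a scratch directory; on the real host you would set ZK_BASE=/data and ZK_CONF=/usr/local/kafka/config:

```shell
# Generate zookeeper{1,2,3}.properties and the matching myid files.
# ZK_BASE / ZK_CONF are hypothetical overrides; the defaults allow a safe dry run.
base=${ZK_BASE:-$(mktemp -d)}
conf=${ZK_CONF:-$base}
mkdir -p "$base" "$conf"
for i in 1 2 3; do
  mkdir -p "$base/zookeeper-s$i"
  echo "$i" > "$base/zookeeper-s$i/myid"   # id matches server.$i in the config
  cat > "$conf/zookeeper$i.properties" << EOF
dataDir=$base/zookeeper-s$i
clientPort=$((2180 + i))
maxClientCnxns=20
initLimit=5
syncLimit=2
server.1=10.0.26.26:3181:3881
server.2=10.0.26.26:3182:3882
server.3=10.0.26.26:3183:3883
EOF
done
```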
# rm -fr /data/kafka-logs-s* /data/zookeeper-s*
# How do you tell which ZooKeeper is the leader? Watch for the log keywords shown in section 5 below
4. Configure the brokers
# (tune the replication strategy for your own needs)
# advertised.listeners=PLAINTEXT://your.host.name:9092 — the host:port handed back to producers and consumers for their subsequent connections
######## server1.properties ########
f=config/server1.properties
[ -f $f ] || cat > $f << EOF
broker.id=101
# delete.topic.enable=true # enable topic deletion while debugging
listeners=PLAINTEXT://:9091
advertised.listeners=PLAINTEXT://10.0.26.26:9091
log.dirs=/data/kafka-logs-s1
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168 (messages are kept for 7 days by default)
# 15GiB=16106127360; 250GiB=268435456000; 300GiB=322122547200
# data beyond this size is deleted automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF
######## server2.properties ########
f=config/server2.properties
[ -f $f ] || cat > $f << EOF
broker.id=102
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://10.0.26.26:9092
log.dirs=/data/kafka-logs-s2
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168 (messages are kept for 7 days by default)
# 15GiB=16106127360; 250GiB=268435456000; 300GiB=322122547200
# data beyond this size is deleted automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF
######## server3.properties ########
f=config/server3.properties
[ -f $f ] || cat > $f << EOF
broker.id=103
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://10.0.26.26:9093
log.dirs=/data/kafka-logs-s3
zookeeper.connect=10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# log.retention.hours=168 (messages are kept for 7 days by default)
# 15GiB=16106127360; 250GiB=268435456000; 300GiB=322122547200
# data beyond this size is deleted automatically (the limit applies per partition)
log.retention.bytes=16106127360
EOF
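The log.retention.bytes values above are GiB counts multiplied out by hand, which is easy to mistype; a small helper computes them instead:

```shell
# Convert a size in GiB to the byte count expected by log.retention.bytes.
gib_to_bytes() { echo $(( $1 * 1024 * 1024 * 1024 )); }

gib_to_bytes 15    # 16106127360 (the value used in the configs above)
gib_to_bytes 250   # 268435456000
gib_to_bytes 300   # 322122547200
```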
5. Startup
# Start ZooKeeper first (Kafka uses ZooKeeper, so you need to first start a ZooKeeper server)
# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# bin/kafka-server-start.sh -daemon config/server.properties
# Drop the -daemon flag to log to the foreground while debugging
# Start the ZooKeeper ensemble
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper1.properties
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper2.properties
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper3.properties
# zookeeper-server-stop.sh stops every ZooKeeper at once; to stop just one, kill its process by hand
# /usr/local/kafka/bin/zookeeper-server-stop.sh
# Follow the log
tail -f logs/zookeeper.out
# A node that logs follower information like this is the leader
INFO leader and follower are in sync, zxid=0x0
INFO Received NEWLEADER-ACK message from 1
INFO Received NEWLEADER-ACK message from 2
INFO Follower sid: 2
INFO Follower sid: 1
# Keyword seen on a non-leader (follower) node
INFO Getting a diff from the leader 0x0
# Brokers: tune the JVM heap
vim /usr/local/kafka/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
# declare it just before the exec line:
# exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
# Start the brokers
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server1.properties
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server2.properties
/usr/local/kafka/bin/kafka-server-start.sh -daemon config/server3.properties
# kafka-server-stop.sh stops every broker at once; to stop just one, kill its process by hand
# ./bin/kafka-server-stop.sh
# Follow the log
tail -f logs/server.log
#
INFO New leader is 101 # from broker 101's log: this broker is the leader (controller)
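As noted above, kafka-server-stop.sh takes down every broker on the host. A sketch for stopping just one broker by matching its properties file on the java command line (this relies on kafka-server-start.sh invoking kafka.Kafka with the config path as its argument):

```shell
# Stop a single broker, identified by the config file it was started with.
stop_broker() {  # usage: stop_broker server2.properties
  pids=$(pgrep -f "kafka\.Kafka .*$1")
  if [ -n "$pids" ]; then
    kill $pids
  else
    echo "no broker running with $1"
  fi
}
```

e.g. `stop_broker server2.properties` leaves the server1 and server3 brokers untouched.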
6. Typical allocation
# Single machine (to save cost) with multiple partitions; choose the partition count from the project's needs
# The stats system below runs on a single physical server and only needs 3 partitions for concurrency
# Create a topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --replication-factor 1 --partitions 3 --topic zaza
# List topics
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# Details
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic zaza
7. Testing
# Load balancing and fault tolerance
# There is no ZooKeeper-mediated load balancing between producers and brokers.
# Brokers and consumers coordinate through ZooKeeper: both register there, and ZooKeeper keeps their metadata. When a broker or consumer changes, all the others are notified.
## Producers have no ZooKeeper-backed view of the cluster, so list several brokers on --broker-list for bootstrap redundancy
# bin/kafka-console-producer.sh --broker-list 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic my-replicated-topic
# Create topics (nginx-access/mysql-slow-log/dldl-ios):
# replication-factor is the number of copies and cannot exceed the number of brokers; two or three replicas are typical in production
# partitions is the partition count; each message lands in exactly one partition (which spreads load), and partitions may live on different brokers, e.g. /data/kafka-logs/test_topic-0 and /data/kafka-logs/test_topic-1
# On a single machine, one replica is enough
bin/kafka-topics.sh --create --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --replication-factor 1 --partitions 3 --topic test_topic
# bin/kafka-topics.sh --delete --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test_topic
# List topics:
bin/kafka-topics.sh --list --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
# Describe every topic
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183
########## sample output (from a four-broker cluster with replication-factor 3, not the three-broker setup above) ##########
Topic:test_topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_topic Partition: 0 Leader: 103 Replicas: 103,104,101 Isr: 103,104,101
Topic: test_topic Partition: 1 Leader: 104 Replicas: 104,101,102 Isr: 104,101,102
Topic: test_topic Partition: 2 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
# List
# bin/kafka-topics.sh --list --zookeeper localhost:2181
# Delete
# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic project_2
# Describe a specific topic
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic
########## sample output (four-broker cluster, replication-factor 3) ##########
Topic:test_topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_topic Partition: 0 Leader: 103 Replicas: 103,104,101 Isr: 103,104,101
Topic: test_topic Partition: 1 Leader: 104 Replicas: 104,101,102 Isr: 104,101,102
Topic: test_topic Partition: 2 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
# Send messages (the producer decides which partition each message goes to):
# Eventual consistency: whichever server a client connects to, it sees the same view of the data — ZooKeeper's most important property.
bin/kafka-console-producer.sh --broker-list 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic test_topic
23423
^C
# Receive messages:
bin/kafka-console-consumer.sh --bootstrap-server 10.0.26.26:9091,10.0.26.26:9092,10.0.26.26:9093 --topic test_topic --from-beginning
# Count messages in a topic, e.g. with GetOffsetShell:
# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.26.26:9091 --topic test_topic --time -1
# Wipe all data
# rm -fr /tmp/hsperfdata_root/ && rm -fr /data/kafka-logs-s* /data/zookeeper-s* && mkdir /data/zookeeper-s1/ && echo "1" > /data/zookeeper-s1/myid && mkdir /data/zookeeper-s2/ && echo "2" > /data/zookeeper-s2/myid
#
# rm -fr /tmp/hsperfdata_root/ && rm -fr /data/kafka-logs-s* /data/zookeeper-s* && mkdir /data/zookeeper-s3/ && echo "3" > /data/zookeeper-s3/myid && mkdir /data/zookeeper-s4/ && echo "4" > /data/zookeeper-s4/myid
#
# # After a java process is killed, its perf data stays behind under
# /tmp/hsperfdata_root/<pid>
8. Using ZooKeeper
# Connect to a server
./bin/zookeeper-shell.sh 127.0.0.1:2181
# Command help
?
# List the root znodes
ls /
# Before the brokers start (each entry is a znode)
ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]
# After the brokers start (znodes can be ephemeral, like the newly appeared controller)
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]
# Beyond tracking the state of the machines in the cluster, ZooKeeper can also elect a "manager" to run it — that is its other feature, Leader Election.
9. Handling a full disk (by reducing the replica count)
1. Reduce the replica count
a. Inspect the current assignment
bin/kafka-topics.sh --describe --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_topic Partition: 0 Leader: 103 Replicas: 103,101,102 Isr: 102,103,101
Topic: test_topic Partition: 1 Leader: 101 Replicas: 101,102,103 Isr: 102,103,101
Topic: test_topic Partition: 2 Leader: 102 Replicas: 102,103,101 Isr: 102,101,103
b. Prepare the config file
reassign.json — for each partition keep only the current leader as its sole replica (set replicas to the Leader value from the describe output)
{"version":1, "partitions":[
{"topic":"test_topic", "partition":0, "replicas":[103]},
{"topic":"test_topic", "partition":1, "replicas":[101]},
{"topic":"test_topic", "partition":2, "replicas":[102]}
]}
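The file above can be written and sanity-checked before running the reassignment; a sketch, assuming python3 is available for the JSON parse check (the leader ids 103/101/102 come from the describe output above):

```shell
# Write reassign.json (one replica per partition: its current leader)
# and verify it parses before feeding it to kafka-reassign-partitions.sh.
cat > reassign.json << 'EOF'
{"version":1, "partitions":[
  {"topic":"test_topic", "partition":0, "replicas":[103]},
  {"topic":"test_topic", "partition":1, "replicas":[101]},
  {"topic":"test_topic", "partition":2, "replicas":[102]}
]}
EOF
python3 -m json.tool reassign.json > /dev/null && echo "reassign.json parses"
```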
c. Execute
bin/kafka-reassign-partitions.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --reassignment-json-file reassign.json --execute
# This prints the current assignment as JSON — keep it in case you need to roll back
{"version":1,"partitions":[{"topic":"test_topic","partition":2,"replicas":[102,103,101],"log_dirs":["any","any","any"]},{"topic":"test_topic","partition":1,"replicas":[101,102,103],"log_dirs":["any","any","any"]},{"topic":"test_topic","partition":0,"replicas":[103,101,102],"log_dirs":["any","any","any"]}]}
d. Verify
bin/kafka-reassign-partitions.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --reassignment-json-file reassign.json --verify
# Expected result
Status of partition reassignment:
Reassignment of partition test_topic-0 completed successfully
Reassignment of partition test_topic-1 completed successfully
Reassignment of partition test_topic-2 completed successfully
2. Tune the retention policy
# log.retention.hours=168 (messages are kept for 7 days by default)
# 15GiB=16106127360; 250GiB=268435456000; 300GiB=322122547200
# data beyond this per-partition size is deleted automatically; as a broker-level setting in server.properties it takes effect after a restart, while the per-topic override below applies live
log.retention.bytes=16106127360
# per-topic override (note the topic-level key is retention.bytes, not log.retention.bytes):
# bin/kafka-topics.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --alter --topic test_topic --config retention.bytes=268435456000
# or, since altering configs via kafka-topics.sh is deprecated:
# bin/kafka-configs.sh --zookeeper 10.0.26.26:2181,10.0.26.26:2182,10.0.26.26:2183 --alter --entity-type topics --entity-name test_topic --add-config retention.bytes=268435456000
10. Miscellaneous
# GUI client
http://www.kafkatool.com/download.html
# Inspect Kafka's znodes
zookeeper-shell.sh
# The Kafka consumer group id separates different kinds of consuming clients
# In fact, one of Kafka's design goals is to serve offline and real-time processing at the same time: a stream processor like Storm can consume messages in real time, a batch system like Hadoop can process them offline, and the same data can be replicated to another data center — all at once, as long as the three consumers use different consumer groups. LinkedIn runs a simplified deployment along these lines.
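A toy sketch (plain shell, nothing Kafka-specific) of the group semantics: within one group each partition is owned by exactly one consumer, while a second group independently re-reads every partition. The group and consumer names are made up.

```shell
# Simulate round-robin assignment of partitions to the consumers of one group.
assign() {  # usage: assign <num_partitions> <consumer names...>
  n=$1; shift
  i=0
  while [ $i -lt $n ]; do
    c=$(( i % $# + 1 ))          # pick the (i mod #consumers)+1 -th consumer
    eval "owner=\${$c}"
    echo "partition $i -> $owner"
    i=$((i + 1))
  done
}
assign 3 stats-app-1 stats-app-2   # two consumers in group "stats-app" split the partitions
assign 3 backup-app-1              # a second group consumes every partition itself
```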
# Security
http://www.36dsj.com/archives/29526
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
http://docs.confluent.io/2.0.0/kafka/security.html
# Excellent, very detailed articles
http://www.jianshu.com/p/95f86466a2d4
http://www.jasongj.com/2015/03/10/KafkaColumn1/
# References
ZooKeeper introduction
http://cailin.iteye.com/blog/2014486/
# Good, in-depth
http://orchome.com/kafka/index
http://czj4451.iteye.com/blog/2041096
http://blog.csdn.net/strawbingo/article/details/45484139
http://www.cnblogs.com/itfly8/p/5155983.html
# Real-time practice in games
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
http://www.it165.net/admin/html/201604/7394.html
customer — a buyer (usually from the merchant's point of view)
consumer — the term used by officials, economists, and statisticians (and by Kafka)
# ZooKeeper introductions
http://blog.csdn.net/damacheng/article/details/42393837
http://www.cnblogs.com/davidwang456/p/4238536.html
# Broker examples
http://www.tuicool.com/articles/RNbU32B
# ELK + Kafka enterprise log collection platform
http://blog.sctux.com/?p=445
http://blog.sctux.com/?p=451
http://jasonwilder.com/blog/2013/07/16/centralized-logging-architecture/
Troubleshooting
http://www.cnblogs.com/Jack47/p/issues-when-setup-kafka-backed-by-kubernetes-on-aws.html
# Common errors
Error:
ERROR Invalid config, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
Cause: the matching id was not declared, e.g.: echo "2" > /data/zookeeper-s2/myid
#
Kafka design analysis series
http://www.infoq.com/cn/articles/kafka-analysis-part-1/
http://www.infoq.com/cn/articles/kafka-analysis-part-2/
http://www.infoq.com/cn/articles/kafka-analysis-part-3/
http://www.infoq.com/cn/articles/kafka-analysis-part-4/
# By the same author
http://www.jasongj.com/
http://www.jasongj.com/2015/03/10/KafkaColumn1/
- Original author: zaza
- Original link: https://zazayaya.github.io/2021/03/11/kafka-one-instance-cluster-install.html
- Note: please credit the source when republishing; some material comes from the web — contact me promptly about any infringement!