准备工作
试验机器:阿里云centos 7.5
,IP地址是172.31.0.84
。
本文是单台kafka+zookeeper
架构,如果土豪可以尝试3台zookeeper+3台kafka。
1
2
3
4
5yum install java-1.8.0-openjdk* -y
wget http://apache.website-solution.net/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -zxvf kafka_2.11-2.1.0.tgz -C /opt
wget http://apache.website-solution.net/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz -C /usr/local
其实最新的kafka里面已经包括zookeeper了,不过我习惯了单独启动zookeeper,还是单独下载单独配置单独启动。
启动zookeeper
首先先去zookeeper下的conf文件夹里编写配置文件:
1
2/usr/local/zookeeper-3.4.10/conf
cp zoo_sample.cfg zoo.cfg
然后编辑zoo.cfg
,把最下面几行改成这样:
1
2
3
4
5autopurge.snapRetainCount=3 #保留3个文件
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1 #保留一小时以内的日志
server.1=172.31.0.84:2888:3888 #本机IP地址
2888端口:表示的是这个服务器与集群中的Leader服务器交换信息的端口;
3888端口:表示的是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
然后回到/usr/local/zookeeper-3.4.10/bin
里,执行./zkServer.sh start
,执行完毕之后,再用./zkServer.sh status
检查一下状态,由于是单台,所以状态应该是standalone
。
启动kafka
同zookeeper一样,先去kafka的conf文件夹/opt/kafka_2.11-2.1.0/config
里,在配置文件zookeeper.properties
最下面加上如下几句话:
1
2
3
4tickTime=2000
initLimit=20
syncLimit=10
server.1=172.31.0.84:2888:3888 #zookeeper的地址,也就是本机地址
tickTime:这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳。
修改好了zookeeper.properties
之后,才是正式的kafka配置文件server.properties
:
1
2
3
4
5
6
7
8broker.id=1
port=9092 #broker监听的端口
host.name=172.31.0.84 #填服务器 IP
log.dir = / data / kafka - logs # 该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect = 172.31.0.84:2181 # 这个就是zookeeper的ip及端口
num.partitions = 16 # 需要配置较大 分片影响读写速度
log.dirs = /data/kafka-logs # 数据目录也要单独配置磁盘较大的地方
log.retention.hours = 168 # 时间按需求保留过期时间,避免磁盘满
确认zookeeper状态是启动之后,./bin/kafka-server-start.sh ./config/server.properties &
来启动Kafka服务,然后检查一下端口9092是否正常打开(9092端口不会立即启动,需要等待一会时间)。
kafka简单操作语句
一些常用的操作语句(前提是你在kafka的bin文件夹内)如下:
1
2
3
4
5
6
7
8
9
10
11
12#创建topic
./kafka-topics.sh --create --zookeeper zookeeperIP地址:2181 --replication-factor 1 --partitions 1 --topic chenshuotest #factor大小不能超过broker的个数
#查看topic
./kafka-topics.sh --list --zookeeper zookeeperIP地址:2181
#在topic里增加信息
./kafka-console-producer.sh --broker-list kafkaIP地址:9092 --topic chenshuotest
#消费掉topic里的信息,需要在另外一个xshell窗口界面操作
./kafka-console-consumer.sh --bootstrap-server kafkaIP地址9092 --topic chenshuotest --from-beginning
#查看group
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --list
#查看消费情况
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --describe --group logstash
kafka与mq类似,都有一个“入队”、“出队”、“堆积值”的展示,如下:
简单说一下上面这个图:
- PARTITION:分区编号,它与
server.properties
里的num.partitions
是一致的,这个值不是越大越好,会增加不可用的风险,而且过多的分区意味要打开过多的句柄,增加点对点的延迟; - CURRENT-OFFSET:表示消费者组最新消费的位移值;
- LOG-END-OFFSET:表示topic所有分区当前的日志终端位移值,即总生产值;
- LAG:表示有多少条message没有被消费;
如果想用zabbix监控LAG值,很简单:
1
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --describe --group logstash | grep 目标topic |awk '{print $5}'
参考文档
http://www.cnblogs.com/JetpropelledSnake/p/10057545.html (zookeeper+kafka集群的配置,请看这里)