搭建Kakfa2.11为ELK服务

准备工作

试验机器:阿里云centos 7.5,IP地址是172.31.0.84

本文是单台kafka+zookeeper架构,如果土豪可以尝试3台zookeeper+3台kafka。

1
2
3
4
5
yum install java-1.8.0-openjdk* -y
wget http://apache.website-solution.net/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -zxvf kafka_2.11-2.1.0.tgz -C /opt
wget http://apache.website-solution.net/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz -C /usr/local

其实最新的kafka里面已经包括zookeeper了,不过我习惯了单独启动zookeeper,还是单独下载单独配置单独启动。

启动zookeeper

首先先去zookeeper下的conf文件夹里编写配置文件:

1
2
/usr/local/zookeeper-3.4.10/conf
cp zoo_sample.cfg zoo.cfg

然后编辑zoo.cfg,把最下面几行改成这样:

1
2
3
4
5
autopurge.snapRetainCount=3		#保留3个文件
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1 #保留一小时以内的日志
server.1=172.31.0.84:2888:3888 #本机IP地址

2888端口:表示的是这个服务器与集群中的Leader服务器交换信息的端口;
3888端口:表示的是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

然后回到/usr/local/zookeeper-3.4.10/bin里,执行./zkServer.sh start,执行完毕之后,再用./zkServer.sh status检查一下状态,由于是单台,所以状态应该是standalone

启动kafka

同zookeeper一样,先去kafka的conf文件夹/opt/kafka_2.11-2.1.0/config里,在配置文件zookeeper.properties最下面加上如下几句话:

1
2
3
4
tickTime=2000
initLimit=20
syncLimit=10
server.1=172.31.0.84:2888:3888 #zookeeper的地址,也就是本机地址

tickTime:这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳。

修改好了zookeeper.properties之后,才是正式的kafka配置文件server.properties

1
2
3
4
5
6
7
8
broker.id=1
port=9092 #broker监听的端口
host.name=172.31.0.84 #填服务器 IP
log.dir = / data / kafka - logs # 该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect = 172.31.0.84:2181 # 这个就是zookeeper的ip及端口
num.partitions = 16 # 需要配置较大 分片影响读写速度
log.dirs = /data/kafka-logs # 数据目录也要单独配置磁盘较大的地方
log.retention.hours = 168 # 时间按需求保留过期时间,避免磁盘满

确认zookeeper状态是启动之后,./bin/kafka-server-start.sh ./config/server.properties &来启动Kafka服务,然后检查一下端口9092是否正常打开(9092端口不会立即启动,需要等待一会时间)。

kafka简单操作语句

一些常用的操作语句(前提是你在kafka的bin文件夹内)如下:

1
2
3
4
5
6
7
8
9
10
11
12
#创建topic
./kafka-topics.sh --create --zookeeper zookeeperIP地址:2181 --replication-factor 1 --partitions 1 --topic chenshuotest #factor大小不能超过broker的个数
#查看topic
./kafka-topics.sh --list --zookeeper zookeeperIP地址:2181
#在topic里增加信息
./kafka-console-producer.sh --broker-list kafkaIP地址:9092 --topic chenshuotest
#消费掉topic里的信息,需要在另外一个xshell窗口界面操作
./kafka-console-consumer.sh --bootstrap-server kafkaIP地址9092 --topic chenshuotest --from-beginning
#查看group
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --list
#查看消费情况
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --describe --group logstash

kafka与mq类似,都有一个“入队”、“出队”、“堆积值”的展示,如下:
akb48

简单说一下上面这个图:

  1. PARTITION:分区编号,它与server.properties里的num.partitions是一致的,这个值不是越大越好,会增加不可用的风险,而且过多的分区意味要打开过多的句柄,增加点对点的延迟;
  2. CURRENT-OFFSET:表示消费者组最新消费的位移值;
  3. LOG-END-OFFSET:表示topic所有分区当前的日志终端位移值,即总生产值;
  4. LAG:表示有多少条message没有被消费;

如果想用zabbix监控LAG值,很简单:

1
./kafka-consumer-groups.sh --bootstrap-server kafkaIP地址:9092 --describe --group logstash | grep 目标topic |awk  '{print $5}'

参考文档

http://www.cnblogs.com/JetpropelledSnake/p/10057545.html (zookeeper+kafka集群的配置,请看这里)

akb48

感谢您请我喝咖啡~O(∩_∩)O,如果要联系请直接发我邮箱chenx1242@163.com,我会回复你的
-------------本文结束感谢您的阅读-------------