ES对应Kafka的topic生成不同的Index

假设有两个模块分别叫shopauth和shoporder,日志文件也对应分别是shopauth.logshoporder.log,现在需要把日志对应放到同个kafka里不同的topic里。

shopauth的filebeat.yml内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
filebeat.prospectors:
- type: log
enabled: true
paths:
- /mnt/shopauth/logs/shopauth.log #日志路径
tags: ["logmessages"]
encoding: utf-8
scan_frequency: 10s
harvester_buffer_size: 15000
tail_files: true
fields:
alilogtype: app_log
serverip: 172.16.0.201 #本地IP地址
log_topics: ecnode-log

output.kafka:
enabled: true
hosts: ["172.31.0.84:9092"] #指定kafka的地址信息
topic: '%{[fields][log_topics]}' #自动识别topic

如果只有一个log源输入日志(不是指paths只有一个路径,paths可以同时指定多个路径),那么output.kafka的topic当然可以写死。如果是多个log源写入日志,而且又要对应输入到kafka的topic里,可以使用'%{[fields][log_topics]}',达到自动识别的目的。

但是要注意!如果加上了fields_under_root: true,那么'%{[fields][log_topics]}'是错误的,要改成topic: "%{[log_topic]}"才可以。

shoporder的filebeat.yml内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
filebeat.prospectors:
- type: log
enabled: true
paths:
- /mnt/shoporder/logs/shoporder.log #指定路径
tags: ["logmessages"]
encoding: utf-8
scan_frequency: 10s
harvester_buffer_size: 15000
tail_files: true
fields:
alilogtype: app_log
serverip: 172.16.0.207 #本地IP地址

output.kafka:
enabled: true
hosts: ["172.31.0.84:9092"] #指定kafka的地址信息
topic: 'shoporder-log' #指定topic

启动filebeat和kafka,kafka的配置这里略过不表。在kafka里查看,发现已经生成对应的topic:
akb48

logstash.yml内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
input {
kafka{
bootstrap_servers=>"172.31.0.84:9092" #指定kafka的IP和端口
topics=>["shoporder-log","shopauth-log"] #说明从这两个topic里消费
decorate_events=>"true" #这个很重要,等会细说
codec=>plain
}
}

filter { #利用kafka域的内容构建自定义的域
if [@metadata][kafka][topic] == "shopauth-log" {
mutate {
add_field => {"[@metadata][index]" => "shopauth-log-%{+YYYY-MM}"}
}
} else if [@metadata][kafka][topic] == "shoporder-log" {
mutate {
add_field => {"[@metadata][index]" => "shoporder-log-%{+YYYY-MM}"}
}
}
}

output {
elasticsearch {
hosts=>["172.31.0.76:9200"] #指定es的IP和端口
index=>"%{[@metadata][index]}" #对应生成各自的index
}
}

说一下decorate_events,如果只用了单个logstash,订阅了多个主题,你肯定希望在es中为不同主题创建不同的索引,那么decorate_events就是你想要的配置项。这个值默认是false,当指定这个选项为true时,会附加kafka的一些信息到logstash event的一个名为kafka的域(Metadata fields)中,然后再通过filter进行判断,如果是从shopauth-log这个topic得到信息,就把index设定成shopauth-log-%{+YYYY-MM}"

启动logstash,待其正常之后,在es的控制台查看日志,发现index索引已经成功生成:
akb48

现在在kibana里就可以创建索引然后查找了!

感谢您请我喝咖啡~O(∩_∩)O,如果要联系请直接发我邮箱chenx1242@163.com,我会回复你的
-------------本文结束感谢您的阅读-------------