假设有两个模块分别叫shopauth和shoporder,日志文件也对应分别是shopauth.log
和shoporder.log
,现在需要把日志对应放到同个kafka里不同的topic里。
shopauth的filebeat.yml
内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19filebeat.prospectors:
- type: log
enabled: true
paths:
- /mnt/shopauth/logs/shopauth.log #日志路径
tags: ["logmessages"]
encoding: utf-8
scan_frequency: 10s
harvester_buffer_size: 15000
tail_files: true
fields:
alilogtype: app_log
serverip: 172.16.0.201 #本地IP地址
log_topics: ecnode-log
output.kafka:
enabled: true
hosts: ["172.31.0.84:9092"] #指定kafka的地址信息
topic: '%{[fields][log_topics]}' #自动识别topic
如果只有一个log源输入日志(不是指paths只有一个路径,paths可以同时指定多个路径),那么output.kafka
的topic当然可以写死。如果是多个log源写入日志,而且又要对应输入到kafka的topic里,可以使用'%{[fields][log_topics]}'
,达到自动识别的目的。
但是要注意!如果加上了fields_under_root: true
,那么'%{[fields][log_topics]}'
是错误的,要改成topic: "%{[log_topic]}"
才可以。
shoporder的filebeat.yml
内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18filebeat.prospectors:
- type: log
enabled: true
paths:
- /mnt/shoporder/logs/shoporder.log #指定路径
tags: ["logmessages"]
encoding: utf-8
scan_frequency: 10s
harvester_buffer_size: 15000
tail_files: true
fields:
alilogtype: app_log
serverip: 172.16.0.207 #本地IP地址
output.kafka:
enabled: true
hosts: ["172.31.0.84:9092"] #指定kafka的地址信息
topic: 'shoporder-log' #指定topic
启动filebeat和kafka,kafka的配置这里略过不表。在kafka里查看,发现已经生成对应的topic:
而logstash.yml
内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27input {
kafka{
bootstrap_servers=>"172.31.0.84:9092" #指定kafka的IP和端口
topics=>["shoporder-log","shopauth-log"] #说明从这两个topic里消费
decorate_events=>"true" #这个很重要,等会细说
codec=>plain
}
}
filter { #利用kafka域的内容构建自定义的域
if [@metadata][kafka][topic] == "shopauth-log" {
mutate {
add_field => {"[@metadata][index]" => "shopauth-log-%{+YYYY-MM}"}
}
} else if [@metadata][kafka][topic] == "shoporder-log" {
mutate {
add_field => {"[@metadata][index]" => "shoporder-log-%{+YYYY-MM}"}
}
}
}
output {
elasticsearch {
hosts=>["172.31.0.76:9200"] #指定es的IP和端口
index=>"%{[@metadata][index]}" #对应生成各自的index
}
}
说一下decorate_events
,如果只用了单个logstash,订阅了多个主题,你肯定希望在es中为不同主题创建不同的索引,那么decorate_events
就是你想要的配置项。这个值默认是false
,当指定这个选项为true
时,会附加kafka的一些信息到logstash event
的一个名为kafka的域(Metadata fields
)中,然后再通过filter进行判断,如果是从shopauth-log
这个topic得到信息,就把index设定成shopauth-log-%{+YYYY-MM}"
。
启动logstash,待其正常之后,在es的控制台查看日志,发现index索引已经成功生成:
现在在kibana里就可以创建索引然后查找了!