假设有两个模块分别叫shopauth和shoporder,日志文件也对应分别是shopauth.log
和shoporder.log
,现在需要把日志对应放到同个kafka里不同的topic里。
shopauth的filebeat.yml
内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| filebeat.prospectors: - type: log enabled: true paths: - /mnt/shopauth/logs/shopauth.log #日志路径 tags: ["logmessages"] encoding: utf-8 scan_frequency: 10s harvester_buffer_size: 15000 tail_files: true fields: alilogtype: app_log serverip: 172.16.0.201 #本地IP地址 log_topics: ecnode-log
output.kafka: enabled: true hosts: ["172.31.0.84:9092"] #指定kafka的地址信息 topic: '%{[fields][log_topics]}' #自动识别topic
|
如果只有一个log源输入日志(不是指paths只有一个路径,paths可以同时指定多个路径),那么output.kafka
的topic当然可以写死。如果是多个log源写入日志,而且又要对应输入到kafka的topic里,可以使用'%{[fields][log_topics]}'
,达到自动识别的目的。
但是要注意!如果加上了fields_under_root: true
,那么'%{[fields][log_topics]}'
是错误的,要改成topic: "%{[log_topic]}"
才可以。
shoporder的filebeat.yml
内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| filebeat.prospectors: - type: log enabled: true paths: - /mnt/shoporder/logs/shoporder.log #指定路径 tags: ["logmessages"] encoding: utf-8 scan_frequency: 10s harvester_buffer_size: 15000 tail_files: true fields: alilogtype: app_log serverip: 172.16.0.207 #本地IP地址
output.kafka: enabled: true hosts: ["172.31.0.84:9092"] #指定kafka的地址信息 topic: 'shoporder-log' #指定topic
|
启动filebeat和kafka,kafka的配置这里略过不表。在kafka里查看,发现已经生成对应的topic:

akb48
而logstash.yml
内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| input { kafka{ bootstrap_servers=>"172.31.0.84:9092" #指定kafka的IP和端口 topics=>["shoporder-log","shopauth-log"] #说明从这两个topic里消费 decorate_events=>"true" #这个很重要,等会细说 codec=>plain } }
filter { #利用kafka域的内容构建自定义的域 if [@metadata][kafka][topic] == "shopauth-log" { mutate { add_field => {"[@metadata][index]" => "shopauth-log-%{+YYYY-MM}"} } } else if [@metadata][kafka][topic] == "shoporder-log" { mutate { add_field => {"[@metadata][index]" => "shoporder-log-%{+YYYY-MM}"} } } }
output { elasticsearch { hosts=>["172.31.0.76:9200"] #指定es的IP和端口 index=>"%{[@metadata][index]}" #对应生成各自的index } }
|
说一下decorate_events
,如果只用了单个logstash,订阅了多个主题,你肯定希望在es中为不同主题创建不同的索引,那么decorate_events
就是你想要的配置项。这个值默认是false
,当指定这个选项为true
时,会附加kafka的一些信息到logstash event
的一个名为kafka的域(Metadata fields
)中,然后再通过filter进行判断,如果是从shopauth-log
这个topic得到信息,就把index设定成shopauth-log-%{+YYYY-MM}"
。
启动logstash,待其正常之后,在es的控制台查看日志,发现index索引已经成功生成:

akb48
现在在kibana里就可以创建索引然后查找了!