Filebeat and Logstash in practice

Background

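We have used ELK before, and used Logstash in depth as a CDC (change data capture) tool for moving data from MySQL to ES; see the article "ElasticStack: logstash".
This time, Filebeat collects the log files and Logstash filters them: the required data is extracted from the collected log lines and stored in MongoDB.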

Only lines containing "A large volume of broadcast packets has been detected" are collected; the required fields are extracted from them and stored.

Sample data:

2021-12-01 00:00:07.115 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is fe80::e8d3:8281:e69e:afda, the destination IP address is ff02::1:3. The number of broadcast packets is equal to or larger than 32 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:00:07.115 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is 192.168.9.103, the destination IP address is 224.0.0.252. The number of broadcast packets is equal to or larger than 32 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:01:34.923 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is 192.168.9.103, the destination IP address is 224.0.0.251. The number of broadcast packets is equal to or larger than 40 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:01:34.923 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is fe80::e8d3:8281:e69e:afda, the destination IP address is ff02::fb. The number of broadcast packets is equal to or larger than 40 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 48-4D-7E-BE-B0-87, the source IP address is 192.168.9.21, the destination IP address is 224.0.0.251. The number of broadcast packets is equal to or larger than 52 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 48-4D-7E-BE-B0-87, the source IP address is fe80::c129:65df:e7de:f745, the destination IP address is ff02::fb. The number of broadcast packets is equal to or larger than 52 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is 192.168.9.103, the destination IP address is 224.0.0.251. The number of broadcast packets is equal to or larger than 60 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 50-9A-4C-27-F9-D3, the source IP address is fe80::e8d3:8281:e69e:afda, the destination IP address is ff02::fb. The number of broadcast packets is equal to or larger than 60 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect).
2021-12-01 00:11:07.141 On the TCP Listener (Port 5555), a Client (IP address 167.248.133.58, Host name "scanner-09.ch1.censys-scanner.com", Port number 40418) has connected.
2021-12-01 00:11:07.141 For the client (IP address: 167.248.133.58, host name: "scanner-09.ch1.censys-scanner.com", port number: 40418), connection "CID-8671" has been created.
2021-12-01 00:11:08.058 Connection "CID-8671" has been terminated.
2021-12-01 00:11:08.058 The connection with the client (IP address 167.248.133.58, Port number 40418) has been disconnected.
2021-12-01 00:11:08.289 On the TCP Listener (Port 5555), a Client (IP address 167.248.133.58, Host name "scanner-09.ch1.censys-scanner.com", Port number 34038) has connected.
2021-12-01 00:11:08.289 For the client (IP address: 167.248.133.58, host name: "scanner-09.ch1.censys-scanner.com", port number: 34038), connection "CID-8672" has been created.
2021-12-01 00:11:08.531 SSL communication for connection "CID-8672" has been started. The encryption algorithm name is "AES128-SHA".
2021-12-01 00:11:10.011 Connection "CID-8672" terminated by the cause "A client which is non-SoftEther VPN software has connected to the port." (code 5).

Create the Docker network

docker network create --driver bridge leiqin

Build a Logstash image that includes logstash-output-mongodb

Create the Dockerfile for an image with logstash-output-mongodb installed.
File logstash.dockerfile:

FROM docker.elastic.co/logstash/logstash:7.13.0
RUN logstash-plugin install --version=3.1.5 logstash-output-mongodb

Build your own Logstash image

docker build -f logstash.dockerfile -t dakewe/logstash:1.0 .

docker-compose definition

version: '3.0'
services:

  filebeat:
    image: docker.elastic.co/beats/filebeat:7.13.0
    container_name: filebeat
    volumes:
      - ./filebeat/config/filebeat.yml:/usr/share/filebeat/filebeat.yml
      - ./filebeat/data:/usr/share/filebeat/data
      - ./filebeat/logs:/usr/share/filebeat/logs
      # Directory containing the log files to be collected
      - ./filebeat/logfiles:/usr/share/filebeat/logfiles
    environment:
      # Filebeat is not a JVM process, so this variable has no effect on it
      LS_JAVA_OPTS: "-Xmx1024m -Xms1024m"
    networks:
      - leiqin

  logstash:
    image: dakewe/logstash:1.0
    container_name: logstash
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      # 5044 is the Beats input that Filebeat sends to
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
    environment:
      LS_JAVA_OPTS: "-Xmx1024m -Xms1024m"
    networks:
      - leiqin

networks:
  # Created beforehand with "docker network create"
  leiqin:
    external: true

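Install logstash-output-mongodb (when using the official image)

Tip: if you run the official image instead, enter the container after it starts and install logstash-output-mongodb there.

Do not install the newer 3.1.6 release; pin version 3.1.5 instead. For details of the pitfall, see the author's reply on GitHub.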

bin/logstash-plugin install --version=3.1.5 logstash-output-mongodb

Filebeat definition

filebeat.inputs:
  - type: filestream
    enabled: true
    paths:
      - /usr/share/filebeat/logfiles/*.log
    include_lines: ['A large volume of broadcast packets has been detected']

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

output.logstash:
  # The Logstash hosts
  hosts: ["logstash:5044"]
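To see which of the sample lines this include_lines setting lets through, the filter can be approximated offline. The following is a minimal Python sketch, not Filebeat's implementation: the helper name keep_line and the two-entry SAMPLE list are illustrative, and the two lines are copied from the sample data above.

```python
import re

# Same phrase as the include_lines entry in filebeat.yml. Filebeat treats each
# entry as a regular expression; this phrase contains no metacharacters, so it
# behaves like a plain substring match.
INCLUDE_LINES = [
    re.compile(r"A large volume of broadcast packets has been detected"),
]


def keep_line(line):
    """Return True if any include pattern occurs anywhere in the line."""
    return any(pattern.search(line) for pattern in INCLUDE_LINES)


# Two lines taken from the sample data: one broadcast warning, one connection event.
SAMPLE = [
    '2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": '
    'A large volume of broadcast packets has been detected. There are cases '
    'where packets are discarded based on the policy. The source MAC address '
    'is 48-4D-7E-BE-B0-87, the source IP address is 192.168.9.21, the '
    'destination IP address is 224.0.0.251. The number of broadcast packets '
    'is equal to or larger than 52 items per 1 second (note this information '
    'is the result of mechanical analysis of part of the packets and could '
    'be incorrect).',
    '2021-12-01 00:11:08.058 Connection "CID-8671" has been terminated.',
]

kept = [line for line in SAMPLE if keep_line(line)]
print(f"{len(kept)} of {len(SAMPLE)} lines would be shipped to Logstash")
```

Only the broadcast warning passes; the connection event is dropped before it ever reaches Logstash, which keeps the grok filter on the Logstash side simple.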

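Print Logstash output to the terminal

First, have Logstash print the events it receives from Filebeat directly to the terminal, without any processing.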

input {
  beats {
    port => 5044
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Logstash output to MongoDB

Filter the events in Logstash, then store them in MongoDB.

input {
  beats {
    port => 5044
  }
}

filter {
  # Extract the timestamp, hub, session, MAC address, IPs and packet rate from the log line
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:time} \[HUB \"%{NOTSPACE:hub}\"\] Session \"%{NOTSPACE:session}\": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is %{NOTSPACE:mac_address}, the source IP address is %{IP:source_ip}, the destination IP address is %{IP:destination_ip}. The number of broadcast packets is equal to or larger than %{NUMBER:items_per_second} items per 1 second "}
  }

  # Extract the file name (last path segment) from the source file path
  grok {
    match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<file_name>.*).*"}
  }

  # Parse the log timestamp as local time and store it in created_at
  date {
    match => [ "time", "ISO8601" ]
    timezone => "Asia/Chongqing"
    target => "created_at"
  }

  # Drop the Filebeat metadata and the raw time string
  mutate {
    remove_field => ["host", "agent", "input", "tags", "ecs", "time", "log"]
  }
}

output {
  stdout {
    codec => rubydebug
  }

  mongodb {
    collection => "vpn_log"
    generateId => "true"
    database => "log"
    # Inside the Logstash container, localhost is the container itself;
    # point this at the host where MongoDB is actually reachable
    uri => "mongodb://localhost:27017"
  }
}
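Before deploying the pipeline, it can help to check offline what the filter stage should produce for one sample line. The following Python sketch is a rough stand-in for the two grok patterns and the date filter, not Logstash itself. Several things in it are assumptions: the IP character class is a simplification of grok's IP pattern, Asia/Chongqing is modelled as a fixed UTC+8 offset, the names parse_line, SAMPLE_LINE and SAMPLE_PATH are illustrative, and the file name vpn.log is hypothetical. Fields that Logstash adds or keeps on its own (such as message and @timestamp) are not modelled.

```python
import re
from datetime import datetime, timedelta, timezone

# Regex stand-in for the first grok pattern (IP matching is simplified).
LINE_RE = re.compile(
    r'(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) '
    r'\[HUB "(?P<hub>\S+)"\] Session "(?P<session>\S+)": '
    r'A large volume of broadcast packets has been detected\. '
    r'There are cases where packets are discarded based on the policy\. '
    r'The source MAC address is (?P<mac_address>\S+), '
    r'the source IP address is (?P<source_ip>[0-9A-Fa-f:.]+), '
    r'the destination IP address is (?P<destination_ip>[0-9A-Fa-f:.]+)\. '
    r'The number of broadcast packets is equal to or larger than '
    r'(?P<items_per_second>\d+) items per 1 second '
)

# Stand-in for the second grok pattern applied to [log][file][path].
PATH_RE = re.compile(r'.*(\\|/).*(\\|/)(?P<file_name>.*).*')

# Assumption: Asia/Chongqing is treated as a fixed UTC+8 offset (no DST).
LOCAL_TZ = timezone(timedelta(hours=8))


def parse_line(message, path):
    """Return the extracted fields as a dict, or None if the line does not match."""
    match = LINE_RE.match(message)
    if match is None:
        return None
    doc = match.groupdict()  # all values are strings, as with untyped grok captures
    # Like the date filter: read "time" as local time, store UTC in created_at,
    # and drop the raw "time" string (the mutate filter removes it).
    local = datetime.strptime(doc.pop("time"), "%Y-%m-%d %H:%M:%S.%f")
    doc["created_at"] = local.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)
    path_match = PATH_RE.match(path)
    if path_match is not None:
        doc["file_name"] = path_match.group("file_name")
    return doc


SAMPLE_LINE = (
    '2021-12-01 00:03:48.133 [HUB "hub_dkwbj"] Session "SID-BRIDGE-5": '
    'A large volume of broadcast packets has been detected. There are cases '
    'where packets are discarded based on the policy. The source MAC address '
    'is 48-4D-7E-BE-B0-87, the source IP address is 192.168.9.21, the '
    'destination IP address is 224.0.0.251. The number of broadcast packets '
    'is equal to or larger than 52 items per 1 second (note this information '
    'is the result of mechanical analysis of part of the packets and could '
    'be incorrect).'
)
SAMPLE_PATH = "/usr/share/filebeat/logfiles/vpn.log"  # hypothetical file name

doc = parse_line(SAMPLE_LINE, SAMPLE_PATH)
for key, value in doc.items():
    print(f"{key}: {value}")
```

Note that items_per_second stays a string here, just as %{NUMBER:items_per_second} does without a type suffix; writing %{NUMBER:items_per_second:int} in the grok pattern would store it as an integer instead.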

Summary

The setup is fairly simple, and it works even better when combined with ELK.

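Series index

ElasticStack: installation
ElasticStack: elasticsearch
ElasticStack: logstash
elasticSearch: mapping
elasticSearch: introduction to analyzers
elasticSearch: analyzer practice notes
elasticSearch: custom synonym analyzer in practice
docker-elk cluster in practice
Filebeat and Logstash in practice
Filebeat pipelines in practice
Elasticsearch 7.x Platinum crack in practice
ELK alerting: research and practice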