Filebeat pipeline in practice

Background

In the previous post, filebeat and logstash in practice, we used Filebeat to collect data, shipped it to Logstash, and let Logstash do the filtering and processing. In the spirit of "every component you can remove is one less to run", this time we cut Logstash out: Filebeat sends the data straight to Elasticsearch, and an ES ingest pipeline (referenced from Filebeat) does the processing before the documents are indexed.

This time we reuse the sample log data from the previous post. The overall flow is:

  • Add the pipeline to ES through Kibana's Dev Tools (you can also create it with the ES API; see the curl sketch after this list).
  • Reference the pipeline by name in the Filebeat configuration.
  • When Filebeat ships the collected data straight to ES, ES runs the pipeline on each event before indexing it.
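
For the API route, the same thing can be done with a plain REST call instead of Kibana. A minimal sketch, assuming the host and credentials from the filebeat.yml below; the processors array is left empty here, so substitute the full definition from the PUT request later in this post:

# Create/overwrite an ingest pipeline via the REST API (no Kibana needed).
# Host and credentials are assumptions copied from the filebeat.yml below.
curl -u elastic:XXX -X PUT "http://10.8.99.34:9200/_ingest/pipeline/vpn_log_pipeline" \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "vpn_log_pipeline",
    "processors": []
  }'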

Practice

Filebeat: define the Elasticsearch output and the pipeline

filebeat.inputs:
# Collect the sample VPN logs; keep only the broadcast-warning lines
- type: filestream
  enabled: true
  paths:
    - /usr/share/filebeat/logfiles/*.log
  include_lines: ['A large volume of broadcast packets has been detected']

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

# Manage the index template ourselves instead of via ILM
setup.ilm.enabled: false
setup.template.name: "vpn-log"
setup.template.pattern: "vpn-log-*"

output.elasticsearch:
  index: "vpn-log-%{+yyyy-MM-dd}"
  hosts: ["10.8.99.34:9200"]
  username: "elastic"
  password: "XXX"
  # Run this ingest pipeline on every event before indexing
  pipeline: "vpn_log_pipeline"

# Drop Filebeat metadata we do not need in the index
processors:
  - drop_fields:
      fields:
        - agent.ephemeral_id
        - agent.hostname
        - agent.id
        - agent.type
        - agent.version
        - ecs.version
        - input.type
        - log.offset
        - version
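
Before starting Filebeat, its built-in test subcommands can sanity-check the file and the ES connection. The config path here is an assumption matching the container layout used above:

# Validate the configuration file syntax and settings
filebeat test config -c /usr/share/filebeat/filebeat.yml

# Verify connectivity and credentials against output.elasticsearch
filebeat test output -c /usr/share/filebeat/filebeat.yml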

Create the pipeline in Kibana Dev Tools

First, test it with a simulation:

// Simulate an inline pipeline against a sample document
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "vpn_log_pipeline",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            """%{TIMESTAMP_ISO8601:error_time} \[HUB "%{NOTSPACE:hub}"\] Session "%{NOTSPACE:session}": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is %{NOTSPACE:mac_address}, the source IP address is %{IPV4:source_ip}, the destination IP address is %{IPV4:destination_ip}. The number of broadcast packets is equal to or larger than %{NUMBER:items_per_second} items per 1 second """
          ],
          "ignore_failure": true
        }
      },
      {
        "convert": {
          "field": "items_per_second",
          "type": "integer",
          "ignore_failure": true
        }
      },
      {
        "date": {
          "field": "error_time",
          "target_field": "@timestamp",
          "formats": [
            "yyyy-MM-dd HH:mm:ss.SSS"
          ],
          "timezone": "Asia/Shanghai"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """2022-01-17 14:19:07.047 [HUB "hub_dkwbj"] Session "SID-BRIDGE-20": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 70-B5-E8-2F-C9-5C, the source IP address is 192.168.9.134, the destination IP address is 0.0.0.0. The number of broadcast packets is equal to or larger than 34 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect)."""
      }
    }
  ]
}
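
If the grok pattern matches, the response carries the enriched document. Abridged, and with the exact @timestamp rendering and metadata fields varying by ES version, it should look roughly like:

// Abridged _simulate response; field values follow from the sample message above
{
  "docs": [
    {
      "doc": {
        "_source": {
          "@timestamp": "2022-01-17T14:19:07.047+08:00",
          "error_time": "2022-01-17 14:19:07.047",
          "hub": "hub_dkwbj",
          "session": "SID-BRIDGE-20",
          "mac_address": "70-B5-E8-2F-C9-5C",
          "source_ip": "192.168.9.134",
          "destination_ip": "0.0.0.0",
          "items_per_second": 34,
          "message": "2022-01-17 14:19:07.047 [HUB \"hub_dkwbj\"] ..."
        }
      }
    }
  ]
}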
// Create the pipeline
PUT _ingest/pipeline/vpn_log_pipeline
{
  "description": "vpn_log_pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:error_time} \\[HUB \"%{NOTSPACE:hub}\"\\] Session \"%{NOTSPACE:session}\": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is %{NOTSPACE:mac_address}, the source IP address is %{IP:source_ip}, the destination IP address is %{IP:destination_ip}. The number of broadcast packets is equal to or larger than %{NUMBER:items_per_second} items per 1 second "
        ],
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "items_per_second",
        "type": "integer",
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "error_time",
        "target_field": "@timestamp",
        "formats": [
          "yyyy-MM-dd HH:mm:ss.SSS"
        ],
        "timezone": "Asia/Shanghai"
      }
    }
  ]
}

// Other operations
GET _ingest/pipeline/vpn_log_pipeline
DELETE _ingest/pipeline/vpn_log_pipeline
// Simulate against the stored pipeline
GET _ingest/pipeline/vpn_log_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": """2022-01-17 14:19:07.047 [HUB "hub_dkwbj"] Session "SID-BRIDGE-20": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 70-B5-E8-2F-C9-5C, the source IP address is 192.168.9.134, the destination IP address is 0.0.0.0. The number of broadcast packets is equal to or larger than 34 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect)."""
      }
    }
  ]
}
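
The simulate API also accepts a verbose flag, which reports each processor's output separately; this helps when grok matches but a later convert or date step misbehaves:

// Per-processor trace of the stored pipeline
GET _ingest/pipeline/vpn_log_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": """2022-01-17 14:19:07.047 [HUB "hub_dkwbj"] Session "SID-BRIDGE-20": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 70-B5-E8-2F-C9-5C, the source IP address is 192.168.9.134, the destination IP address is 0.0.0.0. The number of broadcast packets is equal to or larger than 34 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect)."""
      }
    }
  ]
}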

Now start Filebeat: the log files are shipped straight to ES, and ES runs vpn_log_pipeline on each event before indexing it. The pipeline's grok, convert, and date processors play the same role the filter stage did in Logstash.
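
Once events flow in, the converted fields can be queried as real types. For example, since items_per_second was converted to an integer, a range query works; the index pattern below matches the daily indices named in filebeat.yml:

// Find events that saw 30 or more broadcast packets per second
GET vpn-log-*/_search
{
  "query": {
    "range": {
      "items_per_second": { "gte": 30 }
    }
  }
}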

Series directory

ElasticStack - installation
ElasticStack - elasticsearch
ElasticStack - logstash
elasticSearch - mapping notes
elasticSearch - analyzers introduction
elasticSearch - analyzer practice notes
elasticSearch - custom synonym analyzer in practice
docker-elk cluster in practice
filebeat and logstash in practice
filebeat pipeline in practice
Elasticsearch 7.x Platinum-tier crack in practice
ELK alerting: research and practice