ElasticStack-logstash篇

概述

根据 ElasticStack-安装篇 安装好logstash,我们开始进行配置和同步数据。
本次实践通过logstash同步mysql到es。

配置

进入logstash容器进行安装logstash-input-jdbc

1
docker exec -it docker_logstash bash

安装logstash-input-jdbc

1
./bin/logstash-plugin install logstash-input-jdbc

下载 mysql-connector-java-8.0.24.jar
放入/data/dockers/logstash/config/mysql

在文件目录/data/dockers/logstash/config/conf.d下创建jdbc.conf文件,进行mysql数据到es的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
input{
jdbc{
# 连接数据库
jdbc_connection_string => "jdbc:mysql://47.119.168.111:3306/fob?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
jdbc_user => "root"
jdbc_password => "XXXXX"
# 连接数据库的驱动包
jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-8.0.24.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8" }

# 数据追踪
# 追踪的字段
tracking_column => "updated_at"
# 上次追踪的元数据存放位置
last_run_metadata_path => "/usr/share/logstash/config/lastrun/logstash_jdbc_last_run"
# 设置时区
jdbc_default_timezone => "Asia/Shanghai"
# sql 文件地址
# statement_filepath => ""
# sql
statement => "SELECT g.id AS id,g.product_name AS product_name,g.shop_id AS shop_id,g.category_id AS category_id,g.keyword AS keyword,g.status AS status FROM fg_product g WHERE g.updated_at > :sql_last_value"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run =>false
# 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
schedule => "* * * * *"
}
}
output{
elasticsearch{
# 要导入到的Elasticsearch所在的主机
hosts => "47.119.168.111:9200"
# 要导入到的Elasticsearch的索引的名称
index => "fob_index"
# 类型名称(类似数据库表名)
document_type => "fg_product"
# 主键名称(类似数据库表名)
document_id => "%{id}"
}

stdout{
# JSON 格式输出
codec => json_lines
}
}


查看mysql数据是否进入到es
k0CrP0

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践