logstash 增量定时同步mysql数据到elasticsearch集群

近期文章:

线上搜索业务,使用logstash将 mysql数据同步到阿里云 es集群中,一开始数据量小,基本上都是全量同步,后面数据量大了,跟研发沟通改用增量,且定时同步(搜索不要求实时同步)mysql数据到es集群中,减少数据库查询和 es压力写入压力

安装 logstash

cd /usr/local/data/ wget https://mirrors.huaweicloud.com/logstash/7.10.0/logstash-7.10.0-linux-x86_64.tar.gz tar zxvf logstash-7.10.0-linux-x86_64.tar.gz #下载mysql.jdbc.Driver wget https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33.zip unzip mysql-connector-j-8.0.33.zip #下面只会用到目录下mysql-connector-j-8.0.33.jar

logstash 配置文件

#第一部分字段详解,看下面内容 input {     jdbc {         jdbc_connection_string => "jdbc:mysql://数据库连接地址:3306/数据库库名?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull"         jdbc_user => "your-username"         jdbc_password => "your-password"         jdbc_validate_connection => true         #配置mysql.jdbc.Driver         jdbc_driver_library => "/usr/local/data/mysql-connector-j-8.0.33/mysql-connector-j-8.0.33.jar"         jdbc_driver_class => "com.mysql.jdbc.Driver"         jdbc_paging_enabled => "true"         jdbc_page_size => "10000"         jdbc_default_timezone => "Asia/Shanghai" 	statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value" 	tracking_column_type => "timestamp"         tracking_column => "updated_at"         use_column_value => true         #clean_run => true         #last_run_metadata_path => "syncpoint_v1" 	codec => plain { charset => "UTF-8"} 	#schedule => "0 */1 * * *" #测试时先关闭定时,不然 logstash 启动之后一直没有执行同步,比如我这里要等一小时才执行一次     } } #第二部分字段详解,看下面内容 filter { 	json { 	       	source => "message"         	remove_field => ["message"]     	} 	mutate {                 split => { "tags" => "," }     		split => { "type_ids" => "," }     		split => { "style_ids" => "," }     		split => { "scene_ids" => "," }     		split => { "language_ids" => "," }     		split => { "system_ids" => "," }     		split => { "character_ids" => "," } 	}         #因为时区问题需要修正时间         ruby {                 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"         }         ruby {                 code => "event.set('@timestamp',event.get('timestamp'))"         }        ruby {                code => "event.set('save_time', event.get('save_time').time.localtime + 8*60*60)"        }        ruby {                code => "event.set('up_time', event.get('up_time').time.localtime + 8*60*60)"        }        ruby {                code => "event.set('edit_time', event.get('edit_time').time.localtime + 8*60*60)"        } 	#移除timestamp         mutate {                 remove_field => ["timestamp"]         } }  #第三部分字段详解,看下面内容 output {     stdout { #       codec => json_lines         codec => rubydebug }   #第四部分字段详解,看下面内容 elasticsearch { 	codec => plain { charset => "UTF-8"} 	action => "index"         index => "font_v1" #索引名称         document_type => "font"         document_id => "%{id}"         hosts => "http://es请求地址:9200" 	user => ""         password => ""     } }

第一部分字段解释

jdbc_connection_string:MySQL 数据库的连接字符串。 jdbc_user 和 jdbc_password:数据库的用户名和密码。 jdbc_driver_library 和 jdbc_driver_class:指定 MySQL 驱动的路径和类。 jdbc_paging_enabled 和 jdbc_page_size:启用分页,并设置每页的数据量。 jdbc_default_timezone:数据库的时区设置。 statement:SQL 查询语句,用于从数据库中选择数据。这里使用了增量同步,只选择更新时间在上次同步时间之后的数据。 tracking_column_type 和 tracking_column:配置增量同步追踪列,用于记录上次同步时数据库表中哪一列的数值,以便下一次同步时只选择该列数值发生变化的数据。 last_run_metadata_path:记录同步时间的文件路径。 use_column_value:使用 tracking_column 列的当前值来确定同步的起始点,进行更准确的增量同步。 clean_run:如果设置为 true,将在启动时清除 last_run_metadata_path,重新同步所有数据。 schedule:定时同步的 CRON 表达式,这里设置为每小时执行一次。  备注: last_run_metadata_path 和 use_column_value 在 Logstash 中都用于实现增量同步,但它们的实现机制略有不同 last_run_metadata_path: 作用: 用于记录上一次同步的时间戳,通常存储在一个文件中。 使用场景: 适用于需要根据时间戳进行增量同步的场景,比如 WHERE updated_at > last_sync_timestamp。 优点: 实现简单,易于理解。 缺点: 如果同步过程中出现异常,可能会导致时间戳记录不准确,需要手动处理。 use_column_value: 作用: 使用指定列的当前值来确定同步的起始点。 使用场景: 适用于具有递增或唯一性的列,比如自增主键或时间戳,用于基于列值进行增量同步。 优点: 更精确,不容易受到异常情况的影响。 缺点: 需要确保指定的列在同步过程中不会出现重复、缺失或异常值。  选择建议: 如果你的表中有一个递增或唯一性的列,而且这个列的值在同步过程中是可靠的,那么使用 use_column_value 是一个更稳妥的选择。 如果时间戳是你的同步参考,同时你可以容忍在异常情况下手动修复时间戳,那么 last_run_metadata_path 也是一个不错的选择。

第二部分字段详解

filter更多内容请看官方文档

json 插件: 这个插件用于解析输入中的 JSON 数据。 source => "message" 指定要解析的字段是 "message",而 remove_field => ["message"] 表示解析后移除原始的 "message" 字段。 mutate 插件: 这个插件提供了对事件进行修改的功能。在这里,使用了 split 操作,将一些字段按照逗号分割为数组。这样,原始数据中逗号分割的字符串就被拆分成了数组形式。 ruby 插件: 在这里,使用了 Ruby 代码来修正时间戳的时区event.set('timestamp',event.get('@timestamp').time.localtime + 8*60*60) 将 @timestamp 字段的时间戳由 UTC 转换为东八区时间。 mutate 插件: 又一次使用 mutate 插件来移除原始的 "timestamp" 字段,因为这个字段在经过 Ruby 插件的处理后,已经被更新到了 "@timestamp" 字段中。

第三部分字段详解

output 部分,用于指定 Logstash 处理完数据后的输出目的地, stdout 插件: 这个插件用于将事件输出到控制台,主要用于测试和调试。 JSON Lines 格式: 每行是一个独立的 JSON 对象,这在处理大量日志时非常有用。每个日志事件以一行 JSON 形式表示,这样可以逐行读取和处理,而不需要将整个文件加载到内存中,这样的格式适合于各种分析工具和数据存储系统,使其更容易处理和查询 codec => rubydebug 表示使用 Ruby 格式进行调试输出,这样输出的信息更易于阅读。 如果你启动 Logstash 并查看输出,你将在控制台上看到经过处理的事件以更易读的形式显示。
json格式输出
ruby格式输出

第四部分字段详解

在 Logstash 的 Elasticsearch 输出插件中,action 参数指定要执行的 Elasticsearch 操作。主要的取值有: action => "index": 这是最常用的操作。它表示要将文档添加到 Elasticsearch 索引中。如果文档的 ID 已经存在,它将被替换。如果不存在,则会创建一个新文档。这是通常的写入操作。 action => "update": 表示要在 Elasticsearch 中更新文档。这通常涉及到根据文档的 ID 更新部分字段的值,而不是替换整个文档。 action => "delete": 表示要从 Elasticsearch 中删除文档。这是删除文档的操作。 action => "create": 表示只在文档的 ID 不存在时创建文档。如果文档 ID 已经存在,它将不执行任何操作。 这些操作对应于 Elasticsearch 的 Index API、Update API、Delete API 和 Create API。在 Logstash 配置中,通常使用默认的 "index" 操作,因为它能满足大多数场景的需求。

启动logstash

注意:启动前es中先建好索引,我这边es索引是研发同学建的

/usr/local/data/logstash-7.10.0/bin/logstash \ -f /usr/local/data/logstash-7.10.0/bin/mysql/test.conf \ --path.data=/usr/local/data/logstash-7.10.0/data/test #下面是logstash控制台输出的一小部分结果
ruby输出的结果

控制台正常输出数据后去es那边看下索引,是否有数据,如果es没数据,就是索引没建好,让研发同学重建

使用5年过程中遇到的问题总结:

1、如果多索引多个配置文件,下面的数据目录不要重复

–path.data=/usr/local/data/logstash-7.10.0/data/test #这个目录不要重启,否则将无法启动 logstash

2、如果数据量大 logstash默认的内存不够,可适当增加jvm内存

jvm内存

3、如果数据量大logstash同步到 es很慢, 可以增加配置文件中这个参数 jdbc_page_size => “50000” #值更加实际情况修改

4、服务器不需要很高的配置,我这里20 多个索引同步,配置使用的是 2c,16G(后续加的内存)

服务器配置