mysql同步数据到es

1,下载安装logstash和es版本一致

2,解压文件,在bin目录下新建mysql文件夹,在mysql文件夹新建如下4个文件

jdbc.conf

如果同步至es中的字段不需要设置分词,不然查询时分词会导致结果不正确,模糊匹配不能像关系数据库中使用like那样,就需要在jdbc.conf的output里设置template、template_name和template_overwrite,如下所示:

input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/www.test.com"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "D:\desktoptool\elasticsearch\logstash\logstash-7.7.0\bin\mysql\mysql-connector-java-5.1.49.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "300000"
      use_column_value => "true"
      tracking_column => "id"
      statement_filepath => "D:\desktoptool\elasticsearch\logstash\logstash-7.7.0\bin\mysql\jdbc.sql"
	  schedule => "* * * * *"
	  type => "jdbc"
	  jdbc_default_timezone =>"Asia/Shanghai"
    }
}
 
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
 
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "mysql_table_test"
		template => "D:\desktoptool\elasticsearch\logstash\logstash-7.7.0\bin\mysql\es-template.json"
		template_name => "t-statistic-out-logstash"
		template_overwrite => true
		document_type => "out"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

jdbc.sql  如果sql后面没有where条件则是全量同步,反之增量同步

select * from `test` where id>=:sql_last_value 

es-template.json

{
“template” : “t-statistis-out-template”,
“order”:1,
“settings”: {
“index”: {
“refresh_interval”: “5s”
}
},
“mappings”: {
“_default_”: {
“_all” : {“enabled”:false},
“dynamic_templates”: [
{
“message_field” : {
“match” : “message”,
“match_mapping_type” : “string”,
“mapping” : { “type” : “string”, “index” : “not_analyzed” }
}
}, {
“string_fields” : {
“match” : “*”,
“match_mapping_type” : “string”,
“mapping” : { “type” : “string”, “index” : “not_analyzed” }
}
}
],
“properties”: {
“@timestamp”: {
“type”: “date”
},
“@version”: {
“type”: “keyword”
},
“geoip”: {
“dynamic”: true,
“properties”: {
“ip”: {
“type”: “ip”
},
“location”: {
“type”: “geo_point”
},
“latitude”: {
“type”: “half_float”
},
“longitude”: {
“type”: “half_float”
}
}
},
“acc_id”: {
“type”: “keyword”
},
“acc_name”: {
“type”: “keyword”
},
“acc_pp”: {
“type”: “keyword”
},
“account_price_type”: {
“type”: “keyword”
},
“cyacc_no”: {
“type”: “keyword”
},
“order_id”: {
“type”: “keyword”
},
“voucher_id”: {
“type”: “keyword”
}

}
}
},
“aliases”: {}

}

下载 mysql-connector-java-5.1.49.jar 注意版本

3.进入bin目录,打开cmd,执行

.\logstash.bat -f  .\mysql\jdbc.conf

如果发现没生效,设置order大于0,logstash启动的时候会默认发送一个logstash的文件,此时需要删除这个默认的

然后再删除对应的索引,重新启动,就会发现生效了

发表评论

电子邮件地址不会被公开。 必填项已用*标注