Kafka Connect in Practice: Syncing Data to Elasticsearch
Following on from the previous article, where we installed the PostgreSQL connector, installing the ES connector is much easier.
Install the ES connector plugin
The Connect container installed via docker does not ship with the ES connector plugin, so we download it from the Confluent website (just search for "Kafka Connect Elasticsearch").
After downloading and unzipping, place it in the connect directory (the mount directory set up in the previous article). If you don't remember where the container directory is mounted, check with:
docker inspect <container-id> | grep Mounts -A 20
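The Mounts section of the output looks roughly like this (paths here are illustrative; your Source is whatever host directory you chose in the previous article):

"Mounts": [
  {
    "Type": "bind",
    "Source": "/opt/kafka/connectors",
    "Destination": "/usr/share/java/plugins",
    ...
  }
]

Then unpack the downloaded archive into that Source directory, e.g.:

unzip confluentinc-kafka-connect-elasticsearch-*.zip -d /opt/kafka/connectors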
Once the plugin is in place, restart the connect container and verify with the following HTTP request:
GET ip:8093/connector-plugins
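For example, with curl (a minimal check, assuming the same host and port as above):

curl -s http://ip:8093/connector-plugins

If the plugin was picked up, the response array should contain an entry like {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"..."}.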
Create the ES sink connector
POST ip:8093/connectors
{
  "name": "es-sink1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "ip:9200",
    "connection.username": "elastic",
    "connection.password": "elastic_xdeas",
    "type.name": "_doc",
    "key.ignore": "false",
    "topics": "know.knowledge.formal_new",
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    "transforms": "key,ExtractFieldObject",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldObject.field": "after"
  }
}
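To submit this configuration, save it as es-sink.json (the file name is just for illustration) and POST it with curl:

curl -X POST -H "Content-Type: application/json" --data @es-sink.json http://ip:8093/connectors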
Let's take a closer look at the key settings in this ES connector configuration:
key.ignore: if this is true, ES auto-generates the _id of each document, so every time a row in the table changes ES adds a new document instead of updating the existing one; it must therefore be set to false.
topics: the topic to subscribe to, i.e. the topic generated after configuring the pg connector in the previous article.
transforms: names the data transformations to apply.
transforms.key.type and transforms.key.field together mean: use the table's id column as the ES document id.
"transforms.ExtractFieldObject.field": "after" selects a single field: we only want the data under "after", because without transforms.ExtractFieldObject.type and transforms.ExtractFieldObject.field a pile of irrelevant metadata would also end up in ES. (Another gripe about the official docs: this part took a lot of trial and error to work out.) Without the transforms, a document in the index would look like this:
"payload":{"before":null,
"after":
{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1 ["1"],"origin":"4"},
"source":
{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema"
"op":"u","ts_ms":1591006642869,"transaction":null}}
Verification: list all connectors with GET ip:8093/connectors/
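You can also check the health of the new connector through the standard status endpoint:

curl -s http://ip:8093/connectors/es-sink1/status

A healthy connector reports "state": "RUNNING" for both the connector and each of its tasks.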
Verify the sync
If everything above went smoothly, modify some data in the table and you should see ES automatically create the index and sync the latest data over; the index name matches the topics value configured above:
know.knowledge.formal_new
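To confirm from the ES side, query the index directly (using the credentials configured in the connector):

curl -s -u elastic:elastic_xdeas "http://ip:9200/know.knowledge.formal_new/_search?pretty"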
To summarize: Kafka Connect is Kafka's built-in data transfer tool. The postgresql connector we created in the previous article (built on Debezium's PostgresConnector) is effectively equivalent to adding a connect-file-source.properties config file to Kafka's config directory (source meaning where data comes from); the es sink connector created here is equivalent to adding a connect-file-sink.properties config file (sink meaning where data goes). Managing Kafka connectors via docker and the REST API, as we did here, is a lot more convenient.
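As a rough sketch of that equivalence (the values mirror the REST config above; the file itself is illustrative, not something this docker-based setup requires), a standalone sink properties file would look like:

name=es-sink1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=know.knowledge.formal_new
connection.url=ip:9200
key.ignore=false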