Intro
An example configuration of the ElasticsearchSinkConnector Kafka sink connector.
Configuration
Kafka Connect
```yaml
version: '2'
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "localhost:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://192.168.2.134:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      # If you want to use the Confluent Hub installer to d/l components, but make them available
      # when running this stack offline, spin it up once and then run:
      # docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - /home/danawa/confluentinc-kafka-connect-jdbc-10.7.6:/data/connect-jars
    network_mode: host
```
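The Confluent Kafka Connect images translate `CONNECT_*` environment variables into worker configuration properties at startup: the `CONNECT_` prefix is stripped, the name is lowercased, and underscores become dots. A minimal sketch of that mapping (`env_to_prop` is a hypothetical helper, not part of the image):

```shell
# Sketch of how cp-kafka-connect turns CONNECT_* env vars into worker
# config keys: strip the CONNECT_ prefix, replace "_" with ".", lowercase.
env_to_prop() {
  echo "$1" | sed -e 's/^CONNECT_//' -e 's/_/./g' | tr '[:upper:]' '[:lower:]'
}

env_to_prop CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
# value.converter.schema.registry.url
```

So, for example, `CONNECT_GROUP_ID` above ends up as the worker's `group.id` property.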
How to use the sink connector
1. Download the connector library inside the Kafka Connect Docker container

```bash
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
```
2. Restart the Kafka Connect container
a. Register the sink connector through the Kafka Connect REST API
sinkConfig.json:

```json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "es_tcmpny_link-new",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "batch.size": "4000",
    "max.retries": "5",
    "retry.backoff.ms": "1000",
    "transforms": "changeIndexName",
    "transforms.changeIndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeIndexName.regex": "es_tcmpny_link-new",
    "transforms.changeIndexName.replacement": "sink-test",
    "flush.synchronously": "true"
  }
}
```
The transforms were configured to index data from the es_tcmpny_link-new topic into the sink-test index. Specifying topic.index.map did not take effect, so this setting was used instead.
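RegexRouter rewrites a record's topic name when the whole name matches the regex, and the Elasticsearch sink connector then uses the rewritten name as the index name. A minimal sketch of the same renaming with sed (illustration only, not how the connector runs internally):

```shell
# RegexRouter only applies when the regex matches the entire topic name,
# hence the ^...$ anchors in this sed equivalent.
topic="es_tcmpny_link-new"
index=$(echo "$topic" | sed -E 's/^es_tcmpny_link-new$/sink-test/')
echo "$index"
# sink-test
```

A topic that does not match the regex would pass through with its name unchanged.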
```bash
# Register the connector
curl -X POST -H "Content-Type: application/json" --data @sinkConfig.json http://localhost:8083/connectors
# Check connector status
curl -X GET http://localhost:8083/connectors/elasticsearch-sink-connector/status
# Delete the connector
curl -X DELETE http://localhost:8083/connectors/elasticsearch-sink-connector
```
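For reference, the status endpoint returns JSON in roughly the shape below (an illustrative sample, not output captured from this cluster; exact fields can vary by version). The connector and task states can be pulled out with python3:

```shell
# Illustrative /connectors/<name>/status response, saved to a file.
cat > status.json <<'EOF'
{"name":"elasticsearch-sink-connector","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
EOF

# Extract the connector state and the first task's state.
python3 -c 'import json; s = json.load(open("status.json")); print(s["connector"]["state"], s["tasks"][0]["state"])'
# RUNNING RUNNING
```

If a task shows FAILED instead of RUNNING, the same status response includes a trace field with the task's stack trace.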