
ElasticsearchSinkConnector Kafka Sink Connector Configuration

Intro::

An example configuration for the ElasticsearchSinkConnector Kafka sink connector.

Configuration

Kafka Connect

version: '2'
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "localhost:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://192.168.2.134:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run:
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - /home/danawa/confluentinc-kafka-connect-jdbc-10.7.6:/data/connect-jars
    network_mode: host

How to Use the Sink Connector

1.
Download the connector plugin inside the Kafka Connect Docker container:
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
2.
Restart the Kafka Connect container.
a.
Register the sink connector through the Kafka Connect REST API:
// sinkConfig.json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "es_tcmpny_link-new",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "batch.size": "4000",
    "max.retries": "5",
    "retry.backoff.ms": "1000",
    "transforms": "changeIndexName",
    "transforms.changeIndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeIndexName.regex": "es_tcmpny_link-new",
    "transforms.changeIndexName.replacement": "sink-test",
    "flush.synchronously": "true"
  }
}
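The batch.size, max.retries, and retry.backoff.ms settings control how the connector buffers records and retries failed bulk requests. The retry idea can be sketched as follows; this is an illustrative simplification, not the connector's actual code (Confluent documents the real behavior as exponential backoff with jitter), and the function names here are made up for the example:

```python
import time

def send_with_retries(send_batch, max_retries=5, retry_backoff_ms=1000):
    """Illustrative sketch of max.retries / retry.backoff.ms semantics.

    The real connector backs off exponentially with jitter; this linear
    version only shows the retry-until-exhausted idea.
    """
    for attempt in range(max_retries + 1):
        try:
            return send_batch()
        except IOError:
            if attempt == max_retries:
                raise  # give up after max.retries retries
            time.sleep(retry_backoff_ms / 1000.0)

# Example: a bulk request that fails twice, then succeeds.
calls = {"n": 0}
def flaky_batch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("bulk request failed")
    return "indexed"

print(send_with_retries(flaky_batch, retry_backoff_ms=10))  # indexed
```

With max.retries exhausted, the connector task fails, which is why the status check below matters.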
The transforms settings route records from the es_tcmpny_link-new topic into the sink-test index.
Specifying topic.index.map (deprecated in newer versions of the connector) did not take effect, so the RegexRouter transform is used instead.
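RegexRouter rewrites each record's topic name before the connector derives the index name from it, and it only applies when the regex matches the entire topic name. The effect of the transforms settings above can be reproduced with a plain regex substitution (a sketch of the transform's behavior, not the connector code itself):

```python
import re

# Values from the transforms.changeIndexName.* settings above.
regex = "es_tcmpny_link-new"
replacement = "sink-test"

def route_topic(topic: str) -> str:
    # RegexRouter renames the topic only when the regex matches the whole name.
    if re.fullmatch(regex, topic):
        return re.sub(regex, replacement, topic)
    return topic

print(route_topic("es_tcmpny_link-new"))  # sink-test
print(route_topic("other-topic"))         # other-topic (unchanged)
```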
# Register the connector
curl -X POST -H "Content-Type: application/json" --data @sinkConfig.json http://localhost:8083/connectors

# Check the connector status
curl -X GET http://localhost:8083/connectors/elasticsearch-sink-connector/status

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/elasticsearch-sink-connector
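The status endpoint returns JSON describing the connector and each of its tasks. A small sketch of checking that everything is RUNNING; the payload below is an illustrative example of the response shape, not captured from a live cluster:

```python
import json

# Illustrative response shape from GET /connectors/<name>/status.
status_json = """
{
  "name": "elasticsearch-sink-connector",
  "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
  "type": "sink"
}
"""

def is_healthy(status: dict) -> bool:
    # Healthy when the connector and every task report RUNNING.
    states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
    return all(s == "RUNNING" for s in states)

status = json.loads(status_json)
print(is_healthy(status))  # True
```

A FAILED task (for example after max.retries is exhausted) would make this check return False.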

References::

kafka-connect-elasticsearch
confluentinc