Search
🙊

kafka-connect-elasticsearch-source 카프카 소스 커넥터 설정

Intro::

kafka-connect-elasticsearch-source 카프카 소스 커넥터 설정 예시 입니다.

설정

카프카 커넥트

version: '2' services: kafka-connect: image: confluentinc/cp-kafka-connect-base:6.2.0 container_name: kafka-connect environment: CONNECT_BOOTSTRAP_SERVERS: "localhost:19092" CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: kafka-connect CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets CONNECT_STATUS_STORAGE_TOPIC: _connect-status CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://192.168.2.134:8081' CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n" CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" # --------------- CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars # If you want to use the Confluent Hub installer to d/l component, but make them available # when running this offline, spin up the stack once and then run : # docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars volumes: - /home/danawa/confluentinc-kafka-connect-jdbc-10.7.6:/data/connect-jars network_mode: host
Docker
복사

소스 커넥터 사용 방법

1.
카프카 커넥트 도커 내부에서 라이브러리 다운
confluent-hub install dariobalinzo/kafka-connect-elasticsearch-source:latest
Bash
복사
2.
카프카 커넥트 컨테이너 재기동
3.
카프카 커넥트 api를 사용해서 소스 커넥터 사용
// sourceConfig.json { "name": "elastic-source", "config": { "connector.class":"com.github.dariobalinzo.ElasticSourceConnector", "tasks.max": "1", "es.host" : "localhost", "es.port" : "9200", "index.prefix" : "tcmpny_link", "topic.prefix" : "es_", "incrementing.field.name" : "registerDate", "poll.interval.ms": "1000", "query": "{\"match_all\": {}}" } }
JSON
복사
# 작업 등록 curl -X POST -H "Content-Type: application/json" --data @sourceConfig.json http://localhost:8083/connectors # 작업 확인 curl -X GET http://localhost:8083/connectors/elastic-source/status # 작업 취소 curl -X DELETE http://localhost:8083/connectors/elastic-source
Bash
복사
4.
스키마 레지스트리의 경우 api 혹은 카프카 ui에서 등록해두면 됩니다. 미리 등록을 하지 않은 경우 소스 커넥터에서 자동으로 생성하지만, 만약 데이터 스키마가 동일하지 않다면 에러가 발생할 수 있습니다.

References::

kafka-connect-elasticsearch-source
DarioBalinzo