Intro::
kafka-connect-elasticsearch-source 카프카 소스 커넥터 설정 예시 입니다.
설정
카프카 커넥트
version: '2'
services:
kafka-connect:
image: confluentinc/cp-kafka-connect-base:6.2.0
container_name: kafka-connect
environment:
CONNECT_BOOTSTRAP_SERVERS: "localhost:19092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://192.168.2.134:8081'
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
# ---------------
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
# If you want to use the Confluent Hub installer to d/l component, but make them available
# when running this offline, spin up the stack once and then run :
# docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
volumes:
- /home/danawa/confluentinc-kafka-connect-jdbc-10.7.6:/data/connect-jars
network_mode: host
Docker
복사
소스 커넥터 사용 방법
1.
카프카 커넥트 도커 내부에서 라이브러리 다운
confluent-hub install dariobalinzo/kafka-connect-elasticsearch-source:latest
Bash
복사
2.
카프카 커넥트 컨테이너 재기동
3.
카프카 커넥트 api를 사용해서 소스 커넥터 사용
// sourceConfig.json
{
"name": "elastic-source",
"config": {
"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host" : "localhost",
"es.port" : "9200",
"index.prefix" : "tcmpny_link",
"topic.prefix" : "es_",
"incrementing.field.name" : "registerDate",
"poll.interval.ms": "1000",
"query": "{\"match_all\": {}}"
}
}
JSON
복사
# 작업 등록
curl -X POST -H "Content-Type: application/json" --data @sourceConfig.json http://localhost:8083/connectors
# 작업 확인
curl -X GET http://localhost:8083/connectors/elastic-source/status
# 작업 취소
curl -X DELETE http://localhost:8083/connectors/elastic-source
Bash
복사
4.
스키마 레지스트리의 경우 api 혹은 카프카 ui에서 등록해두면 됩니다. 미리 등록을 하지 않은 경우 소스 커넥터에서 자동으로 생성하지만, 만약 데이터 스키마가 동일하지 않다면 에러가 발생할 수 있습니다.