Intro::
spring-cloud-stream์ ์ฌ์ฉํด์ kafka ๋ฉ์์ง๋ฅผ ์ก์์ ํ๋ ์์ ์
๋๋ค.
kafka cluster ๊ตฌํ
version: "3.8"
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
controller-2:
image: apache/kafka:latest
container_name: controller-2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
controller-3:
image: apache/kafka:latest
container_name: controller-3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- "29092:9092"
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
depends_on:
- controller-1
- controller-2
- controller-3
broker-2:
image: apache/kafka:latest
container_name: broker-2
ports:
- "39092:9092"
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
depends_on:
- controller-1
- controller-2
- controller-3
broker-3:
image: apache/kafka:latest
container_name: broker-3
ports:
- "49092:9092"
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
depends_on:
- controller-1
- controller-2
- controller-3
kafka-init:
image: apache/kafka:latest
container_name: kafka-init
depends_on:
- broker-1
- broker-2
- broker-3
volumes:
- ./scripts:/opt/kafka/scripts
entrypoint: ["sh", "/opt/kafka/scripts/init-kafka.sh"]
YAML
๋ณต์ฌ
#!/bin/bash
sleep 10
# Kafka ํ ํฝ ์์ฑ
/opt/kafka/bin/kafka-topics.sh --bootstrap-server broker-1:19092,broker-2:19092,broker-3:19092 \
--create --topic test-topic --partitions 3 --replication-factor 1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server broker-1:19092,broker-2:19092,broker-3:19092 \
--create --topic test-topic-dlq --partitions 3 --replication-factor 1
echo "init kafka"
Bash
๋ณต์ฌ
controller
kafka broker์ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๊ด๋ฆฌํ๋ ๋
ธ๋์ด๋ค.
broker
์ค์ง์ ์ธ ์นดํ์นด ์๋ฒ
build.gradle
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.6'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/milestone' }
}
ext {
set('springCloudVersion', "2023.0.3") // Spring Cloud 2024.0.0-RC1 ๋ ์๋๋๋ฐ ์๊ทธ๋ฐ๊ฑด์ง๋ ๋ชจ๋ฅด๊ฒ ๋ค....
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
Plain Text
๋ณต์ฌ
์ ์ธ์ง๋ ๋ชจ๋ฅด๊ฒ ์ง๋ง set('springCloudVersion', "2024.0.0-RC1")๊ณผ ๊ทธ์ ๋ง๋ ์คํ๋ง ๋ถํธ ๋ฒ์ ์ ์ฌ์ฉํ๊ฒ ๋๋ฉด MessageBuilder์์ ์ค๋ฅ๊ฐ ๋์ ๋ฒ์ ์ ํ ๋จ๊ณ ๋ด๋ ธ๋๋ ๋ฌธ์ ๊ฐ ํด๊ฒฐ๋์๋ค.
application.yml
spring:
cloud:
function:
definition: input;dlq # ํ๋ ์ด์์ ํจ์๊ฐ ์ ์๋ ๊ฒฝ์ฐ, Spring Cloud Stream์ด ์คํํ ํจ์๋ฅผ ์ ํํ๊ธฐ ์ํด ํ์ํฉ๋๋ค.
stream:
bindings:
input-in-0:
destination: test-topic
content-type: application/json
group: test-group
consumer:
maxAttempts: 3 # ์ต๋ 3๋ฒ ์ฌ์๋
back-off-initial-interval: 1000 # 1์ด ๊ฐ๊ฒฉ์ผ๋ก ์ฌ์๋
concurrency: 3 # 3๊ฐ์ ์ปจ์๋จธ๋ก ๋ณ๋ ฌ ์ฒ๋ฆฌ
dlq-in-0:
destination: test-topic-dlq
content-type: application/json
group: test-dlq-group
kafka:
bindings:
input-in-0:
consumer:
enable-dlq: true # DLT ํ์ฑํ
dlq-name: test-topic-dlq # DLT ํ ํฝ ์ด๋ฆ
autoCommitOnError: false # ์ค๋ฅ ๋ฐ์ ์ ์๋ ์ปค๋ฐ ๋ฐฉ์ง
binder:
brokers: localhost:29092,localhost:39092,localhost:49092
auto-create-topics: false
YAML
๋ณต์ฌ
โข
๋ฐ์ธ๋๋ โ์ ํ๋ฆฌ์ผ์ด์
๊ณผ ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ์๋ ๋๋ผ์ด๋ฒโ์ด๋ฉฐ, ์ค์ ๊ตฌํ์ฒด ์ค์ ์ ๋ด๋นํฉ๋๋ค.
โข
๋ฐ์ธ๋ฉ์ โ์ ํ๋ฆฌ์ผ์ด์
์ ๋
ผ๋ฆฌ ์ฑ๋(๋๋ ํจ์)๊ณผ ์ธ๋ถ ๋ชฉ์ ์ง๋ฅผ ์ฐ๊ฒฐโํ๋ ์ค์ ์ผ๋ก, ์ด๋ ์ฑ๋์ด ์ด๋ค ํ ํฝยทํ๋ก ๋ฉ์์ง๋ฅผ ์ฃผ๊ณ ๋ฐ์์ง ์ ์ํฉ๋๋ค.
ํน์ด์ฌํญ
โข
spring.cloud.function
โฆ
๋ฐ์ธ๋ฉ ํจ์๋ฅผ ์ ์ํด๋์ง ์์ผ๋ฉด ๋ฐ์ธ๋ฉ์ด ์ ๋๋ก ์๋ ์ ์์ต๋๋ค.
โข
spring.cloud.stream.bindings
โฆ
input-in-0 ์ด๋ฉด Consumer<Message<T>> input() ๊ณผ ๊ฐ์ ํจ์๊ฐ ๋ฐ์ธ๋ฉ ๋๋ค.
โฆ
output-out-0์ด๋ฉด Supplier<Message<T>> output()๊ณผ ๊ฐ์ ํจ์๊ฐ ๋ฐ์ธ๋ฉ๋๋ค.
โข
spring.cloud.stream.kafka
โฆ
์นดํ์นด ์ธ๋ถ ์ค์ ๋ถ๋ถ์
๋๋ค.
Consumer
import com.example.springcloudstreamkafka.share.domain.Product;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
public class MessageConsumer {
private final ProductMessageQueue productMessageQueue;
public MessageConsumer(ProductMessageQueue productMessageQueue) {
this.productMessageQueue = productMessageQueue;
}
@Bean
public Consumer<Message<Product>> input() throws RuntimeException {
return message -> {
boolean added = productMessageQueue.buffer.offer(message);
if (!added) {
throw new RuntimeException("Queue is full, message discarded: " + message);
}
System.out.println("Message added to queue: " + message.getPayload());
};
}
}
Java
๋ณต์ฌ
โข
org.springframework.messaging.Message๋ฅผ ์ฌ์ฉํ์ฌ ์ถ์ํ์ ์ฅ์ ์ ์ด๋ฆฌ๊ธฐ ๋๋ฌธ์ ์ค์ ๋ก ์นดํ์นด ๋ฉ์์ง๊ฐ ๋ค์ด์ค๋ RabbitMQ ๋ฉ์์ง๊ฐ ๋ค์ด์ค๋ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.
โข
applicatoin.yml์ ์ค์ ํด ๋์ bindings์ binder์ ์ค์ ์ ํตํด ์ง์ ํ ํ ํฝ์ ๋ฉ์์ง๊ฐ ๋ค์ด์ค๋ฉด ๋น๋๊ธฐ์ ์ผ๋ก ์๋นํ๊ฒ ๋ฉ๋๋ค. ์ด๋ ๋๋คํจ์๊ฐ ํด๋น ํ ํฝ์ ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด ๋ฉ๋๋ค.
โข
dlq(Dead Letter Queue) ๋ฅผ ์ค์ ํด๋๋ฉด ์๋์ผ๋ก ์คํจ ๋ฉ์์ง๊ฐ ์ง์ ๋ ํ ํฝ์ผ๋ก ์ก์ ๋ฉ๋๋ค.
Produce
import com.example.springcloudstreamkafka.share.infra.eventbus.ProductMessageQueue;
import com.example.springcloudstreamkafka.share.domain.Product;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;
@Service
public class MessageService {
private final StreamBridge streamBridge;
private final ProductMessageQueue productMessageQueue;
public MessageService(StreamBridge streamBridge, ProductMessageQueue productMessageQueue) {
this.streamBridge = streamBridge;
this.productMessageQueue = productMessageQueue;
}
public void sendMessage(Product product) {
boolean sent = streamBridge.send("test-topic", product);
if (sent) {
System.out.println("Message sent successfully to test-topic" + " : " + product);
} else {
System.out.println("Message sending failed to test-topic" + " : " + product);
}
}
}
Java
๋ณต์ฌ
โข
StreamBridge ๋ฅผ ์ฌ์ฉํด์ ๋ฉ์์ง๋ฅผ ์ ์กํฉ๋๋ค.
โฆ
์ด๋ ํ ํฝ์ด ์กด์ฌํ์ง ์์๋ ๋ฉ์์ง๊ฐ ์ ์ก๋์ด์ง๊ณ , ์ค์ ์ ๋ฐ๋ผ ํ ํฝ์ด ์์ฑ๋ ์๋ ์์ผ๋ ์ฃผ์ํด์ผํฉ๋๋ค.
Function๊ณผ Consumer, Supplier์ ์ฐจ์ด
ํน์ฑ | Function | Consumer | Supplier |
์
๋ ฅ | ์์ | ์์ | ์์ |
์ถ๋ ฅ | ์์ | ์์ | ์์ |
์ฌ์ฉ ์์ | ๋ฐ์ดํฐ ๋ณํ ๋ฐ ๊ฐ๊ณต | ๋ฉ์์ง ์๋น (๋ก๊ทธ, DB ์ ์ฅ ๋ฑ) | ๋ฐ์ดํฐ ์์ฑ (์ด๋ฒคํธ ๋ฐ์ ๋ฑ) |
๋ฐ์ธ๋ฉ ์๊ตฌ ์ฌํญ | ์
๋ ฅ๊ณผ ์ถ๋ ฅ์ ๋ชจ๋ ์ ์ ๊ฐ๋ฅ | ์
๋ ฅ ๋ฐ์ธ๋ฉ๋ง ํ์ | ์ถ๋ ฅ ๋ฐ์ธ๋ฉ๋ง ํ์ |