
Spring Cloud Stream

Intro::

This is an example of sending and receiving Kafka messages with spring-cloud-stream.

Kafka cluster setup

    version: "3.8"
    services:
      # Three dedicated KRaft controllers (node IDs 1-3)
      controller-1:
        image: apache/kafka:latest
        container_name: controller-1
        environment:
          KAFKA_NODE_ID: 1
          KAFKA_PROCESS_ROLES: controller
          KAFKA_LISTENERS: CONTROLLER://:9093
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      controller-2:
        image: apache/kafka:latest
        container_name: controller-2
        environment:
          KAFKA_NODE_ID: 2
          KAFKA_PROCESS_ROLES: controller
          KAFKA_LISTENERS: CONTROLLER://:9093
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      controller-3:
        image: apache/kafka:latest
        container_name: controller-3
        environment:
          KAFKA_NODE_ID: 3
          KAFKA_PROCESS_ROLES: controller
          KAFKA_LISTENERS: CONTROLLER://:9093
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      # Three brokers (node IDs 4-6); each is reachable from the host on its own port
      broker-1:
        image: apache/kafka:latest
        container_name: broker-1
        ports:
          - "29092:9092"
        environment:
          KAFKA_NODE_ID: 4
          KAFKA_PROCESS_ROLES: broker
          KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
        depends_on:
          - controller-1
          - controller-2
          - controller-3
      broker-2:
        image: apache/kafka:latest
        container_name: broker-2
        ports:
          - "39092:9092"
        environment:
          KAFKA_NODE_ID: 5
          KAFKA_PROCESS_ROLES: broker
          KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
        depends_on:
          - controller-1
          - controller-2
          - controller-3
      broker-3:
        image: apache/kafka:latest
        container_name: broker-3
        ports:
          - "49092:9092"
        environment:
          KAFKA_NODE_ID: 6
          KAFKA_PROCESS_ROLES: broker
          KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
        depends_on:
          - controller-1
          - controller-2
          - controller-3
      # One-shot container that creates the topics once the brokers are up
      kafka-init:
        image: apache/kafka:latest
        container_name: kafka-init
        depends_on:
          - broker-1
          - broker-2
          - broker-3
        volumes:
          - ./scripts:/opt/kafka/scripts
        entrypoint: ["sh", "/opt/kafka/scripts/init-kafka.sh"]
    #!/bin/bash

    # Give the brokers time to start before creating topics
    sleep 10

    # Create the Kafka topics
    /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker-1:19092,broker-2:19092,broker-3:19092 \
      --create --topic test-topic --partitions 3 --replication-factor 1

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker-1:19092,broker-2:19092,broker-3:19092 \
      --create --topic test-topic-dlq --partitions 3 --replication-factor 1

    echo "init kafka"

controller

A node that manages the metadata of the Kafka brokers.

broker

The actual Kafka server: it stores topic data and serves producers and consumers.

build.gradle

    plugins {
        id 'java'
        id 'org.springframework.boot' version '3.3.6'
        id 'io.spring.dependency-management' version '1.1.6'
    }

    group = 'com.example'
    version = '0.0.1-SNAPSHOT'

    java {
        toolchain {
            languageVersion = JavaLanguageVersion.of(17)
        }
    }

    repositories {
        mavenCentral()
        maven { url 'https://repo.spring.io/milestone' }
    }

    ext {
        // Spring Cloud 2024.0.0-RC1 did not work here; the cause is still unknown.
        set('springCloudVersion', "2023.0.3")
    }

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.cloud:spring-cloud-stream'
        implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
        // Lombok is only needed at compile time, so it is not put on the runtime classpath
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
        testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    }

    dependencyManagement {
        imports {
            // Pins the Spring Cloud module versions via the release-train BOM
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
        }
    }

    tasks.named('test') {
        useJUnitPlatform()
    }
I have not worked out why, but with set('springCloudVersion', "2024.0.0-RC1") and the matching Spring Boot version, MessageBuilder threw an error. Dropping down one release resolved the problem.

application.yml

    spring:
      cloud:
        function:
          # Needed when more than one function is defined, so that
          # Spring Cloud Stream knows which functions to bind.
          definition: input;dlq
        stream:
          bindings:
            input-in-0:
              destination: test-topic
              content-type: application/json
              group: test-group
              consumer:
                maxAttempts: 3                   # at most 3 attempts, including the first
                back-off-initial-interval: 1000  # initial back-off of 1 second before a retry
                concurrency: 3                   # process in parallel with 3 consumers
            dlq-in-0:
              destination: test-topic-dlq
              content-type: application/json
              group: test-dlq-group
          kafka:
            bindings:
              input-in-0:
                consumer:
                  enable-dlq: true          # enable the DLQ
                  dlq-name: test-topic-dlq  # DLQ topic name
                  autoCommitOnError: false  # do not auto-commit the offset when an error occurs
            binder:
              brokers: localhost:29092,localhost:39092,localhost:49092
              auto-create-topics: false
• The binder is "the driver that connects the application to the message broker"; it is where the concrete implementation (here, Kafka) is configured.
• A binding is the configuration that "connects an application's logical channel (or function) to an external destination"; it defines which channel exchanges messages with which topic or queue.

Notes

• spring.cloud.function
  ◦ If the functions to bind are not listed in spring.cloud.function.definition, the bindings may not be set up correctly.
• spring.cloud.stream.bindings
  ◦ input-in-0 binds to a function such as Consumer<Message<T>> input().
  ◦ output-out-0 binds to a function such as Supplier<Message<T>> output().
• spring.cloud.stream.kafka
  ◦ Kafka-specific settings.
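The naming rule in the notes above can be sketched in plain Java. This is only an illustration of the `<functionName>-in-<index>` / `<functionName>-out-<index>` convention, not code from Spring Cloud Stream; the class `BindingNames`, its `Kind` enum, and both helper methods are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Cloud Stream: it only mirrors the
// <functionName>-in-<index> / <functionName>-out-<index> naming convention.
class BindingNames {

    enum Kind { FUNCTION, CONSUMER, SUPPLIER }

    // Splits a spring.cloud.function.definition value such as "input;dlq"
    // into the individual function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            if (!part.isBlank()) {
                names.add(part.trim());
            }
        }
        return names;
    }

    // Returns the binding names for one function with a single input/output.
    static List<String> bindingNamesFor(String functionName, Kind kind) {
        List<String> names = new ArrayList<>();
        if (kind != Kind.SUPPLIER) {
            names.add(functionName + "-in-0");   // Function and Consumer have an input
        }
        if (kind != Kind.CONSUMER) {
            names.add(functionName + "-out-0");  // Function and Supplier have an output
        }
        return names;
    }

    public static void main(String[] args) {
        // Both functions in this post are Consumers, so each gets one input binding.
        for (String name : functionNames("input;dlq")) {
            System.out.println(name + " -> " + bindingNamesFor(name, Kind.CONSUMER));
        }
    }
}
```

With the definition used in this post, the two names produced are input-in-0 and dlq-in-0, which are exactly the keys under spring.cloud.stream.bindings in application.yml.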

Consumer

    import com.example.springcloudstreamkafka.share.domain.Product;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;

    import java.util.function.Consumer;

    @Configuration // without this, the @Bean method below is never registered
    public class MessageConsumer {

        private final ProductMessageQueue productMessageQueue;

        public MessageConsumer(ProductMessageQueue productMessageQueue) {
            this.productMessageQueue = productMessageQueue;
        }

        // The bean name "input" maps this function to the input-in-0 binding.
        @Bean
        public Consumer<Message<Product>> input() {
            return message -> {
                boolean added = productMessageQueue.buffer.offer(message);
                if (!added) {
                    // Throwing marks the message as failed, so retry and DLQ handling apply
                    throw new RuntimeException("Queue is full, message discarded: " + message);
                }
                System.out.println("Message added to queue: " + message.getPayload());
            };
        }
    }
• Because the handler takes org.springframework.messaging.Message, it benefits from the abstraction: the same code works whether the incoming message is a Kafka message or a RabbitMQ message.
• With the bindings and binder settings in application.yml, messages that arrive on the configured topic are consumed asynchronously, and the lambda is what handles each message from that topic.
• When a DLQ (Dead Letter Queue) is configured, messages that still fail after the retries are exhausted are sent automatically to the configured topic (test-topic-dlq here).
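To make the interplay of maxAttempts and the DLQ concrete, here is a simplified model in plain Java. It is a sketch of the idea only: the real binder delegates retries to its own retry machinery and publishes the failed record to a Kafka topic, none of which is reproduced here. The class `RetryThenDlq` and its members are hypothetical names for this illustration, and the back-off delay is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of "try up to maxAttempts, then dead-letter".
// It does not use or imitate any Spring Cloud Stream class.
class RetryThenDlq {

    final List<String> deadLetters = new ArrayList<>(); // stands in for test-topic-dlq
    int attemptsMade = 0;

    void deliver(String payload, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade++;
            try {
                handler.accept(payload);
                return; // handled successfully, nothing goes to the DLQ
            } catch (RuntimeException e) {
                // A real consumer would wait for the configured back-off here.
            }
        }
        // Every attempt failed: route the message to the dead letter destination.
        deadLetters.add(payload);
    }

    public static void main(String[] args) {
        RetryThenDlq model = new RetryThenDlq();
        // A handler that always fails, like the consumer above when its queue is full.
        model.deliver("product-1", p -> { throw new RuntimeException("Queue is full"); }, 3);
        System.out.println("attempts=" + model.attemptsMade + ", dlq=" + model.deadLetters);
    }
}
```

Note that maxAttempts counts the first delivery as well, so a value of 3 means one initial attempt plus two retries before the message is dead-lettered.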

Produce

    import com.example.springcloudstreamkafka.share.infra.eventbus.ProductMessageQueue;
    import com.example.springcloudstreamkafka.share.domain.Product;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.stereotype.Service;

    @Service
    public class MessageService {

        private final StreamBridge streamBridge;
        private final ProductMessageQueue productMessageQueue;

        public MessageService(StreamBridge streamBridge, ProductMessageQueue productMessageQueue) {
            this.streamBridge = streamBridge;
            this.productMessageQueue = productMessageQueue;
        }

        public void sendMessage(Product product) {
            // "test-topic" is not a declared binding, so StreamBridge treats it as the destination name
            boolean sent = streamBridge.send("test-topic", product);
            if (sent) {
                System.out.println("Message sent successfully to test-topic" + " : " + product);
            } else {
                System.out.println("Message sending failed to test-topic" + " : " + product);
            }
        }
    }
• Messages are sent with StreamBridge.
  ◦ Be careful: the message is sent even if the topic does not exist, and depending on the configuration the topic may be created automatically.

Differences between Function, Consumer, and Supplier

| Property             | Function                         | Consumer                                           | Supplier                               |
|----------------------|----------------------------------|----------------------------------------------------|----------------------------------------|
| Input                | Yes                              | Yes                                                | No                                     |
| Output               | Yes                              | No                                                 | Yes                                    |
| Typical use          | Transforming and processing data | Consuming messages (logging, saving to a DB, etc.) | Producing data (emitting events, etc.) |
| Binding requirements | Can define both input and output | Input binding only                                 | Output binding only                    |
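The three shapes compared above are the standard java.util.function interfaces, so the difference can be shown without any Spring code. The sketch below is only an illustration: the class `FunctionalShapes`, its method names, and the "order-created" payload are hypothetical and do not come from the project in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of the three functional shapes using plain JDK types.
class FunctionalShapes {

    // Stands in for a log or a database in this example.
    static final List<String> sink = new ArrayList<>();

    // Supplier: no input, one output (would need only an output binding).
    static Supplier<String> source() {
        return () -> "order-created";
    }

    // Function: one input, one output (would need an input and an output binding).
    static Function<String, String> transform() {
        return String::toUpperCase;
    }

    // Consumer: one input, no output (would need only an input binding).
    static Consumer<String> store() {
        return sink::add;
    }

    public static void main(String[] args) {
        String event = source().get();                 // produce
        String transformed = transform().apply(event); // transform
        store().accept(transformed);                   // consume
        System.out.println(sink);                      // prints [ORDER-CREATED]
    }
}
```

In a Spring Cloud Stream application each of these would be exposed as a bean, and the framework would connect the inputs and outputs to destinations instead of the direct method calls used in main.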

References::