Skip to content

Commit 386e7b6

Browse files
committed
Update Kafka Post " [카프카 핵심 가이드] CHAPTER 3. 카프카 프로듀서: 카프카에 메시지 쓰기 "
1 parent 1a1cfa0 commit 386e7b6

File tree

1 file changed

+143
-26
lines changed

1 file changed

+143
-26
lines changed

_posts/2025-07-24-Kafka-Producer.md

Lines changed: 143 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ author: devFancy
9292

9393
이러한 설정을 아래와 같이 쿠폰 발급 기능을 비동기적으로 처리하기 위해 Spring Boot 환경에서 카프카 프로듀서를 설정한 실제 코드 예시를 가져왔다.
9494

95-
> KafkaProducerConfig.java
95+
> KafkaProducerConfig.java (필수 옵션)
9696
9797
```java
9898
@Configuration
@@ -147,11 +147,35 @@ public class KafkaProducerConfig {
147147

148148
* `send()` 메서드에 **콜백(Callback) 함수**를 함께 전달하는 방식이다. 브로커로부터 응답이 오면 미리 지정한 콜백 함수가 실행된다. **애플리케이션을 차단하지 않으면서도 전송 결과를 확실하게 처리**할 수 있어 가장 널리 사용된다.
149149

150-
쿠폰 시스템의 경우, 사용자가 쿠폰 발급 버튼을 눌렀을 때 즉시 "요청이 처리 중입니다"라고 응답하고 실제 발급 처리는 백그라운드에서 수행하는 것이 좋다. 이는 비동기 전송 방식에 가장 적합한 시나리오다.
150+
151+
```java
152+
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
153+
public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
154+
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
155+
if (!producerRecord.headers().iterator().hasNext()) {
156+
byte[] correlationId = (byte[]) message.getHeaders().get("kafka_correlationId", byte[].class);
157+
if (correlationId != null) {
158+
producerRecord.headers().add("kafka_correlationId", correlationId);
159+
}
160+
}
161+
162+
return this.observeSend(producerRecord);
163+
}
164+
}
165+
```
166+
167+
* 참고: Spring Kafka의 `KafkaTemplate.send()` 메서드는 기본적으로 `CompletableFuture` (또는 이전 버전의 `ListenableFuture`)를 반환하여 비동기적으로 동작한다.
168+
169+
* 따라서 `send()`만 호출하고 반환된 `Future` 객체를 처리하지 않으면 겉보기에는 "파이어 앤 포겟"처럼 보일 수 있다.
170+
171+
* 하지만 실제로는 백그라운드에서 비동기 전송이 이루어지고 있으며, 명시적으로 `whenComplete`와 같은 콜백을 등록하여 성공 또는 실패를 처리하면 완전한 "비동기 전송" 방식으로 활용할 수 있다.
172+
173+
174+
쿠폰 시스템의 경우, 사용자가 쿠폰 발급 버튼을 눌렀을 때 즉시 "`쿠폰 발급 요청이 처리 중입니다`" 라고 응답하고 실제 발급 처리는 백그라운드에서 수행하는 것이 좋다. 이는 비동기 전송 방식에 가장 적합한 시나리오다.
151175

152176
### KafkaTemplate을 이용한 메시지 전송 예시
153177

154-
Spring Boot 환경에서는 앞서 설정한 `KafkaTemplate`을 이용하여 매우 편리하게 비동기 메시지를 전송할 수 있다.
178+
Spring Boot 환경에서는 앞서 설정한 `KafkaTemplate`을 이용하여 편리하게 비동기 메시지를 전송할 수 있다.
155179

156180
실제 애플리케이션에서는 특정 도메인의 메시지를 전송하는 역할을 담당하는 전용 프로듀서 클래스를 만들어 사용하는 것이 일반적이다.
157181

@@ -160,29 +184,54 @@ Spring Boot 환경에서는 앞서 설정한 `KafkaTemplate`을 이용하여 매
160184
> CouponIssueProducer.java
161185
162186
```java
187+
/**
188+
* CouponIssueProducer
189+
* <p>
190+
* 쿠폰 발급 요청을 Kafka 토픽("coupon_issue")으로 비동기 전송합니다.
191+
* 이 메시지는 coupon-consumer 모듈의 CouponIssueConsumer 가 수신하여 DB에 저장합니다.
192+
*/
163193
@Component
164194
public class CouponIssueProducer {
165195

166-
private final KafkaTemplate<String, Object> kafkaTemplate;
167-
168-
public CouponIssueProducer(final KafkaTemplate<String, Object> kafkaTemplate) {
169-
this.kafkaTemplate = kafkaTemplate;
170-
}
171-
172-
public void issue(final UUID userId, final UUID couponId) {
173-
// 1. 카프카로 보낼 데이터를 담은 객체(Payload) 생성
174-
CouponIssueMessage payload = new CouponIssueMessage(userId, couponId);
175-
176-
// 2. 토픽, 헤더 정보 등을 포함한 Message 객체 생성
177-
Message<CouponIssueMessage> message = MessageBuilder
178-
.withPayload(payload)
179-
.setHeader(KafkaHeaders.TOPIC, "coupon_issue")
180-
.setHeader("globalTraceId", MDC.get("globalTraceId")) // 추적 ID
181-
.build();
182-
183-
// 3. KafkaTemplate을 이용해 메시지 비동기 전송
184-
kafkaTemplate.send(message);
185-
}
196+
private final KafkaTemplate<String, Object> kafkaTemplate;
197+
private static final String GLOBAL_TRACE_ID_HEADER = "globalTraceId";
198+
private static final Logger log = LoggerFactory.getLogger(CouponIssueProducer.class);
199+
200+
public CouponIssueProducer(final KafkaTemplate<String, Object> kafkaTemplate) {
201+
this.kafkaTemplate = kafkaTemplate;
202+
}
203+
204+
public void issue(final UUID userId, final UUID couponId) {
205+
// 1. 카프카로 보낼 데이터를 담은 객체(Payload) 생성
206+
CouponIssueMessage payload = new CouponIssueMessage(userId, couponId);
207+
String globalTraceId = MDC.get(GLOBAL_TRACE_ID_HEADER); // MDC에서 추적 ID 가져오기
208+
209+
// 2. 토픽, 헤더 정보 등을 포함한 Message 객체 생성
210+
Message<CouponIssueMessage> message = MessageBuilder
211+
.withPayload(payload)
212+
.setHeader(KafkaHeaders.TOPIC, "coupon_issue")
213+
.setHeader(GLOBAL_TRACE_ID_HEADER, globalTraceId) // 추적 ID
214+
.build();
215+
216+
// 3. KafkaTemplate을 이용해 메시지 비동기 전송 및 결과 처리
217+
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
218+
future.whenComplete((result, exception) -> {
219+
if (exception == null) {
220+
log.info("메시지 전송 성공. GlobalTraceId: {}, Topic: {}, Partition: {}, Offset: {}, Payload: {}",
221+
globalTraceId,
222+
result.getRecordMetadata().topic(),
223+
result.getRecordMetadata().partition(),
224+
result.getRecordMetadata().offset(),
225+
payload);
226+
} else {
227+
log.error("메시지 전송 실패 GlobalTraceId: {}, Payload: {}",
228+
globalTraceId,
229+
payload,
230+
exception);
231+
// 여기에 실패 시 필요한 추가 로직을 구현 (e.g., 재시도, 알림, DB에 실패 기록 등)
232+
}
233+
});
234+
}
186235
}
187236
```
188237

@@ -194,7 +243,9 @@ public class CouponIssueProducer {
194243

195244
2. `Message` 객체 빌드: `MessageBuilder`를 이용해 페이로드와 함께 전송할 토픽, 추적 ID 등 메타데이터를 담은 `Message` 객체를 생성한다.
196245

197-
3. 비동기 전송: `kafkaTemplate.send()`를 호출하여 카프카로 메시지를 최종 발행한다.
246+
3. 비동기 전송: `kafkaTemplate.send()`를 호출하여 카프카로 메시지를 최종 발행한다.
247+
이때 반환되는 `CompletableFuture`를 사용하여 메시지 전송의 성공/실패 여부를 `whenComplete` 콜백을 통해 비동기적으로 처리한다.
248+
이를 통해 애플리케이션의 블로킹 없이 전송 결과를 로깅하거나 추가적인 실패 처리 로직을 구현할 수 있다.
198249

199250

200251
---
@@ -311,7 +362,7 @@ public class CouponIssueScheduler {
311362

312363
위 그림은 `send()` 호출부터 최종 응답까지의 과정을 보여준다. 이 과정에서 중요한 타임아웃 설정은 다음과 같다.
313364

314-
* [1] `send()` 호출과 블로킹 (`max.block.ms`): 메시지 전송은 `send()` 호출로 시작된다. 만약 프로듀서 내부 버퍼가 가득 차 있다면, 버퍼에 공간이 생길 때까지 `send()` 호출이 **블록(block)**되는데, 이때의 최대 대기 시간이 `max.block.ms`다.
365+
* [1] `send()` 호출과 블로킹 (`max.block.ms`): 메시지 전송은 `send()` 호출로 시작된다. 만약 프로듀서 내부 버퍼가 가득 차 있다면, 버퍼에 공간이 생길 때까지 `send()` 호출이 **블록(block)** 되는데, 이때의 최대 대기 시간이 `max.block.ms`다.
315366

316367
* [2] 배치 대기 (`linger.ms`): 버퍼에 들어간 메시지는 전송될 배치를 이루기 위해 잠시 대기한다. 이 대기 시간이 바로 `linger.ms`다.
317368

@@ -321,7 +372,8 @@ public class CouponIssueScheduler {
321372

322373
* [5] 최종 마감 (`delivery.timeout.ms`): 가장 중요한 마스터 타임아웃이다. 위 모든 과정(배치 대기, 전송, 재시도)은 `delivery.timeout.ms`라는 최종 마감 시간 내에 완료되어야 한다. 이 시간을 초과하면 프로듀서는 최종적으로 메시지 전송 실패를 반환한다. 그림에 나온 공식(`delivery.timeout.ms >= linger.ms + request.timeout.ms`)이 이 관계를 잘 보여준다.
323374

324-
(책에서는 `retries``retry.backoff.ms`를 직접 세세하게 조정하기보다, **클러스터가 장애로부터 복구되는 데 걸리는 시간을 고려하여 `delivery.timeout.ms`를 충분히 길게 설정하는 것을 권장한다.** `delivery.timeout.ms`만 잘 설정해두면, 그 시간 안에서 프로듀서가 알아서 최적의 재시도를 수행하기 때문이다.)
375+
(책에서는 `retries``retry.backoff.ms`를 직접 세세하게 조정하기보다, **클러스터가 장애로부터 복구되는 데 걸리는 시간을 고려하여 `delivery.timeout.ms`를 충분히 길게 설정하는 것을 권장한다.**
376+
`delivery.timeout.ms`만 잘 설정해두면, 그 시간 안에서 프로듀서가 알아서 최적의 재시도를 수행하기 때문이다.)
325377

326378
### 4. 데이터 포맷 설정 (Serializer)
327379

@@ -346,3 +398,68 @@ public class CouponIssueScheduler {
346398
* 스키마 레지스트리: 보통 스키마를 중앙에서 관리하는 **스키마 레지스트리(Schema Registry)** 와 함께 사용하여, 모든 프로듀서와 컨슈머가 스키마 정보를 공유하고 데이터 호환성을 보장받는다.
347399

348400
결론적으로, 안정적인 데이터 파이프라인을 구축하기 위해서는 직접 시리얼라이저를 구현하기보다 Avro와 스키마 레지스트리를 도입하는 것이 훨씬 좋은 선택이다
401+
402+
403+
## 프로듀서 설정 예시 (세부 옵션 추가)
404+
405+
프로듀서의 신뢰성, 성능, 그리고 재시도 메커니즘을 상세하게 제어하기 위해 필수 옵션 외에 다음과 같은 설정을 추가할 수 있다.
406+
407+
이러한 설정들은 프로듀서의 전반적인 동작에 큰 영향을 미치므로, 애플리케이션의 요구사항과 카프카 클러스터의 환경에 맞춰 신중하게 조정해야 한다.
408+
409+
> KafkaProducerConfig.java (세부 옵션 포함)
410+
411+
```java
412+
@Configuration
413+
public class KafkaProducerConfig {
414+
415+
@Bean
416+
public ProducerFactory<String, Object> producerFactory() {
417+
Map<String, Object> config = new HashMap<>();
418+
419+
// 필수 옵션
420+
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
421+
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
422+
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
423+
424+
// 데이터 전송 신뢰도 설정: 모든 ISR에 복제될 때까지 대기
425+
// acks=all: 가장 높은 신뢰성을 보장하며, 메시지 유실을 방지한다.
426+
config.put(ProducerConfig.ACKS_CONFIG, "all");
427+
428+
// 재시도 횟수: 일시적인 네트워크 문제나 브로커 장애 시 메시지를 재전송할 최대 횟수이다.
429+
config.put(ProducerConfig.RETRIES_CONFIG, 3);
430+
431+
// 재시도 사이의 대기 시간
432+
// 재시도 간 지연 시간을 주어 브로커가 복구되거나 부하가 줄어들 시간을 벌어준다.
433+
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 1초 대기
434+
435+
// 프로듀서가 전송을 시도하는 총 시간 (재시도 포함)
436+
// 메시지가 전송 대기열에 추가된 순간부터 브로커로부터 최종 응답을 받거나 실패할 때까지의
437+
// 최대 시간이다. 이 시간 안에 전송이 완료되지 않으면 실패로 간주된다.
438+
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 120초 (2분)
439+
440+
// 멱등성 활성화 (Exactly-Once Semantics 에 가까운 효과)
441+
// enable.idempotence=true 설정 시, 카프카는 내부적으로 acks=all, retries=Integer.MAX_VALUE,
442+
// max.in.flight.requests.per.connection=5 이하를 강제하여 메시지 중복을 방지하고 높은 신뢰도를 보장한다.
443+
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
444+
445+
// 배치 처리 설정: 메시지 전송 효율을 높이기 위해 여러 메시지를 모아 배치로 전송한다.
446+
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB
447+
config.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms 대기
448+
449+
// 압축 타입 설정: 네트워크 대역폭과 저장 공간을 절약하기 위해 메시지 압축을 사용한다.
450+
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
451+
452+
return new DefaultKafkaProducerFactory<>(config);
453+
}
454+
455+
// 카프카 토픽에 데이터를 전송하기 위해 사용할 Kafka Template 을 생성
456+
@Bean
457+
public KafkaTemplate<String, Object> kafkaTemplate() {
458+
return new KafkaTemplate<>(producerFactory());
459+
}
460+
}
461+
```
462+
463+
위 예시 코드는 프로듀서의 필수 설정과 함께, 메시지 전송의 신뢰성, 성능, 그리고 타임아웃 및 재시도와 관련된 세부 옵션들을 어떻게 설정하는지 보여준다.
464+
465+
이 설정들은 앞서 "프로듀서 설정하기" 섹션에서 설명한 내용과 직접적으로 연관된다.

0 commit comments

Comments
 (0)