|
| 1 | +--- |
| 2 | +layout: post |
| 3 | +title: " [카프카 핵심 가이드] CHAPTER 4. 카프카 컨슈머: 컨슈머 설정 및 오프셋과 커밋 " |
| 4 | +categories: [Kafka] |
| 5 | +author: devFancy |
| 6 | +--- |
| 7 | +* content |
| 8 | +{:toc} |
| 9 | + |
| 10 | +> 이 글은 [카프카 핵심 가이드](https://product.kyobobook.co.kr/detail/S000201464167?utm_source=google&utm_medium=cpc&utm_campaign=googleSearch&gad_source=1) 책을 읽고 정리한 글입니다. |
| 11 | +> |
| 12 | +> 이 글에서 다루는 모든 코드는 [깃허브](https://github.com/devFancy/springboot-coupon-system)에서 확인하실 수 있습니다. |
| 13 | +> |
| 14 | +> `참고`: 본 글에서 소개되는 코드 예시는 현재 시점의 구현을 바탕으로 작성되었으며, 프로젝트가 발전함에 따라 내용이 변경되거나 개선될 수 있음을 미리 알려드립니다. |
| 15 | +
|
| 16 | + |
| 17 | +## Prologue |
| 18 | + |
| 19 | +"카프카 핵심 가이드" 4장 내용을 바탕으로 [이전 글](https://devfancy.github.io/Kafka-Consumer/) 에 이어 컨슈머 설정 및 오프셋과 커밋에 대해 내용을 정리했다. |
| 20 | + |
| 21 | +이론적인 개념 설명과 더불어, 실제 Spring Kafka 환경에서 어떻게 컨슈머를 구현하고 사용하는지 개인 프로젝트 코드를 통해 함께 살펴본다. |
| 22 | + |
| 23 | + |
| 24 | +--- |
| 25 | + |
| 26 | +## 컨슈머 설정하기 |
| 27 | + |
| 28 | +컨슈머의 성능과 가용성에 영향을 주는 몇몇 중요한 설정들이 있다. 아래는 여러 설정들 중에 중요한 속성 위주로 정리했다. |
| 29 | + |
| 30 | +> 데이터 가져오기 |
| 31 | +
|
| 32 | +컨슈머가 브로커로부터 데이터를 얼마나, 어떻게 가져올지를 결정하는 설정이다. 처리량(Throughput)과 지연 시간(Latency) 사이의 균형을 맞추는 것이 중요하다. |
| 33 | + |
| 34 | +### fetch.min.bytes |
| 35 | + |
| 36 | +컨슈머가 브로커로부터 데이터를 가져올 때, 최소한 이 크기(byte) 이상의 데이터가 모였을 때만 응답하도록 요청하는 설정이다. (기본값: 1) |
| 37 | + |
| 38 | +토픽에 메시지가 드문드문 들어오는 상황에서, 이 값을 적절히 높이면 브로커는 데이터가 충분히 쌓일 때까지 기다렸다가 한 번에 보내주므로 양쪽의 부하를 줄일 수 있다. |
| 39 | + |
| 40 | +단, 이 값을 높이면 응답을 기다리는 시간 때문에 지연 시간은 늘어날 수 있다. |
| 41 | + |
| 42 | +### fetch.max.wait.ms |
| 43 | + |
| 44 | +`fetch.min.bytes`에 설정된 데이터가 모일 때까지 브로커가 **최대 얼마나 기다릴지**를 결정한다. (기본값: 500ms) |
| 45 | + |
| 46 | +카프카는 `fetch.min.bytes`에 도달하거나, `fetch.max.wait.ms` 시간이 초과되면 둘 중 먼저 만족하는 조건에 따라 컨슈머에게 데이터를 응답한다. |
| 47 | + |
| 48 | +이 설정은 컨슈머의 최대 지연 시간을 제어하는 데 사용된다. |
| 49 | + |
| 50 | + |
| 51 | +### max.poll.records |
| 52 | + |
| 53 | +`poll()` 메서드를 한 번 호출할 때 가져오는 **최대 메시지 개수**를 지정한다. (기본값: 500) |
| 54 | + |
| 55 | +가져온 메시지를 하나씩 처리하는 로직이 매우 오래 걸린다면, 이 값을 작게 조절하여 `poll()` 루프의 실행 시간을 제어할 수 있다. |
| 56 | + |
| 57 | +이는 아래에서 설명할 `max.poll.interval.ms`와 밀접한 관련이 있다. |
| 58 | + |
| 59 | + |
| 60 | +> 그룹 안정성과 상태 관리 |
| 61 | +
|
| 62 | +컨슈머가 그룹 내에서 '살아있음'을 알리고, 비정상적인 상황에서 어떻게 동작할지를 결정하는 매우 중요한 설정이다. |
| 63 | + |
| 64 | +### session.timeout.ms |
| 65 | + |
| 66 | +컨슈머가 브로커(그룹 코디네이터)에게 하트비트(생존 신호)를 보내지 않고 버틸 수 있는 최대 시간이다. (기본값: 10,000ms = 10초) |
| 67 | + |
| 68 | +만약 이 시간 동안 하트비트가 없으면, 코디네이터는 해당 컨슈머에 장애가 발생했다고 판단하고 그룹에서 제외시킨 뒤 **리밸런싱**을 시작한다. |
| 69 | + |
| 70 | +### heartbeat.interval.ms |
| 71 | + |
| 72 | +컨슈머가 얼마나 자주 하트비트를 보낼지 결정하는 주기이다. (기본값: 3,000ms = 3초) |
| 73 | + |
| 74 | +이 값은 반드시 `session.timeout.ms`보다 낮아야 하며, 보통 **1/3 수준으로 설정**하는 것이 일반적인 규칙이다. |
| 75 | + |
| 76 | +### max.poll.interval.ms |
| 77 | + |
| 78 | +`poll()` 호출 사이의 최대 시간 간격이다. (기본값: 300,000ms = 5분) |
| 79 | + |
| 80 | +하트비트는 백그라운드 스레드에서 보내므로, 메시지를 처리하는 메인 스레드가 멈춰도 하트비트는 계속 전송될 수 있다. |
| 81 | + |
| 82 | +이 설정은 이런 '좀비' 상태를 방지하기 위한 안전장치다. `poll()`을 호출한 후 다음 `poll()`이 이 시간 안에 호출되지 않으면, 컨슈머는 그룹에서 이탈하고 리밸런싱이 발생한다. |
| 83 | + |
| 84 | +메시지 처리 로직이 **5분 이상** 걸릴 가능성이 있다면 이 값을 반드시 늘려야 한다. |
| 85 | + |
| 86 | +> `session.timeout.ms` 과 `max.poll.interval.ms` 의 차이점은? |
| 87 | +
|
| 88 | +* `session.timeout.ms`은 컨슈머 **프로세스 자체**가 살아있는지(Liveness) 감시하는 역할이고, |
| 89 | + |
| 90 | +* `max.poll.interval.ms`은 컨슈머의 **메시지 처리 로직**이 일을 하고 있는지(Progress) 감시하는 역할이다. |
| 91 | + |
| 92 | +> 파티션 할당 전략 |
| 93 | +
|
| 94 | +리밸런싱이 발생했을 때, 그룹 내 컨슈머들에게 파티션을 어떻게 분배할지 결정하는 알고리즘이다. `partition.assignment.strategy`로 설정한다. |
| 95 | + |
| 96 | +* `Range` (기본값): 토픽별로 파티션을 정렬한 뒤 컨슈머에게 연속된 범위를 할당한다. 파티션 개수가 컨슈머 수로 나누어 떨어지지 않으면 특정 컨슈머에 할당이 몰릴 수 있다. |
| 97 | + |
| 98 | +* `RoundRobin`: 모든 파티션을 모아 컨슈머들에게 하나씩 순서대로(Round-Robin) 할당한다. 비교적 균등한 분배를 보장한다. |
| 99 | + |
| 100 | +* `Sticky`: 최대한 균등하게 분배하면서, 리밸런싱 시 **파티션 이동을 최소화**하여 안정성을 높인다. |
| 101 | + |
| 102 | +* `CooperativeSticky`: `Sticky` 전략의 개선판으로, **협력적 리밸런싱**을 지원한다. 리밸런싱 중에도 재할당 대상이 아닌 컨슈머들은 **작업을 멈추지 않아** 서비스 중단을 최소화하는 가장 진보된 방식이다. |
| 103 | + |
| 104 | + |
| 105 | +## 오프셋과 커밋 |
| 106 | + |
| 107 | +`poll()`을 호출하면 컨슈머 그룹이 아직 읽지 않은 레코드를 리턴한다. 즉, 컨슈머는 파티션 내에서 어디까지 읽었는지 위치를 추적해야 한다. |
| 108 | + |
| 109 | +카프카에서는 **파티션의 현재 위치를 업데이트하는 작업**을 `오프셋 커밋`(Offset Commit)이라고 부른다. |
| 110 | + |
| 111 | +컨슈머는 자신이 성공적으로 처리한 마지막 메시지의 오프셋을 커밋하고, 이 정보는 `__consumer_offsets`라는 토픽에 저장된다. |
| 112 | + |
| 113 | +리밸런스가 발생하면, 컨슈머는 이 커밋된 오프셋을 읽어와서 작업이 중단된 지점부터 처리를 재개한다. |
| 114 | + |
| 115 | +이 오프셋 관리는 매우 중요하다. 만약 오프셋 커밋이 실제 처리 시점과 맞지 않으면 데이터 중복이나 유실이 발생할 수 있다. |
| 116 | + |
| 117 | +* 메시지 중복: 실제 처리한 메시지보다 **이전 오프셋**이 커밋된 상태에서 장애가 발생하면, 리밸런스 후 다음 컨슈머가 이전 오프셋부터 다시 읽기 시작하여 메시지가 중복 처리된다. |
| 118 | + |
| 119 | +* 메시지 유실: 실제 처리한 메시지보다 **이후 오프셋**이 커밋된 상태에서 장애가 발생하면, 리밸런스 후 다음 컨슈머가 커밋된 오프셋부터 읽기 시작하여 중간 메시지들이 유실된다. |
| 120 | + |
| 121 | + |
| 122 | +### auto.offset.reset |
| 123 | + |
| 124 | +컨슈머 그룹이 특정 파티션에 대해 저장된 오프셋 없이 처음으로 읽기를 시작하거나, 기존 오프셋이 더 이상 유효하지 않을 때 어떻게 동작할지를 결정한다. |
| 125 | + |
| 126 | +* `latest` (기본값): 가장 최신 메시지, 즉 컨슈머가 실행된 **이후**에 들어오는 메시지부터 읽기 시작한다. |
| 127 | + |
| 128 | +* `earliest`: 파티션에 저장된 **가장 오래된** 메시지부터 읽기 시작한다. (모든 데이터를 처음부터 처리) |
| 129 | + |
| 130 | +* `none`: 유효한 오프셋이 없으면 예외(Exception)를 발생시킨다. |
| 131 | + |
| 132 | + |
| 133 | +### enable.auto.commit |
| 134 | + |
| 135 | +`poll()` 메서드가 주기적으로 오프셋을 자동으로 커밋할지 여부를 결정한다. (기본값: true) |
| 136 | + |
| 137 | +* `true` (자동 커밋): 가장 쉬운 방법이지만, **메시지 유실 가능성**이 있다. |
| 138 | + |
| 139 | + * `auto.commit.interval.ms`(기본 5초)마다 `poll()`로 가져온 최신 오프셋을 커밋한다. 컨슈머는 메시지가 실제로 처리되었는지 알지 못한다. |
| 140 | + |
| 141 | + * 따라서 메시지 처리 중에 컨슈머가 다운되면 오프셋은 이미 커밋되었을 수 있어 해당 메시지는 유실된다. |
| 142 | + |
| 143 | +* `false` (수동 커밋): 개발자가 직접 커밋 시점을 제어하여 신뢰성을 높인다. |
| 144 | + |
| 145 | + * `commitSync()`나 `commitAsync()` 같은 API를 사용하여, 메시지 처리가 성공적으로 완료되었음을 보장한 후에만 오프셋을 커밋한다. |
| 146 | + |
| 147 | + * 이는 메시지 유실을 방지하지만, 처리 로직과 커밋 사이에 장애가 발생하면 중복이 발생할 수 있다 (유실보다는 중복이 더 안전한 선택일 때가 많다). |
| 148 | + |
| 149 | + |
| 150 | +## 실제 프로젝트에 적용하기: 쿠폰 시스템 Consumer 설정 |
| 151 | + |
| 152 | +지금까지 카프카 컨슈머의 여러 이론적인 설정과 개념에 대해 알아보았다. |
| 153 | + |
| 154 | +이제 실제 스프링 부트(Spring Boot) 환경의 쿠폰 시스템 프로젝트에서 이러한 개념들이 어떻게 코드로 구현되었는지 `KafkaConsumerConfig` 클래스를 통해 종합적으로 살펴보자. |
| 155 | + |
| 156 | +> KafkaConsumerConfig.java |
| 157 | +
|
| 158 | +```java |
| 159 | +@Configuration |
| 160 | +public class KafkaConsumerConfig { |
| 161 | + |
| 162 | + @Value("${spring.kafka.consumer.bootstrap-servers}") |
| 163 | + private String bootstrapServers; |
| 164 | + |
| 165 | + @Bean |
| 166 | + public ConsumerFactory<String, CouponIssueMessage> consumerFactory() { |
| 167 | + // ... JsonDeserializer 설정 ... |
| 168 | + |
| 169 | + Map<String, Object> config = new HashMap<>(); |
| 170 | + // 필수 설정 |
| 171 | + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| 172 | + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 173 | + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); |
| 174 | + config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); |
| 175 | + |
| 176 | + // 1. 신뢰성: 오프셋 수동 커밋 설정 |
| 177 | + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); |
| 178 | + |
| 179 | + // 2. 성능: 한 번의 poll() 호출로 가져올 최대 레코드 수를 지정 |
| 180 | + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); |
| 181 | + |
| 182 | + // 3. 안정성: poll() 간 최대 시간. 메시지 처리 시간보다 넉넉하게 10분으로 설정 |
| 183 | + config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); |
| 184 | + |
| 185 | + // 4. 안정성: 리밸런싱 시 중단을 최소화하는 협력적 할당 전략 사용 |
| 186 | + config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); |
| 187 | + |
| 188 | + return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer); |
| 189 | + } |
| 190 | + |
| 191 | + @Bean |
| 192 | + public ConcurrentKafkaListenerContainerFactory<String, CouponIssueMessage> kafkaListenerContainerFactory( |
| 193 | + ConsumerFactory<String, CouponIssueMessage> consumerFactory, |
| 194 | + RecordInterceptor<String, CouponIssueMessage> mdcRecordInterceptor |
| 195 | + ) { |
| 196 | + ConcurrentKafkaListenerContainerFactory<String, CouponIssueMessage> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| 197 | + factory.setConsumerFactory(consumerFactory); |
| 198 | + |
| 199 | + // 신뢰성: AckMode를 MANUAL로 지정하여 수동 커밋 활성화 |
| 200 | + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); |
| 201 | + |
| 202 | + // 성능: 토픽의 파티션 수와 일치시켜 병렬 처리 성능을 최적화 |
| 203 | + factory.setConcurrency(3); |
| 204 | + |
| 205 | + // 기타: 로깅 및 모니터링 설정 |
| 206 | + factory.getContainerProperties().setObservationEnabled(true); |
| 207 | + factory.setRecordInterceptor(mdcRecordInterceptor); |
| 208 | + return factory; |
| 209 | + } |
| 210 | +} |
| 211 | +``` |
| 212 | + |
| 213 | +위 코드는 신뢰성, 안정성, 성능을 모두 고려하여 다음과 같이 설정했다. |
| 214 | + |
| 215 | +> 신뢰성 (수동 커밋) |
| 216 | +
|
| 217 | +* `ENABLE_AUTO_COMMIT_CONFIG`를 `false`로, `AckMode`를 `MANUAL`로 설정했다. |
| 218 | + |
| 219 | +* 이 조합을 통해 카프카가 자동으로 오프셋을 커밋하는 것을 막고, `@KafkaListener`에서 비즈니스 로직이 완전히 성공했을 때만 `ack.acknowledge()`를 호출하여 직접 오프셋을 커밋할 수 있다. |
| 220 | + |
| 221 | +* 이는 메시지 유실을 방지하는 확실한 방법이다. |
| 222 | + |
| 223 | +> 안정성 (컨슈머 그룹 운영) |
| 224 | +
|
| 225 | +* `MAX_POLL_INTERVAL_MS_CONFIG`를 10분으로 길게 설정하여, 쿠폰 발급과 같이 외부 시스템 연동으로 인해 메시지 처리가 길어지더라도 컨슈머가 '좀비' 상태로 오인받아 그룹에서 쫓겨나는 상황을 방지한다. |
| 226 | + |
| 227 | +* 또한`PARTITION_ASSIGNMENT_STRATEGY_CONFIG`를 `CooperativeStickyAssignor`로 지정했다. |
| 228 | + |
| 229 | + * 이는 컨슈머 인스턴스가 추가되거나 제거될 때 발생하는 리밸런싱 과정에서 전체 컨슈머 그룹의 작업 중단('Stop-the-world')을 막고, |
| 230 | + |
| 231 | + * **변경이 필요한 파티션만 재할당**하여 서비스의 가용성을 극대화하는 좋은 선택이다. |
| 232 | + |
| 233 | +> 성능 (병렬 처리) |
| 234 | +
|
| 235 | +* 컨슈머의 처리량을 높이는 가장 확실한 방법은 병렬 처리를 활용하는 것이다. 코드에서는 `setConcurrency(3)` 설정을 통해 3개의 컨슈머 스레드가 각각 다른 파티션의 메시지를 동시에 처리하도록 했다. |
| 236 | + |
| 237 | +* 이는 마치 은행에 예금 상품 가입 신청서가 하나의 창구에만 쌓여 업무가 지연되자, 은행 지점장이 가입 전용 창구(컨슈머 스레드)를 3개로 늘리는 것과 같다. |
| 238 | + |
| 239 | +* 토픽의 파티션이 3개일 때: 카프카는 3개의 창구에 미리 분류해 둔 신청서 묶음(파티션)을 하나씩 1:1로 배정한다. |
| 240 | + |
| 241 | + * 1번 창구(텔러) → 1번 신청서 묶음 전담 |
| 242 | + |
| 243 | + * 2번 창구(텔러) → 2번 신청서 묶음 전담 |
| 244 | + |
| 245 | + * 3번 창구(텔러) → 3번 신청서 묶음 전담 |
| 246 | + |
| 247 | +* 결과, 3개의 창구에서 동시에 가입 신청(메시지)이 처리되므로, 이론적으로 전체 처리 속도가 3배로 향상된다. 이처럼 최대 성능을 내기 위해서는 보통 `Concurrency` 값을 파티션 개수와 동일하게 맞춘다. |
| 248 | + |
| 249 | +* 여기서 `MAX_POLL_RECORDS_CONFIG`는 한 번에 처리할 작업의 양을 조절하는 역할을 한다. |
| 250 | + |
| 251 | +## References |
| 252 | + |
| 253 | +* [카프카 핵심 가이드](https://product.kyobobook.co.kr/detail/S000201464167?utm_source=google&utm_medium=cpc&utm_campaign=googleSearch&gad_source=1) |
0 commit comments