개요
컨슈머의 주요 옵션 값을 살펴보고 그에 따른 관계를 파악한다. (내가 헷갈리는 것 위주로)
session.timeout.ms
- 컨슈머와 브로커 사이의 세션 타임아웃 시간
- 브로커는 session.timeout.ms 를 통해서 컨슈머가 생존여부를 파악한다.
(컨슈머가 주기적으로 브로커에게 하트비트를 보낸다.) - 만약 컨슈머가 그룹 코디네이터에게 heartbeat 를 보내지 않은 상태에서 session.timeout.ms 가 지나버리면, 코디네이터는 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머를 컨슈머 그룹에서 제외시켜버린다. 그리고 컨슈머 그룹은 리밸런싱을 수행한다.
- session.timeout.ms 를 기본값보다 낮게 설정 시, 실패를 빠르게 감지할 수 있다. 그러나 가비지 컬렉션이나 poll loop 를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런싱이 일어날 수 있다.
- session.timeout.ms 를 기본값보다 높게 설정 시, 리밸런싱이 일어날 확률이 적어지지만, 오류를 감지하는데 시간이 오래 걸릴 수 있다.
heartbeat.interval.ms
- 그룹 코디네이터에게 얼마나 자주 kafka consumer poll() 메소드로 heartbeat 를 보낼 것인지 조정한다.
- 일반적으로 session.timeout.ms 보다 1/3 정도로 설정한다.
max.poll.interval.ms
- 컨슈머가 지속적으로 heartbeat 만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수 있다.
- 이럴 때 해당 컨슈머가 특정한 파티션을 무한정으로 해당 파티션을 점유할 수 없도록 주기적으로 poll 을 호출하지 않으면 장애라고 판단한다. 이후 컨슈머 그룹에서 제외한 후, 동일 컨슈머 그룹 내의 다른 컨슈머가 해당 파티션에서 메시지를 들고 갈 수 있도록 한다.
그 외 다른 옵션들이 있지만 각설하고, session.timeout.ms 와 max.poll.interval.ms 의 차이를 구분을 어렵다. 두 개의 옵션 모두 해당 컨슈머가 장애라고 판단하는 기준치가 되는 시간이기 때문이다. 그래서 구글링...
👀
우선 하나의 시나리오를 생각하자. 토픽으로 들어온 메시지를 처리하는데 1분의 시간이 걸린다고 가정하자. 이 때, heartbeat & poll 이 강하게 결합하고 있으면, session.timeout.ms 는 1분보다 더 큰 값으로 설정하여야 한다. ( 그렇지 않음면, session.timeout.ms 에 의해 컨슈머가 장애가 났다고 판단이 될 수 있기 때문 ) 그러나 메시지 처리 시, 컨슈머가 죽어버리거나 컨슘이 실패되었다면 해당 오류 감지를 1분 이상의 시간이 소요된다.
-
위의 문제점이 있어서 heartbeat 와 poll 을 분리하여 연속적인 poll 호출에 대해서 중간에 heartbeat 를 보낼 수 있도록 하는 것이다. 두 개의 스레드가 돌고 있는 형태이다.
- heartbeat 전용 스레드 ( heartbeat.interval.ms 와 연관 )
- processing 전용 스레드 ( max.poll.interval.ms 와 연관 )
만약 우리가 session.timeout.ms=30000ms (30초) 로 설정했다고 가정해보자. 컨슈머의 heartbeat 스레드는 브로커에게 heartbeat 를 30초가 만료되기 이전에 보내야 한다.
반면에 하나의 메시지를 처리하는데 1분이 소요되는 processing 스레드가 있다고 가정한다면 max.poll.interval.ms 를 처리시간 1분을 고려해서 그것보다 조금 더 크게 설정해주어야 한다. processing 스레드가 죽는다면 이를 감지하기 위해 max.poll.interval.ms 가 필요하다. 그러나 전체 컨슈머가 죽어버리는 경우도 고려해볼 수 있는데, 이를 감지하는데에는 session.timeout.ms 으로 장애를 판단한다.
👀
결과적으로 max.poll.interval.ms 는 session.timeout.ms 값보다 커야한다. 두 개의 interval.ms 값을 통해서 세션 타임 타임아웃이 되었더라도 충분하게 카프카 메시지의 처리시간을 확보할 수 있는 것이고, 만약에 컨슈머에 문제가 발생하였더라면 그 값을 max.poll.interval.ms 로 사용하여 에러 디텍팅을 가능토록 하는 것이다.
참고링크
- stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10
- [book] 카프카 데이터 플랫폼의 최강자
- cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
- docs.confluent.io/current/installation/configuration/consumer-configs.html#consumer-configurations
'Spring' 카테고리의 다른 글
20111113 [transcation] 스프링 선언적 트랜잭션 (0) | 2020.11.20 |
---|---|
20201025 [spring-cache] cache abstraction (0) | 2020.10.25 |
20200801 [springboot] gradle-multi-module 시, 신경써야할 것 정리. (0) | 2020.08.07 |
20200523 [jpa] 스프링부트 실행 시, 디비 데이터 삽입 및 테스트 코드 상에서 데이터 삽입. (수정 : 2021-08-22) (0) | 2020.07.01 |
20200502 [kafka] springboot-kafka 작성해보기. (0) | 2020.05.05 |