일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- Java
- 전역에러처리
- Kafka
- 25186
- node.js
- macrotask
- eventLoop
- 귀납적증명
- nextTick
- graceful shutdown
- 20309
- 알고리즘
- Bitwise AND
- 23560
- PS
- 1781
- node-cron
- hash
- firebase functions deploy limit
- promise.race
- 백준
- ad-hoc
- Docer
- 23289
- microtask
- 코드리뷰를꼼꼼히하자
- 파라매틱서치
- firebase functions
- 25635
- BOJ
- Today
- Total
웰제오의 개발 블로그
Node.js 환경에서의 Kafka Consumer Heartbeat 관련 트러블 슈팅 본문
카프카의 group coordinator 는 현재 그룹에 속해있는 컨슈머들의 상태가 유효한지 계속 확인하며, 리밸런싱을 통해 그룹내에 속한 모든 컨슈머들이 유효성을 보장한다.
만약 카프카가 컨슈머들이 살았는지 죽었는지 확인을 안한다면, 죽어있는 컨슈머가 group 파티션을 계속 잡고있게 되며, 해당 파티션의 record 들이 consume 되지 않는 상황이 발생하므로,
카프카는 주기적으로 컨슈머의 생존여부를 확인해, 죽었다고 판단되는 컨슈머를 group 에서 쫓아내고,
리밸런싱을 통해 이전 컨슈머에게 할당된 파티션을 그룹내의 다른 컨슈머에게 할당해주어야 한다.
카프카는 컨슈머가 죽었다고 판단하는 경우가 두가지가 있는데 (connections.max.idle.ms 설정도 존재하지만 제외하고 얘기하겠다)
- 이전 heartbeat 전송 후 session.timeout.ms 값이 지나도 다음 heartbeat 가 전송되지 않을 경우
- 마지막 record consume 이후로 max.poll.interval.ms 만큼의 시간이 지났을 때
의 경우 카프카는 컨슈머가 죽었다고 판단해, group 에서 제외시킨다.
이번 글에서는, 필자가 겪었던 이슈를 통해, 싱글스레드로 동작하는 node.js 에서 heartbeat 를 전송하는 방법과, 겪었던 문제의 해결책에 대해 공유하려고 한다.
이슈 상황
기존의 컨슈머는 batch 로 record 를 받아와 작업을 수행하게 구성되어 있었는데,
컨슈머에서 일부 로직들을 추가해 dev 환경에서 테스트를 진행해보니, 몇몇 컨슈머가 group 에서 제외 되면서 리밸런싱이 자주 일어나게 되는 이슈를 경험했다.
group 의 리밸런싱이 일어 날 경우, stop-the-world 로 인해 group 의 전체 컨슈머가 일시적으로 record 를 consume 하지 못하게 되므로, 리밸런싱이 일어나는 횟수를 고려할 때 이는 수정이 필요한 작업이라 판단하고 원인 파악에 들어갔다.
추가된 로직이 request 를 날리는 서버가 불안정한 상태였고,
지수 백오프를 통해 request fail 상황에 대응하는 consumer 의 batch record 처리 로직으로 인해 작업이 지연되었고,
이로 인해 batch 작업에 소요되는 시간이, kafka consumer 의 default session.timeout.ms 설정값인 3초를 넘어가면서, 몇몇 컨슈머가 group 에서 evict 되는것이 원인으로 밝혀졌다.
맘 같아선 dependent 한 서버를 바꾸고 싶었지만, 기존에 호출하던 API 말고는 필요한 데이터 조달이 불가능한 상황이었다.
따라서 batch record 처리가 3초를 넘어가도, group coordinator 가 해당 컨슈머를 group 에서 제외시키지 않게 문제를 해결해야 했다
session.timeout.ms 의 증가
우선 위 문제는 정말 간단하게 session.timeout.ms 설정값을 늘림으로서 해결할 수 있다.
하지만 필자는 위 방법을 추천하지는 않는다.
물론 처리하는 데이터의 셋과 용량에 따라 다르겠지만, 일반적인 상황에서 카프카는 대용량 처리에 쓰이는 점을 고려한다면,
session.timeout.ms 을 늘림으로 인해서 error detect 가 늦어지는 만큼, 발생할 수 있는 문제도 더 커질것이다.
따라서 해당 설정 변경 없이 문제를 해결할 수 있는 방법에 대해 알아보자.
Node.js Client 의 명시적인 heartbeat 전송
기존에 Java 로 Kafka consumer 를 운영하는 사람들 (아마 Kafka consumer 를 운용하는 대부분의 사람들이 Java 를 쓰지 않을까 싶다) 이 위의 문제를 보면 의아해 할것이다.
batch record 처리가 오래 걸리는거랑 heartbeat 랑 뭔 상관이지...?
애석하게도, 싱글스레드로 동작하는 Node.js 에서는 상관이 있다.
Java Kafka Client 에는 heartbeat 스레드가 별도로 존재한다. heartbeat.interval.ms 값 설정을 통해, 백그라운드에서 실행되는 heartbeat 스레드가 얼마간의 주기로 heartbeat 를 전송하는지 설정할 수 있는데,
Kafka 는 0.10.1 버전이 릴리즈되기 전까지 session 의 timeout 으로만 컨슈머의 유효여부를 확인했었다.
위 티켓의 링크
하지만 record 의 처리 시간이 session timeout 보다 길어지면서 컨슈머가 group 에서 제외되는 경우가 발생했고,
이 경우 사용자는 컨슈머가 group 에서 제외된 원인의 파악이 힘들다는 문제와 비롯해, 위에서 언급했던 단순 session timeout 증가를 통한 이슈 해결이, 다른 추가 문제점을 야기하게 된다는 상황에 직면하게 되었다.
이를 해결하기 위해, heartbeat 를 통한 session timeout 확인과 max.poll.interval.ms 를 통한 polling 주기 확인이 분리된 것 이다.
문제는 Node.js 컨슈머에서는 위의 해결책이 도움이 되지 않는다
위의 해결책은 Java 에서 별도의 heartbeat 스레드를 통해, record 처리 도중에도 heartbeat 전송이 가능했지만,
싱글스레드로 동작하는 Node.js 에서는 Java 와 같이 heatbeat 의 전송만을 수행하는 스레드를 따로 둘 수 없어 위 방법을 통한 문제의 해결이 불가능 하다.
즉, polling 한 record 의 처리 시간이 session timeout 보다 길어지면서 컨슈머가 group 에서 제외되는 문제가 Node.js 에서는 여전히 남아있는 것으로,
이를 해결하기 위해 Kafkajs (Node.js Kafka client library) 는 내부적으로
- record 를 polling 하는 메소드 종료 직전에 heartbeat 를 전송하고
- setInterval 메소드를 통해 지정한 heartbeat.interval.ms 주기로 heartbeat 를 전송
하게 했다.
하지만 필자의 이슈상황에서 알 수 있듯이, 위 두개의 해결책 만으로는 컨슈머 로직을 작성하는데 있어서 Java 만큼 heartbeat 에서 자유로울 수 없다.
1 번 해결책은 record 의 처리가 오래걸리면 소용이 없고,
2 번 해결책은 필자의 상황같이 event loop 의 poll phase 에서 I/O 작업 완료에 따른 콜백함수가 지속적으로 queueing 될 때, timer phase 로 넘어가지 못해 setInterval 메소드의 콜백함수가 실행되지 못한다
이를 해결하기 위해 Kafkajs 에서는 명시적인 heartbeat 전송을 지원한다.
위의 Kafkajs 공식문서를 보다싶이, eachBatch 에 있어서 사용되는 메소드의 콜백함수로 heartbeat 를 넘겨주는것을 볼 수 있다.
이를 응용해 필자는 heartbeat 메소드를, API 요청 이전, 지수백오프로 인한 retry 실행 이전에 명시적으로 호출함으로서 문제를 해결했다.
const fetchData = async () => {...} // API request
const processMessage = async (message, heartbeat) => {
let retryCount = 0;
// 지수 백오프. 최대 3회
while (retryCount < 3) {
try {
await sleep(retryCount * 1000);
await fetchData();
break;
} catch (err: any) {
await heartbeat(); // request 실패 시 heartbeat 전송
retryCount++;
}
}
if (retryCount >= 3) {
throw new Error("...");
}
}
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat
}) => {
for (let message of batch.messages) {
await processMessage(message, heartbeat);
resolveOffset(message.offset)
}
},
})
Java 처럼 백그라운드에서 heartbeat 전송할 수 있을까?
필자가 경험한 바로는, heartbeat 를 명시적으로 호출하는것이 직접적인 문제 해결과 더불어 추가로 발생할 수 있는 이슈를 예방하는 최선의 선택이라 생각한다
그래도 위와 같은 명시적인 호출이 cool 하지 않다고 생각한다면, 이에 대해 고민해볼만한 가치는 있을 것이다.
...하지만 필자는 아직 방법을 찾지 못했다.
Kafkajs 를 활용한 컨슈머의 record 처리 사이클에서, 문제가 될만한 지점은 record 를 처리하는 도중인데,
아까도 언급했다 싶이, event loop 의 poll phase 에서 I/O 작업 완료에 따른 콜백함수가 지속적으로 queueing 될 때, timer phase 로 넘어가지 못해 setInterval 메소드의 콜백함수가 실행되지 못한다는 문제점이 그 이유이다.
비단 heartbeat 뿐만 아니라, Node.js 환경에서의 반복작업에 대해서는 필자가 작성한 글을 읽어보면 좋을 것 같다.
필자가 마주했던 이슈를 해결하기 위해, Java Kafka 클라이언트를 뜯어보며 많은 공부가 되었다.
무엇보다 session.timeout 과 max.poll.interval 설정이, 과거 Java 진영에서 필자가 마주했던 문제를 해결하기 위해 분사되었다는 점 또한 흥미로웠다
Node.js 런타임 설계방식으로 인해 Java 진영에서의 해결책이 도움이 되지는 못했지만,
Node.js Kafka 클라이언트 라이브러리 제작자가 해당 문제를 염두해 두고 heartbeat 를 명시적으로 전송할 수 있게 해둔 배경에 대해 고민하며, 싱글스레드 환경에서의 health check 를 위한 하나의 방법론에 대해 배워갈 수 있었던, 의미있는 트러블 슈팅이었다.
'개발' 카테고리의 다른 글
EntityManagerFactoryBuilder could not be found 에러 원인과 해결 (2) | 2023.10.09 |
---|---|
[Node.js] Thread Hang 을 야기할 수 있는 작업의 핸들링 (Promise.race, Worker Thread) (1) | 2022.10.27 |
배포 프로세스 최적화를 통한 GCP functions deploy limit 해결 (0) | 2022.10.22 |
AWS ECS graceful shutdown 설정 및 트러블 슈팅 (1) | 2022.10.21 |
Node.js 환경에서의 반복작업 수행 (0) | 2022.10.19 |