Kafka 핵심 용어
- 프로듀서 Producer : 메세지를 생산, 발송
- 컨슈머 Consumer : 메세지를 소비, 수신
- 브로커 Broker : 프로듀서와 컨슈머 사이에서 메세지를 중개, "host:port" 로 브로커 식별
- 토픽 Topic : consumer가 가져가는 event / message를 보관하고 있는 folder의 개념
- 파티션 Partition : 토픽 하나를 여러 개로 나눈 것, 하나를 여러개로 나누면 분산처리 가능 (나눈 파티션 수만큼 컨슈머 연결 가능)
- 세그먼트 Segment : 각 메세지들은 세그먼트라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장
- 오프셋 Offset : 파티션에 메세지가 저장되는 위치(파티션 마다의 고유한 숫자), 순차적으로 증가하는 숫자 형태로 되어있음, 오프셋을 통해 메세지 순서를 보장하고 consumer에서는 마지막으로 읽은 위치를 알 수도 있음

producer에 의해서 브로커로 전송된 메세지는 토픽의 파티션에 저장
각 메세지들은 세그먼트라는 로그파일의 형태로 브로커의 로컬 디스크에 저장
각 partition 마다 N개의 segment들이 존재
💡 정리해보자면
ES : 노드 > 인덱스 > 샤드 > 세그먼트
Kafka : 브로커 > 토픽 > 파티션 > 세그먼트
➡️ ES의 샤드와 kafka의 파티션에 실제 데이터가 저장되는 것
Bootstrap Server
- 처음 접속하는 서버를 지정해주는 것으로 프로듀서에서 부트스트랩 서버를 지정하게 되어있음
- 초기 커넥션 클러스터를 설정하는 것
// 토픽 생성할 때 옵션주는 방법
kafka-topics
--bootstrap-server kafka:9092 // 해당 옵션을 주면 처음 접속할 때 연결될 브로커가 설정됨(하나 이상 설정가능)
--create
--partitions 1
--replication-factor 1
--topic testing
// 프로그램에서 해당 브로커로 접속할 때의 설정 방법
props.put("bootstrap.servers", "kafka:9092");
// 여러대인 경우
props.put("bootstrap.servers", "kafka1:9092, kafka2:9092, kafka3:9092");
⭐️모든 브로커를 등록해주도록 함
그 이유는?
초기에 kafka1:9092만 연결되어있지만 다운되었다고 가정할 경우 곤란한 상황이 발생
그래서 모든 브로커를 등록해놓고 kafka1:9092가 다운되면, kafka2:9092 와 연결하려고 시도
만약 그것또한 다운되면 목록에 있는 다음 부트스트랩 서버인 kafka3:9092로 연결 시도
메세지 보존 정책 Retention Policy
1. Deletion Retention Policy
2. Compaction
로그 세그먼트에는 메세지의 내용만 저장되는 것이 아니라 메세지의 key, value, offset, message, size와 같은 정보가 함께 저장
하나의 로그 세그먼트 크기가 너무 커지면 파일 관리가 어렵기 때문에 최대 크기는 1GB 기본값으로 설정
만약 1GB 보다 크기가 커진다면?
Rolling 전략 사용
e.g) 하나의 로그 세그먼트에 계속 메세지를 덧붙이다가 1GB에 도달하면 close한 뒤 새로운 로그 세그먼트를 생성하는 방식
1GB 크기의 로그 세그먼트 파일이 무한히 늘어날 경우를 대비해 아래와 같은 계획 및 정책을 수립
Deletion Retention Policy
- 오래된 세그먼트 삭제 retention.ms
e.g) retention.ms=0 이란 로그 세그먼트 보관시간이 해당 숫자보다 크면 세그먼트를 삭제한다는 명령
기본 값은 5분 주기이므로 약 5분 후 삭제 작업이 일어남
한번 쓰여진 로그 메세지는 수정 및 삭제가 불가능한데 삭제가 일어나는 것에 대해 충돌이 발생하지 않는가?
➡️ 세그먼트 자체를 삭제시키는 개념으로 개별 메세지 삭제의 개념이 아니기 때문에 충돌 발생하지 않음
➡️ 불변성은 메세지를 한 번 작성하면 변경할 수 없다는 것을 의미하지만, 해당 메세지의 만료에 제한을 부과하지 않음
- 세그먼트 크기 설정 retention.bytes
Compaction
- 로그를 삭제하지 않고 압축하여 보관
- 압축은 모든 경우에 적용되는 것은 아니고, 일부의 경우에서 사용
e.g) 주문이 배송되었을 때, 내가 배송 받은 현재 시점의 상태에만 신경을 쓰지 주문이 접수된 상태에 대해선 고려하지 않음
e.g) 디스플레이의 현재 온도에 대한 전원을 볼 때, 현재 온도에 신경을 쓰지 이전 온도가 몇 도였냐는 중요하게 생각하지 않음
→ 즉, 각 키에 대한 최신 값만(마지막 데이터만) 유지한다는 점에 유의해야함
- 압축이 실행되면, 보관된 메세지만 있는 로그 세그먼트의 새로운 복사본을 생성 → 그 후 파일 포인터를 이동하고 오래된 세그먼트 전체를 삭제 (삭제 기준은 업데이트 시간을 사용) → 그것은 기존 세그먼트를 직접 변경하지 않음

위 이미지에서 보면 k1이라는 key가 4개가 존재하는데 이 중 맨 마지막인 k1을 저장함
모든 각 key값의 마지막(최근) 것만 압축하는 것을 말함
💡Compaction의 장점은?
빠른 장애 복구
전체 로그를 복구하지 않고 메세지의 키를 기준으로 최신의 상태만 복구
전체 로그를 복구할 때보다 복구 시간을 줄일 수 있다는 장점이 있음
Active and Inactive Segments
활성 세그먼트인 경우에는 보존정책이 적용되지 않음
아무리 오래된 세그먼트라도 활성 상태라면 절대 삭제 처리 안됨
삭제 후보가 되기 위해선 roll이 되고 비활성 상태가 되어야함!!
압축 또한 활성 세그먼트인 경우 완전히 무시됨
로그에 중복키가 있더라도 압축은 중복키를 제거하지 않음
Segments
- Clean segments : 압축된 로그 세그먼트
- Dirty segments : 압축되지 않은 로그 세그먼트
Replication
리플리케이션이란?
메세지들을 여러 개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작
파티션 단위로 일어남
백업 복사본으로 데이터의 높은 가용성을 보장
replication 동작을 위해서는 토픽 생성 시 필수값으로 replication.factor 라는 옵션을 설정해야함
replication factor수가 커질수록 안정성은 높으나 broker의 리소스를 많이 사용하게 되는 단점이 있음
e.g) replication.factor=3 으로 하면 3개의 replica 생성 (하나의 리더, 두개의 팔로워), 브로커 3개가 필요
- Leader replicas
- client는 항상 하나의 리더에게 write, read 함
- producer는 leader에만 write를 함 (follower에 write 절대 안함)
- consumer 또한 leader만 read 함 (follower read 하지 않음)

- Follower replicas
- 백업 복사본으로 리더를 따라감
- 일반적으로 여러 팔러워 가짐
- leader replica가 이슈가 있을 경우를 대비해 언제든지 새로운 리더가 될 준비를 함
- 지속적으로 새로운 메세지를 확인하고 새로운 메세지가 있으면 리더로부터 메세지를 복제함
Leader는 어떤 기준으로 선택되는 것인가?
- 자동으로 이루어짐
- 카프카는 일반적으로 리더가 되기 위해 ISR을 선택함
- 리더 선정은 동시에 일어나지 않음
- parallel 방식으로 동작되지 않아 파티션 수에 따라 시간이 오래 걸림
- 기존의 Leader가 fail 된 상황에서 ISR이 없는 경우, 어떻게 처리되는지?
=> unclean.leader.election.enable 옵션 설정 (default : false)
해당 옵션이 false일 때, ISR 리스트에 아무 것도 존재하지 않는다면 Leader 선출을 하지 않음 -> 서비스 중단으로 이어질 수 있음
해당 옵션을 true로 설정하면, ISR 리스트에 없는 Replica도 Leader로 선출 -> 데이터 유실
ISR (InSyncReplica)
논리적인 그룹에 Leader와 Follower는 묶여있으며 ISR 그룹에 속하지 못한 Follower는 새로운 Leader가 될 수 있는 자격이 없음
🤔 ISR로 따로 그룹화 한 이유는?
Follower 역시도 불완전한 상태로 존재할 수 있고 불완전한 Follower가 새로운 Leader가 된다면 데이터의 적합성이나 메세지의 손실과 같은 치명적인 문제가 발생할 수 있기 때문임
Topic의 상태가 의심될 때 Topic ISR 상태를 점검해봄으로써 Topic의 상태가 양호한지 불량한지에 대해서 확인할 수 있음

ISR 그룹 내에 모든 Follower에게 복제가 완료되면, Relication Factor수만큼 전부 메세지를 저장했다면 커밋되었다는 표시를 하며 마지막 커밋 Offset 위치는 high water mark라고 부름
log-end-offset은 해당 토픽 파티션에 저장된 데이터의 끝을 나타내며, 파티션에 쓰고 클러스터에 커밋된 마지막 메세지의 오프셋임
컨슈머의 current-offset과 브로커의 log-end-offset 간의 차이로 LAG가 만들어짐
LAG = 프로듀서가 보낸 메세지 수 - 컨슈머가 가져간 메세지 수
즉, LAG라는 지표를 통해 컨슈머에 지연이 있는지 없는지를 확인 할 수 있음
Follower Fetching (Follower로부터 읽기)
- brocker.rack 설정 (broker설정) : 새로 생성된 replica가 서로 다른 rack에 할당되도록 하기 위함, 서버 랙 전체에 전력이 나가더라도 다수의 replica가 동시에 다운되지는 않는다!
- client.rack 설정 (consumer설정) : 클라이언트가 위치한 AZ(Availability Zone 클라우드 서비스가 운영되는 데이터 센터의 가상적인 영역)를 정의
- replica.selector.class 설정 (broker설정) : leader replica가 아니라 같은 AZ에 위치한 follower replica로부터 읽어올 수 있도록 해주는 설정
Kafka Producer
프로듀서는 브로커에 특정 토픽을 지정하여 메세지를 전달하는 역할을 담당
프로듀서를 통해 전달되는 메세지의 구조는 다음과 같음

- 토픽 Topic (필수값)
- 메세지 값 Value (필수값)
- 메세지 키 Key : 입력하지 않으면 null로 기본값 세팅 (대부분 필수로 줌)
- 특정 파티션 위치 Partition (옵션)
- 메세지 생성 시간 Timestamp (옵션)
- 커스텀 메타데이터 Headers (옵션)
Producer setting
- Configuration - Properties : 부트스트랩 서버, serializer, 성능 튜닝 등등의 세팅 구성
- (create) Producer Object - KafkaProducer
- (send) Record - ProducerRecord : 실제 데이터 들어감
프로듀서가 브로커로 쏘기 전 위와 같은 과정을 거침
프로듀서 구조와 메세지 전달 과정
프로듀서는 다음 4가지 과정을 통해 메세지를 브로커로 전달
이 과정은 브로커에 메세지를 전송할 수 있도록 변환하거나, 필요한 값을 지정해주는 과정
- 직렬화 Serialization
- 파티셔닝 Partitioning
- 메세지 배치 Record Accumulator
- 압축 Compression
- 전달 Sender
1. Serialization

- kafka는 record(데이터)를 byte array 로만 저장
- producer에서 JSON, String, Avro 같은 형태를 Record형태로 만들어서 key, value 형태로 보내면 그 내부에서 producer 안에 있는 publish&subscribe 라이브러리에서 serializer가 이루어지게 되고 직렬화를 통해 byte array 로 변환되어 kafka에게 전달
- consumer는 kafka에서 byte array 데이터를 deserializer하여 원본 데이터로 활용
- Producer 는 Serializer, Consumer 는 Deserializer를 사용
2. Partitioning
PartitionIndex = Hash(key)%Number Of Partitions

partitioner는 메세지를 topic의 어떤 partition으로 보낼지를 결정함
partitioner를 사용할 때 보통 default partitioner를 사용
Default partitioner란?
producer에서 보내는 메세지의 key값을 hash알고리즘을 이용해 숫자로 만든 다음에 partition의 개수를 가지고 나머지를 구함
그리고 나온 나머지값에 해당하는 partition으로 메세지를 보냄
이 때 전제조건은 key가 null값이 아닐 때
만약 key값이 null 이라면?
sticky 방식으로 메시지를 위해 임의의 파티션이 선택되고, 배치 요구 사항이 충족될 때까지 해당 파티션의 버퍼에 축적되고, 그 배치가 Kafka로 전송된 다음, 새로운 임의의 파티션이 선택되고 프로세스가 반복됨
Producer Message Batch
배치란? 데이터를 하나씩 보내는 것이 아닌 한 번에 모았다가 처리하는 방식

- send()
- 브로커로 보내는 시간이 아니라 버퍼까지 집어넣는 시간을 말함
- 프로듀서 ➡️ serialization ➡️ partitioning ➡️ buffer 까지의 시간을 말함
- 버퍼가 full (꽉차서 못보내거나 갑자기 버퍼가 확 쌓인 상황) 나면 send()가 기다림
- send는 동기방식으로 기다리고 버퍼에서 브로커로 쏘고 나면 비동기이므로 기다리지 않고 쏘면 끝
- Producer ➡️ (동기) Buffer ➡️ (비동기) Broker
버퍼 안에는 토픽-파티션 단위의 구조로 데이터가 쌓임
그 이유는?
브로커에 저장되는 단위가 토픽-파티션 단위이기 때문에 같은 구조로 모았다가 해당 구조 그대로 브로커에 보내도록 함
배치는 일반적으로 좋지만, 배치가 너무 커지는 것은 좋지 않음
throughput(처리량)과 latency(응답시간)를 가지고 배치 사이즈를 어떻게 할지 등을 정해 적절한 세팅을 하도록 함
🚀 Producer 에서 throughput 최대화하는 방법 (Producer option)
- batch.size (default 16KB)
- size를 정의하여 메세지의 용량이 size에 도달 할 때 까지 기다렸다가 보냄
- 한 번에 보내는 사이즈를 정함
- linger.ms (default 0 - 대기하지 않고 바로 보냄)
- 배치가 찰 때까지 기다리는 시간
- batch.size가 도달하지 않으면 메세지를 보내지 않기 때문에 마냥 기다릴 수는 없어 해당 시간을 설정하여 size가 도달하지 않더라도 시간이 초과하면 메세지를 보내게 됨
- e.g) 만약 1로 설정할 경우 보내고 난 후 비어있는 상태에서 1초가 지나면 다시 또 보내는 방식 (배치가 다 차지않아도 1초마다 보내는 것)
- buffer.memory (default 32MB)
- 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 프로듀서 버퍼의 최대 크기
- 프로듀서가 보내지 못한 메세지를 보관할 메모리의 크기
- max.block.ms (default 60000=1분)
- buffer가 꽉 찼을 때 send()메소드를 블록하는 시간
- Producer에서 Broker로 Message를 보낼 때 최대 연결 대기 시간으로 해당 시간이 지나면 Message 전송은 실패로 처리
- compression.type (default none)
- 데이터를 압축해서 보낼 수 있는데, 어떤 타입으로 압축할지를 정하는 옵션 (데이터의 유형에 따라 타입 선택)
- 옵션으로는 none, gzip, snappy, lz4 같은 다양한 포맷이 존재하는데 이 알고리즘별로 throughput에 영향을 줌

버퍼 안에서 압축이 가능 (프로듀서에서 압축을 하고 보냄)
json 80%는 압축을 권고
압축은 consumer에서 자동으로 압축 해제함
레코드를 압축하면 브로커의 네트워크 사용률과 디스크 공간을 절약하는 데 도움이 됨
producer - broker가 메세지를 받았는지 아는 방법
acks : 브로커로 배치를 쏘면 잘 받았는지에 대한 response 말함

acks = 0 인 경우
비동기 방식으로 쏘고 끝
속도는 빠르나, 데이터 유실 발생 위험이 있음
acks = 1 인 경우
보내고 나서 리더에만 잘 저장되면 ack를 줌
팔로워에 저장이 확인되지 않으므로 데이터 유실 위험 있음
acks = all(-1) 인 경우
리더에 저장되고 팔로워가 다 가져가고 나면
그 다음에 ack를 줌
이 때 팔로워 두 개가 있는데
하나는 ISR 이고, 하나는 그렇지 않다면
이 때에도 all로 ack줌
(ISR이 하나라도 있으면 ack 줌)
acks=all 이 강해지려면 min.in.sync.replicas를 2이상으로 설정하도록 함
이 옵션은 메시지가 손실되지 않도록 하기 위해 최소한으로 유지해야 하는 ISR(In-Sync Replicas)의 수를 지정
리더를 replica 1개로 칭하므로 팔로워를 가지고 있어야하기 때문에 2이상을 주어야 적어도 팔로워 1개를 갖게 되는 것!
Send()
메세지 보내는 방식 3가지
- 메세지를 보내고 확인하지 않기
- 카프카가 항상 살아있는 상태이고, 프로듀서가 자동으로 재전송
- 자바 future 퓨처 객체로 RecordMetadata를 리턴받지만 성공여부는 알 수 없음
- 동기 전송 (Sync)
- send() 메소드의 Future 객체를 리턴하고, get()메소드를 통해 future를 기다린 후 성공 여부를 알 수 있음
- 한 번에 하나의 레코드만 처리할 수 있음
- 비동기 전송 (Async)
- 앞의 동기 방식으로 전송할 경우, 만약 프로듀서가 보낸 모든 메세지에 대해 응답을 기다리며 스레드들이 정지한다면 효율성이 떨어지고 시간이 오래 걸림
- 이전 동기방식에서 get()메소드로 future객체를 받을 때까지 기다리지 않고, 콜백 형태로 ProducerCallback 객체를 넣어줌으로써 비동기적으로 실행할 수 있음
- send() 메소드를 콜백 callback과 같이 호출하고, 카프카 브로커에서 응답을 받으면 콜백함
Callback 함수
// 성공 결과 metadata 와 exception 담아서 가져옴
onCompletion(RecordMetadata metadata, java.lang.Exception exception) {
if(e != null){
e.printStackTrace();
} else {
// 출력
System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Received Message :%s\n",metadata.topic(),metadata.partition(),metadata.offset(),record.key(),record.value());
}
}
// send()메소드 안에서 callback 함수 실행
producer.send(record, (recordMetadata, e) -> {
if (e != null) // e라는 에러객체가 null이 아니면 에러발생
e.printStackTrace();
else // null 이면 에러발생하지 않은 것으로 성공메세지 출력
System.out.println("Message String = " + record.value() +
", Offset = " + recordMetadata.offset());
}
});
recordMetadata : 메세지가 성공적으로 전송된 경우 메세지에 해당 메타데이터가 포함
e : 메세지 도중 발생한 에러 객체를 전달하는 것으로 에러가 없다면 null값 반환
Retry

- send() 후 버퍼에 들어가면 버퍼에서 브로커로 메세지 보냄
- 브로커에서 response 보내고 콜백함수에 응답메세지 넣어줌 (metatdata)
- 에러날 경우 retry 할 지 결정하는데 안하는 경우 exception 던져줌
- retry는 자동으로 이루어짐
- In Flight Requests : 실제 날려보내는 큐
producer - 메세지 전송 실패하면 어떻게 하는지?
- retries : 레코드 전송에 실패한 경우 재시도하는 횟수
- 최적의 방법은 retries 보다는 delivery.timeout.ms로 제한하는 것을 추천함
broker 간의 통신 옵션
- max.block.ms (default 1m)
- 버퍼가 꽉 찼을 때 기다리는 최대 시간
- linger.ms (default 0)
- 배치가 찰 때까지 기다리는 시간
- request.timeout.ms (default 30s)
- 요청 응답에 대한 클라이언트 최대 대기 시간
- retry.backoff.ms (default 100)
- 다시 시도하기 전에 실패한 요청 후에 얼마나 많은 시간이 추가되는지
- delivery.timeout.ms (default 2m)
- 브로커로부터 ack를 받기위해 대기하는 시간이며 실패 시 재전송에 허용된 시간
- linger.ms 에서 request.timeout.ms 까지의 합 보다 같거나 커야함
💡 구조를 알고 단계에 맞는 옵션과 정의를 알고 있어야함

여기까지가 Producer
여기부터 Consumer
Data Fetch
브로커 안에 있는 토픽을 가져가는 것
- fetch 는 일반적으로 메시지 배치에서도 작동
Data fetch 를 누가하는지?
- Consumers
- Followers
Consumer Fetch request option
- size
- fetch.min.bytes (default 1byte) : 한 번에 가져올 수 있는 최소 크기로 다 채워지지 않으면 기다림
- max.partition.fetch.bytes (default 1MB) : 파티션 당 가져올 수 있는 최대 크기 (패치할 때 데이터의 최대 사이즈)
- max.poll.records (default 500 records) : poll 가져오는 최대 레코드 수
- time
- fetch.max.wait.ms (default 500ms) : 이 설정값보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간

Tuning Fetch request
- High Throughput 높은 처리량
- fetch.min.bytes ⬆️
- fetch.max.wait.ms 🔁 (적당히 조절)
- Low Latency 낮은 지연시간
- fetch.max.wait.ms ⬇️
Producer | Consumer |
Configure properties | Configure properties |
Create producer | Create Consumer |
Send records | Retrieve & process records |
Close & clean up | Close & clean up |
consumer 구조와 메세지 읽는 과정
Deserialization
컨슈머에서 바이트로 변환된 텍스트를 deserailization 하여 읽음
컨슈머는 어떤 메세지를 읽어야하는지 어떻게 알까?
- subscribe to topics - 어떤 토픽을 읽겠다 라고 지정
- choose partition to read from - 파티션 선택
- maintain consumer offset in partition - 파티션 내의 offset 유지 (내가 어디까지 읽었는지를 알고 있어야함, 파티션 내의 어디까지 읽어올지를 결정)
summary

- Configure properties
- bootstrap.servers : 초기 연결 설정하는 host:port
- key.deserializer : key를 deserialization 함
- value.deserializer : value를 deserialization 함
- client.id : consumer 식별하는 문자열로 모니터링, 로그에 사용
Grouping Consumer
컨슈머는 그룹으로 동작 (카프카가 자동으로 그룹 관리를 함, 카프카에 내장되어 있음)
그룹에 있는 컨슈머들은 같은 일을 하지만 다른 데이터를 가짐 (same thing, different data)
그 이유는?
작업량 공유되며, 동시에 병렬적으로 같이 스케일 업다운됨 (늘어나고 줄어드는게 같이 일어남)
✅ 그룹은 카프카와 confluent 플랫폼 안의 많은 곳에서 활용
- consumers
- Kafka Connect workers
- Kafka streams applications
- ksqlDB servers
어떻게 그룹핑을 하는지?
group.id 라는 속성으로 구분되어 group인지 아닌지 판단
// 여러개가 하나의 그룹으로 묶임
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order_processor");
Kafka connect workers의 그룹을 구성할 때 정확히 같은 group.id를 사용
group.id 주지않으면 자동으로 만들어서 사용
partition.assignment.strategy
어떤 파티션을 어떤 컨슈머에게 할당할지에 대한 전략으로 두 가지 타입 존재 (range, round robin)
기본값은 range
Rebalancing ?
특정 토픽을 구독하던 consumer group에 변동사항이 발생했을 때 해당 그룹 안에서 파티션을 재분배하는 행위
리밸런싱이 일어나는 동안은 consumption이 일시정지하므로 약간의 지연이 발생함
아래와 같은 상황에서 발생
- consumer group 내에 컨슈머가 생성 혹은 삭제된 경우
- max.poll.interval.ms 로 설정된 시간 내(5분)에 poll()요청을 보내지 못한 경우
- session.timeout.ms 로 설정된 시간 내(45초)에 하트비트를 보내지 못한 경우
- 파티션 수가 증가하였을 때 (감소는 되지 않음 - 감소시키려면 토픽 재 생성해야함)
Range
key를 기반으로 할당, 두 개의 토픽을 조인할 경우 파티션 수와 컨슈머 수가 동일해야 좋음
여러 토픽 간에 데이터를 연관시키고 싶을 때 사용 (하나의 토픽 X)

파티션 개수보다 컨슈머 개수가 더 크면, 파티션을 할당받지 못하는 컨슈머 발생할 수 있음
모든 토픽은 co-partitioned 해야함
- 같은 수의 파티션을 가져야함 same number of partitions
- 같은 파티셔너를 써야함 same partitioner
- 같은 키가 세팅되어야함 same set of keys
Round Robin
데이터를 골고루 나누어 줌, 같은 key를 가진 데이터를 같이 처리할 때 문제
하나의 토픽에 대해서만 읽어오는 경우 consumer와 partition의 균형 할당을 위해 라운드 로빈으로 진행

토픽 관계없이 파티션 순서대로 컨슈머에 할당
파티션 수가 변경되면 rebalancing 시 전체가 정지하고 전체를 다시 재할당 ⛔️할당 불균형 발생 위험 있음
두 가지 타입 존재
- Sticky
- 라운드 로빈보다 균형적이며 range방식보다 rebalancing 오버헤드를 줄인 방식
- 할당이 끊긴 파티션들을 최대한 분산되게 할당 ➡️ 기존 할당은 유지하면서 나머지 부분 할당
- consumer 간 차이가 많이 벌어지지 않도록 파티션 개수가 더 적은 쪽으로 할당 ⭕️불균형 발생 위험 줄어듦

- 위와 같은 경우에는 라운드로빈의 sticky파티셔너를 사용하도록 함
- Cooperative Sticky
- rebalancing 두 번 수행
- 가능한 기존 할당 유지하려고 시도는 하나 필요에 따라 일부 파티션의 할당이 변경될 수 있음
- 파티션 할당을 변경하지 않으면서도 리밸런싱을 조정하는 협력적인 방식으로 동작
- 리밸런싱과 직접적인 관련이 없는 다른 컨슈머들이 데이터 처리를 계속 진행할 수 있게하여, 전체 그룹의 처리 성능에 미치는 영향 최소화
컨슈머가 죽었는지 어떻게 아나?

heartbeat 날려 체크
- heartbeat.interval.ms (default 3s)
- 카프카는 3초마다 브로커에게 자신의 생존 여부인 하트비트를 날림
- session.timeout.ms (default 45s - kafka 3.0 이상 / 그 아래버전은 10s)
- 45초 간 하트비트 없이 패스되면 죽은 것으로 간주되어 consumer group에서 제외됨

heartbeat 는 데이터가 없어도 발생
poll은 데이터를 가져오는 것으로 polling이 일어나지 않으면 죽었다고 생각하게 되는 것
사실 상 다음 단계에서 계속 프로세스가 이루어지고 있음
(consumer에서 process 가 오래 걸리면 poll이 오지 못해서 죽었다고 생각함)
- max.poll.interval.ms (default 5m)
- poll이 5분 이상 걸리면 죽었다고 생각
offset
각 파티션마다 메세지가 저장되는 위치
- consumer 에만 offset 관리가 있으며, producer 에는 없음
- offset=3 이라면 0~2까지 컨슈머가 메세지를 읽은 것이고, 다음에 읽어야 하는 메세지가 3번이라는 것을 의미
- offset을 통해 데이터의 순서가 보장
- 컨슈머마다, 파티션마다 offset이 다를 수 있음
Consumer group
컨슈머들의 집합으로 여러 컨슈머들이 하나로 묶인 논리적인 그룹
한 컨슈머 그룹 내에서 1번 컨슈머에서 장애가 발생할 경우, 같은 그룹 내의 2번 컨슈머가 이어 받아 데이터를 계속해서 받아올 수 있음
__consumer_offsets
카프카는 __consumer_offsets라는 이름의 내부 토픽(internal topic)을 기본적으로 가짐
➡️ kafka consumer에 의해서 consume이 시작되는 시점에 생성
해당 토픽은 key - group, topic, partition / value - offset 을 가짐
기본적으로 파티션 수는 50개
이 토픽의 목적은 컨슈머 그룹이 조회한 토픽의 offset들을 기록하는 것
⭐️ kafka consumer의 offset이 커밋되는 과정
토픽을 조회하는 과정에서 컨슈머는 몇 번째 메세지까지 조회했다라는 기록을 해야함
이것을 Offset Commit (=committing offsets) 이라고 부름
offset commit이 중요한 이유는 rebalancing 되는 상황에서 컨슈머가 새롭게 구동되어 토픽 조회를 다시 시작할 때
토픽의 몇 번째 메세지부터 다시 읽어들여야하는지에 관한 문제점을 해결
멱등성 보장!
*멱등성(Idempotence) : 동일한 요청이 여러 차례 반복되어 작업이 처리되어도 동일한 결과를 나타내는 특성
컨슈머가 수천개의 데이터를 조회하는 과정에서 재시작되었다고해서 수천개의 데이터를 처음부터 조회할 수는 없기 때문!
- auto.commit.interval.ms (default 5s)
- 자동 커밋 설정을 함
- enable.auto.commit : false 로 설정하여 자동커밋을 끌 수도 있음
auto commit 문제점은?
consumer에서 poll()메소드를 호출하여 데이터를 가지고 간 이후 consumer에서 장애가 발생하면?
카프카에선 이미 commit이 되어 이미 가져간 record가 정상적으로 처리되었다고 판단
이런 경우 데이터 유실 발생⬆️
또는 commit 전 다운 시 5초동안 데이터 중복처리가 발생할 수도 있고, 이는 멱등성을 보장하지 못함
- At least once
- 이벤트가 발생했을 때, 해당 이벤트가 최소 1번은 발행되어 처리되는 것을 보장한다는 의미
- 메세지 유실 방지를 위해 중복으로 메세지 처리하는 것을 허용 (상실 X 중복 O)
- At most once
- 이벤트가 발생했을 때, 최대 1번만 이벤트 메세지가 발행되게 한다는 의미
- 메세지 유실을 허용하고 중복으로 이벤트 처리를 방지하기 위한 것 (상실 O 중복 X)
- Exactly once
- 이벤트가 발생했을 때, 메세지가 유실 또는 중복이 없어 정확히 1번만 처리된다는 의미
- 구현 난이도 가장 높음 (프로듀서와 컨슈머에서 발생되는 메세지 유실과 중복 처리를 모두 구현해야함) (상실 X 중복 X)
commit은 언제 일어나야 안전할까?
auto commit을 이용하는 방식은 데이터 유실이 발생할 수 있어 안전하지 않음
poll()메소드로 record를 가지고 간 컨슈머가 데이터 처리를 정상적으로 완료한 경우에 commit을 진행하면 데이터를 안전하게 처리했다고 할 수 있음
먼저 auto commit 옵션은 false로 설정한 후 데이터 처리가 완료되면 commitSync()메소드를 호출
commitSync()메소드를 사용하면 poll()메소드를 통해 반환된 레코드의 가장 마지막 offset을 기준으로 commit을 수행하여 데이터가 완전히 처리되었음을 의미
이렇게 commitSync()를 사용하게 되면 안전하게 데이터를 처리하고 유실되지 않는 것을 보장할 수 있지만 데이터가 처리될 때까지 기다리는 과정이 필요해 데이터 처리량이 줄어들게 됨
commit 비동기 처리
commitSync() 메소드를 이용하면 데이터 처리가 생각보다 많이 늦어짐
commit 역시 비동기 방식으로 처리할 수 있는데 commitAsync() 메소드를 이용
커밋 요청을 전송하고 브로커의 응답에 관계없이 다음 데이터를 처리하게 되는데 커밋 요청이 실패하는 경우 데이터의 순서와 중복처리가 발생할 수 있음
[ reference ]
https://colevelup.tistory.com/18
[Kafka] Topic, Partition, Segment, Segment 관리, Offset
카프카(kafka)에서 다뤄지는 주요 개념들인 토픽(Topic), 파티션(partition), 세그먼트(Segment), 오프셋(Offset)에대해서 그리고 세그먼트(segment)의 경우 관리하는 법 까지 알아보도록 하겠습니다. 토픽(Topi
colevelup.tistory.com
https://velog.io/@hyun6ik/Apache-Kafka-Producer
Apache Kafka - Producer
Producer : 메시지를 생산(Produce)해서 kafka의 Topic으로 메시지를 보내는 애플리케이션Consumer : Topic의 메시지를 가져와서 소비(Consume)하는 애플리케이션Consumer Group : Topic의 메시지를 사용하기 위해 협
velog.io
https://always-kimkim.tistory.com/entry/kafka101-producer
[Kafka 101] 카프카 프로듀서 (Kafka Producer)
들어가며 카프카는 메시지를 생산, 발송하는 프로듀서(Producer)와 메시지를 소비, 수신하는 컨슈머(Consumer), 그리고 프로듀서와 컨슈머 사이에서 메시지를 중개하는 브로커(Broker)로 구성됩니다.
always-kimkim.tistory.com
'DataBase' 카테고리의 다른 글
[flume 오류] Exception: java.lang.OutOfMemoryError (0) | 2024.04.19 |
---|---|
[Kafka] Additional Components (0) | 2024.04.09 |
[Data 수집 및 연동] filebeat, kafka, logstash 사용해서 elasticsearch 데이터 업로드 하는 방법 (1) | 2024.03.19 |
[Elastic Search] Lab 5.3 aggregations (0) | 2024.01.30 |
[Elastic Search] Analyzer (tokenizer, char_filter, token filter) (1) | 2024.01.10 |
댓글