관리 메뉴

IT.FARMER

spring, kafka offset commit 정책 본문

MessageQueue/Kafka

spring, kafka offset commit 정책

아이티.파머 2023. 6. 14. 18:23
반응형

1. KafKa Offsets commit 정책

consumer configuration 에서 commit config 를 false로 설정 되었을경우, 즉 commit을 자동으로 하지 않았을 경우에 어떤게 commit을 할지 결정 하는 모드 이다.

커밋 오프셋을 하기 위한 몇가지 옵션이 존재 하며, 기본 BATCH 이고 다른 옵션으로 사용 가능하다.

근데 이렇게 기본옵션인 auto commit을 사용하게 되면 배치모드로 동작하게되고 특정 부분에서 문제가 생겼을때, 유실되는 부분이 생길수있다. (spring boot 의 자동커밋 기본 시간은 5000ms)

그래서 중요한 업무를 수행하는 컨슈머에서는 auto commit 보다는 수동 commit을 하는것이 좋다고 생각한다.

Kafka 커밋 종류

Offset Commit 종류

  • autoCommt : 자동커밋
    • properties 설정시 enable.auto.commit=true 로 설정한다.
    • auto.commit.interval.ms 옵션을 사용하여 자동커밋 주기 설정을 한다.
  • commitSync : 동기커밋
    • 순차적으로 실행 가능하지만 속도가 느려질수있다.
    • properties 설정시 enable.auto.commit=false 로 설정한다.
    • 명시적으로 commitSync 메소드를 호출하여 메세지처리를 완료하고 오프셋을 변경한다.
private void consume(Consumer<String,String> consumer) {
	ConsunmerRecode<String,String> recodes = consumer.poll(Duration.ofMillis(100))
	    
    recodes.foreach(recode->{
			try {
				// consumer logic 처리 	
				recode....

				try {
					// 수동 커밋 처리
				  consumer.commitSync();
				} catch () {
					// 실패했던 offset으로 되돌리는 offset
					consumer.seek(new TopicPartition(recode.topic(),recode.partition()), recode.offset())
				}		
			}	catch (Exception e) {
				e.printStackTrace();
		  }
		})
}

 

  • commitAsync : 비동기 커밋
    • 비동기 커밋은 커밋이 되었는지 확인하지 않고 동작함으로 동기 커밋보다 속도가 빠르다. 하지만 데이터의 순서는 보장되지 않는다.
    • 비동기 커밋은 메세지 손실에 주의해야한다.
    • properties 설정시 enable.auto.commit=false 로 설정한다.
    • 명시적으로 commitAsync 메소드를 호출하여 메세지처리를 완료하고 오프셋을 변경한다.
    • 비동기 커밋은 동기 커밋예제에서 메소드만 commitAsync로 변경하면 된다.

 

수동 커밋을 사용했을때, 커밋에 실패하였다고 해서 다음번 처리때 방금전 실패한 로직이 재처리 되지 않는다. 수동커밋에 실패한 offset은 건너뛰고 다음번 offset을 가져온다.

그림으로 보면 다음과 같다.

 

3번까지 수동 커밋에 성공하고 4번에서 실패했을때 재처리 로직을 넣지 않는 이상 4번은 재처리 되지 않고 컨슈머는 5번 오프셋의 데이터를 가져온다.

 

해결방법

이를 해결하기 위해서는 ‘consumer.seek()’ 를 사용하여 실패했던 레코드로 offset을 되돌리고 poll시 실패한 데이터를 재처리 한다. (spring kafka 에서는 ‘acknowledgment.nack()’을 사용한다.)

private void consume(Consumer<String,String> consumer) {
	try {
		ConsunmerRecode<String,String> recodes = consumer.poll(Duration.ofMillis(100))

		for(ConsunmerRecode<String,String> recode: recodes) {
			// consumer logic 처리 	
			recode......
	
			try {
				// 수동 커밋 처리
				consumer.commitSync();
			} catch () {
				e.printStackTrace();
			}
		}
	} catch (Exception e) {
    	e.printStackTrace();
	}
}

다음으로 스프링부트와 Kafka 조합을 사용하였을때 설정을 통해 사용할 수있는 offset 커밋 정책에 대해 알아보자.

2. Spring Boot Offset Commit 정책

spring with kafka 에서도 기본 kafka 에서 제공하는 자동커밋,수동커밋(동기/비동기)를 지원한다. springboot 에서는 application.yml 을 통해 offset commit 정책을 정의 할 수있다. 그 종류에 대해 알아보자.

  1. 자동커밋(autoCommit )
    1. 기본설정으로 , 주기적인 설정에 따라 혹은 처리가 완료된 후에 오프셋을 자동으로 커밋한다.
    2. **enable.auto.commit** 기본속성이 ture 로 자동커밋된다.
    3. spring.kafka.consumer.auto-commit-interval 통해 자동오프셋 주기를 설정 할 수있다.
  2. 수동 커밋 (Manual Commit)
    1. 개발자가 명시적으로 오프셋을 커밋하는 방식이다.
    2. Acknowledgment 객체를 통해 수동 커밋이 가능하다.
    3. @KafkaListener 어노테이션의 ackMode 속성을 MANUAL 로 설정하고 Acknowledgement 파라미터를 메서드에 추가하여 사용한다.
- `AcknowledgingMessageListener` 를 상속하여 만든 예제
    - 파라미터에 Acknowledgment 을 추가하여 메세지 처리가 완료되면 오프셋 커밋을 수행한다.
    - 오류가 발생한경우엔 에러로그를 발생시키고,  오프셋커밋을 수행하지 않도록 한다.
    - **acknowledgment.nack()은** 컨슈머에서 오류가 발생했을때 해당메세드를 다시 처리하기 위해 오프셋 커밋을 되돌리는 메서드 이다.   → 재처리를 하고 싶지 않은경우엔 에러 로그만 찍고 로그데이터를 RDB에 저장하면 된다.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaListener implements AcknowledgingMessageListener<String, String> {

    @KafkaListener(topics = "my-topic")
    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            // 메시지 처리 로직 수행
            System.out.println("Received message: " + record.value());

            // 오프셋 커밋 수행
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 에러 처리 로직 수행
            handleError(record, acknowledgment, e);
        }
    }

    private void handleError(ConsumerRecord<String, String> record, Acknowledgment acknowledgment, Exception e) {
        // 에러 처리 로직 수행
        System.err.println("Error handling message: " + record.value());
        e.printStackTrace();

        // 에러가 발생했을 경우에도 오프셋 커밋을 수행하지 않음
        acknowledgment.nack(acknowledgment.acknowledged().size());
    }
}
```
  1. 배치커밋 (Batch Commit) - 수동
    1. 여러메세지를 한번에 커밋한다.
    2. spring.kafka.lintener.typebatch 로 설정하여 사용한다. AckMode.BATCH 를 사용하여 일정 레코드가 처리된 이후 배치 오프셋을 커밋한다.
  2. 마지막 오프셋 커밋 (Last offset Commit) - 수동
    1. 처리가 완료된 마지막 레코드의 오프셋을 커밋하는 방식이다.
    2. spring.kafka.listener.typerecord로 설정하여 사용 한다.
    3. AckMode.RECODE 를 사용하여 각 레코드의 처리가 완료될때 마다 오프셋을 커밋 할 수있다.

1번, 2번3번4번은 수동커밋에 해당된다.

kafka 에서는 기본적으로 비동기 커밋방식을 제공하며, 수동모드를 사용해도 기본적으로 비동기 커밋으로 동작된다. 이를 동기커밋으로 변경하려면 KafkaTemplate 을 사용하여 동기 커밋이 되도록 구현하여야 한다.

다음은 Springboot를 사용할때 KafkaTemplate을 이용한 동기 커밋에 대한 예제이다.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyConsumer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenAndReply(@Payload ConsumerRecord<String, String> record) {
        try {
            // 메시지 처리 로직
            System.out.println("Received message: " + record.value());

            // 동기 커밋 수행
            kafkaTemplate.sendOffsetsForCommittedRecords();
            System.out.println("Commit completed");
        } catch (Exception ex) {
            System.err.println("Failed to process message: " + ex.getMessage());
        }
    }
}

컨슈머에서 데이터를 처리한뒤에 kafkaTemplate.sendOffsetsForCommittedRecords() 를 사용하여 동기 커밋을 수행한다.

개인적으로 생각했을때 kafka 사용시 단순한 메세지 처리를 할때 혹은 분산처리를 할때는 spring boot + kafka 조합이 편해보였다. 허나 실제 원하는 결과를 얻고자할때(동기처리, 비동기처리 등등)는 기본적인Spring에 KafkaTemplate을 사용하는게 좀더 수월한것 같다. 완전하게 수동셋팅하고 사용하고자 할때는 Spring Boot에 플레인한 kafka 라이브러리만 올려서 사용해도 좋을것 같다.

지금 하고있는 프로젝트에서는 Spring Boot kafka의 조합의 프로젝트 셋팅으로 되어 있고, API를 호출하여 데이터를 가져오는 서비스로써, 레코드 하나당 처리하는 부분이 정확하게 보여야 하기때문에 ack-mode를 RECODE 모드로 하고 수동커밋으로 설정하여 사용 하였다.

  • ack-mode: record
  • enable.auto.commit: false

수동커밋 사용시 AcknowledgingMessageListener 를 상속받아 구현하고 오프셋커밋을 acknowledgment.acknowledge() 수행하여야 하나 이렇게 하지 않아도 오프셋이 커밋되었는데 이는 스프링카프카의 KafkaMessageListenerContainer 라는 컨테이너에대해 알고 있어야 한다.

해당 내용을 간단하게 설명하자면 ack-mode가 recode 이기때문에 commit mode가 false로 되어 있더라도 ‘KafkaMessageListenerContainer’ 컨테이너에서는 commitIfNecessary() 를 Recode 단위로 커밋한다.

하지만 위에 언급했듯이 수동모드를 이용할때는 리스너를 상속받아 구현하고 Acknowledgment 파라미터를 사용하여 명시적으로 ‘acknowledgment.acknowledge()’ 호출해주는 것이 베스트 프랙티스라 한다.

반응형