티스토리 뷰

이럴수가

 

 seekToBeginning 메서드를 써서 토픽의 맨 처음부터 데이터를 읽어오려는데 이것이 불가능한 상황이다.

 

[상황]

 1. KafkaRunner는 아래의 코드를 포함한다.

@Log4j2
@Getter
public class NewTemplateRunner implements DisposableBean, ConsumerSeekAware//, ConsumerAwareRebalanceListener
{
    private final String topic;
    private final String groupId;
    private final String[] partitions;
    private volatile boolean isClosed = false;
    private boolean isOffsetLatest;
    private int count = 0;
    private boolean isInitialized = false;
    private boolean isFinished = false;
    private boolean isError = false;
    private KafkaMessageListenerContainer container;
    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<org.apache.kafka.common.TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    public NewTemplateRunner(String topic, String groupId, String[] partitions, boolean isLatest)
    {
        this.topic = topic;
        this.groupId = groupId;
        this.partitions = partitions;
        log.info("Created {} / {}", topic, groupId);
        this.isOffsetLatest = isLatest;
    }

    @PostConstruct
    public void init()
    {
        log.info("{}/{} template post constructed... {}. {}, {}", topic, groupId, this.hashCode());
    }

    @PreDestroy
    public void preDestory()
    {
        log.info("Pre Destory {}/{}", groupId, hashCode());
    }

    @KafkaListener(id="#{new java.util.Random().nextInt(3) + \"_id\" + __listener.groupId}"
        , groupId = "#{__listener.groupId}"
        , topics = "#{__listener.topic}"
      //,  topicPartitions =
      //      {
      //          @TopicPartition(topic = "#{__listener.topic}", partitions = "#{__listener.partitions}"
      //          , partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
      //      )
      //  }
        , clientIdPrefix = "#{__listener.groupId}"
        , concurrency = "#{__listener.partitions.length}"
    )


    public void consume(ConsumerRecord<String, String>  messages, Consumer<?,?> kafkaConsumer){
        log.info("Consuming now : {} with partition={}", count, messages.partition());
    }

    public void stop()
    {
        if (container != null)
        {
            isClosed = true;
            container.stop();
            log.info("Container stopped {}/{} with count = {} and hashcode = {}", groupId, hashCode(),
                count, container.hashCode());
        }
        else
        {
            log.info("container is null : {}/{}", groupId, hashCode());
        }
    }


    public String getTopic() {
        return this.topic;
    }

    public String getGroupId()
    {
        return this.groupId;
    }

    @Override
    public void destroy() throws Exception
    {
        log.info("{}/{} has just been destoryed", groupId, hashCode());
    }

    public void setContainer(KafkaMessageListenerContainer container)
    {
        this.container = container;
    }

    public void setClosed(boolean closed)
    {
        isClosed = closed;
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        log.info("RegisterSeekCallback called");
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("onPartitionsAssigned called assignment size = {}", assignments.size());
        final Set<org.apache.kafka.common.TopicPartition> topicPartitions = assignments.keySet();
        for (org.apache.kafka.common.TopicPartition key : assignments.keySet())
        {
        }
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
        seekToStart();
    }

    @Override
    public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
        log.info("onPartitionsRevoked called with count {}", count);
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("onIdleContainer called with count {}", count);
    }


    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        //   or  this.callbacks.forEach((tp, callback) -> callback.seek(tp.topic(), tp.partition(), 0L));
    }

    public void seekToEnd()
    {
        this.callbacks.forEach((tp, callbacks) -> callbacks.seekToEnd(tp.topic(), tp.partition()));
    }
}

onPartitionsAssigned를 보면 seekToStart() 메서드를 호출하며 여기서는 seekToBeginning이 호출된다.

 

이게 호출되면 원래는 맨 초기값의 offset을 가져가야 한다.

 

디버그 모드

디버그 시 TopicPartitionState에는 state에 EARLIEST로 지정되어 있으나 offset이 258242로 되어 있다..

즉 0이 아니라... 맨 끝 offset으로 RESET 되고 있다.

 

SubscriptionState.java 에서 transitionState를 보면

762번 줄에서 위와 같은 디버그 상태가 잡힌다. State == FETCHING

 

즉, FETCHING 이면 offset으로 리셋되도록 코딩되어 있는 것이 확인된다.

 

//사설

사설 : 엄청 많은 Optional.empty() 오호라.. 

 

//사설 끝

 

저렇게 되면 아래 로그가 출력된다.

offset이 258866으로 reset되었다는 메시지

0으로 셋팅이 안 된다.

 

그런데 파티션이 4개인다 다 그런가 하면 또 아니었던 게 0, 2, 3 번 파티션은 0으로 이동되지 않았고 1번 파티션만 이동되었다.

 

코드를 거술러 올라가 이유를 살핀즉...

 

docker 로 구동한 카프카의 파일 내부이다.

leader-epoch-checkpoint 가 있다. 

저 파일의 3번줄 2번째 값이 259150 이다.

아까의 3번 파티션의 경우 258066 값이고 서버 로그에 있던 것과 같은 값임을 알 수 있다.

 

0으로 동작하던 1번 파티션의 경우 그 값이 0인 것을 확인할 수 있었다.

 

저 값이 무언가 해서 인터넷 코드 조사를 하던 중 코틀린 코드를 확인했다. 

저게 startOffset으로 설정되는 값이다... 리더 관련 제어를 통한 epoch 제어에 따라 값이 설정된다는 것이라는데..

 

그럼 일부러 저 값을 0으로 바꾸면 0으로 초기화되려나?

 

--> 안된다. 근본적인 원인이 저게 아님을 알 수 있다.

 

mbukowicz.github.io/kafka/2020/05/31/how-kafka-stores-messages.html

 

How Kafka stores messages

Intro Apache Kafka, like any other messaging or database system, is a complicated beast. But when divided into manageable chunks it can be much easier to understand how it all works. In this post let’s focus on one area of Kafka’s inner workings and tr

mbukowicz.github.io

github.com/apache/kafka/blob/ba237c5d21abb8b63c5edf53517654a214157582/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala#L43

 

apache/kafka

Mirror of Apache Kafka. Contribute to apache/kafka development by creating an account on GitHub.

github.com

 

여기서 참고한다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/02   »
1
2 3 4 5 6 7 8
9 10 11 12 13 14 15
16 17 18 19 20 21 22
23 24 25 26 27 28
글 보관함