티스토리 뷰
이럴수가
seekToBeginning 메서드를 써서 토픽의 맨 처음부터 데이터를 읽어오려는데 이것이 불가능한 상황이다.
[상황]
1. KafkaRunner는 아래의 코드를 포함한다.
@Log4j2
@Getter
public class NewTemplateRunner implements DisposableBean, ConsumerSeekAware//, ConsumerAwareRebalanceListener
{
private final String topic;
private final String groupId;
private final String[] partitions;
private volatile boolean isClosed = false;
private boolean isOffsetLatest;
private int count = 0;
private boolean isInitialized = false;
private boolean isFinished = false;
private boolean isError = false;
private KafkaMessageListenerContainer container;
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<org.apache.kafka.common.TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
public NewTemplateRunner(String topic, String groupId, String[] partitions, boolean isLatest)
{
this.topic = topic;
this.groupId = groupId;
this.partitions = partitions;
log.info("Created {} / {}", topic, groupId);
this.isOffsetLatest = isLatest;
}
@PostConstruct
public void init()
{
log.info("{}/{} template post constructed... {}. {}, {}", topic, groupId, this.hashCode());
}
@PreDestroy
public void preDestory()
{
log.info("Pre Destory {}/{}", groupId, hashCode());
}
@KafkaListener(id="#{new java.util.Random().nextInt(3) + \"_id\" + __listener.groupId}"
, groupId = "#{__listener.groupId}"
, topics = "#{__listener.topic}"
//, topicPartitions =
// {
// @TopicPartition(topic = "#{__listener.topic}", partitions = "#{__listener.partitions}"
// , partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
// )
// }
, clientIdPrefix = "#{__listener.groupId}"
, concurrency = "#{__listener.partitions.length}"
)
public void consume(ConsumerRecord<String, String> messages, Consumer<?,?> kafkaConsumer){
log.info("Consuming now : {} with partition={}", count, messages.partition());
}
public void stop()
{
if (container != null)
{
isClosed = true;
container.stop();
log.info("Container stopped {}/{} with count = {} and hashcode = {}", groupId, hashCode(),
count, container.hashCode());
}
else
{
log.info("container is null : {}/{}", groupId, hashCode());
}
}
public String getTopic() {
return this.topic;
}
public String getGroupId()
{
return this.groupId;
}
@Override
public void destroy() throws Exception
{
log.info("{}/{} has just been destoryed", groupId, hashCode());
}
public void setContainer(KafkaMessageListenerContainer container)
{
this.container = container;
}
public void setClosed(boolean closed)
{
isClosed = closed;
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
log.info("RegisterSeekCallback called");
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("onPartitionsAssigned called assignment size = {}", assignments.size());
final Set<org.apache.kafka.common.TopicPartition> topicPartitions = assignments.keySet();
for (org.apache.kafka.common.TopicPartition key : assignments.keySet())
{
}
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
seekToStart();
}
@Override
public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
log.info("onPartitionsRevoked called with count {}", count);
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("onIdleContainer called with count {}", count);
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
// or this.callbacks.forEach((tp, callback) -> callback.seek(tp.topic(), tp.partition(), 0L));
}
public void seekToEnd()
{
this.callbacks.forEach((tp, callbacks) -> callbacks.seekToEnd(tp.topic(), tp.partition()));
}
}
onPartitionsAssigned를 보면 seekToStart() 메서드를 호출하며 여기서는 seekToBeginning이 호출된다.
이게 호출되면 원래는 맨 초기값의 offset을 가져가야 한다.
디버그 시 TopicPartitionState에는 state에 EARLIEST로 지정되어 있으나 offset이 258242로 되어 있다..
즉 0이 아니라... 맨 끝 offset으로 RESET 되고 있다.
SubscriptionState.java 에서 transitionState를 보면
762번 줄에서 위와 같은 디버그 상태가 잡힌다. State == FETCHING
즉, FETCHING 이면 offset으로 리셋되도록 코딩되어 있는 것이 확인된다.
//사설
//사설 끝
저렇게 되면 아래 로그가 출력된다.
0으로 셋팅이 안 된다.
그런데 파티션이 4개인다 다 그런가 하면 또 아니었던 게 0, 2, 3 번 파티션은 0으로 이동되지 않았고 1번 파티션만 이동되었다.
코드를 거술러 올라가 이유를 살핀즉...
docker 로 구동한 카프카의 파일 내부이다.
leader-epoch-checkpoint 가 있다.
저 파일의 3번줄 2번째 값이 259150 이다.
아까의 3번 파티션의 경우 258066 값이고 서버 로그에 있던 것과 같은 값임을 알 수 있다.
0으로 동작하던 1번 파티션의 경우 그 값이 0인 것을 확인할 수 있었다.
저 값이 무언가 해서 인터넷 코드 조사를 하던 중 코틀린 코드를 확인했다.
저게 startOffset으로 설정되는 값이다... 리더 관련 제어를 통한 epoch 제어에 따라 값이 설정된다는 것이라는데..
그럼 일부러 저 값을 0으로 바꾸면 0으로 초기화되려나?
--> 안된다. 근본적인 원인이 저게 아님을 알 수 있다.
mbukowicz.github.io/kafka/2020/05/31/how-kafka-stores-messages.html
여기서 참고한다.