package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TimeoutException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.7.0.jar:org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumer.class */
/**
 * Base class for a Kafka consumer that polls records in batches, passes each record (converted to a
 * message of type {@code T}) to a message handler and commits the offsets of the handled records
 * before polling the next batch.
 * <p>
 * The consumer subscribes either to a fixed set of topics or to a topic pattern. After a batch has
 * been processed, the collected offsets are committed and only then the next poll is issued.
 * <p>
 * Error handling as implemented here:
 * <ul>
 * <li>A failing poll or a failing commit closes the Kafka consumer and invokes the close handler
 * with a {@code KafkaConsumerPollException} or {@code KafkaConsumerCommitException}. A commit that
 * fails with a {@link TimeoutException} is retried once before giving up.</li>
 * <li>A {@link RuntimeException} thrown while creating or handling a single message is logged on
 * debug level; the offset of that record is not recorded and processing continues with the next
 * record of the batch.</li>
 * <li>Any other exception during batch processing triggers a final commit attempt, closes the
 * consumer and invokes the close handler with that exception.</li>
 * </ul>
 * <p>
 * NOTE(review): the class performs no synchronization; it presumably relies on all callbacks being
 * executed on the same Vert.x context - confirm against the callers.
 *
 * @param <T> The type of message that is passed to the message handler.
 */
public abstract class AbstractAtLeastOnceKafkaConsumer<T> implements Lifecycle {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractAtLeastOnceKafkaConsumer.class);

    /** Maximum number of topic names that are listed in log messages. */
    private static final int MAX_TOPICS_IN_LOG = 3;
    /** Maximum number of partitions that are listed individually in debug log messages. */
    private static final int MAX_PARTITIONS_IN_LOG = 20;
    private static final String MSG_ALREADY_STOPPED = "consumer already stopped";

    /**
     * Set once the consumer is being closed; no records are handled, polled or committed by the
     * regular processing loop afterwards.
     */
    boolean stopped;

    private final KafkaConsumer<String, Buffer> kafkaConsumer;
    /** The topics to subscribe to, {@code null} if a topic pattern is used instead. */
    private final Set<String> topics;
    /** The pattern of topics to subscribe to, {@code null} if a fixed set of topics is used. */
    private final Pattern topicPattern;
    /** Abbreviated description of the subscription, used in log messages only. */
    private final String topicsLogString;
    private final Handler<T> messageHandler;
    private final Handler<Throwable> closeHandler;
    private final Duration pollTimeout;
    /** Per partition, the offset following the last handled record; cleared after each commit. */
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommitted = new HashMap<>();
    private boolean respectTtl = true;

    /**
     * Creates a consumer for a single topic.
     *
     * @param kafkaConsumer The Kafka consumer to be used exclusively by this instance.
     * @param topic The topic to consume records from.
     * @param messageHandler The handler to be invoked for each message created from a record.
     * @param closeHandler The handler to be invoked when this consumer closes due to an error.
     * @param pollTimeout The maximum number of milliseconds to wait for records during a poll.
     * @throws NullPointerException if any of the object parameters is {@code null}.
     */
    public AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, final String topic,
            final Handler<T> messageHandler, final Handler<Throwable> closeHandler, final long pollTimeout) {
        this(kafkaConsumer, Set.of(Objects.requireNonNull(topic)), messageHandler, closeHandler, pollTimeout);
    }

    /**
     * Creates a consumer for a set of topics.
     *
     * @param kafkaConsumer The Kafka consumer to be used exclusively by this instance.
     * @param topics The topics to consume records from.
     * @param messageHandler The handler to be invoked for each message created from a record.
     * @param closeHandler The handler to be invoked when this consumer closes due to an error.
     * @param pollTimeout The maximum number of milliseconds to wait for records during a poll.
     * @throws NullPointerException if any of the object parameters is {@code null}.
     */
    public AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, final Set<String> topics,
            final Handler<T> messageHandler, final Handler<Throwable> closeHandler, final long pollTimeout) {
        this(kafkaConsumer, Objects.requireNonNull(topics), null, messageHandler, closeHandler, pollTimeout);
    }

    /**
     * Creates a consumer for all topics matching a pattern.
     *
     * @param kafkaConsumer The Kafka consumer to be used exclusively by this instance.
     * @param topicPattern The pattern of the topic names to consume records from.
     * @param messageHandler The handler to be invoked for each message created from a record.
     * @param closeHandler The handler to be invoked when this consumer closes due to an error.
     * @param pollTimeout The maximum number of milliseconds to wait for records during a poll.
     * @throws NullPointerException if any of the object parameters is {@code null}.
     */
    public AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, final Pattern topicPattern,
            final Handler<T> messageHandler, final Handler<Throwable> closeHandler, final long pollTimeout) {
        this(kafkaConsumer, null, Objects.requireNonNull(topicPattern), messageHandler, closeHandler, pollTimeout);
    }

    /**
     * Common constructor; exactly one of {@code topics} and {@code topicPattern} is expected to be
     * non-null (the public constructors guarantee this).
     */
    private AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, final Set<String> topics,
            final Pattern topicPattern, final Handler<T> messageHandler, final Handler<Throwable> closeHandler,
            final long pollTimeout) {
        this.kafkaConsumer = Objects.requireNonNull(kafkaConsumer);
        this.messageHandler = Objects.requireNonNull(messageHandler);
        this.closeHandler = Objects.requireNonNull(closeHandler);
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.topicsLogString = describeSubscription(topics, topicPattern);
        this.pollTimeout = Duration.ofMillis(pollTimeout);
    }

    /**
     * Builds the short subscription description used in log messages: at most the first
     * {@value #MAX_TOPICS_IN_LOG} topic names in brackets (followed by an ellipsis if there are
     * more), or the string form of the pattern if no topic set is given.
     */
    private static String describeSubscription(final Set<String> topics, final Pattern topicPattern) {
        if (topics == null) {
            return topicPattern.toString();
        }
        final String closing = topics.size() > MAX_TOPICS_IN_LOG ? ", ...]" : "]";
        return topics.stream()
                .limit(MAX_TOPICS_IN_LOG)
                .collect(Collectors.joining(", ", "[", closing));
    }

    /**
     * Creates the message that is passed to the message handler from the given Kafka record.
     *
     * @param kafkaConsumerRecord The record that has been polled.
     * @return The message to be handled.
     */
    protected abstract T createMessage(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord);

    /**
     * Subscribes to the configured topic(s) and issues the first poll.
     * <p>
     * The records of the first poll are handed to the batch processing, which then keeps polling
     * by itself.
     *
     * @return A future that succeeds once the first poll has completed. It fails if this consumer
     *         has already been stopped, if subscribing fails, or - with a
     *         {@code KafkaConsumerPollException} - if the first poll fails.
     */
    @Override
    public Future<Void> start() {
        if (stopped) {
            return Future.failedFuture(MSG_ALREADY_STOPPED);
        }
        kafkaConsumer.partitionsAssignedHandler(this::onPartitionsAssigned);
        kafkaConsumer.partitionsRevokedHandler(this::onPartitionsRevoked);

        final Promise<Void> subscribed = Promise.promise();
        if (topics != null) {
            kafkaConsumer.subscribe(topics, subscribed);
        } else {
            kafkaConsumer.subscribe(topicPattern, subscribed);
        }
        return subscribed.future().compose(ok -> {
            final Promise<KafkaConsumerRecords<String, Buffer>> firstPoll = Promise.promise();
            kafkaConsumer.poll(pollTimeout, firstPoll);
            return firstPoll.future()
                    .onSuccess(this::handleBatch)
                    .recover(cause -> Future.failedFuture(new KafkaConsumerPollException(cause)))
                    .<Void>mapEmpty();
        });
    }

    private void onPartitionsAssigned(final Set<TopicPartition> partitions) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions assigned: [{}]", getPartitionsDebugString(partitions));
        }
    }

    private void onPartitionsRevoked(final Set<TopicPartition> partitions) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions revoked: [{}]", getPartitionsDebugString(partitions));
        }
    }

    /**
     * Renders the given partitions for debug logging: a map of topic name to the sorted partition
     * numbers, or just the number of partitions if there are more than
     * {@value #MAX_PARTITIONS_IN_LOG} of them.
     */
    private String getPartitionsDebugString(final Set<TopicPartition> partitions) {
        if (partitions.size() > MAX_PARTITIONS_IN_LOG) {
            return partitions.size() + " topic partitions";
        }
        final Map<String, TreeSet<Integer>> partitionsByTopic = partitions.stream()
                .collect(Collectors.groupingBy(
                        TopicPartition::getTopic,
                        Collectors.mapping(TopicPartition::getPartition, Collectors.toCollection(TreeSet::new))));
        return partitionsByTopic.toString();
    }

    /**
     * Stops this consumer: tries to commit the offsets collected so far and closes the Kafka
     * consumer regardless of the commit's outcome.
     *
     * @return A future reflecting the outcome of closing the Kafka consumer.
     */
    @Override
    public Future<Void> stop() {
        return tryCommitAndClose();
    }

    /**
     * Defines whether records whose time-to-live has elapsed are skipped instead of being passed
     * to the message handler. The default is {@code true}.
     *
     * @param z {@code true} if expired records should not be handled.
     */
    public final void setRespectTtl(final boolean z) {
        this.respectTtl = z;
    }

    /**
     * Processes a batch of polled records, commits the resulting offsets and polls again.
     * <p>
     * Processing is aborted without committing as soon as the consumer is found to be stopped.
     */
    private void handleBatch(final KafkaConsumerRecords<String, Buffer> records) {
        try {
            if (!records.isEmpty()) {
                LOG.debug("polled {} records on {}", records.size(), topicsLogString);
            }
            for (int i = 0; i < records.size(); i++) {
                if (stopped) {
                    return;
                }
                final KafkaConsumerRecord<String, Buffer> record = records.recordAt(i);
                if (respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                    // expired records are skipped but still count as processed
                    addToCurrentOffsets(record);
                    continue;
                }
                try {
                    messageHandler.handle(createMessage(record));
                    addToCurrentOffsets(record);
                } catch (final RuntimeException e) {
                    // NOTE(review): the failed record's offset is not recorded, but a later record of
                    // the same partition in this batch overwrites the partition's entry and thereby
                    // moves the committed offset past the failed record - confirm this is intended.
                    LOG.debug("Message handler failed", e);
                }
            }
            // the next poll is only issued after the offsets of this batch have been committed
            commit(true)
                    .compose(ok -> poll())
                    .onSuccess(this::handleBatch);
        } catch (final Exception e) {
            LOG.error("Consumer failed, closing", e);
            tryCommitAndClose().onComplete(closed -> closeHandler.handle(e));
        }
    }

    /**
     * Polls for the next batch of records.
     *
     * @return A future with the polled records. It fails if the consumer is already stopped or -
     *         with a {@code KafkaConsumerPollException}, after having closed the consumer and
     *         notified the close handler - if polling fails.
     */
    private Future<KafkaConsumerRecords<String, Buffer>> poll() {
        if (stopped) {
            return Future.failedFuture(MSG_ALREADY_STOPPED);
        }
        final Promise<KafkaConsumerRecords<String, Buffer>> polled = Promise.promise();
        kafkaConsumer.poll(pollTimeout, polled);
        return polled.future().recover(cause -> {
            LOG.error("Error polling messages: {}", String.valueOf(cause));
            final KafkaConsumerPollException pollException = new KafkaConsumerPollException(cause);
            closeAndCallHandler(pollException);
            return Future.failedFuture(pollException);
        });
    }

    /**
     * Remembers the offset following the given record as the one to commit for the record's
     * partition, replacing any offset stored for that partition before.
     */
    private void addToCurrentOffsets(final KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        final TopicPartition partition = new TopicPartition(kafkaConsumerRecord.topic(), kafkaConsumerRecord.partition());
        // the offset to commit is the one of the next record to be consumed
        final OffsetAndMetadata nextOffset = new OffsetAndMetadata(kafkaConsumerRecord.offset() + 1, "");
        offsetsToBeCommitted.put(partition, nextOffset);
    }

    /**
     * Commits the collected offsets, if any.
     *
     * @param retry {@code true} if the commit should be attempted once more after it failed with a
     *              {@link TimeoutException}.
     * @return A future that succeeds if there was nothing to commit or the commit succeeded. It
     *         fails if the consumer is already stopped or - with a
     *         {@code KafkaConsumerCommitException}, after having closed the consumer and notified
     *         the close handler - if committing fails.
     */
    private Future<Void> commit(final boolean retry) {
        if (offsetsToBeCommitted.isEmpty()) {
            return Future.succeededFuture();
        }
        if (stopped) {
            return Future.failedFuture(MSG_ALREADY_STOPPED);
        }
        return commitCurrentOffsets().recover(cause -> {
            LOG.error("Error committing offsets: {}", String.valueOf(cause));
            if (cause instanceof TimeoutException && retry) {
                LOG.debug("Committing offsets timed out. Maybe increase 'default.api.timeout.ms'?");
                return commit(false);
            }
            final KafkaConsumerCommitException commitException = new KafkaConsumerCommitException(cause);
            closeAndCallHandler(commitException);
            return Future.failedFuture(commitException);
        });
    }

    /**
     * Sends the collected offsets to Kafka and clears them once the commit has succeeded.
     *
     * @return A future indicating the outcome of the commit; succeeded if there was nothing to commit.
     */
    private Future<Void> commitCurrentOffsets() {
        if (offsetsToBeCommitted.isEmpty()) {
            LOG.debug("no offsets to commit");
            return Future.succeededFuture();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("committing offsets: {}", offsetsToBeCommitted);
        } else {
            LOG.debug("committing current offsets");
        }
        final Promise<Map<TopicPartition, OffsetAndMetadata>> committed = Promise.promise();
        kafkaConsumer.commit(offsetsToBeCommitted, committed);
        return committed.future().map(committedOffsets -> {
            LOG.debug("successfully committed offsets");
            offsetsToBeCommitted.clear();
            return (Void) null;
        });
    }

    /**
     * Closes the Kafka consumer without committing and afterwards, irrespective of the outcome of
     * the close operation, passes the given error to the close handler.
     */
    private void closeAndCallHandler(final Throwable th) {
        LOG.error("Closing consumer with cause", th);
        closeConsumer().onComplete(closed -> closeHandler.handle(th));
    }

    /**
     * Marks this consumer as stopped, tries to commit the collected offsets and then closes the
     * Kafka consumer, whether the commit succeeded or not.
     *
     * @return A future reflecting the outcome of the close operation only.
     */
    private Future<Void> tryCommitAndClose() {
        stopped = true;
        final Promise<Void> result = Promise.promise();
        commitCurrentOffsets().onComplete(commitOutcome -> closeConsumer().onComplete(closeOutcome -> {
            if (closeOutcome.succeeded()) {
                result.complete();
            } else {
                result.fail(closeOutcome.cause());
            }
        }));
        return result.future();
    }

    /**
     * Marks this consumer as stopped, closes the Kafka consumer and discards all offsets that have
     * not been committed.
     *
     * @return A future indicating the outcome of the close operation.
     */
    private Future<Void> closeConsumer() {
        stopped = true;
        final Promise<Void> closed = Promise.promise();
        kafkaConsumer.close(closed);
        offsetsToBeCommitted.clear();
        return closed.future();
    }
}
