package org.eclipse.hono.client.kafka.consumer;

import com.fasterxml.jackson.core.JsonLocation;
import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.handler.TimeoutHandler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RegisterForReflection(targets = {KafkaReadStreamImpl.class})
/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.12.0.jar:org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.class */
public class HonoKafkaConsumer implements Lifecycle {
    private static final long WAIT_FOR_REBALANCE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);
    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30);
    private static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250;
    private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
    protected final Logger log;
    protected final Vertx vertx;
    protected final AtomicBoolean stopCalled;
    protected final Map<String, String> consumerConfig;
    protected final Set<String> topics;
    protected final Pattern topicPattern;
    private final AtomicReference<Pair<Promise<Void>, Promise<Void>>> subscriptionUpdatedAndPartitionsAssignedPromiseRef;
    private final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler;
    private final AtomicBoolean pollingPaused;
    private final AtomicBoolean recordFetchingPaused;
    private KafkaConsumer<String, Buffer> kafkaConsumer;
    private Context context;
    private ExecutorService kafkaConsumerWorker;
    private volatile Set<String> subscribedTopicPatternTopics;
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private Handler<Set<TopicPartition>> onPartitionsLostHandler;
    private boolean respectTtl;
    private Duration pollTimeout;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private KafkaClientMetricsSupport metricsSupport;
    private Long pollPauseTimeoutTimerId;
    private Duration consumerCreationRetriesTimeout;

    public HonoKafkaConsumer(Vertx vertx, Set<String> set, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this(vertx, (Set) Objects.requireNonNull(set), null, handler, map);
    }

    public HonoKafkaConsumer(Vertx vertx, Pattern pattern, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this(vertx, null, (Pattern) Objects.requireNonNull(pattern), handler, map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HonoKafkaConsumer(Vertx vertx, Set<String> set, Pattern pattern, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this.log = LoggerFactory.getLogger(getClass());
        this.stopCalled = new AtomicBoolean();
        this.subscriptionUpdatedAndPartitionsAssignedPromiseRef = new AtomicReference<>();
        this.pollingPaused = new AtomicBoolean();
        this.recordFetchingPaused = new AtomicBoolean();
        this.subscribedTopicPatternTopics = new HashSet();
        this.respectTtl = true;
        this.pollTimeout = Duration.ofMillis(250L);
        this.consumerCreationRetriesTimeout = Duration.ZERO;
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.topics = set;
        this.topicPattern = pattern;
        this.recordHandler = (Handler) Objects.requireNonNull(handler);
        this.consumerConfig = (Map) Objects.requireNonNull(map);
        if ((set == null) == (pattern == null)) {
            throw new NullPointerException("either topics or topicPattern has to be set");
        }
        if (map.containsKey("group.id")) {
            return;
        }
        if ("true".equals(map.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
            throw new IllegalArgumentException("group.id config entry has to be set if auto-commit is enabled");
        }
        this.log.trace("no group.id set, using a random UUID as default and disabling auto-commit");
        map.put("group.id", UUID.randomUUID().toString());
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }

    public final void setOnPartitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        this.onPartitionsAssignedHandler = (Handler) Objects.requireNonNull(handler);
    }

    public final void setOnRebalanceDoneHandler(Handler<Set<TopicPartition>> handler) {
        this.onRebalanceDoneHandler = (Handler) Objects.requireNonNull(handler);
    }

    public final void setOnPartitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        this.onPartitionsRevokedHandler = (Handler) Objects.requireNonNull(handler);
    }

    public final void setOnPartitionsLostHandler(Handler<Set<TopicPartition>> handler) {
        this.onPartitionsLostHandler = (Handler) Objects.requireNonNull(handler);
    }

    public final void setMetricsSupport(KafkaClientMetricsSupport kafkaClientMetricsSupport) {
        this.metricsSupport = kafkaClientMetricsSupport;
    }

    public final void setConsumerCreationRetriesTimeout(Duration duration) {
        this.consumerCreationRetriesTimeout = duration;
    }

    public final void setRespectTtl(boolean z) {
        this.respectTtl = z;
    }

    public final void setPollTimeout(Duration duration) {
        this.pollTimeout = (Duration) Objects.requireNonNull(duration);
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.asStream().pollTimeout(duration);
        }
    }

    public void setKafkaConsumerSupplier(Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    public final boolean pauseRecordFetching() {
        if (!this.recordFetchingPaused.compareAndSet(false, true)) {
            return false;
        }
        runOnKafkaWorkerThread(r4 -> {
            Set<org.apache.kafka.common.TopicPartition> assignment = getUnderlyingConsumer().assignment();
            if (assignment.isEmpty()) {
                return;
            }
            getUnderlyingConsumer().pause(assignment);
        });
        return true;
    }

    public final boolean resumeRecordFetching() {
        if (!this.recordFetchingPaused.compareAndSet(true, false)) {
            return false;
        }
        runOnKafkaWorkerThread(r4 -> {
            Set<org.apache.kafka.common.TopicPartition> assignment = getUnderlyingConsumer().assignment();
            if (assignment.isEmpty()) {
                return;
            }
            getUnderlyingConsumer().resume(assignment);
        });
        return true;
    }

    public final boolean isRecordFetchingPaused() {
        return this.recordFetchingPaused.get();
    }

    public final boolean pauseRecordHandlingAndPolling(Duration duration) {
        if (!this.pollingPaused.compareAndSet(false, true)) {
            return false;
        }
        this.pollPauseTimeoutTimerId = Long.valueOf(this.vertx.setTimer(duration.toMillis(), l -> {
            this.pollPauseTimeoutTimerId = null;
            if (resumeRecordHandlingAndPolling()) {
                this.log.debug("resumed consumer record polling - timeout of {}ms was reached [client-id: {}]", Long.valueOf(duration.toMillis()), getClientId());
            }
        }));
        getKafkaConsumer().pause2();
        return true;
    }

    public final boolean resumeRecordHandlingAndPolling() {
        if (!this.pollingPaused.compareAndSet(true, false)) {
            return false;
        }
        if (this.pollPauseTimeoutTimerId != null) {
            this.vertx.cancelTimer(this.pollPauseTimeoutTimerId.longValue());
            this.pollPauseTimeoutTimerId = null;
        }
        getKafkaConsumer().resume2();
        return true;
    }

    public final boolean isRecordHandlingAndPollingPaused() {
        return this.pollingPaused.get();
    }

    protected final KafkaConsumer<String, Buffer> getKafkaConsumer() {
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        return this.kafkaConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Consumer<String, Buffer> getUnderlyingConsumer() {
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        return this.kafkaConsumer.asStream().unwrap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getClientId() {
        return this.consumerConfig.get("client.id");
    }

    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> start() {
        this.context = this.vertx.getOrCreateContext();
        Promise promise = Promise.promise();
        runOnContext(r6 -> {
            ((Future) Optional.ofNullable(this.kafkaConsumerSupplier).map(supplier -> {
                return Future.succeededFuture(KafkaConsumer.create(this.vertx, (Consumer) supplier.get()));
            }).orElseGet(() -> {
                return new KafkaClientFactory(this.vertx).createKafkaConsumerWithRetries(this.consumerConfig, String.class, Buffer.class, this.consumerCreationRetriesTimeout);
            })).onFailure(th -> {
                this.log.error("error creating consumer [client-id: {}]", getClientId(), th);
                promise.fail(th);
            }).onSuccess2(kafkaConsumer -> {
                this.kafkaConsumer = kafkaConsumer;
                Optional.ofNullable(this.metricsSupport).ifPresent(kafkaClientMetricsSupport -> {
                    kafkaClientMetricsSupport.registerKafkaConsumer(this.kafkaConsumer.unwrap());
                });
                this.kafkaConsumer.handler2(kafkaConsumerRecord -> {
                    if (!promise.future().isComplete()) {
                        this.log.debug("postponing record handling until start() is completed [topic: {}, partition: {}, offset: {}]", kafkaConsumerRecord.topic(), Integer.valueOf(kafkaConsumerRecord.partition()), Long.valueOf(kafkaConsumerRecord.offset()));
                    }
                    promise.future().onSuccess2(r10 -> {
                        if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(kafkaConsumerRecord.headers())) {
                            onRecordHandlerSkippedForExpiredRecord(kafkaConsumerRecord);
                            return;
                        }
                        try {
                            this.recordHandler.handle(kafkaConsumerRecord);
                        } catch (Exception e) {
                            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]", kafkaConsumerRecord.topic(), Integer.valueOf(kafkaConsumerRecord.partition()), Long.valueOf(kafkaConsumerRecord.offset()), kafkaConsumerRecord.headers(), e);
                        }
                    });
                });
                this.kafkaConsumer.batchHandler(this::onBatchOfRecordsReceived);
                this.kafkaConsumer.exceptionHandler(th2 -> {
                    this.log.error("consumer error occurred [client-id: {}]", getClientId(), th2);
                });
                installRebalanceListeners();
                this.kafkaConsumer.asStream().pollTimeout(Duration.ofMillis(10L));
                subscribeAndWaitForRebalance().onSuccess2(r4 -> {
                    this.kafkaConsumer.asStream().pollTimeout(this.pollTimeout);
                    logSubscribedTopicsOnStartComplete();
                }).onComplete2(promise);
            });
        });
        return promise.future();
    }

    private void logSubscribedTopicsOnStartComplete() {
        if (this.topicPattern == null) {
            this.log.debug("consumer started, subscribed to topics {}", this.topics);
        } else if (this.subscribedTopicPatternTopics.size() <= 5) {
            this.log.debug("consumer started, subscribed to topic pattern [{}], matching topics: {}", this.topicPattern, this.subscribedTopicPatternTopics);
        } else {
            this.log.debug("consumer started, subscribed to topic pattern [{}], matching {} topics", this.topicPattern, Integer.valueOf(this.subscribedTopicPatternTopics.size()));
        }
    }

    protected void onBatchOfRecordsReceived(KafkaConsumerRecords<String, Buffer> kafkaConsumerRecords) {
    }

    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
    }

    private void installRebalanceListeners() {
        replaceRebalanceListener(this.kafkaConsumer, new ConsumerRebalanceListener() { // from class: org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer.1
            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> collection) {
                Set<TopicPartition> from = Helper.from(collection);
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection));
                }
                HonoKafkaConsumer.this.ensurePositionsHaveBeenSetIfNeeded(from);
                HonoKafkaConsumer.this.updateSubscribedTopicPatternTopicsAndRemoveMetrics();
                if (HonoKafkaConsumer.this.recordFetchingPaused.get()) {
                    HonoKafkaConsumer.this.getUnderlyingConsumer().pause(collection);
                }
                HonoKafkaConsumer.this.onPartitionsAssignedBlocking(from);
                Set set = (Set) Optional.ofNullable(HonoKafkaConsumer.this.onRebalanceDoneHandler).map(handler -> {
                    return Helper.from(HonoKafkaConsumer.this.getKafkaConsumer().asStream().unwrap().assignment());
                }).orElse(null);
                HonoKafkaConsumer.this.context.runOnContext(r6 -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(from);
                    if (HonoKafkaConsumer.this.onRebalanceDoneHandler != null) {
                        HonoKafkaConsumer.this.onRebalanceDoneHandler.handle(set);
                    }
                });
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> collection) {
                Set<TopicPartition> from = Helper.from(collection);
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection));
                }
                HonoKafkaConsumer.this.onPartitionsRevokedBlocking(from);
                HonoKafkaConsumer.this.context.runOnContext(r5 -> {
                    HonoKafkaConsumer.this.onPartitionsRevoked(from);
                });
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> collection) {
                Set<TopicPartition> from = Helper.from(collection);
                if (HonoKafkaConsumer.this.log.isInfoEnabled()) {
                    HonoKafkaConsumer.this.log.info("partitions lost: [{}] [client-id: {}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection), HonoKafkaConsumer.this.getClientId());
                }
                HonoKafkaConsumer.this.onPartitionsLostBlocking(from);
                HonoKafkaConsumer.this.context.runOnContext(r5 -> {
                    HonoKafkaConsumer.this.onPartitionsLost(from);
                });
            }
        });
    }

    private void ensurePositionsHaveBeenSetIfNeeded(Set<TopicPartition> set) {
        if (set.isEmpty() || !isAutoOffsetResetConfigLatest()) {
            return;
        }
        this.log.trace("checking positions for {} newly assigned partitions...", Integer.valueOf(set.size()));
        Set<org.apache.kafka.common.TopicPartition> set2 = Helper.to(set);
        try {
            LinkedList linkedList = new LinkedList();
            Map<org.apache.kafka.common.TopicPartition, Long> beginningOffsets = getUnderlyingConsumer().beginningOffsets(set2);
            set2.forEach(topicPartition -> {
                long position = getUnderlyingConsumer().position(topicPartition);
                Long l = (Long) beginningOffsets.get(topicPartition);
                if (l == null || position >= l.longValue()) {
                    return;
                }
                this.log.debug("committed offset {} for [{}] is smaller than beginning offset, resetting it to the beginning offset {}", Long.valueOf(position), topicPartition, l);
                getUnderlyingConsumer().seek(topicPartition, l.longValue());
                linkedList.add(topicPartition);
            });
            if (!linkedList.isEmpty()) {
                this.log.info("found out-of-range committed offsets, corresponding records having already been deleted; positions were reset to beginning offsets; partitions: [{}] [client-id: {}]", HonoKafkaConsumerHelper.getPartitionsDebugString(linkedList), getClientId());
            }
        } catch (Exception e) {
            this.log.error("error checking positions for {} newly assigned partitions [client-id: {}]", Integer.valueOf(set.size()), getClientId(), e);
        }
        this.log.trace("done checking positions for {} newly assigned partitions", Integer.valueOf(set.size()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isCooperativeRebalancingConfigured() {
        return ((Boolean) Optional.ofNullable(this.consumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)).map(str -> {
            return Boolean.valueOf(str.equals(CooperativeStickyAssignor.class.getName()));
        }).orElse(false)).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isAutoOffsetResetConfigLatest() {
        return ((Boolean) Optional.ofNullable(this.consumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).map(str -> {
            return Boolean.valueOf(str.equals("latest"));
        }).orElse(true)).booleanValue();
    }

    private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
        if (this.topicPattern != null) {
            Set<String> set = this.subscribedTopicPatternTopics;
            try {
                this.subscribedTopicPatternTopics = new HashSet(getUnderlyingConsumer().subscription());
            } catch (Exception e) {
                this.log.warn("error getting subscription", (Throwable) e);
            }
            Set set2 = (Set) set.stream().filter(str -> {
                return !this.subscribedTopicPatternTopics.contains(str);
            }).collect(Collectors.toSet());
            if (set2.isEmpty()) {
                return;
            }
            runOnContext(r8 -> {
                this.vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, l -> {
                    runOnKafkaWorkerThread(r6 -> {
                        removeMetricsForDeletedTopics(set2.stream().filter(str2 -> {
                            return !this.subscribedTopicPatternTopics.contains(str2);
                        }));
                    });
                });
            });
        }
    }

    private Future<Void> subscribeAndWaitForRebalance() {
        if (this.stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "already stopped"));
        }
        Promise promise = Promise.promise();
        Promise promise2 = Promise.promise();
        Pair of = Pair.of(promise2, promise);
        Pair<Promise<Void>, Promise<Void>> updateAndGet = this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.updateAndGet(pair -> {
            return pair == null ? of : pair;
        });
        if (!updateAndGet.equals(of)) {
            this.log.debug("subscribeAndWaitForRebalance: will wait for ongoing invocation to complete");
            return CompositeFuture.all(updateAndGet.one().future(), updateAndGet.two().future()).mapEmpty();
        }
        if (this.topicPattern != null) {
            this.kafkaConsumer.subscribe(this.topicPattern, promise2);
        } else {
            this.topics.forEach(str -> {
                HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, str).onSuccess2(list -> {
                    if (list.isEmpty()) {
                        this.log.info("subscription topic doesn't exist and didn't get auto-created: {} [client-id: {}]", str, getClientId());
                    }
                });
            });
            this.kafkaConsumer.subscribe(this.topics, promise2);
        }
        if (this.kafkaConsumerWorker == null) {
            this.kafkaConsumerWorker = getKafkaConsumerWorker(this.kafkaConsumer);
        }
        this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT_MILLIS, l -> {
            if (promise.future().isComplete()) {
                return;
            }
            this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(updateAndGet, null);
            this.log.warn("timed out waiting for rebalance and update of subscribed topics");
            promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "timed out waiting for rebalance and update of subscribed topics"));
        });
        promise2.future().onFailure(th -> {
            this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(updateAndGet, null);
        });
        return CompositeFuture.all(promise2.future(), promise.future()).mapEmpty();
    }

    protected void onPartitionsAssignedBlocking(Set<TopicPartition> set) {
    }

    private void onPartitionsAssigned(Set<TopicPartition> set) {
        Optional.ofNullable(this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.getAndSet(null)).ifPresent(pair -> {
            ((Promise) pair.two()).tryComplete();
        });
        if (this.onPartitionsAssignedHandler != null) {
            this.onPartitionsAssignedHandler.handle(set);
        }
    }

    protected void onPartitionsRevokedBlocking(Set<TopicPartition> set) {
    }

    protected void onPartitionsLostBlocking(Set<TopicPartition> set) {
    }

    private void onPartitionsRevoked(Set<TopicPartition> set) {
        if (this.onPartitionsRevokedHandler != null) {
            this.onPartitionsRevokedHandler.handle(set);
        }
    }

    private void onPartitionsLost(Set<TopicPartition> set) {
        if (this.onPartitionsLostHandler != null) {
            this.onPartitionsLostHandler.handle(set);
        }
    }

    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        if (this.pollPauseTimeoutTimerId != null) {
            this.vertx.cancelTimer(this.pollPauseTimeoutTimerId.longValue());
            this.pollPauseTimeoutTimerId = null;
        }
        if (this.kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        if (!this.stopCalled.compareAndSet(false, true)) {
            this.log.trace("stop already called");
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        this.kafkaConsumer.close(promise);
        return promise.future().onComplete2(asyncResult -> {
            Optional.ofNullable(this.metricsSupport).ifPresent(kafkaClientMetricsSupport -> {
                kafkaClientMetricsSupport.unregisterKafkaConsumer(this.kafkaConsumer.unwrap());
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runOnContext(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (this.context != Vertx.currentContext()) {
            this.context.runOnContext(r4 -> {
                handler.handle(null);
            });
        } else {
            handler.handle(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runOnKafkaWorkerThread(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (this.kafkaConsumerWorker == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        if (this.stopCalled.get()) {
            return;
        }
        this.kafkaConsumerWorker.submit(() -> {
            if (this.stopCalled.get()) {
                return;
            }
            try {
                handler.handle(null);
            } catch (Exception e) {
                this.log.error("error running task on Kafka worker thread [client-id: {}]", getClientId(), e);
            }
        });
    }

    public final Set<String> getSubscribedTopicPatternTopics() {
        return this.topicPattern == null ? Set.of() : new HashSet(this.subscribedTopicPatternTopics);
    }

    public final boolean isAmongKnownSubscribedTopics(String str) {
        Objects.requireNonNull(str);
        return this.topics != null ? this.topics.contains(str) : this.subscribedTopicPatternTopics.contains(str);
    }

    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(String str) {
        Objects.requireNonNull(str);
        if (this.topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        if (!this.topicPattern.matcher(str).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (this.kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "not started"));
        }
        if (this.stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "already stopped"));
        }
        if (this.subscribedTopicPatternTopics.contains(str)) {
            this.log.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", str);
            return Future.succeededFuture();
        }
        HashSet hashSet = new HashSet(this.subscribedTopicPatternTopics);
        Promise promise = Promise.promise();
        Future<U> compose = HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, str).onFailure(th -> {
            this.log.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", str, th);
        }).compose(list -> {
            if (!list.isEmpty()) {
                return Future.succeededFuture();
            }
            this.log.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", str);
            return Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "command topic doesn't exist and didn't get auto-created"));
        });
        Objects.requireNonNull(promise);
        Future mapEmpty = compose.onFailure(promise::tryFail).mapEmpty();
        this.log.debug("ensureTopicIsAmongSubscribedTopics: wait for subscription update and rebalance [{}]", str);
        subscribeAndWaitForRebalance().compose(r8 -> {
            boolean anyMatch = hashSet.stream().anyMatch(str2 -> {
                return !this.subscribedTopicPatternTopics.contains(str2);
            });
            if (this.subscribedTopicPatternTopics.contains(str)) {
                return (isCooperativeRebalancingConfigured() && anyMatch && isAutoOffsetResetConfigLatest()) ? this.kafkaConsumer.assignment().compose(set -> {
                    if (set.stream().anyMatch(topicPartition -> {
                        return topicPartition.getTopic().equals(str);
                    })) {
                        return Future.succeededFuture(r8);
                    }
                    this.log.debug("ensureTopicIsAmongSubscribedTopics: wait for another rebalance before considering update of topic subscription [{}] as done", str);
                    Promise promise2 = Promise.promise();
                    runOnKafkaWorkerThread(r6 -> {
                        getUnderlyingConsumer().enforceRebalance();
                        runOnContext(r5 -> {
                            subscribeAndWaitForRebalance().onComplete2(promise2);
                        });
                    });
                    return promise2.future();
                }) : Future.succeededFuture(r8);
            }
            this.log.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", str);
            return subscribeAndWaitForRebalance();
        }).compose(r7 -> {
            if (this.subscribedTopicPatternTopics.contains(str)) {
                this.log.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription [{}]", str);
                return Future.succeededFuture(r7);
            }
            this.log.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", str);
            return Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "subscription not updated with topic after rebalance"));
        }).onComplete2(asyncResult -> {
            Futures.tryHandleResult(promise, asyncResult);
        });
        if (!isAutoOffsetResetConfigLatest()) {
            mapEmpty.onSuccess2(r3 -> {
                promise.tryComplete();
            });
        }
        return promise.future();
    }

    private void removeMetricsForDeletedTopics(Stream<String> stream) {
        Metrics internalMetricsObject = getInternalMetricsObject(this.kafkaConsumer.unwrap());
        if (internalMetricsObject != null) {
            stream.forEach(str -> {
                internalMetricsObject.removeSensor("topic." + str + ".bytes-fetched");
                internalMetricsObject.removeSensor("topic." + str + ".records-fetched");
            });
        }
    }

    private Metrics getInternalMetricsObject(Consumer<String, Buffer> consumer) {
        if (!(consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer)) {
            return null;
        }
        try {
            Field declaredField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
            declaredField.setAccessible(true);
            return (Metrics) declaredField.get(consumer);
        } catch (Exception e) {
            this.log.warn("failed to get metrics object", (Throwable) e);
            return null;
        }
    }

    private static void replaceRebalanceListener(KafkaConsumer<String, Buffer> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        try {
            Field declaredField = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            declaredField.setAccessible(true);
            declaredField.set(kafkaConsumer.asStream(), consumerRebalanceListener);
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", e);
        }
    }

    private static ExecutorService getKafkaConsumerWorker(KafkaConsumer<String, Buffer> kafkaConsumer) {
        try {
            Field declaredField = KafkaReadStreamImpl.class.getDeclaredField("worker");
            declaredField.setAccessible(true);
            ExecutorService executorService = (ExecutorService) declaredField.get(kafkaConsumer.asStream());
            if (executorService == null) {
                throw new IllegalStateException("worker not set");
            }
            return executorService;
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to get worker", e);
        }
    }
}
