package org.eclipse.hono.application.client.kafka.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaApplicationClient;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

/* loaded from: input_file:BOOT-INF/lib/hono-client-application-kafka-1.7.0.jar:org/eclipse/hono/application/client/kafka/impl/KafkaApplicationClientImpl.class */
public class KafkaApplicationClientImpl extends KafkaBasedCommandSender implements KafkaApplicationClient {
    private final Vertx vertx;
    private final KafkaConsumerConfigProperties consumerConfig;
    private Supplier<KafkaConsumer<String, Buffer>> kafkaConsumerSupplier;

    public KafkaApplicationClientImpl(Vertx vertx, KafkaConsumerConfigProperties kafkaConsumerConfigProperties, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, KafkaProducerConfigProperties kafkaProducerConfigProperties) {
        this(vertx, kafkaConsumerConfigProperties, kafkaProducerFactory, kafkaProducerConfigProperties, NoopTracerFactory.create());
    }

    public KafkaApplicationClientImpl(Vertx vertx, KafkaConsumerConfigProperties kafkaConsumerConfigProperties, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, KafkaProducerConfigProperties kafkaProducerConfigProperties, Tracer tracer) {
        super(kafkaProducerFactory, kafkaProducerConfigProperties, tracer);
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(kafkaConsumerConfigProperties);
        if (!kafkaConsumerConfigProperties.isConfigured() || !kafkaProducerConfigProperties.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = kafkaConsumerConfigProperties;
    }

    @Override // org.eclipse.hono.application.client.kafka.KafkaApplicationClient, org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createTelemetryConsumer(String str, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.TELEMETRY, handler, handler2);
    }

    @Override // org.eclipse.hono.application.client.kafka.KafkaApplicationClient, org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createEventConsumer(String str, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.EVENT, handler, handler2);
    }

    @Override // org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createCommandResponseConsumer(String str, String str2, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        return createKafkaBasedDownstreamMessageConsumer(str, HonoTopic.Type.COMMAND_RESPONSE, handler, handler2);
    }

    void setKafkaConsumerFactory(Supplier<KafkaConsumer<String, Buffer>> supplier) {
        Objects.requireNonNull(supplier);
        this.kafkaConsumerSupplier = supplier;
    }

    private Future<MessageConsumer> createKafkaBasedDownstreamMessageConsumer(String str, HonoTopic.Type type, Handler<DownstreamMessage<KafkaMessageContext>> handler, Handler<Throwable> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(type);
        Objects.requireNonNull(handler);
        return KafkaBasedDownstreamMessageConsumer.create(str, type, (KafkaConsumer) Optional.ofNullable(this.kafkaConsumerSupplier).map((v0) -> {
            return v0.get();
        }).orElseGet(() -> {
            return KafkaConsumer.create(this.vertx, this.consumerConfig.getConsumerConfig(type.toString()));
        }), this.consumerConfig, handler, Objects.nonNull(handler2) ? handler2 : th -> {
        });
    }
}
