package org.eclipse.hono.application.client.kafka.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.StringTag;
import io.opentracing.tag.Tag;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.eclipse.hono.application.client.CommandSender;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.SendMessageTimeoutException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-application-kafka-1.12.0.jar:org/eclipse/hono/application/client/kafka/impl/KafkaBasedCommandSender.class */
public class KafkaBasedCommandSender extends AbstractKafkaBasedMessageSender implements CommandSender<KafkaMessageContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaBasedCommandSender.class);
    private static final long DEFAULT_COMMAND_TIMEOUT_IN_MS = 10000;
    private final Vertx vertx;
    private final MessagingKafkaConsumerConfigProperties consumerConfig;
    private final ConcurrentHashMap<String, HonoKafkaConsumer> commandResponseConsumers;
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ExpiringCommandPromise>> pendingCommandResponses;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private Supplier<String> correlationIdSupplier;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-application-kafka-1.12.0.jar:org/eclipse/hono/application/client/kafka/impl/KafkaBasedCommandSender$ExpiringCommandPromise.class */
    public class ExpiringCommandPromise {
        private final Promise<DownstreamMessage<KafkaMessageContext>> promise = Promise.promise();
        private final Span span;
        private Long timerId;

        ExpiringCommandPromise(String str, long j, Handler<Void> handler, Span span) {
            Objects.requireNonNull(span);
            this.span = span;
            if (j > 0) {
                this.timerId = Long.valueOf(KafkaBasedCommandSender.this.vertx.setTimer(j, l -> {
                    SendMessageTimeoutException sendMessageTimeoutException = new SendMessageTimeoutException("send command/wait for response timed out after " + j + "ms");
                    this.timerId = null;
                    KafkaBasedCommandSender.LOGGER.debug("cancelling sending command [correlation-id: {}] and waiting for response after {} ms", str, Long.valueOf(j));
                    TracingHelper.logError(span, sendMessageTimeoutException);
                    this.promise.tryFail(sendMessageTimeoutException);
                    Optional.ofNullable(handler).ifPresent(handler2 -> {
                        handler2.handle(null);
                    });
                }));
            }
        }

        final void tryCompleteAndCancelTimer(AsyncResult<DownstreamMessage<KafkaMessageContext>> asyncResult) {
            Objects.requireNonNull(asyncResult);
            Optional ofNullable = Optional.ofNullable(this.timerId);
            Vertx vertx = KafkaBasedCommandSender.this.vertx;
            Objects.requireNonNull(vertx);
            ofNullable.ifPresent((v1) -> {
                r1.cancelTimer(v1);
            });
            if (!asyncResult.succeeded()) {
                this.promise.tryFail(asyncResult.cause());
                return;
            }
            KafkaBasedCommandSender.LOGGER.trace("received command response [correlation-id: {}]", (String) Optional.ofNullable(asyncResult.result()).map((v0) -> {
                return v0.getCorrelationId();
            }).orElse(""));
            this.span.log("received command response");
            this.promise.tryComplete(asyncResult.result());
        }

        final Future<DownstreamMessage<KafkaMessageContext>> future() {
            return this.promise.future();
        }
    }

    public KafkaBasedCommandSender(Vertx vertx, MessagingKafkaConsumerConfigProperties messagingKafkaConsumerConfigProperties, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties messagingKafkaProducerConfigProperties, Tracer tracer) {
        super(kafkaProducerFactory, "command-sender", messagingKafkaProducerConfigProperties, tracer);
        this.commandResponseConsumers = new ConcurrentHashMap<>();
        this.pendingCommandResponses = new ConcurrentHashMap<>();
        this.correlationIdSupplier = () -> {
            return UUID.randomUUID().toString();
        };
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.consumerConfig = (MessagingKafkaConsumerConfigProperties) Objects.requireNonNull(messagingKafkaConsumerConfigProperties);
    }

    @Override // org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender, org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        List list = (List) this.commandResponseConsumers.values().stream().map((v0) -> {
            return v0.stop();
        }).collect(Collectors.toList());
        this.commandResponseConsumers.clear();
        list.add(super.stop());
        return CompositeFuture.join(list).mapEmpty();
    }

    @Override // org.eclipse.hono.application.client.CommandSender
    public Future<Void> sendAsyncCommand(String str, String str2, String str3, String str4, Buffer buffer, String str5, String str6, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(str5);
        return sendCommand(str, str2, str3, str4, buffer, str5, map, true, "send command", spanContext);
    }

    @Override // org.eclipse.hono.application.client.CommandSender
    public Future<Void> sendOneWayCommand(String str, String str2, String str3, String str4, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        return sendCommand(str, str2, str3, str4, buffer, null, map, false, "send one-way command", spanContext);
    }

    @Override // org.eclipse.hono.application.client.CommandSender
    public Future<DownstreamMessage<KafkaMessageContext>> sendCommand(String str, String str2, String str3, String str4, Buffer buffer, String str5, Map<String, Object> map, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        long longValue = ((Long) Optional.ofNullable(duration).map(duration2 -> {
            if (duration2.isNegative()) {
                throw new IllegalArgumentException("command timeout duration must be >= 0");
            }
            return Long.valueOf(duration2.toMillis());
        }).orElse(10000L)).longValue();
        String str6 = this.correlationIdSupplier.get();
        Span start = TracingHelper.buildChildSpan(this.tracer, spanContext, "send command and receive response", getClass().getSimpleName()).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT).withTag((Tag<StringTag>) TracingHelper.TAG_TENANT_ID, (StringTag) str).withTag((Tag<StringTag>) TracingHelper.TAG_DEVICE_ID, (StringTag) str2).withTag((Tag<StringTag>) TracingHelper.TAG_CORRELATION_ID, (StringTag) str6).start();
        ExpiringCommandPromise expiringCommandPromise = new ExpiringCommandPromise(str6, longValue, r7 -> {
            removePendingCommandResponse(str, str6);
        }, start);
        subscribeForCommandResponse(str, start).compose(r22 -> {
            this.pendingCommandResponses.computeIfAbsent(str, str7 -> {
                return new ConcurrentHashMap();
            }).put(str6, expiringCommandPromise);
            return sendCommand(str, str2, str3, str4, buffer, str6, map, true, "send command", start.context()).onSuccess2(r6 -> {
                LOGGER.debug("sent command [correlation-id: {}], waiting for response", str6);
                start.log("sent command, waiting for response");
            }).onFailure(th -> {
                LOGGER.debug("error sending command", th);
                if (!expiringCommandPromise.future().isComplete()) {
                    TracingHelper.logError(start, "error sending command", th);
                }
                removePendingCommandResponse(str, str6);
                expiringCommandPromise.tryCompleteAndCancelTimer(Future.failedFuture(th));
            });
        });
        return expiringCommandPromise.future().onComplete2(asyncResult -> {
            start.finish();
        });
    }

    void setKafkaConsumerSupplier(Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = (Supplier) Objects.requireNonNull(supplier);
    }

    void setCorrelationIdSupplier(Supplier<String> supplier) {
        this.correlationIdSupplier = (Supplier) Objects.requireNonNull(supplier);
    }

    private Future<Void> sendCommand(String str, String str2, String str3, String str4, Buffer buffer, String str5, Map<String, Object> map, boolean z, String str6, SpanContext spanContext) {
        HonoTopic honoTopic = new HonoTopic(HonoTopic.Type.COMMAND, str);
        Map<String, Object> headerProperties = getHeaderProperties(str2, str3, str4, str5, z, map);
        String honoTopic2 = honoTopic.toString();
        Span startChildSpan = startChildSpan(str6, honoTopic2, str, str2, spanContext);
        return sendAndWaitForOutcome(honoTopic2, str, str2, buffer, headerProperties, startChildSpan).onComplete2(asyncResult -> {
            startChildSpan.finish();
        });
    }

    private Map<String, Object> getHeaderProperties(String str, String str2, String str3, String str4, boolean z, Map<String, Object> map) {
        Map<String, Object> map2 = (Map) Optional.ofNullable(map).map(HashMap::new).orElseGet(HashMap::new);
        map2.put(MessageHelper.APP_PROPERTY_DEVICE_ID, str);
        map2.put(MessageHelper.SYS_PROPERTY_SUBJECT, str2);
        map2.put(MessageHelper.SYS_PROPERTY_CONTENT_TYPE, Optional.ofNullable(str3).orElse("application/octet-stream"));
        Optional.ofNullable(str4).ifPresent(str5 -> {
            map2.put(MessageHelper.SYS_PROPERTY_CORRELATION_ID, str5);
        });
        map2.put(KafkaRecordHelper.HEADER_RESPONSE_REQUIRED, Boolean.valueOf(z));
        return map2;
    }

    private Handler<DownstreamMessage<KafkaMessageContext>> getCommandResponseHandler(String str) {
        return downstreamMessage -> {
            if (downstreamMessage.getCorrelationId() == null) {
                LOGGER.trace("ignoring received command response - no correlation id set [tenant: {}]", str);
            } else {
                removePendingCommandResponse(str, downstreamMessage.getCorrelationId()).ifPresentOrElse(expiringCommandPromise -> {
                    expiringCommandPromise.tryCompleteAndCancelTimer(mapResponseResult(downstreamMessage));
                }, () -> {
                    LOGGER.trace("ignoring received command response - no response pending [tenant: {}, correlation-id: {}]", str, downstreamMessage.getCorrelationId());
                });
            }
        };
    }

    private Future<DownstreamMessage<KafkaMessageContext>> mapResponseResult(DownstreamMessage<KafkaMessageContext> downstreamMessage) {
        int intValue = ((Integer) Optional.ofNullable(downstreamMessage.getStatus()).orElseGet(() -> {
            LOGGER.warn("response message has no status code header [tenant ID: {}, device ID: {}, correlation ID: {}]", downstreamMessage.getTenantId(), downstreamMessage.getDeviceId(), downstreamMessage.getCorrelationId());
            return Integer.valueOf(JsonLocation.MAX_CONTENT_SNIPPET);
        })).intValue();
        if (StatusCodeMapper.isSuccessful(Integer.valueOf(intValue))) {
            return Future.succeededFuture(downstreamMessage);
        }
        return Future.failedFuture(StatusCodeMapper.from(intValue, (downstreamMessage.getPayload() == null || downstreamMessage.getPayload().length() <= 0) ? null : downstreamMessage.getPayload().toString(StandardCharsets.UTF_8)));
    }

    private Optional<ExpiringCommandPromise> removePendingCommandResponse(String str, String str2) {
        return Optional.ofNullable(this.pendingCommandResponses.get(str)).map(concurrentHashMap -> {
            return (ExpiringCommandPromise) concurrentHashMap.remove(str2);
        });
    }

    private Future<Void> subscribeForCommandResponse(String str, Span span) {
        if (this.commandResponseConsumers.get(str) != null) {
            LOGGER.debug("command response consumer already exists for tenant [{}]", str);
            span.log("command response consumer already exists");
            return Future.succeededFuture();
        }
        Map<String, String> consumerConfig = this.consumerConfig.getConsumerConfig(HonoTopic.Type.COMMAND_RESPONSE.toString());
        String str2 = consumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
        if (str2 != null && !str2.equals("latest")) {
            LOGGER.warn("[auto.offset.reset] value is set to other than [latest]. It will be ignored and internally set to [latest]");
        }
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        consumerConfig.put("group.id", str + "-" + UUID.randomUUID());
        String honoTopic = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, str).toString();
        HonoKafkaConsumer honoKafkaConsumer = new HonoKafkaConsumer(this.vertx, (Set<String>) Set.of(honoTopic), (Handler<KafkaConsumerRecord<String, Buffer>>) kafkaConsumerRecord -> {
            getCommandResponseHandler(str).handle(new KafkaDownstreamMessage(kafkaConsumerRecord));
        }, consumerConfig);
        honoKafkaConsumer.setPollTimeout(Duration.ofMillis(this.consumerConfig.getPollTimeout()));
        Optional ofNullable = Optional.ofNullable(this.kafkaConsumerSupplier);
        Objects.requireNonNull(honoKafkaConsumer);
        ofNullable.ifPresent(honoKafkaConsumer::setKafkaConsumerSupplier);
        return honoKafkaConsumer.start().recover(th -> {
            LOGGER.debug("error creating command response consumer for tenant [{}]", str, th);
            TracingHelper.logError(span, "error creating command response consumer", th);
            return Future.failedFuture(th);
        }).onSuccess2(r8 -> {
            LOGGER.debug("created command response consumer for tenant [{}]", str);
            span.log("created command response consumer");
            this.commandResponseConsumers.put(str, honoKafkaConsumer);
        });
    }
}
