package org.eclipse.hono.client.amqp;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.ext.web.handler.TimeoutHandler;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageNotProcessedException;
import org.eclipse.hono.client.MessageUndeliverableException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.SendMessageTimeoutException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-amqp-common-1.12.0.jar:org/eclipse/hono/client/amqp/GenericSenderLink.class */
public class GenericSenderLink extends AbstractHonoClient {
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GenericSenderLink.class);
    private final String tenantId;
    private final String targetAddress;
    private final SendMessageSampler sampler;
    private boolean errorInfoLoggingEnabled;

    protected GenericSenderLink(HonoConnection honoConnection, ProtonSender protonSender, String str, String str2, SendMessageSampler sendMessageSampler) {
        super(honoConnection);
        this.sender = (ProtonSender) Objects.requireNonNull(protonSender);
        this.tenantId = str;
        this.targetAddress = str2;
        this.sampler = (SendMessageSampler) Objects.requireNonNull(sendMessageSampler);
        if (protonSender.isOpen()) {
            this.offeredCapabilities = (List) Optional.ofNullable(protonSender.getRemoteOfferedCapabilities()).map((v0) -> {
                return List.of(v0);
            }).orElse(Collections.emptyList());
        }
    }

    public static Future<GenericSenderLink> create(HonoConnection honoConnection, String str, Handler<String> handler) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        String rewrite = AddressHelper.rewrite(str, honoConnection.getConfig());
        return honoConnection.createSender(rewrite, ProtonQoS.AT_LEAST_ONCE, handler).map(protonSender -> {
            return new GenericSenderLink(honoConnection, protonSender, null, rewrite, SendMessageSampler.noop());
        });
    }

    public static Future<GenericSenderLink> create(HonoConnection honoConnection, String str, String str2, SendMessageSampler sendMessageSampler, Handler<String> handler) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(sendMessageSampler);
        return create(honoConnection, str, str2, null, sendMessageSampler, handler);
    }

    public static Future<GenericSenderLink> create(HonoConnection honoConnection, String str, String str2, String str3, SendMessageSampler sendMessageSampler, Handler<String> handler) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(sendMessageSampler);
        String targetAddress = AddressHelper.getTargetAddress(str, str2, str3, honoConnection.getConfig());
        return honoConnection.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, handler).map(protonSender -> {
            return new GenericSenderLink(honoConnection, protonSender, str2, targetAddress, sendMessageSampler);
        });
    }

    public final Future<Void> close() {
        log.debug("closing sender ...");
        return closeLinks();
    }

    public final boolean isOpen() {
        return HonoProtonHelper.isLinkOpenAndConnected(this.sender);
    }

    public final Future<ProtonDelivery> send(Message message, Span span) {
        return checkForCreditAndSend(message, span, () -> {
            return sendMessage(message, span);
        });
    }

    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message, Span span) {
        return checkForCreditAndSend(message, span, () -> {
            return sendMessageAndWaitForOutcome(message, span, true);
        });
    }

    public Future<ProtonDelivery> sendAndWaitForRawOutcome(Message message, Span span) {
        return checkForCreditAndSend(message, span, () -> {
            return sendMessageAndWaitForOutcome(message, span, false);
        });
    }

    public void setErrorInfoLoggingEnabled(boolean z) {
        this.errorInfoLoggingEnabled = z;
    }

    private Future<ProtonDelivery> checkForCreditAndSend(Message message, Span span, Supplier<Future<ProtonDelivery>> supplier) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        Objects.requireNonNull(supplier);
        Tags.MESSAGE_BUS_DESTINATION.set(span, getMessageAddress(message));
        TracingHelper.TAG_QOS.set(span, this.sender.getQoS().toString());
        Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_PRODUCER);
        TracingHelper.setDeviceTags(span, this.tenantId, MessageHelper.getDeviceId(message));
        TracingHelper.injectSpanContext(this.connection.getTracer(), span.context(), message);
        return this.connection.executeOnContext(promise -> {
            if (!this.sender.sendQueueFull()) {
                ((Future) supplier.get()).onComplete2(promise);
                return;
            }
            NoConsumerException noConsumerException = new NoConsumerException("no credit available");
            logMessageSendingError("error sending message [ID: {}, address: {}], no credit available (drain={})", message.getMessageId(), getMessageAddress(message), Boolean.valueOf(this.sender.getDrain()));
            TracingHelper.TAG_CREDIT.set(span, (Integer) 0);
            logError(span, noConsumerException);
            span.finish();
            promise.fail(noConsumerException);
            this.sampler.queueFull(this.tenantId);
        });
    }

    private Future<ProtonDelivery> sendMessage(Message message, Span span) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        String format = String.format("%s-%d", getClass().getSimpleName(), Long.valueOf(MESSAGE_COUNTER.getAndIncrement()));
        message.setMessageId(format);
        logMessageIdAndSenderInfo(span, format);
        SendMessageSampler.Sample start = this.sampler.start(this.tenantId);
        AtomicReference atomicReference = new AtomicReference();
        ClientConfigProperties config = this.connection.getConfig();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Long valueOf = config.getSendMessageTimeout() > 0 ? Long.valueOf(this.connection.getVertx().setTimer(config.getSendMessageTimeout(), l -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                handleSendMessageTimeout(message, config.getSendMessageTimeout(), (ProtonDelivery) atomicReference.get(), start, null, span);
            }
        })) : null;
        ProtonDelivery send = this.sender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.connection.getVertx().cancelTimer(valueOf.longValue());
            }
            DeliveryState remoteState = protonDelivery.getRemoteState();
            start.completed(remoteState);
            if (atomicBoolean.get()) {
                log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out", format, getMessageAddress(message));
            } else if (protonDelivery.remotelySettled()) {
                logUpdatedDeliveryState(span, message, protonDelivery);
            } else {
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery", format, getMessageAddress(message), remoteState.getClass().getSimpleName());
                TracingHelper.logError(span, new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "peer did not settle message, failing delivery"));
            }
            span.finish();
        });
        atomicReference.set(send);
        log.trace("sent AT_MOST_ONCE message [ID: {}, address: {}], remaining credit: {}, queued messages: {}", format, getMessageAddress(message), Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
        return Future.succeededFuture(send);
    }

    private Future<ProtonDelivery> sendMessageAndWaitForOutcome(Message message, Span span, boolean z) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        AtomicReference atomicReference = new AtomicReference();
        Promise promise = Promise.promise();
        String format = String.format("%s-%d", getClass().getSimpleName(), Long.valueOf(MESSAGE_COUNTER.getAndIncrement()));
        message.setMessageId(format);
        logMessageIdAndSenderInfo(span, format);
        SendMessageSampler.Sample start = this.sampler.start(this.tenantId);
        ClientConfigProperties config = this.connection.getConfig();
        Long valueOf = config.getSendMessageTimeout() > 0 ? Long.valueOf(this.connection.getVertx().setTimer(config.getSendMessageTimeout(), l -> {
            if (promise.future().isComplete()) {
                return;
            }
            handleSendMessageTimeout(message, config.getSendMessageTimeout(), (ProtonDelivery) atomicReference.get(), start, promise, null);
        })) : null;
        atomicReference.set(this.sender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.connection.getVertx().cancelTimer(valueOf.longValue());
            }
            DeliveryState remoteState = protonDelivery.getRemoteState();
            if (promise.future().isComplete()) {
                log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out", format, getMessageAddress(message));
                return;
            }
            if (!protonDelivery.remotelySettled()) {
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery", format, getMessageAddress(message), remoteState.getClass().getSimpleName());
                promise.fail(new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "peer did not settle message, failing delivery"));
                return;
            }
            logUpdatedDeliveryState(span, message, protonDelivery);
            start.completed(remoteState);
            if (Accepted.class.isInstance(remoteState)) {
                promise.complete(protonDelivery);
            } else if (z) {
                promise.handle((AsyncResult) mapUnacceptedOutcomeToErrorResult(protonDelivery));
            } else {
                promise.complete(protonDelivery);
            }
        }));
        log.trace("sent AT_LEAST_ONCE message [ID: {}, address: {}], remaining credit: {}, queued messages: {}", format, getMessageAddress(message), Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
        return promise.future().onSuccess2(protonDelivery2 -> {
            Tags.HTTP_STATUS.set(span, (Integer) 202);
        }).onFailure(th -> {
            TracingHelper.logError(span, th);
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
        }).onComplete2(asyncResult -> {
            span.finish();
        });
    }

    protected void handleSendMessageTimeout(Message message, long j, ProtonDelivery protonDelivery, SendMessageSampler.Sample sample, Promise<ProtonDelivery> promise, Span span) {
        Objects.requireNonNull(message);
        String str = HonoProtonHelper.isLinkOpenAndConnected(this.sender) ? "" : " (link or connection already closed)";
        String str2 = "waiting for delivery update timed out after " + j + "ms";
        ServerErrorException sendMessageTimeoutException = HonoProtonHelper.isLinkOpenAndConnected(this.sender) ? new SendMessageTimeoutException(str2) : new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, str2 + str);
        logMessageSendingError("waiting for delivery update timed out for message [ID: {}, address: {}] after {}ms{}", message.getMessageId(), getMessageAddress(message), Long.valueOf(j), str);
        if (protonDelivery != null) {
            ProtonHelper.released(protonDelivery, true);
        }
        if (span != null) {
            TracingHelper.logError(span, sendMessageTimeoutException.getMessage());
            Tags.HTTP_STATUS.set(span, Integer.valueOf(TimeoutHandler.DEFAULT_ERRORCODE));
            span.finish();
        }
        if (promise != null) {
            promise.fail(sendMessageTimeoutException);
        }
        if (sample != null) {
            sample.timeout();
        }
    }

    private Future<ProtonDelivery> mapUnacceptedOutcomeToErrorResult(ProtonDelivery protonDelivery) {
        DeliveryState remoteState = protonDelivery.getRemoteState();
        if (Accepted.class.isInstance(remoteState)) {
            throw new IllegalStateException("delivery is expected to be rejected, released or modified here, not accepted");
        }
        Throwable th = null;
        if (Rejected.class.isInstance(remoteState)) {
            th = (ServiceInvocationException) Optional.ofNullable(((Rejected) remoteState).getError()).map(StatusCodeMapper::fromTransferError).orElseGet(() -> {
                return new ClientErrorException(400);
            });
        } else if (Released.class.isInstance(remoteState)) {
            th = new MessageNotProcessedException();
        } else if (Modified.class.isInstance(remoteState)) {
            th = ((Modified) remoteState).getUndeliverableHere().booleanValue() ? new MessageUndeliverableException() : new MessageNotProcessedException();
        }
        return Future.failedFuture(th);
    }

    protected final void logMessageIdAndSenderInfo(Span span, String str) {
        HashMap hashMap = new HashMap(2);
        hashMap.put(TracingHelper.TAG_MESSAGE_ID.getKey(), str);
        hashMap.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(this.sender.getCredit()));
        span.log(hashMap);
    }

    protected final void logUpdatedDeliveryState(Span span, Message message, ProtonDelivery protonDelivery) {
        Objects.requireNonNull(span);
        Object obj = message.getMessageId() != null ? message.getMessageId().toString() : "";
        String messageAddress = getMessageAddress(message);
        DeliveryState remoteState = protonDelivery.getRemoteState();
        if (Accepted.class.isInstance(remoteState)) {
            log.trace("message [ID: {}, address: {}] accepted by peer", obj, messageAddress);
            span.log("message accepted by peer");
            Tags.HTTP_STATUS.set(span, (Integer) 202);
            return;
        }
        HashMap hashMap = new HashMap();
        if (Rejected.class.isInstance(remoteState)) {
            Rejected rejected = (Rejected) protonDelivery.getRemoteState();
            Tags.HTTP_STATUS.set(span, (Integer) 400);
            if (rejected.getError() == null) {
                logMessageSendingError("message [ID: {}, address: {}] rejected by peer", obj, messageAddress);
                hashMap.put(Fields.MESSAGE, "message rejected by peer");
            } else {
                logMessageSendingError("message [ID: {}, address: {}] rejected by peer: {}, {}", obj, messageAddress, rejected.getError().getCondition(), rejected.getError().getDescription());
                hashMap.put(Fields.MESSAGE, String.format("message rejected by peer: %s, %s", rejected.getError().getCondition(), rejected.getError().getDescription()));
            }
        } else if (Released.class.isInstance(remoteState)) {
            logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}", obj, messageAddress, remoteState.getClass().getSimpleName());
            Tags.HTTP_STATUS.set(span, Integer.valueOf(TimeoutHandler.DEFAULT_ERRORCODE));
            hashMap.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
        } else if (Modified.class.isInstance(remoteState)) {
            Modified modified = (Modified) protonDelivery.getRemoteState();
            logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}", obj, messageAddress, modified);
            Tags.HTTP_STATUS.set(span, Integer.valueOf(modified.getUndeliverableHere().booleanValue() ? 404 : TimeoutHandler.DEFAULT_ERRORCODE));
            hashMap.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
        }
        TracingHelper.logError(span, hashMap);
    }

    private String getMessageAddress(Message message) {
        Objects.requireNonNull(message);
        return (String) Optional.ofNullable(message.getAddress()).orElse(this.targetAddress);
    }

    private void logMessageSendingError(String str, Object... objArr) {
        if (this.errorInfoLoggingEnabled) {
            log.info(str, objArr);
        } else {
            log.debug(str, objArr);
        }
    }
}
