package org.eclipse.hono.client.pubsub;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.core.json.Json;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.class */
public abstract class AbstractPubSubBasedMessageSender implements MessagingClient, ServiceClient, Lifecycle {
    protected final String projectId;
    private final PubSubPublisherFactory publisherFactory;
    private final String topic;
    private final Tracer tracer;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();

    protected AbstractPubSubBasedMessageSender(PubSubPublisherFactory pubSubPublisherFactory, String str, String str2, Tracer tracer) {
        Objects.requireNonNull(pubSubPublisherFactory);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(tracer);
        this.publisherFactory = pubSubPublisherFactory;
        this.topic = str;
        this.projectId = str2;
        this.tracer = tracer;
    }

    private Span newSpan(String str, String str2, SpanContext spanContext) {
        return TracingHelper.buildSpan(this.tracer, spanContext, str, str2).ignoreActiveSpan().withTag(Tags.COMPONENT.getKey(), "hono-client-pubsub").withTag(Tags.SPAN_KIND.getKey(), "producer").withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), this.topic).withTag(Tags.PEER_SERVICE.getKey(), "pubsub").start();
    }

    public final MessagingType getMessagingType() {
        return MessagingType.pubsub;
    }

    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        healthCheckHandler.register("%s-pub-sub-publisher-creation-%s".formatted(this.topic, UUID.randomUUID()), promise -> {
            promise.tryComplete(new Status().setOk(this.lifecycleStatus.isStarted()));
        });
    }

    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("sender is already started/stopping"));
        }
        this.lifecycleStatus.setStarted();
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        if (this.lifecycleStatus.isStopping()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStopping()) {
            return Future.failedFuture(new IllegalStateException("sender is already stopping"));
        }
        this.lifecycleStatus.setStopped();
        return this.publisherFactory.closeAllPublisher();
    }

    protected final Future<Void> sendAndWaitForOutcome(String str, String str2, String str3, Buffer buffer, Map<String, Object> map, Span span) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(map);
        Objects.requireNonNull(span);
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture(new ServerErrorException(503, "sender not started"));
        }
        PubsubMessage build = PubsubMessage.newBuilder().putAllAttributes(encodePropertiesAsPubSubAttributes(map, span)).setOrderingKey(str3).setData(ByteString.copyFrom(buffer.getBytes())).build();
        this.log.info("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", new Object[]{str, str2, str3});
        logPubSubMessage(span, build, str, str2);
        return getOrCreatePublisher(str, str2).publish(build).onSuccess(str4 -> {
            logPubSubMessageId(span, str, str4);
        }).onFailure(th -> {
            logError(span, str, str2, str3, th);
            throw new ServerErrorException(str2, 503, th);
        }).mapEmpty();
    }

    protected Span startSpan(String str, String str2, String str3, String str4, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str4);
        return newSpan(str, str4, spanContext).setTag(TracingHelper.TAG_TENANT_ID.getKey(), str2).setTag(TracingHelper.TAG_DEVICE_ID.getKey(), str3);
    }

    private PubSubPublisherClient getOrCreatePublisher(String str, String str2) {
        return this.publisherFactory.getOrCreatePublisher(str, str2);
    }

    private void logPubSubMessageId(Span span, String str, String str2) {
        this.log.info("message published to PubSub [topic: {}, id: {}]", str, str2);
        span.log("message published to PubSub");
        Tags.HTTP_STATUS.set(span, 202);
    }

    private void logPubSubMessage(Span span, PubsubMessage pubsubMessage, String str, String str2) {
        String str3 = (String) pubsubMessage.getAttributesMap().entrySet().stream().map(entry -> {
            return ((String) entry.getKey()) + "=" + ((String) entry.getValue());
        }).collect(Collectors.joining(",", "{", "}"));
        this.log.trace("producing message [topic: {}, tenant: {}, key: {}, timestamp: {}, attributes: {}]", new Object[]{str, str2, pubsubMessage.getOrderingKey(), pubsubMessage.getPublishTime(), str3});
        span.log("publishing message with headers: " + str3);
    }

    private void logError(Span span, String str, String str2, String str3, Throwable th) {
        this.log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]", new Object[]{str, str3, str2, str3, th});
        Tags.HTTP_STATUS.set(span, 503);
        TracingHelper.logError(span, th);
    }

    private Map<String, String> encodePropertiesAsPubSubAttributes(Map<String, Object> map, Span span) {
        HashMap hashMap = new HashMap();
        map.forEach((str, obj) -> {
            try {
                hashMap.put(str, getStringEncodedValue(obj));
            } catch (EncodeException e) {
                this.log.info("failed to serialize property with key [{}] to Pub/Sub attribute", str);
                span.log("failed to create Pub/Sub attributes from property: " + str);
            }
        });
        return hashMap;
    }

    private String getStringEncodedValue(Object obj) {
        return obj instanceof String ? (String) obj : Json.encode(obj);
    }
}
