package org.eclipse.hono.adapter.coap;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Principal;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.qpid.proton.message.Message;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.eclipse.californium.elements.auth.ExtensiblePrincipal;
import org.eclipse.californium.scandium.DTLSConnector;
import org.eclipse.californium.scandium.auth.ApplicationLevelInfoSupplier;
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.eclipse.californium.scandium.dtls.pskstore.PskStore;
import org.eclipse.hono.adapter.coap.CoapAdapterProperties;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/coap/AbstractVertxBasedCoapAdapter.class */
/**
 * Base class for a Vert.x based protocol adapter that exposes a CoAP server.
 * <p>
 * On start-up the adapter loads the network configuration, registers the configured
 * resources with the CoAP server, binds a DTLS secured endpoint and (if enabled) an
 * insecure endpoint, and finally starts the server. Messages uploaded by devices are
 * forwarded downstream via {@link #uploadTelemetryMessage} and {@link #uploadEventMessage}.
 *
 * @param <T> The type of configuration properties used by this adapter.
 */
public abstract class AbstractVertxBasedCoapAdapter<T extends CoapAdapterProperties> extends AbstractProtocolAdapterBase<T> {

    /** Logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Resources waiting to be registered with the server during start-up. */
    private final Set<Resource> resourcesToAdd = new HashSet<>();

    /** The CoAP server; either injected or created during start-up. */
    private CoapServer server;
    private ApplicationLevelInfoSupplier honoDeviceResolver;
    private PskStore pskStore;
    private volatile Endpoint secureEndpoint;
    private volatile Endpoint insecureEndpoint;
    private CoapAdapterMetrics metrics = CoapAdapterMetrics.NOOP;

    /**
     * Sets the supplier used to attach application level information to DTLS peers.
     *
     * @param applicationLevelInfoSupplier The supplier to use.
     * @throws NullPointerException if the supplier is {@code null}.
     */
    public final void setHonoDeviceResolver(ApplicationLevelInfoSupplier applicationLevelInfoSupplier) {
        this.honoDeviceResolver = Objects.requireNonNull(applicationLevelInfoSupplier);
    }

    /**
     * Sets the store to look up pre-shared keys in for the DTLS endpoint.
     *
     * @param pskStore The store to use.
     * @throws NullPointerException if the store is {@code null}.
     */
    public final void setPskStore(PskStore pskStore) {
        this.pskStore = Objects.requireNonNull(pskStore);
    }

    /**
     * Sets the object used for reporting metrics.
     *
     * @param coapAdapterMetrics The metrics to report to.
     * @throws NullPointerException if the metrics object is {@code null}.
     */
    @Autowired
    public final void setMetrics(CoapAdapterMetrics coapAdapterMetrics) {
        // reject null up front (like the other setters) instead of failing later while reporting
        this.metrics = Objects.requireNonNull(coapAdapterMetrics);
    }

    /**
     * Gets the object used for reporting metrics.
     *
     * @return The metrics; {@link CoapAdapterMetrics#NOOP} unless another instance has been set.
     */
    protected CoapAdapterMetrics getMetrics() {
        return this.metrics;
    }

    /**
     * Adds resources that are to be registered with the CoAP server on start-up.
     *
     * @param set The resources to add.
     * @throws NullPointerException if the set is {@code null}.
     */
    @Autowired(required = false)
    public final void setResources(Set<Resource> set) {
        this.resourcesToAdd.addAll(Objects.requireNonNull(set));
    }

    /**
     * Gets the default port of the secure endpoint.
     *
     * @return 5684
     */
    public final int getPortDefaultValue() {
        return 5684;
    }

    /**
     * Gets the default port of the insecure endpoint.
     *
     * @return 5683
     */
    public final int getInsecurePortDefaultValue() {
        return 5683;
    }

    /**
     * Gets the port of the secure endpoint's address.
     *
     * @return The port or -1 if no secure endpoint has been created.
     */
    protected final int getActualPort() {
        return portOf(this.secureEndpoint);
    }

    /**
     * Gets the port of the insecure endpoint's address.
     *
     * @return The port or -1 if no insecure endpoint has been created.
     */
    protected final int getActualInsecurePort() {
        return portOf(this.insecureEndpoint);
    }

    /** Returns the port of the given endpoint's address or -1 if the endpoint is {@code null}. */
    private static int portOf(Endpoint endpoint) {
        return endpoint == null ? -1 : endpoint.getAddress().getPort();
    }

    /**
     * Sets the CoAP server to use instead of creating one during start-up.
     *
     * @param coapServer The server.
     * @throws NullPointerException if the server is {@code null}.
     */
    @Autowired(required = false)
    public final void setCoapServer(CoapServer coapServer) {
        this.server = Objects.requireNonNull(coapServer);
    }

    /**
     * Starts the adapter.
     * <p>
     * Checks the port configuration, invokes {@link #preStartup()}, loads the secure and
     * insecure network configuration, registers the resources, binds the endpoints and starts
     * the CoAP server. Finally {@link #onStartupSuccess()} is invoked.
     *
     * @param future The future to complete once start-up has succeeded or to fail otherwise.
     */
    public final void doStart(Future<Void> future) {
        checkPortConfiguration()
            .compose(portsOk -> preStartup())
            .compose(preStartupDone -> {
                final Future<NetworkConfig> secureConfig = getSecureNetworkConfig();
                final Future<NetworkConfig> insecureConfig = getInsecureNetworkConfig();
                return CompositeFuture.all(secureConfig, insecureConfig).map(configsLoaded -> {
                    if (this.server == null) {
                        // keep a reference to the server we create ourselves so that
                        // doStop() is able to stop it again
                        this.server = new CoapServer(insecureConfig.result());
                    }
                    final CoapServer startingServer = this.server;
                    addResources(startingServer);
                    bindSecureEndpoint(startingServer, secureConfig.result());
                    bindInsecureEndpoint(startingServer, insecureConfig.result());
                    startingServer.start();
                    final Endpoint secure = this.secureEndpoint;
                    if (secure != null) {
                        this.log.info("coaps/udp endpoint running on {}", secure.getAddress());
                    }
                    final Endpoint insecure = this.insecureEndpoint;
                    if (insecure != null) {
                        this.log.info("coap/udp endpoint running on {}", insecure.getAddress());
                    }
                    return configsLoaded;
                });
            })
            .compose(serverStarted -> {
                try {
                    onStartupSuccess();
                    future.complete();
                } catch (Exception e) {
                    this.log.error("error in onStartupSuccess", e);
                    future.fail(e);
                }
            }, future);
    }

    /** Wraps all pending resources for execution on the Vert.x context, registers them with the server and clears the pending set. */
    private void addResources(CoapServer startingServer) {
        for (final Resource resource : this.resourcesToAdd) {
            startingServer.add(new VertxCoapResource(resource, this.context));
        }
        this.resourcesToAdd.clear();
    }

    /** Creates a device resolver based on this adapter's context, configuration and credentials client factory. */
    private DefaultDeviceResolver newDefaultDeviceResolver() {
        return new DefaultDeviceResolver(this.context, getConfig(), getCredentialsClientFactory());
    }

    /**
     * Creates the DTLS secured endpoint and adds it to the given server.
     * A failure to create the endpoint is logged only and leaves the secure endpoint unset.
     */
    private void bindSecureEndpoint(CoapServer startingServer, NetworkConfig networkConfig) {
        final T config = getConfig();
        // create default instances lazily, i.e. only if no explicit instance has been configured
        final ApplicationLevelInfoSupplier deviceResolver = Optional.ofNullable(this.honoDeviceResolver)
                .orElseGet(this::newDefaultDeviceResolver);
        final PskStore store = Optional.ofNullable(this.pskStore)
                .orElseGet(() -> deviceResolver instanceof PskStore
                        ? (PskStore) deviceResolver
                        : newDefaultDeviceResolver());

        final DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder();
        dtlsConfig.setClientAuthenticationRequired(config.isAuthenticationRequired());
        dtlsConfig.setConnectionThreadCount(config.getConnectorThreads());
        dtlsConfig.setAddress(new InetSocketAddress(config.getBindAddress(), config.getPort(getPortDefaultValue())));
        dtlsConfig.setApplicationLevelInfoSupplier(deviceResolver);
        dtlsConfig.setPskStore(store);
        try {
            final CoapEndpoint.Builder endpointBuilder = new CoapEndpoint.Builder();
            endpointBuilder.setNetworkConfig(networkConfig);
            endpointBuilder.setConnector(new DTLSConnector(dtlsConfig.build()));
            this.secureEndpoint = endpointBuilder.build();
            startingServer.addEndpoint(this.secureEndpoint);
        } catch (IllegalStateException e) {
            this.log.warn("failed to create secure endpoint", e);
        }
    }

    /**
     * Creates the insecure endpoint and adds it to the given server, provided that the
     * insecure port is enabled and the configuration does not require device authentication.
     */
    private void bindInsecureEndpoint(CoapServer startingServer, NetworkConfig networkConfig) {
        final T config = getConfig();
        if (!config.isInsecurePortEnabled()) {
            return;
        }
        if (config.isAuthenticationRequired()) {
            this.log.warn("skipping start up of insecure endpoint, configuration requires authentication of devices");
            return;
        }
        final CoapEndpoint.Builder endpointBuilder = new CoapEndpoint.Builder();
        endpointBuilder.setNetworkConfig(networkConfig);
        endpointBuilder.setInetSocketAddress(new InetSocketAddress(
                config.getInsecurePortBindAddress(),
                config.getInsecurePort(getInsecurePortDefaultValue())));
        this.insecureEndpoint = endpointBuilder.build();
        startingServer.addEndpoint(this.insecureEndpoint);
    }

    /**
     * Invoked during start-up after the port configuration has been checked and before the
     * CoAP server is set up.
     * <p>
     * This default implementation does nothing.
     *
     * @return A succeeded future. Start-up continues once the returned future succeeds.
     */
    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    /**
     * Invoked after the CoAP server has been started.
     * <p>
     * This default implementation does nothing. An exception thrown from this method
     * fails the start-up.
     */
    protected void onStartupSuccess() {
    }

    /** Creates a network configuration pre-populated with the thread counts from the adapter configuration. */
    private static NetworkConfig newBaseNetworkConfig(CoapAdapterProperties config) {
        final NetworkConfig networkConfig = new NetworkConfig();
        networkConfig.setInt("PROTOCOL_STAGE_THREAD_COUNT", config.getCoapThreads());
        networkConfig.setInt("NETWORK_STAGE_RECEIVER_THREAD_COUNT", config.getConnectorThreads());
        networkConfig.setInt("NETWORK_STAGE_SENDER_THREAD_COUNT", config.getConnectorThreads());
        return networkConfig;
    }

    /**
     * Gets the network configuration for the secure endpoint.
     * <p>
     * Starts from the configured thread counts, then applies the common network
     * configuration file and finally the secure endpoint specific file.
     *
     * @return A future indicating the outcome of loading the configuration.
     */
    protected Future<NetworkConfig> getSecureNetworkConfig() {
        final CoapAdapterProperties config = getConfig();
        return loadNetworkConfig(config.getNetworkConfig(), newBaseNetworkConfig(config))
                .compose(common -> loadNetworkConfig(config.getSecureNetworkConfig(), common));
    }

    /**
     * Gets the network configuration for the insecure endpoint.
     * <p>
     * Starts from the configured thread counts, then applies the common network
     * configuration file and finally the insecure endpoint specific file.
     *
     * @return A future indicating the outcome of loading the configuration.
     */
    protected Future<NetworkConfig> getInsecureNetworkConfig() {
        final CoapAdapterProperties config = getConfig();
        return loadNetworkConfig(config.getNetworkConfig(), newBaseNetworkConfig(config))
                .compose(common -> loadNetworkConfig(config.getInsecureNetworkConfig(), common));
    }

    /**
     * Loads properties from a file into a network configuration.
     * <p>
     * Loading is best effort: if the file cannot be read or parsed, a warning is logged and
     * the configuration is left as is. The returned future is completed only <em>after</em>
     * the read attempt has finished, so that callers never use a configuration that is
     * still being populated.
     *
     * @param str The path of the file to load; if {@code null} or empty nothing is loaded.
     * @param networkConfig The configuration to load the properties into.
     * @return A future that is completed with the given network configuration.
     */
    protected Future<NetworkConfig> loadNetworkConfig(String str, NetworkConfig networkConfig) {
        if (str == null || str.isEmpty()) {
            return Future.succeededFuture(networkConfig);
        }
        final Future<NetworkConfig> result = Future.future();
        getVertx().fileSystem().readFile(str, readAttempt -> {
            if (readAttempt.succeeded()) {
                try (ByteArrayInputStream in = new ByteArrayInputStream(readAttempt.result().getBytes())) {
                    networkConfig.load(in);
                } catch (IOException e) {
                    this.log.warn("skipping malformed NetworkConfig properties [{}]", str, e);
                }
            } else {
                this.log.warn("error reading NetworkConfig file [{}]", str, readAttempt.cause());
            }
            // signal completion regardless of the outcome, a broken file must not prevent start-up
            result.complete(networkConfig);
        });
        return result;
    }

    /**
     * Invoked before a message is sent downstream, giving subclasses the chance to modify it.
     * <p>
     * This default implementation does nothing.
     *
     * @param message The message about to be sent.
     * @param coapContext The context of the CoAP request the message originates from.
     */
    protected void customizeDownstreamMessage(Message message, CoapContext coapContext) {
    }

    /**
     * Stops the adapter.
     * <p>
     * Invokes {@link #preShutdown()} (exceptions are logged only), stops the CoAP server
     * on a worker thread and then invokes {@link #postShutdown()}.
     *
     * @param future The future to complete once shutdown has finished or to fail otherwise.
     */
    public final void doStop(Future<Void> future) {
        try {
            preShutdown();
        } catch (Exception e) {
            this.log.error("error in preShutdown", e);
        }
        final Future<Void> serverStopTracker = Future.future();
        final CoapServer runningServer = this.server;
        if (runningServer == null) {
            serverStopTracker.complete();
        } else {
            getVertx().executeBlocking(blockingCode -> {
                runningServer.stop();
                blockingCode.complete();
            }, serverStopTracker);
        }
        serverStopTracker
            .compose(serverStopped -> postShutdown())
            .compose(shutdownDone -> {
                future.complete();
            }, future);
    }

    /**
     * Invoked at the very beginning of shutdown, before the CoAP server is stopped.
     * <p>
     * This default implementation does nothing.
     */
    protected void preShutdown() {
    }

    /**
     * Invoked after the CoAP server has been stopped.
     * <p>
     * This default implementation does nothing.
     *
     * @return A succeeded future. Shutdown completes once the returned future succeeds.
     */
    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    /**
     * Determines the authenticated device from the peer identity of a request's DTLS session.
     *
     * @param device The device to use as the second component of the extended device;
     *               if {@code null}, the authenticated device is used for both components.
     * @param coapExchange The exchange of the request to get the peer identity from.
     * @return A future completed with the extended device, or failed with a
     *         {@link ClientErrorException} (401) if the peer identity is not an
     *         {@link ExtensiblePrincipal} or carries no {@code hono-device} information.
     */
    protected final Future<ExtendedDevice> getAuthenticatedExtendedDevice(Device device, CoapExchange coapExchange) {
        final Principal peerIdentity = coapExchange.advanced().getRequest().getSourceContext().getPeerIdentity();
        if (!(peerIdentity instanceof ExtensiblePrincipal)) {
            return Future.failedFuture(new ClientErrorException(401, "DTLS session does not contain ExtensiblePrincipal"));
        }
        final ExtensiblePrincipal<?> extensiblePrincipal = (ExtensiblePrincipal<?>) peerIdentity;
        final Device authenticatedDevice = extensiblePrincipal.getExtendedInfo().get("hono-device", Device.class);
        if (authenticatedDevice == null) {
            return Future.failedFuture(new ClientErrorException(401, "DTLS session does not contain authenticated Device"));
        }
        final Device originDevice = Optional.ofNullable(device).orElse(authenticatedDevice);
        return Future.succeededFuture(new ExtendedDevice(authenticatedDevice, originDevice));
    }

    /**
     * Forwards the payload of a CoAP request downstream as a telemetry message.
     *
     * @param coapContext The context of the request containing the payload.
     * @param device The device passed to the registration assertion lookup; its tenant
     *               determines the telemetry sender.
     * @param device2 The device whose tenant and device identifier address the message.
     * @param z {@code true} if the response is to be sent only after the downstream
     *          outcome is known, {@code false} to respond right after sending.
     * @throws NullPointerException if context or any of the devices is {@code null}.
     */
    public final void uploadTelemetryMessage(CoapContext coapContext, Device device, Device device2, boolean z) {
        Objects.requireNonNull(coapContext);
        Objects.requireNonNull(device);
        Objects.requireNonNull(device2);
        final CoapExchange exchange = coapContext.getExchange();
        doUploadMessage(
                coapContext,
                device,
                device2,
                z,
                Buffer.buffer(exchange.getRequestPayload()),
                MediaTypeRegistry.toString(exchange.getRequestOptions().getContentFormat()),
                getTelemetrySender(device.getTenantId()),
                MetricsTags.EndpointType.TELEMETRY);
    }

    /**
     * Forwards the payload of a CoAP request downstream as an event message.
     * The response is always sent only after the downstream outcome is known.
     *
     * @param coapContext The context of the request containing the payload.
     * @param device The device passed to the registration assertion lookup; its tenant
     *               determines the event sender.
     * @param device2 The device whose tenant and device identifier address the message.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public final void uploadEventMessage(CoapContext coapContext, Device device, Device device2) {
        Objects.requireNonNull(coapContext);
        Objects.requireNonNull(device);
        Objects.requireNonNull(device2);
        final CoapExchange exchange = coapContext.getExchange();
        doUploadMessage(
                coapContext,
                device,
                device2,
                true,
                Buffer.buffer(exchange.getRequestPayload()),
                MediaTypeRegistry.toString(exchange.getRequestOptions().getContentFormat()),
                getEventSender(device.getTenantId()),
                MetricsTags.EndpointType.EVENT);
    }

    /**
     * Validates a request, sends its payload downstream and responds to the device.
     * <p>
     * Requests without content type or payload are answered with 4.06 (Not Acceptable).
     * On success the request is answered with 2.04 (Changed), otherwise an error response
     * derived from the failure is sent. The outcome is reported to the metrics in both cases.
     */
    private void doUploadMessage(CoapContext ctx, Device authenticatedDevice, Device device, boolean waitForOutcome,
            Buffer payload, String contentType, Future<DownstreamSender> senderTracker,
            MetricsTags.EndpointType endpoint) {

        if (contentType == null || payload == null || payload.length() == 0) {
            ctx.respondWithCode(CoAP.ResponseCode.NOT_ACCEPTABLE);
            return;
        }

        final String tenantId = device.getTenantId();
        final String deviceId = device.getDeviceId();
        final MetricsTags.QoS qos = waitForOutcome ? MetricsTags.QoS.AT_LEAST_ONCE : MetricsTags.QoS.AT_MOST_ONCE;

        final Future<JsonObject> tokenTracker = getRegistrationAssertion(tenantId, deviceId, authenticatedDevice, null);
        final Future<TenantObject> tenantTracker = getTenantConfiguration(tenantId, null);
        // the tenant is valid only if the adapter is enabled for it and its message limit is not exceeded
        final Future<TenantObject> tenantValidationTracker = tenantTracker.compose(tenantObject ->
                CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, payload.length()))
                        .map(checksPassed -> tenantObject));

        CompositeFuture.all(tokenTracker, senderTracker, tenantValidationTracker).compose(prerequisitesMet -> {
            final DownstreamSender sender = senderTracker.result();
            final Message downstreamMessage = newMessage(
                    ResourceIdentifier.from(endpoint.getCanonicalName(), tenantId, deviceId),
                    "/" + ctx.getExchange().getRequestOptions().getUriPathString(),
                    contentType,
                    payload,
                    tenantValidationTracker.result(),
                    tokenTracker.result(),
                    null);
            customizeDownstreamMessage(downstreamMessage, ctx);
            return waitForOutcome ? sender.sendAndWaitForOutcome(downstreamMessage) : sender.send(downstreamMessage);
        }).map(delivery -> {
            this.log.trace("successfully processed message for device [tenantId: {}, deviceId: {}, endpoint: {}]",
                    tenantId, deviceId, endpoint.getCanonicalName());
            this.metrics.reportTelemetry(endpoint, tenantId, tenantTracker.result(),
                    MetricsTags.ProcessingOutcome.FORWARDED, qos, payload.length(), ctx.getTimer());
            ctx.respondWithCode(CoAP.ResponseCode.CHANGED);
            return delivery;
        }).recover(error -> {
            this.log.debug("cannot process message for device [tenantId: {}, deviceId: {}, endpoint: {}]",
                    tenantId, deviceId, endpoint.getCanonicalName(), error);
            // client errors mean the message itself cannot be processed, everything else is a delivery problem
            final MetricsTags.ProcessingOutcome outcome = error instanceof ClientErrorException
                    ? MetricsTags.ProcessingOutcome.UNPROCESSABLE
                    : MetricsTags.ProcessingOutcome.UNDELIVERABLE;
            this.metrics.reportTelemetry(endpoint, tenantId, tenantTracker.result(),
                    outcome, qos, payload.length(), ctx.getTimer());
            CoapErrorResponse.respond(ctx.getExchange(), error);
            return Future.failedFuture(error);
        });
    }

    /**
     * Checks if gateway mapping is enabled.
     * <p>
     * This implementation ignores all arguments and always reports {@code true}.
     *
     * @param str Not used.
     * @param str2 Not used.
     * @param device Not used.
     * @return A succeeded future containing {@code true}.
     */
    protected final Future<Boolean> isGatewayMappingEnabled(String str, String str2, Device device) {
        return Future.succeededFuture(Boolean.TRUE);
    }
}
