package org.eclipse.hono.vertx.example.base;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.CommandClient;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessageTap;
import org.eclipse.hono.util.TimeUntilDisconnectNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/vertx/example/base/HonoExampleApplicationBase.class */
/**
 * Example base class for consuming telemetry and event messages from Hono and for sending commands to
 * devices that signal that they are ready to receive one.
 * <p>
 * The class connects a {@link HonoClient} to the AMQP endpoint configured in {@code HonoExampleConstants},
 * creates an event and a telemetry consumer for the tenant {@code DEFAULT_TENANT} and reacts to the
 * {@link TimeUntilDisconnectNotification}s that are extracted from the consumed messages:
 * <ul>
 * <li>a positive <em>ttd</em> triggers a single command,</li>
 * <li>a <em>ttd</em> of {@code -1} triggers an immediate command plus a periodic command sender,</li>
 * <li>any other non-positive <em>ttd</em> cancels a periodic command sender of that device.</li>
 * </ul>
 * <p>
 * The mutable maps of this class are plain {@link HashMap}s; they are only accessed from message and timer
 * callbacks (NOTE(review): presumably all running on the Vert.x event loop - confirm before sharing an
 * instance between threads).
 */
public class HonoExampleApplicationBase {

    /** User name for authenticating to Hono, taken from system property {@code username}. */
    public static final String HONO_CLIENT_USER = System.getProperty("username", "consumer@HONO");
    /** Password for authenticating to Hono, taken from system property {@code password}. */
    public static final String HONO_CLIENT_PASSWORD = System.getProperty("password", "verysecret");
    /** If {@code true} (system property {@code plain.connection}), no credentials/trust store are configured. */
    public static final Boolean USE_PLAIN_CONNECTION = Boolean.valueOf(System.getProperty("plain.connection", "false"));
    /** If {@code true} (system property {@code sendOneWayCommands}), one-way commands are sent. */
    public static final Boolean SEND_ONE_WAY_COMMANDS = Boolean.valueOf(System.getProperty("sendOneWayCommands", "false"));

    private static final Logger LOG = LoggerFactory.getLogger(HonoExampleApplicationBase.class);

    /** Tenant for which messages are consumed. */
    private static final String TENANT_ID = "DEFAULT_TENANT";
    /** The ttd value that is handled as "device stays connected until further notice". */
    private static final int TTD_UNLIMITED = -1;
    /** Time to wait before a pending notification of a device with non-positive ttd is evaluated. */
    private static final long PENDING_NOTIFICATION_DELAY_MILLIS = 1000L;
    private static final String COMMAND_NAME = "setBrightness";
    private static final String ONE_WAY_COMMAND_NAME = "sendLifecycleInfo";

    /** Delay in milliseconds before a re-connect attempt is started after the connection was lost. */
    protected final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;

    private final HonoClient honoClient;
    private final Vertx vertx = Vertx.vertx();
    /** Maps "tenant and device id" to a handler that stops the periodic command sender of that device. */
    private final Map<String, Handler<Void>> periodicCommandSenderTimerCancelerMap = new HashMap<>();
    /** Maps "tenant and device id" to the most recent notification that still waits for being evaluated. */
    private final Map<String, TimeUntilDisconnectNotification> pendingTtdNotification = new HashMap<>();

    /**
     * Creates the application and a (not yet connected) Hono client.
     * <p>
     * Unless {@link #USE_PLAIN_CONNECTION} is set, credentials and the demo trust store are configured and
     * host name verification is switched off.
     */
    public HonoExampleApplicationBase() {
        final ClientConfigProperties props = new ClientConfigProperties();
        props.setHost(HonoExampleConstants.HONO_AMQP_CONSUMER_HOST);
        props.setPort(HonoExampleConstants.HONO_AMQP_CONSUMER_PORT);
        if (!USE_PLAIN_CONNECTION.booleanValue()) {
            props.setUsername(HONO_CLIENT_USER);
            props.setPassword(HONO_CLIENT_PASSWORD);
            props.setTrustStorePath("target/config/hono-demo-certs-jar/trusted-certs.pem");
            props.setHostnameVerificationRequired(false);
        }
        this.honoClient = new HonoClientImpl(this.vertx, props);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Connects to Hono, creates the consumers and then blocks until a byte can be read from
     * {@code System.in}.
     * <p>
     * If the consumers could not be created, an error is logged and the method returns without waiting for
     * input. The Vert.x instance is closed in every case, also if waiting is interrupted or reading fails.
     *
     * @throws Exception if waiting for the connection is interrupted or reading from standard input fails.
     */
    public void consumeData() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final Future<MessageConsumer> consumerFuture = Future.future();
        consumerFuture.setHandler(result -> {
            if (!result.succeeded()) {
                LOG.error("honoClient could not create downstream consumer for [{}:{}]",
                        HonoExampleConstants.HONO_AMQP_CONSUMER_HOST,
                        HonoExampleConstants.HONO_AMQP_CONSUMER_PORT,
                        result.cause());
            }
            latch.countDown();
        });
        try {
            this.honoClient.connect(this::onDisconnect)
                    .compose(connectedClient -> createConsumer())
                    .setHandler(consumerFuture.completer());
            latch.await();
            if (consumerFuture.succeeded()) {
                // keep the application alive until the user hits a key
                System.in.read();
            }
        } finally {
            this.vertx.close();
        }
    }

    /**
     * Creates the event consumer and, once that succeeded, the telemetry consumer.
     *
     * @return A future that succeeds with the telemetry consumer or fails with the cause of the first
     *         consumer that could not be created.
     */
    private Future<MessageConsumer> createConsumer() {
        final Consumer<Message> eventConsumer =
                MessageTap.getConsumer(this::handleEventMessage, this::handleCommandReadinessNotification);
        final Consumer<Message> telemetryConsumer =
                MessageTap.getConsumer(this::handleTelemetryMessage, this::handleCommandReadinessNotification);

        return this.honoClient.createEventConsumer(
                TENANT_ID,
                eventConsumer,
                closeHook -> LOG.error("remotely detached consumer link"))
            .compose(eventMessageConsumer -> this.honoClient.createTelemetryConsumer(
                    TENANT_ID,
                    telemetryConsumer,
                    closeHook -> LOG.error("remotely detached consumer link"))
                .map(telemetryMessageConsumer -> {
                    LOG.info("Consumer ready for telemetry and event messages.");
                    return telemetryMessageConsumer;
                }));
    }

    /**
     * Schedules a re-connect attempt (including re-creation of the consumers) after the connection was lost.
     *
     * @param protonConnection The connection that was lost (not used).
     */
    private void onDisconnect(ProtonConnection protonConnection) {
        this.vertx.setTimer(DEFAULT_CONNECT_TIMEOUT_MILLIS, reconnect -> {
            LOG.info("attempting to re-connect to Hono ...");
            this.honoClient.connect(this::onDisconnect)
                    .compose(connectedClient -> createConsumer())
                    .setHandler(attempt -> {
                        if (attempt.succeeded()) {
                            LOG.info("Reconnected to Hono.");
                        } else {
                            // do not swallow the failure, otherwise a dead application looks healthy
                            LOG.error("could not re-connect to Hono", attempt.cause());
                        }
                    });
        });
    }

    /**
     * Logs a consumed message on DEBUG level.
     *
     * @param tenantId The tenant the message belongs to.
     * @param message The consumed message.
     * @param messageType A label for the kind of message, e.g. {@code telemetry}.
     */
    private void printMessage(String tenantId, Message message, String messageType) {
        // guard is kept because extracting the payload is not for free
        if (LOG.isDebugEnabled()) {
            LOG.debug("received {} [tenant: {}, device: {}, content-type: {} ]: [{}].",
                    messageType,
                    tenantId,
                    MessageHelper.getDeviceId(message),
                    message.getContentType(),
                    MessageHelper.getPayloadAsString(message));
        }
    }

    /**
     * Dispatches a notification depending on its ttd: positive values trigger a single command, all other
     * values are handled by {@link #handlePermanentlyConnectedCommandReadinessNotification}.
     *
     * @param notification The notification extracted from a consumed message.
     */
    private void handleCommandReadinessNotification(TimeUntilDisconnectNotification notification) {
        if (notification.getTtd().intValue() <= 0) {
            handlePermanentlyConnectedCommandReadinessNotification(notification);
        } else {
            LOG.info("Device is ready to receive a command : [{}].", notification);
            createCommandClientAndSendCommand(notification);
        }
    }

    /**
     * Handles a notification with a non-positive ttd.
     * <p>
     * The first notification of a device is parked in {@code pendingTtdNotification} and evaluated after
     * {@link #PENDING_NOTIFICATION_DELAY_MILLIS}. Notifications arriving for the same device in the meantime
     * only replace the parked one if they were created later, so that only the most recent one is evaluated.
     *
     * @param notification The notification to handle.
     */
    private void handlePermanentlyConnectedCommandReadinessNotification(TimeUntilDisconnectNotification notification) {
        final String keyForDevice = notification.getTenantAndDeviceId();
        final TimeUntilDisconnectNotification previousNotification = this.pendingTtdNotification.get(keyForDevice);

        if (previousNotification != null) {
            // an evaluation is already scheduled for this device, just make sure it sees the newest state
            if (notification.getCreationTime().isAfter(previousNotification.getCreationTime())) {
                LOG.info("Set new ttd value [{}] of notification for [{}]", notification.getTtd(), keyForDevice);
                this.pendingTtdNotification.put(keyForDevice, notification);
            } else {
                LOG.trace("Received notification for [{}] that was already superseded by newer [{}]",
                        notification, previousNotification);
            }
            return;
        }

        this.pendingTtdNotification.put(keyForDevice, notification);
        this.vertx.setTimer(PENDING_NOTIFICATION_DELAY_MILLIS, timerId -> {
            LOG.debug("Handle device notification for [{}].", keyForDevice);
            final TimeUntilDisconnectNotification notificationToHandle = this.pendingTtdNotification.remove(keyForDevice);
            if (notificationToHandle != null) {
                handlePendingNotification(notification, notificationToHandle);
            }
        });
    }

    /**
     * Evaluates the most recent notification of a device after the waiting period has elapsed.
     *
     * @param notification The notification that started the waiting period.
     * @param notificationToHandle The most recent notification received for the same device.
     */
    private void handlePendingNotification(TimeUntilDisconnectNotification notification,
            TimeUntilDisconnectNotification notificationToHandle) {

        if (!hasUnlimitedTtd(notificationToHandle)) {
            LOG.info("Device notified as not being ready to receive a command (anymore) : [{}].", notification);
            cancelPeriodicCommandSender(notificationToHandle);
            LOG.debug("Device will not receive further commands : [{}].", notification.getTenantAndDeviceId());
            return;
        }

        LOG.info("Device notified as being ready to receive a command until further notice : [{}].", notificationToHandle);
        // stop a still running sender of this device before starting a new one
        cancelPeriodicCommandSender(notification);
        // send the first command immediately ...
        createCommandClientAndSendCommand(notificationToHandle);
        // ... and all further ones periodically
        final long periodicTimerId = this.vertx.setPeriodic(
                HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000,
                id -> createCommandClientAndSendCommand(notificationToHandle).map(commandClient -> {
                    // replace the canceler by one that also closes the command client in use
                    setPeriodicCommandSenderTimerCanceler(id, notification, commandClient);
                    return null;
                }));
        // Register a canceler right away (no command client known yet). Without it, a device that
        // disconnects before the first period has elapsed would leave a timer that can never be stopped.
        setPeriodicCommandSenderTimerCanceler(periodicTimerId, notification, null);
    }

    /**
     * Checks whether a notification carries the ttd value {@code -1}.
     *
     * @param notification The notification to check.
     * @return {@code true} if the device announced to stay connected until further notice.
     */
    private boolean hasUnlimitedTtd(TimeUntilDisconnectNotification notification) {
        return notification.getTtd().intValue() == TTD_UNLIMITED;
    }

    /**
     * Gets (or creates) a command client for the device of the notification and sends a command with it.
     *
     * @param notification The notification identifying the device.
     * @return A future that succeeds with the command client, or with {@code null} if the client could not
     *         be created (the failure is logged). The future does not reflect the outcome of the command.
     */
    private Future<CommandClient> createCommandClientAndSendCommand(TimeUntilDisconnectNotification notification) {
        return this.honoClient.getOrCreateCommandClient(notification.getTenantId(), notification.getDeviceId())
                .map(commandClient -> {
                    commandClient.setRequestTimeout(calculateCommandTimeout(notification));
                    if (SEND_ONE_WAY_COMMANDS.booleanValue()) {
                        sendOneWayCommandToAdapter(commandClient, notification);
                    } else {
                        sendCommandToAdapter(commandClient, notification);
                    }
                    return commandClient;
                }).otherwise(th -> {
                    LOG.error("Could not create command client", th);
                    return null;
                });
    }

    /**
     * Determines the request timeout for a command.
     *
     * @param notification The notification of the device the command is sent to.
     * @return The command interval in milliseconds for a ttd of {@code -1}, otherwise the milliseconds
     *         until the notification expires.
     */
    private long calculateCommandTimeout(TimeUntilDisconnectNotification notification) {
        if (hasUnlimitedTtd(notification)) {
            return HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000;
        }
        return notification.getMillisecondsUntilExpiry();
    }

    /**
     * Registers the handler that stops the periodic command sender of a device. An already registered
     * handler of the same device is replaced.
     *
     * @param timerId The id of the periodic Vert.x timer.
     * @param notification The notification identifying the device.
     * @param commandClient The client to close on cancellation, may be {@code null}.
     */
    private void setPeriodicCommandSenderTimerCanceler(Long timerId, TimeUntilDisconnectNotification notification,
            CommandClient commandClient) {
        final String keyForDevice = notification.getTenantAndDeviceId();
        this.periodicCommandSenderTimerCancelerMap.put(keyForDevice, v -> {
            closeCommandClient(commandClient, notification);
            this.vertx.cancelTimer(timerId.longValue());
            this.periodicCommandSenderTimerCancelerMap.remove(keyForDevice);
        });
    }

    /**
     * Stops the periodic command sender of a device, if there is one.
     *
     * @param notification The notification identifying the device.
     * @return {@code true} if a sender was found and its canceler has been invoked.
     */
    private boolean cancelPeriodicCommandSender(TimeUntilDisconnectNotification notification) {
        final String keyForDevice = notification.getTenantAndDeviceId();
        if (!isPeriodicCommandSenderActiveForDevice(notification)) {
            LOG.debug("Wanted to cancel periodic sender for {}, but could not find one", keyForDevice);
            return false;
        }
        LOG.debug("Cancelling periodic sender for {}", keyForDevice);
        this.periodicCommandSenderTimerCancelerMap.get(keyForDevice).handle(null);
        return true;
    }

    /**
     * Checks whether a canceler is registered for the device of the notification.
     *
     * @param notification The notification identifying the device.
     * @return {@code true} if a canceler is registered.
     */
    private boolean isPeriodicCommandSenderActiveForDevice(TimeUntilDisconnectNotification notification) {
        return this.periodicCommandSenderTimerCancelerMap.containsKey(notification.getTenantAndDeviceId());
    }

    /**
     * Sends a request/response command and logs the outcome. Unless the device has an unlimited ttd, the
     * command client is closed once the outcome is known.
     *
     * @param commandClient The client to send the command with.
     * @param notification The notification identifying the device.
     */
    private void sendCommandToAdapter(CommandClient commandClient, TimeUntilDisconnectNotification notification) {
        final Buffer commandPayload = buildCommandPayload();
        LOG.debug("Sending command [{}] to [{}].", COMMAND_NAME, notification.getTenantAndDeviceId());

        commandClient.sendCommand(COMMAND_NAME, "application/json", commandPayload, buildCommandProperties()).map(result -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent command payload: [{}].", commandPayload.toString());
                LOG.debug("And received response: [{}].",
                        Optional.ofNullable(result.getPayload()).orElse(Buffer.buffer()).toString());
            }
            closeCommandClientUnlessUnlimitedTtd(commandClient, notification);
            return result;
        }).otherwise(th -> {
            if (th instanceof ServiceInvocationException) {
                LOG.debug("Command was replied with error code [{}].",
                        Integer.valueOf(((ServiceInvocationException) th).getErrorCode()));
            } else {
                LOG.debug("Could not send command : {}.", th.getMessage());
            }
            closeCommandClientUnlessUnlimitedTtd(commandClient, notification);
            return null;
        });
    }

    /**
     * Sends a one-way command and logs the outcome. Unless the device has an unlimited ttd, the command
     * client is closed once the outcome is known.
     *
     * @param commandClient The client to send the command with.
     * @param notification The notification identifying the device.
     */
    private void sendOneWayCommandToAdapter(CommandClient commandClient, TimeUntilDisconnectNotification notification) {
        final Buffer commandPayload = buildOneWayCommandPayload();
        LOG.debug("Sending one-way command [{}] to [{}].", ONE_WAY_COMMAND_NAME, notification.getTenantAndDeviceId());

        commandClient.sendOneWayCommand(ONE_WAY_COMMAND_NAME, commandPayload).map(status -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent one-way command payload: [{}] and received status [{}].",
                        commandPayload.toString(), status);
            }
            closeCommandClientUnlessUnlimitedTtd(commandClient, notification);
            return status;
        }).otherwise(th -> {
            if (th instanceof ServiceInvocationException) {
                LOG.debug("One-way command was replied with error code [{}].",
                        Integer.valueOf(((ServiceInvocationException) th).getErrorCode()));
            } else {
                LOG.debug("Could not send one-way command : {}.", th.getMessage());
            }
            closeCommandClientUnlessUnlimitedTtd(commandClient, notification);
            return null;
        });
    }

    /**
     * Closes the command client, except for devices with an unlimited ttd: their client is closed by the
     * canceler of the periodic command sender instead.
     *
     * @param commandClient The client to close.
     * @param notification The notification identifying the device.
     */
    private void closeCommandClientUnlessUnlimitedTtd(CommandClient commandClient, TimeUntilDisconnectNotification notification) {
        if (!hasUnlimitedTtd(notification)) {
            closeCommandClient(commandClient, notification);
        }
    }

    /**
     * Closes a command client.
     *
     * @param commandClient The client to close. If {@code null} (e.g. because it could not be created),
     *                      nothing happens.
     * @param notification The notification identifying the device (used for logging only).
     */
    private void closeCommandClient(CommandClient commandClient, TimeUntilDisconnectNotification notification) {
        if (commandClient == null) {
            return;
        }
        commandClient.close(closeAttempt ->
                LOG.trace("Closed commandClient for [{}].", notification.getTenantAndDeviceId()));
    }

    /**
     * Builds the application properties that are sent along with a request/response command.
     *
     * @return A new map containing the single entry {@code appId}.
     */
    private Map<String, Object> buildCommandProperties() {
        final Map<String, Object> properties = new HashMap<>(1);
        properties.put("appId", "example#1");
        return properties;
    }

    /**
     * Builds the payload of a request/response command.
     *
     * @return A JSON object with a random {@code brightness} between 0 and 99.
     */
    private Buffer buildCommandPayload() {
        final JsonObject payload = new JsonObject()
                .put("brightness", Integer.valueOf((int) (Math.random() * 100.0d)));
        return Buffer.buffer(payload.encodePrettily());
    }

    /**
     * Builds the payload of a one-way command.
     *
     * @return A JSON object with a fixed {@code info} text.
     */
    private Buffer buildOneWayCommandPayload() {
        final JsonObject payload = new JsonObject().put("info", "app restarted.");
        return Buffer.buffer(payload.encodePrettily());
    }

    /**
     * Handles a consumed telemetry message by logging it.
     *
     * @param message The telemetry message.
     */
    private void handleTelemetryMessage(Message message) {
        printMessage(TENANT_ID, message, "telemetry");
    }

    /**
     * Handles a consumed event message by logging it.
     *
     * @param message The event message.
     */
    private void handleEventMessage(Message message) {
        printMessage(TENANT_ID, message, "event");
    }
}
