package org.eclipse.ditto.services.amqpbridge.messaging;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.qpid.jms.JmsQueue;
import org.eclipse.ditto.services.utils.akka.LogUtil;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/amqpbridge/messaging/CommandConsumerActor.class */
public final class CommandConsumerActor extends AbstractActor {
    static final String ACTOR_NAME_PREFIX = "amqpConsumerActor-";
    private final DiagnosticLoggingAdapter log;
    private final Session session;
    private final String source;
    private final ActorRef commandProcessor;
    private MessageConsumer consumer;

    private CommandConsumerActor(Session session, String str, ActorRef actorRef) {
        this.log = LogUtil.obtain(this);
        this.session = session;
        this.source = str;
        this.commandProcessor = actorRef;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final Session session, final String str, final ActorRef actorRef) {
        return Props.create(CommandConsumerActor.class, new Creator<CommandConsumerActor>() { // from class: org.eclipse.ditto.services.amqpbridge.messaging.CommandConsumerActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public CommandConsumerActor m0create() {
                return new CommandConsumerActor(session, str, actorRef);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().matchAny(obj -> {
            this.log.debug("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    public void preStart() throws JMSException {
        JmsQueue jmsQueue = new JmsQueue(this.source);
        this.log.info("Creating AMQP Consumer for '{}'", this.source);
        if (this.session != null) {
            this.consumer = this.session.createConsumer(jmsQueue);
            this.consumer.setMessageListener(message -> {
                this.commandProcessor.forward(message, getContext());
            });
        }
    }

    public void postStop() throws Exception {
        super.postStop();
        this.log.info("Closing AMQP Consumer for '{}'", this.source);
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
