package org.graylog2.inputs.transports;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.Uninterruptibles;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/inputs/transports/AmqpConsumer.class */
public class AmqpConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final String password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final String routingKey;
    private final boolean requeueInvalid;
    private Connection connection;
    private Channel channel;
    private final int heartbeatTimeout;
    private final MessageInput sourceInput;
    private final int parallelQueues;
    private final boolean tls;
    private AmqpTransport amqpTransport;
    private AtomicLong totalBytesRead = new AtomicLong(0);
    private AtomicLong lastSecBytesRead = new AtomicLong(0);
    private AtomicLong lastSecBytesReadTmp = new AtomicLong(0);

    public AmqpConsumer(String str, int i, String str2, String str3, String str4, int i2, String str5, String str6, String str7, int i3, boolean z, boolean z2, int i4, MessageInput messageInput, ScheduledExecutorService scheduledExecutorService, AmqpTransport amqpTransport) {
        this.hostname = str;
        this.port = i;
        this.virtualHost = str2;
        this.username = str3;
        this.password = str4;
        this.prefetchCount = i2;
        this.queue = str5;
        this.exchange = str6;
        this.routingKey = str7;
        this.heartbeatTimeout = i4;
        this.sourceInput = messageInput;
        this.parallelQueues = i3;
        this.tls = z;
        this.requeueInvalid = z2;
        this.amqpTransport = amqpTransport;
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.graylog2.inputs.transports.AmqpConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                AmqpConsumer.this.lastSecBytesRead.set(AmqpConsumer.this.lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    public void run() throws IOException {
        if (!isConnected()) {
            connect();
        }
        for (int i = 0; i < this.parallelQueues; i++) {
            String format = String.format(this.queue, Integer.valueOf(i));
            this.channel.queueDeclare(format, true, false, false, (Map) null);
            this.channel.basicConsume(format, false, new DefaultConsumer(this.channel) { // from class: org.graylog2.inputs.transports.AmqpConsumer.2
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    try {
                        AmqpConsumer.this.totalBytesRead.addAndGet(bArr.length);
                        AmqpConsumer.this.lastSecBytesReadTmp.addAndGet(bArr.length);
                        RawMessage rawMessage = new RawMessage(bArr);
                        if (AmqpConsumer.this.amqpTransport.isThrottled()) {
                            AmqpConsumer.this.amqpTransport.blockUntilUnthrottled();
                        }
                        AmqpConsumer.this.sourceInput.processRawMessage(rawMessage);
                        AmqpConsumer.this.channel.basicAck(deliveryTag, false);
                    } catch (Exception e) {
                        AmqpConsumer.LOG.error("Error while trying to process AMQP message", e);
                        if (AmqpConsumer.this.channel.isOpen()) {
                            AmqpConsumer.this.channel.basicNack(deliveryTag, false, AmqpConsumer.this.requeueInvalid);
                            if (AmqpConsumer.LOG.isDebugEnabled()) {
                                if (AmqpConsumer.this.requeueInvalid) {
                                    AmqpConsumer.LOG.debug("Re-queue message with delivery tag {}", Long.valueOf(deliveryTag));
                                } else {
                                    AmqpConsumer.LOG.debug("Message with delivery tag {} not re-queued", Long.valueOf(deliveryTag));
                                }
                            }
                        }
                    }
                }
            });
        }
    }

    public void connect() throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.hostname);
        connectionFactory.setPort(this.port);
        connectionFactory.setVirtualHost(this.virtualHost);
        connectionFactory.setRequestedHeartbeat(this.heartbeatTimeout);
        if (this.tls) {
            try {
                LOG.info("Enabling TLS for AMQP input [{}/{}].", this.sourceInput.getName(), this.sourceInput.getId());
                connectionFactory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                throw new IOException("Couldn't enable TLS for AMQP input.", e);
            }
        }
        if (!Strings.isNullOrEmpty(this.username) && !Strings.isNullOrEmpty(this.password)) {
            connectionFactory.setUsername(this.username);
            connectionFactory.setPassword(this.password);
        }
        try {
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
            if (null == this.channel) {
                LOG.error("No channel descriptor available!");
            }
            if (null != this.channel && this.prefetchCount > 0) {
                this.channel.basicQos(this.prefetchCount);
                LOG.info("AMQP prefetch count overriden to <{}>.", Integer.valueOf(this.prefetchCount));
            }
            this.connection.addShutdownListener(new ShutdownListener() { // from class: org.graylog2.inputs.transports.AmqpConsumer.3
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                    if (shutdownSignalException.isInitiatedByApplication()) {
                        AmqpConsumer.LOG.info("Not reconnecting connection, we disconnected explicitly.");
                        return;
                    }
                    while (true) {
                        try {
                            AmqpConsumer.LOG.error("AMQP connection lost! Trying reconnect in 1 second.");
                            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                            AmqpConsumer.this.connect();
                            AmqpConsumer.LOG.info("Connected! Re-starting consumer.");
                            AmqpConsumer.this.run();
                            AmqpConsumer.LOG.info("Consumer running.");
                            return;
                        } catch (IOException e2) {
                            AmqpConsumer.LOG.error("Could not re-connect to AMQP broker.", e2);
                        }
                    }
                }
            });
        } catch (TimeoutException e2) {
            throw new IOException("Timeout while opening new AMQP connection", e2);
        }
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            } catch (TimeoutException e) {
                LOG.error("Timeout when closing AMQP channel", e);
                this.channel.abort();
            }
        }
        if (this.connection == null || !this.connection.isOpen()) {
            return;
        }
        this.connection.close();
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}
