package org.springframework.integration.redis.inbound;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/redis/inbound/RedisQueueInboundGateway.class */
public class RedisQueueInboundGateway extends MessagingGatewaySupport implements ApplicationEventPublisherAware, BeanClassLoaderAware {
    private static final String QUEUE_NAME_SUFFIX = ".reply";
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
    private final RedisTemplate<String, byte[]> template;
    private final BoundListOperations<String, byte[]> boundListOperations;
    private ApplicationEventPublisher applicationEventPublisher;
    private boolean serializerExplicitlySet;
    private Executor taskExecutor;
    private RedisSerializer<?> serializer;
    private long receiveTimeout = 1000;
    private long recoveryInterval = 5000;
    private boolean extractPayload = true;
    private volatile boolean listening;
    private volatile Runnable stopCallback;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/redis/inbound/RedisQueueInboundGateway$ListenerTask.class */
    public class ListenerTask implements SchedulingAwareRunnable {
        ListenerTask() {
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            boolean isActive;
            while (RedisQueueInboundGateway.this.isActive()) {
                try {
                    RedisQueueInboundGateway.this.listening = true;
                    RedisQueueInboundGateway.this.receiveAndReply();
                } finally {
                    if (RedisQueueInboundGateway.this.isActive()) {
                        RedisQueueInboundGateway.this.restart();
                    } else if (RedisQueueInboundGateway.this.stopCallback != null) {
                        RedisQueueInboundGateway.this.stopCallback.run();
                        RedisQueueInboundGateway.this.stopCallback = null;
                    }
                }
            }
            if (isActive) {
                return;
            }
        }
    }

    public RedisQueueInboundGateway(String str, RedisConnectionFactory redisConnectionFactory) {
        Assert.hasText(str, "'queueName' is required");
        Assert.notNull(redisConnectionFactory, "'connectionFactory' must not be null");
        this.template = new RedisTemplate<>();
        this.template.setConnectionFactory(redisConnectionFactory);
        this.template.setEnableDefaultSerializer(false);
        this.template.setKeySerializer(new StringRedisSerializer());
        this.template.afterPropertiesSet();
        this.boundListOperations = this.template.boundListOps(str);
    }

    public void setExtractPayload(boolean z) {
        this.extractPayload = z;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void setBeanClassLoader(ClassLoader classLoader) {
        if (this.serializerExplicitlySet) {
            return;
        }
        this.serializer = new JdkSerializationRedisSerializer(classLoader);
    }

    public void setSerializer(RedisSerializer<?> redisSerializer) {
        this.serializer = redisSerializer;
        this.serializerExplicitlySet = true;
    }

    public void setReceiveTimeout(long j) {
        Assert.isTrue(j >= 0, "'receiveTimeout' must be >= 0.");
        this.receiveTimeout = j;
    }

    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    public void setRecoveryInterval(long j) {
        this.recoveryInterval = j;
    }

    protected void onInit() {
        super.onInit();
        if (!this.extractPayload) {
            Assert.notNull(this.serializer, "'serializer' has to be provided where 'extractPayload == false'.");
        }
        if (this.taskExecutor == null) {
            String componentName = getComponentName();
            this.taskExecutor = new SimpleAsyncTaskExecutor((componentName == null ? "" : componentName + "-") + getComponentType());
        }
        if ((this.taskExecutor instanceof ErrorHandlingTaskExecutor) || getBeanFactory() == null) {
            return;
        }
        MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
        messagePublishingErrorHandler.setBeanFactory(getBeanFactory());
        messagePublishingErrorHandler.setDefaultErrorChannel(getErrorChannel());
        this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, messagePublishingErrorHandler);
    }

    public String getComponentType() {
        return "redis:queue-inbound-gateway";
    }

    private void handlePopException(Exception exc) {
        this.listening = false;
        if (!isActive()) {
            this.logger.debug(() -> {
                return "Failed to execute listening task. " + exc.getClass() + ": " + exc.getMessage();
            });
            return;
        }
        this.logger.error(exc, () -> {
            return "Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval + " milliseconds.";
        });
        publishException(exc);
        sleepBeforeRecoveryAttempt();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receiveAndReply() {
        try {
            byte[] bArr = (byte[]) this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
            if (bArr != null) {
                if (!isActive()) {
                    this.boundListOperations.rightPush(bArr);
                    return;
                }
                String deserialize = StringRedisSerializer.UTF_8.deserialize(bArr);
                if (deserialize == null) {
                    return;
                }
                try {
                    byte[] bArr2 = (byte[]) this.template.boundListOps(deserialize).rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
                    if (bArr2 != null) {
                        getRequestSendAndProduceReply(bArr2, deserialize);
                    }
                } catch (Exception e) {
                    handlePopException(e);
                }
            }
        } catch (Exception e2) {
            handlePopException(e2);
        }
    }

    private void getRequestSendAndProduceReply(byte[] bArr, String str) {
        Message sendAndReceiveMessage;
        if (!isActive()) {
            this.template.boundListOps(str).rightPush(bArr);
            byte[] serialize = StringRedisSerializer.UTF_8.serialize(str);
            if (serialize != null) {
                this.boundListOperations.rightPush(serialize);
                return;
            }
            return;
        }
        Message<Object> prepareRequestMessage = prepareRequestMessage(bArr);
        if (prepareRequestMessage == null || (sendAndReceiveMessage = sendAndReceiveMessage(prepareRequestMessage)) == null) {
            return;
        }
        byte[] bArr2 = null;
        if (this.extractPayload) {
            bArr2 = extractReplyPayload(sendAndReceiveMessage);
        } else if (this.serializer != null) {
            bArr2 = this.serializer.serialize(sendAndReceiveMessage);
        }
        if (bArr2 != null) {
            this.template.boundListOps(str + QUEUE_NAME_SUFFIX).leftPush(bArr2);
        }
    }

    @Nullable
    private Message<Object> prepareRequestMessage(byte[] bArr) {
        Message<Object> message;
        if (this.extractPayload) {
            Object obj = bArr;
            if (this.serializer != null) {
                obj = this.serializer.deserialize(bArr);
                if (obj == null) {
                    return null;
                }
            }
            message = getMessageBuilderFactory().withPayload(obj).build();
        } else {
            try {
                message = (Message) this.serializer.deserialize(bArr);
                if (message == null) {
                    return null;
                }
            } catch (Exception e) {
                throw new MessagingException("Deserialization of Message failed.", e);
            }
        }
        return message;
    }

    private byte[] extractReplyPayload(Message<?> message) {
        return !(message.getPayload() instanceof byte[]) ? (!(message.getPayload() instanceof String) || this.serializerExplicitlySet) ? this.serializer.serialize(message.getPayload()) : StringRedisSerializer.UTF_8.serialize((String) message.getPayload()) : (byte[]) message.getPayload();
    }

    protected void doStart() {
        super.doStart();
        restart();
    }

    private void sleepBeforeRecoveryAttempt() {
        if (this.recoveryInterval > 0) {
            try {
                Thread.sleep(this.recoveryInterval);
            } catch (InterruptedException e) {
                this.logger.debug("Thread interrupted while sleeping the recovery interval");
                Thread.currentThread().interrupt();
            }
        }
    }

    private void publishException(Exception exc) {
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent(new RedisExceptionEvent(this, exc));
        } else {
            this.logger.debug(() -> {
                return "No application event publisher for exception: " + exc.getMessage();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void restart() {
        this.taskExecutor.execute(new ListenerTask());
    }

    protected void doStop(Runnable runnable) {
        this.stopCallback = runnable;
        doStop();
    }

    protected void doStop() {
        super.doStop();
        this.listening = false;
    }

    public boolean isListening() {
        return this.listening;
    }

    @ManagedMetric
    public long getQueueSize() {
        Long size = this.boundListOperations.size();
        if (size == null) {
            return 0L;
        }
        return size.longValue();
    }

    @ManagedOperation
    public void clearQueue() {
        this.boundListOperations.getOperations().delete(this.boundListOperations.getKey());
    }
}
