package org.axonframework.amqp.eventhandling.spring;

import com.rabbitmq.client.Channel;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.axonframework.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.UnknownSerializedTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/* loaded from: input_file:org/axonframework/amqp/eventhandling/spring/SpringAMQPMessageSource.class */
/**
 * Message source that receives AMQP messages through the
 * {@link ChannelAwareMessageListener} callback, converts each one into an
 * {@link EventMessage} using the configured {@link AMQPMessageConverter}, and
 * hands the result to every subscribed consumer.
 *
 * <p>Subscribed consumers are held in a {@link CopyOnWriteArrayList}.
 */
public class SpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(SpringAMQPMessageSource.class);

    /** Consumers that receive every successfully converted message. */
    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();

    /** Converter used to turn the raw AMQP body and headers into an event message. */
    private final AMQPMessageConverter messageConverter;

    /**
     * Creates a message source that uses the given converter for incoming messages.
     *
     * @param aMQPMessageConverter converter applied to the body and headers of each incoming message
     */
    public SpringAMQPMessageSource(AMQPMessageConverter aMQPMessageConverter) {
        this.messageConverter = aMQPMessageConverter;
    }

    /**
     * Registers a consumer that will be handed each converted message, wrapped
     * in a single-element list.
     *
     * @param consumer the consumer to add
     * @return a handle that removes {@code consumer} from this source when invoked;
     *         its result is the result of the underlying list removal
     */
    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        this.eventProcessors.add(consumer);
        return () -> this.eventProcessors.remove(consumer);
    }

    /**
     * Converts the incoming AMQP message and passes it to all subscribed consumers.
     *
     * <p>The message is dropped without being converted when no consumer is
     * subscribed, and nothing is dispatched when the converter yields
     * {@code null}. An {@link UnknownSerializedTypeException} is logged at WARN
     * level and the message is ignored; any other exception propagates to the
     * caller.
     *
     * @param message the incoming message; its body and headers are given to the converter
     * @param channel the channel the message arrived on (not used by this implementation)
     * @throws Exception declared by the listener contract
     */
    public void onMessage(Message message, Channel channel) throws Exception {
        // Skip the conversion cost entirely when nobody is listening.
        if (this.eventProcessors.isEmpty()) {
            return;
        }
        try {
            EventMessage<?> event = this.messageConverter.readAMQPMessage(
                    message.getBody(), message.getMessageProperties().getHeaders());
            if (event != null) {
                // The singleton list is immutable, so one instance can safely be shared by all consumers.
                List<EventMessage<?>> batch = Collections.singletonList(event);
                this.eventProcessors.forEach(processor -> processor.accept(batch));
            }
        } catch (UnknownSerializedTypeException e) {
            // Only the exception's string form is logged (it fills the placeholder); no stack trace is emitted.
            logger.warn("Unable to deserialize an incoming message. Ignoring it. {}", e.toString());
        }
    }
}
