package io.alkal.kalium.sns_sqs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.alkal.kalium.internals.QueueListener;
import io.alkal.kalium.sns_sqs.serdes.Deserializer;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/alkal/kalium/sns_sqs/ConsumerReaction.class */
public class ConsumerReaction implements Runnable {
    private static final long POST_SUBSCRIPTION_WAIT_IN_MS = 2000;
    private static final Logger logger = Logger.getLogger(ConsumerReaction.class.getName());
    private String reactionId;
    private Collection<Class> objectTypes;
    private QueueListener queueListener;
    private SnsService snsService;
    private SqsService sqsService;
    private ObjectMapper objectMapper;
    private Map<String, String> topicToQueueArn = new HashMap();
    private Deserializer deserializer;

    public ConsumerReaction(String str, Collection<Class> collection, QueueListener queueListener, SqsService sqsService, SnsService snsService, Deserializer deserializer, ObjectMapper objectMapper) {
        this.reactionId = str;
        this.sqsService = sqsService;
        this.snsService = snsService;
        this.deserializer = deserializer;
        this.objectTypes = collection;
        this.queueListener = queueListener;
        this.objectMapper = objectMapper;
    }

    public void init() {
        HashMap hashMap = new HashMap();
        this.objectTypes.forEach(cls -> {
        });
        this.deserializer.setTopicToClassMap(hashMap);
        subscribe((Collection) this.objectTypes.stream().map(cls2 -> {
            return cls2.getSimpleName();
        }).collect(Collectors.toList()));
        logger.info("ConsumerReaction initialized [reactionId=" + this.reactionId + "].");
    }

    void subscribe(Collection<String> collection) {
        collection.stream().forEach(str -> {
            String queueForReaction = this.sqsService.getQueueForReaction(str, this.reactionId);
            this.snsService.subscribeQueue(str, queueForReaction);
            this.sqsService.enableSnsSubscription(queueForReaction, this.snsService.getTopicArn(str));
            this.topicToQueueArn.put(str, queueForReaction);
        });
        try {
            Thread.sleep(POST_SUBSCRIPTION_WAIT_IN_MS);
        } catch (InterruptedException e) {
            logger.warning(e.getMessage());
        }
    }

    Stream<QueueObject> poll(Integer num) {
        LinkedList linkedList = new LinkedList();
        this.topicToQueueArn.entrySet().stream().forEach(entry -> {
            String str = (String) entry.getValue();
            String str2 = (String) entry.getKey();
            linkedList.addAll((Collection) this.sqsService.receiveMessage(str, num).stream().map(message -> {
                QueueObject queueObject = null;
                try {
                    Object deserialize = this.deserializer.deserialize(str2, ((Properties) this.objectMapper.readValue(message.body(), Properties.class)).getProperty("Message"));
                    logger.fine("Object [type=" + str2 + "] for reaction [reactionId=" + this.reactionId + "] arrived!. content: " + deserialize.toString());
                    queueObject = new QueueObject(deserialize, str, message.receiptHandle());
                } catch (JsonProcessingException e) {
                    logger.warning(e.getMessage());
                }
                return queueObject;
            }).collect(Collectors.toList()));
        });
        return linkedList.stream();
    }

    @Override // java.lang.Runnable
    public void run() {
        init();
        logger.info("Start polling for reaction [reactionId=" + this.reactionId + "]!");
        while (true) {
            logger.fine("Reaction [reactionId=" + this.reactionId + "] is polling");
            poll(1).forEach(queueObject -> {
                this.queueListener.onObjectReceived(this.reactionId, queueObject.getObject());
                this.sqsService.messageProcessed(queueObject.getQueueArn(), queueObject.getReceiptHandle());
            });
        }
    }
}
