package org.springframework.cloud.function.stream.config;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.reactive.FluxSender;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/cloud/function/stream/config/StreamListeningFunctionInvoker.class */
/**
 * Stream listener that routes each incoming message to a function or consumer
 * looked up in the {@link FunctionCatalog} and sends whatever the function
 * produces to the output channel.
 *
 * <p>The target of a message is chosen in {@link #select(Message)}: first by the
 * {@code StreamConfigurationProperties.ROUTE_KEY} header, then by the configured
 * default route, and finally by matching the message against the input types of
 * every registered function and consumer.
 */
public class StreamListeningFunctionInvoker implements SmartInitializingSingleton {
    /** Processor used when no function or consumer could be resolved; emits nothing. */
    private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();

    /** Sentinel standing in for a payload that the converter could not convert. */
    private static final Object UNCONVERTED = new Object();

    private final FunctionInspector functionInspector;
    private final FunctionCatalog functionCatalog;
    private final CompositeMessageConverterFactory converterFactory;
    private final String defaultRoute;

    /** Processors already created by {@link #stash(String)}, keyed by catalog name. */
    private final Map<String, FluxMessageProcessor> processors = new HashMap<>();

    /** Assigned in {@link #afterSingletonsInstantiated()}; null until then. */
    private MessageConverter converter;

    /** Index of the name returned by the last {@link #choose(List)} call; -1 before the first. */
    private int count = -1;

    /**
     * Transformation applied to one group of messages that share a target.
     */
    @FunctionalInterface
    public interface FluxMessageProcessor {
        /**
         * @param flux the messages routed to this processor
         * @return the messages to send to the output
         */
        Flux<Message<?>> process(Flux<Message<?>> flux);
    }

    /**
     * @param functionCatalog source of the functions and consumers to invoke
     * @param functionInspector used to query input types and message awareness of targets
     * @param compositeMessageConverterFactory supplies the converter used for payloads
     * @param str the default route (a function or consumer name); may be null
     */
    public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory, String str) {
        this.functionCatalog = functionCatalog;
        this.functionInspector = functionInspector;
        this.converterFactory = compositeMessageConverterFactory;
        this.defaultRoute = str;
    }

    /**
     * Obtains the message converter from the converter factory.
     */
    @Override
    public void afterSingletonsInstantiated() {
        this.converter = this.converterFactory.getMessageConverterForAllRegistered();
    }

    /**
     * Groups the input by the processor chosen in {@link #select(Message)}, runs each
     * group through its processor and sends the merged results.
     *
     * @param flux the incoming messages
     * @param fluxSender the sender bound to the output
     * @return the {@code Mono} returned by {@code fluxSender.send(...)}
     */
    @StreamListener
    public Mono<Void> handle(@Input("input") Flux<Message<?>> flux, @Output("output") FluxSender fluxSender) {
        return fluxSender.send(flux.groupBy(this::select)
                .flatMap(group -> group.key().process(group)));
    }

    /**
     * Applies the named catalog function to the converted input and combines every
     * result with the latest input headers.
     *
     * @param name the catalog name of the function
     * @param flux the messages to feed to the function
     * @return the function results as messages
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
        // The catalog's generic signature is not visible here, so the function stays raw.
        Function target = this.functionCatalog.lookupFunction(name);
        // publish(...) shares one upstream between the function input and the header stream.
        return flux.publish(shared -> {
            Function<Message<?>, Object> toInput = convertInput(target);
            // NOTE(review): unlike consumer(...), UNCONVERTED payloads are not filtered
            // out here before reaching the function - confirm whether that is intended.
            Flux<?> results = (Flux<?>) target.apply(shared.map(message -> toInput.apply(message)));
            if (this.functionInspector.isMessage(target)) {
                results = results.map(result -> MessageUtils.unpack(target, result));
            }
            return results.withLatestFrom(headers(shared),
                    (result, latestHeaders) -> message(result, latestHeaders));
        });
    }

    /**
     * @param flux the messages to read headers from
     * @return a flux of the headers of each message
     */
    private Flux<Map<String, Object>> headers(Flux<Message<?>> flux) {
        return flux.map(message -> message.getHeaders());
    }

    /**
     * Builds an outgoing message from a function result, adding the given headers
     * only where the result does not already define them.
     *
     * @param result a {@link Message} or a bare payload
     * @param headers headers to copy if absent
     * @return the message to send
     */
    private Message<?> message(Object result, Map<String, Object> headers) {
        if (result instanceof Message) {
            return MessageBuilder.fromMessage((Message<?>) result).copyHeadersIfAbsent(headers).build();
        }
        return MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
    }

    /**
     * Hands the converted input to the named catalog consumer, dropping payloads
     * that could not be converted.
     *
     * @param name the catalog name of the consumer
     * @param flux the messages to feed to the consumer
     * @return always an empty flux
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
        Consumer target = this.functionCatalog.lookupConsumer(name);
        Function<Message<?>, Object> toInput = convertInput(target);
        target.accept(flux.map(message -> toInput.apply(message))
                .filter(input -> input != UNCONVERTED));
        return Flux.empty();
    }

    /**
     * Routes the flux to one of the candidate names, chosen by {@link #choose(List)}.
     *
     * @param names candidate function or consumer names
     * @param flux the messages to route
     * @return the chosen target's output, or an empty flux when there are no candidates
     */
    private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
        if (names.isEmpty()) {
            return Flux.empty();
        }
        String chosen = choose(names);
        if (this.functionCatalog.lookupConsumer(chosen) != null) {
            return consumer(chosen, flux);
        }
        return function(chosen, flux);
    }

    /**
     * Advances the shared counter and returns the name at that index, wrapping to
     * index 0 when the counter runs past the list (or has overflowed to a negative).
     *
     * @param names a non-empty list of candidate names
     * @return the selected name
     */
    private synchronized String choose(List<String> names) {
        this.count++;
        if (this.count >= names.size() || this.count < 0) {
            this.count = 0;
        }
        return names.get(this.count);
    }

    /**
     * Picks the processor for a message.
     *
     * <p>Resolution order: the route header, then the default route, then - if the
     * catalog holds exactly one name - that name, otherwise every function or
     * consumer whose input type the message converts to. A single match is used
     * directly; zero or several matches are delegated to {@link #balance}.
     *
     * @param message the incoming message
     * @return the processor to apply; never null when a route was resolved
     */
    private FluxMessageProcessor select(Message<?> message) {
        String route = null;
        if (message.getHeaders().containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
            route = stash((String) message.getHeaders().get(StreamConfigurationProperties.ROUTE_KEY));
        }
        if (route == null && this.defaultRoute != null) {
            route = stash(this.defaultRoute);
        }
        if (route != null) {
            return resolve(route);
        }

        LinkedHashSet<String> names = new LinkedHashSet<>(this.functionCatalog.getFunctionNames());
        names.addAll(this.functionCatalog.getConsumerNames());
        if (names.size() == 1) {
            return resolve(stash(names.iterator().next()));
        }

        List<String> matches = new ArrayList<>();
        for (String name : names) {
            Object candidate = this.functionCatalog.lookupFunction(name);
            if (candidate == null) {
                candidate = this.functionCatalog.lookupConsumer(name);
            }
            if (candidate == null) {
                continue;
            }
            Class<?> inputType = this.functionInspector.getInputType(candidate);
            Object converted = this.converter.fromMessage(message, inputType);
            // isInstance(null) is false, so a failed conversion is rejected here too.
            if (inputType.isInstance(converted)) {
                matches.add(name);
            }
        }
        if (matches.size() != 1) {
            // NOTE(review): this lambda captures per-call state, so it is presumably a
            // distinct groupBy key for every message - confirm that is intended.
            return flux -> balance(matches, flux);
        }
        return resolve(stash(matches.get(0)));
    }

    /**
     * @param route a name returned by {@link #stash(String)}, or null
     * @return the stashed processor for the route, or {@link #NOENDPOINT} for null
     */
    private FluxMessageProcessor resolve(String route) {
        return route == null ? NOENDPOINT : this.processors.get(route);
    }

    /**
     * Ensures a processor is registered for the given name, preferring a function
     * over a consumer when the catalog has both.
     *
     * @param name the function or consumer name to look up
     * @return {@code name} if the catalog knows it, otherwise null
     */
    private String stash(String name) {
        if (this.functionCatalog.lookupFunction(name) != null) {
            this.processors.computeIfAbsent(name, key -> flux -> function(key, flux));
            return name;
        }
        if (this.functionCatalog.lookupConsumer(name) != null) {
            this.processors.computeIfAbsent(name, key -> flux -> consumer(key, flux));
            return name;
        }
        return null;
    }

    /**
     * Creates the per-message input conversion for a target.
     *
     * @param target the function or consumer that will receive the input
     * @return a function yielding the converted payload, wrapped via
     *         {@code MessageUtils.create} when the inspector reports the target as
     *         message-aware
     */
    private Function<Message<?>, Object> convertInput(Object target) {
        Class<?> inputType = this.functionInspector.getInputType(target);
        return message -> {
            Object payload = convertPayload(inputType, message);
            if (this.functionInspector.isMessage(target)) {
                return MessageUtils.create(target, payload, message.getHeaders());
            }
            return payload;
        };
    }

    /**
     * Returns the payload as-is when it already has the requested type, otherwise
     * asks the converter for it.
     *
     * @param type the required input type
     * @param message the message whose payload is needed
     * @return the payload, or {@link #UNCONVERTED} when the converter returns null
     */
    private Object convertPayload(Class<?> type, Message<?> message) {
        Object payload = message.getPayload();
        if (type.isAssignableFrom(payload.getClass())) {
            return payload;
        }
        Object converted = this.converter.fromMessage(message, type);
        return converted != null ? converted : UNCONVERTED;
    }
}
