package com.mnt.sio.core;

import com.mnt.base.util.CommonUtil;
import com.mnt.base.util.HashUtil;
import com.mnt.sio.core.annotation.StreamIn;
import com.mnt.sio.core.annotation.StreamOut;
import com.mnt.sio.core.annotation.StreamPipe;
import com.mnt.sio.core.context.StreamContext;
import com.mnt.sio.core.pipe.FieldHandler;
import com.mnt.sio.core.pipe.PipeProcessor;
import com.mnt.sio.core.pipe.RecordHandler;
import com.mnt.sio.core.sin.SIn;
import com.mnt.sio.core.sin.SInBuilder;
import com.mnt.sio.core.sout.SOut;
import com.mnt.sio.core.sout.SOutBuilder;
import com.mnt.sio.metrics.SIOMetrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring component that wires annotated stream components into {@link Pipe} instances.
 *
 * <p>Processors, field handlers, record handlers and contexts are injected through the
 * optional {@code @Autowired} setters below and grouped by the pipe name read from each
 * component's {@link StreamPipe} annotation. {@link #init()} then builds one {@link Pipe}
 * per registered {@link PipeProcessor} and hands it to a new {@code PipeWorker}.
 */
@Component
public final class SIOEngine {
    protected final Logger logger = LogManager.getLogger(getClass());

    /** Pipe name -> processor; a later registration under the same name replaces the earlier one. */
    private final Map<String, PipeProcessor> pipeProcessorMap = new HashMap<>();
    /** Pipe name -> field handlers, in injection order. */
    private final Map<String, List<FieldHandler>> fieldHandlerMap = new HashMap<>();
    /** Pipe name -> record handlers, in injection order. */
    private final Map<String, List<RecordHandler>> recordHandlerMap = new HashMap<>();
    /** Pipe name -> context; a later registration under the same name replaces the earlier one. */
    private final Map<String, StreamContext> rtContextMap = new HashMap<>();
    /** Pipes created by {@link #init()}. */
    private final List<Pipe> pipes = new ArrayList<>();

    @Autowired
    private SInBuilder sinBuilder;

    @Autowired
    private SOutBuilder soutBuilder;

    @Autowired
    private SIOMetrics sioMetrics;

    /**
     * Registers the injected processors under their {@link StreamPipe} name.
     *
     * <p>Processors without the annotation (or with an empty name) are skipped. When two
     * processors share a name the later one wins and a warning is logged, mirroring the
     * handling of duplicate contexts in {@link #setRTContexts(List)}.
     *
     * @param list processors to register; may be {@code null} or empty
     */
    @Autowired(required = false)
    public void setRTFlows(List<PipeProcessor> list) {
        if (CommonUtil.isEmpty(list)) {
            return;
        }
        for (PipeProcessor processor : list) {
            String name = pipeName(processor);
            if (CommonUtil.isEmpty(name)) {
                continue;
            }
            PipeProcessor previous = this.pipeProcessorMap.put(name, processor);
            if (previous != null) {
                this.logger.warn("duplicate processor for pipe: {}, exists: {}, duplicate: {}, replaced.", name, previous, processor);
            }
        }
    }

    /**
     * Groups the injected field handlers by their {@link StreamPipe} name.
     *
     * @param list field handlers to register; may be {@code null} or empty
     */
    @Autowired(required = false)
    public void setFieldHandlers(List<FieldHandler> list) {
        if (CommonUtil.isEmpty(list)) {
            return;
        }
        groupByPipe(list, this.fieldHandlerMap);
    }

    /**
     * Groups the injected record handlers by their {@link StreamPipe} name.
     *
     * @param list record handlers to register; may be {@code null} or empty
     */
    @Autowired(required = false)
    public void setRecordHandlers(List<RecordHandler> list) {
        if (CommonUtil.isEmpty(list)) {
            return;
        }
        groupByPipe(list, this.recordHandlerMap);
    }

    /**
     * Registers the injected contexts under their {@link StreamPipe} name.
     *
     * <p>If a context is already registered for a name, a warning is logged and the new
     * context replaces it.
     *
     * @param list contexts to register; may be {@code null} or empty
     */
    @Autowired(required = false)
    public void setRTContexts(List<StreamContext> list) {
        if (CommonUtil.isEmpty(list)) {
            return;
        }
        for (StreamContext context : list) {
            String name = pipeName(context);
            if (CommonUtil.isEmpty(name)) {
                continue;
            }
            if (this.rtContextMap.containsKey(name)) {
                this.logger.warn("duplicate context for pipe: {}, exists: {}, duplicate: {}, replaced.", name, this.rtContextMap.get(name), context);
            }
            this.rtContextMap.put(name, context);
        }
    }

    /**
     * Builds a {@link Pipe} for every registered processor and creates a {@code PipeWorker} for it.
     *
     * <p>For each processor, in order: resolve its context (falling back to
     * {@link StreamContext#emptyContext()}), read its {@link StreamIn}/{@link StreamOut}
     * annotations, attach its handlers, build the input and output, then create the pipe.
     *
     * @throws IllegalStateException if a processor has no {@link StreamIn} annotation or no
     *         record handler is registered for its pipe name
     */
    @PostConstruct
    public void init() {
        // One map instance is passed to every builder call. It is left raw because the
        // builders' parameter types are not visible from this file.
        // NOTE(review): presumably shared build state/cache - confirm against SInBuilder/SOutBuilder.
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, PipeProcessor> entry : this.pipeProcessorMap.entrySet()) {
            String name = entry.getKey();
            PipeProcessor processor = entry.getValue();

            StreamContext context = this.rtContextMap.getOrDefault(name, StreamContext.emptyContext());
            StreamIn streamIn = processor.getClass().getAnnotation(StreamIn.class);
            if (streamIn == null) {
                throw new IllegalStateException("Flow must define the corresponding RTSource: " + processor);
            }
            // May be null when the processor has no @StreamOut; it is passed to the builder as-is.
            StreamOut streamOut = processor.getClass().getAnnotation(StreamOut.class);

            attachHandlers(name, processor);

            SIn in = this.sinBuilder.build(streamIn, hashMap);
            SOut out = this.soutBuilder.build(streamOut, hashMap);
            Pipe pipe = newPipe(name, processor, context, in, out);
            this.pipes.add(pipe);
            // The worker reference is not retained here.
            // NOTE(review): presumably the constructor registers/starts the worker - confirm in PipeWorker.
            new PipeWorker(this.sioMetrics, pipe);
        }
    }

    /**
     * Returns the pipes created by {@link #init()}.
     *
     * @return the engine's internal list (not a copy); empty before {@code init()} has run
     */
    public List<Pipe> listPipes() {
        return this.pipes;
    }

    /** Adds each component to the list kept under its pipe name, skipping components without a name. */
    private <T> void groupByPipe(List<T> components, Map<String, List<T>> target) {
        for (T component : components) {
            String name = pipeName(component);
            if (!CommonUtil.isEmpty(name)) {
                target.computeIfAbsent(name, key -> new ArrayList<>()).add(component);
            }
        }
    }

    /** Attaches the handlers registered for {@code name}; field handlers are optional, record handlers are required. */
    private void attachHandlers(String name, PipeProcessor processor) {
        List<FieldHandler> fieldHandlers = this.fieldHandlerMap.get(name);
        if (!CommonUtil.isEmpty(fieldHandlers)) {
            processor.setFieldHandlers(fieldHandlers);
        }
        List<RecordHandler> recordHandlers = this.recordHandlerMap.get(name);
        if (CommonUtil.isEmpty(recordHandlers)) {
            throw new IllegalStateException("flow must have at least one record handler, " + processor);
        }
        processor.setRecordHandlers(recordHandlers);
    }

    /** Creates the {@link Pipe} view over the given, already-built parts. */
    private Pipe newPipe(String name, PipeProcessor processor, StreamContext context, SIn in, SOut out) {
        // pipeIdf and instanceId were derived from the identical expression, so the hash is
        // computed once and shared (assumes HashUtil.hashByMD5 is a pure function of its input).
        final String identity = HashUtil.hashByMD5(String.valueOf(name) + in.desc() + out.desc());
        return new Pipe() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public String desc() {
                return processor.desc();
            }

            @Override
            public SIn sin() {
                return in;
            }

            @Override
            public PipeProcessor processor() {
                return processor;
            }

            @Override
            public SOut sout() {
                return out;
            }

            @Override
            public StreamContext context() {
                return context;
            }

            @Override
            public String pipeIdf() {
                return identity;
            }

            @Override
            public String instanceId() {
                return identity;
            }
        };
    }

    /**
     * Reads the pipe name from the {@link StreamPipe} annotation on the component's class.
     *
     * @param obj the component to inspect; must not be {@code null}
     * @return the annotation value, or {@code null} (after logging a warning) when the class is not annotated
     */
    private String pipeName(Object obj) {
        StreamPipe streamPipe = obj.getClass().getAnnotation(StreamPipe.class);
        if (streamPipe != null) {
            return streamPipe.value();
        }
        // NOTE(review): the message says "RTPipe" but the annotation checked is StreamPipe.
        this.logger.warn("pipe component class [{}] without RTPipe annotation def, ignore.", obj.getClass().getName());
        return null;
    }
}
