package org.springframework.netflix.turbine.amqp;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rx.Observable;
import rx.subjects.PublishSubject;

@ConfigurationProperties("turbine.amqp")
@Configuration
/* loaded from: input_file:org/springframework/netflix/turbine/amqp/TurbineAmqpConfiguration.class */
public class TurbineAmqpConfiguration implements SmartLifecycle {
    private static final Logger log = LoggerFactory.getLogger(TurbineAmqpConfiguration.class);
    private boolean running = false;
    private int port = 8989;

    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        return PublishSubject.create();
    }

    @Bean
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        Observable refCount = StreamAggregator.aggregateGroupedStreams(hystrixSubject().groupBy(map -> {
            return InstanceKey.create((String) map.get("instanceId"));
        })).doOnUnsubscribe(() -> {
            log.info("AmqpTurbine => Unsubscribing aggregation.");
        }).doOnSubscribe(() -> {
            log.info("AmqpTurbine => Starting aggregation");
        }).flatMap(groupedObservable -> {
            return groupedObservable;
        }).publish().refCount();
        return RxNetty.createHttpServer(this.port, (httpServerRequest, httpServerResponse) -> {
            log.info("AmqpTurbine => SSE Request Received");
            httpServerResponse.getHeaders().setHeader("Content-Type", "text/event-stream");
            return refCount.doOnUnsubscribe(() -> {
                log.info("AmqpTurbine => Unsubscribing RxNetty server connection");
            }).flatMap(map2 -> {
                return httpServerResponse.writeAndFlush(new ServerSentEvent((String) null, (String) null, JsonUtility.mapToJson(map2)));
            });
        }, PipelineConfigurators.sseServerConfigurator());
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    public void start() {
        aggregatorServer().start();
    }

    public void stop() {
        try {
            aggregatorServer().shutdown();
        } catch (InterruptedException e) {
            log.error("Error shutting down", e);
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }
}
