package com.navercorp.pinpoint.channel.redis.stream;

import com.navercorp.pinpoint.channel.ChannelProvider;
import com.navercorp.pinpoint.channel.ChannelProviderRegistry;
import com.navercorp.pinpoint.redis.RedisBasicConfig;
import com.navercorp.pinpoint.redis.value.RedisValueConfig;
import java.net.InetAddress;
import java.time.Duration;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

/**
 * Spring configuration that wires the Redis Stream based channel support:
 * a message executor, a {@link StreamMessageListenerContainer} and the
 * {@link ChannelProviderRegistry} registered under {@link RedisStreamConstants#SCHEME}.
 */
@Configuration(proxyBeanMethods = false)
@Import({RedisBasicConfig.class, RedisValueConfig.class})
public class RedisStreamConfig {

    /**
     * Value of {@code pinpoint.redis.stream.client.timeout.ms} (defaults to 5000).
     * Not read by any bean method in this class.
     */
    @Value("${pinpoint.redis.stream.client.timeout.ms:5000}")
    long clientTimeoutMs;

    /**
     * Creates the executor registered as {@code redisStreamMessageExecutor}.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose threads use the
     *         {@code redis-stream-message-executor} name prefix
     */
    @Bean({"redisStreamMessageExecutor"})
    public Executor redisPubSubMessageExecutor() {
        return new SimpleAsyncTaskExecutor("redis-stream-message-executor");
    }

    /**
     * Builds the stream listener container and starts it before returning it.
     *
     * @param redisConnectionFactory connection factory the container is created with
     * @param executor               executor passed to the container options; skipped when {@code null}
     * @return a started container using UTF-8 string serialization and a 100 ms poll timeout
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, @Qualifier("redisStreamMessageExecutor") Executor executor) {
        // Parameterized (not raw) so that build() and create() below are type-checked.
        StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, String>> builder =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder();
        builder.serializer(StringRedisSerializer.UTF_8);
        if (executor != null) {
            builder.executor(executor);
        }
        builder.pollTimeout(Duration.ofMillis(100L));

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, builder.build());
        // Started explicitly here; callers receive an already running container.
        container.start();
        return container;
    }

    /**
     * Registers the Redis Stream publish/subscribe provider pair under
     * {@link RedisStreamConstants#SCHEME}.
     *
     * @param reactiveRedisTemplate          template whose stream operations back the publishing side
     *                                       and which is also handed to the subscribing side
     * @param streamMessageListenerContainer container handed to the subscribing side
     * @return the registry holding the paired pub/sub channel provider
     * @throws Exception if the local host name cannot be resolved or provider construction fails
     */
    @Bean({"redisStreamPubSubChannelProvider"})
    public ChannelProviderRegistry redisStreamPubSubChannelProvider(ReactiveRedisTemplate<String, String> reactiveRedisTemplate, StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer) throws Exception {
        // The local host name is passed as the third argument of the sub provider.
        String hostName = InetAddress.getLocalHost().getHostName();
        RedisStreamPubChannelProvider pubProvider = new RedisStreamPubChannelProvider(reactiveRedisTemplate.opsForStream());
        RedisStreamSubChannelProvider subProvider = new RedisStreamSubChannelProvider(streamMessageListenerContainer, reactiveRedisTemplate, hostName);
        return ChannelProviderRegistry.of(RedisStreamConstants.SCHEME, ChannelProvider.pair(pubProvider, subProvider));
    }
}
