package com.navercorp.pinpoint.channel.redis.stream;

import com.navercorp.pinpoint.channel.SubChannel;
import com.navercorp.pinpoint.channel.SubConsumer;
import com.navercorp.pinpoint.channel.Subscription;
import com.navercorp.pinpoint.common.util.BytesUtils;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/navercorp/pinpoint/channel/redis/stream/RedisStreamSubChannel.class */
class RedisStreamSubChannel implements SubChannel {
    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer;
    private final ReactiveRedisTemplate<String, String> redisTemplate;
    private final ReactiveStreamOperations<String, String, String> streamOps;
    private final String name;
    private final String key;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RedisStreamSubChannel(StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer, ReactiveRedisTemplate<String, String> reactiveRedisTemplate, String str, String str2) {
        this.listenerContainer = (StreamMessageListenerContainer) Objects.requireNonNull(streamMessageListenerContainer, "listenerContainer");
        this.redisTemplate = (ReactiveRedisTemplate) Objects.requireNonNull(reactiveRedisTemplate, "redisTemplate");
        this.streamOps = this.redisTemplate.opsForStream();
        this.name = (String) Objects.requireNonNull(str, "name");
        this.key = (String) Objects.requireNonNull(str2, "key");
    }

    public Subscription subscribe(SubConsumer subConsumer) {
        String newGroupName = newGroupName();
        Flux.merge(new Publisher[]{this.streamOps.createGroup(this.key, newGroupName), this.redisTemplate.expire(this.key, Duration.ofMinutes(30L))}).blockLast();
        return new RedisStreamSubscription(this, this.listenerContainer.receive(Consumer.from(newGroupName, newGroupName), StreamOffset.create(this.key, ReadOffset.lastConsumed()), mapRecord -> {
            if (subConsumer.consume(BytesUtils.toBytes((String) ((Map) mapRecord.getValue()).get("content")))) {
                this.streamOps.acknowledge(newGroupName, mapRecord).block();
            }
        }), newGroupName);
    }

    private String newGroupName() {
        return this.name + "-" + UUID.randomUUID();
    }

    public void unsubscribe(Subscription subscription) {
        if (subscription instanceof RedisStreamSubscription) {
            unsubscribe((RedisStreamSubscription) subscription);
        }
    }

    private void unsubscribe(RedisStreamSubscription redisStreamSubscription) {
        try {
            this.listenerContainer.remove(redisStreamSubscription.getRedisSubscription());
        } catch (Exception e) {
        }
    }
}
