package io.kroxylicious.filter.encryption;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.kroxylicious.proxy.filter.FetchResponseFilter;
import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.ProduceRequestFilter;
import io.kroxylicious.proxy.filter.RequestFilterResult;
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.record.MemoryRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/filter/encryption/RecordEncryptionFilter.class */
public class RecordEncryptionFilter<K> implements ProduceRequestFilter, FetchResponseFilter {
    private static final Logger log = LoggerFactory.getLogger(RecordEncryptionFilter.class);
    private final TopicNameBasedKekSelector<K> kekSelector;
    private final EncryptionManager<K> encryptionManager;
    private final DecryptionManager decryptionManager;
    private final FilterThreadExecutor filterThreadExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordEncryptionFilter(EncryptionManager<K> encryptionManager, DecryptionManager decryptionManager, TopicNameBasedKekSelector<K> topicNameBasedKekSelector, @NonNull FilterThreadExecutor filterThreadExecutor) {
        this.kekSelector = topicNameBasedKekSelector;
        this.encryptionManager = encryptionManager;
        this.decryptionManager = decryptionManager;
        this.filterThreadExecutor = filterThreadExecutor;
    }

    public static <T> CompletionStage<List<T>> join(List<? extends CompletionStage<T>> list) {
        CompletableFuture[] completableFutureArr = (CompletableFuture[]) list.stream().map((v0) -> {
            return v0.toCompletableFuture();
        }).toArray(i -> {
            return new CompletableFuture[i];
        });
        return CompletableFuture.allOf(completableFutureArr).thenApply(r4 -> {
            return Stream.of((Object[]) completableFutureArr).map((v0) -> {
                return v0.join();
            }).toList();
        });
    }

    public CompletionStage<RequestFilterResult> onProduceRequest(short s, RequestHeaderData requestHeaderData, ProduceRequestData produceRequestData, FilterContext filterContext) {
        return maybeEncodeProduce(produceRequestData, filterContext).thenCompose(produceRequestData2 -> {
            return filterContext.forwardRequest(requestHeaderData, produceRequestData);
        });
    }

    private CompletionStage<ProduceRequestData> maybeEncodeProduce(ProduceRequestData produceRequestData, FilterContext filterContext) {
        Map map = (Map) produceRequestData.topicData().stream().collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, Function.identity()));
        return this.filterThreadExecutor.completingOnFilterThread(this.kekSelector.selectKek(map.keySet())).thenCompose(map2 -> {
            return join(map2.entrySet().stream().flatMap(entry -> {
                String str = (String) entry.getKey();
                Object value = entry.getValue();
                return ((ProduceRequestData.TopicProduceData) map.get(str)).partitionData().stream().map(partitionProduceData -> {
                    if (value == null) {
                        return CompletableFuture.completedStage(partitionProduceData);
                    }
                    MemoryRecords records = partitionProduceData.records();
                    EncryptionManager<K> encryptionManager = this.encryptionManager;
                    int index = partitionProduceData.index();
                    EncryptionScheme<K> encryptionScheme = new EncryptionScheme<>(value, EnumSet.of(RecordField.RECORD_VALUE));
                    Objects.requireNonNull(filterContext);
                    CompletionStage<MemoryRecords> encrypt = encryptionManager.encrypt(str, index, encryptionScheme, records, filterContext::createByteBufferOutputStream);
                    Objects.requireNonNull(partitionProduceData);
                    return encrypt.thenApply((v1) -> {
                        return r1.setRecords(v1);
                    });
                });
            }).toList()).thenApply(list -> {
                return produceRequestData;
            });
        }).exceptionallyCompose(th -> {
            log.atWarn().setMessage("failed to encrypt records, cause message: {}").addArgument(th.getMessage()).setCause(log.isDebugEnabled() ? th : null).log();
            return CompletableFuture.failedStage(th);
        });
    }

    public CompletionStage<ResponseFilterResult> onFetchResponse(short s, ResponseHeaderData responseHeaderData, FetchResponseData fetchResponseData, FilterContext filterContext) {
        return maybeDecodeFetch(fetchResponseData.responses(), filterContext).thenCompose(list -> {
            return filterContext.forwardResponse(responseHeaderData, fetchResponseData.setResponses(list));
        }).exceptionallyCompose(th -> {
            log.atWarn().setMessage("failed to decrypt records, cause message: {}").addArgument(th.getMessage()).setCause(log.isDebugEnabled() ? th : null).log();
            return CompletableFuture.failedStage(th);
        });
    }

    private CompletionStage<List<FetchResponseData.FetchableTopicResponse>> maybeDecodeFetch(List<FetchResponseData.FetchableTopicResponse> list, FilterContext filterContext) {
        ArrayList arrayList = new ArrayList(list.size());
        for (FetchResponseData.FetchableTopicResponse fetchableTopicResponse : list) {
            arrayList.add(maybeDecodePartitions(fetchableTopicResponse.topic(), fetchableTopicResponse.partitions(), filterContext).thenApply(list2 -> {
                fetchableTopicResponse.setPartitions(list2);
                return fetchableTopicResponse;
            }));
        }
        return join(arrayList);
    }

    private CompletionStage<List<FetchResponseData.PartitionData>> maybeDecodePartitions(String str, List<FetchResponseData.PartitionData> list, FilterContext filterContext) {
        ArrayList arrayList = new ArrayList(list.size());
        for (FetchResponseData.PartitionData partitionData : list) {
            if (!(partitionData.records() instanceof MemoryRecords)) {
                throw new IllegalStateException();
            }
            arrayList.add(maybeDecodeRecords(str, partitionData, partitionData.records(), filterContext));
        }
        return join(arrayList);
    }

    private CompletionStage<FetchResponseData.PartitionData> maybeDecodeRecords(String str, FetchResponseData.PartitionData partitionData, MemoryRecords memoryRecords, FilterContext filterContext) {
        DecryptionManager decryptionManager = this.decryptionManager;
        int partitionIndex = partitionData.partitionIndex();
        Objects.requireNonNull(filterContext);
        CompletionStage<MemoryRecords> decrypt = decryptionManager.decrypt(str, partitionIndex, memoryRecords, filterContext::createByteBufferOutputStream);
        Objects.requireNonNull(partitionData);
        return decrypt.thenApply((v1) -> {
            return r1.setRecords(v1);
        });
    }
}
