package io.kroxylicious.proxy.filter.simpletransform;

import io.kroxylicious.proxy.filter.FetchResponseFilter;
import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/filter/simpletransform/FetchResponseTransformationFilter.class */
public class FetchResponseTransformationFilter implements FetchResponseFilter {
    private static final short METADATA_API_VER_WITH_TOPIC_ID_SUPPORT = 12;
    private static final Logger LOGGER = LoggerFactory.getLogger(FetchResponseTransformationFilter.class);
    private final ByteBufferTransformation valueTransformation;

    public FetchResponseTransformationFilter(ByteBufferTransformation byteBufferTransformation) {
        this.valueTransformation = byteBufferTransformation;
    }

    public CompletionStage<ResponseFilterResult> onFetchResponse(short s, ResponseHeaderData responseHeaderData, FetchResponseData fetchResponseData, FilterContext filterContext) {
        List list = fetchResponseData.responses().stream().filter(fetchableTopicResponse -> {
            return fetchableTopicResponse.topic().isEmpty();
        }).map(fetchableTopicResponse2 -> {
            return new MetadataRequestData.MetadataRequestTopic().setName((String) null).setTopicId(fetchableTopicResponse2.topicId());
        }).distinct().toList();
        if (list.isEmpty()) {
            applyTransformation(filterContext, fetchResponseData);
            return filterContext.forwardResponse(responseHeaderData, fetchResponseData);
        }
        LOGGER.debug("Fetch response contains {} unknown topic ids, lookup via Metadata request: {}", Integer.valueOf(list.size()), list);
        return filterContext.sendRequest(new RequestHeaderData().setRequestApiVersion((short) 12), new MetadataRequestData().setTopics(list)).thenCompose(metadataResponseData -> {
            Map map = (Map) metadataResponseData.topics().stream().collect(Collectors.toMap((v0) -> {
                return v0.topicId();
            }, (v0) -> {
                return v0.name();
            }));
            LOGGER.debug("Metadata response yields {}, updating original Fetch response", map);
            for (FetchResponseData.FetchableTopicResponse fetchableTopicResponse3 : fetchResponseData.responses()) {
                fetchableTopicResponse3.setTopic((String) map.get(fetchableTopicResponse3.topicId()));
            }
            applyTransformation(filterContext, fetchResponseData);
            LOGGER.debug("Forwarding original Fetch response");
            return filterContext.responseFilterResultBuilder().forward(responseHeaderData, fetchResponseData).completed();
        });
    }

    private void applyTransformation(FilterContext filterContext, FetchResponseData fetchResponseData) {
        for (FetchResponseData.FetchableTopicResponse fetchableTopicResponse : fetchResponseData.responses()) {
            for (FetchResponseData.PartitionData partitionData : fetchableTopicResponse.partitions()) {
                MemoryRecords records = partitionData.records();
                ByteBufferOutputStream createByteBufferOutputStream = filterContext.createByteBufferOutputStream(records.sizeInBytes());
                MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(createByteBufferOutputStream, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1L, (short) -1, -1, false, false, -1, createByteBufferOutputStream.remaining());
                try {
                    Iterator it = records.batches().iterator();
                    while (it.hasNext()) {
                        for (Record record : (MutableRecordBatch) it.next()) {
                            memoryRecordsBuilder.appendWithOffset(record.offset(), record.timestamp(), record.key(), this.valueTransformation.transform(fetchableTopicResponse.topic(), record.value()));
                        }
                    }
                    partitionData.setRecords(memoryRecordsBuilder.build());
                    memoryRecordsBuilder.close();
                } catch (Throwable th) {
                    try {
                        memoryRecordsBuilder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
        }
    }
}
