package com.hazelcast.jet.cdc.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.cdc.CdcSinks;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.connector.AbstractUpdateMapP;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/cdc/impl/WriteCdcP.class */
/**
 * Sink processor that writes values derived from {@link ChangeRecord}s into a
 * map, dropping records that are not newer than the last record already seen
 * for the same key.
 * <p>
 * Staleness is decided from each record's {@code sequenceSource()} and
 * {@code sequenceValue()}: a record is accepted when its source differs from
 * the last one seen for the key, or when its sequence value is strictly
 * greater than the last one seen.
 *
 * @param <K> type of the map key extracted from a change record
 * @param <V> type of the map value extracted from a change record
 */
public class WriteCdcP<K, V> extends AbstractUpdateMapP<ChangeRecord, K, V> {
    /** Async-operation limit handed to the superclass constructor. */
    private static final int MAX_PARALLEL_ASYNC_OPS = 1;
    /** Initial capacity of the per-key sequence cache. */
    private static final int INITIAL_CAPACITY = 4096;
    /** Load factor of the per-key sequence cache. */
    private static final float LOAD_FACTOR = 0.75f;

    /** Extracts the value to be written from a change record. */
    private final FunctionEx<? super ChangeRecord, ? extends V> valueFn;

    /**
     * Last seen sequence per key. Created in {@link #init}: an access-ordered
     * {@link LinkedHashMap} which, on each insertion, evicts its eldest entry
     * if that entry has not been touched for longer than the configured
     * expiration time.
     */
    private LinkedHashMap<K, Sequence> sequences;

    /**
     * The last seen (source, sequence) pair for a key, together with the
     * wall-clock time at which it was created or last checked.
     */
    public static class Sequence {
        private long timestamp = System.currentTimeMillis();
        private long source;
        private long sequence;

        Sequence(long source, long sequence) {
            this.source = source;
            this.sequence = sequence;
        }

        /**
         * Returns {@code true} if more than {@code ageLimitMs} milliseconds
         * have elapsed since this object was created or last passed through
         * {@link #update}.
         */
        boolean isOlderThan(long ageLimitMs) {
            long age = System.currentTimeMillis() - this.timestamp;
            return age > ageLimitMs;
        }

        /**
         * Offers a new (source, sequence) pair and refreshes the timestamp,
         * whether or not the pair is accepted.
         *
         * @return {@code true} if the pair was accepted: either the source
         *         changed (the stored pair is replaced unconditionally) or
         *         the sequence is strictly greater than the stored one;
         *         {@code false} if the source is the same and the sequence
         *         is not greater
         */
        boolean update(long source, long sequence) {
            this.timestamp = System.currentTimeMillis();
            if (this.source != source) {
                // Sequence values of different sources are not compared with
                // each other; a source change always resets the stored pair.
                this.source = source;
                this.sequence = sequence;
                return true;
            }
            if (this.sequence >= sequence) {
                return false;
            }
            this.sequence = sequence;
            return true;
        }
    }

    /**
     * @param instance Hazelcast instance, passed to the superclass
     * @param mapName  passed to the superclass as its name argument
     * @param keyFn    extracts the map key from a change record
     * @param valueFn  extracts the map value from a change record
     */
    public WriteCdcP(
            @Nonnull HazelcastInstance instance,
            @Nonnull String mapName,
            @Nonnull FunctionEx<? super ChangeRecord, ? extends K> keyFn,
            @Nonnull FunctionEx<? super ChangeRecord, ? extends V> valueFn
    ) {
        super(instance, MAX_PARALLEL_ASYNC_OPS, mapName, keyFn);
        this.valueFn = valueFn;
    }

    /**
     * Runs the superclass initialisation, then creates the sequence cache
     * using the expiration time read from the
     * {@link CdcSinks#SEQUENCE_CACHE_EXPIRATION_SECONDS} property of the
     * Hazelcast instance's configuration.
     */
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        super.init(outbox, context);

        HazelcastProperties properties =
                new HazelcastProperties(context.hazelcastInstance().getConfig().getProperties());
        final long expirationMs = properties.getMillis(CdcSinks.SEQUENCE_CACHE_EXPIRATION_SECONDS);

        // accessOrder = true: get() moves an entry to the end of the iteration
        // order, so the "eldest" entry is the least recently accessed one.
        this.sequences = new LinkedHashMap<K, Sequence>(INITIAL_CAPACITY, LOAD_FACTOR, true) {
            private static final long serialVersionUID = 1L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Sequence> eldest) {
                return eldest.getValue().isOlderThan(expirationMs);
            }
        };
    }

    /**
     * Records the given (source, sequence) pair for {@code key}.
     *
     * @return {@code true} if the key had no cached sequence or the pair was
     *         accepted by {@link Sequence#update}; {@code false} if the pair
     *         is not newer than the cached one
     */
    boolean updateSequence(K key, long source, long sequence) {
        Sequence cached = this.sequences.get(key);
        if (cached != null) {
            return cached.update(source, sequence);
        }
        this.sequences.put(key, new Sequence(source, sequence));
        return true;
    }

    /**
     * Serializes the key and value of {@code changeRecord} and stores them in
     * the buffer of the key's partition, unless the record is stale.
     * <p>
     * {@code pendingItemCount} is decremented whenever the record does not
     * produce a new buffer entry (it was dropped, or it replaced an entry
     * already buffered for the same key); presumably the superclass has
     * already counted the item before calling this method - verify against
     * {@code AbstractUpdateMapP}.
     */
    public void addToBuffer(ChangeRecord changeRecord) {
        K key = this.keyFn.apply(changeRecord);
        if (shouldBeDropped(key, changeRecord)) {
            this.pendingItemCount--;
            return;
        }

        Data keyData = this.serializationContext.toKeyData(key);
        int partitionId = this.serializationContext.partitionId(keyData);
        Data valueData = this.serializationContext.toData(this.valueFn.apply(changeRecord));

        if (this.partitionBuffers[partitionId].put(keyData, valueData) == null) {
            // First buffered entry for this key in this partition.
            this.pendingInPartition[partitionId]++;
        } else {
            // An entry for the key was already buffered and has been replaced.
            this.pendingItemCount--;
        }
    }

    /** Returns {@code true} if the record is not newer than the last one seen for {@code key}. */
    private boolean shouldBeDropped(K key, ChangeRecord changeRecord) {
        return !updateSequence(key, changeRecord.sequenceSource(), changeRecord.sequenceValue());
    }

    /** Wraps the buffered entries in an {@code ApplyValuesEntryProcessor}. */
    protected EntryProcessor<K, V, Void> entryProcessor(Map<Data, Object> buffer) {
        return new AbstractUpdateMapP.ApplyValuesEntryProcessor<>(buffer);
    }
}
