package com.hazelcast.jet.cdc.impl;

import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.util.Properties;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:com/hazelcast/jet/cdc/impl/ChangeRecordCdcSourceP.class */
public class ChangeRecordCdcSourceP extends CdcSourceP<ChangeRecord> {
    public static final String DB_SPECIFIC_EXTRA_FIELDS_PROPERTY = "db.specific.extra.fields";
    private final SequenceExtractor sequenceExtractor;

    public ChangeRecordCdcSourceP(@Nonnull Properties properties, @Nonnull EventTimePolicy<? super ChangeRecord> eventTimePolicy) {
        super(properties, eventTimePolicy);
        try {
            this.sequenceExtractor = (SequenceExtractor) newInstance(properties.getProperty(CdcSourceP.SEQUENCE_EXTRACTOR_CLASS_PROPERTY), "sequence extractor ");
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.hazelcast.jet.cdc.impl.CdcSourceP
    @Nullable
    public ChangeRecord map(SourceRecord sourceRecord) {
        long longValue;
        Supplier supplier;
        Supplier supplier2;
        if (sourceRecord == null || sourceRecord.topic().startsWith("__debezium")) {
            return null;
        }
        long source = this.sequenceExtractor.source(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
        long sequence = this.sequenceExtractor.sequence(sourceRecord.sourceOffset());
        String convertToString = Values.convertToString(sourceRecord.keySchema(), sourceRecord.key());
        Struct struct = (Struct) sourceRecord.value();
        Schema valueSchema = sourceRecord.valueSchema();
        Struct struct2 = (Struct) struct.get("source");
        Operation operation = struct.schema().field("op") != null ? Operation.get(struct.getString("op")) : Operation.UNSPECIFIED;
        if (operation == Operation.UNSPECIFIED) {
            longValue = ((Struct) struct.get("source")).getInt64("ts_ms").longValue();
            supplier = () -> {
                return Values.convertToString(valueSchema, struct);
            };
            supplier2 = () -> {
                return Values.convertToString(valueSchema, struct);
            };
        } else {
            Object obj = valueSchema.field("before") != null ? struct.get("before") : null;
            Object obj2 = valueSchema.field("after") != null ? struct.get("after") : null;
            longValue = struct.getInt64("ts_ms").longValue();
            supplier = obj == null ? null : () -> {
                return Values.convertToString(valueSchema.field("before").schema(), obj);
            };
            supplier2 = obj2 == null ? null : () -> {
                return Values.convertToString(valueSchema.field("after").schema(), obj2);
            };
        }
        return new ChangeRecordImpl(longValue, source, sequence, operation, convertToString, (Supplier<String>) supplier, (Supplier<String>) supplier2, fieldOrNull(struct2, "table"), fieldOrNull(struct2, "schema"), fieldOrNull(struct2, "db"));
    }

    private static String fieldOrNull(Struct struct, String str) {
        if (struct.schema().field(str) != null) {
            return struct.getString(str);
        }
        return null;
    }
}
