package org.eclipse.ditto.services.thingsearch.persistence.write.streaming;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.stream.javadsl.Flow;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteResult;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.Metadata;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.WriteResultAndErrors;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BulkWriteResultAckFlow.class */
/**
 * Inspects the outcome of bulk writes to the search index.
 * <p>
 * For every {@code WriteResultAndErrors} passing through the flow created by {@link #start()}, a failure
 * {@code UpdateThingResponse} (wrapped in a {@code ShardedMessageEnvelope}) is sent to the updater shard for each
 * thing whose write is considered failed, and one or more log messages describing the outcome are emitted
 * downstream.
 */
final class BulkWriteResultAckFlow {

    private static final String ERRORS_COUNTER_NAME = "search-index-update-errors";

    /** Actor that receives the failure responses. */
    private final ActorRef updaterShard;

    /** Incremented by the number of things for which a failure response is sent. */
    private final Counter errorsCounter = DittoMetrics.counter(ERRORS_COUNTER_NAME);

    private BulkWriteResultAckFlow(final ActorRef updaterShard) {
        this.updaterShard = updaterShard;
    }

    /**
     * Creates a new instance.
     *
     * @param actorRef the actor to which failure responses are sent.
     * @return the new instance.
     */
    public static BulkWriteResultAckFlow of(final ActorRef actorRef) {
        return new BulkWriteResultAckFlow(actorRef);
    }

    /**
     * Creates the flow that checks bulk write results.
     *
     * @return a flow mapping each {@code WriteResultAndErrors} to the log messages produced by checking it.
     */
    public Flow<WriteResultAndErrors, String, NotUsed> start() {
        // The explicit type witness is required: without it the input type is inferred as Object and neither
        // the method reference nor the declared return type type-check.
        return Flow.<WriteResultAndErrors>create().mapConcat(this::checkBulkWriteResult);
    }

    /**
     * Checks one bulk write outcome, sends failure responses where needed and builds the log messages.
     * <ul>
     * <li>Write not acknowledged: all things are treated as failed; one message is returned.</li>
     * <li>An error index lies outside the write models: all things are treated as failed; one message is
     * returned.</li>
     * <li>Otherwise: one summary message plus one message per bulk write error is returned and only the things
     * referenced by the errors are treated as failed.</li>
     * </ul>
     *
     * @param writeResultAndErrors the outcome of a bulk write.
     * @return the log messages describing the outcome.
     */
    private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {
        if (wasNotAcknowledged(writeResultAndErrors)) {
            acknowledgeFailures(getAllThings(writeResultAndErrors));
            return Collections.singleton(logResult("NotAcknowledged", writeResultAndErrors, false));
        }

        final Optional<String> consistencyError = checkForConsistencyError(writeResultAndErrors);
        if (consistencyError.isPresent()) {
            acknowledgeFailures(getAllThings(writeResultAndErrors));
            return Collections.singleton(consistencyError.get());
        }

        final List<BulkWriteError> bulkWriteErrors = writeResultAndErrors.getBulkWriteErrors();
        final List<String> logEntries = new ArrayList<>(bulkWriteErrors.size() + 1);
        final List<Metadata> failedThings = new ArrayList<>(bulkWriteErrors.size());
        logEntries.add(logResult("Acknowledged", writeResultAndErrors, bulkWriteErrors.isEmpty()));
        for (final BulkWriteError bulkWriteError : bulkWriteErrors) {
            // The index is known to be within bounds because the consistency check above passed.
            final Metadata metadata =
                    writeResultAndErrors.getWriteModels().get(bulkWriteError.getIndex()).getMetadata();
            logEntries.add(String.format("UpdateFailed for %s due to %s", metadata, bulkWriteError));
            failedThings.add(metadata);
        }
        acknowledgeFailures(failedThings);
        return logEntries;
    }

    /**
     * Increments the error counter by the number of given things and sends a failure response for each.
     *
     * @param list metadata of the things whose update failed.
     */
    private void acknowledgeFailures(final List<Metadata> list) {
        this.errorsCounter.increment(list.size());
        acknowledge(list, BulkWriteResultAckFlow::createFailureResponse);
    }

    /**
     * Sends one response per metadata entry to the updater shard, wrapped in a {@code ShardedMessageEnvelope}.
     *
     * @param list metadata of the things to send a response for.
     * @param function creates the response for a metadata entry.
     */
    private void acknowledge(final List<Metadata> list, final Function<Metadata, UpdateThingResponse> function) {
        for (final Metadata metadata : list) {
            final UpdateThingResponse response = function.apply(metadata);
            final ShardedMessageEnvelope envelope = ShardedMessageEnvelope.of(response.getEntityId(),
                    response.getType(), response.toJson(), response.getDittoHeaders());
            this.updaterShard.tell(envelope, ActorRef.noSender());
        }
    }

    private static UpdateThingResponse createFailureResponse(final Metadata metadata) {
        return createResponse(metadata, false);
    }

    /**
     * Builds the response for one thing from its metadata.
     *
     * @param metadata metadata of the thing.
     * @param isSuccess whether the update succeeded.
     * @return the response with empty headers; policy ID and policy revision are {@code null} if the metadata
     * has no policy ID.
     */
    private static UpdateThingResponse createResponse(final Metadata metadata, final boolean isSuccess) {
        return UpdateThingResponse.of(
                metadata.getThingId(),
                metadata.getThingRevision(),
                metadata.getPolicyId().map(PolicyId::of).orElse(null),
                // The policy revision is only passed on when a policy ID is present.
                metadata.getPolicyId().flatMap(policyId -> metadata.getPolicyRevision()).orElse(null),
                isSuccess,
                DittoHeaders.empty());
    }

    private static boolean wasNotAcknowledged(final WriteResultAndErrors writeResultAndErrors) {
        return !writeResultAndErrors.getBulkWriteResult().wasAcknowledged();
    }

    /**
     * Verifies that every bulk write error refers to an existing write model.
     *
     * @param writeResultAndErrors the outcome of a bulk write.
     * @return a log message if at least one error index is out of bounds, an empty optional otherwise.
     */
    private static Optional<String> checkForConsistencyError(final WriteResultAndErrors writeResultAndErrors) {
        final int writeModelCount = writeResultAndErrors.getWriteModels().size();
        if (areAllIndexesWithinBounds(writeResultAndErrors.getBulkWriteErrors(), writeModelCount)) {
            return Optional.empty();
        }
        return Optional.of(String.format("ConsistencyError[indexOutOfBound]: %s", writeResultAndErrors.toString()));
    }

    /**
     * @param list the bulk write errors to check.
     * @param i the exclusive upper bound for valid indexes.
     * @return whether the index of every error is in the range {@code [0, i)}.
     */
    private static boolean areAllIndexesWithinBounds(final List<BulkWriteError> list, final int i) {
        return list.stream()
                .mapToInt(BulkWriteError::getIndex)
                .allMatch(index -> 0 <= index && index < i);
    }

    /**
     * @param writeResultAndErrors the outcome of a bulk write.
     * @return the metadata of all write models of the bulk write, in order.
     */
    private static List<Metadata> getAllThings(final WriteResultAndErrors writeResultAndErrors) {
        return writeResultAndErrors.getWriteModels()
                .stream()
                .map(writeModel -> writeModel.getMetadata())
                .collect(Collectors.toList());
    }

    /**
     * Renders the log message summarizing a bulk write outcome.
     *
     * @param status prefix of the message.
     * @param writeResultAndErrors the outcome of a bulk write.
     * @param containsNoErrors whether the bulk write had no errors; selects the "Success" over the
     * "PartialSuccess" format.
     * @return the stack trace of the unexpected error if there is one, otherwise a summary of the write result.
     */
    private static String logResult(final String status, final WriteResultAndErrors writeResultAndErrors,
            final boolean containsNoErrors) {

        final Optional<Throwable> unexpectedError = writeResultAndErrors.getUnexpectedError();
        if (unexpectedError.isPresent()) {
            final StringWriter stackTrace = new StringWriter();
            stackTrace.append(String.format("%s: UnexpectedError[stacktrace=", status));
            unexpectedError.get().printStackTrace(new PrintWriter(stackTrace));
            return stackTrace.append("]").toString();
        }

        final BulkWriteResult bulkWriteResult = writeResultAndErrors.getBulkWriteResult();
        final List<BulkWriteError> bulkWriteErrors = writeResultAndErrors.getBulkWriteErrors();

        if (!bulkWriteResult.wasAcknowledged()) {
            // The MongoDB driver documents that the count getters of an unacknowledged result throw
            // UnsupportedOperationException, so only report what is available instead of failing the stream.
            return String.format("%s: Unacknowledged[ack=%b,errorCount=%d,errors=%s]",
                    status, false, bulkWriteErrors.size(), bulkWriteErrors);
        }

        if (containsNoErrors) {
            return String.format(
                    "%s: Success[ack=%b,errors=%d,matched=%d,upserts=%d,inserted=%d,modified=%d,deleted=%d]",
                    status,
                    bulkWriteResult.wasAcknowledged(),
                    bulkWriteErrors.size(),
                    bulkWriteResult.getMatchedCount(),
                    bulkWriteResult.getUpserts().size(),
                    bulkWriteResult.getInsertedCount(),
                    bulkWriteResult.getModifiedCount(),
                    bulkWriteResult.getDeletedCount());
        }

        return String.format(
                "%s: PartialSuccess[ack=%b,errorCount=%d,matched=%d,upserts=%d,inserted=%d,modified=%d,deleted=%d,errors=%s]",
                status,
                bulkWriteResult.wasAcknowledged(),
                bulkWriteErrors.size(),
                bulkWriteResult.getMatchedCount(),
                bulkWriteResult.getUpserts().size(),
                bulkWriteResult.getInsertedCount(),
                bulkWriteResult.getModifiedCount(),
                bulkWriteResult.getDeletedCount(),
                bulkWriteErrors);
    }

    // No $deserializeLambda$(SerializedLambda) method is declared here on purpose: it is a compiler-synthesized
    // hook for the serializable method reference used in start(). The decompiled copy was not valid Java
    // (an int assigned to a boolean, a switch over a boolean) and javac generates the hook itself.
}
