package reactor.ipc.aeron;

import io.aeron.Publication;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/ipc/aeron/DefaultAeronOutbound.class */
public final class DefaultAeronOutbound implements Disposable, AeronOutbound {
    private final Scheduler scheduler;
    private final String category;
    private final AeronWrapper wrapper;
    private final String channel;
    private final AeronOptions options;
    private volatile WriteSequencer<ByteBuffer> sequencer;
    private volatile DefaultMessagePublication publication;

    public DefaultAeronOutbound(String str, AeronWrapper aeronWrapper, String str2, AeronOptions aeronOptions) {
        this.category = str;
        this.wrapper = aeronWrapper;
        this.channel = str2;
        this.options = aeronOptions;
        this.scheduler = Schedulers.newSingle(str + "-[sender]", false);
    }

    @Override // reactor.ipc.aeron.AeronOutbound
    public AeronOutbound send(Publisher<? extends ByteBuffer> publisher) {
        return then(this.sequencer.add(publisher));
    }

    @Override // reactor.ipc.aeron.AeronOutbound
    public Mono<Void> then() {
        return Mono.empty();
    }

    public void dispose() {
        this.scheduler.dispose();
        if (this.publication != null) {
            this.publication.dispose();
        }
    }

    public Mono<Void> initialise(long j, int i) {
        return Mono.create(monoSink -> {
            Publication addPublication = this.wrapper.addPublication(this.channel, i, "to send data to", j);
            this.publication = new DefaultMessagePublication(addPublication, this.category, this.options.connectTimeoutMillis(), this.options.backpressureTimeoutMillis());
            this.sequencer = new AeronWriteSequencer(this.scheduler, this.category, this.publication, j);
            createRetryTask(monoSink, addPublication, this.options.connectTimeoutMillis()).schedule();
        });
    }

    private RetryTask createRetryTask(MonoSink<Void> monoSink, Publication publication, int i) {
        return new RetryTask(Schedulers.single(), 100L, i, () -> {
            if (!publication.isConnected()) {
                return false;
            }
            monoSink.success();
            return true;
        }, th -> {
            monoSink.error(new Exception(String.format("Publication %s for sending data in not connected during %d millis", this.publication.asString(), Integer.valueOf(i)), th));
        });
    }

    public MessagePublication getPublication() {
        return this.publication;
    }
}
