/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.jetty.io.content;

import java.util.concurrent.Flow;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.thread.AutoLock;

public class ContentSourcePublisher
implements Flow.Publisher<Content.Chunk> {
    private final Content.Source content;

    public ContentSourcePublisher(Content.Source content) {
        this.content = content;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber) {
        subscriber.onSubscribe(new SubscriptionImpl(this.content, subscriber));
    }

    private static class SubscriptionImpl
    implements Flow.Subscription {
        private final AutoLock lock = new AutoLock();
        private final Content.Source content;
        private final Flow.Subscriber<? super Content.Chunk> subscriber;
        private long demand;
        private boolean stalled;
        private boolean cancelled;
        private boolean terminated;

        public SubscriptionImpl(Content.Source content, Flow.Subscriber<? super Content.Chunk> subscriber) {
            this.content = content;
            this.subscriber = subscriber;
            this.stalled = true;
        }

        @Override
        public void request(long n) {
            boolean process = false;
            IllegalArgumentException failure = null;
            try (AutoLock ignored = this.lock.lock();){
                if (this.cancelled || this.terminated) {
                    return;
                }
                if (n <= 0L) {
                    this.terminated = true;
                    failure = new IllegalArgumentException("invalid demand " + n);
                }
                this.demand = MathUtils.cappedAdd((long)this.demand, (long)n);
                if (this.stalled) {
                    this.stalled = false;
                    process = true;
                }
            }
            if (failure != null) {
                this.subscriber.onError(failure);
            } else if (process) {
                this.process();
            }
        }

        @Override
        public void cancel() {
            try (AutoLock ignored = this.lock.lock();){
                this.cancelled = true;
            }
        }

        private void process() {
            Content.Chunk chunk;
            do {
                block16: {
                    try (AutoLock ignored = this.lock.lock();){
                        if (this.demand > 0L) {
                            --this.demand;
                            break block16;
                        }
                        this.stalled = true;
                        return;
                    }
                }
                chunk = this.content.read();
                if (chunk == null) {
                    try (AutoLock ignored = this.lock.lock();){
                        ++this.demand;
                        this.stalled = true;
                    }
                    this.content.demand(this::process);
                    return;
                }
                if (Content.Chunk.isFailure(chunk)) {
                    this.terminate();
                    this.subscriber.onError(chunk.getFailure());
                    return;
                }
                this.subscriber.onNext(chunk);
                chunk.release();
            } while (!chunk.isLast());
            this.terminate();
            this.subscriber.onComplete();
        }

        private void terminate() {
            try (AutoLock ignored = this.lock.lock();){
                this.terminated = true;
            }
        }
    }
}

