/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.feedback;

import java.io.Closeable;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.agents.runtime.feedback.FeedbackChannelBroker;
import org.apache.flink.agents.runtime.feedback.FeedbackConsumer;
import org.apache.flink.agents.runtime.feedback.FeedbackQueue;
import org.apache.flink.agents.runtime.feedback.SubtaskFeedbackKey;
import org.apache.flink.util.IOUtils;

/**
 * A channel that hands values written via {@link #put(Object)} to a single registered
 * {@link FeedbackConsumer}, running the consumer on a caller-supplied {@link Executor}.
 *
 * <p>Values are buffered in a {@link FeedbackQueue}. At most one consumer may be registered at
 * a time; the registration is held in an {@link AtomicReference} and installed with a
 * compare-and-set.
 *
 * @param <T> type of the values passed through the channel
 */
public final class FeedbackChannel<T>
implements Closeable {
    /** Key under which this channel is removed from the {@link FeedbackChannelBroker} on close. */
    private final SubtaskFeedbackKey<T> key;

    /** Buffer between {@link #put(Object)} and the consumer's drain task. */
    private final FeedbackQueue<T> queue;

    /** The currently registered consumer task, or {@code null} when none is registered. */
    private final AtomicReference<ConsumerTask<T>> consumerRef = new AtomicReference<>();

    /**
     * Creates a channel for the given key backed by the given queue.
     *
     * @param key identifies this channel in the broker; must not be {@code null}
     * @param queue buffer for pending values; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    FeedbackChannel(SubtaskFeedbackKey<T> key, FeedbackQueue<T> queue) {
        this.key = Objects.requireNonNull(key);
        this.queue = Objects.requireNonNull(queue);
    }

    /**
     * Adds a value to the queue and, when needed, schedules the registered consumer to drain it.
     *
     * <p>A drain is scheduled only when {@code addAndCheckIfWasEmpty} returns {@code true}
     * (presumably: the queue was empty before this add, so no drain is pending for it) and a
     * consumer is currently registered. Otherwise the value simply stays queued.
     *
     * @param value the value to enqueue
     */
    public void put(T value) {
        if (!this.queue.addAndCheckIfWasEmpty(value)) {
            return;
        }
        ConsumerTask<T> consumer = this.consumerRef.get();
        if (consumer == null) {
            // No consumer yet; registerConsumer schedules a drain once one is installed.
            return;
        }
        consumer.scheduleDrainAll();
    }

    /**
     * Registers the single consumer of this channel and immediately schedules a drain so that
     * values queued before registration are delivered.
     *
     * @param consumer receives each queued value; must not be {@code null}
     * @param executor runs the drain task; must not be {@code null}
     * @throws NullPointerException if {@code consumer} or {@code executor} is {@code null}
     * @throws IllegalStateException if a consumer is already registered
     */
    public void registerConsumer(FeedbackConsumer<T> consumer, Executor executor) {
        Objects.requireNonNull(consumer);
        ConsumerTask<T> consumerTask = new ConsumerTask<T>(executor, consumer, this.queue);
        if (!this.consumerRef.compareAndSet(null, consumerTask)) {
            throw new IllegalStateException("There can be only a single consumer in a FeedbackChannel.");
        }
        consumerTask.scheduleDrainAll();
    }

    /**
     * Unregisters the consumer (if any), closes it quietly, and removes this channel from the
     * {@link FeedbackChannelBroker}.
     */
    @Override
    public void close() {
        // Typed (non-raw) reference; the previous raw ConsumerTask needed an explicit cast.
        ConsumerTask<T> consumer = this.consumerRef.getAndSet(null);
        IOUtils.closeQuietly(consumer);
        FeedbackChannelBroker broker = FeedbackChannelBroker.get();
        broker.removeChannel(this.key);
    }

    /**
     * Runnable that drains the queue and feeds every drained element to the consumer. The task
     * submits itself to the executor each time a drain is requested.
     *
     * @param <T> type of the values being consumed
     */
    private static final class ConsumerTask<T>
    implements Runnable,
    Closeable {
        private final Executor executor;
        private final FeedbackConsumer<T> consumer;
        private final FeedbackQueue<T> queue;

        /**
         * @throws NullPointerException if any argument is {@code null}
         */
        ConsumerTask(Executor executor, FeedbackConsumer<T> consumer, FeedbackQueue<T> queue) {
            this.executor = Objects.requireNonNull(executor);
            this.consumer = Objects.requireNonNull(consumer);
            this.queue = Objects.requireNonNull(queue);
        }

        /** Submits this task to the executor. */
        void scheduleDrainAll() {
            this.executor.execute(this);
        }

        /**
         * Drains the queue and passes the drained elements to the consumer in deque order.
         *
         * @throws RuntimeException wrapping any exception thrown by the consumer
         */
        @Override
        public void run() {
            Deque<T> buffer = this.queue.drainAll();
            try {
                T element;
                // pollFirst() returns null once the buffer is exhausted, which ends the loop.
                while ((element = buffer.pollFirst()) != null) {
                    this.consumer.processFeedback(element);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** No-op: the task holds no resources of its own to release. */
        @Override
        public void close() {
        }
    }
}

