/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.perf;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.perf.AgentBase;
import com.rabbitmq.perf.ConsumerParameters;
import com.rabbitmq.perf.MulticastSet;
import com.rabbitmq.perf.Recovery;
import com.rabbitmq.perf.Stats;
import com.rabbitmq.perf.TimestampProvider;
import com.rabbitmq.perf.TopologyRecording;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer
extends AgentBase
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    /** Acknowledges the delivery identified by the envelope's delivery tag via {@code basicAck}. */
    private static final AckNackOperation ACK_OPERATION = (ch, envelope, multiple) -> ch.basicAck(envelope.getDeliveryTag(), multiple);

    /** Rejects the delivery via {@code basicNack}, always passing {@code true} as the last (requeue) argument. */
    private static final AckNackOperation NACK_OPERATION = (ch, envelope, multiple) -> ch.basicNack(envelope.getDeliveryTag(), multiple, true);

    /** Callback registered with the broker in asynchronous mode; assigned by {@code registerAsynchronousConsumer}. */
    private volatile ConsumerImpl q;
    private final Channel channel;
    private final String id;
    /** A transaction is committed every {@code txSize} messages; 0 disables commits. */
    private final int txSize;
    private final boolean autoAck;
    /** 0 acknowledges each message individually; otherwise a multiple-ack is sent every N messages. */
    private final int multiAckEvery;
    private final Stats stats;
    /** Number of messages after which this consumer counts down; 0 means no limit. */
    private final int msgLimit;
    /** Consumer tag to queue name, filled in as consumers are registered. */
    private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<>());
    private final ConsumerLatency consumerLatency;
    private final BiFunction<AMQP.BasicProperties, byte[], Long> timestampExtractor;
    private final TimestampProvider timestampProvider;
    private final MulticastSet.CompletionHandler completionHandler;
    /** Ensures the completion handler is counted down at most once. */
    private final AtomicBoolean completed = new AtomicBoolean(false);
    /** Queue names currently polled; replaced by {@code recover} in polling mode. */
    private final AtomicReference<List<String>> queueNames = new AtomicReference<>();
    /** Incremented each time {@link #queueNames} is replaced, so the polling loop can detect the change. */
    private final AtomicLong queueNamesVersion = new AtomicLong(0L);
    /** Queue names as supplied at construction time; used as lookup keys during recovery. */
    private final List<String> initialQueueNames;
    private final ConsumerState state;
    private final Recovery.RecoveryProcess recoveryProcess;
    private final ExecutorService executorService;
    /** {@code true} to poll with {@code basicGet}, {@code false} to register with {@code basicConsume}. */
    private final boolean polling;
    /** Pause in milliseconds after each poll; values &lt;= 0 disable the pause. */
    private final int pollingInterval;
    private final AckNackOperation ackNackOperation;

    /**
     * Creates a consumer configured from {@code parameters} and hands it to the
     * recovery process obtained from those parameters.
     *
     * <p>The simulated per-message latency is chosen from the configured value in
     * microseconds: none when it is &lt;= 0, {@code Thread.sleep} (in whole
     * milliseconds) when it is &gt;= 1000, and a busy-wait otherwise.
     *
     * @param parameters source of the channel, queues, acknowledgement settings,
     *                   limits, statistics sink and recovery process
     */
    public Consumer(ConsumerParameters parameters) {
        this.channel = parameters.getChannel();
        this.id = parameters.getId();
        this.txSize = parameters.getTxSize();
        this.autoAck = parameters.isAutoAck();
        this.multiAckEvery = parameters.getMultiAckEvery();
        this.stats = parameters.getStats();
        this.msgLimit = parameters.getMsgLimit();
        this.timestampProvider = parameters.getTimestampProvider();
        this.completionHandler = parameters.getCompletionHandler();
        this.executorService = parameters.getExecutorService();
        this.polling = parameters.isPolling();
        this.pollingInterval = parameters.getPollingInterval();
        // Two independent copies: one is swapped out on recovery, the other keeps the original names.
        this.queueNames.set(new ArrayList<String>(parameters.getQueueNames()));
        this.initialQueueNames = new ArrayList<String>(parameters.getQueueNames());

        int consumerLatencyInMicroSeconds = parameters.getConsumerLatencyInMicroSeconds();
        if (consumerLatencyInMicroSeconds <= 0) {
            this.consumerLatency = new NoWaitConsumerLatency();
        } else if (consumerLatencyInMicroSeconds >= 1000) {
            // Thread.sleep works in milliseconds.
            this.consumerLatency = new ThreadSleepConsumerLatency(consumerLatencyInMicroSeconds / 1000);
        } else {
            // Sub-millisecond delay: busy-wait, expressed in nanoseconds.
            this.consumerLatency = new BusyWaitConsumerLatency(consumerLatencyInMicroSeconds * 1000L);
        }

        if (this.timestampProvider.isTimestampInHeader()) {
            this.timestampExtractor = (properties, body) -> {
                // Messages may carry no properties/headers at all; treat that like a missing timestamp.
                Map<String, Object> headers = properties == null ? null : properties.getHeaders();
                Object timestamp = headers == null ? null : headers.get("timestamp");
                return timestamp == null ? Long.MAX_VALUE : (Long) timestamp;
            };
        } else {
            this.timestampExtractor = (properties, body) -> {
                DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
                try {
                    // The body starts with an int that is skipped, followed by the long timestamp.
                    d.readInt();
                    return d.readLong();
                } catch (IOException e) {
                    // Keep the cause so a truncated body can be diagnosed.
                    throw new RuntimeException("Error while extracting timestamp from body", e);
                }
            };
        }

        this.ackNackOperation = parameters.isNack() ? NACK_OPERATION : ACK_OPERATION;
        this.state = new ConsumerState(parameters.getRateLimit());
        this.recoveryProcess = parameters.getRecoveryProcess();
        this.recoveryProcess.init(this);
    }

    /**
     * Starts consuming: by polling when {@code polling} is set, otherwise by
     * registering an asynchronous consumer on the channel.
     */
    @Override
    public void run() {
        if (!this.polling) {
            this.registerAsynchronousConsumer();
            return;
        }
        this.startBasicGetConsumer();
    }

    /**
     * Submits a polling loop to the executor service. The loop issues
     * {@code basicGet} on every queue in turn until this consumer is marked
     * completed or the executing thread is interrupted.
     *
     * <p>While the recovery process reports a recovery in progress, the loop
     * sleeps one second per queue instead of polling. After an
     * {@link IOException} from {@code basicGet} it tries to open a replacement
     * channel on the same connection.
     */
    private void startBasicGetConsumer() {
        this.executorService.execute(() -> {
            ConsumerImpl delegate = new ConsumerImpl(this.channel);
            boolean shouldPause = this.pollingInterval > 0;
            long knownVersion = this.queueNamesVersion.get();
            List<String> queues = this.queueNames.get();
            Channel ch = this.channel;
            Connection connection = this.channel.getConnection();
            while (!this.completed.get() && !Thread.interrupted()) {
                // Read the version BEFORE the list: recover() publishes the list first and
                // bumps the version afterwards, so a new version guarantees a new list.
                long currentVersion = this.queueNamesVersion.get();
                if (knownVersion != currentVersion) {
                    knownVersion = currentVersion;
                    queues = this.queueNames.get();
                }
                for (String queue : queues) {
                    if (this.recoveryProcess.isRecoverying()) {
                        try {
                            LOGGER.debug("Recovery in progress, sleeping for a sec");
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }
                    try {
                        GetResponse response = ch.basicGet(queue, this.autoAck);
                        if (response != null) {
                            delegate.handleMessage(response.getEnvelope(), response.getProps(), response.getBody(), ch);
                        }
                    } catch (IOException e) {
                        LOGGER.debug("Basic.get error on queue {}: {}", queue, e.getMessage());
                        try {
                            Channel replacement = connection.createChannel();
                            if (replacement != null) {
                                ch = replacement;
                            } else {
                                // createChannel() may return null; keep the old channel rather than poll on null.
                                LOGGER.debug("No channel available to replace the failed one for queue {}", queue);
                            }
                        } catch (Exception ex) {
                            LOGGER.debug("Error while trying to create a channel for queue {}: {}", queue, ex.getMessage());
                        }
                    } catch (AlreadyClosedException e) {
                        LOGGER.debug("Tried to basic.get from a closed connection");
                    }
                    if (shouldPause) {
                        try {
                            Thread.sleep(this.pollingInterval);
                        } catch (InterruptedException e) {
                            // Restore the flag so the outer loop condition observes the interruption.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        });
    }

    /**
     * Creates the callback object and registers it with {@code basicConsume} on
     * every queue currently in {@code queueNames}, recording each returned
     * consumer tag against its queue name.
     *
     * @throws RuntimeException wrapping any {@link IOException} or
     *                          {@link ShutdownSignalException} raised while registering
     */
    private void registerAsynchronousConsumer() {
        try {
            this.q = new ConsumerImpl(this.channel);
            List<String> targets = this.queueNames.get();
            for (String target : targets) {
                String consumerTag = this.channel.basicConsume(target, this.autoAck, (com.rabbitmq.client.Consumer) this.q);
                this.consumerTagBranchMap.put(consumerTag, target);
            }
        } catch (IOException | ShutdownSignalException failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Notifies the completion handler, but only on the first call: the
     * {@code completed} flag is flipped atomically and later calls are no-ops.
     */
    private void countDown() {
        boolean firstCall = this.completed.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.completionHandler.countDown();
    }

    /**
     * Re-establishes consumption after a recovery.
     *
     * <p>In polling mode the initial queue names are looked up in
     * {@code topologyRecording} and the resulting names are published to the
     * polling loop, followed by a version bump. In asynchronous mode every
     * recorded consumer tag is re-registered on its looked-up queue; a failure
     * on one tag is logged and does not stop the others.
     *
     * @param topologyRecording recording used to look up each queue by its recorded name
     */
    @Override
    public void recover(TopologyRecording topologyRecording) {
        if (this.polling) {
            List<String> queues = new ArrayList<>(this.initialQueueNames.size());
            for (String queue : this.initialQueueNames) {
                queues.add(topologyRecording.queue(queue).name());
            }
            // Publish the list first, then bump the version the polling loop watches.
            this.queueNames.set(queues);
            this.queueNamesVersion.incrementAndGet();
            return;
        }

        // Iterating a synchronizedMap view requires holding the map's monitor; take a
        // snapshot under the lock so the network calls below run without it.
        Map<String, String> tagsToQueues;
        synchronized (this.consumerTagBranchMap) {
            tagsToQueues = new HashMap<>(this.consumerTagBranchMap);
        }
        for (Map.Entry<String, String> entry : tagsToQueues.entrySet()) {
            TopologyRecording.RecordedQueue queue = topologyRecording.queue(entry.getValue());
            try {
                this.channel.basicConsume(queue.name(), this.autoAck, entry.getKey(), (com.rabbitmq.client.Consumer) this.q);
            } catch (IOException e) {
                LOGGER.warn("Error while recovering consumer {} on queue {} on connection {}", new Object[]{entry.getKey(), queue.name(), this.channel.getConnection().getClientProvidedName(), e});
            }
        }
    }

    @FunctionalInterface
    private static interface AckNackOperation {
        public void apply(Channel var1, Envelope var2, boolean var3) throws IOException;
    }

    /** Simulates latency by spinning on {@link System#nanoTime()} until the delay has elapsed. */
    private static class BusyWaitConsumerLatency implements ConsumerLatency {
        /** Spin duration, compared against {@code System.nanoTime()} differences (nanoseconds). */
        private final long delay;

        private BusyWaitConsumerLatency(long delay) {
            this.delay = delay;
        }

        /**
         * Spins until at least {@code delay} nanoseconds have passed.
         *
         * @return always {@code true}
         */
        @Override
        public boolean simulateLatency() {
            final long startedAt = System.nanoTime();
            long elapsed;
            do {
                elapsed = System.nanoTime() - startedAt;
            } while (elapsed < this.delay);
            return true;
        }
    }

    /** Simulates latency by putting the calling thread to sleep. */
    private static class ThreadSleepConsumerLatency implements ConsumerLatency {
        /** Sleep duration passed to {@link Thread#sleep(long)} (milliseconds). */
        private final int waitTime;

        private ThreadSleepConsumerLatency(int waitTime) {
            this.waitTime = waitTime;
        }

        /**
         * Sleeps for {@code waitTime} milliseconds.
         *
         * @return {@code true} if the sleep completed, {@code false} if it was
         *         interrupted (the interrupt flag is restored in that case)
         */
        @Override
        public boolean simulateLatency() {
            boolean sleptFully = true;
            try {
                Thread.sleep(this.waitTime);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                sleptFully = false;
            }
            return sleptFully;
        }
    }

    /** Latency strategy that introduces no delay at all. */
    private static class NoWaitConsumerLatency implements ConsumerLatency {
        private NoWaitConsumerLatency() {
            // nothing to configure
        }

        /**
         * Returns immediately.
         *
         * @return always {@code true}
         */
        @Override
        public boolean simulateLatency() {
            return true;
        }
    }

    private static interface ConsumerLatency {
        public boolean simulateLatency();
    }

    /** Mutable counters of this consumer, exposed through {@link AgentBase.AgentState}. */
    private static class ConsumerState implements AgentBase.AgentState {
        private final AtomicInteger msgCount = new AtomicInteger(0);
        private final float rateLimit;
        private volatile long lastStatsTime;

        protected ConsumerState(float rateLimit) {
            this.rateLimit = rateLimit;
        }

        /** @return the rate limit this state was created with */
        @Override
        public float getRateLimit() {
            return rateLimit;
        }

        /** @return the most recently stored stats time */
        @Override
        public long getLastStatsTime() {
            return lastStatsTime;
        }

        protected void setLastStatsTime(long lastStatsTime) {
            this.lastStatsTime = lastStatsTime;
        }

        /** @return the current message count */
        @Override
        public int getMsgCount() {
            return msgCount.get();
        }

        protected void setMsgCount(int msgCount) {
            this.msgCount.set(msgCount);
        }

        /** @return the message count after atomically adding one */
        @Override
        public int incrementMessageCount() {
            return msgCount.incrementAndGet();
        }
    }

    private class ConsumerImpl
    extends DefaultConsumer {
        private final boolean rateLimitation;

        private ConsumerImpl(Channel channel) {
            super(channel);
            Consumer.this.state.setLastStatsTime(System.currentTimeMillis());
            Consumer.this.state.setMsgCount(0);
            this.rateLimitation = Consumer.this.state.getRateLimit() > 0.0f;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            this.handleMessage(envelope, properties, body, Consumer.this.channel);
        }

        void handleMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, Channel ch) throws IOException {
            int currentMessageCount = Consumer.this.state.incrementMessageCount();
            if (Consumer.this.msgLimit == 0 || currentMessageCount <= Consumer.this.msgLimit) {
                long messageTimestamp = (Long)Consumer.this.timestampExtractor.apply(properties, body);
                long nowTimestamp = Consumer.this.timestampProvider.getCurrentTime();
                long diff_time = Consumer.this.timestampProvider.getDifference(nowTimestamp, messageTimestamp);
                Consumer.this.stats.handleRecv(Consumer.this.id.equals(envelope.getRoutingKey()) ? diff_time : 0L);
                if (Consumer.this.consumerLatency.simulateLatency()) {
                    this.ackIfNecessary(envelope, currentMessageCount, ch);
                    this.commitTransactionIfNecessary(currentMessageCount, ch);
                    long now = System.currentTimeMillis();
                    if (this.rateLimitation) {
                        if (now - Consumer.this.state.getLastStatsTime() > 1000L) {
                            Consumer.this.state.setLastStatsTime(now);
                            Consumer.this.state.setMsgCount(0);
                        }
                        Consumer.this.delay(now, Consumer.this.state);
                    }
                }
            }
            if (Consumer.this.msgLimit != 0 && currentMessageCount >= Consumer.this.msgLimit) {
                Consumer.this.countDown();
            }
        }

        private void ackIfNecessary(Envelope envelope, int currentMessageCount, Channel ch) throws IOException {
            if (!Consumer.this.autoAck) {
                Consumer.this.dealWithWriteOperation(() -> {
                    if (Consumer.this.multiAckEvery == 0) {
                        Consumer.this.ackNackOperation.apply(ch, envelope, false);
                    } else if (currentMessageCount % Consumer.this.multiAckEvery == 0) {
                        Consumer.this.ackNackOperation.apply(ch, envelope, true);
                    }
                }, Consumer.this.recoveryProcess);
            }
        }

        private void commitTransactionIfNecessary(int currentMessageCount, Channel ch) throws IOException {
            if (Consumer.this.txSize != 0 && currentMessageCount % Consumer.this.txSize == 0) {
                Consumer.this.dealWithWriteOperation(() -> ch.txCommit(), Consumer.this.recoveryProcess);
            }
        }

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            LOGGER.debug("Consumer received shutdown signal, recovery process enabled? {}, condition to trigger connection recovery? {}", (Object)Consumer.this.recoveryProcess.isEnabled(), (Object)Consumer.this.isConnectionRecoveryTriggered(sig));
            if (!Consumer.this.recoveryProcess.isEnabled()) {
                LOGGER.debug("Counting down for consumer");
                Consumer.this.countDown();
            }
        }

        public void handleCancel(String consumerTag) throws IOException {
            System.out.printf("Consumer cancelled by broker for tag: %s", consumerTag);
            if (Consumer.this.consumerTagBranchMap.containsKey(consumerTag)) {
                String qName = (String)Consumer.this.consumerTagBranchMap.get(consumerTag);
                System.out.printf("Re-consuming. Queue: %s for Tag: %s", qName, consumerTag);
                Consumer.this.channel.basicConsume(qName, Consumer.this.autoAck, (com.rabbitmq.client.Consumer)Consumer.this.q);
            } else {
                System.out.printf("Could not find queue for consumer tag: %s", consumerTag);
            }
        }
    }
}

