package com.rabbitmq.client.test.functional;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.test.BrokerTestCase;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/* loaded from: classes.dex */
public abstract class RequeueOnClose extends BrokerTestCase {
    private static final int MESSAGES_TO_CONSUME = 20;
    private static final int MESSAGE_COUNT = 2000;
    private static final String Q = "RequeueOnClose";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public class PartialConsumer extends DefaultConsumer {
        private volatile boolean acknowledge;
        private final boolean cancelBeforeFinish;
        private Channel channel;
        private volatile int count;
        private CountDownLatch latch;

        public PartialConsumer(Channel channel, int i, boolean z, CountDownLatch countDownLatch, boolean z2) {
            super(channel);
            this.count = i;
            this.channel = channel;
            this.latch = countDownLatch;
            this.acknowledge = z;
            this.cancelBeforeFinish = z2;
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            if (this.acknowledge) {
                this.channel.basicAck(envelope.getDeliveryTag(), false);
            }
            int i = this.count - 1;
            this.count = i;
            if (i == 0) {
                if (this.cancelBeforeFinish) {
                    this.channel.basicCancel(getConsumerTag());
                }
                this.acknowledge = false;
                this.latch.countDown();
            }
        }
    }

    private GetResponse getMessage() throws IOException {
        return this.channel.basicGet(Q, false);
    }

    private void injectMessage() throws IOException {
        this.channel.queueDeclare(Q, false, false, false, null);
        this.channel.queueDelete(Q);
        this.channel.queueDeclare(Q, false, false, false, null);
        this.channel.basicPublish("", Q, null, "RequeueOnClose message".getBytes());
    }

    private void publishAndGet(int i, boolean z) throws IOException, InterruptedException {
        openConnection();
        for (int i2 = 0; i2 < i; i2++) {
            open();
            injectMessage();
            GetResponse message = getMessage();
            if (z) {
                this.channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
            close();
            open();
            if (z) {
                assertNull("Expected missing second basicGet (repeat=" + i2 + ")", getMessage());
            } else {
                assertNotNull("Expected present second basicGet (repeat=" + i2 + ")", getMessage());
            }
            close();
        }
        closeConnection();
    }

    private void publishLotsAndConsumeSome(boolean z, boolean z2) throws IOException, InterruptedException, ShutdownSignalException {
        openConnection();
        open();
        this.channel.queueDeclare(Q, false, false, false, null);
        this.channel.queueDelete(Q);
        this.channel.queueDeclare(Q, false, false, false, null);
        for (int i = 0; i < 2000; i++) {
            this.channel.basicPublish("", Q, null, "in flight message".getBytes());
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.channel.basicConsume(Q, new PartialConsumer(this.channel, 20, z, countDownLatch, z2));
        countDownLatch.await();
        close();
        open();
        int i2 = z ? 1980 : 2000;
        for (int i3 = 0; i3 < i2; i3++) {
            assertNotNull("only got " + i3 + " out of " + i2 + " messages", this.channel.basicGet(Q, true));
        }
        int i4 = 0;
        while (this.channel.basicGet(Q, true) != null) {
            i4++;
        }
        assertTrue("got " + i4 + " more messages than " + i2 + " expected", i4 == 0);
        this.channel.queueDelete(Q);
        close();
        closeConnection();
    }

    private void publishLotsAndGet() throws IOException, InterruptedException, ShutdownSignalException {
        openConnection();
        open();
        this.channel.queueDeclare(Q, false, false, false, null);
        this.channel.queueDelete(Q);
        this.channel.queueDeclare(Q, false, false, false, null);
        for (int i = 0; i < 2000; i++) {
            this.channel.basicPublish("", Q, null, "in flight message".getBytes());
        }
        QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
        this.channel.basicConsume(Q, queueingConsumer);
        queueingConsumer.nextDelivery();
        close();
        open();
        for (int i2 = 0; i2 < 2000; i2++) {
            assertNotNull("only got " + i2 + " out of 2000 messages", this.channel.basicGet(Q, true));
        }
        assertNull("got more messages than 2000 expected", this.channel.basicGet(Q, true));
        this.channel.queueDelete(Q);
        close();
        closeConnection();
    }

    protected abstract void close() throws IOException;

    protected abstract void open() throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.client.test.BrokerTestCase
    public void setUp() throws IOException {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.client.test.BrokerTestCase
    public void tearDown() throws IOException {
    }

    public void testNormal() throws Exception {
        publishAndGet(3, true);
    }

    public void testRequeueInFlight() throws Exception {
        for (int i = 0; i < 5; i++) {
            publishLotsAndGet();
        }
    }

    public void testRequeueInFlightConsumerAck() throws Exception {
        for (int i = 0; i < 5; i++) {
            publishLotsAndConsumeSome(true, true);
        }
    }

    public void testRequeueInFlightConsumerAckNoCancel() throws Exception {
        for (int i = 0; i < 5; i++) {
            publishLotsAndConsumeSome(true, false);
        }
    }

    public void testRequeueInFlightConsumerNoAck() throws Exception {
        for (int i = 0; i < 5; i++) {
            publishLotsAndConsumeSome(false, true);
        }
    }

    public void testRequeueInFlightConsumerNoAckNoCancel() throws Exception {
        for (int i = 0; i < 5; i++) {
            publishLotsAndConsumeSome(false, false);
        }
    }

    public void testRequeueing() throws Exception {
        publishAndGet(3, false);
    }

    public void testRequeueingConsumer() throws Exception {
        openConnection();
        open();
        injectMessage();
        QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
        this.channel.basicConsume(Q, queueingConsumer);
        queueingConsumer.nextDelivery();
        close();
        open();
        assertNotNull(getMessage());
        close();
        closeConnection();
    }
}
