package org.apache.rocketmq.client.java.impl.consumer;

import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.HeartbeatRequest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import net.javacrumbs.futureconverter.java8guava.FutureConverter;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.apache.rocketmq.shaded.com.google.common.math.IntMath;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.commons.lang3.RandomUtils;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-client-java-5.0.4.jar:org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.class */
class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SimpleConsumerImpl.class);
    private final SimpleSubscriptionSettings simpleSubscriptionSettings;
    private final String consumerGroup;
    private final Duration awaitDuration;
    private final AtomicInteger topicIndex;
    private final Map<String, FilterExpression> subscriptionExpressions;
    private final ConcurrentMap<String, SubscriptionLoadBalancer> subscriptionRouteDataCache;

    public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String str, Duration duration, Map<String, FilterExpression> map) {
        super(clientConfiguration, str, map.keySet());
        this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(this.clientId, this.endpoints, new Resource(str), clientConfiguration.getRequestTimeout(), duration, map);
        this.consumerGroup = str;
        this.awaitDuration = duration;
        this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
        this.subscriptionExpressions = map;
        this.subscriptionRouteDataCache = new ConcurrentHashMap();
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientImpl, org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService
    protected void startUp() throws Exception {
        try {
            log.info("Begin to start the rocketmq simple consumer, clientId={}", this.clientId);
            super.startUp();
            log.info("The rocketmq simple consumer starts successfully, clientId={}", this.clientId);
        } catch (Throwable th) {
            log.error("Failed to start the rocketmq simple consumer, try to shutdown it, clientId={}", this.clientId, th);
            shutDown();
            throw th;
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientImpl, org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService
    protected void shutDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq simple consumer, clientId={}", this.clientId);
        super.shutDown();
        log.info("Shutdown the rocketmq simple consumer successfully, clientId={}", this.clientId);
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public SimpleConsumer subscribe(String str, FilterExpression filterExpression) throws ClientException {
        if (!isRunning()) {
            log.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}", state(), this.clientId);
            throw new IllegalStateException("Simple consumer is not running now");
        }
        handleClientFuture(getRouteData(str));
        this.subscriptionExpressions.put(str, filterExpression);
        return this;
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public SimpleConsumer unsubscribe(String str) {
        if (isRunning()) {
            this.subscriptionExpressions.remove(str);
            return this;
        }
        log.error("Unable to remove subscription because simple consumer is not running, state={}, clientId={}", state(), this.clientId);
        throw new IllegalStateException("Simple consumer is not running now");
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public Map<String, FilterExpression> getSubscriptionExpressions() {
        return new HashMap(this.subscriptionExpressions);
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientImpl
    public HeartbeatRequest wrapHeartbeatRequest() {
        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).setClientType(ClientType.SIMPLE_CONSUMER).build();
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public List<MessageView> receive(int i, Duration duration) throws ClientException {
        return (List) handleClientFuture(receive0(i, duration));
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public CompletableFuture<List<MessageView>> receiveAsync(int i, Duration duration) {
        return FutureConverter.toCompletableFuture(receive0(i, duration));
    }

    public ListenableFuture<List<MessageView>> receive0(int i, Duration duration) {
        if (!isRunning()) {
            log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}", state(), this.clientId);
            return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
        }
        if (i <= 0) {
            return Futures.immediateFailedFuture(new IllegalArgumentException("maxMessageNum must be greater than 0"));
        }
        HashMap hashMap = new HashMap(this.subscriptionExpressions);
        ArrayList arrayList = new ArrayList(hashMap.keySet());
        if (arrayList.isEmpty()) {
            return Futures.immediateFailedFuture(new IllegalArgumentException("There is no topic to receive message"));
        }
        String str = (String) arrayList.get(IntMath.mod(this.topicIndex.getAndIncrement(), arrayList.size()));
        FilterExpression filterExpression = (FilterExpression) hashMap.get(str);
        return Futures.transformAsync(Futures.transformAsync(getSubscriptionLoadBalancer(str), subscriptionLoadBalancer -> {
            MessageQueueImpl takeMessageQueue = subscriptionLoadBalancer.takeMessageQueue();
            return receiveMessage(wrapReceiveMessageRequest(i, takeMessageQueue, filterExpression, duration), takeMessageQueue, this.awaitDuration);
        }, MoreExecutors.directExecutor()), receiveMessageResult -> {
            return Futures.immediateFuture(receiveMessageResult.getMessageViews());
        }, this.clientCallbackExecutor);
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public void ack(MessageView messageView) throws ClientException {
        handleClientFuture(ack0(messageView));
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public CompletableFuture<Void> ackAsync(MessageView messageView) {
        return FutureConverter.toCompletableFuture(ack0(messageView));
    }

    private ListenableFuture<Void> ack0(MessageView messageView) {
        if (!isRunning()) {
            log.error("Unable to ack message because simple consumer is not running, state={}, clientId={}", state(), this.clientId);
            return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
        }
        if (!(messageView instanceof MessageViewImpl)) {
            return Futures.immediateFailedFuture(new IllegalArgumentException("Failed downcasting for messageView"));
        }
        RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage = ackMessage((MessageViewImpl) messageView);
        return Futures.transformAsync(ackMessage, ackMessageResponse -> {
            StatusChecker.check(ackMessageResponse.getStatus(), ackMessage);
            return Futures.immediateVoidFuture();
        }, this.clientCallbackExecutor);
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public void changeInvisibleDuration(MessageView messageView, Duration duration) throws ClientException {
        handleClientFuture(changeInvisibleDuration0(messageView, duration));
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer
    public CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration duration) {
        return FutureConverter.toCompletableFuture(changeInvisibleDuration0(messageView, duration));
    }

    public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration duration) {
        if (!isRunning()) {
            log.error("Unable to change invisible duration because simple consumer is not running, state={}, clientId={}", state(), this.clientId);
            return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
        }
        if (!(messageView instanceof MessageViewImpl)) {
            return Futures.immediateFailedFuture(new IllegalArgumentException("Failed downcasting for messageView"));
        }
        MessageViewImpl messageViewImpl = (MessageViewImpl) messageView;
        RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> changeInvisibleDuration = changeInvisibleDuration(messageViewImpl, duration);
        return Futures.transformAsync(changeInvisibleDuration, changeInvisibleDurationResponse -> {
            messageViewImpl.setReceiptHandle(changeInvisibleDurationResponse.getReceiptHandle());
            StatusChecker.check(changeInvisibleDurationResponse.getStatus(), changeInvisibleDuration);
            return Futures.immediateVoidFuture();
        }, MoreExecutors.directExecutor());
    }

    @Override // org.apache.rocketmq.client.apis.consumer.SimpleConsumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        stopAsync().awaitTerminated();
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientImpl
    public Settings getSettings() {
        return this.simpleSubscriptionSettings;
    }

    private SubscriptionLoadBalancer updateSubscriptionLoadBalancer(String str, TopicRouteData topicRouteData) {
        SubscriptionLoadBalancer subscriptionLoadBalancer = this.subscriptionRouteDataCache.get(str);
        SubscriptionLoadBalancer subscriptionLoadBalancer2 = null == subscriptionLoadBalancer ? new SubscriptionLoadBalancer(topicRouteData) : subscriptionLoadBalancer.update(topicRouteData);
        this.subscriptionRouteDataCache.put(str, subscriptionLoadBalancer2);
        return subscriptionLoadBalancer2;
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientImpl
    public void onTopicRouteDataUpdate0(String str, TopicRouteData topicRouteData) {
        updateSubscriptionLoadBalancer(str, topicRouteData);
    }

    private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(String str) {
        SubscriptionLoadBalancer subscriptionLoadBalancer = this.subscriptionRouteDataCache.get(str);
        return null != subscriptionLoadBalancer ? Futures.immediateFuture(subscriptionLoadBalancer) : Futures.transform(getRouteData(str), topicRouteData -> {
            return updateSubscriptionLoadBalancer(str, topicRouteData);
        }, MoreExecutors.directExecutor());
    }
}
