package org.apache.rocketmq.client.java.impl.consumer;

import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import apache.rocketmq.v2.VerifyMessageResult;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.apache.rocketmq.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rocketmq-client-java-5.0.4.jar:org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.class */
/**
 * Push consumer implementation built on top of {@link ConsumerImpl}.
 *
 * <p>While running, the consumer periodically queries the message queue assignments of every subscribed topic
 * (see {@link #scanAssignments()}), keeps one {@link ProcessQueue} per assigned message queue in
 * {@code processQueueTable}, and hands messages to a {@link ConsumeService} which invokes the user supplied
 * {@link MessageListener} on {@code consumptionExecutor}.
 */
public class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerImpl.class);

    /** Counter of successful consumptions; read and reset to zero by {@link #doStats()}. */
    final AtomicLong consumptionOkQuantity;
    /** Counter of failed consumptions; read and reset to zero by {@link #doStats()}. */
    final AtomicLong consumptionErrorQuantity;

    private final ClientConfiguration clientConfiguration;
    private final PushSubscriptionSettings pushSubscriptionSettings;
    private final String consumerGroup;
    /**
     * Topic to filter expression. This is the very map instance handed to the constructor (it is not copied),
     * and the same instance is also passed to {@link PushSubscriptionSettings}.
     */
    private final Map<String, FilterExpression> subscriptionExpressions;
    /** Topic to the assignments most recently accepted by {@link #scanAssignments()}. */
    private final ConcurrentMap<String, Assignments> cacheAssignments;
    private final MessageListener messageListener;
    private final int maxCacheMessageCount;
    private final int maxCacheMessageSizeInBytes;
    /** Counter exposed through {@link #getReceptionTimes()}; read and reset to zero by {@link #doStats()}. */
    private final AtomicLong receptionTimes;
    /** Counter exposed through {@link #getReceivedMessagesQuantity()}; read and reset by {@link #doStats()}. */
    private final AtomicLong receivedMessagesQuantity;
    private final ThreadPoolExecutor consumptionExecutor;
    private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
    /**
     * Created in {@link #startUp()} and read from callbacks and getters afterwards; declared {@code volatile},
     * like {@code scanAssignmentsFuture}, so the assignment is visible to whichever thread reads it.
     */
    private volatile ConsumeService consumeService;
    private volatile ScheduledFuture<?> scanAssignmentsFuture;

    /**
     * Creates a push consumer. Nothing is started here; see {@link #startUp()}.
     *
     * @param clientConfiguration client configuration; its request timeout is used for the RPCs issued here.
     * @param str                 consumer group name.
     * @param map                 topic to filter expression; retained by reference, not copied.
     * @param messageListener     listener handed to the consume service.
     * @param i                   maximum number of cached messages, shared by all process queues.
     * @param i2                  maximum size in bytes of cached messages, shared by all process queues.
     * @param i3                  number of threads (core and maximum) of the consumption executor.
     */
    public PushConsumerImpl(ClientConfiguration clientConfiguration, String str, Map<String, FilterExpression> map, MessageListener messageListener, int i, int i2, int i3) {
        super(clientConfiguration, str, map.keySet());
        this.clientConfiguration = clientConfiguration;
        this.pushSubscriptionSettings = new PushSubscriptionSettings(this.clientId, this.endpoints, new Resource(str), clientConfiguration.getRequestTimeout(), map);
        this.consumerGroup = str;
        this.subscriptionExpressions = map;
        this.cacheAssignments = new ConcurrentHashMap<>();
        this.messageListener = messageListener;
        this.maxCacheMessageCount = i;
        this.maxCacheMessageSizeInBytes = i2;
        this.receptionTimes = new AtomicLong(0L);
        this.receivedMessagesQuantity = new AtomicLong(0L);
        this.consumptionOkQuantity = new AtomicLong(0L);
        this.consumptionErrorQuantity = new AtomicLong(0L);
        this.processQueueTable = new ConcurrentHashMap<>();
        // Fixed-size pool (core == max) backed by an unbounded queue.
        this.consumptionExecutor = new ThreadPoolExecutor(i3, i3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("MessageConsumption", getClientId().getIndex()));
    }

    /**
     * Starts the consumer: registers the process queue gauge observer, runs the parent start-up, creates the
     * consume service and schedules {@link #scanAssignments()} (initial delay 1 second, then 5 seconds after
     * each run completes).
     *
     * <p>If any step fails, {@link #shutDown()} is invoked to release what was already started and the original
     * failure is rethrown. A failure of that clean-up is attached to the original one as a suppressed exception
     * so that it cannot hide the root cause.
     *
     * @throws Exception the failure raised while starting up.
     */
    @Override
    protected void startUp() throws Exception {
        try {
            log.info("Begin to start the rocketmq push consumer, clientId={}", this.clientId);
            this.clientMeterManager.setGaugeObserver(new ProcessQueueGaugeObserver(this.processQueueTable, this.clientId, this.consumerGroup));
            super.startUp();
            final ScheduledExecutorService scheduler = getClientManager().getScheduler();
            this.consumeService = createConsumeService();
            this.scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
                try {
                    scanAssignments();
                } catch (Throwable t) {
                    // An exception escaping a scheduled task cancels its subsequent executions, so stop it here.
                    log.error("Exception raised while scanning the load assignments, clientId={}", this.clientId, t);
                }
            }, 1L, 5L, TimeUnit.SECONDS);
            log.info("The rocketmq push consumer starts successfully, clientId={}", this.clientId);
        } catch (Throwable startUpFailure) {
            log.error("Exception raised while starting the rocketmq push consumer, clientId={}", this.clientId, startUpFailure);
            try {
                shutDown();
            } catch (Throwable shutDownFailure) {
                if (shutDownFailure instanceof InterruptedException) {
                    // Keep the interruption observable for the caller although the exception itself is suppressed.
                    Thread.currentThread().interrupt();
                }
                // Self-suppression is rejected by Throwable#addSuppressed, hence the identity check.
                if (shutDownFailure != startUpFailure) {
                    startUpFailure.addSuppressed(shutDownFailure);
                }
            }
            throw startUpFailure;
        }
    }

    /**
     * Stops the consumer: cancels the periodic assignment scan (without interrupting a scan in progress), runs
     * the parent shutdown, then shuts the consumption executor down and waits for its termination.
     *
     * @throws InterruptedException if the parent shutdown or the wait for the executor is interrupted.
     */
    @Override
    protected void shutDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq push consumer, clientId={}", this.clientId);
        // Copy the volatile field so that the null check and the call operate on the same reference.
        final ScheduledFuture<?> scanFuture = this.scanAssignmentsFuture;
        if (null != scanFuture) {
            scanFuture.cancel(false);
        }
        super.shutDown();
        this.consumptionExecutor.shutdown();
        ExecutorServices.awaitTerminated(this.consumptionExecutor);
        log.info("Shutdown the rocketmq push consumer successfully, clientId={}", this.clientId);
    }

    /**
     * Creates the consume service matching the subscription settings: a {@link FifoConsumeService} when the
     * settings report FIFO, a {@link StandardConsumeService} otherwise.
     */
    private ConsumeService createConsumeService() {
        final ScheduledExecutorService scheduler = getClientManager().getScheduler();
        if (this.pushSubscriptionSettings.isFifo()) {
            log.info("Create FIFO consume service, consumerGroup={}, clientId={}", this.consumerGroup, this.clientId);
            return new FifoConsumeService(this.clientId, this.messageListener, this.consumptionExecutor, this, scheduler);
        }
        log.info("Create standard consume service, consumerGroup={}, clientId={}", this.consumerGroup, this.clientId);
        return new StandardConsumeService(this.clientId, this.messageListener, this.consumptionExecutor, this, scheduler);
    }

    /**
     * @return the consumer group name given at construction.
     */
    @Override
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * @return the subscription settings of this consumer.
     */
    public PushSubscriptionSettings getPushConsumerSettings() {
        return this.pushSubscriptionSettings;
    }

    /**
     * @return a copy of the current subscriptions; changes to the returned map do not affect this consumer.
     */
    @Override
    public Map<String, FilterExpression> getSubscriptionExpressions() {
        return new HashMap<>(this.subscriptionExpressions);
    }

    /**
     * Adds, or replaces, the subscription of a topic.
     *
     * <p>The route data of the topic is requested first and the subscription is recorded only once
     * {@code handleClientFuture} has returned normally.
     *
     * @param str              topic to subscribe to.
     * @param filterExpression filter expression applied to the topic.
     * @return this consumer.
     * @throws ClientException       if the route data of the topic could not be obtained.
     * @throws IllegalStateException if the consumer is not running.
     */
    @Override
    public PushConsumer subscribe(String str, FilterExpression filterExpression) throws ClientException {
        if (!isRunning()) {
            log.error("Unable to add subscription because push consumer is not running, state={}, clientId={}", state(), this.clientId);
            throw new IllegalStateException("Push consumer is not running now");
        }
        handleClientFuture(getRouteData(str));
        this.subscriptionExpressions.put(str, filterExpression);
        return this;
    }

    /**
     * Removes the subscription of a topic. Only the subscription entry is removed here; cached assignments and
     * existing process queues are not touched by this method.
     *
     * @param str topic to unsubscribe from.
     * @return this consumer.
     * @throws IllegalStateException if the consumer is not running.
     */
    @Override
    public PushConsumer unsubscribe(String str) {
        if (!isRunning()) {
            log.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}", state(), this.clientId);
            throw new IllegalStateException("Push consumer is not running now");
        }
        this.subscriptionExpressions.remove(str);
        return this;
    }

    /**
     * Resolves the endpoints to send the assignment query for the given topic to, based on its route data.
     */
    private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String str) {
        return Futures.transformAsync(getRouteData(str),
            topicRouteData -> Futures.immediateFuture(topicRouteData.pickEndpointsToQueryAssignments()),
            MoreExecutors.directExecutor());
    }

    /**
     * Builds the assignment query for the given topic on behalf of this consumer's group and endpoints.
     */
    private QueryAssignmentRequest wrapQueryAssignmentRequest(String str) {
        final apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(str).build();
        return QueryAssignmentRequest.newBuilder()
            .setTopic(topicResource)
            .setEndpoints(this.endpoints.toProtobuf())
            .setGroup(getProtobufGroup())
            .build();
    }

    /**
     * Queries the message queue assignments of a topic.
     *
     * @param str topic to query.
     * @return future of the assignments; it fails if the endpoints cannot be resolved, the RPC fails, or
     *         {@link StatusChecker#check} rejects the response status.
     */
    ListenableFuture<Assignments> queryAssignment(String str) {
        return Futures.transformAsync(pickEndpointsToQueryAssignments(str), targetEndpoints -> {
            final RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> rpcFuture =
                getClientManager().queryAssignment(targetEndpoints, wrapQueryAssignmentRequest(str), this.clientConfiguration.getRequestTimeout());
            return Futures.transformAsync(rpcFuture, response -> {
                StatusChecker.check(response.getStatus(), rpcFuture);
                final List<Assignment> assignmentList = response.getAssignmentsList().stream()
                    .map(assignment -> new Assignment(new MessageQueueImpl(assignment.getMessageQueue())))
                    .collect(Collectors.toList());
                return Futures.immediateFuture(new Assignments(assignmentList));
            }, MoreExecutors.directExecutor());
        }, MoreExecutors.directExecutor());
    }

    /**
     * Removes the process queue of the given message queue from the table and, if there was one, drops it.
     *
     * @param messageQueueImpl message queue whose process queue should be dropped.
     */
    void dropProcessQueue(MessageQueueImpl messageQueueImpl) {
        final ProcessQueue removed = this.processQueueTable.remove(messageQueueImpl);
        if (null != removed) {
            removed.drop();
        }
    }

    /**
     * Registers a new process queue for the given message queue unless one is registered already.
     *
     * @param messageQueueImpl message queue to create the process queue for.
     * @param filterExpression filter expression handed to the process queue.
     * @return the newly registered process queue, or empty if the message queue already had one.
     */
    protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl messageQueueImpl, FilterExpression filterExpression) {
        final ProcessQueue created = new ProcessQueueImpl(this, messageQueueImpl, filterExpression);
        final ProcessQueue existing = this.processQueueTable.putIfAbsent(messageQueueImpl, created);
        if (null != existing) {
            return Optional.empty();
        }
        return Optional.of(created);
    }

    /**
     * @return heartbeat request carrying this consumer's group and the push consumer client type.
     */
    @Override
    public HeartbeatRequest wrapHeartbeatRequest() {
        return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).setClientType(ClientType.PUSH_CONSUMER).build();
    }

    /**
     * Aligns the process queues of one topic with the given assignments.
     *
     * <p>Process queues of the topic which are no longer assigned, or which report themselves expired, are
     * dropped. Afterwards a process queue is created, and its first fetch triggered, for every assigned message
     * queue that has no live process queue.
     *
     * @param str              topic being synchronized; process queues of other topics are left alone.
     * @param assignments      latest assignments of the topic.
     * @param filterExpression filter expression for newly created process queues.
     */
    @VisibleForTesting
    void syncProcessQueue(String str, Assignments assignments, FilterExpression filterExpression) {
        final HashSet<MessageQueueImpl> latestQueues = new HashSet<>();
        for (Assignment assignment : assignments.getAssignmentList()) {
            latestQueues.add(assignment.getMessageQueue());
        }

        final HashSet<MessageQueueImpl> activeQueues = new HashSet<>();
        for (Map.Entry<MessageQueueImpl, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            final MessageQueueImpl mq = entry.getKey();
            final ProcessQueue pq = entry.getValue();
            if (!str.equals(mq.getTopic())) {
                continue;
            }
            if (!latestQueues.contains(mq)) {
                log.info("Drop message queue according to the latest assignmentList, mq={}, clientId={}", mq, this.clientId);
                dropProcessQueue(mq);
            } else if (pq.expired()) {
                log.warn("Drop message queue because it is expired, mq={}, clientId={}", mq, this.clientId);
                dropProcessQueue(mq);
            } else {
                activeQueues.add(mq);
            }
        }

        for (MessageQueueImpl mq : latestQueues) {
            if (activeQueues.contains(mq)) {
                continue;
            }
            // Empty when the table already holds a process queue for this message queue.
            createProcessQueue(mq, filterExpression).ifPresent(processQueue -> {
                log.info("Start to fetch message from remote, mq={}, clientId={}", mq, this.clientId);
                processQueue.fetchMessageImmediately();
            });
        }
    }

    /**
     * Queries the assignments of every subscribed topic and synchronizes the process queues with each result.
     *
     * <p>For each topic: an empty result is ignored when nothing non-empty is cached for the topic; otherwise
     * the process queues are synchronized and, if the result differs from the cached assignments, the cache is
     * updated. Failures are logged and never propagated to the caller.
     */
    @VisibleForTesting
    void scanAssignments() {
        try {
            log.debug("Start to scan assignments periodically, clientId={}", this.clientId);
            for (Map.Entry<String, FilterExpression> entry : this.subscriptionExpressions.entrySet()) {
                final String topic = entry.getKey();
                final FilterExpression filterExpression = entry.getValue();
                // Snapshot taken before the query is issued; the callback compares against this value.
                final Assignments existing = this.cacheAssignments.get(topic);
                Futures.addCallback(queryAssignment(topic), new FutureCallback<Assignments>() {
                    @Override
                    public void onSuccess(Assignments latest) {
                        if (latest.getAssignmentList().isEmpty()) {
                            if (null == existing || existing.getAssignmentList().isEmpty()) {
                                log.info("Acquired empty assignments from remote, would scan later, topic={}, clientId={}", topic, PushConsumerImpl.this.clientId);
                                return;
                            }
                            log.info("Attention!!! acquired empty assignments from remote, but existed assignments is not empty, topic={}, clientId={}", topic, PushConsumerImpl.this.clientId);
                        }
                        final boolean unchanged = latest.equals(existing);
                        if (unchanged) {
                            log.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic, existing, PushConsumerImpl.this.clientId);
                        } else {
                            log.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existing, latest, PushConsumerImpl.this.clientId);
                        }
                        // Synchronize even when nothing changed, so expired process queues get replaced.
                        syncProcessQueue(topic, latest, filterExpression);
                        if (!unchanged) {
                            PushConsumerImpl.this.cacheAssignments.put(topic, latest);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        log.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic, PushConsumerImpl.this.clientId, t);
                    }
                }, MoreExecutors.directExecutor());
            }
        } catch (Throwable t) {
            log.error("Exception raised while scanning the assignments for all topics, clientId={}", this.clientId, t);
        }
    }

    /**
     * @return the subscription settings of this consumer.
     */
    @Override
    public Settings getSettings() {
        return this.pushSubscriptionSettings;
    }

    /**
     * Stops the consumer and blocks until it has terminated.
     */
    @Override
    public void close() {
        stopAsync().awaitTerminated();
    }

    /**
     * @return the number of process queues currently registered.
     */
    int getQueueSize() {
        return this.processQueueTable.size();
    }

    /**
     * Computes the per-queue share of the cached message size limit.
     *
     * <p>Decompiler note: the access modifier of this method was widened from package-private.
     *
     * @return 0 when there is no process queue, otherwise the limit divided by the number of process queues,
     *         but at least 1.
     */
    public int cacheMessageBytesThresholdPerQueue() {
        final int queueSize = getQueueSize();
        if (queueSize <= 0) {
            return 0;
        }
        return Math.max(1, this.maxCacheMessageSizeInBytes / queueSize);
    }

    /**
     * Computes the per-queue share of the cached message count limit.
     *
     * <p>Decompiler note: the access modifier of this method was widened from package-private.
     *
     * @return 0 when there is no process queue, otherwise the limit divided by the number of process queues,
     *         but at least 1.
     */
    public int cacheMessageCountThresholdPerQueue() {
        final int queueSize = getQueueSize();
        if (queueSize <= 0) {
            return 0;
        }
        return Math.max(1, this.maxCacheMessageCount / queueSize);
    }

    /**
     * @return the live reception counter (not a copy).
     */
    public AtomicLong getReceptionTimes() {
        return this.receptionTimes;
    }

    /**
     * @return the live received-messages counter (not a copy).
     */
    public AtomicLong getReceivedMessagesQuantity() {
        return this.receivedMessagesQuantity;
    }

    /**
     * @return the consume service, or {@code null} if {@link #startUp()} has not created it yet.
     */
    public ConsumeService getConsumeService() {
        return this.consumeService;
    }

    /**
     * Handles a message verification command: consumes the carried message through the consume service and
     * reports the outcome back over telemetry, {@code OK} for {@link ConsumeResult#SUCCESS} and
     * {@code FAILED_TO_CONSUME_MESSAGE} otherwise. Nothing is reported back if the consume future itself fails;
     * that case is only logged.
     *
     * @param endpoints            endpoints the result is sent to.
     * @param verifyMessageCommand command carrying the message to verify and the nonce to echo back.
     */
    @Override
    public void onVerifyMessageCommand(final Endpoints endpoints, VerifyMessageCommand verifyMessageCommand) {
        final String nonce = verifyMessageCommand.getNonce();
        final MessageViewImpl messageView = MessageViewImpl.fromProtobuf(verifyMessageCommand.getMessage());
        final MessageId messageId = messageView.getMessageId();
        Futures.addCallback(this.consumeService.consume(messageView), new FutureCallback<ConsumeResult>() {
            @Override
            public void onSuccess(ConsumeResult consumeResult) {
                final Code code = ConsumeResult.SUCCESS.equals(consumeResult) ? Code.OK : Code.FAILED_TO_CONSUME_MESSAGE;
                final TelemetryCommand command = TelemetryCommand.newBuilder()
                    .setVerifyMessageResult(VerifyMessageResult.newBuilder().setNonce(nonce).build())
                    .setStatus(Status.newBuilder().setCode(code).build())
                    .build();
                try {
                    telemetry(endpoints, command);
                } catch (Throwable t) {
                    log.error("Failed to send message verification result command, endpoints={}, command={}, messageId={}, clientId={}", endpoints, command, messageId, PushConsumerImpl.this.clientId, t);
                }
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, clientId={}", endpoints, messageId, PushConsumerImpl.this.clientId, t);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Builds the dead letter queue forwarding request for a message, using this consumer's group and the
     * maximum attempts of its retry policy.
     */
    private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(MessageViewImpl messageViewImpl) {
        final apache.rocketmq.v2.Resource topicResource =
            apache.rocketmq.v2.Resource.newBuilder().setName(messageViewImpl.getTopic()).build();
        return ForwardMessageToDeadLetterQueueRequest.newBuilder()
            .setGroup(getProtobufGroup())
            .setTopic(topicResource)
            .setReceiptHandle(messageViewImpl.getReceiptHandle())
            .setMessageId(messageViewImpl.getMessageId().toString())
            .setDeliveryAttempt(messageViewImpl.getDeliveryAttempt())
            .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts())
            .build();
    }

    /**
     * Forwards a message to the dead letter queue, running the message interceptors before the request and
     * after its completion. The "after" hook reports {@code OK} only for a response whose status code is
     * {@code OK}; any other response, or a failed RPC, is reported as {@code ERROR}.
     *
     * @param messageViewImpl message to forward; the request is sent to the endpoints of this message.
     * @return the future of the forwarding RPC.
     */
    public RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(MessageViewImpl messageViewImpl) {
        final List<GeneralMessage> generalMessages = Collections.singletonList(new GeneralMessageImpl(messageViewImpl));
        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
        doBefore(context, generalMessages);
        final RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse> rpcFuture =
            getClientManager().forwardMessageToDeadLetterQueue(messageViewImpl.getEndpoints(), wrapForwardMessageToDeadLetterQueueRequest(messageViewImpl), this.clientConfiguration.getRequestTimeout());
        Futures.addCallback(rpcFuture, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
            @Override
            public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
                final MessageHookPointsStatus hookStatus = Code.OK.equals(response.getStatus().getCode())
                    ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                doAfter(new MessageInterceptorContextImpl(context, hookStatus), generalMessages);
            }

            @Override
            public void onFailure(Throwable t) {
                doAfter(new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR), generalMessages);
            }
        }, MoreExecutors.directExecutor());
        return rpcFuture;
    }

    /**
     * Logs the reception and consumption counters, resetting each of them to zero, then lets every process
     * queue report its own statistics.
     */
    @Override
    public void doStats() {
        log.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, consumptionOkQuantity={}, consumptionErrorQuantity={}",
            this.clientId, this.consumerGroup, this.receptionTimes.getAndSet(0L), this.receivedMessagesQuantity.getAndSet(0L),
            this.consumptionOkQuantity.getAndSet(0L), this.consumptionErrorQuantity.getAndSet(0L));
        this.processQueueTable.values().forEach(ProcessQueue::doStats);
    }

    /**
     * @return the retry policy held by the subscription settings.
     */
    public RetryPolicy getRetryPolicy() {
        return this.pushSubscriptionSettings.getRetryPolicy();
    }

    /**
     * @return the executor on which messages are consumed.
     */
    public ThreadPoolExecutor getConsumptionExecutor() {
        return this.consumptionExecutor;
    }
}
