From 11909a09167d14d5c6176f09e13f33d7b8a29e25 Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Tue, 4 Apr 2023 13:52:13 +0800 Subject: [PATCH] Fix: pollable consumer doesn't ack the message correctly (#3247) --- .../integration/inbound/pull/RocketMQAckCallback.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java index 234b68237..cf58dc6b5 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java @@ -16,6 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull; +import java.util.Collections; + import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; @@ -79,7 +81,7 @@ public class RocketMQAckCallback implements AcknowledgmentCallback { switch (status) { case REJECT: case ACCEPT: - consumer.committed(messageQueue); + consumer.commit(Collections.singleton(messageQueue), false); break; case REQUEUE: consumer.seek(messageQueue, offset);