Fix: pollable consumer doesn't ack the message correctly (#3247)

pull/3305/head
Bowen Li 2 years ago committed by GitHub
parent a2981e8d0a
commit 11909a0916
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,6 +16,8 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull; package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import java.util.Collections;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
@ -79,7 +81,7 @@ public class RocketMQAckCallback implements AcknowledgmentCallback {
switch (status) { switch (status) {
case REJECT: case REJECT:
case ACCEPT: case ACCEPT:
consumer.committed(messageQueue); consumer.commit(Collections.singleton(messageQueue), false);
break; break;
case REQUEUE: case REQUEUE:
consumer.seek(messageQueue, offset); consumer.seek(messageQueue, offset);

Loading…
Cancel
Save