diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java index 005f18025dd..82ff25c534b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java @@ -113,22 +113,40 @@ public HandleData(MessageReceiptHandle messageReceiptHandle) { public Long lock(long timeoutMs) { try { - boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); long currentTimeMs = System.currentTimeMillis(); + // Try non-blocking acquire first + if (this.semaphore.tryAcquire()) { + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } + // Check if lock is already expired before blocking + long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3; + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + synchronized (this) { + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + log.warn("HandleData lock expired, acquire lock success without blocking " + + "and reset lock time. MessageReceiptHandle={}, lockTime={}", + messageReceiptHandle, currentTimeMs); + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; + } + } + } + // Try blocking acquire with timeout + boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); + currentTimeMs = System.currentTimeMillis(); if (result) { this.lastLockTimeMs.set(currentTimeMs); return currentTimeMs; - } else { - // if the lock is expired, can be acquired again - long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3; - if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { - synchronized (this) { - if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { - log.warn("HandleData lock expired, acquire lock success and reset lock time. " + - "MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs); - this.lastLockTimeMs.set(currentTimeMs); - return currentTimeMs; - } + } + // Recheck expiration after timeout + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + synchronized (this) { + if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) { + log.warn("HandleData lock expired, acquire lock success and reset lock time. " + + "MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs); + this.lastLockTimeMs.set(currentTimeMs); + return currentTimeMs; } } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java index fdc1c5a3965..9e3edc60dfe 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java @@ -21,12 +21,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.utils.FutureUtils; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.InitConfigTest; import org.junit.Before; import org.junit.Test; @@ -312,4 +314,25 @@ public void testRemoveOne() { private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) { return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0); } -} \ No newline at end of file + + @Test + public void testAcquireExpiredLock() throws InterruptedException { + final long lockTimeoutMs = 100L; + ConfigurationManager.getProxyConfig().setLockTimeoutMsInHandleGroup(lockTimeoutMs); + + String handle = createHandle(); + receiptHandleGroup.put(msgID, createMessageReceiptHandle(handle, msgID)); + + // Simulate a slow task via a never-completing future + CompletableFuture slowFuture = new CompletableFuture<>(); + receiptHandleGroup.computeIfPresent(msgID, handle, h -> slowFuture); + // Then make the lock expired + TimeUnit.MILLISECONDS.sleep(3 * lockTimeoutMs); + + // Another caller can acquire this expired lock without blocking + long startTime = System.currentTimeMillis(); + receiptHandleGroup.remove(msgID, handle); + long elapsed = System.currentTimeMillis() - startTime; + assertTrue(elapsed < lockTimeoutMs); + } +}