Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,40 @@ public HandleData(MessageReceiptHandle messageReceiptHandle) {

public Long lock(long timeoutMs) {
try {
boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
long currentTimeMs = System.currentTimeMillis();
// Try non-blocking acquire first
if (this.semaphore.tryAcquire()) {
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
}
// Check if lock is already expired before blocking
long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3;
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
synchronized (this) {
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
log.warn("HandleData lock expired, acquire lock success without blocking " +
"and reset lock time. MessageReceiptHandle={}, lockTime={}",
messageReceiptHandle, currentTimeMs);
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
}
}
}
// Try blocking acquire with timeout
boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
currentTimeMs = System.currentTimeMillis();
if (result) {
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
} else {
// if the lock is expired, can be acquired again
long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3;
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
synchronized (this) {
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
log.warn("HandleData lock expired, acquire lock success and reset lock time. " +
"MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs);
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
}
}
// Recheck expiration after timeout
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
synchronized (this) {
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
log.warn("HandleData lock expired, acquire lock success and reset lock time. "
+ "MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs);
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -312,4 +314,25 @@ public void testRemoveOne() {
private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) {
return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
}
}

@Test
public void testAcquireExpiredLock() throws InterruptedException {
final long lockTimeoutMs = 100L;
ConfigurationManager.getProxyConfig().setLockTimeoutMsInHandleGroup(lockTimeoutMs);

String handle = createHandle();
receiptHandleGroup.put(msgID, createMessageReceiptHandle(handle, msgID));

// Simulate a slow task via a never-completing future
CompletableFuture<MessageReceiptHandle> slowFuture = new CompletableFuture<>();
receiptHandleGroup.computeIfPresent(msgID, handle, h -> slowFuture);
// Then make the lock expired
TimeUnit.MILLISECONDS.sleep(3 * lockTimeoutMs);

// Another caller can acquire this expired lock without blocking
long startTime = System.currentTimeMillis();
receiptHandleGroup.remove(msgID, handle);
long elapsed = System.currentTimeMillis() - startTime;
assertTrue(elapsed < lockTimeoutMs);
}
}
Loading