Building Distributed Concurrency Control in Spring
In this article, we’ll build on the new concurrency-throttling annotation introduced in Spring’s latest release, extending the implementation so it’s ready for distributed applications.
The full code of this implementation is available here.

1. Introduction: Spring 7.0’s New Resilience Features
Spring 7.0 introduced new resilience features for common tasks. One of these is the new @ConcurrencyLimit annotation, which simplifies concurrency throttling by restricting the number of concurrent accesses to a specific resource:
@ConcurrencyLimit(10)
public String sendMessage(String message) {
    return this.openApiClient.chat(message);
}
In a vanilla Java implementation, the equivalent idea would be using a Semaphore to control access:
private final Semaphore semaphore = new Semaphore(10);

public String sendMessage(String message) throws InterruptedException {
    semaphore.acquire();
    try {
        return this.openApiClient.chat(message);
    } finally {
        // always return the permit, even if the call fails
        semaphore.release();
    }
}
2. The Problem: Pod-Scoped Concurrency Limits
When scaling horizontally by adding pods, this limit no longer holds: with N pods, up to N x 10 requests can reach the downstream service concurrently. This can overload the downstream system, increase costs, and risk cascading failures if not managed at the system level.

Concurrency Limit on Multiple Instances
3. The Solution: Distributed Concurrency Control
Several architectural solutions are possible to solve this problem. In this case, we’ll build on Spring’s @ConcurrencyLimit, using a distributed semaphore that stores its internal state in Redis.

Distributed Concurrency Limit
We’ll create a new annotation, @DistributedConcurrencyLimit, which enforces a maximum number of accesses to the external resource across multiple pods or even different applications, without requiring an additional load balancer or proxy layer in between.
@DistributedConcurrencyLimit(value = 10, identifier = "open-api-chat", timeout = 4000)
public String sendMessage(String message) {
    return this.openApiClient.chat(message);
}
4. Implementation Deep Dive
Dependencies & Why Redisson
We use Redisson because it provides ready-made distributed data structures, including RSemaphore, which handles the complexity of coordinating permits across multiple Redis clients.
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-aop'
    implementation 'org.redisson:redisson-spring-boot-starter:3.51.0'
}
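Before wiring anything into Spring, it helps to see RSemaphore on its own. Here’s a minimal sketch of acquiring and releasing a distributed permit directly through the Redisson client; the connection setup and semaphore name are illustrative assumptions, not part of the article’s code:
import java.time.Duration;

import org.redisson.Redisson;
import org.redisson.api.RSemaphore;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RSemaphoreSketch {

    public static void main(String[] args) throws InterruptedException {
        // Assumes a Redis instance running on localhost:6379
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient redisson = Redisson.create(config);

        RSemaphore semaphore = redisson.getSemaphore("open-api-chat");
        // Sets the permit count only if the semaphore doesn't exist in Redis yet
        semaphore.trySetPermits(10);

        if (semaphore.tryAcquire(Duration.ofSeconds(4))) {
            try {
                // call the protected downstream resource here
            } finally {
                semaphore.release();
            }
        }
        redisson.shutdown();
    }
}
The aspect we build below automates exactly this acquire/try/finally/release pattern.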
The Annotation
Our @DistributedConcurrencyLimit annotation defines three key parameters:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedConcurrencyLimit {
    int value();                  // Max concurrent executions
    String identifier();          // Unique semaphore key
    long timeout() default 5000;  // Wait timeout in ms
}
Configuration
Enable the feature and configure the Redis connection:
distributed-concurrency:
  enabled: true

spring:
  data:
    redis:
      host: localhost
      port: 6379
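With redisson-spring-boot-starter on the classpath, a RedissonClient bean is auto-configured from these spring.data.redis properties, so no extra wiring is needed. If we preferred to configure the client explicitly, a minimal sketch could look like this (the class and the property wiring here are our assumptions):
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "distributed-concurrency.enabled")
public class RedissonConfig {

    // Only needed if we bypass the starter's auto-configuration
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient(
        @Value("${spring.data.redis.host}") String host,
        @Value("${spring.data.redis.port}") int port) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        return Redisson.create(config);
    }
}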
The AOP Interceptor
The core logic uses Spring AOP to intercept annotated methods:
@Component
@Aspect
@ConditionalOnProperty(name = "distributed-concurrency.enabled")
public class DistributedConcurrencyLimitInterceptor {

    private static final Logger log = LoggerFactory.getLogger(DistributedConcurrencyLimitInterceptor.class);

    private final RedissonClient redisson;
    private final Map<String, RSemaphore> semaphores = new ConcurrentHashMap<>();

    public DistributedConcurrencyLimitInterceptor(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Around("@annotation(concurrencyLimit)")
    public Object intercept(ProceedingJoinPoint joinPoint,
      DistributedConcurrencyLimit concurrencyLimit) throws Throwable {
        String identifier = concurrencyLimit.identifier();
        RSemaphore semaphore = getOrCreateSemaphore(identifier, concurrencyLimit.value());

        boolean acquired = semaphore.tryAcquire(Duration.ofMillis(concurrencyLimit.timeout()));
        if (!acquired) {
            throw new ConcurrencyLimitExceededException(/*...*/);
        }
        try {
            return joinPoint.proceed();
        } finally {
            semaphore.release();
        }
    }

    private RSemaphore getOrCreateSemaphore(String identifier, int maxPermits) {
        return semaphores.computeIfAbsent(identifier, key -> {
            RSemaphore semaphore = redisson.getSemaphore(key);
            boolean wasSet = semaphore.trySetPermits(maxPermits);
            if (wasSet) {
                log.info("Initialized distributed semaphore '{}' with {} permits", key, maxPermits);
            } else {
                log.debug("Semaphore '{}' was already initialized by another instance", key);
            }
            return semaphore;
        });
    }
}
The getOrCreateSemaphore method implements a two-level caching strategy:
- Local cache: uses ConcurrentHashMap.computeIfAbsent() to cache semaphore instances per JVM
- Distributed initialization: uses trySetPermits(), which atomically sets permits only if the semaphore doesn’t exist in Redis yet
This ensures thread-safety within each pod and safe initialization across multiple pods without race conditions.
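One class the interceptor references but we haven’t shown is ConcurrencyLimitExceededException. A minimal sketch could look like the following; the constructor arguments are an assumption on our part, since the snippet above elides them:
// Minimal sketch: an unchecked exception signaling that no permit was
// acquired in time. The fields shown here are assumptions, not the article's code.
public class ConcurrencyLimitExceededException extends RuntimeException {

    public ConcurrencyLimitExceededException(String identifier, long timeoutMillis) {
        super("Could not acquire a permit for '" + identifier + "' within " + timeoutMillis + " ms");
    }
}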
5. Testing Distributed Concurrency Control
Testing distributed behavior requires simulating multiple application instances. We use Testcontainers to spin up a real Redis instance and create separate Spring contexts to mimic different pods.
For a deeper dive into Testcontainers with Spring Boot, check out this article.
Base Test Setup
@SpringBootTest(classes = DistributedConcurrencyApp.class)
@Testcontainers
public abstract class AbstractRedisIntegrationTest {

    @Container
    static GenericContainer<?> redis = new GenericContainer<>(DockerImageName.parse("redis:7.2-alpine"))
      .withExposedPorts(6379)
      .withReuse(true);

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.redis.host", redis::getHost);
        registry.add("spring.data.redis.port", () -> redis.getMappedPort(6379));
        registry.add("distributed-concurrency.enabled", () -> "true");
    }
}
Testing Multiple Pods
The key insight is creating separate Spring application contexts to simulate different JVM instances:
@Test
void shouldLimitConcurrencyAcrossMultiplePods() {
    final int numberOfPods = 3;
    final int requestsPerPod = 2;

    // Create separate Spring contexts to simulate different pods
    List<ConfigurableApplicationContext> podContexts = new ArrayList<>();
    List<ResourceService> podServices = new ArrayList<>();
    for (int i = 0; i < numberOfPods; i++) {
        AnnotationConfigApplicationContext podContext = new AnnotationConfigApplicationContext();
        podContext.register(DistributedConcurrencyApp.class);
        podContext.refresh();
        podServices.add(podContext.getBean(ResourceService.class));
        podContexts.add(podContext);
    }

    try {
        // Execute concurrent requests from all "pods"
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        AtomicInteger successfulExecutions = new AtomicInteger(0);
        AtomicInteger failedExecutions = new AtomicInteger(0);
        for (int podIndex = 0; podIndex < numberOfPods; podIndex++) {
            final ResourceService podService = podServices.get(podIndex);
            for (int requestIndex = 0; requestIndex < requestsPerPod; requestIndex++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        podService.useLimitedResource("request-" + System.nanoTime());
                        successfulExecutions.incrementAndGet();
                    } catch (ConcurrencyLimitExceededException e) {
                        failedExecutions.incrementAndGet();
                    }
                }));
            }
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Verify that concurrency limits work across all pods
        assertTrue(failedExecutions.get() > 0, "Some requests should fail due to distributed limits");
        assertTrue(successfulExecutions.get() > 0, "Some requests should succeed");
    } finally {
        // Clean up the simulated pod contexts
        podContexts.forEach(ConfigurableApplicationContext::close);
    }
}
This test validates that the distributed semaphore correctly limits concurrent access across multiple simulated application instances, proving our implementation works in a real distributed scenario.
6. Conclusion & Next Steps
This distributed concurrency control pattern shines when you need fine-grained resource protection across multiple application instances. However, consider alternative patterns when:
- Load balancer limits suffice: If your infrastructure already handles rate limiting effectively
- Single pod scenarios: Where local semaphores are adequate
- Ultra-low latency requirements: The Redis round-trip adds some overhead per operation
As next steps, we could consider:
- Dynamic permit adjustment: Allow runtime modification of semaphore limits
- Metrics integration: Add Micrometer metrics for permit usage and wait times
- Circuit breaker integration: Combine with resilience patterns for robust failure handling
- Rate limiting: Extend the annotation to support time-based limits (requests per second), as sketched below
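As a taste of that last idea, Redisson also provides a distributed rate limiter, RRateLimiter, which could back such an extension. The following is only a hedged sketch under that assumption, not part of the current implementation:
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;

public class RateLimitSketch {

    private final RRateLimiter rateLimiter;

    public RateLimitSketch(RedissonClient redisson) {
        // RateType.OVERALL makes the limit global across all instances:
        // at most 10 permits per second, cluster-wide
        this.rateLimiter = redisson.getRateLimiter("open-api-chat-rate");
        this.rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
    }

    public boolean tryExecute() {
        // Non-blocking check: returns false if the rate budget is exhausted
        return rateLimiter.tryAcquire();
    }
}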
The complete implementation with tests and examples is available on GitHub.
As always, thank you for reading!