0%

实现reactor-netty和lettuce共享EventLoopGroup

reactor-netty和lettuce底层都是使用netty作为网络通信组件。

往往这两个组件会各自创建一个私有的EventLoopGroup,在全响应式的项目中(例如SCG),存在一定的线程资源浪费。

PS:本篇承接自Spring Cloud Gateway性能优化,一起食用,风味更佳。

貌似中文互联网上还没有相关的教程。

看了下reactor-netty和lettuce在spring中的自动配置方式。

实现成本不大,下面展示最简单的一种配置方法。

环境

  • springboot
    • 2.7.18
  • reactor-bom
    • 2023.0.14
  • lettuce
    • 6.3.2.RELEASE

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.event.DefaultEventBus;
import io.lettuce.core.resource.EventLoopGroupProvider;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.apache.commons.collections4.IteratorUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.data.redis.ClientResourcesBuilderCustomizer;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import reactor.core.scheduler.Schedulers;
import reactor.netty.resources.LoopResources;

import java.util.concurrent.TimeUnit;

/**
 * Redis configuration that makes Lettuce reuse the Netty {@link EventLoopGroup} obtained from
 * reactor-netty's {@link LoopResources}, registers a JSON-serializing
 * {@link ReactiveRedisTemplate}, and adjusts Redis-related beans after they are initialized.
 */
@Configuration(proxyBeanMethods = false)
public class RedisConfigure implements BeanPostProcessor {

    /**
     * Shares one {@link EventLoopGroup} instance between reactor-netty and Lettuce.
     *
     * <p>The returned customizer points the Lettuce event bus, event executor group and
     * event loop group provider at the group returned by
     * {@code LoopResources#onServer(LoopResources.DEFAULT_NATIVE)}.
     *
     * @param factory supplies the reactor-netty {@link LoopResources}
     * @return customizer applied to the Lettuce {@code ClientResources} builder
     * @see org.springframework.boot.autoconfigure.reactor.netty.ReactorNettyConfigurations
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#redisConnectionFactory(org.springframework.beans.factory.ObjectProvider, io.lettuce.core.resource.ClientResources)
     */
    @Bean
    public ClientResourcesBuilderCustomizer clientResourcesBuilderCustomizer(final ReactorResourceFactory factory) {
        return builder -> {
            final LoopResources resources = factory.getLoopResources();
            // Fetch the EventLoopGroup used by reactor-netty.
            final EventLoopGroup executors = resources.onServer(LoopResources.DEFAULT_NATIVE);
            // Count the executors in the group once; reported by threadPoolSize() below.
            final int threads = IteratorUtils.size(executors.iterator());
            // The event bus publishes on a scheduler backed by the same shared group.
            builder.eventBus(new DefaultEventBus(Schedulers.fromExecutorService(executors, "lettuce-eventbus")));
            builder.eventExecutorGroup(executors);
            builder.eventLoopGroupProvider(new EventLoopGroupProvider() {

                /**
                 * Always hands out the shared group, whatever type is requested.
                 * NOTE(review): the cast is unchecked; this assumes the requested type
                 * matches the transport reactor-netty selected - confirm for native transports.
                 */
                @Override
                public <T extends EventLoopGroup> T allocate(final Class<T> type) {
                    @SuppressWarnings("unchecked") T shared = (T) executors;
                    return shared;
                }

                @Override
                public int threadPoolSize() {
                    return threads;
                }

                @Override
                public Promise<Boolean> release(final EventExecutorGroup group, final long period, final long timeout, final TimeUnit unit) {
                    // No-op: the group is left for reactor-netty to shut down, so report success immediately.
                    final DefaultPromise<Boolean> promise = new DefaultPromise<>(executors.next());
                    promise.setSuccess(Boolean.TRUE);
                    return promise;
                }

                @Override
                public Future<Boolean> shutdown(final long period, final long timeout, final TimeUnit unit) {
                    throw new UnsupportedOperationException("The shutdown method is not expected to be called.");
                }
            });
            // Timer comes from the project helper ScheduleUtil (defined outside this file).
            builder.timer(ScheduleUtil.getHashedWheelTimer());
        };
    }

    /**
     * Builds a {@link ReactiveRedisTemplate} with UTF-8 string keys / hash keys and
     * Jackson-serialized values / hash values.
     *
     * @param factory      reactive connection factory backing the template
     * @param objectMapper the Spring-managed {@link ObjectMapper}, reused for value serialization
     * @return the configured template
     * @see org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration
     */
    @Bean
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(final ReactiveRedisConnectionFactory factory, final ObjectMapper objectMapper) {
        final Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        serializer.setObjectMapper(objectMapper);

        final RedisSerializationContext<String, Object> context = RedisSerializationContext
                .<String, Object>newSerializationContext()
                .key(StringRedisSerializer.UTF_8)
                .value(serializer)
                .hashKey(StringRedisSerializer.UTF_8)
                .hashValue(serializer)
                .string(StringRedisSerializer.UTF_8)
                .build();
        return new ReactiveRedisTemplate<>(factory, context);
    }

    /**
     * Returns the bean unchanged; all adjustments happen after initialization.
     */
    @Override
    public Object postProcessBeforeInitialization(final Object bean, final String name) throws BeansException {
        return bean;
    }

    /**
     * Adjusts the Redis client configuration and warms up Redis templates.
     *
     * <ul>
     *   <li>{@link RedisProperties}: overrides timeouts, client name and client type, and
     *       disables the Lettuce connection pool.</li>
     *   <li>{@link ReactiveRedisTemplate}: issues a short-lived {@code warmup} write.</li>
     * </ul>
     *
     * @param bean the initialized bean
     * @param name the bean name
     * @return the same bean instance
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
     */
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String name) throws BeansException {
        if (bean instanceof RedisProperties) {
            final RedisProperties properties = (RedisProperties) bean;
            properties.setTimeout(Const.Durations.TEN_SECOND);
            properties.setConnectTimeout(Const.Durations.TEN_SECOND);
            properties.setClientName("lettuce");
            properties.setClientType(RedisProperties.ClientType.LETTUCE);
            final RedisProperties.Pool pool = properties.getLettuce().getPool();
            // Reactive Lettuce does not need a connection pool; a single TCP connection is enough,
            // so the remaining pool settings are left untouched.
            pool.setEnabled(Boolean.FALSE);
        } else if (bean instanceof ReactiveRedisTemplate) {
            @SuppressWarnings("unchecked") final ReactiveRedisTemplate<String, Object> template =
                    (ReactiveRedisTemplate<String, Object>) bean;
            // The Mono returned by set(...) is lazy: nothing is sent to Redis until it is
            // subscribed. Subscribe (fire-and-forget) so the warm-up write actually runs.
            // No error callback is supplied, so failures are left to Reactor's dropped-error hook.
            template.opsForValue().set("warmup", "v", Const.Durations.TEN_SECOND).subscribe();
        }
        return bean;
    }
}

注意

不能将 `org.springframework.http.client.reactive.ReactorResourceFactory#useGlobalResources` 的值设置为 `false`。

结尾

以上代码仅供参考,需根据实际情况调整。