1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.event.DefaultEventBus; import io.lettuce.core.resource.EventLoopGroupProvider; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import org.apache.commons.collections4.IteratorUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.data.redis.ClientResourcesBuilderCustomizer; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.http.client.reactive.ReactorResourceFactory; import reactor.core.scheduler.Schedulers; import reactor.netty.resources.LoopResources;
import java.util.concurrent.TimeUnit;
@Configuration(proxyBeanMethods = false) public class RedisConfigure implements BeanPostProcessor {
@Bean public ClientResourcesBuilderCustomizer clientResourcesBuilderCustomizer(final ReactorResourceFactory factory) { return builder -> { final LoopResources resources = factory.getLoopResources(); final EventLoopGroup executors = resources.onServer(LoopResources.DEFAULT_NATIVE); final int threads = IteratorUtils.size(executors.iterator()); builder.eventBus(new DefaultEventBus(Schedulers.fromExecutorService(executors, "lettuce-eventbus"))); builder.eventExecutorGroup(executors); builder.eventLoopGroupProvider(new EventLoopGroupProvider() {
@Override public <T extends EventLoopGroup> T allocate(final Class<T> type) { @SuppressWarnings("unchecked") T shared = (T) executors; return shared; }
@Override public int threadPoolSize() { return threads; }
@Override public Promise<Boolean> release(final EventExecutorGroup group, final long period, final long timeout, final TimeUnit unit) { final DefaultPromise<Boolean> promise = new DefaultPromise<>(executors.next()); promise.setSuccess(Boolean.TRUE); return promise; }
@Override public Future<Boolean> shutdown(final long period, final long timeout, final TimeUnit unit) { throw new UnsupportedOperationException("The shutdown method is not expected to be called."); } }); builder.timer(ScheduleUtil.getHashedWheelTimer()); }; }
@Bean public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(final ReactiveRedisConnectionFactory factory, final ObjectMapper objectMapper) { final Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); serializer.setObjectMapper(objectMapper);
final RedisSerializationContext<String, Object> context = RedisSerializationContext.<String, Object>newSerializationContext().key(StringRedisSerializer.UTF_8).value(serializer).hashKey(StringRedisSerializer.UTF_8).hashValue(serializer).string(StringRedisSerializer.UTF_8).build(); return new ReactiveRedisTemplate<>(factory, context); }
@Override public Object postProcessBeforeInitialization(final Object bean, final String name) throws BeansException { return bean; }
@Override public Object postProcessAfterInitialization(final Object bean, final String name) throws BeansException { if (bean instanceof RedisProperties) {
final RedisProperties properties = (RedisProperties) bean; properties.setTimeout(Const.Durations.TEN_SECOND); properties.setConnectTimeout(Const.Durations.TEN_SECOND); properties.setClientName("lettuce"); properties.setClientType(RedisProperties.ClientType.LETTUCE); final RedisProperties.Pool pool = properties.getLettuce().getPool(); pool.setEnabled(Boolean.FALSE);
} else if (bean instanceof ReactiveRedisTemplate) { @SuppressWarnings("unchecked") final ReactiveRedisTemplate<String, Object> template = (ReactiveRedisTemplate<String, Object>) bean; template.opsForValue().set("warmup", "v", Const.Durations.TEN_SECOND); } return bean; } }
|