diff --git a/redisson/src/main/java/org/redisson/connection/DnsAddressResolverGroupFactory.java b/redisson/src/main/java/org/redisson/connection/DnsAddressResolverGroupFactory.java index ffc185abf..5aa9786b6 100644 --- a/redisson/src/main/java/org/redisson/connection/DnsAddressResolverGroupFactory.java +++ b/redisson/src/main/java/org/redisson/connection/DnsAddressResolverGroupFactory.java @@ -15,6 +15,8 @@ */ package org.redisson.connection; +import org.redisson.connection.dns.MultiDnsAddressResolverGroup; + import io.netty.channel.socket.DatagramChannel; import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsServerAddressStreamProvider; @@ -26,11 +28,13 @@ import io.netty.resolver.dns.DnsServerAddressStreamProvider; * */ public class DnsAddressResolverGroupFactory implements AddressResolverGroupFactory { - + @Override public DnsAddressResolverGroup create(Class channelType, DnsServerAddressStreamProvider nameServerProvider) { - return new DnsAddressResolverGroup(channelType, nameServerProvider); + // Workaround for https://github.com/netty/netty/issues/8261 + return new MultiDnsAddressResolverGroup(channelType, nameServerProvider); +// return new DnsAddressResolverGroup(channelType, nameServerProvider); } } diff --git a/redisson/src/main/java/org/redisson/connection/dns/GroupAddressResolver.java b/redisson/src/main/java/org/redisson/connection/dns/GroupAddressResolver.java new file mode 100644 index 000000000..047cb0741 --- /dev/null +++ b/redisson/src/main/java/org/redisson/connection/dns/GroupAddressResolver.java @@ -0,0 +1,124 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection.dns; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.resolver.AddressResolver; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; + +/** + * Workaround for https://github.com/netty/netty/issues/8261 + * + * @author Nikita Koksharov + * + */ +class GroupAddressResolver implements AddressResolver { + + private final List> resolvers; + + public GroupAddressResolver(List> resolvers) { + super(); + this.resolvers = resolvers; + } + + @Override + public boolean isSupported(SocketAddress address) { + for (AddressResolver addressResolver : resolvers) { + if (addressResolver.isSupported(address)) { + return true; + } + } + return false; + } + + @Override + public boolean isResolved(SocketAddress address) { + for (AddressResolver addressResolver : resolvers) { + if (addressResolver.isResolved(address)) { + return true; + } + } + return false; + } + + @Override + public Future resolve(SocketAddress address) { + final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + final AtomicInteger counter = new AtomicInteger(resolvers.size()); + for (AddressResolver addressResolver : resolvers) { + addressResolver.resolve(address).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + promise.trySuccess(future.getNow()); + } + + if (counter.decrementAndGet() == 0) { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + } + } + } + }); + } + return promise; + } + + @Override + public Future resolve(SocketAddress address, Promise promise) { + throw new UnsupportedOperationException(); + } + + @Override + public Future> resolveAll(SocketAddress address) { + final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); + final AtomicInteger counter = new AtomicInteger(resolvers.size()); + for (AddressResolver addressResolver : resolvers) { + addressResolver.resolveAll(address).addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (future.isSuccess()) { + promise.trySuccess(future.getNow()); + } + + if (counter.decrementAndGet() == 0) { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + } + } + } + }); + } + return promise; + } + + @Override + public Future> resolveAll(SocketAddress address, Promise> promise) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + } + +} diff --git a/redisson/src/main/java/org/redisson/connection/dns/MultiDnsAddressResolverGroup.java b/redisson/src/main/java/org/redisson/connection/dns/MultiDnsAddressResolverGroup.java new file mode 100644 index 000000000..7f332a852 --- /dev/null +++ b/redisson/src/main/java/org/redisson/connection/dns/MultiDnsAddressResolverGroup.java @@ -0,0 +1,74 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection.dns; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.socket.DatagramChannel; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.dns.DnsAddressResolverGroup; +import io.netty.resolver.dns.DnsServerAddressStream; +import io.netty.resolver.dns.DnsServerAddressStreamProvider; +import io.netty.resolver.dns.SingletonDnsServerAddressStreamProvider; +import io.netty.util.concurrent.EventExecutor; + +/** + * Workaround for https://github.com/netty/netty/issues/8261 + * + * @author Nikita Koksharov + * + */ +public class MultiDnsAddressResolverGroup extends DnsAddressResolverGroup { + + List groups = new ArrayList(); + + public MultiDnsAddressResolverGroup( + Class channelType, + DnsServerAddressStreamProvider nameServerProvider) { + super(channelType, nameServerProvider); + + DnsServerAddressStream t = nameServerProvider.nameServerAddressStream(""); + InetSocketAddress firstDNS = t.next(); + while (true) { + InetSocketAddress dns = t.next(); + DnsAddressResolverGroup group = new DnsAddressResolverGroup(channelType, + new SingletonDnsServerAddressStreamProvider(dns)); + groups.add(group); + if (dns == firstDNS) { + break; + } + } + } + + @Override + public AddressResolver getResolver(EventExecutor executor) { + List> resolvers = new ArrayList>(); + for (DnsAddressResolverGroup group : groups) { + resolvers.add(group.getResolver(executor)); + } + return new GroupAddressResolver(resolvers); + } + + @Override + public void close() { + for (DnsAddressResolverGroup group : groups) { + group.close(); + } + } + +}