From f58c083653b157dede3f4a09dcaf6d328045fd77 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 26 Aug 2019 10:56:04 +0300 Subject: [PATCH] refactoring --- .../command/BaseRedisBatchExecutor.java | 93 +++++++ .../redisson/command/CommandBatchService.java | 14 +- .../redisson/command/RedisBatchExecutor.java | 231 +----------------- .../command/RedisQueuedBatchExecutor.java | 222 +++++++++++++++++ 4 files changed, 334 insertions(+), 226 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java create mode 100644 redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java diff --git a/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java b/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java new file mode 100644 index 000000000..fb643ae6b --- /dev/null +++ b/redisson/src/main/java/org/redisson/command/BaseRedisBatchExecutor.java @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.command; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.redisson.api.BatchOptions; +import org.redisson.api.RFuture; +import org.redisson.client.RedisConnection; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.BatchCommandData; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandBatchService.ConnectionEntry; +import org.redisson.command.CommandBatchService.Entry; +import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.NodeSource; +import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.misc.RPromise; +import org.redisson.pubsub.AsyncSemaphore; + +/** + * + * @author Nikita Koksharov + * + * @param type of value + * @param type of returned value + */ +public class BaseRedisBatchExecutor extends RedisExecutor { + + final ConcurrentMap commands; + final ConcurrentMap connections; + final BatchOptions options; + final AtomicInteger index; + + final AtomicBoolean executed; + final AsyncSemaphore semaphore; + + @SuppressWarnings("ParameterNumber") + public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, + Object[] params, RPromise mainPromise, boolean ignoreRedirect, + ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, + ConcurrentMap commands, ConcurrentMap connections, + BatchOptions options, AtomicInteger index, AtomicBoolean executed, AsyncSemaphore semaphore) { + + super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, + objectBuilder); + this.commands = commands; + this.connections = connections; + this.options = options; + this.index = index; + this.executed = executed; + this.semaphore = semaphore; + } + + protected final void addBatchCommandData(Object[] batchParams) { + if (source.getEntry() != null) { + Entry entry = commands.get(source.getEntry()); + if (entry == null) { + entry = new Entry(); + Entry oldEntry = commands.putIfAbsent(source.getEntry(), entry); + if (oldEntry != null) { + entry = oldEntry; + } + } + + if (!readOnlyMode) { + entry.setReadOnlyMode(false); + } + + Codec codecToUse = getCodec(codec); + BatchCommandData commandData = new BatchCommandData(mainPromise, codecToUse, command, batchParams, index.incrementAndGet()); + entry.getCommands().add(commandData); + } + } + +} diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 115adf0c4..b7f546a4a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -37,6 +37,7 @@ import org.redisson.client.protocol.BatchCommandData; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandBatchService.Entry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; @@ -133,9 +134,16 @@ public class CommandBatchService extends CommandAsyncService { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, boolean ignoreRedirect) { - RedisBatchExecutor executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, - true, connectionManager, objectBuilder, commands, connections, options, index, isRedisBasedQueue(), executed, semaphore); - executor.execute(); + if (isRedisBasedQueue()) { + RedisExecutor executor = new RedisQueuedBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, + true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore); + executor.execute(); + } else { + RedisExecutor executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, + true, connectionManager, objectBuilder, commands, connections, options, index, executed, semaphore); + executor.execute(); + } + } @Override diff --git a/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java index 9e243d70d..a7626e9db 100644 --- a/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisBatchExecutor.java @@ -15,32 +15,20 @@ */ package org.redisson.command; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.BatchOptions; -import org.redisson.api.BatchOptions.ExecutionMode; -import org.redisson.api.RFuture; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.BatchCommandData; -import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandBatchService.ConnectionEntry; import org.redisson.command.CommandBatchService.Entry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; -import org.redisson.connection.NodeSource.Redirect; import org.redisson.liveobject.core.RedissonObjectBuilder; -import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; /** @@ -50,224 +38,21 @@ import org.redisson.pubsub.AsyncSemaphore; * @param type of value * @param type of returned value */ -public class RedisBatchExecutor extends RedisExecutor { +public class RedisBatchExecutor extends BaseRedisBatchExecutor { - final ConcurrentMap commands; - final ConcurrentMap connections; - final BatchOptions options; - final AtomicInteger index; - - final boolean isRedisBasedQueue; - final AtomicBoolean executed; - final AsyncSemaphore semaphore; - @SuppressWarnings("ParameterNumber") public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, - Object[] params, RPromise mainPromise, boolean ignoreRedirect, - ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, - ConcurrentMap commands, ConcurrentMap connections, - BatchOptions options, AtomicInteger index, boolean isRedisBasedQueue, AtomicBoolean executed, AsyncSemaphore semaphore) { - - super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, - objectBuilder); - this.commands = commands; - this.connections = connections; - this.options = options; - this.index = index; - this.isRedisBasedQueue = isRedisBasedQueue; - this.executed = executed; - this.semaphore = semaphore; + Object[] params, RPromise mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, + RedissonObjectBuilder objectBuilder, ConcurrentMap commands, + ConcurrentMap connections, BatchOptions options, AtomicInteger index, + AtomicBoolean executed, AsyncSemaphore semaphore) { + super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, + commands, connections, options, index, executed, semaphore); } @Override public void execute() { - if (source.getEntry() != null) { - Entry entry = commands.get(source.getEntry()); - if (entry == null) { - entry = new Entry(); - Entry oldEntry = commands.putIfAbsent(source.getEntry(), entry); - if (oldEntry != null) { - entry = oldEntry; - } - } - - if (!readOnlyMode) { - entry.setReadOnlyMode(false); - } - - Object[] batchParams = null; - if (!isRedisBasedQueue) { - batchParams = params; - } - Codec codecToUse = getCodec(codec); - BatchCommandData commandData = new BatchCommandData(mainPromise, codecToUse, command, batchParams, index.incrementAndGet()); - entry.getCommands().add(commandData); - } - - if (!isRedisBasedQueue) { - return; - } - - if (!readOnlyMode && this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC) { - throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC"); - } - - super.execute(); + addBatchCommandData(params); } - @Override - protected void releaseConnection(RPromise attemptPromise, RFuture connectionFuture) { - if (!isRedisBasedQueue || RedisCommands.EXEC.getName().equals(command.getName())) { - super.releaseConnection(attemptPromise, connectionFuture); - } - } - - @Override - protected void handleSuccess(RPromise promise, RFuture connectionFuture, R res) throws ReflectiveOperationException { - if (RedisCommands.EXEC.getName().equals(command.getName())) { - super.handleSuccess(promise, connectionFuture, res); - return; - } - if (RedisCommands.DISCARD.getName().equals(command.getName())) { - super.handleSuccess(promise, connectionFuture, null); - if (executed.compareAndSet(false, true)) { - connectionFuture.getNow().forceFastReconnectAsync().onComplete((r, e) -> { - RedisBatchExecutor.super.releaseConnection(promise, connectionFuture); - }); - } - return; - } - - if (isRedisBasedQueue) { - BatchPromise batchPromise = (BatchPromise) promise; - RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); - super.handleSuccess(sentPromise, connectionFuture, null); - semaphore.release(); - } - } - - @Override - protected void handleError(RFuture connectionFuture, Throwable cause) { - if (isRedisBasedQueue && mainPromise instanceof BatchPromise) { - BatchPromise batchPromise = (BatchPromise) mainPromise; - RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); - sentPromise.tryFailure(cause); - mainPromise.tryFailure(cause); - if (executed.compareAndSet(false, true)) { - connectionFuture.getNow().forceFastReconnectAsync().onComplete((res, e) -> { - RedisBatchExecutor.super.releaseConnection(mainPromise, connectionFuture); - }); - } - semaphore.release(); - return; - } - - super.handleError(connectionFuture, cause); - } - - @Override - protected void sendCommand(RPromise attemptPromise, RedisConnection connection) { - if (!isRedisBasedQueue) { - super.sendCommand(attemptPromise, connection); - return; - } - - ConnectionEntry connectionEntry = connections.get(source.getEntry()); - - if (source.getRedirect() == Redirect.ASK) { - List> list = new ArrayList>(2); - RPromise promise = new RedissonPromise(); - list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[]{})); - if (connectionEntry.isFirstCommand()) { - list.add(new CommandData(promise, codec, RedisCommands.MULTI, new Object[]{})); - connectionEntry.setFirstCommand(false); - } - list.add(new CommandData(attemptPromise, codec, command, params)); - RPromise main = new RedissonPromise(); - writeFuture = connection.send(new CommandsData(main, list, true)); - } else { - if (log.isDebugEnabled()) { - log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", - command, LogHelper.toString(params), source, connection.getRedisClient().getAddr(), connection); - } - - if (connectionEntry.isFirstCommand()) { - List> list = new ArrayList>(2); - list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.MULTI, new Object[]{})); - list.add(new CommandData(attemptPromise, codec, command, params)); - RPromise main = new RedissonPromise(); - writeFuture = connection.send(new CommandsData(main, list, true)); - connectionEntry.setFirstCommand(false); - } else { - if (RedisCommands.EXEC.getName().equals(command.getName())) { - Entry entry = commands.get(source.getEntry()); - - List> list = new ArrayList<>(); - - if (options.isSkipResult()) { - list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" })); - } - - list.add(new CommandData(attemptPromise, codec, command, params)); - - if (options.isSkipResult()) { - list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{ "ON" })); - } - if (options.getSyncSlaves() > 0) { - BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, - new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet()); - list.add(waitCommand); - entry.getCommands().add(waitCommand); - } - - RPromise main = new RedissonPromise(); - writeFuture = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands()), options.isSkipResult(), false, true)); - } else { - RPromise main = new RedissonPromise(); - List> list = new ArrayList<>(); - list.add(new CommandData(attemptPromise, codec, command, params)); - writeFuture = connection.send(new CommandsData(main, list, true)); - } - } - } - } - - @Override - protected RFuture getConnection() { - if (!isRedisBasedQueue) { - return super.getConnection(); - } - - ConnectionEntry entry = connections.get(source.getEntry()); - if (entry == null) { - entry = new ConnectionEntry(); - ConnectionEntry oldEntry = connections.putIfAbsent(source.getEntry(), entry); - if (oldEntry != null) { - entry = oldEntry; - } - } - - - if (entry.getConnectionFuture() != null) { - return entry.getConnectionFuture(); - } - - synchronized (this) { - if (entry.getConnectionFuture() != null) { - return entry.getConnectionFuture(); - } - - RFuture connectionFuture; - if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { - connectionFuture = connectionManager.connectionWriteOp(source, null); - } else { - connectionFuture = connectionManager.connectionReadOp(source, null); - } - connectionFuture.syncUninterruptibly(); - entry.setConnectionFuture(connectionFuture); - return connectionFuture; - } - } - - } diff --git a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java new file mode 100644 index 000000000..01744601f --- /dev/null +++ b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java @@ -0,0 +1,222 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.command; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.redisson.api.BatchOptions; +import org.redisson.api.BatchOptions.ExecutionMode; +import org.redisson.api.RFuture; +import org.redisson.client.RedisConnection; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.BatchCommandData; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.CommandsData; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandBatchService.ConnectionEntry; +import org.redisson.command.CommandBatchService.Entry; +import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.NodeSource; +import org.redisson.connection.NodeSource.Redirect; +import org.redisson.liveobject.core.RedissonObjectBuilder; +import org.redisson.misc.LogHelper; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.redisson.pubsub.AsyncSemaphore; + +/** + * + * @author Nikita Koksharov + * + * @param type of value + * @param type of returned value + */ +public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor { + + @SuppressWarnings("ParameterNumber") + public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, + Object[] params, RPromise mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, + RedissonObjectBuilder objectBuilder, ConcurrentMap commands, + ConcurrentMap connections, BatchOptions options, AtomicInteger index, + AtomicBoolean executed, AsyncSemaphore semaphore) { + super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, + commands, connections, options, index, executed, semaphore); + } + + @Override + public void execute() { + addBatchCommandData(null); + + if (!readOnlyMode && this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC) { + throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC"); + } + + super.execute(); + } + + + @Override + protected void releaseConnection(RPromise attemptPromise, RFuture connectionFuture) { + if (RedisCommands.EXEC.getName().equals(command.getName())) { + super.releaseConnection(attemptPromise, connectionFuture); + } + } + + @Override + protected void handleSuccess(RPromise promise, RFuture connectionFuture, R res) + throws ReflectiveOperationException { + if (RedisCommands.EXEC.getName().equals(command.getName())) { + super.handleSuccess(promise, connectionFuture, res); + return; + } + if (RedisCommands.DISCARD.getName().equals(command.getName())) { + super.handleSuccess(promise, connectionFuture, null); + if (executed.compareAndSet(false, true)) { + connectionFuture.getNow().forceFastReconnectAsync().onComplete((r, e) -> { + releaseConnection(promise, connectionFuture); + }); + } + return; + } + + BatchPromise batchPromise = (BatchPromise) promise; + RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); + super.handleSuccess(sentPromise, connectionFuture, null); + semaphore.release(); + } + + @Override + protected void handleError(RFuture connectionFuture, Throwable cause) { + if (mainPromise instanceof BatchPromise) { + BatchPromise batchPromise = (BatchPromise) mainPromise; + RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); + sentPromise.tryFailure(cause); + mainPromise.tryFailure(cause); + if (executed.compareAndSet(false, true)) { + connectionFuture.getNow().forceFastReconnectAsync().onComplete((res, e) -> { + RedisQueuedBatchExecutor.super.releaseConnection(mainPromise, connectionFuture); + }); + } + semaphore.release(); + return; + } + + super.handleError(connectionFuture, cause); + } + + @Override + protected void sendCommand(RPromise attemptPromise, RedisConnection connection) { + ConnectionEntry connectionEntry = connections.get(source.getEntry()); + + if (source.getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(2); + RPromise promise = new RedissonPromise(); + list.add(new CommandData(promise, codec, RedisCommands.ASKING, new Object[]{})); + if (connectionEntry.isFirstCommand()) { + list.add(new CommandData(promise, codec, RedisCommands.MULTI, new Object[]{})); + connectionEntry.setFirstCommand(false); + } + list.add(new CommandData(attemptPromise, codec, command, params)); + RPromise main = new RedissonPromise(); + writeFuture = connection.send(new CommandsData(main, list, true)); + } else { + if (log.isDebugEnabled()) { + log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", + command, LogHelper.toString(params), source, connection.getRedisClient().getAddr(), connection); + } + + if (connectionEntry.isFirstCommand()) { + List> list = new ArrayList>(2); + list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.MULTI, new Object[]{})); + list.add(new CommandData(attemptPromise, codec, command, params)); + RPromise main = new RedissonPromise(); + writeFuture = connection.send(new CommandsData(main, list, true)); + connectionEntry.setFirstCommand(false); + } else { + if (RedisCommands.EXEC.getName().equals(command.getName())) { + Entry entry = commands.get(source.getEntry()); + + List> list = new ArrayList<>(); + + if (options.isSkipResult()) { + list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" })); + } + + list.add(new CommandData(attemptPromise, codec, command, params)); + + if (options.isSkipResult()) { + list.add(new CommandData(new RedissonPromise(), codec, RedisCommands.CLIENT_REPLY, new Object[]{ "ON" })); + } + if (options.getSyncSlaves() > 0) { + BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, + new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet()); + list.add(waitCommand); + entry.getCommands().add(waitCommand); + } + + RPromise main = new RedissonPromise(); + writeFuture = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands()), options.isSkipResult(), false, true)); + } else { + RPromise main = new RedissonPromise(); + List> list = new ArrayList<>(); + list.add(new CommandData(attemptPromise, codec, command, params)); + writeFuture = connection.send(new CommandsData(main, list, true)); + } + } + } + } + + @Override + protected RFuture getConnection() { + ConnectionEntry entry = connections.get(source.getEntry()); + if (entry == null) { + entry = new ConnectionEntry(); + ConnectionEntry oldEntry = connections.putIfAbsent(source.getEntry(), entry); + if (oldEntry != null) { + entry = oldEntry; + } + } + + + if (entry.getConnectionFuture() != null) { + return entry.getConnectionFuture(); + } + + synchronized (this) { + if (entry.getConnectionFuture() != null) { + return entry.getConnectionFuture(); + } + + RFuture connectionFuture; + if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + connectionFuture = connectionManager.connectionWriteOp(source, null); + } else { + connectionFuture = connectionManager.connectionReadOp(source, null); + } + connectionFuture.syncUninterruptibly(); + entry.setConnectionFuture(connectionFuture); + return connectionFuture; + } + } + + +}