Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 6 years ago
commit d3fcab1238

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -1716,14 +1716,17 @@ public class RedissonConnection extends AbstractRedisConnection {
sync(f);
}
private static final RedisStrictCommand<Properties> INFO_DEFAULT = new RedisStrictCommand<Properties>("INFO", "DEFAULT", new PropertiesDecoder());
private static final RedisStrictCommand<Properties> INFO = new RedisStrictCommand<Properties>("INFO", new PropertiesDecoder());
@Override
public Properties info() {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO_DEFAULT);
}
@Override
public Properties info(String section) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO, section);
}
@Override
@ -1738,17 +1741,17 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public List<String> getConfig(String pattern) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
}
@Override
public void setConfig(String param, String value) {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
}
@Override
public void resetConfigStats() {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
}
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -1745,14 +1745,17 @@ public class RedissonConnection extends AbstractRedisConnection {
sync(f);
}
private static final RedisStrictCommand<Properties> INFO_DEFAULT = new RedisStrictCommand<Properties>("INFO", "DEFAULT", new PropertiesDecoder());
private static final RedisStrictCommand<Properties> INFO = new RedisStrictCommand<Properties>("INFO", new PropertiesDecoder());
@Override
public Properties info() {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO_DEFAULT);
}
@Override
public Properties info(String section) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO, section);
}
@Override
@ -1767,17 +1770,17 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public List<String> getConfig(String pattern) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
}
@Override
public void setConfig(String param, String value) {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
}
@Override
public void resetConfigStats() {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
}
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -1764,14 +1764,17 @@ public class RedissonConnection extends AbstractRedisConnection {
sync(f);
}
private static final RedisStrictCommand<Properties> INFO_DEFAULT = new RedisStrictCommand<Properties>("INFO", "DEFAULT", new PropertiesDecoder());
private static final RedisStrictCommand<Properties> INFO = new RedisStrictCommand<Properties>("INFO", new PropertiesDecoder());
@Override
public Properties info() {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO_DEFAULT);
}
@Override
public Properties info(String section) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO, section);
}
@Override
@ -1786,17 +1789,17 @@ public class RedissonConnection extends AbstractRedisConnection {
@Override
public List<String> getConfig(String pattern) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
}
@Override
public void setConfig(String param, String value) {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
}
@Override
public void resetConfigStats() {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
}
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.spring.data.connection;
import java.util.Properties;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class PropertiesDecoder implements Decoder<Properties> {
@Override
public Properties decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Properties result = new Properties();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
return result;
}
}

@ -66,6 +66,7 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.TimeLongObjectDecoder;
import org.redisson.command.CommandAsyncService;
import org.redisson.command.CommandBatchService;
@ -1778,14 +1779,17 @@ public class RedissonConnection extends AbstractRedisConnection {
sync(f);
}
private static final RedisStrictCommand<Properties> INFO_DEFAULT = new RedisStrictCommand<Properties>("INFO", "DEFAULT", new PropertiesDecoder());
private static final RedisStrictCommand<Properties> INFO = new RedisStrictCommand<Properties>("INFO", new PropertiesDecoder());
@Override
public Properties info() {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO_DEFAULT);
}
@Override
public Properties info(String section) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, INFO, section);
}
@Override
@ -1798,19 +1802,21 @@ public class RedissonConnection extends AbstractRedisConnection {
throw new UnsupportedOperationException();
}
private static final RedisStrictCommand<Properties> CONFIG_GET = new RedisStrictCommand<Properties>("CONFIG", "GET", new PropertiesDecoder());
@Override
public Properties getConfig(String pattern) {
throw new UnsupportedOperationException();
return read(null, StringCodec.INSTANCE, RedisCommands.CONFIG_GET, pattern);
}
@Override
public void setConfig(String param, String value) {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_SET, param, value);
}
@Override
public void resetConfigStats() {
throw new UnsupportedOperationException();
write(null, StringCodec.INSTANCE, RedisCommands.CONFIG_RESETSTAT);
}
private static final RedisStrictCommand<Long> TIME = new RedisStrictCommand<Long>("TIME", new TimeLongObjectDecoder());

@ -55,6 +55,7 @@ import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
@ -620,6 +621,31 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
public void putAll(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
get(putAllAsync(map, ttl, ttlUnit));
}
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
if (map.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
RFuture<Void> future = putAllOperationAsync(map, ttl, ttlUnit);
if (hasNoWriter()) {
return future;
}
MapWriterTask<Void> listener = new MapWriterTask<Void>() {
@Override
public void execute() {
options.getWriter().writeAll((Map<K, V>) map);
}
};
return mapWriterFuture(future, listener);
}
@Override
public V addAndGet(K key, Number value) {
return get(addAndGetAsync(key, value));
@ -1773,6 +1799,105 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
params.toArray());
}
private RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
List<Object> params = new ArrayList<Object>(map.size()*2 + 2);
params.add(System.currentTimeMillis());
long ttlTimeout = 0;
if (ttl > 0) {
ttlTimeout = System.currentTimeMillis() + ttlUnit.toMillis(ttl);
}
params.add(ttlTimeout);
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
if (t.getKey() == null) {
throw new NullPointerException("map key can't be null");
}
if (t.getValue() == null) {
throw new NullPointerException("map value can't be null");
}
params.add(encodeMapKey(t.getKey()));
params.add(encodeMapValue(t.getValue()));
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local currentTime = tonumber(table.remove(ARGV, 1)); " + // index is the first parameter
"local ttl = table.remove(ARGV, 1); " + // ttl is the second parameter
"local ttlNumber = tonumber(ttl); " +
"local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" +
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "local key = ARGV[i-1];" +
"local v = redis.call('hget', KEYS[1], key);" +
"local exists = false;" +
"if v ~= false then" +
" local t, val = struct.unpack('dLc0', v);" +
" local expireDate = 92233720368547758;" +
" local expireDateScore = redis.call('zscore', KEYS[2], key);" +
" if expireDateScore ~= false then" +
" expireDate = tonumber(expireDateScore)" +
" end;" +
" if t ~= 0 then" +
" local expireIdle = redis.call('zscore', KEYS[3], key);" +
" if expireIdle ~= false then" +
" expireDate = math.min(expireDate, tonumber(expireIdle))" +
" end;" +
" end;" +
" if expireDate > tonumber(currentTime) then" +
" exists = true;" +
" end;" +
"end;" +
"" +
"if ttlNumber > 0 then " +
" redis.call('zadd', KEYS[2], ttl, key); " +
"else " +
" redis.call('zrem', KEYS[2], key); " +
"end; " +
"" +
"local newvalue = struct.pack('dLc0', 0, string.len(value), value);" +
"redis.call('hset', KEYS[1], key, newvalue);" +
"local lastAccessTimeSetName = KEYS[6];" +
"if exists == false then" +
" if maxSize ~= nil and maxSize ~= 0 then" +
" redis.call('zadd', lastAccessTimeSetName, currentTime, key);" +
" local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" +
" if cacheSize > maxSize then" +
" local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1);" +
" for index, lruItem in ipairs(lruItems) do" +
" if lruItem then" +
" local lruItemValue = redis.call('hget', KEYS[1], lruItem);" +
" redis.call('hdel', KEYS[1], lruItem);" +
" redis.call('zrem', KEYS[2], lruItem);" +
" redis.call('zrem', KEYS[3], lruItem);" +
" redis.call('zrem', lastAccessTimeSetName, lruItem);" +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);"
+ "end; " +
" end;" +
" end" +
" end;" +
" end;" +
" local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value);" +
" redis.call('publish', KEYS[4], msg);" +
"else " +
"local t, val = struct.unpack('dLc0', v);" +
"local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" +
"redis.call('publish', KEYS[5], msg);" +
" if maxSize ~= nil and maxSize ~= 0 then " +
" redis.call('zadd', lastAccessTimeSetName, currentTime, key);" +
" end;" +
"end;"
+ "end;"
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(),
getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()),
params.toArray());
}
private Boolean isWindows;
@Override

@ -15,8 +15,10 @@
*/
package org.redisson.api;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.event.MapEntryListener;
/**
@ -232,6 +234,33 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V>, RDest
*/
boolean fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries will be stored in write-through mode.
*
* @param map - mappings to be stored in this map
* @param ttl - time to live for all key\value entries.
* If <code>0</code> then stores infinitely.
* @param ttlUnit - time unit
*/
void putAll(java.util.Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map - mappings to be stored in this map
* @param ttl - time to live for all key\value entries.
* If <code>0</code> then stores infinitely.
* @param ttlUnit - time unit
* @return void
*/
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit);
/**
* Returns the number of entries in cache.
* This number can reflects expired entries too

@ -649,6 +649,38 @@ public class RedissonMapCacheTest extends BaseMapTest {
map.destroy();
}
@Test
public void testPutAllGetTTL() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple06");
Assert.assertNull(map.get(new SimpleKey("33")));
Assert.assertNull(map.get(new SimpleKey("55")));
Map<SimpleKey, SimpleValue> entries = new HashMap<>();
entries.put(new SimpleKey("33"), new SimpleValue("44"));
entries.put(new SimpleKey("55"), new SimpleValue("66"));
map.putAll(entries, 2, TimeUnit.SECONDS);
SimpleValue val1 = map.get(new SimpleKey("33"));
Assert.assertEquals("44", val1.getValue());
SimpleValue val2 = map.get(new SimpleKey("55"));
Assert.assertEquals("66", val2.getValue());
Thread.sleep(1000);
Assert.assertEquals(2, map.size());
SimpleValue val3 = map.get(new SimpleKey("33"));
Assert.assertEquals("44", val3.getValue());
SimpleValue val4 = map.get(new SimpleKey("55"));
Assert.assertEquals("66", val4.getValue());
Assert.assertEquals(2, map.size());
Thread.sleep(1000);
Assert.assertNull(map.get(new SimpleKey("33")));
Assert.assertNull(map.get(new SimpleKey("55")));
map.destroy();
}
@Test
public void testPutIfAbsentTTL() throws Exception {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");

Loading…
Cancel
Save