Fixed - RTopic subscribes only to a single master if __keyspace or __keyevent channel is defined. #4759

pull/5170/head
Nikita Koksharov 2 years ago
parent 3f1fb96e36
commit bd8c79c9a9

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -79,7 +79,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -97,7 +97,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -150,9 +150,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -245,8 +250,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -255,8 +262,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -79,7 +79,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -97,7 +97,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -149,9 +149,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -244,8 +249,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -254,8 +261,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -79,7 +79,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -97,7 +97,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -149,9 +149,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -244,8 +249,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -254,8 +261,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -79,7 +79,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -97,7 +97,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -149,9 +149,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -244,8 +249,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -254,8 +261,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -79,7 +79,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -97,7 +97,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -149,9 +149,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -244,8 +249,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -254,8 +261,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -52,7 +52,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -82,7 +82,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -125,7 +125,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -177,9 +177,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -272,8 +277,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -282,8 +289,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -53,7 +53,7 @@ public class RedissonSubscription extends AbstractSubscription {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
for (byte[] channel : channels) {
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -82,7 +82,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -125,7 +125,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -177,9 +177,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -272,8 +277,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -282,8 +289,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -57,7 +57,7 @@ public class RedissonSubscription extends AbstractSubscription {
continue;
}
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -82,7 +82,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -125,7 +125,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -177,9 +177,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -272,8 +277,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -282,8 +289,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -57,7 +57,7 @@ public class RedissonSubscription extends AbstractSubscription {
continue;
}
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -82,7 +82,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
@ -125,7 +125,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (ByteBuffer channel : channels) {
ChannelName cn = toChannelName(channel);
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, cn, subscriptionListener);
f = f.whenComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(cn, res));
futures.add(f);
}
@ -177,9 +177,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
f = f.whenComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.channels) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.channels.remove(cn);
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
for (PubSubConnectionEntry entry : entries) {
if (!entry.hasListeners(cn)) {
entries.remove(entry);
if (entries.isEmpty()) {
RedissonReactiveSubscription.this.channels.remove(cn);
}
}
}
}
});
@ -272,8 +277,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
};
disposable = () -> {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
@ -282,8 +289,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : channels.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {

@ -57,7 +57,7 @@ public class RedissonSubscription extends AbstractSubscription {
continue;
}
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
CompletableFuture<List<PubSubConnectionEntry>> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) ch).getName(), channel)) {

@ -29,10 +29,6 @@ public class PubSubStatusListener implements RedisPubSubListener<Object> {
private final StatusListener listener;
private final String name;
public String getName() {
return name;
}
public PubSubStatusListener(StatusListener listener, String name) {
super();
this.listener = listener;
@ -86,4 +82,12 @@ public class PubSubStatusListener implements RedisPubSubListener<Object> {
return false;
}
public String getName() {
return name;
}
public StatusListener getListener() {
return listener;
}
}

@ -123,7 +123,7 @@ public class RedissonTopic implements RTopic {
}
protected RFuture<Integer> addListenerAsync(RedisPubSubListener<?> pubSubListener) {
CompletableFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, channelName, pubSubListener);
CompletableFuture<List<PubSubConnectionEntry>> future = subscribeService.subscribe(codec, channelName, pubSubListener);
CompletableFuture<Integer> f = future.thenApply(res -> {
return System.identityHashCode(pubSubListener);
});

@ -17,6 +17,7 @@ package org.redisson.pubsub;
import io.netty.util.Timeout;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.PubSubStatusListener;
import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -211,15 +212,46 @@ public class PublishSubscribeService {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, clientEntry, listeners);
}
public CompletableFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
if (isMultiEntity(channelName)) {
Collection<MasterSlaveEntry> entrySet = connectionManager.getEntrySet();
AtomicInteger statusCounter = new AtomicInteger(entrySet.size());
RedisPubSubListener[] ls = Arrays.stream(listeners).map(l -> {
if (l instanceof PubSubStatusListener) {
return new PubSubStatusListener(((PubSubStatusListener) l).getListener(), ((PubSubStatusListener) l).getName()) {
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (statusCounter.decrementAndGet() == 0) {
return super.onStatus(type, channel);
}
return true;
}
};
}
return l;
}).toArray(RedisPubSubListener[]::new);
List<CompletableFuture<PubSubConnectionEntry>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : entrySet) {
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, ls);
futures.add(future);
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return future.thenApply(r -> {
return futures.stream().map(v -> v.getNow(null)).collect(Collectors.toList());
});
}
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
CompletableFuture<List<PubSubConnectionEntry>> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
return f.thenApply(res -> Collections.singletonList(res));
}
public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
@ -731,8 +763,21 @@ public class PublishSubscribeService {
private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners,
Codec subscribeCodec) {
CompletableFuture<PubSubConnectionEntry> subscribeFuture =
subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[0]));
MasterSlaveEntry entry = getEntry(channelName);
if (isMultiEntity(channelName)) {
entry = connectionManager.getEntrySet()
.stream()
.filter(e -> !name2PubSubConnection.containsKey(new PubSubKey(channelName, e)))
.findFirst()
.orElse(null);
}
CompletableFuture<PubSubConnectionEntry> subscribeFuture;
if (entry != null) {
subscribeFuture = subscribe(PubSubType.SUBSCRIBE, subscribeCodec, channelName, entry, null, listeners.toArray(new RedisPubSubListener[0]));
} else {
subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[0])).thenApply(r -> r.iterator().next());
}
subscribeFuture.whenComplete((res, e) -> {
if (e != null) {
connectionManager.getServiceManager().newTimeout(task -> {

@ -94,7 +94,85 @@ public class RedissonTopicTest {
}
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(3000);
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger subscribedCounter = new AtomicInteger();
AtomicInteger unsubscribedCounter = new AtomicInteger();
RTopic topic = redisson.getTopic("__keyevent@0__:del", StringCodec.INSTANCE);
int id1 = topic.addListener(new StatusListener() {
@Override
public void onSubscribe(String channel) {
subscribedCounter.incrementAndGet();
}
@Override
public void onUnsubscribe(String channel) {
unsubscribedCounter.incrementAndGet();
}
});
AtomicInteger counter = new AtomicInteger();
MessageListener<String> listener = (channel, msg) -> {
System.out.println("mes " + channel + " counter " + counter.get());
counter.incrementAndGet();
};
int id2 = topic.addListener(String.class, listener);
for (int i = 0; i < 10; i++) {
redisson.getBucket("" + i).set(i);
redisson.getBucket("" + i).delete();
Thread.sleep(7);
}
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9);
assertThat(subscribedCounter.get()).isEqualTo(1);
assertThat(unsubscribedCounter.get()).isZero();
topic.removeListener(id1, id2);
Thread.sleep(1000);
assertThat(unsubscribedCounter.get()).isEqualTo(1);
redisson.shutdown();
process.shutdown();
}
@Test
public void testCountSubscribers() {
RedissonClient redisson = BaseTest.createInstance();

Loading…
Cancel
Save