Feature - TimeSeries object added. #2742

pull/2791/head
Nikita Koksharov 5 years ago
parent c8e4c50a11
commit 36b15709f6

@ -356,6 +356,20 @@ public class RedissonReactive implements RedissonReactiveClient {
new RedissonListReactive<V>(codec, commandExecutor, name), RDequeReactive.class);
}
@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
}
@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);

@ -333,6 +333,20 @@ public class RedissonRx implements RedissonRxClient {
new RedissonListRx<V>(queue), RDequeRx.class);
}
@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
}
@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
}
@Override
public <V> RSetCacheRx<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);

@ -0,0 +1,218 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Reactive interface for Redis based time-series collection.
*
* @author Nikita Koksharov
*
*/
public interface RTimeSeriesReactive<V> extends RExpirableReactive {
/**
* Returns iterator over collection elements
*
* @return iterator
*/
Flux<V> iterator();
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @return void
*/
Mono<Void> add(long timestamp, V object);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
*
* @param objects - map of elements to add
* @return void
*/
Mono<Void> addAll(Map<Long, V> objects);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
Mono<Void> add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
*
* @param objects - map of elements to add
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
Mono<Void> addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Returns size of this set.
*
* @return size
*/
Mono<Integer> size();
/**
* Returns object by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp - object timestamp
* @return object
*/
Mono<V> get(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this time-series collection is empty
*/
Mono<Collection<V>> pollFirst(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
* @param count - elements amount
* @return the tail element or {@code null} if this time-series collection is empty
*/
Mono<Collection<V>> pollLast(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
* @return the head element,
* or {@code null} if this time-series collection is empty
*/
Mono<V> pollFirst();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
* @return the tail element or {@code null} if this time-series collection is empty
*/
Mono<V> pollLast();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
* @return the tail element or {@code null} if this time-series collection is empty
*/
Mono<V> last();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
* @return the head element or {@code null} if this time-series collection is empty
*/
Mono<V> first();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
* @return timestamp or {@code null} if this time-series collection is empty
*/
Mono<Long> firstTimestamp();
/**
* Returns timestamp of the tail element or {@code null} if this time-series collection is empty.
*
* @return timestamp or {@code null} if this time-series collection is empty
*/
Mono<Long> lastTimestamp();
/**
* Returns the tail elements of this time-series collection.
*
* @param count - elements amount
* @return the tail elements
*/
Mono<Collection<V>> last(int count);
/**
* Returns the head elements of this time-series collection.
*
* @param count - elements amount
* @return the head elements
*/
Mono<Collection<V>> first(int count);
/**
* Removes values within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return number of removed elements
*/
Mono<Integer> removeRange(long startTimestamp, long endTimestamp);
/**
* Returns ordered elements of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<V>> range(long startTimestamp, long endTimestamp);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<V>> rangeReversed(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<TimeSeriesEntry<V>>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<TimeSeriesEntry<V>>> entryRangeReversed(long startTimestamp, long endTimestamp);
}

@ -0,0 +1,220 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Rx interface for Redis based time-series collection.
*
* @author Nikita Koksharov
*
*/
public interface RTimeSeriesRx<V> extends RExpirableRx {
/**
* Returns iterator over collection elements
*
* @return iterator
*/
Flowable<V> iterator();
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @return void
*/
Completable add(long timestamp, V object);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
*
* @param objects - map of elements to add
* @return void
*/
Completable addAll(Map<Long, V> objects);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
Completable add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
*
* @param objects - map of elements to add
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return void
*/
Completable addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Returns size of this set.
*
* @return size
*/
Single<Integer> size();
/**
* Returns object by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp - object timestamp
* @return object
*/
Maybe<V> get(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this time-series collection is empty
*/
Single<Collection<V>> pollFirst(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
* @param count - elements amount
* @return the tail element or {@code null} if this time-series collection is empty
*/
Single<Collection<V>> pollLast(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
* @return the head element,
* or {@code null} if this time-series collection is empty
*/
Maybe<V> pollFirst();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
* @return the tail element or {@code null} if this time-series collection is empty
*/
Maybe<V> pollLast();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
* @return the tail element or {@code null} if this time-series collection is empty
*/
Maybe<V> last();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
* @return the head element or {@code null} if this time-series collection is empty
*/
Maybe<V> first();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
* @return timestamp or {@code null} if this time-series collection is empty
*/
Single<Long> firstTimestamp();
/**
* Returns timestamp of the tail element or {@code null} if this time-series collection is empty.
*
* @return timestamp or {@code null} if this time-series collection is empty
*/
Single<Long> lastTimestamp();
/**
* Returns the tail elements of this time-series collection.
*
* @param count - elements amount
* @return the tail elements
*/
Single<Collection<V>> last(int count);
/**
* Returns the head elements of this time-series collection.
*
* @param count - elements amount
* @return the head elements
*/
Single<Collection<V>> first(int count);
/**
* Removes values within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return number of removed elements
*/
Single<Integer> removeRange(long startTimestamp, long endTimestamp);
/**
* Returns ordered elements of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<V>> range(long startTimestamp, long endTimestamp);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<V>> rangeReversed(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<TimeSeriesEntry<V>>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp - start timestamp
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<TimeSeriesEntry<V>>> entryRangeReversed(long startTimestamp, long endTimestamp);
}

@ -50,7 +50,7 @@ public interface RedissonClient {
* @param <V> type of value
* @param name - name of instance
* @param codec - codec for values
* @return RStream object
* @return RTimeSeries object
*/
<V> RTimeSeries<V> getTimeSeries(String name, Codec codec);

@ -32,6 +32,26 @@ import java.util.List;
*/
public interface RedissonReactiveClient {
/**
* Returns time-series instance by <code>name</code>
*
* @param <V> type of value
* @param name - name of instance
* @return RTimeSeries object
*/
<V> RTimeSeriesReactive<V> getTimeSeries(String name);
/**
* Returns time-series instance by <code>name</code>
* using provided <code>codec</code> for values.
*
* @param <V> type of value
* @param name - name of instance
* @param codec - codec for values
* @return RTimeSeries object
*/
<V> RTimeSeriesReactive<V> getTimeSeries(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>
* <p>

@ -31,6 +31,26 @@ import org.redisson.config.Config;
*/
public interface RedissonRxClient {
/**
* Returns time-series instance by <code>name</code>
*
* @param <V> type of value
* @param name - name of instance
* @return RTimeSeries object
*/
<V> RTimeSeriesRx<V> getTimeSeries(String name);
/**
* Returns time-series instance by <code>name</code>
* using provided <code>codec</code> for values.
*
* @param <V> type of value
* @param name - name of instance
* @param codec - codec for values
* @return RTimeSeries object
*/
<V> RTimeSeriesRx<V> getTimeSeries(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>
* <p>

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTimeSeries;
import org.redisson.api.RFuture;
import org.redisson.api.RTimeSeries;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
/**
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public class RedissonTimeSeriesReactive<V> {
private final RTimeSeries<V> instance;
private final RedissonReactiveClient redisson;
public RedissonTimeSeriesReactive(RTimeSeries<V> instance, RedissonReactiveClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
}
});
}
}

@ -0,0 +1,51 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTimeSeries;
import org.redisson.api.RFuture;
import org.redisson.api.RTimeSeries;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public class RedissonTimeSeriesRx<V> {
private final RTimeSeries<V> instance;
private final RedissonRxClient redisson;
public RedissonTimeSeriesRx(RTimeSeries<V> instance, RedissonRxClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<V> iterator() {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
}
}.create();
}
}

@ -0,0 +1,34 @@
package org.redisson;
import org.junit.Test;
import org.redisson.api.RTimeSeriesReactive;
import org.redisson.api.RTimeSeriesRx;
import org.redisson.api.TimeSeriesEntry;
import org.redisson.rx.BaseRxTest;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTimeSeriesReactiveTest extends BaseReactiveTest {
@Test
public void testOrder() {
RTimeSeriesReactive<String> t = redisson.getTimeSeries("test");
sync(t.add(4, "40"));
sync(t.add(2, "20"));
sync(t.add(1, "10", 1, TimeUnit.SECONDS));
Collection<TimeSeriesEntry<String>> r11 = sync(t.entryRange(1, 5));
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "20"),
new TimeSeriesEntry<>(4, "40"));
}
}

@ -0,0 +1,35 @@
package org.redisson.rx;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.api.RTimeSeries;
import org.redisson.api.RTimeSeriesRx;
import org.redisson.api.TimeSeriesEntry;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTimeSeriesRxTest extends BaseRxTest {
@Test
public void testOrder() {
RTimeSeriesRx<String> t = redisson.getTimeSeries("test");
sync(t.add(4, "40"));
sync(t.add(2, "20"));
sync(t.add(1, "10", 1, TimeUnit.SECONDS));
Collection<TimeSeriesEntry<String>> r11 = sync(t.entryRange(1, 5));
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "20"),
new TimeSeriesEntry<>(4, "40"));
}
}
Loading…
Cancel
Save