Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@
<artifactId>rxjava-async-util</artifactId>
<version>0.21.0</version>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
22 changes: 13 additions & 9 deletions src/main/java/com/hazelcast/rxjava/RxHazelcast.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@

package com.hazelcast.rxjava;

import com.hazelcast.cache.ICache;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IAtomicReference;
import com.hazelcast.core.IMap;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.rxjava.impl.RxAtomicLongImpl;
import com.hazelcast.rxjava.impl.RxAtomicReferenceImpl;
import com.hazelcast.rxjava.impl.RxHazelcastInstanceImpl;
import com.hazelcast.rxjava.impl.RxIMapImpl;
import com.hazelcast.rxjava.impl.RxRingbufferImpl;
import com.hazelcast.rxjava.impl.*;

import java.util.concurrent.Executor;

Expand Down Expand Up @@ -63,7 +60,7 @@ public static <K, V> RxIMap<K, V> from(IMap<K, V> map) {
}

/**
* @param map non-rx java object to wrap-around
* @param map non-rx java object to wrap-around
* @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
Expand All @@ -81,7 +78,7 @@ public static <E> RxRingbuffer<E> from(Ringbuffer<E> ringbuffer) {

/**
* @param ringbuffer non-rx java object to wrap-around
* @param executor executor to pass to rx-java for callback execution
* @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static <E> RxRingbuffer<E> from(Ringbuffer<E> ringbuffer, Executor executor) {
Expand All @@ -98,7 +95,7 @@ public static RxAtomicLong from(IAtomicLong atomicLong) {

/**
* @param atomicLong non-rx java object to wrap-around
* @param executor executor to pass to rx-java for callback execution
* @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static RxAtomicLong from(IAtomicLong atomicLong, Executor executor) {
Expand All @@ -115,11 +112,18 @@ public static <E> RxAtomicReference<E> from(IAtomicReference<E> atomicReference)

/**
* @param atomicReference non-rx java object to wrap-around
* @param executor executor to pass to rx-java for callback execution
* @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static <E> RxAtomicReference<E> from(IAtomicReference<E> atomicReference, Executor executor) {
return RxAtomicReferenceImpl.from(atomicReference, executor);
}

public static RxICache<String, String> from(ICache<String, String> cache) {
return RxICacheImpl.from(cache);
}

public static RxICache<String, String> from(ICache<String, String> cache, Executor executor) {
return RxICacheImpl.from(cache, executor);
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/hazelcast/rxjava/RxICache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.hazelcast.rxjava;



import com.hazelcast.cache.ICache;
import rx.Observable;

import javax.cache.expiry.ExpiryPolicy;

/**
* TODO
*
* @author Viktor Gamov on 7/29/16.
* Twitter: @gamussa
* @since 0.0.1
*/
public interface RxICache<K, V> {

Observable<V> get(K key);

Observable<V> getAndPut(K key, V newValue);

Observable<V> getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy);

Observable<Void> put(K key, V value);

Observable<Void> put(K key, V value, ExpiryPolicy expiryPolicy);

ICache<K, V> getDelegate();
}
63 changes: 63 additions & 0 deletions src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.hazelcast.rxjava.impl;

import com.hazelcast.cache.ICache;
import com.hazelcast.rxjava.RxICache;
import rx.Observable;

import javax.cache.expiry.ExpiryPolicy;
import java.util.concurrent.Executor;

/**
* TODO
*
* @author Viktor Gamov on 7/29/16.
* Twitter: @gamussa
* @since 0.0.1
*/
public class RxICacheImpl<K, V> implements RxICache<K, V> {

private final ICache<K, V> cache;
private final Executor executor;

public RxICacheImpl(ICache<K, V> cache) {
this.cache = cache;
this.executor = null;
}

public RxICacheImpl(ICache<K, V> cache, Executor executor) {
this.cache = cache;
this.executor = executor;
}

@Override public Observable<V> get(K key) {
return RxIObservable.from(cache.getAsync(key), executor);
}

@Override public Observable<V> getAndPut(K key, V newValue) {
return RxIObservable.from(cache.getAndPutAsync(key, newValue), executor);
}

@Override public Observable<V> getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy) {
return RxIObservable.from(cache.getAndPutAsync(key, newValue, expiryPolicy), executor);
}

@Override public ICache<K, V> getDelegate() {
return this.cache;
}

@Override public Observable<Void> put(K key, V value) {
return RxIObservable.from(cache.putAsync(key, value), executor);
}

@Override public Observable<Void> put(K key, V value, ExpiryPolicy expiryPolicy) {
return RxIObservable.from(cache.putAsync(key, value, expiryPolicy), executor);
}

public static RxICache<String, String> from(ICache<String, String> cache) {
return new RxICacheImpl<String, String>(cache);
}

public static RxICache<String, String> from(ICache<String, String> cache, Executor executor) {
return new RxICacheImpl<String, String>(cache, executor);
}
}
115 changes: 115 additions & 0 deletions src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.hazelcast.rxjava.impl;

import com.hazelcast.cache.ICache;
import com.hazelcast.cache.impl.HazelcastServerCachingProvider;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.rxjava.RxHazelcast;
import com.hazelcast.rxjava.RxICache;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import rx.observers.TestSubscriber;

import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;

/**
* TODO
*
* @author Viktor Gamov on 7/29/16.
* Twitter: @gamussa
* @since 0.0.1
*/
@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class RxICacheImplTest extends HazelcastTestSupport {

private ICache<String, String> cache;
private RxICache<String, String> rxCache;

@Before
public void before() {

final HazelcastInstance hazelcastInstance = createHazelcastInstance();
final MutableConfiguration<String, String> configuration = new MutableConfiguration<String, String>();
HazelcastServerCachingProvider.createCachingProvider(hazelcastInstance)
.getCacheManager().createCache("RxJava", configuration);

cache = hazelcastInstance.getCacheManager().getCache("RxJava");
cache.put("RxJava", "cool");
rxCache = RxHazelcast.from(cache);
}

@Test
public void fromICache() {
// WHEN
RxICache<String, String> rxICache = RxHazelcast.from(cache);

// THEN
assertEquals(cache, rxICache.getDelegate());
}

@Test
public void get() {
// WHEN
TestSubscriber<String> subscriber = new TestSubscriber<String>();
rxCache.get("RxJava").subscribe(subscriber);

// THEN
RxTestUtils.assertSingleResult("cool", subscriber);
}

@Test
public void put() {
// WHEN
TestSubscriber<Void> subscriber = new TestSubscriber<Void>();
rxCache.put("Reactive", "rocks").subscribe(subscriber);

// THEN
RxTestUtils.assertVoidResult(subscriber);
}

@Test
public void putWithTtl() {
// WHEN
TestSubscriber<Void> subscriber = new TestSubscriber<Void>();
rxCache.put("Reactive", "rocks", new CreatedExpiryPolicy(new Duration(SECONDS, 120)))
.subscribe(subscriber);

// THEN
RxTestUtils.assertSingleResult(null, subscriber);
}

@Test
public void getAndPut() {
//WHEN
final TestSubscriber<String> subscriber = new TestSubscriber<String>();
rxCache.put("Reactive", "rocks").subscribe();
rxCache.getAndPut("Reactive", "rocks!!!").subscribe(subscriber);

// THEN
RxTestUtils.assertSingleResult("rocks", subscriber);
}

@Test
public void getAndPut_wthTtl() {
//WHEN
final TestSubscriber<String> subscriber = new TestSubscriber<String>();
rxCache.put("Reactive", "rocks").subscribe();
rxCache.getAndPut("Reactive", "rocks!!!", new CreatedExpiryPolicy(new Duration(SECONDS, 120))).subscribe
(subscriber);

// THEN
RxTestUtils.assertSingleResult("rocks", subscriber);
}

}