Skip to content

Commit 33dd489

Browse files
YoHanKiggivo
andauthored
Add binary stream support for XREAD and XREADGROUP (#3566) (#4152)
* Add binary stream support for XREAD and XREADGROUP (#3566) - Created StreamEntryBinary class to support binary data with Map<byte[], byte[]> - Added xreadBinary, xreadBinaryAsMap, xreadGroupBinary, and xreadGroupBinaryAsMap methods to StreamBinaryCommands - Implemented binary stream builders in BuilderFactory - Added implementation in Jedis and UnifiedJedis classes - Created BinaryStreamEntryTest to verify binary stream functionality * Add parameterized binary stream tests for RESP2/RESP3 in Jedis, Cluster, and Pooled clients TEST (#3566) - Introduce `BinaryStreamEntryTest` and `BinaryStreamsCommandsTestBase` following the unified test pattern - Add `ClusterBinaryStreamsCommandsTest` and `PooledBinaryStreamsCommandsTest` to cover JedisCluster and JedisPooled - Parameterize tests to run under both RESP2 and RESP3 protocols - Format BuilderFactory to conform to project formatter rules * Switch to StreamEntryID in binary stream tests and add illegal UTF-8 sequence test (#3566) - Refactor StreamsBinaryCommandsTest: replace Map.Entry<byte[], byte[]> with Map.Entry<byte[], StreamEntryID> for xreadBinary, xreadBinaryAsMap, xreadGroupBinary and related methods - Update tests to assert on StreamEntryBinary IDs and byte-array payloads accordingly - Add a new test case covering an illegal UTF-8 sequence (0xc3 0x28) to validate correct binary handling (issue #3566) * Add Map<byte[], StreamEntryID> support and binary stream tests (#3566) - Introduce a Map<byte[], StreamEntryID> parameter for binary stream commands - Duplicate all existing test cases for xread, xreadAsMap, xreadGroup and xreadGroupAsMap from StreamsCommandsTest.java and replace them with xreadBinary, xreadBinaryAsMap, xreadGroupBinary and xreadGroupBinaryAsMap in StreamsBinaryCommandsTest * Modify Map<byte[], StreamEntryID> support and binary stream tests (#3566) - Duplicate all existing test cases for xread, xreadAsMap, xreadGroup and xreadGroupAsMap from StreamsCommandsTest.java and replace them with xreadBinary, xreadBinaryAsMap, xreadGroupBinary and xreadGroupBinaryAsMap in StreamsBinaryCommandsTest * Add JedisByteMap implementation (#3566) - Implemented a new JedisByteMap<T> class similar to the existing JedisByteHashMap to properly handle byte array keys in Map<byte[], T> operations - This implementation ensures that `get(byte[])` operations work correctly by using proper byte array equality comparison instead of reference comparison - Fixes issues when using byte arrays as keys in stream-related operations * Add JedisByteMapTest (#3566) - Added tests for JedisByteMap similar to existing JedisByteHashMap tests * Changed Map<byte[], byte[]> in STREAM_ENTRY_BINARY_LIST to JedisByteHashMap (#3566) - Changed Map<byte[], byte[]> in STREAM_ENTRY_BINARY_LIST to JedisByteHashMap. * Remove varargs alternatives There is already discrepancy in API between `xread` binary variant & `xread` string one. Binary one accepts varargs for requested streams, while the string variant expects them to be provided as `Map`. Going forward xread binary methods should use Map. This commit also marks existing `List<Object> xread` one as deprecated in favor of newly introduced methods for type safety and better stream entry parsing. * Add xreadBinary and xreadGroupBinary methods to PipelineBase * Deprecate List<Object> xreadXXX in favor of newly introduced typed methods * Clean up * Java docs * STREAM_ENTRY_BINARY not used * Fix BinaryStreamEntryTest test * Add BinaryStreamsPipeline tests Added tests to cover Pipeline command executions. UnifiedJedis BinaryStreamsCommand test clean up and attempt to simplify focused on testing xreadBinary and xreadGroupBinary methods . * Test cleanup & formating * Merged & refactored a bit Jedis BinaryStreamEntryTest.java & StreamsBinaryCommandsTest to match UnifiedJedis StreamsBinaryCommandsTestBase. --------- Co-authored-by: ggivo <[email protected]>
1 parent 3605bef commit 33dd489

17 files changed

+1603
-11
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
import redis.clients.jedis.resps.*;
99
import redis.clients.jedis.resps.LCSMatchResult.MatchedPosition;
1010
import redis.clients.jedis.resps.LCSMatchResult.Position;
11-
import redis.clients.jedis.util.DoublePrecision;
12-
import redis.clients.jedis.util.JedisByteHashMap;
13-
import redis.clients.jedis.util.KeyValue;
14-
import redis.clients.jedis.util.SafeEncoder;
11+
import redis.clients.jedis.util.*;
1512

1613
public final class BuilderFactory {
1714

@@ -1809,6 +1806,112 @@ public String toString() {
18091806
}
18101807
};
18111808

1809+
public static final Builder<List<StreamEntryBinary>> STREAM_ENTRY_BINARY_LIST = new Builder<List<StreamEntryBinary>>() {
1810+
@Override
1811+
@SuppressWarnings("unchecked")
1812+
public List<StreamEntryBinary> build(Object data) {
1813+
if (null == data) {
1814+
return null;
1815+
}
1816+
List<ArrayList<Object>> objectList = (List<ArrayList<Object>>) data;
1817+
1818+
List<StreamEntryBinary> responses = new ArrayList<>(objectList.size() / 2);
1819+
if (objectList.isEmpty()) {
1820+
return responses;
1821+
}
1822+
1823+
for (ArrayList<Object> res : objectList) {
1824+
if (res == null) {
1825+
responses.add(null);
1826+
continue;
1827+
}
1828+
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
1829+
StreamEntryID entryID = new StreamEntryID(entryIdString);
1830+
List<byte[]> hash = (List<byte[]>) res.get(1);
1831+
if (hash == null) {
1832+
responses.add(new StreamEntryBinary(entryID, null));
1833+
continue;
1834+
}
1835+
1836+
Iterator<byte[]> hashIterator = hash.iterator();
1837+
Map<byte[], byte[]> map = new JedisByteHashMap();
1838+
while (hashIterator.hasNext()) {
1839+
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
1840+
}
1841+
responses.add(new StreamEntryBinary(entryID, map));
1842+
}
1843+
1844+
return responses;
1845+
}
1846+
1847+
@Override
1848+
public String toString() {
1849+
return "List<StreamEntryBinary>";
1850+
}
1851+
};
1852+
1853+
public static final Builder<Map<byte[], List<StreamEntryBinary>>> STREAM_READ_BINARY_MAP_RESPONSE
1854+
= new Builder<Map<byte[], List<StreamEntryBinary>>>() {
1855+
@Override
1856+
@SuppressWarnings("unchecked")
1857+
public Map<byte[], List<StreamEntryBinary>> build(Object data) {
1858+
if (data == null) return null;
1859+
List list = (List) data;
1860+
if (list.isEmpty()) return Collections.emptyMap();
1861+
1862+
JedisByteMap<List<StreamEntryBinary>> result = new JedisByteMap<>();
1863+
if (list.get(0) instanceof KeyValue) {
1864+
((List<KeyValue>) list).forEach(kv -> result.put(BINARY.build(kv.getKey()), STREAM_ENTRY_BINARY_LIST.build(kv.getValue())));
1865+
return result;
1866+
} else {
1867+
for (Object anObj : list) {
1868+
List<Object> streamObj = (List<Object>) anObj;
1869+
byte[] streamKey = (byte[]) streamObj.get(0);
1870+
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
1871+
result.put(streamKey, streamEntries);
1872+
}
1873+
return result;
1874+
}
1875+
}
1876+
1877+
@Override
1878+
public String toString() {
1879+
return "Map<byte[], List<StreamEntryBinary>>";
1880+
}
1881+
};
1882+
1883+
public static final Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>> STREAM_READ_BINARY_RESPONSE
1884+
= new Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>>() {
1885+
@Override
1886+
@SuppressWarnings("unchecked")
1887+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> build(Object data) {
1888+
if (data == null) return null;
1889+
List list = (List) data;
1890+
if (list.isEmpty()) return Collections.emptyList();
1891+
1892+
if (list.get(0) instanceof KeyValue) {
1893+
return ((List<KeyValue>) list).stream()
1894+
.map(kv -> new KeyValue<>(BINARY.build(kv.getKey()),
1895+
STREAM_ENTRY_BINARY_LIST.build(kv.getValue())))
1896+
.collect(Collectors.toList());
1897+
} else {
1898+
List<Map.Entry<byte[], List<StreamEntryBinary>>> result = new ArrayList<>(list.size());
1899+
for (Object anObj : list) {
1900+
List<Object> streamObj = (List<Object>) anObj;
1901+
byte[] streamKey = BINARY.build(streamObj.get(0));
1902+
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
1903+
result.add(KeyValue.of(streamKey, streamEntries));
1904+
}
1905+
return result;
1906+
}
1907+
}
1908+
1909+
@Override
1910+
public String toString() {
1911+
return "List<Entry<byte[], List<StreamEntryBinary>>>";
1912+
}
1913+
};
1914+
18121915
private static final List<Builder> BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS
18131916
= Arrays.asList(STRING, LONG, DOUBLE);
18141917

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2881,6 +2881,11 @@ public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
28812881
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
28822882
}
28832883

2884+
/**
2885+
* @deprecated As of Jedis 6.1.0, replaced by {@link #xreadBinary(XReadParams, Map)} or
2886+
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry parsing.
2887+
*/
2888+
@Deprecated
28842889
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
28852890
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
28862891
for (Map.Entry<byte[], byte[]> entry : streams) {
@@ -2892,6 +2897,35 @@ public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entr
28922897
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
28932898
}
28942899

2900+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
2901+
XReadParams xReadParams, Map.Entry<byte[], StreamEntryID>... streams) {
2902+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2903+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2904+
args.key(entry.getKey());
2905+
}
2906+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2907+
args.add(entry.getValue());
2908+
}
2909+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2910+
}
2911+
2912+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
2913+
XReadParams xReadParams, Map.Entry<byte[], StreamEntryID>... streams) {
2914+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2915+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2916+
args.key(entry.getKey());
2917+
}
2918+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2919+
args.add(entry.getValue());
2920+
}
2921+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2922+
}
2923+
2924+
/**
2925+
* @deprecated As of Jedis 6.1.0, use {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
2926+
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
2927+
*/
2928+
@Deprecated
28952929
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
28962930
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
28972931
CommandArguments args = commandArguments(XREADGROUP)
@@ -2905,6 +2939,78 @@ public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] con
29052939
}
29062940
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
29072941
}
2942+
2943+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
2944+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2945+
Map.Entry<byte[], StreamEntryID>... streams) {
2946+
CommandArguments args = commandArguments(XREADGROUP)
2947+
.add(GROUP).add(groupName).add(consumer)
2948+
.addParams(xReadGroupParams).add(STREAMS);
2949+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2950+
args.key(entry.getKey());
2951+
}
2952+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2953+
args.add(entry.getValue());
2954+
}
2955+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2956+
}
2957+
2958+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
2959+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2960+
Map.Entry<byte[], StreamEntryID>... streams) {
2961+
CommandArguments args = commandArguments(XREADGROUP)
2962+
.add(GROUP).add(groupName).add(consumer)
2963+
.addParams(xReadGroupParams).add(STREAMS);
2964+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2965+
args.key(entry.getKey());
2966+
}
2967+
for (Map.Entry<byte[], StreamEntryID> entry : streams) {
2968+
args.add(entry.getValue());
2969+
}
2970+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2971+
}
2972+
2973+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
2974+
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
2975+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2976+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2977+
entrySet.forEach(entry -> args.key(entry.getKey()));
2978+
entrySet.forEach(entry -> args.add(entry.getValue()));
2979+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2980+
}
2981+
2982+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
2983+
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
2984+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2985+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2986+
entrySet.forEach(entry -> args.key(entry.getKey()));
2987+
entrySet.forEach(entry -> args.add(entry.getValue()));
2988+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2989+
}
2990+
2991+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
2992+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2993+
Map<byte[], StreamEntryID> streams) {
2994+
CommandArguments args = commandArguments(XREADGROUP)
2995+
.add(GROUP).add(groupName).add(consumer)
2996+
.addParams(xReadGroupParams).add(STREAMS);
2997+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2998+
entrySet.forEach(entry -> args.key(entry.getKey()));
2999+
entrySet.forEach(entry -> args.add(entry.getValue()));
3000+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
3001+
}
3002+
3003+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
3004+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
3005+
Map<byte[], StreamEntryID> streams) {
3006+
CommandArguments args = commandArguments(XREADGROUP)
3007+
.add(GROUP).add(groupName).add(consumer)
3008+
.addParams(xReadGroupParams).add(STREAMS);
3009+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
3010+
entrySet.forEach(entry -> args.key(entry.getKey()));
3011+
entrySet.forEach(entry -> args.add(entry.getValue()));
3012+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
3013+
}
29083014
// Stream commands
29093015

29103016
// Scripting commands

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,7 @@ public long hsetex(byte[] key, HSetExParams params, byte[] field, byte[] value)
11721172
checkIsInMultiOrPipeline();
11731173
return connection.executeCommand(commandObjects.hsetex(key, params, field, value));
11741174
}
1175-
1175+
11761176
@Override
11771177
public long hsetex(byte[] key, HSetExParams params, Map<byte[], byte[]> hash){
11781178
checkIsInMultiOrPipeline();
@@ -1200,13 +1200,13 @@ public List<byte[]> hgetex(byte[] key, HGetExParams params, byte[]... fields){
12001200
checkIsInMultiOrPipeline();
12011201
return connection.executeCommand(commandObjects.hgetex(key, params, fields));
12021202
}
1203-
1203+
12041204
@Override
12051205
public List<byte[]> hgetdel(byte[] key, byte[]... fields){
12061206
checkIsInMultiOrPipeline();
12071207
return connection.executeCommand(commandObjects.hgetdel(key, fields));
12081208
}
1209-
1209+
12101210
/**
12111211
* Set the specified hash field to the specified value if the field not exists. <b>Time
12121212
* complexity:</b> O(1)
@@ -4770,19 +4770,60 @@ public List<Long> hpersist(byte[] key, byte[]... fields) {
47704770
return connection.executeCommand(commandObjects.hpersist(key, fields));
47714771
}
47724772

4773+
/**
4774+
* @deprecated As of Jedis 6.1.0, use
4775+
* {@link #xreadBinary(XReadParams, Map)} or
4776+
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry
4777+
* parsing.
4778+
*/
4779+
@Deprecated
47734780
@Override
47744781
public List<Object> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
47754782
checkIsInMultiOrPipeline();
47764783
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
47774784
}
47784785

4786+
/**
4787+
* @deprecated As of Jedis 6.1.0, use
4788+
* {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
4789+
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
4790+
*/
4791+
@Deprecated
47794792
@Override
47804793
public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
47814794
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
47824795
checkIsInMultiOrPipeline();
47834796
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
47844797
}
47854798

4799+
@Override
4800+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
4801+
Map<byte[], StreamEntryID> streams) {
4802+
checkIsInMultiOrPipeline();
4803+
return connection.executeCommand(commandObjects.xreadBinary(xReadParams, streams));
4804+
}
4805+
4806+
@Override
4807+
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
4808+
Map<byte[], StreamEntryID> streams) {
4809+
checkIsInMultiOrPipeline();
4810+
return connection.executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
4811+
}
4812+
4813+
@Override
4814+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
4815+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
4816+
checkIsInMultiOrPipeline();
4817+
return connection.executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
4818+
}
4819+
4820+
@Override
4821+
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
4822+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
4823+
checkIsInMultiOrPipeline();
4824+
return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
4825+
}
4826+
47864827
@Override
47874828
public byte[] xadd(final byte[] key, final XAddParams params, final Map<byte[], byte[]> hash) {
47884829
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/PipeliningBase.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3359,17 +3359,56 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
33593359
return appendCommand(commandObjects.xinfoConsumers(key, group));
33603360
}
33613361

3362+
/**
3363+
* @deprecated As of Jedis 6.1.0, use {@link #xreadBinary(XReadParams, Map)} or
3364+
* {@link #xreadBinaryAsMap(XReadParams, Map)} for type safety and better stream entry
3365+
* parsing.
3366+
*/
3367+
@Deprecated
33623368
@Override
3363-
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
3369+
public Response<List<Object>> xread(XReadParams xReadParams,
3370+
Map.Entry<byte[], byte[]>... streams) {
33643371
return appendCommand(commandObjects.xread(xReadParams, streams));
33653372
}
33663373

3374+
/**
3375+
* @deprecated As of Jedis 6.1.0, use
3376+
* {@link #xreadGroupBinary(byte[], byte[], XReadGroupParams, Map)} or
3377+
* {@link #xreadGroupBinaryAsMap(byte[], byte[], XReadGroupParams, Map)} instead.
3378+
*/
3379+
@Deprecated
33673380
@Override
33683381
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
33693382
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
33703383
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
33713384
}
33723385

3386+
@Override
3387+
public Response<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(XReadParams xReadParams,
3388+
Map<byte[], StreamEntryID> streams) {
3389+
return appendCommand(commandObjects.xreadBinary(xReadParams, streams));
3390+
}
3391+
3392+
@Override
3393+
public Response<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(XReadParams xReadParams,
3394+
Map<byte[], StreamEntryID> streams) {
3395+
return appendCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
3396+
}
3397+
3398+
@Override
3399+
public Response<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(byte[] groupName,
3400+
byte[] consumer, XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
3401+
return appendCommand(
3402+
commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
3403+
}
3404+
3405+
@Override
3406+
public Response<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(byte[] groupName,
3407+
byte[] consumer, XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
3408+
return appendCommand(
3409+
commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
3410+
}
3411+
33733412
@Override
33743413
public Response<String> set(byte[] key, byte[] value) {
33753414
return appendCommand(commandObjects.set(key, value));

0 commit comments

Comments
 (0)