Skip to content
This repository was archived by the owner on Sep 16, 2025. It is now read-only.

Commit 4ef0ebc

Browse files
author
Mark Dietz
committed
add stubs for kinesis client
1 parent fe16fc5 commit 4ef0ebc

File tree

1 file changed

+231
-0
lines changed

1 file changed

+231
-0
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package com.bizo.awsstubs.services.kinesis;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.concurrent.BlockingQueue;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.LinkedBlockingQueue;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import com.amazonaws.AmazonClientException;
10+
import com.amazonaws.AmazonServiceException;
11+
import com.amazonaws.AmazonWebServiceRequest;
12+
import com.amazonaws.ResponseMetadata;
13+
import com.amazonaws.regions.Region;
14+
import com.amazonaws.services.kinesis.AmazonKinesis;
15+
import com.amazonaws.services.kinesis.model.*;
16+
17+
/**
18+
* This should be kept threadsafe so that off thread writes can be tested
19+
*/
20+
public class AmazonKinesisClientStub implements AmazonKinesis {
21+
22+
public ConcurrentHashMap<String, Stream> streams = new ConcurrentHashMap<String, Stream>();
23+
24+
public void reset() {
25+
streams.clear();
26+
}
27+
28+
@Override
29+
public void createStream(final CreateStreamRequest createStreamRequest)
30+
throws AmazonServiceException,
31+
AmazonClientException {
32+
streams.putIfAbsent(createStreamRequest.getStreamName(), new Stream());
33+
}
34+
35+
@Override
36+
public void createStream(final String streamName, final Integer shardCount)
37+
throws AmazonServiceException,
38+
AmazonClientException {
39+
createStream(new CreateStreamRequest().withStreamName(streamName).withShardCount(shardCount));
40+
}
41+
42+
@Override
43+
public void deleteStream(final DeleteStreamRequest deleteStreamRequest)
44+
throws AmazonServiceException,
45+
AmazonClientException {
46+
throw new UnsupportedOperationException();
47+
}
48+
49+
@Override
50+
public void deleteStream(final String streamName) throws AmazonServiceException, AmazonClientException {
51+
throw new UnsupportedOperationException();
52+
}
53+
54+
@Override
55+
public DescribeStreamResult describeStream(final DescribeStreamRequest describeStreamRequest)
56+
throws AmazonServiceException,
57+
AmazonClientException {
58+
throw new UnsupportedOperationException();
59+
}
60+
61+
@Override
62+
public DescribeStreamResult describeStream(final String streamName)
63+
throws AmazonServiceException,
64+
AmazonClientException {
65+
throw new UnsupportedOperationException();
66+
}
67+
68+
@Override
69+
public DescribeStreamResult describeStream(final String streamName, final String exclusiveStartShardId)
70+
throws AmazonServiceException,
71+
AmazonClientException {
72+
throw new UnsupportedOperationException();
73+
}
74+
75+
@Override
76+
public DescribeStreamResult describeStream(
77+
final String streamName,
78+
final Integer limit,
79+
final String exclusiveStartShardId) throws AmazonServiceException, AmazonClientException {
80+
throw new UnsupportedOperationException();
81+
}
82+
83+
@Override
84+
public ResponseMetadata getCachedResponseMetadata(final AmazonWebServiceRequest request) {
85+
throw new UnsupportedOperationException();
86+
}
87+
88+
@Override
89+
public GetRecordsResult getRecords(final GetRecordsRequest request)
90+
throws AmazonServiceException,
91+
AmazonClientException {
92+
throw new UnsupportedOperationException();
93+
}
94+
95+
@Override
96+
public GetShardIteratorResult getShardIterator(final GetShardIteratorRequest getShardIteratorRequest)
97+
throws AmazonServiceException,
98+
AmazonClientException {
99+
throw new UnsupportedOperationException();
100+
}
101+
102+
@Override
103+
public GetShardIteratorResult getShardIterator(
104+
final String streamName,
105+
final String shardId,
106+
final String shardIteratorType) throws AmazonServiceException, AmazonClientException {
107+
throw new UnsupportedOperationException();
108+
}
109+
110+
@Override
111+
public GetShardIteratorResult getShardIterator(
112+
final String streamName,
113+
final String shardId,
114+
final String shardIteratorType,
115+
final String startingSequenceNumber) throws AmazonServiceException, AmazonClientException {
116+
throw new UnsupportedOperationException();
117+
}
118+
119+
@Override
120+
public ListStreamsResult listStreams() throws AmazonServiceException, AmazonClientException {
121+
throw new UnsupportedOperationException();
122+
}
123+
124+
@Override
125+
public ListStreamsResult listStreams(final ListStreamsRequest listStreamsRequest)
126+
throws AmazonServiceException,
127+
AmazonClientException {
128+
throw new UnsupportedOperationException();
129+
}
130+
131+
@Override
132+
public ListStreamsResult listStreams(final String exclusiveStartStreamName)
133+
throws AmazonServiceException,
134+
AmazonClientException {
135+
throw new UnsupportedOperationException();
136+
}
137+
138+
@Override
139+
public ListStreamsResult listStreams(final Integer limit, final String exclusiveStartStreamName)
140+
throws AmazonServiceException,
141+
AmazonClientException {
142+
throw new UnsupportedOperationException();
143+
}
144+
145+
@Override
146+
public void mergeShards(final MergeShardsRequest mergeShardsRequest)
147+
throws AmazonServiceException,
148+
AmazonClientException {
149+
throw new UnsupportedOperationException();
150+
}
151+
152+
@Override
153+
public void mergeShards(final String streamName, final String shardToMerge, final String adjacentShardToMerge)
154+
throws AmazonServiceException,
155+
AmazonClientException {
156+
throw new UnsupportedOperationException();
157+
}
158+
159+
@Override
160+
public PutRecordResult putRecord(final PutRecordRequest putRecordRequest)
161+
throws AmazonServiceException,
162+
AmazonClientException {
163+
if (putRecordRequest.getData().array().length > 50 * 1024) {
164+
throw new InvalidArgumentException("Payload exceeds 50 KB");
165+
}
166+
167+
final Stream stream = streams.get(putRecordRequest.getStreamName());
168+
final String nextSequenceNumber =
169+
String.valueOf(streams.get(putRecordRequest.getStreamName()).sequenceNumber.incrementAndGet());
170+
171+
stream.records.add(new Record()
172+
.withData(putRecordRequest.getData())
173+
.withPartitionKey(putRecordRequest.getPartitionKey())
174+
.withSequenceNumber(nextSequenceNumber));
175+
return new PutRecordResult().withSequenceNumber(nextSequenceNumber).withShardId("hard-coded-only-stub-shard");
176+
}
177+
178+
@Override
179+
public PutRecordResult putRecord(final String streamName, final ByteBuffer data, final String partitionKey)
180+
throws AmazonServiceException,
181+
AmazonClientException {
182+
return putRecord(new PutRecordRequest().withStreamName(streamName).withData(data).withPartitionKey(partitionKey));
183+
}
184+
185+
@Override
186+
public PutRecordResult putRecord(
187+
final String streamName,
188+
final ByteBuffer data,
189+
final String partitionKey,
190+
final String exclusiveMinimumSequenceNumber) throws AmazonServiceException, AmazonClientException {
191+
throw new UnsupportedOperationException();
192+
}
193+
194+
@Override
195+
public void setEndpoint(final String endpoint) throws IllegalArgumentException {
196+
throw new UnsupportedOperationException();
197+
}
198+
199+
@Override
200+
public void setRegion(final Region region) throws IllegalArgumentException {
201+
throw new UnsupportedOperationException();
202+
}
203+
204+
@Override
205+
public void shutdown() {
206+
throw new UnsupportedOperationException();
207+
}
208+
209+
@Override
210+
public void splitShard(final SplitShardRequest splitShardRequest)
211+
throws AmazonServiceException,
212+
AmazonClientException {
213+
throw new UnsupportedOperationException();
214+
}
215+
216+
@Override
217+
public void splitShard(final String streamName, final String shardToSplit, final String newStartingHashKey)
218+
throws AmazonServiceException,
219+
AmazonClientException {
220+
throw new UnsupportedOperationException();
221+
}
222+
223+
public static class Stream {
224+
public AtomicInteger sequenceNumber = new AtomicInteger(0);
225+
public BlockingQueue<Record> records = new LinkedBlockingQueue<Record>();
226+
}
227+
228+
public BlockingQueue<Record> getRecordsForStream(final String streamName) {
229+
return streams.get(streamName).records;
230+
}
231+
}

0 commit comments

Comments
 (0)