Skip to content

Commit 0f2ee1d

Browse files
committed
feature: async support for objects batcher (refactor)
1 parent 073ab02 commit 0f2ee1d

File tree

9 files changed

+1532
-628
lines changed

9 files changed

+1532
-628
lines changed

src/main/java/io/weaviate/client/v1/async/batch/Batch.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class Batch {
2525
private final AccessTokenProvider tokenProvider;
2626

2727
public Batch(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport,
28-
GrpcVersionSupport grpcVersionSupport, AccessTokenProvider tokenProvider, Data data) {
28+
GrpcVersionSupport grpcVersionSupport, AccessTokenProvider tokenProvider, Data data) {
2929
this.client = client;
3030
this.config = config;
3131
this.objectsPath = new ObjectsPath();
@@ -65,8 +65,9 @@ public ObjectsBatcher objectsAutoBatcher(ObjectsBatcher.AutoBatchConfig autoBatc
6565
);
6666
}
6767

68+
6869
public ObjectsBatcher objectsAutoBatcher(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
69-
ObjectsBatcher.AutoBatchConfig autoBatchConfig) {
70+
ObjectsBatcher.AutoBatchConfig autoBatchConfig) {
7071
return ObjectsBatcher.createAuto(client, config, data, objectsPath, tokenProvider, grpcVersionSupport, batchRetriesConfig, autoBatchConfig);
7172
}
7273

@@ -109,7 +110,7 @@ public ReferencesBatcher referencesAutoBatcher(ReferencesBatcher.AutoBatchConfig
109110
}
110111

111112
public ReferencesBatcher referencesAutoBatcher(ReferencesBatcher.BatchRetriesConfig batchRetriesConfig,
112-
ReferencesBatcher.AutoBatchConfig autoBatchConfig) {
113+
ReferencesBatcher.AutoBatchConfig autoBatchConfig) {
113114
return ReferencesBatcher.createAuto(client, config, referencesPath, batchRetriesConfig, autoBatchConfig);
114115
}
115116
}

src/main/java/io/weaviate/client/v1/async/batch/api/ObjectsBatcher.java

Lines changed: 250 additions & 280 deletions
Large diffs are not rendered by default.
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
package io.weaviate.integration.client.async.batch;
2+
3+
import com.jparams.junit4.JParamsTestRunner;
4+
import com.jparams.junit4.data.DataMethod;
5+
import io.weaviate.client.Config;
6+
import io.weaviate.client.WeaviateClient;
7+
import io.weaviate.client.base.Result;
8+
import io.weaviate.client.base.Serializer;
9+
import io.weaviate.client.v1.async.WeaviateAsyncClient;
10+
import io.weaviate.client.v1.async.batch.api.ObjectsBatcher;
11+
import io.weaviate.client.v1.batch.model.ObjectGetResponse;
12+
import io.weaviate.integration.tests.batch.BatchObjectsMockServerTestSuite;
13+
import org.junit.After;
14+
import org.junit.Before;
15+
import org.junit.Test;
16+
import org.junit.runner.RunWith;
17+
import org.mockserver.client.MockServerClient;
18+
import org.mockserver.integration.ClientAndServer;
19+
import org.mockserver.model.Delay;
20+
import org.mockserver.verify.VerificationTimes;
21+
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.function.Consumer;
24+
import java.util.function.Supplier;
25+
26+
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
27+
import static org.mockserver.model.HttpRequest.request;
28+
import static org.mockserver.model.HttpResponse.response;
29+
30+
@RunWith(JParamsTestRunner.class)
31+
public class ClientBatchCreateMockServerTest {
32+
33+
private WeaviateClient client;
34+
private ClientAndServer mockServer;
35+
private MockServerClient mockServerClient;
36+
37+
private static final String MOCK_SERVER_HOST = "localhost";
38+
private static final int MOCK_SERVER_PORT = 8999;
39+
40+
@Before
41+
public void before() {
42+
mockServer = startClientAndServer(MOCK_SERVER_PORT);
43+
mockServerClient = new MockServerClient(MOCK_SERVER_HOST, MOCK_SERVER_PORT);
44+
45+
mockServerClient.when(
46+
request().withMethod("GET").withPath("/v1/meta")
47+
).respond(
48+
response().withStatusCode(200).withBody(metaBody())
49+
);
50+
51+
Config config = new Config("http", MOCK_SERVER_HOST + ":" + MOCK_SERVER_PORT, null, 1, 1, 1);
52+
client = new WeaviateClient(config);
53+
}
54+
55+
@After
56+
public void stopMockServer() {
57+
mockServer.stop();
58+
}
59+
60+
@Test
61+
@DataMethod(source = ClientBatchCreateMockServerTest.class, method = "provideForNotCreateBatchDueToConnectionIssue")
62+
public void shouldNotCreateBatchDueToConnectionIssue(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
63+
long expectedExecMinMillis, long expectedExecMaxMillis) {
64+
// stop server to simulate connection issues
65+
mockServer.stop();
66+
67+
try (WeaviateAsyncClient asyncClient = client.async()) {
68+
Supplier<Result<ObjectGetResponse[]>> supplierObjectsBatcher = () -> {
69+
try {
70+
return asyncClient.batch().objectsBatcher(batchRetriesConfig)
71+
.withObjects(BatchObjectsMockServerTestSuite.PIZZA_1, BatchObjectsMockServerTestSuite.PIZZA_2,
72+
BatchObjectsMockServerTestSuite.SOUP_1, BatchObjectsMockServerTestSuite.SOUP_2)
73+
.run()
74+
.get();
75+
} catch (InterruptedException | ExecutionException e) {
76+
throw new RuntimeException(e);
77+
}
78+
};
79+
80+
BatchObjectsMockServerTestSuite.testNotCreateBatchDueToConnectionIssue(supplierObjectsBatcher,
81+
expectedExecMinMillis, expectedExecMaxMillis);
82+
}
83+
}
84+
85+
@Test
86+
@DataMethod(source = ClientBatchCreateMockServerTest.class, method = "provideForNotCreateBatchDueToConnectionIssue")
87+
public void shouldNotCreateAutoBatchDueToConnectionIssue(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
88+
long expectedExecMinMillis, long expectedExecMaxMillis) {
89+
// stop server to simulate connection issues
90+
mockServer.stop();
91+
92+
try (WeaviateAsyncClient asyncClient = client.async()) {
93+
Consumer<Consumer<Result<ObjectGetResponse[]>>> supplierObjectsBatcher = callback -> {
94+
ObjectsBatcher.AutoBatchConfig autoBatchConfig = ObjectsBatcher.AutoBatchConfig.defaultConfig()
95+
.batchSize(2)
96+
.callback(callback)
97+
.build();
98+
99+
try {
100+
asyncClient.batch().objectsAutoBatcher(batchRetriesConfig, autoBatchConfig)
101+
.withObjects(BatchObjectsMockServerTestSuite.PIZZA_1, BatchObjectsMockServerTestSuite.PIZZA_2,
102+
BatchObjectsMockServerTestSuite.SOUP_1, BatchObjectsMockServerTestSuite.SOUP_2)
103+
.run()
104+
.get();
105+
} catch (InterruptedException | ExecutionException e) {
106+
throw new RuntimeException(e);
107+
}
108+
};
109+
110+
BatchObjectsMockServerTestSuite.testNotCreateAutoBatchDueToConnectionIssue(supplierObjectsBatcher,
111+
expectedExecMinMillis, expectedExecMaxMillis);
112+
}
113+
}
114+
115+
public static Object[][] provideForNotCreateBatchDueToConnectionIssue() {
116+
return new Object[][]{
117+
new Object[]{
118+
// final response should be available immediately
119+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
120+
.retriesIntervalMs(400)
121+
.maxConnectionRetries(0)
122+
.build(),
123+
0, 350
124+
},
125+
new Object[]{
126+
// final response should be available after 1 retry (400 ms)
127+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
128+
.retriesIntervalMs(400)
129+
.maxConnectionRetries(1)
130+
.build(),
131+
400, 750
132+
},
133+
new Object[]{
134+
// final response should be available after 2 retries (400 + 800 ms)
135+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
136+
.retriesIntervalMs(400)
137+
.maxConnectionRetries(2)
138+
.build(),
139+
1200, 1550
140+
},
141+
new Object[]{
142+
// final response should be available after 1 retry (400 + 800 + 1200 ms)
143+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
144+
.retriesIntervalMs(400)
145+
.maxConnectionRetries(3)
146+
.build(),
147+
2400, 2750
148+
},
149+
};
150+
}
151+
152+
@Test
153+
@DataMethod(source = ClientBatchCreateMockServerTest.class, method = "provideForNotCreateBatchDueToTimeoutIssue")
154+
public void shouldNotCreateBatchDueToTimeoutIssue(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
155+
int expectedBatchCallsCount) {
156+
// given client times out after 1s
157+
158+
Serializer serializer = new Serializer();
159+
String pizza1Str = serializer.toJsonString(BatchObjectsMockServerTestSuite.PIZZA_1);
160+
String soup1Str = serializer.toJsonString(BatchObjectsMockServerTestSuite.SOUP_1);
161+
162+
// batch request should end up with timeout exception, but Pizza1 and Soup1 should be "added" and available by get
163+
mockServerClient.when(
164+
request().withMethod("POST").withPath("/v1/batch/objects")
165+
).respond(
166+
response().withDelay(Delay.seconds(2)).withStatusCode(200)
167+
);
168+
mockServerClient.when(
169+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_1_ID))
170+
).respond(
171+
response().withBody(pizza1Str)
172+
);
173+
mockServerClient.when(
174+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_1_ID))
175+
).respond(
176+
response().withBody(soup1Str)
177+
);
178+
179+
try (WeaviateAsyncClient asyncClient = client.async()) {
180+
Supplier<Result<ObjectGetResponse[]>> supplierObjectsBatcher = () -> {
181+
try {
182+
return asyncClient.batch().objectsBatcher(batchRetriesConfig)
183+
.withObjects(BatchObjectsMockServerTestSuite.PIZZA_1, BatchObjectsMockServerTestSuite.PIZZA_2,
184+
BatchObjectsMockServerTestSuite.SOUP_1, BatchObjectsMockServerTestSuite.SOUP_2)
185+
.run()
186+
.get();
187+
} catch (InterruptedException | ExecutionException e) {
188+
throw new RuntimeException(e);
189+
}
190+
};
191+
Consumer<Integer> assertPostObjectsCallsCount = count -> mockServerClient.verify(
192+
request().withMethod("POST").withPath("/v1/batch/objects"),
193+
VerificationTimes.exactly(count)
194+
);
195+
Consumer<Integer> assertGetPizza1CallsCount = count -> mockServerClient.verify(
196+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_1_ID)),
197+
VerificationTimes.exactly(count)
198+
);
199+
Consumer<Integer> assertGetPizza2CallsCount = count -> mockServerClient.verify(
200+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_2_ID)),
201+
VerificationTimes.exactly(count)
202+
);
203+
Consumer<Integer> assertGetSoup1CallsCount = count -> mockServerClient.verify(
204+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_1_ID)),
205+
VerificationTimes.exactly(count)
206+
);
207+
Consumer<Integer> assertGetSoup2CallsCount = count -> mockServerClient.verify(
208+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_2_ID)),
209+
VerificationTimes.exactly(count)
210+
);
211+
212+
BatchObjectsMockServerTestSuite.testNotCreateBatchDueToTimeoutIssue(supplierObjectsBatcher,
213+
assertPostObjectsCallsCount, assertGetPizza1CallsCount, assertGetPizza2CallsCount,
214+
assertGetSoup1CallsCount, assertGetSoup2CallsCount, expectedBatchCallsCount, "1 SECONDS");
215+
}
216+
}
217+
218+
@Test
219+
@DataMethod(source = ClientBatchCreateMockServerTest.class, method = "provideForNotCreateBatchDueToTimeoutIssue")
220+
public void shouldNotCreateAutoBatchDueToTimeoutIssue(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
221+
int expectedBatchCallsCount) {
222+
// given client times out after 1s
223+
224+
Serializer serializer = new Serializer();
225+
String pizza1Str = serializer.toJsonString(BatchObjectsMockServerTestSuite.PIZZA_1);
226+
String soup1Str = serializer.toJsonString(BatchObjectsMockServerTestSuite.SOUP_1);
227+
228+
// batch request should end up with timeout exception, but Pizza1 and Soup1 should be "added" and available by get
229+
mockServerClient.when(
230+
request().withMethod("POST").withPath("/v1/batch/objects")
231+
).respond(
232+
response().withDelay(Delay.seconds(2)).withStatusCode(200)
233+
);
234+
mockServerClient.when(
235+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_1_ID))
236+
).respond(
237+
response().withBody(pizza1Str)
238+
);
239+
mockServerClient.when(
240+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_1_ID))
241+
).respond(
242+
response().withBody(soup1Str)
243+
);
244+
245+
try (WeaviateAsyncClient asyncClient = client.async()) {
246+
Consumer<Consumer<Result<ObjectGetResponse[]>>> supplierObjectsBatcher = callback -> {
247+
ObjectsBatcher.AutoBatchConfig autoBatchConfig = ObjectsBatcher.AutoBatchConfig.defaultConfig()
248+
.batchSize(2)
249+
.callback(callback)
250+
.build();
251+
252+
try {
253+
asyncClient.batch().objectsAutoBatcher(batchRetriesConfig, autoBatchConfig)
254+
.withObjects(BatchObjectsMockServerTestSuite.PIZZA_1, BatchObjectsMockServerTestSuite.PIZZA_2,
255+
BatchObjectsMockServerTestSuite.SOUP_1, BatchObjectsMockServerTestSuite.SOUP_2)
256+
.run()
257+
.get();
258+
} catch (InterruptedException | ExecutionException e) {
259+
throw new RuntimeException(e);
260+
}
261+
};
262+
263+
Consumer<Integer> assertPostObjectsCallsCount = count -> mockServerClient.verify(
264+
request().withMethod("POST").withPath("/v1/batch/objects"),
265+
VerificationTimes.exactly(count)
266+
);
267+
Consumer<Integer> assertGetPizza1CallsCount = count -> mockServerClient.verify(
268+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_1_ID)),
269+
VerificationTimes.exactly(count)
270+
);
271+
Consumer<Integer> assertGetPizza2CallsCount = count -> mockServerClient.verify(
272+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Pizza", BatchObjectsMockServerTestSuite.PIZZA_2_ID)),
273+
VerificationTimes.exactly(count)
274+
);
275+
Consumer<Integer> assertGetSoup1CallsCount = count -> mockServerClient.verify(
276+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_1_ID)),
277+
VerificationTimes.exactly(count)
278+
);
279+
Consumer<Integer> assertGetSoup2CallsCount = count -> mockServerClient.verify(
280+
request().withMethod("GET").withPath(String.format("/v1/objects/%s/%s", "Soup", BatchObjectsMockServerTestSuite.SOUP_2_ID)),
281+
VerificationTimes.exactly(count)
282+
);
283+
284+
BatchObjectsMockServerTestSuite.testNotCreateAutoBatchDueToTimeoutIssue(supplierObjectsBatcher,
285+
assertPostObjectsCallsCount, assertGetPizza1CallsCount, assertGetPizza2CallsCount,
286+
assertGetSoup1CallsCount, assertGetSoup2CallsCount, expectedBatchCallsCount, "1 SECONDS");
287+
}
288+
}
289+
290+
public static Object[][] provideForNotCreateBatchDueToTimeoutIssue() {
291+
return new Object[][]{
292+
new Object[]{
293+
// final response should be available immediately
294+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
295+
.retriesIntervalMs(200)
296+
.maxTimeoutRetries(0)
297+
.build(),
298+
1
299+
},
300+
new Object[]{
301+
// final response should be available after 1 retry (200 ms)
302+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
303+
.retriesIntervalMs(200)
304+
.maxTimeoutRetries(1)
305+
.build(),
306+
2
307+
},
308+
new Object[]{
309+
// final response should be available after 2 retries (200 + 400 ms)
310+
ObjectsBatcher.BatchRetriesConfig.defaultConfig()
311+
.retriesIntervalMs(200)
312+
.maxTimeoutRetries(2)
313+
.build(),
314+
3
315+
},
316+
};
317+
}
318+
319+
private String metaBody() {
320+
return String.format("{\n" +
321+
" \"hostname\": \"http://[::]:%s\",\n" +
322+
" \"modules\": {},\n" +
323+
" \"version\": \"%s\"\n" +
324+
"}", MOCK_SERVER_PORT, "1.17.999-mock-server-version");
325+
}
326+
}

0 commit comments

Comments
 (0)