Skip to content

Commit b236c80

Browse files
authored
Merge pull request #401 from weaviate/v6-shards
v6: Get and update shard status
2 parents 2ed24a4 + fa8015f commit b236c80

File tree

7 files changed

+126
-0
lines changed

7 files changed

+126
-0
lines changed

src/it/java/io/weaviate/integration/CollectionsITest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import io.weaviate.client6.v1.api.collections.Property;
1414
import io.weaviate.client6.v1.api.collections.Replication;
1515
import io.weaviate.client6.v1.api.collections.VectorIndex;
16+
import io.weaviate.client6.v1.api.collections.config.Shard;
17+
import io.weaviate.client6.v1.api.collections.config.ShardStatus;
1618
import io.weaviate.client6.v1.api.collections.vectorindex.Hnsw;
1719
import io.weaviate.client6.v1.api.collections.vectorizers.NoneVectorizer;
1820
import io.weaviate.containers.Container;
@@ -159,4 +161,27 @@ public void testUpdateCollection() throws IOException {
159161
.extracting(CollectionConfig::replication).returns(false, Replication::asyncEnabled);
160162
});
161163
}
164+
165+
@Test
166+
public void testShards() throws IOException {
167+
var nsShatteredCups = ns("ShatteredCups");
168+
client.collections.create(nsShatteredCups);
169+
var cups = client.collections.use(nsShatteredCups);
170+
171+
// Act: get initial shard state
172+
var shards = cups.config.getShards();
173+
174+
Assertions.assertThat(shards).as("single-tenant collections has 1 shard").hasSize(1);
175+
var singleShard = shards.get(0);
176+
177+
// Act: flip the status
178+
var wantStatus = singleShard.status().equals("READY") ? ShardStatus.READONLY : ShardStatus.READY;
179+
var updated = cups.config.updateShards(wantStatus, singleShard.name());
180+
181+
Assertions.assertThat(updated)
182+
.as("shard status changed")
183+
.hasSize(1)
184+
.extracting(Shard::status)
185+
.containsOnly(wantStatus.name());
186+
}
162187
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.weaviate.client6.v1.api.collections.config;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
6+
import org.apache.hc.core5.http.HttpStatus;
7+
8+
import com.google.gson.reflect.TypeToken;
9+
10+
import io.weaviate.client6.v1.internal.json.JSON;
11+
import io.weaviate.client6.v1.internal.rest.Endpoint;
12+
13+
public record GetShardsRequest(String collectionName) {
14+
15+
@SuppressWarnings("unchecked")
16+
public static final Endpoint<GetShardsRequest, List<Shard>> _ENDPOINT = Endpoint.of(
17+
request -> "GET",
18+
request -> "/schema/" + request.collectionName + "/shards", // TODO: tenant support
19+
(gson, request) -> null,
20+
request -> Collections.emptyMap(),
21+
code -> code != HttpStatus.SC_SUCCESS,
22+
(gson, response) -> (List<Shard>) JSON.deserialize(response, TypeToken.getParameterized(
23+
List.class, Shard.class)));
24+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.weaviate.client6.v1.api.collections.config;
2+
3+
import com.google.gson.annotations.SerializedName;
4+
5+
public record Shard(
6+
@SerializedName("name") String name,
7+
@SerializedName("status") String status,
8+
@SerializedName("vectorQueueSize") long vectorQueueSize) {
9+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.weaviate.client6.v1.api.collections.config;
2+
3+
import com.google.gson.annotations.SerializedName;
4+
5+
public enum ShardStatus {
6+
@SerializedName("READY")
7+
READY,
8+
@SerializedName("READONLY")
9+
READONLY;
10+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.weaviate.client6.v1.api.collections.config;
2+
3+
import java.util.Collections;
4+
import java.util.Map;
5+
6+
import org.apache.hc.core5.http.HttpStatus;
7+
8+
import io.weaviate.client6.v1.internal.json.JSON;
9+
import io.weaviate.client6.v1.internal.rest.Endpoint;
10+
11+
public record UpdateShardStatusRequest(String collection, String shard, ShardStatus status) {
12+
public static final Endpoint<UpdateShardStatusRequest, Void> _ENDPOINT = Endpoint.of(
13+
request -> "PUT",
14+
request -> "/schema/" + request.collection + "/shards/" + request.shard,
15+
(gson, request) -> JSON.serialize(Map.of("status", request.status)),
16+
request -> Collections.emptyMap(),
17+
code -> code != HttpStatus.SC_SUCCESS,
18+
(gson, response) -> null);
19+
}

src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.weaviate.client6.v1.api.collections.config;
22

33
import java.io.IOException;
4+
import java.util.Arrays;
5+
import java.util.List;
46
import java.util.Optional;
57
import java.util.function.Function;
68

@@ -45,4 +47,21 @@ public void update(String collectionName,
4547
this.restTransport.performRequest(UpdateCollectionRequest.of(thisCollection, fn),
4648
UpdateCollectionRequest._ENDPOINT);
4749
}
50+
51+
public List<Shard> getShards() throws IOException {
52+
return this.restTransport.performRequest(new GetShardsRequest(collection.name()), GetShardsRequest._ENDPOINT);
53+
}
54+
55+
public List<Shard> updateShards(ShardStatus status, String... shards) throws IOException {
56+
return updateShards(status, Arrays.asList(shards));
57+
}
58+
59+
public List<Shard> updateShards(ShardStatus status, List<String> shards) throws IOException {
60+
for (var shard : shards) {
61+
this.restTransport.performRequest(
62+
new UpdateShardStatusRequest(collection.name(), shard, status),
63+
UpdateShardStatusRequest._ENDPOINT);
64+
}
65+
return getShards();
66+
}
4867
}

src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.weaviate.client6.v1.api.collections.config;
22

33
import java.io.IOException;
4+
import java.util.Arrays;
5+
import java.util.List;
46
import java.util.Optional;
57
import java.util.concurrent.CompletableFuture;
68
import java.util.function.Function;
@@ -48,4 +50,22 @@ public CompletableFuture<Void> update(String collectionName,
4850
UpdateCollectionRequest._ENDPOINT);
4951
});
5052
}
53+
54+
public CompletableFuture<List<Shard>> getShards() {
55+
return this.restTransport.performRequestAsync(new GetShardsRequest(collectionDescriptor.name()),
56+
GetShardsRequest._ENDPOINT);
57+
}
58+
59+
public CompletableFuture<List<Shard>> updateShards(ShardStatus status, String... shards) throws IOException {
60+
return updateShards(status, Arrays.asList(shards));
61+
}
62+
63+
public CompletableFuture<List<Shard>> updateShards(ShardStatus status, List<String> shards) throws IOException {
64+
var updates = shards.stream().map(
65+
shard -> this.restTransport.performRequestAsync(
66+
new UpdateShardStatusRequest(collectionDescriptor.name(), shard, status),
67+
UpdateShardStatusRequest._ENDPOINT))
68+
.toArray(CompletableFuture[]::new);
69+
return CompletableFuture.allOf(updates).thenCompose(__ -> getShards());
70+
}
5171
}

0 commit comments

Comments
 (0)