Skip to content

Commit 45b0f2a

Browse files
authored
Add imports (#161)
## Problem Add four endpoints of the `BulkOperationsApi`. ## Solution Added the following four endpoints of the `BulkOperationsApi`: 1. `startImport(String uri, String integrationId, ImportErrorMode.OnErrorEnum errorMode)` 2. `describeImport(Integer limit, String paginationToken)` 3. `listImport(String id)` 4. `cancelImport(String id)` ## Type of Change - [ ] Bug fix (non-breaking change which fixes an issue) - [X] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update - [ ] Infrastructure change (CI configs, etc) - [ ] Non-code change (docs, etc) - [ ] None of the above: (explain here) ## Test Plan Added unit tests.
1 parent 7af4587 commit 45b0f2a

File tree

6 files changed

+443
-4
lines changed

6 files changed

+443
-4
lines changed

README.md

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,82 @@ RerankResult result = inference.rerank(model, query, documents, rankFields, topN
643643
System.out.println(result.getData());
644644
```
645645

646+
# Imports
647+
## Start an import
648+
649+
The following example initiates an asynchronous import of vectors from object storage into the index.
650+
651+
```java
652+
import org.openapitools.db_data.client.ApiException;
653+
import org.openapitools.db_data.client.model.ImportErrorMode;
654+
import org.openapitools.db_data.client.model.StartImportResponse;
655+
...
656+
657+
// Initialize pinecone object
658+
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
659+
// Get async imports connection object
660+
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");
661+
662+
// s3 uri
663+
String uri = "s3://path/to/file.parquet";
664+
665+
// Start an import
666+
StartImportResponse response = asyncIndex.startImport(uri, "123-456-789", ImportErrorMode.OnErrorEnum.CONTINUE);
667+
```
668+
669+
## List imports
670+
671+
The following example lists all recent and ongoing import operations for the specified index.
672+
673+
```java
674+
import org.openapitools.db_data.client.ApiException;
675+
import org.openapitools.db_data.client.model.ListImportsResponse;
676+
...
677+
678+
// Initialize pinecone object
679+
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
680+
// Get async imports connection object
681+
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");
682+
683+
// List imports
684+
ListImportsResponse response = asyncIndex.listImports(100, "some-pagination-token");
685+
```
686+
687+
## Describe an import
688+
689+
The following example retrieves detailed information about a specific import operation using its unique identifier.
690+
691+
```java
692+
import org.openapitools.db_data.client.ApiException;
693+
import org.openapitools.db_data.client.model.ImportModel;
694+
...
695+
696+
// Initialize pinecone object
697+
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
698+
// Get async imports connection object
699+
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");
700+
701+
// Describe import
702+
ImportModel importDetails = asyncIndex.describeImport("1");
703+
```
704+
705+
## Cancel an import
706+
707+
The following example attempts to cancel an ongoing import operation using its unique identifier.
708+
709+
```java
710+
import org.openapitools.db_data.client.ApiException;
711+
...
712+
713+
// Initialize pinecone object
714+
Pinecone pinecone = new Pinecone.Builder("PINECONE_API_KEY").build();
715+
// Get async imports connection object
716+
AsyncIndex asyncIndex = pinecone.getAsyncIndexConnection("PINECONE_INDEX_NAME");
717+
718+
// Cancel import
719+
asyncIndex.cancelImport("2");
720+
```
721+
646722
## Examples
647723

648724
- The data and control plane operation examples can be found in `io/pinecone/integration` folder.

src/integration/java/io/pinecone/integration/dataPlane/QueryErrorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.pinecone.clients.AsyncIndex;
44
import io.pinecone.clients.Index;
5+
import io.pinecone.configs.PineconeConfig;
56
import io.pinecone.configs.PineconeConnection;
67
import io.pinecone.exceptions.PineconeValidationException;
78
import io.pinecone.proto.VectorServiceGrpc;
@@ -24,6 +25,7 @@ public class QueryErrorTest {
2425

2526
@BeforeAll
2627
public static void setUp() throws IOException, InterruptedException {
28+
PineconeConfig config = mock(PineconeConfig.class);
2729
PineconeConnection connectionMock = mock(PineconeConnection.class);
2830

2931
VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
@@ -33,7 +35,7 @@ public static void setUp() throws IOException, InterruptedException {
3335
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);
3436

3537
index = new Index(connectionMock, "some-index-name");
36-
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
38+
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
3739
}
3840

3941
@Test

src/integration/java/io/pinecone/integration/dataPlane/UpsertErrorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.pinecone.clients.AsyncIndex;
44
import io.pinecone.clients.Index;
5+
import io.pinecone.configs.PineconeConfig;
56
import io.pinecone.configs.PineconeConnection;
67
import io.pinecone.exceptions.PineconeException;
78
import io.pinecone.exceptions.PineconeValidationException;
@@ -27,6 +28,7 @@ public class UpsertErrorTest {
2728

2829
@BeforeAll
2930
public static void setUp() throws IOException, InterruptedException {
31+
PineconeConfig config = mock(PineconeConfig.class);
3032
PineconeConnection connectionMock = mock(PineconeConnection.class);
3133

3234
VectorServiceGrpc.VectorServiceBlockingStub stubMock = mock(VectorServiceGrpc.VectorServiceBlockingStub.class);
@@ -36,7 +38,7 @@ public static void setUp() throws IOException, InterruptedException {
3638
when(connectionMock.getAsyncStub()).thenReturn(asyncStubMock);
3739

3840
index = new Index(connectionMock, "some-index-name");
39-
asyncIndex = new AsyncIndex(connectionMock, "some-index-name");
41+
asyncIndex = new AsyncIndex(config, connectionMock, "some-index-name");
4042
}
4143

4244
@Test

src/main/java/io/pinecone/clients/AsyncIndex.java

Lines changed: 199 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,32 @@
55
import com.google.common.util.concurrent.MoreExecutors;
66
import com.google.protobuf.Struct;
77
import io.pinecone.commons.IndexInterface;
8+
import io.pinecone.configs.PineconeConfig;
89
import io.pinecone.configs.PineconeConnection;
910
import io.pinecone.exceptions.PineconeValidationException;
1011
import io.pinecone.proto.*;
12+
import io.pinecone.proto.DeleteRequest;
13+
import io.pinecone.proto.DescribeIndexStatsRequest;
14+
import io.pinecone.proto.FetchResponse;
15+
import io.pinecone.proto.ListResponse;
16+
import io.pinecone.proto.QueryRequest;
17+
import io.pinecone.proto.QueryResponse;
18+
import io.pinecone.proto.UpdateRequest;
19+
import io.pinecone.proto.UpsertRequest;
20+
import io.pinecone.proto.UpsertResponse;
1121
import io.pinecone.unsigned_indices_model.QueryResponseWithUnsignedIndices;
1222
import io.pinecone.unsigned_indices_model.VectorWithUnsignedIndices;
23+
import okhttp3.OkHttpClient;
24+
import org.openapitools.db_data.client.ApiClient;
25+
import org.openapitools.db_data.client.ApiException;
26+
import org.openapitools.db_data.client.Configuration;
27+
import org.openapitools.db_data.client.api.BulkOperationsApi;
28+
import org.openapitools.db_data.client.model.*;
1329

1430
import java.util.List;
1531

32+
import static io.pinecone.clients.Pinecone.buildOkHttpClient;
33+
1634

1735
/**
1836
* A client for interacting with a Pinecone index via GRPC asynchronously. Allows for upserting, querying, fetching, updating, and deleting vectors.
@@ -38,6 +56,7 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
3856
private final PineconeConnection connection;
3957
private final VectorServiceGrpc.VectorServiceFutureStub asyncStub;
4058
private final String indexName;
59+
BulkOperationsApi bulkOperations;
4160

4261
/**
4362
* Constructs an {@link AsyncIndex} instance for interacting with a Pinecone index.
@@ -55,14 +74,24 @@ public class AsyncIndex implements IndexInterface<ListenableFuture<UpsertRespons
5574
* @param indexName The name of the index to interact with. The index host will be automatically resolved.
5675
* @throws PineconeValidationException if the connection object is null.
5776
*/
58-
public AsyncIndex(PineconeConnection connection, String indexName) {
77+
public AsyncIndex(PineconeConfig config, PineconeConnection connection, String indexName) {
5978
if (connection == null) {
6079
throw new PineconeValidationException("Pinecone connection object cannot be null.");
6180
}
6281

6382
this.indexName = indexName;
6483
this.connection = connection;
6584
this.asyncStub = connection.getAsyncStub();
85+
86+
OkHttpClient customOkHttpClient = config.getCustomOkHttpClient();
87+
ApiClient apiClient = (customOkHttpClient != null) ? new ApiClient(customOkHttpClient) : new ApiClient(buildOkHttpClient(config.getProxyConfig()));
88+
apiClient.setApiKey(config.getApiKey());
89+
apiClient.setUserAgent(config.getUserAgent());
90+
apiClient.addDefaultHeader("X-Pinecone-Api-Version", Configuration.VERSION);
91+
92+
this.bulkOperations = new BulkOperationsApi(apiClient);
93+
String protocol = config.isTLSEnabled() ? "https://" : "http://";
94+
bulkOperations.setCustomBaseUrl(protocol + config.getHost());
6695
}
6796

6897
/**
@@ -1039,6 +1068,175 @@ public ListenableFuture<ListResponse> list(String namespace, String prefix, Stri
10391068
return asyncStub.list(listRequest);
10401069
}
10411070

1071+
/**
1072+
* <p>Initiates an asynchronous import of vectors from object storage into a specified index.</p>
1073+
*
1074+
* <p>The method constructs a {@link StartImportRequest} using the provided URI for the data and optional
1075+
* storage integration ID. It also allows for specifying how to respond to errors during the import process
1076+
* through the {@link ImportErrorMode}. The import operation is then initiated via a call to the
1077+
* underlying {@link BulkOperationsApi}.</p>
1078+
*
1079+
* <p>Example:
1080+
* <pre>{@code
1081+
* import org.openapitools.db_data.client.ApiException;
1082+
* import org.openapitools.db_data.client.model.ImportErrorMode;
1083+
*
1084+
* ...
1085+
*
1086+
* String uri = "s3://path/to/file.parquet";
1087+
* String integrationId = "123-456-789";
1088+
* StartImportResponse response = asyncIndex.startImport(uri, integrationId, ImportErrorMode.OnErrorEnum.CONTINUE);
1089+
* }</pre>
1090+
*
1091+
* @param uri The URI prefix under which the data to import is available.
1092+
* @param integrationId The ID of the storage integration to access the data. Can be null or empty.
1093+
* @param errorMode Indicates how to respond to errors during the import process. Can be null.
1094+
* @return {@link StartImportResponse} containing the details of the initiated import operation.
1095+
* @throws ApiException if there are issues processing the request or communicating with the server.
1096+
* This includes network issues, server errors, or serialization issues with the request or response.
1097+
*/
1098+
public StartImportResponse startImport(String uri, String integrationId, ImportErrorMode.OnErrorEnum errorMode) throws ApiException {
1099+
StartImportRequest importRequest = new StartImportRequest();
1100+
importRequest.setUri(uri);
1101+
if(integrationId != null && !integrationId.isEmpty()) {
1102+
importRequest.setIntegrationId(integrationId);
1103+
}
1104+
if(errorMode != null) {
1105+
ImportErrorMode importErrorMode = new ImportErrorMode().onError(errorMode);
1106+
importRequest.setErrorMode(importErrorMode);
1107+
}
1108+
1109+
return bulkOperations.startBulkImport(importRequest);
1110+
}
1111+
1112+
/**
1113+
* <p>Lists all recent and ongoing import operations for the specified index with default limit and pagination.</p>
1114+
*
1115+
* <p>The method constructs a request to fetch a list of import operations, limited by the default value set to 100
1116+
* number of operations to return per page. The pagination token is set to null as well by default.</p>
1117+
*
1118+
*
1119+
* <p>Example:
1120+
* <pre>{@code
1121+
* import org.openapitools.db_data.client.ApiException;
1122+
* import org.openapitools.db_data.client.model.ListImportsResponse;
1123+
*
1124+
* ...
1125+
*
1126+
* ListImportsResponse response = asyncIndex.listImports();
1127+
* }</pre>
1128+
*
1129+
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
1130+
* @throws ApiException if there are issues processing the request or communicating with the server.
1131+
* This includes network issues, server errors, or serialization issues with the request or response.
1132+
*/
1133+
public ListImportsResponse listImports() throws ApiException {
1134+
return listImports(100, null);
1135+
}
1136+
1137+
/**
1138+
* <p>Lists all recent and ongoing import operations for the specified index based on limit.</p>
1139+
*
1140+
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
1141+
* maximum number of operations to return per page. The pagination token is set to null by default.</p>
1142+
*
1143+
*
1144+
* <p>Example:
1145+
* <pre>{@code
1146+
* import org.openapitools.db_data.client.ApiException;
1147+
* import org.openapitools.db_data.client.model.ListImportsResponse;
1148+
*
1149+
* ...
1150+
* int limit = 10;
1151+
* ListImportsResponse response = asyncIndex.listImports(limit);
1152+
* }</pre>
1153+
*
1154+
* @param limit The maximum number of operations to return per page. Default is 100.
1155+
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
1156+
* @throws ApiException if there are issues processing the request or communicating with the server.
1157+
* This includes network issues, server errors, or serialization issues with the request or response.
1158+
*/
1159+
public ListImportsResponse listImports(Integer limit) throws ApiException {
1160+
return listImports(limit, null);
1161+
}
1162+
1163+
/**
1164+
* <p>Lists all recent and ongoing import operations for the specified index.</p>
1165+
*
1166+
* <p>The method constructs a request to fetch a list of import operations, limited by the specified
1167+
* maximum number of operations to return per page. The pagination token allows for
1168+
* deterministic pagination through the list of import operations.</p>
1169+
*
1170+
* <p>Example:
1171+
* <pre>{@code
1172+
* import org.openapitools.db_data.client.ApiException;
1173+
* import org.openapitools.db_data.client.model.ListImportsResponse;
1174+
*
1175+
* ...
1176+
* int limit = 10;
1177+
* String paginationToken = "some-pagination-token";
1178+
* ListImportsResponse response = asyncIndex.listImports(limit, paginationToken);
1179+
* }</pre>
1180+
*
1181+
* @param limit The maximum number of operations to return per page. Default is 100.
1182+
* @param paginationToken The token to continue a previous listing operation. Can be null or empty.
1183+
* @return {@link ListImportsResponse} containing the list of recent and ongoing import operations.
1184+
* @throws ApiException if there are issues processing the request or communicating with the server.
1185+
* This includes network issues, server errors, or serialization issues with the request or response.
1186+
*/
1187+
public ListImportsResponse listImports(Integer limit, String paginationToken) throws ApiException {
1188+
return bulkOperations.listBulkImports(limit, paginationToken);
1189+
}
1190+
1191+
/**
1192+
* <p>Retrieves detailed information about a specific import operation using its unique identifier.</p>
1193+
*
1194+
* <p>The method constructs a request to fetch details of the specified import operation by its ID,
1195+
* allowing users to monitor the status and results of the import process.</p>
1196+
*
1197+
* <p>Example:
1198+
* <pre>{@code
1199+
* import org.openapitools.db_data.client.ApiException;
1200+
* import org.openapitools.db_data.client.model.ImportModel;
1201+
*
1202+
* ...
1203+
*
1204+
* String importId = "1";
1205+
* ImportModel importDetails = asyncIndex.describeImport(importId);
1206+
* }</pre>
1207+
*
1208+
* @param id The unique identifier for the import operation.
1209+
* @return {@link ImportModel} containing details of the specified import operation.
1210+
* @throws ApiException if there are issues processing the request or communicating with the server.
1211+
* This includes network issues, server errors, or serialization issues with the request or response.
1212+
*/
1213+
public ImportModel describeImport(String id) throws ApiException {
1214+
return bulkOperations.describeBulkImport(id);
1215+
}
1216+
1217+
/**
1218+
* <p>Attempts to cancel an ongoing import operation using its unique identifier.</p>
1219+
*
1220+
* <p>The method issues a request to cancel the specified import operation if it has not yet finished.
1221+
* If the operation is already completed, the method has no effect.</p>
1222+
*
1223+
* <p>Example:
1224+
* <pre>{@code
1225+
* import org.openapitools.db_data.client.ApiException;
1226+
*
1227+
* ...
1228+
* String importId = "2";
1229+
* asyncIndex.cancelImport(importId);
1230+
* }</pre>
1231+
*
1232+
* @param id The unique identifier for the import operation to cancel.
1233+
* @throws ApiException if there are issues processing the request or communicating with the server.
1234+
* This includes network issues, server errors, or serialization issues with the request or response.
1235+
*/
1236+
public void cancelImport(String id) throws ApiException {
1237+
bulkOperations.cancelBulkImport(id);
1238+
}
1239+
10421240
/**
10431241
* {@inheritDoc}
10441242
* Closes the current index connection gracefully, releasing any resources associated with it. This method should

src/main/java/io/pinecone/clients/Pinecone.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,7 @@ public AsyncIndex getAsyncIndexConnection(String indexName) throws PineconeValid
873873

874874
config.setHost(getIndexHost(indexName));
875875
PineconeConnection connection = getConnection(indexName);
876-
return new AsyncIndex(connection, indexName);
876+
return new AsyncIndex(config, connection, indexName);
877877
}
878878

879879
/**

0 commit comments

Comments
 (0)