diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c635d2be3..02c62c83e 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -92,7 +92,7 @@ jobs: fail-fast: false matrix: WEAVIATE_VERSION: - ["1.32.24", "1.33.11", "1.34.7", "1.35.2", "1.36.9", "1.37.2"] + ["1.32.24", "1.33.11", "1.34.7", "1.35.2", "1.36.9", "1.37.7"] steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index 33eda3402..5deae2d25 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -45,7 +45,7 @@ public enum Version { V134(1, 34, 7), V135(1, 35, 2), V136(1, 36, 9), - V137(1, 37, 2); + V137(1, 37, 7); public final SemanticVersion semver; @@ -228,6 +228,13 @@ public Builder withFilesystemBackup(String fsPath) { return this; } + public Builder withExportPath(String fsPath) { + withFilesystemBackup("/tmp/backups"); + environment.put("EXPORT_ENABLED", "true"); + environment.put("EXPORT_DEFAULT_PATH", fsPath); + return this; + } + public Builder withAdminUsers(String... admins) { adminUsers.addAll(Arrays.asList(admins)); return this; diff --git a/src/it/java/io/weaviate/integration/CollectionsITest.java b/src/it/java/io/weaviate/integration/CollectionsITest.java index 927b2ac05..983d4b24d 100644 --- a/src/it/java/io/weaviate/integration/CollectionsITest.java +++ b/src/it/java/io/weaviate/integration/CollectionsITest.java @@ -18,9 +18,9 @@ import io.weaviate.client6.v1.api.collections.Quantization; import io.weaviate.client6.v1.api.collections.ReferenceProperty; import io.weaviate.client6.v1.api.collections.Replication; +import io.weaviate.client6.v1.api.collections.Replication.AsyncReplicationConfig; import io.weaviate.client6.v1.api.collections.TextAnalyzer; import io.weaviate.client6.v1.api.collections.Tokenization; -import io.weaviate.client6.v1.api.collections.Replication.AsyncReplicationConfig; import io.weaviate.client6.v1.api.collections.VectorConfig; import io.weaviate.client6.v1.api.collections.VectorIndex; import io.weaviate.client6.v1.api.collections.config.PropertyIndexType; @@ -464,7 +464,7 @@ public void testTextAnalyzer() throws Exception { @Test public void test_asyncReplicationConfig() throws IOException { - Weaviate.Version.latest().orSkip(); + Weaviate.Version.V137.orSkip(); // Arrange var nsThings = ns("Things"); @@ -496,10 +496,10 @@ public void test_asyncReplicationConfig() throws IOException { .extracting(CollectionConfig::replication) .extracting(Replication::asyncReplicationConfig) .returns(1, AsyncReplicationConfig::hashTreeHeight) - .returns(2, AsyncReplicationConfig::maxWorkers) + .returns(null, AsyncReplicationConfig::maxWorkers) // Deprecated .returns(3, AsyncReplicationConfig::frequencyMillis) .returns(4, AsyncReplicationConfig::frequencyMillisWhilePropagating) - .returns(5, AsyncReplicationConfig::aliveNodesCheckingFrequencyMillis) + .returns(null, AsyncReplicationConfig::aliveNodesCheckingFrequencyMillis) .returns(6, AsyncReplicationConfig::loggingFrequencySeconds) .returns(7, AsyncReplicationConfig::diffBatchSize) .returns(8, AsyncReplicationConfig::diffPerNodeTimeoutSeconds) diff --git a/src/it/java/io/weaviate/integration/ExportITest.java b/src/it/java/io/weaviate/integration/ExportITest.java new file mode 100644 index 000000000..37f4772e9 --- /dev/null +++ b/src/it/java/io/weaviate/integration/ExportITest.java @@ -0,0 +1,84 @@ +package io.weaviate.integration; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.weaviate.ConcurrentTest; +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.export.Export; +import io.weaviate.client6.v1.api.export.ExportStatus; +import io.weaviate.client6.v1.api.export.ShardExportProgress; +import io.weaviate.containers.Weaviate; + +public class ExportITest extends ConcurrentTest { + private static final WeaviateClient client = Weaviate.custom() + .withExportPath("/tmp/export").build() + .getClient(); + + @BeforeClass + public static void __() { + Weaviate.Version.V137.orSkip(); + } + + @Test + public void test_lifecycle() throws IOException, TimeoutException { + // Arrange + String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"); + String exportId = ns("export_1").toLowerCase(); + String backend = "filesystem"; + + var collectionA = client.collections.create(nsA); + var collectionB = client.collections.create(nsB); + var collectionC = client.collections.create(nsC); + + // Insert some data + for (var c : List.of(collectionA, collectionB, collectionC)) { + var resp = c.data.insertMany(Map.of(), Map.of(), Map.of()); + Assertions.assertThat(resp.errors()).isEmpty(); + } + + // Act: start export + var started = client.export.create(exportId, backend, + export -> export + .includeCollections(nsA, nsB)); + + // Assert + Assertions.assertThat(started) + .as("created export operation") + .returns(exportId, Export::id) + .returns(backend, Export::backend) + .returns(ExportStatus.STARTED, Export::status) + .returns(null, Export::error) + .extracting(Export::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + // Act: await export competion + var completed = started.waitForCompletion(client); + + // Assert + Assertions.assertThat(completed) + .as("await export completion") + .returns(exportId, Export::id) + .returns(backend, Export::backend) + .returns(ExportStatus.SUCCESS, Export::status) + .returns(null, Export::error) + .extracting(Export::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + Assertions.assertThat(completed) + .extracting(Export::shardStatus, InstanceOfAssertFactories.map(String.class, + Object.class)) + .allSatisfy((__, shards) -> { + Assertions.assertThat(shards) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, ShardExportProgress.class)) + .isNotEmpty(); + }); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java index 166b582f9..dedf64e00 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java @@ -7,6 +7,7 @@ import io.weaviate.client6.v1.api.backup.WeaviateBackupClient; import io.weaviate.client6.v1.api.cluster.WeaviateClusterClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; +import io.weaviate.client6.v1.api.export.WeaviateExportClient; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClient; import io.weaviate.client6.v1.api.rbac.roles.WeaviateRolesClient; import io.weaviate.client6.v1.api.rbac.users.WeaviateUsersClient; @@ -42,6 +43,9 @@ public class WeaviateClient implements AutoCloseable { /** Client for {@code /backups} endpoints for managing backups. */ public final WeaviateBackupClient backup; + /** Client for {@code /export} endpoints for managing exports. */ + public final WeaviateExportClient export; + /** * Client for {@code /authz/roles} endpoints for managing RBAC roles. */ @@ -124,6 +128,7 @@ public WeaviateClient(Config config) { this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClient(restTransport); this.backup = new WeaviateBackupClient(restTransport); + this.export = new WeaviateExportClient(restTransport); this.tokenize = new WeaviateTokenizeClient(restTransport); this.collections = new WeaviateCollectionsClient(restTransport, grpcTransport); this.roles = new WeaviateRolesClient(restTransport); diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java index 0992212ad..f89ca9b4a 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java @@ -9,6 +9,7 @@ import io.weaviate.client6.v1.api.cluster.WeaviateClusterClientAsync; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClientAsync; +import io.weaviate.client6.v1.api.export.WeaviateExportClientAsync; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClientAsync; import io.weaviate.client6.v1.api.rbac.roles.WeaviateRolesClientAsync; import io.weaviate.client6.v1.api.rbac.users.WeaviateUsersClientAsync; @@ -56,6 +57,9 @@ public class WeaviateClientAsync implements AutoCloseable { /** Client for {@code /backups} endpoints for managing backups. */ public final WeaviateBackupClientAsync backup; + /** Client for {@code /export} endpoints for managing exports. */ + public final WeaviateExportClientAsync export; + /** * Client for {@code /nodes} and {@code /replication} endpoints * for managing replication and sharding. @@ -128,6 +132,7 @@ public WeaviateClientAsync(Config config) { this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClientAsync(restTransport); this.backup = new WeaviateBackupClientAsync(restTransport); + this.export = new WeaviateExportClientAsync(restTransport); this.tokenize = new WeaviateTokenizeClientAsync(restTransport); this.roles = new WeaviateRolesClientAsync(restTransport); this.groups = new WeaviateGroupsClientAsync(restTransport); diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java b/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java index 3b9289149..eadac09b6 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/Replication.java @@ -93,7 +93,11 @@ public Builder hashTreeHeight(int hashTreeHeight) { return this; } - /** Maximum number of async replication workers. */ + /** + * Maximum number of async replication workers. + * + * @deprecated This paramter should be controled server-side. + */ public Builder maxWorkers(int maxWorkers) { this.maxWorkers = maxWorkers; return this; @@ -117,7 +121,11 @@ public Builder frequencyMillisWhilePropagating(int frequencyMillisWhilePropagati return this; } - /** Interval in milliseconds at which liveness of target nodes is checked." */ + /** + * Interval in milliseconds at which liveness of target nodes is checked. + * + * @deprecated This parameter should be controled server-side. + */ public Builder aliveNodesCheckingFrequencyMillis(int aliveNodesCheckingFrequencyMillis) { this.aliveNodesCheckingFrequencyMillis = aliveNodesCheckingFrequencyMillis; return this; diff --git a/src/main/java/io/weaviate/client6/v1/api/export/CancelExportRequest.java b/src/main/java/io/weaviate/client6/v1/api/export/CancelExportRequest.java new file mode 100644 index 000000000..91750157e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/CancelExportRequest.java @@ -0,0 +1,18 @@ +package io.weaviate.client6.v1.api.export; + +import java.util.Collections; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CancelExportRequest(String exportId, String backend) { + + public static Endpoint _ENDPOINT = SimpleEndpoint.sideEffect( + __ -> "DELETE", + request -> { + var cancel = (CancelExportRequest) request; + return "/export/" + cancel.backend + "/" + cancel.exportId; + }, + request -> Collections.emptyMap()) + .allowStatus(409); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/CreateExportRequest.java b/src/main/java/io/weaviate/client6/v1/api/export/CreateExportRequest.java new file mode 100644 index 000000000..336b3e3f7 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/CreateExportRequest.java @@ -0,0 +1,92 @@ +package io.weaviate.client6.v1.api.export; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CreateExportRequest(ExportCreate body, String backend) { + + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + request -> "POST", + request -> "/export/" + request.backend, + request -> Collections.emptyMap(), + request -> JSON.serialize(request.body), + (statusCode, response) -> JSON.deserialize(response, Export.class)); + + public static record ExportCreate( + @SerializedName("id") String id, + @SerializedName("file_format") FileFormat fileFormat, + @SerializedName("include") List includeCollections, + @SerializedName("exclude") List excludeCollections) { + + public static ExportCreate of(String exportId) { + return of(exportId, ObjectBuilder.identity()); + } + + public static ExportCreate of(String exportId, Function> fn) { + return fn.apply(new Builder(exportId)).build(); + } + + public ExportCreate(Builder builder) { + this( + builder.exportId, + builder.fileFormat, + builder.includeCollections, + builder.excludeCollections); + } + + public static class Builder implements ObjectBuilder { + private final String exportId; + + private FileFormat fileFormat = FileFormat.PARQUET; + private final List includeCollections = new ArrayList<>(); + private final List excludeCollections = new ArrayList<>(); + + public Builder(String exportId) { + this.exportId = exportId; + } + + /** Collection that should be included in the backup. */ + public Builder includeCollections(String... includeCollections) { + return includeCollections(Arrays.asList(includeCollections)); + } + + /** Collection that should be included in the backup. */ + public Builder includeCollections(List includeCollections) { + this.includeCollections.addAll(includeCollections); + return this; + } + + /** Collection that should be excluded from the backup. */ + public Builder excludeCollections(String... excludeCollections) { + return excludeCollections(Arrays.asList(excludeCollections)); + } + + /** Collection that should be excluded from the backup. */ + public Builder excludeCollections(List excludeCollections) { + this.excludeCollections.addAll(excludeCollections); + return this; + } + + /** Export file format. */ + public Builder fileFormat(FileFormat fileFormat) { + this.fileFormat = fileFormat; + return this; + } + + @Override + public ExportCreate build() { + return new ExportCreate(this); + } + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/Export.java b/src/main/java/io/weaviate/client6/v1/api/export/Export.java new file mode 100644 index 000000000..c03765b55 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/Export.java @@ -0,0 +1,200 @@ +package io.weaviate.client6.v1.api.export; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.WeaviateClientAsync; +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public record Export( + /** Export ID. */ + @SerializedName("id") String id, + /** Path to export in the backend storage. */ + @SerializedName("path") String path, + /** Export storage backend. */ + @SerializedName("backend") String backend, + /** Collections included in the export. */ + @SerializedName("classes") List includesCollections, + /** Export creation status. */ + @SerializedName("status") ExportStatus status, + /** Time at which the export creation. */ + @SerializedName("startedAt") OffsetDateTime startedAt, + + /** + * Time at which the export was completed, successfully or otherwise. + * Null unless export has completed. + */ + @SerializedName("completedAt") OffsetDateTime completedAt, + /** + * Export creation error. + * Null unless export has failed. + */ + @SerializedName("error") String error, + /** + * Progress reports for individual shards within each collection. + * Keyed by the name of the collection and then by shard ID. + */ + @SerializedName("shardStatus") Map> shardStatus, + /** + * Export duration in milliseconds. + * Null unless export has completed. + */ + @SerializedName("tookInMs") Integer tookMs) { + + /** + * Block until the export has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * + * @throws TimeoutException in case the wait times out without reaching + * ExportStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export waitForCompletion(WeaviateClient client) throws IOException, TimeoutException { + return waitForStatus(client, ExportStatus.SUCCESS); + } + + /** + * Block until the export has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param fn Lambda expression for optional parameters. + * + * @throws TimeoutException in case the wait times out without reaching + * ExportStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export waitForCompletion(WeaviateClient client, Function> fn) + throws IOException, TimeoutException { + return waitForStatus(client, ExportStatus.SUCCESS, fn); + } + + /** + * Block until the export operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param status Target status. + * + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export waitForStatus(WeaviateClient client, ExportStatus status) throws IOException, TimeoutException { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + /** + * Block until the export operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + * + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export waitForStatus(WeaviateClient client, ExportStatus status, + Function> fn) throws IOException, TimeoutException { + return new Waiter(this, WaitOptions.of(fn)) + .waitForStatus(status, () -> client.export.getCreateStatus(id, backend)); + } + + /** + * Cancel export creation. + * + *

+ * This method cannot be called to cancel export restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void cancel(WeaviateClient client) throws IOException { + client.export.cancel(id(), backend()); + } + + /** + * Poll until export's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client) { + return waitForStatus(client, ExportStatus.SUCCESS); + } + + /** + * Poll until export's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client, + Function> fn) { + return waitForStatus(client, ExportStatus.SUCCESS, fn); + } + + /** + * Poll until export reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, ExportStatus status) { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + /** + * Poll until export reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, ExportStatus status, + Function> fn) { + + return new Waiter(this, WaitOptions.of(fn)) + .waitForStatusAsync(status, () -> client.export.getCreateStatus(id, backend)); + } + + /** + * Cancel export creation. + * + *

+ * This method cannot be called to cancel export restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + */ + public CompletableFuture cancel(WeaviateClientAsync client) { + return client.export.cancel(id(), backend()); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/ExportStatus.java b/src/main/java/io/weaviate/client6/v1/api/export/ExportStatus.java new file mode 100644 index 000000000..e9e23af30 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/ExportStatus.java @@ -0,0 +1,21 @@ +package io.weaviate.client6.v1.api.export; + +import com.google.gson.annotations.SerializedName; + +public enum ExportStatus { + /** Export creation has begun. */ + @SerializedName("STARTED") + STARTED, + /** Export in progress, data is being transferred. */ + @SerializedName("TRANSFERRING") + TRANSFERRING, + /** Export creation completed successfully. */ + @SerializedName("SUCCESS") + SUCCESS, + /** Export creation failed. */ + @SerializedName("FAILED") + FAILED, + /** Export creation canceled. */ + @SerializedName("CANCELED") + CANCELED; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/FileFormat.java b/src/main/java/io/weaviate/client6/v1/api/export/FileFormat.java new file mode 100644 index 000000000..06cbeab2a --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/FileFormat.java @@ -0,0 +1,8 @@ +package io.weaviate.client6.v1.api.export; + +import com.google.gson.annotations.SerializedName; + +public enum FileFormat { + @SerializedName("parquet") + PARQUET; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/GetCreateStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/export/GetCreateStatusRequest.java new file mode 100644 index 000000000..6b3ebfbcc --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/GetCreateStatusRequest.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.export; + +import java.util.Collections; +import java.util.Optional; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record GetCreateStatusRequest(String exportId, String backend) { + public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + request -> "GET", + request -> "/export/" + request.backend + "/" + request.exportId, + request -> Collections.emptyMap(), + Export.class); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/ShardExportProgress.java b/src/main/java/io/weaviate/client6/v1/api/export/ShardExportProgress.java new file mode 100644 index 000000000..4a0e231bd --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/ShardExportProgress.java @@ -0,0 +1,10 @@ +package io.weaviate.client6.v1.api.export; + +import com.google.gson.annotations.SerializedName; + +public record ShardExportProgress( + @SerializedName("status") ShardExportStatus status, + @SerializedName("objectsExported") Integer objectsExported, + @SerializedName("error") String error, + @SerializedName("skipReason") String skipReason) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/ShardExportStatus.java b/src/main/java/io/weaviate/client6/v1/api/export/ShardExportStatus.java new file mode 100644 index 000000000..146baf7be --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/ShardExportStatus.java @@ -0,0 +1,19 @@ +package io.weaviate.client6.v1.api.export; + +import com.google.gson.annotations.SerializedName; + +/** Export status for an individual collection shard. */ +public enum ShardExportStatus { + /** Export in progress, data is being transferred. */ + @SerializedName("TRANSFERRING") + TRANSFERRING, + /** Export creation completed successfully. */ + @SerializedName("SUCCESS") + SUCCESS, + /** Export creation failed. */ + @SerializedName("FAILED") + FAILED, + /** Shard data will not be included in the snapshot. */ + @SerializedName("SKIPPED") + SKIPPED; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/WaitOptions.java b/src/main/java/io/weaviate/client6/v1/api/export/WaitOptions.java new file mode 100644 index 000000000..cfcb6dfd0 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/WaitOptions.java @@ -0,0 +1,64 @@ +package io.weaviate.client6.v1.api.export; + +import java.time.Duration; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public record WaitOptions(long interval, long timeout) { + private static final long DEFAULT_INTERVAL_MILLIS = 1_000; + private static final long DEFAULT_TIMEOUT_MILLIS = 3600_000; + + public static WaitOptions of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public WaitOptions(Builder builder) { + this(builder.interval, builder.timeout); + } + + public static class Builder implements ObjectBuilder { + private long interval = DEFAULT_INTERVAL_MILLIS; + private long timeout = DEFAULT_TIMEOUT_MILLIS; + + /** Set polling interval. Defaults to 1s. */ + public Builder interval(Duration duration) { + return interval(duration.toMillis()); + } + + /** + * Set polling interval. Defaults to 1s. + * + * @param intervalMillis Polling interval in milliseconds. Minimum 1ms. + */ + public Builder interval(long intervalMillis) { + this.interval = Math.max(intervalMillis, 1); + return this; + } + + /** + * Set wait timeout. Defaults to 1s. + * + * @param duration Wait timeout duration. + */ + public Builder timeout(Duration duration) { + return timeout(duration.toMillis()); + } + + /** + * Set wait timeout. Set this to a negative value + * for the wait to expire immediately. + * + * @param timeoutMillis Wait timeout in milliseconds. + */ + public Builder timeout(long timeoutMillis) { + this.timeout = timeoutMillis; + return this; + } + + @Override + public WaitOptions build() { + return new WaitOptions(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/export/Waiter.java new file mode 100644 index 000000000..b018d93cd --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/Waiter.java @@ -0,0 +1,104 @@ +package io.weaviate.client6.v1.api.export; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +final class Waiter { + + private final Export export; + private final WaitOptions wait; + + Waiter(final Export export, WaitOptions wait) { + this.export = export; + this.wait = wait; + } + + Export waitForStatus(final ExportStatus wantStatus, Callable> poll) + throws IOException, TimeoutException { + if (export.error() != null) { + throw new RuntimeException(export.error()); + } + + if (export.status() == wantStatus) { + return export; + } + + final Instant deadline = Instant.now().plusMillis(wait.timeout()); + Export latest = export; + while (!Thread.interrupted()) { + if (Instant.now().isAfter(deadline)) { + throw new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(wait.timeout()).toSeconds(), latest.status())); + } + + try { + var current = poll.call().orElseThrow(); + latest = current; + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (latest.status() == wantStatus) { + return latest; + } else if (isComplete(latest)) { + throw new IllegalStateException("completed with status=%s without reaching %s" + .formatted(latest.status(), wantStatus)); + } + + try { + Thread.sleep(wait.interval()); + } catch (InterruptedException e) { + // TODO: the interrupted state will be cleared on the next while() check + // and then we will simply return the latest state. An absence of an exception + // might be misleading here. What should we do? + Thread.currentThread().interrupt(); + } + } + return latest; + } + + CompletableFuture waitForStatusAsync( + final ExportStatus wantStatus, + Supplier>> poll) { + if (export.status() == wantStatus) { + return CompletableFuture.completedFuture(export); + } + final Instant deadline = Instant.now().plusMillis(wait.timeout()); + return poll.get().thenCompose(latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline)); + } + + CompletableFuture _waitForStatusAsync( + final ExportStatus wantStatus, + final Export current, + Supplier>> poll, + final Instant deadline) { + + if (current.status() == wantStatus) { + return CompletableFuture.completedFuture(current); + } + + if (Instant.now().isAfter(deadline)) { + var e = new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(wait.timeout()).toSeconds(), current.status())); + throw new CompletionException(e); + } + + return poll.get().thenComposeAsync( + latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline), + CompletableFuture.delayedExecutor(wait.interval(), TimeUnit.MILLISECONDS)); + } + + private boolean isComplete(final Export export) { + return export.status() == ExportStatus.SUCCESS + || export.status() == ExportStatus.FAILED + || export.status() == ExportStatus.CANCELED; + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClient.java b/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClient.java new file mode 100644 index 000000000..2f12a0e26 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClient.java @@ -0,0 +1,95 @@ +package io.weaviate.client6.v1.api.export; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +import io.weaviate.client6.v1.api.WeaviateApiException; +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateExportClient { + private final RestTransport restTransport; + + public WeaviateExportClient(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new export process. + * + * @param exportId Export ID. Must be unique for the backend. + * @param backend Export storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export create(String exportId, String backend) throws IOException { + return create(new CreateExportRequest(CreateExportRequest.ExportCreate.of(exportId), backend)); + } + + /** + * Start a new export process. + * + * @param exportId Export ID. Must be unique for the backend. + * @param backend Export storage backend. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export create(String exportId, String backend, + Function> fn) + throws IOException { + return create(new CreateExportRequest(CreateExportRequest.ExportCreate.of(exportId, fn), backend)); + } + + /** + * Start a new export process. + * + * @param request Create export request. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Export create(CreateExportRequest request) throws IOException { + return this.restTransport.performRequest(request, CreateExportRequest._ENDPOINT); + } + + /** + * Get export create status. + * + * @param exportId Export ID. + * @param backend Export storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional getCreateStatus(String exportId, String backend) throws IOException { + return this.restTransport.performRequest( + new GetCreateStatusRequest(exportId, backend), GetCreateStatusRequest._ENDPOINT); + } + + /** + * Cancel in-progress export. + * + * @param exportId Export ID. + * @param backend Export storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void cancel(String exportId, String backend) throws IOException { + this.restTransport.performRequest(new CancelExportRequest(exportId, backend), CancelExportRequest._ENDPOINT); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClientAsync.java new file mode 100644 index 000000000..1e6a5383f --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/export/WeaviateExportClientAsync.java @@ -0,0 +1,69 @@ +package io.weaviate.client6.v1.api.export; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateExportClientAsync { + private final RestTransport restTransport; + + public WeaviateExportClientAsync(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new export process. + * + * @param exportId Export ID. Must be unique for the backend. + * @param backend Export storage backend. + */ + public CompletableFuture create(String exportId, String backend) { + return create(new CreateExportRequest(CreateExportRequest.ExportCreate.of(exportId), backend)); + } + + /** + * Start a new export process. + * + * @param exportId Export ID. Must be unique for the backend. + * @param backend Export storage backend. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture create(String exportId, String backend, + Function> fn) { + return create(new CreateExportRequest(CreateExportRequest.ExportCreate.of(exportId, fn), backend)); + } + + /** + * Start a new export process. + * + * @param request Create export request. + */ + public CompletableFuture create(CreateExportRequest request) { + return this.restTransport.performRequestAsync(request, CreateExportRequest._ENDPOINT); + } + + /** + * Get export create status. + * + * @param exportId Export ID. + * @param backend Export storage backend. + */ + public CompletableFuture> getCreateStatus(String exportId, String backend) { + return this.restTransport.performRequestAsync( + new GetCreateStatusRequest(exportId, backend), GetCreateStatusRequest._ENDPOINT); + } + + /** + * Cancel in-progress export creation. + * + * @param exportId Export ID. + * @param backend Export storage backend. + */ + public CompletableFuture cancel(String exportId, String backend) { + return this.restTransport.performRequestAsync(new CancelExportRequest(exportId, backend), + CancelExportRequest._ENDPOINT); + } +}