Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1031,12 +1031,22 @@ void testCreatedEndpointReturnsUserTasks(TestNamespace ns) {

Task createdTask = SdkClients.user1Client().tasks().create(request);

ListResponse<Task> user1CreatedTasks = SdkClients.user1Client().tasks().listCreated();

assertNotNull(user1CreatedTasks);
assertTrue(
user1CreatedTasks.getData().stream().anyMatch(t -> t.getId().equals(createdTask.getId())),
"User1's created tasks should include the task they created");
// listCreated defaults to limit=10. Under heavy concurrent load (multiple tests sharing
// USER1) the just-created task may not appear in the first page, so request a wider page
// explicitly. Awaitility rides out any write-visibility race on top.
Awaitility.await("User1's created tasks include the new task")
.atMost(Duration.ofSeconds(15))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(
() -> {
ListResponse<Task> user1CreatedTasks =
SdkClients.user1Client().tasks().listCreated(null, null, null, null, 1000);
assertNotNull(user1CreatedTasks);
assertTrue(
user1CreatedTasks.getData().stream()
.anyMatch(t -> t.getId().equals(createdTask.getId())),
"User1's created tasks should include the task they created");
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public ListResponse<Task> listCreated(TaskEntityStatus status) throws OpenMetada
public ListResponse<Task> listCreated(
TaskEntityStatus status, String statusGroup, String domain, String fields)
throws OpenMetadataException {
return listCreated(status, statusGroup, domain, fields, null);
}

public ListResponse<Task> listCreated(
TaskEntityStatus status, String statusGroup, String domain, String fields, Integer limit)
throws OpenMetadataException {
String path = basePath + "/created";
RequestOptions.Builder optionsBuilder = RequestOptions.builder();
if (status != null) {
Expand All @@ -159,6 +165,9 @@ public ListResponse<Task> listCreated(
if (fields != null) {
optionsBuilder.queryParam("fields", fields);
}
if (limit != null) {
optionsBuilder.queryParam("limit", limit.toString());
}
String responseStr =
httpClient.executeForString(HttpMethod.GET, path, null, optionsBuilder.build());
return deserializeListResponse(responseStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,18 @@ public void detachFromDeletingDomains(
List<EntityReference> updatedDomains = getDomains(original, ALL);
List<EntityReference> removedDomains = subtractById(originalDomains, updatedDomains);
if (!removedDomains.isEmpty()) {
removeDomainLineageForDetached(dataProductId, removedDomains);
persistDomainDetachVersion(original, updatedDomains, removedDomains, updatedBy);
}
}

private void removeDomainLineageForDetached(
UUID dataProductId, List<EntityReference> removedDomains) {
for (EntityReference removedDomain : removedDomains) {
removeDomainLineage(dataProductId, DATA_PRODUCT, removedDomain);
}
}

private DataProduct loadForDomainDetach(UUID dataProductId) {
DataProduct dataProduct = find(dataProductId, ALL, false);
setFieldsInternal(dataProduct, getPutFields());
Expand Down Expand Up @@ -266,9 +274,7 @@ public void reindexAfterDomainDetach(Set<UUID> dataProductIds) {
}
for (UUID dataProductId : dataProductIds) {
if (searchRepository != null && SearchRepository.isSearchWriteDeferralActive()) {
DataProduct dataProduct = find(dataProductId, ALL, false);
setFieldsInternal(dataProduct, getPutFields());
searchRepository.updateEntityIndex(dataProduct);
reindexDetachedProduct(dataProductId);
} else {
SearchIndexRetryQueue.enqueue(
dataProductId.toString(),
Expand All @@ -279,6 +285,20 @@ public void reindexAfterDomainDetach(Set<UUID> dataProductIds) {
}
}

private void reindexDetachedProduct(UUID dataProductId) {
try {
DataProduct dataProduct = find(dataProductId, ALL, false);
setFieldsInternal(dataProduct, getPutFields());
searchRepository.updateEntityIndex(dataProduct);
} catch (EntityNotFoundException e) {
// A retained product can still be hard-deleted elsewhere in the same domain cascade.
// This loop runs post-commit, so skip the vanished product and keep reindexing the rest.
LOG.debug(
"Skipping post-detach reindex for data product {} which no longer exists after the domain delete cascade",
dataProductId);
}
}

@Override
public void setInheritedFields(DataProduct dataProduct, Fields fields) {
boolean inheritOwners = fields.contains(FIELD_OWNERS) && nullOrEmpty(dataProduct.getOwners());
Expand Down Expand Up @@ -927,6 +947,13 @@ public List<EntityReference> getCapturedUpdatedDomains() {
@Override
public void entitySpecificUpdate(boolean consolidatingChanges) {
compareAndUpdate("name", () -> updateName(updated));
// These ODPS-aligned scalar fields are not handled by the base updater;
// without recordChange they are reverted by change consolidation and a
// PATCH that sets them returns 200 but never persists.
recordChange("dataProductType", original.getDataProductType(), updated.getDataProductType());
recordChange("visibility", original.getVisibility(), updated.getVisibility());
recordChange(
"portfolioPriority", original.getPortfolioPriority(), updated.getPortfolioPriority());
// Ports are managed via dedicated bulk add/remove APIs, not via entity PATCH
// Handle domain change with asset migration
// Skip during consolidation to avoid incorrect intermediate migrations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,10 +1828,16 @@ public Response validateODPSYaml(String yamlContent) {
private DataProduct buildDataProductFromODPS(
ODPSDataProduct odps, String languageCode, String domainFqn) {
DataProduct dp = ODPSConverter.fromODPS(odps, languageCode);
// ODPSConverter builds a bare entity with no id; a create insert needs one
// (the standard create path assigns it via EntityMapper.copy). The
// merge/replace paths overwrite this with the existing product's id.
dp.setId(UUID.randomUUID());
if (domainFqn != null && !domainFqn.isBlank()) {
dp.setDomains(
java.util.List.of(
new EntityReference().withFullyQualifiedName(domainFqn).withType(Entity.DOMAIN)));
// Resolve to a full reference (with id): the create authorization context
// and relationship storage need the domain id, not just its FQN.
EntityReference domain =
Entity.getEntityReferenceByName(Entity.DOMAIN, domainFqn, Include.NON_DELETED);
dp.setDomains(java.util.List.of(domain));
}
return dp;
}
Expand Down Expand Up @@ -1864,7 +1870,11 @@ private Response mergeOrCreateDataProductFromODPS(
private DataProduct findExistingByName(String name) {
if (nullOrEmpty(name)) return null;
try {
return repository.getByName(null, name, repository.getFields("id,name,version"));
// Hydrate the full relationship field set. smartMerge/fullReplace copy
// owners/domains/experts/reviewers/certification/tags from the existing
// product; these are lazy fields that come back null unless requested, so
// a sparse load would wipe them (and drop the required domain) on merge.
return repository.getByName(null, name, repository.getFields(EXPORT_FIELDS));
} catch (EntityNotFoundException ignored) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

package org.openmetadata.service.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
Expand All @@ -43,12 +41,10 @@
public final class ODPSConverter {

private static final Logger LOG = LoggerFactory.getLogger(ODPSConverter.class);
private static final ObjectMapper MAPPER = new ObjectMapper();

public static final String DEFAULT_LANGUAGE = "en";
public static final String DEFAULT_SCHEMA_URL =
"https://opendataproducts.org/v4.1/schema/odps.json";
public static final String ODPS_EXTENSION_KEY = "odpsMetadata";

private ODPSConverter() {}

Expand Down Expand Up @@ -177,7 +173,12 @@ public static DataProduct fromODPS(ODPSDataProduct odps, String languageCode) {
dp.setSla(fromODPSSla(odps.getProduct().getSla()));
}

dp.setExtension(preserveOdpsMetadata(odps));
// Every meaningful ODPS field is mapped onto a native DataProduct attribute
// above. We deliberately do NOT stash the raw document under an
// `extension.odpsMetadata` custom field: `odpsMetadata` is not a registered
// custom property, so entity create/update would reject it
// ("Unknown custom field odpsMetadata"), breaking every import. The native
// fields are the source of truth on export (toODPS never reads it back).
return dp;
}

Expand Down Expand Up @@ -234,13 +235,6 @@ private static SlaDefinition fromODPSSla(ODPSSLA odpsSla) {
return sla;
}

@SuppressWarnings("unchecked")
private static Object preserveOdpsMetadata(ODPSDataProduct odps) {
Map<String, Object> extension = new LinkedHashMap<>();
extension.put(ODPS_EXTENSION_KEY, MAPPER.convertValue(odps, Map.class));
return extension;
}

private static final int MAX_ENTITY_NAME_LENGTH = 64;
private static final Pattern INVALID_NAME_CHARS = Pattern.compile("[^a-zA-Z0-9_\\-.]+");
private static final Pattern CONSECUTIVE_UNDERSCORES = Pattern.compile("_+");
Expand Down Expand Up @@ -317,8 +311,8 @@ public static DataProduct smartMerge(DataProduct existing, DataProduct imported)
imported.getTags() != null && !imported.getTags().isEmpty()
? imported.getTags()
: existing.getTags());
merged.setExtension(
imported.getExtension() != null ? imported.getExtension() : existing.getExtension());
// ODPS carries no OpenMetadata custom properties; keep the existing ones.
merged.setExtension(existing.getExtension());
return merged;
}

Expand Down Expand Up @@ -350,7 +344,9 @@ public static DataProduct fullReplace(DataProduct existing, DataProduct imported
replaced.setPortfolioPriority(imported.getPortfolioPriority());
replaced.setSla(imported.getSla());
replaced.setTags(imported.getTags());
replaced.setExtension(imported.getExtension());
// Custom properties have no ODPS representation; a declarative replace must
// not silently wipe them (same rationale as the governance fields above).
replaced.setExtension(existing.getExtension());
return replaced;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -166,13 +167,17 @@ void fromODPS_mapsBasicFields() {
}

@Test
void fromODPS_preservesOriginalDocumentInExtension() {
void fromODPS_doesNotWriteUnregisteredOdpsMetadataExtension() {
// `odpsMetadata` is not a registered custom property on the dataProduct
// entity type, so writing it into the extension would make every API import
// fail validation ("Unknown custom field odpsMetadata"). The converter maps
// all meaningful ODPS fields onto native attributes instead and leaves the
// extension untouched.
ODPSDataProduct odps = basicODPS();

DataProduct dp = ODPSConverter.fromODPS(odps);

assertNotNull(dp.getExtension());
assertTrue(dp.getExtension().toString().contains("odpsMetadata"));
assertNull(dp.getExtension());
}

@Test
Expand Down
Loading
Loading