Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 113 additions & 4 deletions meta/src/meta/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@
%nonterm gnf_column logic.GNFColumn
%nonterm gnf_column_path Sequence[String]
%nonterm gnf_columns Sequence[logic.GNFColumn]
%nonterm named_column logic.NamedColumn
%nonterm relation_keys Sequence[logic.NamedColumn]
%nonterm target_relation logic.TargetRelation
%nonterm non_cdc_relations Sequence[logic.TargetRelation]
%nonterm cdc_inserts Sequence[logic.TargetRelation]
%nonterm cdc_deletes Sequence[logic.TargetRelation]
%nonterm relation_body logic.TargetRelations
%nonterm target_relations logic.TargetRelations
%nonterm csv_config logic.CSVConfig
%nonterm csv_data logic.CSVData
%nonterm csv_locator_inline_data String
Expand Down Expand Up @@ -1107,13 +1115,58 @@ csv_asof
: "(" "asof" STRING ")"

# csv_data: `(csv_data <locator> <config> [gnf_columns] [target_relations] <asof>)`.
# Both the GNF column list and the generalized target relations are optional in
# the syntax. The optional pieces are combined by construct_csv_data and split
# apart again by the deconstruct_csv_data_*_optional helpers.
csv_data
    : "(" "csv_data" csvlocator csv_config gnf_columns? target_relations? csv_asof ")"
      construct: $$ = construct_csv_data($3, $4, $5, $6, $7)
      deconstruct:
        $3: logic.CSVLocator = $$.locator
        $4: logic.CSVConfig = $$.config
        $5: Optional[Sequence[logic.GNFColumn]] = deconstruct_csv_data_columns_optional($$)
        $6: Optional[logic.TargetRelations] = deconstruct_csv_data_relations_optional($$)
        $7: String = $$.asof

# named_column: `(column <name> <type>)` -- one column name paired with its type,
# built into a logic.NamedColumn.
named_column
    : "(" "column" STRING type ")"
      construct: $$ = logic.NamedColumn(name=$3, type=$4)
      deconstruct:
        $3: String = $$.name
        $4: logic.Type = $$.type

# relation_keys: `(keys <named_column>*)` -- the key columns of a `relations` form.
# No explicit construct; presumably the rule yields the parsed named_column
# sequence directly (it is declared as Sequence[logic.NamedColumn]) -- TODO confirm.
relation_keys
    : "(" "keys" named_column* ")"

# target_relation: `(relation <relation_id> <named_column>*)` -- one target
# relation id followed by zero or more value columns.
target_relation
    : "(" "relation" relation_id named_column* ")"
      construct: $$ = logic.TargetRelation(target_id=$3, values=$4)
      deconstruct:
        $3: logic.RelationId = $$.target_id
        $4: Sequence[logic.NamedColumn] = $$.values

# non_cdc_relations: zero or more bare target_relation forms, with no wrapping
# parenthesized keyword (unlike cdc_inserts / cdc_deletes).
non_cdc_relations
    : target_relation*

# cdc_inserts: `(inserts <target_relation>*)` -- the target relations that
# become the `inserts` list of a CDC body.
cdc_inserts
    : "(" "inserts" target_relation* ")"

# cdc_deletes: `(deletes <target_relation>*)` -- the target relations that
# become the `deletes` list of a CDC body.
cdc_deletes
    : "(" "deletes" target_relation* ")"

# relation_body: the mode-specific part of a `relations` form. Two alternatives:
#   1. a plain list of target relations          -> TargetRelations with `plain` set
#   2. an `inserts` form followed by a `deletes` -> TargetRelations with `cdc` set
# Both constructors leave `keys` empty; the enclosing target_relations rule
# supplies the keys. On deconstruct, the guards select the alternative by which
# of the `plain` / `cdc` fields is present on the message.
relation_body
    : non_cdc_relations
      construct: $$ = construct_non_cdc_relations($1)
      deconstruct if builtin.has_proto_field($$, 'plain'):
        $1: Sequence[logic.TargetRelation] = $$.plain.targets
    | cdc_inserts cdc_deletes
      construct: $$ = construct_cdc_relations($1, $2)
      deconstruct if builtin.has_proto_field($$, 'cdc'):
        $1: Sequence[logic.TargetRelation] = $$.cdc.inserts
        $2: Sequence[logic.TargetRelation] = $$.cdc.deletes

# target_relations: `(relations (keys ...) <relation_body>)`.
# construct_relations merges the parsed keys into the body message.
target_relations
    : "(" "relations" relation_keys relation_body ")"
      construct: $$ = construct_relations($3, $4)
      deconstruct:
        $3: Sequence[logic.NamedColumn] = $$.keys
        # The whole message is handed to relation_body, whose deconstruct guards
        # inspect its `plain` / `cdc` fields to choose an alternative.
        $4: logic.TargetRelations = $$

csv_locator_paths
: "(" "paths" STRING* ")"
Expand Down Expand Up @@ -1473,6 +1526,62 @@ def _try_extract_value_string_list(value: Optional[logic.Value]) -> Optional[Seq
return None


# Build the TargetRelations for the plain (non-CDC) alternative of relation_body.
# `targets` is wrapped in a PlainTargets and stored in the `plain` field.
# `keys` is left as an empty list here; the target_relations rule later calls
# construct_relations, which rebuilds the message with the parsed keys.
def construct_non_cdc_relations(
    targets: Sequence[logic.TargetRelation],
) -> logic.TargetRelations:
    return logic.TargetRelations(
        keys=list[logic.NamedColumn](),
        plain=logic.PlainTargets(targets=targets),
    )


# Build the TargetRelations for the CDC alternative of relation_body.
# `inserts` and `deletes` are wrapped in a CdcTargets and stored in the `cdc` field.
# `keys` is left as an empty list here; the target_relations rule later calls
# construct_relations, which rebuilds the message with the parsed keys.
def construct_cdc_relations(
    inserts: Sequence[logic.TargetRelation],
    deletes: Sequence[logic.TargetRelation],
) -> logic.TargetRelations:
    return logic.TargetRelations(
        keys=list[logic.NamedColumn](),
        cdc=logic.CdcTargets(inserts=inserts, deletes=deletes),
    )


# Combine the parsed key columns with a relation_body result.
# Returns a new TargetRelations carrying `keys` plus the body's mode:
# `plain` when the body has that field set, otherwise `cdc`.
def construct_relations(
    keys: Sequence[logic.NamedColumn],
    body: logic.TargetRelations,
) -> logic.TargetRelations:
    if builtin.has_proto_field(body, "plain"):
        return logic.TargetRelations(keys=keys, plain=body.plain)
    # Fallback: anything without `plain` is treated as CDC, including a body
    # where neither field is set (body.cdc is then copied as-is).
    return logic.TargetRelations(keys=keys, cdc=body.cdc)


# Build a CSVData from the pieces parsed by the csv_data rule.
#   columns_opt   -- optional GNF column list; an empty list is stored when absent
#   relations_opt -- optional generalized target relations; passed through as-is
# NOTE(review): the grammar allows both optional parts to be present at once and
# nothing here rejects that, although the CSVData proto comment calls them
# mutually exclusive. In that case the columns are stored but are not printed
# back (see deconstruct_csv_data_columns_optional).
def construct_csv_data(
    locator: logic.CSVLocator,
    config: logic.CSVConfig,
    columns_opt: Optional[Sequence[logic.GNFColumn]],
    relations_opt: Optional[logic.TargetRelations],
    asof: String,
) -> logic.CSVData:
    return logic.CSVData(
        locator=locator,
        config=config,
        columns=builtin.unwrap_option_or(columns_opt, list[logic.GNFColumn]()),
        asof=asof,
        relations=relations_opt,
    )


# Value for the optional `gnf_columns` slot of csv_data when deconstructing.
# Returns none when the message has `relations` set, so only the relations form
# is produced; otherwise returns some(columns), even if the column list is empty.
def deconstruct_csv_data_columns_optional(msg: logic.CSVData) -> Optional[Sequence[logic.GNFColumn]]:
    if builtin.has_proto_field(msg, "relations"):
        # Any columns stored alongside relations are dropped here.
        return builtin.none()
    return builtin.some(msg.columns)


# Value for the optional `target_relations` slot of csv_data when deconstructing.
# Returns some(relations) when the message has the `relations` field set,
# and none otherwise.
def deconstruct_csv_data_relations_optional(msg: logic.CSVData) -> Optional[logic.TargetRelations]:
    if builtin.has_proto_field(msg, "relations"):
        return builtin.some(builtin.unwrap_option(msg.relations))
    return builtin.none()


def construct_csv_config(
config_dict: Sequence[Tuple[String, logic.Value]],
storage_integration_opt: Optional[Sequence[Tuple[String, logic.Value]]],
Expand Down
35 changes: 35 additions & 0 deletions proto/relationalai/lqp/v1/logic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,46 @@ message StorageIntegration {
string s3_secret_access_key = 5;
}

// A single named column with its type. Used to describe both shared key columns and
// per-relation value columns in the generalized `TargetRelations` loading construct:
// it is the element type of `TargetRelations.keys` and `TargetRelation.values`.
message NamedColumn {
  string name = 1; // Column name (e.g. "src"); the special name "METADATA$KEY" denotes a derived hash
  Type type = 2;   // Column type
}

// One target relation: its output path plus this relation's own (possibly empty) value
// columns. The shared key columns are not repeated here; they are declared once in the
// enclosing `TargetRelations.keys`.
message TargetRelation {
  RelationId target_id = 1;        // Output relation path
  repeated NamedColumn values = 2; // Value columns for this relation (may be empty)
}

// Plain (non-CDC) load: each input row becomes a tuple in every target relation.
// Used as the `plain` arm of `TargetRelations.body`.
message PlainTargets {
  repeated TargetRelation targets = 1; // Target relations (each row feeds all of them)
}

// CDC load: input rows are routed by METADATA$ACTION into insert and delete deltas.
// Used as the `cdc` arm of `TargetRelations.body`.
message CdcTargets {
  repeated TargetRelation inserts = 1; // INSERT-action rows feed these
  repeated TargetRelation deletes = 2; // DELETE-action rows feed these
}

// Generalized loading: shared key columns plus the target relations, loaded either as a
// plain snapshot or as CDC insert/delete deltas. The two modes are mutually exclusive.
message TargetRelations {
  repeated NamedColumn keys = 1; // Shared key columns (name "METADATA$KEY" => derived hash)
  // Loading mode. As a oneof, at most one of `plain` / `cdc` is set; setting one clears the other.
  oneof body {
    PlainTargets plain = 2; // Plain snapshot load
    CdcTargets cdc = 3;     // CDC insert/delete delta load
  }
}

// A CSV load description: a locator, a config, an `asof` timestamp, and the output
// shape given either as GNF columns or as generalized target relations.
message CSVData {
  CSVLocator locator = 1;
  CSVConfig config = 2;
  repeated GNFColumn columns = 3; // GNF column form; mutually exclusive with `relations`
  string asof = 4; // Blob storage timestamp for freshness requirements
  optional TargetRelations relations = 5; // If present, generalized loading; mutually exclusive with columns
}

message CSVLocator {
Expand Down
Loading
Loading