Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions meta/src/meta/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
%nonterm csv_data logic.CSVData
%nonterm csv_locator_inline_data String
%nonterm csv_locator_paths Sequence[String]
%nonterm _storage_integration Sequence[Tuple[String, logic.Value]]
%nonterm csvlocator logic.CSVLocator
%nonterm data logic.Data
%nonterm date logic.DateValue
Expand Down Expand Up @@ -216,6 +217,7 @@
%validator_ignore_completeness BeTreeLocator
%validator_ignore_completeness BeTreeConfig
%validator_ignore_completeness ExportCSVColumns
%validator_ignore_completeness StorageIntegration

%%

Expand Down Expand Up @@ -1127,9 +1129,16 @@ csvlocator
$4: Optional[String] = builtin.decode_string($$.inline_data) if builtin.decode_string($$.inline_data) != "" else None

csv_config
: "(" "csv_config" config_dict ")"
construct: $$ = construct_csv_config($3)
deconstruct: $3: Sequence[Tuple[String, logic.Value]] = deconstruct_csv_config($$)
: "(" "csv_config" config_dict _storage_integration? ")"
construct: $$ = construct_csv_config($3, $4)
deconstruct:
$3: Sequence[Tuple[String, logic.Value]] = deconstruct_csv_config($$)
$4: Optional[Sequence[Tuple[String, logic.Value]]] = deconstruct_csv_storage_integration_optional($$)

_storage_integration
: "(" "storage_integration" config_dict ")"
construct: $$ = $3
deconstruct: $3: Sequence[Tuple[String, logic.Value]] = $$

gnf_column_path
: STRING
Expand Down Expand Up @@ -1464,7 +1473,10 @@ def _try_extract_value_string_list(value: Optional[logic.Value]) -> Optional[Seq
return None


def construct_csv_config(config_dict: Sequence[Tuple[String, logic.Value]]) -> logic.CSVConfig:
def construct_csv_config(
config_dict: Sequence[Tuple[String, logic.Value]],
storage_integration_opt: Optional[Sequence[Tuple[String, logic.Value]]],
) -> logic.CSVConfig:
config: Dict[String, logic.Value] = builtin.dict_from_list(config_dict)
header_row: Int32 = _extract_value_int32(builtin.dict_get(config, "csv_header_row"), 1)
skip: int = _extract_value_int64(builtin.dict_get(config, "csv_skip"), 0)
Expand All @@ -1478,6 +1490,7 @@ def construct_csv_config(config_dict: Sequence[Tuple[String, logic.Value]]) -> l
encoding: str = _extract_value_string(builtin.dict_get(config, "csv_encoding"), "utf-8")
compression: str = _extract_value_string(builtin.dict_get(config, "csv_compression"), "auto")
partition_size_mb: int = _extract_value_int64(builtin.dict_get(config, "csv_partition_size_mb"), 0)
storage_integration: Optional[logic.StorageIntegration] = construct_csv_storage_integration(storage_integration_opt)
return logic.CSVConfig(
header_row=header_row,
skip=skip,
Expand All @@ -1491,9 +1504,25 @@ def construct_csv_config(config_dict: Sequence[Tuple[String, logic.Value]]) -> l
encoding=encoding,
compression=compression,
partition_size_mb=partition_size_mb,
storage_integration=storage_integration,
)


# Build the optional StorageIntegration message from the key/value pairs of a
# parsed "storage_integration" block. When the block is absent the result is
# builtin.none(). Each field is looked up by name in the pairs and passed
# through _extract_value_string with "" as the fallback argument.
def construct_csv_storage_integration(
    storage_integration_opt: Optional[Sequence[Tuple[String, logic.Value]]],
) -> Optional[logic.StorageIntegration]:
    # No storage_integration block was supplied: nothing to construct.
    if storage_integration_opt is None:
        return builtin.none()
    settings: Dict[String, logic.Value] = builtin.dict_from_list(builtin.unwrap_option(storage_integration_opt))
    # Pull every field out into a typed local first, in the same order the
    # message fields are declared.
    provider: str = _extract_value_string(builtin.dict_get(settings, "provider"), "")
    azure_sas_token: str = _extract_value_string(builtin.dict_get(settings, "azure_sas_token"), "")
    s3_region: str = _extract_value_string(builtin.dict_get(settings, "s3_region"), "")
    s3_access_key_id: str = _extract_value_string(builtin.dict_get(settings, "s3_access_key_id"), "")
    s3_secret_access_key: str = _extract_value_string(builtin.dict_get(settings, "s3_secret_access_key"), "")
    integration: logic.StorageIntegration = logic.StorageIntegration(
        provider=provider,
        azure_sas_token=azure_sas_token,
        s3_region=s3_region,
        s3_access_key_id=s3_access_key_id,
        s3_secret_access_key=s3_secret_access_key,
    )
    return builtin.some(integration)


def construct_betree_info(
key_types: Sequence[logic.Type],
value_types: Sequence[logic.Type],
Expand Down Expand Up @@ -1658,6 +1687,26 @@ def deconstruct_csv_config(msg: logic.CSVConfig) -> List[Tuple[String, logic.Val
return builtin.list_sort(result)


# Turn the storage_integration field of a CSVConfig back into the sorted
# key/value pairs used for the human-readable "storage_integration" block.
# Returns builtin.none() when the field is not set on the message.
#
# Credential fields (azure_sas_token, s3_access_key_id, s3_secret_access_key)
# are written as the literal "***" instead of their real value, so the printed
# block does not round-trip the real secrets back through the parser.
# Fields whose value is the empty string are left out entirely.
def deconstruct_csv_storage_integration_optional(
    msg: logic.CSVConfig,
) -> Optional[Sequence[Tuple[String, logic.Value]]]:
    if not builtin.has_proto_field(msg, "storage_integration"):
        return builtin.none()
    integration: logic.StorageIntegration = builtin.unwrap_option(msg.storage_integration)
    entries: List[Tuple[String, logic.Value]] = list[Tuple[String, logic.Value]]()
    # Non-secret: emitted verbatim.
    if integration.provider != "":
        builtin.list_push(entries, builtin.tuple("provider", _make_value_string(integration.provider)))
    # Secret: masked.
    if integration.azure_sas_token != "":
        builtin.list_push(entries, builtin.tuple("azure_sas_token", _make_value_string("***")))
    # Non-secret: emitted verbatim.
    if integration.s3_region != "":
        builtin.list_push(entries, builtin.tuple("s3_region", _make_value_string(integration.s3_region)))
    # Secret: masked.
    if integration.s3_access_key_id != "":
        builtin.list_push(entries, builtin.tuple("s3_access_key_id", _make_value_string("***")))
    # Secret: masked.
    if integration.s3_secret_access_key != "":
        builtin.list_push(entries, builtin.tuple("s3_secret_access_key", _make_value_string("***")))
    sorted_entries: List[Tuple[String, logic.Value]] = builtin.list_sort(entries)
    return builtin.some(sorted_entries)



def deconstruct_betree_info_config(msg: logic.BeTreeInfo) -> List[Tuple[String, logic.Value]]:
result: List[Tuple[String, logic.Value]] = list[Tuple[String, logic.Value]]()
Expand Down
15 changes: 15 additions & 0 deletions proto/relationalai/lqp/v1/logic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ message BeTreeLocator {
int64 tree_height = 3;
}

// Provider selection and credential settings for a storage location.
// All fields are plain strings; the options are grouped by provider below.
// NOTE(review): presumably only the option group matching `provider` is
// meaningful for a given message -- confirm with the consumers of this type.
message StorageIntegration {
// Name of the storage provider.
string provider = 1; // "azure" or "s3"

// Options for azure
// SAS token string (credential).
string azure_sas_token = 2;

// Options for s3
// Region name.
string s3_region = 3;
// Access key id (credential).
string s3_access_key_id = 4;
// Secret access key (credential).
string s3_secret_access_key = 5;
}

message CSVData {
CSVLocator locator = 1;
CSVConfig config = 2;
Expand Down Expand Up @@ -314,6 +326,9 @@ message CSVConfig {

// Partitioning (for export)
int64 partition_size_mb = 12;

// Storage integration (credentials for private buckets)
optional StorageIntegration storage_integration = 13;
}

message IcebergData {
Expand Down
Loading
Loading