Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.burstIQConnection import (
BurstIQConnection,
BurstIQConnection as BurstIQConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.burstiq.client import BurstIQClient
Expand All @@ -35,7 +36,7 @@
_CLIENT_CACHE: Dict[str, BurstIQClient] = {} # noqa: UP006


def get_connection(connection: BurstIQConnection) -> BurstIQClient:
def get_connection(connection: BurstIQConnectionConfig) -> BurstIQClient:
"""
Create or return a cached BurstIQ client connection.

Expand All @@ -52,56 +53,61 @@ def get_connection(connection: BurstIQConnection) -> BurstIQClient:
return _CLIENT_CACHE[key]


def test_connection(
metadata: OpenMetadata,
client: BurstIQClient,
service_connection: BurstIQConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection to BurstIQ. This can be executed either as part
of a metadata workflow or during an Automation Workflow

Args:
metadata: OpenMetadata client
client: BurstIQClient instance
service_connection: BurstIQConnection configuration
automation_workflow: Optional automation workflow
timeout_seconds: Timeout for connection test

Returns:
TestConnectionResult
"""

def test_authenticate():
"""Test authentication with BurstIQ credentials"""
client.test_authenticate()

def test_get_dictionaries():
"""Test fetching dictionaries from BurstIQ"""
dictionaries = client.get_dictionaries(limit=1)
if not dictionaries:
raise ConnectionError("Failed to fetch dictionaries from BurstIQ")

def test_get_edges():
"""Test fetching edges used for lineage"""
edges = client.get_edges(limit=1)
# Edges might not exist, so don't fail if empty
logger.info(f"Found {len(edges)} edges in BurstIQ")

test_fn = {
"CheckAccess": test_authenticate,
"GetDictionaries": test_get_dictionaries,
"GetEdges": test_get_edges,
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=service_connection.connectionTimeout
if hasattr(service_connection, "connectionTimeout")
else timeout_seconds,
)
class BurstIQConnection(BaseConnection[BurstIQConnectionConfig, BurstIQClient]):
def _get_client(self) -> BurstIQClient:
return get_connection(self.service_connection)

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection to BurstIQ. This can be executed either as part
of a metadata workflow or during an Automation Workflow

Args:
metadata: OpenMetadata client
client: BurstIQClient instance
service_connection: BurstIQConnection configuration
automation_workflow: Optional automation workflow
timeout_seconds: Timeout for connection test

Returns:
TestConnectionResult
"""
client = self.client
service_connection = self.service_connection

def test_authenticate():
"""Test authentication with BurstIQ credentials"""
client.test_authenticate()

def test_get_dictionaries():
"""Test fetching dictionaries from BurstIQ"""
dictionaries = client.get_dictionaries(limit=1)
if not dictionaries:
raise ConnectionError("Failed to fetch dictionaries from BurstIQ")

def test_get_edges():
"""Test fetching edges used for lineage"""
edges = client.get_edges(limit=1)
# Edges might not exist, so don't fail if empty
logger.info(f"Found {len(edges)} edges in BurstIQ")

test_fn = {
"CheckAccess": test_authenticate,
"GetDictionaries": test_get_dictionaries,
"GetEdges": test_get_edges,
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value, # pyright: ignore[reportOptionalMemberAccess]
automation_workflow=automation_workflow,
timeout_seconds=service_connection.connectionTimeout # pyright: ignore[reportAttributeAccessIssue]
if hasattr(service_connection, "connectionTimeout")
else timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from metadata.data_quality.interface.pandas.pandas_test_suite_interface import (
PandasTestSuiteInterface,
)
from metadata.ingestion.source.database.burstiq.connection import BurstIQConnection
from metadata.ingestion.source.database.burstiq.lineage import BurstiqLineageSource
from metadata.ingestion.source.database.burstiq.metadata import Burstiqsource
from metadata.profiler.interface.pandas.burstiq.profiler_interface import (
Expand All @@ -15,4 +16,5 @@
profiler_class=BurstIQProfilerInterface,
test_suite_class=PandasTestSuiteInterface,
sampler_class=BurstIQSampler,
connection_class=BurstIQConnection, # pyright: ignore[reportArgumentType]
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
StorageConfig,
)
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
DeltaLakeConnection,
DeltaLakeConnection as DeltaLakeConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN
Expand All @@ -57,7 +58,7 @@ def get_deltalake_client(connection, config):


@get_deltalake_client.register
def _(connection: MetastoreConfig, config: DeltaLakeConnection):
def _(connection: MetastoreConfig, config: DeltaLakeConnectionConfig):
from metadata.ingestion.source.database.deltalake.clients.pyspark import ( # noqa: PLC0415
DeltalakePySparkClient,
)
Expand All @@ -66,7 +67,7 @@ def _(connection: MetastoreConfig, config: DeltaLakeConnection):


@get_deltalake_client.register # noqa: RET503
def _(connection: StorageConfig, config: DeltaLakeConnection):
def _(connection: StorageConfig, config: DeltaLakeConnectionConfig):
from metadata.ingestion.source.database.deltalake.clients.s3 import ( # noqa: PLC0415
DeltalakeS3Client,
)
Expand All @@ -75,34 +76,35 @@ def _(connection: StorageConfig, config: DeltaLakeConnection):
return DeltalakeS3Client.from_config(config)


def get_connection(connection: DeltaLakeConnection) -> DeltalakeClient:
"""Create Deltalake Client"""
return DeltalakeClient(
client=get_deltalake_client(connection.configSource, connection),
config=connection,
)


def test_connection(
metadata: OpenMetadata,
connection: DeltalakeClient,
service_connection: DeltaLakeConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
test_fn = {
"GetDatabases": connection.client.get_test_get_databases_fn(service_connection.configSource),
"GetTables": connection.client.get_test_get_tables_fn(service_connection.configSource),
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
class DeltaLakeConnection(BaseConnection[DeltaLakeConnectionConfig, DeltalakeClient]):
def _get_client(self) -> DeltalakeClient:
connection = self.service_connection
return DeltalakeClient(
client=get_deltalake_client(connection.configSource, connection),
config=connection,
)

def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
connection = self.client
service_connection = self.service_connection
test_fn = {
"GetDatabases": connection.client.get_test_get_databases_fn(service_connection.configSource),
"GetTables": connection.client.get_test_get_tables_fn(service_connection.configSource),
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value, # pyright: ignore[reportOptionalMemberAccess]
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from metadata.ingestion.source.database.deltalake.connection import DeltaLakeConnection
from metadata.ingestion.source.database.deltalake.metadata import DeltalakeSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DeltalakeSource)
ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=DeltalakeSource, # pyright: ignore[reportArgumentType]
connection_class=DeltaLakeConnection, # pyright: ignore[reportArgumentType]
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import (
DomoDatabaseConnection,
DomoDatabaseConnection as DomoDatabaseConnectionConfig,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.connection import BaseConnection
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
test_connection_steps,
Expand All @@ -34,46 +35,45 @@
from metadata.utils.constants import THREE_MIN


def get_connection(connection: DomoDatabaseConnection) -> Domo:
"""
Create connection
"""
try:
domo = Domo(
connection.clientId,
connection.secretToken.get_secret_value(),
api_host=connection.apiHost,
)
return domo # noqa: RET504, TRY300
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg) # noqa: B904

class DomoDatabaseConnection(BaseConnection[DomoDatabaseConnectionConfig, Domo]):
def _get_client(self) -> Domo:
connection = self.service_connection
try:
domo = Domo(
connection.clientId,
connection.secretToken.get_secret_value(),
api_host=connection.apiHost, # pyright: ignore[reportArgumentType]
)
return domo # noqa: RET504, TRY300
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg) # noqa: B904

def test_connection(
metadata: OpenMetadata,
domo: Domo,
service_connection: DomoDatabaseConnection,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
def test_connection(
self,
metadata: OpenMetadata,
automation_workflow: Optional[AutomationWorkflow] = None, # noqa: UP045
timeout_seconds: Optional[int] = THREE_MIN, # noqa: UP045
) -> TestConnectionResult:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
domo = self.client
service_connection = self.service_connection

def custom_executor():
result = domo.datasets.list()
return list(result)
def custom_executor():
result = domo.datasets.list()
return list(result)

test_fn = {
"GetTables": custom_executor,
}
test_fn = {
"GetTables": custom_executor,
}

return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
return test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value, # pyright: ignore[reportOptionalMemberAccess]
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from metadata.ingestion.source.database.domodatabase.connection import DomoDatabaseConnection
from metadata.ingestion.source.database.domodatabase.metadata import DomodatabaseSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=DomodatabaseSource)
ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=DomodatabaseSource, # pyright: ignore[reportArgumentType]
connection_class=DomoDatabaseConnection, # pyright: ignore[reportArgumentType]
)
Loading
Loading