Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-dbt/dagster_dbt/rpc/solids.py
import json | import json | ||||
import time | import time | ||||
from typing import Callable, Iterator, Optional | from typing import Callable, Optional | ||||
import pandas as pd | import pandas as pd | ||||
from dagster_pandas import DataFrame | from dagster_pandas import DataFrame | ||||
from dagster import ( | from dagster import ( | ||||
Array, | Array, | ||||
AssetMaterialization, | |||||
Bool, | Bool, | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
EventMetadataEntry, | |||||
Failure, | Failure, | ||||
Field, | Field, | ||||
InputDefinition, | InputDefinition, | ||||
Int, | Int, | ||||
Noneable, | Noneable, | ||||
Nothing, | Nothing, | ||||
Output, | Output, | ||||
OutputDefinition, | OutputDefinition, | ||||
Permissive, | Permissive, | ||||
RetryRequested, | RetryRequested, | ||||
String, | String, | ||||
check, | check, | ||||
solid, | solid, | ||||
) | ) | ||||
from dagster.core.execution.context.compute import SolidExecutionContext | from dagster.core.execution.context.compute import SolidExecutionContext | ||||
from ..errors import DagsterDbtRpcUnexpectedPollOutputError | from ..errors import DagsterDbtRpcUnexpectedPollOutputError | ||||
from .types import DbtRpcOutput | from .types import DbtRpcOutput | ||||
from .utils import log_rpc, raise_for_rpc_error | from .utils import log_rpc, raise_for_rpc_error | ||||
def _generate_materializations(dro: DbtRpcOutput) -> Iterator[AssetMaterialization]: | |||||
"""Yields ``AssetMaterializations`` for metadata in the dbt RPC ``DbtRpcOutput``.""" | |||||
for node_result in dro.result.results: | |||||
if node_result.node["resource_type"] in ["model", "snapshot"]: | |||||
success = not node_result.fail and not node_result.skip and not node_result.error | |||||
if success: | |||||
entries = [ | |||||
EventMetadataEntry.json(data=node_result.node, label="Node"), | |||||
EventMetadataEntry.text(text=str(node_result.status), label="Status"), | |||||
EventMetadataEntry.text( | |||||
text=str(node_result.execution_time), label="Execution Time" | |||||
), | |||||
EventMetadataEntry.text( | |||||
text=node_result.node["config"]["materialized"], | |||||
label="Materialization Strategy", | |||||
), | |||||
EventMetadataEntry.text(text=node_result.node["database"], label="Database"), | |||||
EventMetadataEntry.text(text=node_result.node["schema"], label="Schema"), | |||||
EventMetadataEntry.text(text=node_result.node["alias"], label="Alias"), | |||||
EventMetadataEntry.text( | |||||
text=node_result.node["description"], label="Description" | |||||
), | |||||
] | |||||
for step_timing in node_result.step_timings: | |||||
if step_timing.name == "execute": | |||||
execution_entries = [ | |||||
EventMetadataEntry.text( | |||||
text=step_timing.started_at.isoformat(timespec="seconds"), | |||||
label="Execution Started At", | |||||
), | |||||
EventMetadataEntry.text( | |||||
text=step_timing.completed_at.isoformat(timespec="seconds"), | |||||
label="Execution Completed At", | |||||
), | |||||
EventMetadataEntry.text( | |||||
text=str(step_timing.duration), label="Execution Duration" | |||||
), | |||||
] | |||||
entries.extend(execution_entries) | |||||
if step_timing.name == "compile": | |||||
execution_entries = [ | |||||
EventMetadataEntry.text( | |||||
text=step_timing.started_at.isoformat(timespec="seconds"), | |||||
label="Compilation Started At", | |||||
), | |||||
EventMetadataEntry.text( | |||||
text=step_timing.completed_at.isoformat(timespec="seconds"), | |||||
label="Compilation Completed At", | |||||
), | |||||
EventMetadataEntry.text( | |||||
text=str(step_timing.duration), label="Compilation Duration" | |||||
), | |||||
] | |||||
entries.extend(execution_entries) | |||||
yield AssetMaterialization( | |||||
description="A materialized node within the dbt graph.", | |||||
metadata_entries=entries, | |||||
asset_key=node_result.node["unique_id"], | |||||
) | |||||
def _poll_rpc( | def _poll_rpc( | ||||
context: SolidExecutionContext, request_token: str, should_yield_materializations: bool = True | context: SolidExecutionContext, request_token: str, should_yield_materializations: bool = True | ||||
) -> DbtRpcOutput: | ) -> DbtRpcOutput: | ||||
"""Polls the dbt RPC server for the status of a request until the state is ``success``.""" | """Polls the dbt RPC server for the status of a request until the state is ``success``.""" | ||||
from ..utils import generate_materializations | |||||
logs_start = 0 | logs_start = 0 | ||||
interval = context.solid_config.get("interval") | interval = context.solid_config.get("interval") | ||||
elapsed_time = -1 | elapsed_time = -1 | ||||
current_state = None | current_state = None | ||||
while True: | while True: | ||||
# Poll for the dbt RPC request. | # Poll for the dbt RPC request. | ||||
Show All 37 Lines | ) -> DbtRpcOutput: | ||||
context.log.info( | context.log.info( | ||||
f"Request {request_token} finished with state '{current_state}' in {elapsed_time} seconds" | f"Request {request_token} finished with state '{current_state}' in {elapsed_time} seconds" | ||||
) | ) | ||||
context.log.debug(json.dumps(resp_result, indent=2)) | context.log.debug(json.dumps(resp_result, indent=2)) | ||||
polled_run_results = DbtRpcOutput.from_dict(resp_result) | polled_run_results = DbtRpcOutput.from_dict(resp_result) | ||||
if should_yield_materializations: | if should_yield_materializations: | ||||
for materialization in _generate_materializations(polled_run_results): | for materialization in generate_materializations(polled_run_results): | ||||
yield materialization | yield materialization | ||||
yield Output(polled_run_results) | yield Output(polled_run_results) | ||||
def unwrap_result(poll_rpc_generator) -> DbtRpcOutput: | def unwrap_result(poll_rpc_generator) -> DbtRpcOutput: | ||||
"""A helper function that extracts the `DbtRpcOutput` value from a generator. | """A helper function that extracts the `DbtRpcOutput` value from a generator. | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | config_schema={ | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"task_tags": Permissive(), | "task_tags": Permissive(), | ||||
"max_retries": Field(config=Int, is_required=False, default_value=5), | "max_retries": Field(config=Int, is_required=False, default_value=5), | ||||
"retry_interval": Field(config=Int, is_required=False, default_value=120), | "retry_interval": Field(config=Int, is_required=False, default_value=120), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_run_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | def dbt_rpc_run_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
"""This solid sends the ``dbt run`` command to a dbt RPC server and returns the result of the | """This solid sends the ``dbt run`` command to a dbt RPC server and returns the result of the | ||||
executed dbt process. | executed dbt process. | ||||
Show All 33 Lines | def dbt_rpc_run_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
if context.solid_config["fail_fast"]: | if context.solid_config["fail_fast"]: | ||||
command += " --fail-fast" | command += " --fail-fast" | ||||
context.log.debug(f"Running dbt command: dbt {command}") | context.log.debug(f"Running dbt command: dbt {command}") | ||||
resp = context.resources.dbt_rpc.cli(cli=command, **context.solid_config["task_tags"]) | resp = context.resources.dbt_rpc.cli(cli=command, **context.solid_config["task_tags"]) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
return _poll_rpc(context, request_token) | return _poll_rpc( | ||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
@solid( | @solid( | ||||
description="A solid to invoke dbt test over RPC.", | description="A solid to invoke dbt test over RPC.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition( | OutputDefinition( | ||||
name="request_token", | name="request_token", | ||||
▲ Show 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | config_schema={ | ||||
description="The interval (in seconds) at which to poll the dbt rpc process.", | description="The interval (in seconds) at which to poll the dbt rpc process.", | ||||
), | ), | ||||
"logs": Field( | "logs": Field( | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_test_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | def dbt_rpc_test_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
"""This solid sends the ``dbt test`` command to a dbt RPC server and returns the result of the | """This solid sends the ``dbt test`` command to a dbt RPC server and returns the result of the | ||||
executed dbt process. | executed dbt process. | ||||
This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt | This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt | ||||
process is completed. | process is completed. | ||||
""" | """ | ||||
resp = context.resources.dbt_rpc.test( | resp = context.resources.dbt_rpc.test( | ||||
models=context.solid_config["models"], | models=context.solid_config["models"], | ||||
exclude=context.solid_config["exclude"], | exclude=context.solid_config["exclude"], | ||||
data=context.solid_config["data"], | data=context.solid_config["data"], | ||||
schema=context.solid_config["schema"], | schema=context.solid_config["schema"], | ||||
) | ) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
return _poll_rpc(context, request_token) | return _poll_rpc( | ||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
@solid( | @solid( | ||||
description="A solid to invoke a dbt run operation over RPC.", | description="A solid to invoke a dbt run operation over RPC.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition( | OutputDefinition( | ||||
name="request_token", | name="request_token", | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | config_schema={ | ||||
description="The interval (in seconds) at which to poll the dbt rpc process.", | description="The interval (in seconds) at which to poll the dbt rpc process.", | ||||
), | ), | ||||
"logs": Field( | "logs": Field( | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_run_operation_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | def dbt_rpc_run_operation_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
"""This solid sends the ``dbt run-operation`` command to a dbt RPC server and returns the | """This solid sends the ``dbt run-operation`` command to a dbt RPC server and returns the | ||||
result of the executed dbt process. | result of the executed dbt process. | ||||
This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt | This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt | ||||
process is completed. | process is completed. | ||||
""" | """ | ||||
resp = context.resources.dbt_rpc.run_operation( | resp = context.resources.dbt_rpc.run_operation( | ||||
macro=context.solid_config["macro"], args=context.solid_config["args"] | macro=context.solid_config["macro"], args=context.solid_config["args"] | ||||
) | ) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
return _poll_rpc(context, request_token) | return _poll_rpc( | ||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
@solid( | @solid( | ||||
description="A solid to invoke a dbt snapshot over RPC.", | description="A solid to invoke a dbt snapshot over RPC.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition( | OutputDefinition( | ||||
name="request_token", | name="request_token", | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | config_schema={ | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"task_tags": Permissive(), | "task_tags": Permissive(), | ||||
"max_retries": Field(config=Int, is_required=False, default_value=5), | "max_retries": Field(config=Int, is_required=False, default_value=5), | ||||
"retry_interval": Field(config=Int, is_required=False, default_value=120), | "retry_interval": Field(config=Int, is_required=False, default_value=120), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_snapshot_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | def dbt_rpc_snapshot_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
"""This solid sends the ``dbt snapshot`` command to a dbt RPC server and returns the result of | """This solid sends the ``dbt snapshot`` command to a dbt RPC server and returns the result of | ||||
the executed dbt process. | the executed dbt process. | ||||
Show All 13 Lines | if context.solid_config["task_tags"]: | ||||
) | ) | ||||
resp = context.resources.dbt_rpc.snapshot( | resp = context.resources.dbt_rpc.snapshot( | ||||
select=context.solid_config["select"], exclude=context.solid_config["exclude"] | select=context.solid_config["select"], exclude=context.solid_config["exclude"] | ||||
) | ) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
return _poll_rpc(context, request_token) | return _poll_rpc( | ||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
@solid( | @solid( | ||||
description="A solid to invoke dbt source snapshot-freshness over RPC.", | description="A solid to invoke dbt source snapshot-freshness over RPC.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition( | OutputDefinition( | ||||
name="request_token", | name="request_token", | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | config_schema={ | ||||
description="The interval (in seconds) at which to poll the dbt rpc process.", | description="The interval (in seconds) at which to poll the dbt rpc process.", | ||||
), | ), | ||||
"logs": Field( | "logs": Field( | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_snapshot_freshness_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | def dbt_rpc_snapshot_freshness_and_wait(context: SolidExecutionContext) -> DbtRpcOutput: | ||||
"""This solid sends the ``dbt source snapshot`` command to a dbt RPC server and returns the | """This solid sends the ``dbt source snapshot`` command to a dbt RPC server and returns the | ||||
result of the executed dbt process. | result of the executed dbt process. | ||||
Show All 11 Lines | if context.solid_config["select"]: | ||||
select = " ".join(set(context.solid_config["select"])) | select = " ".join(set(context.solid_config["select"])) | ||||
command += f" --select {select}" | command += f" --select {select}" | ||||
context.log.debug(f"Running dbt command: dbt {command}") | context.log.debug(f"Running dbt command: dbt {command}") | ||||
resp = context.resources.dbt_rpc.cli(cli=command) | resp = context.resources.dbt_rpc.cli(cli=command) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
return _poll_rpc(context, request_token) | return _poll_rpc( | ||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
@solid( | @solid( | ||||
description="A solid to compile a SQL query in context of a dbt project over RPC.", | description="A solid to compile a SQL query in context of a dbt project over RPC.", | ||||
input_defs=[ | input_defs=[ | ||||
InputDefinition(name="start_after", dagster_type=Nothing), | InputDefinition(name="start_after", dagster_type=Nothing), | ||||
InputDefinition( | InputDefinition( | ||||
name="sql", description="The SQL query to be compiled.", dagster_type=String | name="sql", description="The SQL query to be compiled.", dagster_type=String | ||||
Show All 11 Lines | config_schema={ | ||||
description="The interval (in seconds) at which to poll the dbt rpc process.", | description="The interval (in seconds) at which to poll the dbt rpc process.", | ||||
), | ), | ||||
"logs": Field( | "logs": Field( | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
def dbt_rpc_compile_sql(context: SolidExecutionContext, sql: String) -> String: | def dbt_rpc_compile_sql(context: SolidExecutionContext, sql: String) -> String: | ||||
"""This solid sends the ``dbt compile`` command to a dbt RPC server and returns the request | """This solid sends the ``dbt compile`` command to a dbt RPC server and returns the request | ||||
token. | token. | ||||
This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to | This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to | ||||
poll the progress of the running dbt process. | poll the progress of the running dbt process. | ||||
""" | """ | ||||
resp = context.resources.dbt_rpc.compile_sql(sql=sql, name=context.solid_config["name"]) | resp = context.resources.dbt_rpc.compile_sql(sql=sql, name=context.solid_config["name"]) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
result = unwrap_result(_poll_rpc(context, request_token)) | result = unwrap_result( | ||||
_poll_rpc( | |||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
) | |||||
return result.results[0].node["compiled_sql"] | return result.results[0].node["compiled_sql"] | ||||
def create_dbt_rpc_run_sql_solid( | def create_dbt_rpc_run_sql_solid( | ||||
name: str, output_def: Optional[OutputDefinition] = None, **kwargs | name: str, output_def: Optional[OutputDefinition] = None, **kwargs | ||||
) -> Callable: | ) -> Callable: | ||||
"""This function is a factory which constructs a solid that will copy the results of a SQL query | """This function is a factory which constructs a solid that will copy the results of a SQL query | ||||
run within the context of a dbt project to a pandas ``DataFrame``. | run within the context of a dbt project to a pandas ``DataFrame``. | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | @solid( | ||||
description="The interval (in seconds) at which to poll the dbt rpc process.", | description="The interval (in seconds) at which to poll the dbt rpc process.", | ||||
), | ), | ||||
"logs": Field( | "logs": Field( | ||||
config=Bool, | config=Bool, | ||||
is_required=False, | is_required=False, | ||||
default_value=True, | default_value=True, | ||||
description="Whether or not to return logs from the process.", | description="Whether or not to return logs from the process.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation " | |||||
"will be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
required_resource_keys={"dbt_rpc"}, | required_resource_keys={"dbt_rpc"}, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
**kwargs, | **kwargs, | ||||
) | ) | ||||
def _dbt_rpc_run_sql(context: SolidExecutionContext, sql: String) -> DataFrame: | def _dbt_rpc_run_sql(context: SolidExecutionContext, sql: String) -> DataFrame: | ||||
resp = context.resources.dbt_rpc.run_sql(sql=sql, name=context.solid_config["name"]) | resp = context.resources.dbt_rpc.run_sql(sql=sql, name=context.solid_config["name"]) | ||||
context.log.debug(resp.text) | context.log.debug(resp.text) | ||||
raise_for_rpc_error(context, resp) | raise_for_rpc_error(context, resp) | ||||
request_token = resp.json().get("result").get("request_token") | request_token = resp.json().get("result").get("request_token") | ||||
result = unwrap_result(_poll_rpc(context, request_token)) | result = unwrap_result( | ||||
_poll_rpc( | |||||
context, | |||||
request_token, | |||||
should_yield_materializations=context.solid_config["yield_materializations"], | |||||
) | |||||
) | |||||
table = result.results[0].table | table = result.results[0].table | ||||
return pd.DataFrame.from_records(data=table["rows"], columns=table["column_names"]) | return pd.DataFrame.from_records(data=table["rows"], columns=table["column_names"]) | ||||
return _dbt_rpc_run_sql | return _dbt_rpc_run_sql |