Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/events.py
import os | import os | ||||
import re | import re | ||||
import warnings | import warnings | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from enum import Enum | from enum import Enum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.address import Address | |||||
from dagster.core.errors import DagsterInvalidAssetKey | from dagster.core.errors import DagsterInvalidAssetKey | ||||
from dagster.serdes import Persistable, whitelist_for_persistence | from dagster.serdes import Persistable, whitelist_for_persistence | ||||
from dagster.utils.backcompat import experimental_arg_warning | from dagster.utils.backcompat import experimental_arg_warning | ||||
from .utils import DEFAULT_OUTPUT | from .utils import DEFAULT_OUTPUT | ||||
def last_file_comp(path): | def last_file_comp(path): | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def path(path, label, description=None): | ||||
Args: | Args: | ||||
path (Optional[str]): The path contained by this metadata entry. | path (Optional[str]): The path contained by this metadata entry. | ||||
label (str): Short display label for this metadata entry. | label (str): Short display label for this metadata entry. | ||||
description (Optional[str]): A human-readable description of this metadata entry. | description (Optional[str]): A human-readable description of this metadata entry. | ||||
""" | """ | ||||
return EventMetadataEntry(label, description, PathMetadataEntryData(path)) | return EventMetadataEntry(label, description, PathMetadataEntryData(path)) | ||||
@staticmethod | @staticmethod | ||||
def address(address, label, step_output_handle=None, description=None): | |||||
"""Static constructor for a metadata entry containing a path as | |||||
:py:class:`AddressMetadataEntryData`. | |||||
Args: | |||||
address (Optional[str]): The address contained by this metadata entry. | |||||
label (str): Short display label for this metadata entry. | |||||
step_output_handle | |||||
description (Optional[str]): A human-readable description of this metadata entry. | |||||
""" | |||||
return EventMetadataEntry( | |||||
label, description, AddressMetadataEntryData(address, step_output_handle) | |||||
) | |||||
@staticmethod | |||||
def fspath(path, label=None, description=None): | def fspath(path, label=None, description=None): | ||||
"""Static constructor for a metadata entry containing a filesystem path as | """Static constructor for a metadata entry containing a filesystem path as | ||||
:py:class:`PathMetadataEntryData`. | :py:class:`PathMetadataEntryData`. | ||||
Args: | Args: | ||||
path (Optional[str]): The path contained by this metadata entry. | path (Optional[str]): The path contained by this metadata entry. | ||||
label (str): Short display label for this metadata entry. Defaults to the | label (str): Short display label for this metadata entry. Defaults to the | ||||
base name of the path. | base name of the path. | ||||
▲ Show 20 Lines • Show All 98 Lines • ▼ Show 20 Lines | class PathMetadataEntryData(namedtuple("_PathMetadataEntryData", "path"), Persistable): | ||||
def __new__(cls, path): | def __new__(cls, path): | ||||
return super(PathMetadataEntryData, cls).__new__( | return super(PathMetadataEntryData, cls).__new__( | ||||
cls, check.opt_str_param(path, "path", default="") | cls, check.opt_str_param(path, "path", default="") | ||||
) | ) | ||||
@whitelist_for_persistence | @whitelist_for_persistence | ||||
class AddressMetadataEntryData( | |||||
namedtuple("_AddressMetadataEntryData", "address step_output_handle"), Persistable | |||||
): | |||||
"""Container class for address metadata entry data. | |||||
Args: | |||||
address (Optional[str]): The path as a string. | |||||
step_output_handle | |||||
""" | |||||
def __new__(cls, address, step_output_handle=None): | |||||
from dagster.core.execution.plan.objects import StepOutputHandle | |||||
return super(AddressMetadataEntryData, cls).__new__( | |||||
cls, | |||||
check.opt_inst_param(address, "address", Address), | |||||
check.opt_inst_param(step_output_handle, "step_output_handle", StepOutputHandle), | |||||
) | |||||
@whitelist_for_persistence | |||||
class JsonMetadataEntryData(namedtuple("_JsonMetadataEntryData", "data"), Persistable): | class JsonMetadataEntryData(namedtuple("_JsonMetadataEntryData", "data"), Persistable): | ||||
"""Container class for JSON metadata entry data. | """Container class for JSON metadata entry data. | ||||
Args: | Args: | ||||
data (Optional[Dict[str, Any]]): The JSON data. | data (Optional[Dict[str, Any]]): The JSON data. | ||||
""" | """ | ||||
def __new__(cls, data): | def __new__(cls, data): | ||||
Show All 39 Lines | class IntMetadataEntryData(namedtuple("_IntMetadataEntryData", "value"), Persistable): | ||||
def __new__(cls, value): | def __new__(cls, value): | ||||
return super(IntMetadataEntryData, cls).__new__(cls, check.opt_int_param(value, "value")) | return super(IntMetadataEntryData, cls).__new__(cls, check.opt_int_param(value, "value")) | ||||
EntryDataUnion = ( | EntryDataUnion = ( | ||||
TextMetadataEntryData, | TextMetadataEntryData, | ||||
UrlMetadataEntryData, | UrlMetadataEntryData, | ||||
PathMetadataEntryData, | PathMetadataEntryData, | ||||
AddressMetadataEntryData, | |||||
JsonMetadataEntryData, | JsonMetadataEntryData, | ||||
MarkdownMetadataEntryData, | MarkdownMetadataEntryData, | ||||
PythonArtifactMetadataEntryData, | PythonArtifactMetadataEntryData, | ||||
FloatMetadataEntryData, | FloatMetadataEntryData, | ||||
IntMetadataEntryData, | IntMetadataEntryData, | ||||
) | ) | ||||
Show All 19 Lines | class Output(namedtuple("_Output", "value output_name address")): | ||||
def __new__(cls, value, output_name=DEFAULT_OUTPUT, address=None): | def __new__(cls, value, output_name=DEFAULT_OUTPUT, address=None): | ||||
if address: | if address: | ||||
experimental_arg_warning("address", "Output.__new__") | experimental_arg_warning("address", "Output.__new__") | ||||
return super(Output, cls).__new__( | return super(Output, cls).__new__( | ||||
cls, | cls, | ||||
value, | value, | ||||
check.str_param(output_name, "output_name"), | check.str_param(output_name, "output_name"), | ||||
check.opt_str_param(address, "address"), | check.opt_inst_param(address, "address", Address), | ||||
) | ) | ||||
@whitelist_for_persistence | @whitelist_for_persistence | ||||
class AssetMaterialization( | class AssetMaterialization( | ||||
namedtuple("_AssetMaterialization", "asset_key description metadata_entries partition"), | namedtuple("_AssetMaterialization", "asset_key description metadata_entries partition"), | ||||
Persistable, | Persistable, | ||||
): | ): | ||||
▲ Show 20 Lines • Show All 239 Lines • ▼ Show 20 Lines | class RetryRequested(Exception): # base exception instead? | ||||
def __init__(self, max_retries=1, seconds_to_wait=None): | def __init__(self, max_retries=1, seconds_to_wait=None): | ||||
super(RetryRequested, self).__init__() | super(RetryRequested, self).__init__() | ||||
self.max_retries = check.int_param(max_retries, "max_retries") | self.max_retries = check.int_param(max_retries, "max_retries") | ||||
self.seconds_to_wait = check.opt_int_param(seconds_to_wait, "seconds_to_wait") | self.seconds_to_wait = check.opt_int_param(seconds_to_wait, "seconds_to_wait") | ||||
class ObjectStoreOperationType(Enum): | class ObjectStoreOperationType(Enum): | ||||
SET_OBJECT = "SET_OBJECT" | SET_OBJECT = "SET_OBJECT" | ||||
SET_EXTERNAL_OBJECT = "SET_EXTERNAL_OBJECT" | |||||
GET_OBJECT = "GET_OBJECT" | GET_OBJECT = "GET_OBJECT" | ||||
GET_EXTERNAL_OBJECT = "GET_EXTERNAL_OBJECT" | |||||
RM_OBJECT = "RM_OBJECT" | RM_OBJECT = "RM_OBJECT" | ||||
CP_OBJECT = "CP_OBJECT" | CP_OBJECT = "CP_OBJECT" | ||||
class ObjectStoreOperation( | class ObjectStoreOperation( | ||||
namedtuple( | namedtuple( | ||||
"_ObjectStoreOperation", | "_ObjectStoreOperation", | ||||
"op key dest_key obj serialization_strategy_name object_store_name value_name", | "op key dest_key obj serialization_strategy_name object_store_name value_name address step_output_handle", | ||||
) | ) | ||||
): | ): | ||||
"""This event is used internally by Dagster machinery when values are written to and read from | """This event is used internally by Dagster machinery when values are written to and read from | ||||
an ObjectStore. | an ObjectStore. | ||||
Users should not import this class or yield events of this type from user code. | Users should not import this class or yield events of this type from user code. | ||||
Args: | Args: | ||||
op (ObjectStoreOperationType): The type of the operation on the object store. | op (ObjectStoreOperationType): The type of the operation on the object store. | ||||
key (str): The key of the object on which the operation was performed. | key (str): The key of the object on which the operation was performed. | ||||
dest_key (Optional[str]): The destination key, if any, to which the object was copied. | dest_key (Optional[str]): The destination key, if any, to which the object was copied. | ||||
obj (Any): The object, if any, retrieved by the operation. | obj (Any): The object, if any, retrieved by the operation. | ||||
serialization_strategy_name (Optional[str]): The name of the serialization strategy, if any, | serialization_strategy_name (Optional[str]): The name of the serialization strategy, if any, | ||||
employed by the operation | employed by the operation | ||||
object_store_name (Optional[str]): The name of the object store that performed the | object_store_name (Optional[str]): The name of the object store that performed the | ||||
operation. | operation. | ||||
address | |||||
step_output_handle | |||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
op, | op, | ||||
key, | key, | ||||
dest_key=None, | dest_key=None, | ||||
obj=None, | obj=None, | ||||
serialization_strategy_name=None, | serialization_strategy_name=None, | ||||
object_store_name=None, | object_store_name=None, | ||||
value_name=None, | value_name=None, | ||||
address=None, | |||||
step_output_handle=None, | |||||
): | ): | ||||
from dagster.core.execution.plan.objects import StepOutputHandle | |||||
return super(ObjectStoreOperation, cls).__new__( | return super(ObjectStoreOperation, cls).__new__( | ||||
cls, | cls, | ||||
op=op, | op=op, | ||||
key=check.str_param(key, "key"), | key=check.str_param(key, "key"), | ||||
dest_key=check.opt_str_param(dest_key, "dest_key"), | dest_key=check.opt_str_param(dest_key, "dest_key"), | ||||
obj=obj, | obj=obj, | ||||
serialization_strategy_name=check.opt_str_param( | serialization_strategy_name=check.opt_str_param( | ||||
serialization_strategy_name, "serialization_strategy_name" | serialization_strategy_name, "serialization_strategy_name" | ||||
), | ), | ||||
object_store_name=check.opt_str_param(object_store_name, "object_store_name"), | object_store_name=check.opt_str_param(object_store_name, "object_store_name"), | ||||
value_name=check.opt_str_param(value_name, "value_name"), | value_name=check.opt_str_param(value_name, "value_name"), | ||||
address=check.opt_inst_param(address, "address", Address), | |||||
step_output_handle=check.opt_inst_param( | |||||
step_output_handle, "step_output_handle", StepOutputHandle | |||||
), | |||||
) | ) | ||||
@classmethod | @classmethod | ||||
def serializable(cls, inst, **kwargs): | def serializable(cls, inst, **kwargs): | ||||
return cls( | return cls( | ||||
**dict( | **dict( | ||||
{ | { | ||||
"op": inst.op.value, | "op": inst.op.value, | ||||
"key": inst.key, | "key": inst.key, | ||||
"dest_key": inst.dest_key, | "dest_key": inst.dest_key, | ||||
"obj": None, | "obj": None, | ||||
"serialization_strategy_name": inst.serialization_strategy_name, | "serialization_strategy_name": inst.serialization_strategy_name, | ||||
"object_store_name": inst.object_store_name, | "object_store_name": inst.object_store_name, | ||||
"value_name": inst.value_name, | "value_name": inst.value_name, | ||||
"address": inst.address, | |||||
"step_output_handle": inst.step_output_handle, | |||||
}, | }, | ||||
**kwargs | **kwargs | ||||
) | ) | ||||
) | ) | ||||
class HookExecutionResult(namedtuple("_HookExecutionResult", "hook_name is_skipped")): | class HookExecutionResult(namedtuple("_HookExecutionResult", "hook_name is_skipped")): | ||||
"""This event is used internally to indicate the execution result of a hook, e.g. whether the | """This event is used internally to indicate the execution result of a hook, e.g. whether the | ||||
Show All 13 Lines |