Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/telemetry.py
Show All 30 Lines | |||||
from dagster.core.definitions.pipeline_base import IPipeline | from dagster.core.definitions.pipeline_base import IPipeline | ||||
from dagster.core.definitions.reconstructable import ( | from dagster.core.definitions.reconstructable import ( | ||||
ReconstructablePipeline, | ReconstructablePipeline, | ||||
ReconstructableRepository, | ReconstructableRepository, | ||||
get_ephemeral_repository_name, | get_ephemeral_repository_name, | ||||
) | ) | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
TELEMETRY_STR = ".telemetry" | TELEMETRY_STR = ".telemetry" | ||||
INSTANCE_ID_STR = "instance_id" | INSTANCE_ID_STR = "instance_id" | ||||
ENABLED_STR = "enabled" | ENABLED_STR = "enabled" | ||||
DAGSTER_HOME_FALLBACK = "~/.dagster" | DAGSTER_HOME_FALLBACK = "~/.dagster" | ||||
DAGSTER_TELEMETRY_URL = "http://telemetry.dagster.io/actions" | DAGSTER_TELEMETRY_URL = "http://telemetry.dagster.io/actions" | ||||
MAX_BYTES = 10485760 # 10 MB = 10 * 1024 * 1024 bytes | MAX_BYTES = 10485760 # 10 MB = 10 * 1024 * 1024 bytes | ||||
UPDATE_REPO_STATS = "update_repo_stats" | UPDATE_REPO_STATS = "update_repo_stats" | ||||
Show All 33 Lines | except ValueError: | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
"Attempted to log telemetry for function {name} that does not take a DagsterInstance " | "Attempted to log telemetry for function {name} that does not take a DagsterInstance " | ||||
"in a parameter called 'instance'" | "in a parameter called 'instance'" | ||||
) | ) | ||||
@wraps(f)
def wrap(*args, **kwargs):
    # Wrapper around ``f`` that records a "<name>_started" telemetry action before the
    # call and a "<name>_ended" action (with elapsed time and success flag) after it.
    instance = _check_telemetry_instance_param(args, kwargs, instance_index)
    action_base = f.__name__

    began_at = get_current_datetime_in_utc()
    log_action(instance=instance, action=action_base + "_started", client_time=began_at)

    result = f(*args, **kwargs)

    finished_at = get_current_datetime_in_utc()
    # ``success`` is read defensively: results without that attribute report None.
    outcome = {"success": getattr(result, "success", None)}
    log_action(
        instance=instance,
        action=action_base + "_ended",
        client_time=finished_at,
        elapsed_time=finished_at - began_at,
        metadata=outcome,
    )
    return result
▲ Show 20 Lines • Show All 242 Lines • ▼ Show 20 Lines | if _get_instance_telemetry_enabled(instance): | ||||
pipeline_name_hash = hash_name(external_pipeline.name) if external_pipeline else "" | pipeline_name_hash = hash_name(external_pipeline.name) if external_pipeline else "" | ||||
repo_hash = hash_name(external_repo.name) | repo_hash = hash_name(external_repo.name) | ||||
num_pipelines_in_repo = len(external_repo.get_all_external_pipelines()) | num_pipelines_in_repo = len(external_repo.get_all_external_pipelines()) | ||||
write_telemetry_log_line( | write_telemetry_log_line( | ||||
TelemetryEntry( | TelemetryEntry( | ||||
action=UPDATE_REPO_STATS, | action=UPDATE_REPO_STATS, | ||||
client_time=str(datetime.datetime.now()), | client_time=str(get_current_datetime_in_utc()), | ||||
event_id=str(uuid.uuid4()), | event_id=str(uuid.uuid4()), | ||||
instance_id=instance_id, | instance_id=instance_id, | ||||
pipeline_name_hash=pipeline_name_hash, | pipeline_name_hash=pipeline_name_hash, | ||||
num_pipelines_in_repo=str(num_pipelines_in_repo), | num_pipelines_in_repo=str(num_pipelines_in_repo), | ||||
repo_hash=repo_hash, | repo_hash=repo_hash, | ||||
metadata={"source": source}, | metadata={"source": source}, | ||||
)._asdict() | )._asdict() | ||||
) | ) | ||||
Show All 21 Lines | if _get_instance_telemetry_enabled(instance): | ||||
else: | else: | ||||
pipeline_name_hash = hash_name(pipeline.get_definition().name) | pipeline_name_hash = hash_name(pipeline.get_definition().name) | ||||
repo_hash = hash_name(get_ephemeral_repository_name(pipeline.get_definition().name)) | repo_hash = hash_name(get_ephemeral_repository_name(pipeline.get_definition().name)) | ||||
num_pipelines_in_repo = 1 | num_pipelines_in_repo = 1 | ||||
write_telemetry_log_line( | write_telemetry_log_line( | ||||
TelemetryEntry( | TelemetryEntry( | ||||
action=UPDATE_REPO_STATS, | action=UPDATE_REPO_STATS, | ||||
client_time=str(datetime.datetime.now()), | client_time=str(get_current_datetime_in_utc()), | ||||
event_id=str(uuid.uuid4()), | event_id=str(uuid.uuid4()), | ||||
instance_id=instance_id, | instance_id=instance_id, | ||||
pipeline_name_hash=pipeline_name_hash, | pipeline_name_hash=pipeline_name_hash, | ||||
num_pipelines_in_repo=str(num_pipelines_in_repo), | num_pipelines_in_repo=str(num_pipelines_in_repo), | ||||
repo_hash=repo_hash, | repo_hash=repo_hash, | ||||
metadata={"source": source}, | metadata={"source": source}, | ||||
)._asdict() | )._asdict() | ||||
) | ) | ||||
def log_action(instance, action, client_time=None, elapsed_time=None, metadata=None): | def log_action(instance, action, client_time=None, elapsed_time=None, metadata=None): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
if client_time is None: | if client_time is None: | ||||
client_time = datetime.datetime.now() | client_time = get_current_datetime_in_utc() | ||||
(dagster_telemetry_enabled, instance_id) = _get_instance_telemetry_info(instance) | (dagster_telemetry_enabled, instance_id) = _get_instance_telemetry_info(instance) | ||||
if dagster_telemetry_enabled: | if dagster_telemetry_enabled: | ||||
# Log general statistics | # Log general statistics | ||||
write_telemetry_log_line( | write_telemetry_log_line( | ||||
TelemetryEntry( | TelemetryEntry( | ||||
action=action, | action=action, | ||||
Show All 31 Lines | |||||
""" % { | """ % { | ||||
"welcome": click.style("Welcome to Dagster!", bold=True) | "welcome": click.style("Welcome to Dagster!", bold=True) | ||||
} | } | ||||
def upload_logs(stop_event):
    """Periodically push telemetry logs to the telemetry server.

    Loops until ``stop_event`` is set, waiting 600 seconds on the event between
    passes. On each pass the sizes of the "logs" and ".logs_queue" directories
    are measured; an upload is triggered when more than 60 minutes have passed
    since the last upload or when either directory holds at least ``MAX_BYTES``.
    The function returns as soon as both directories are empty. Any exception
    raised along the way is swallowed, so this never propagates an error to the caller.
    """

    def _files_total_size(directory):
        # Combined size in bytes of the regular files directly inside ``directory``;
        # 0 when the path is not an existing directory.
        if not os.path.isdir(directory):
            return 0
        return sum(
            os.path.getsize(os.path.join(directory, entry))
            for entry in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, entry))
        )

    try:
        # Start two hours in the past so the first pass is already beyond the 60 minute threshold.
        last_run = get_current_datetime_in_utc() - datetime.timedelta(minutes=120)
        dagster_log_dir = get_dir_from_dagster_home("logs")
        dagster_log_queue_dir = get_dir_from_dagster_home(".logs_queue")
        in_progress = False

        while not stop_event.is_set():
            log_size = _files_total_size(dagster_log_dir)
            log_queue_size = _files_total_size(dagster_log_queue_dir)

            # Nothing on disk and nothing queued: stop instead of idling.
            if log_size == 0 and log_queue_size == 0:
                return

            upload_due = not in_progress and (
                get_current_datetime_in_utc() - last_run > datetime.timedelta(minutes=60)
                or log_size >= MAX_BYTES
                or log_queue_size >= MAX_BYTES
            )
            if upload_due:
                in_progress = True  # Prevent concurrent _upload_logs invocations
                last_run = get_current_datetime_in_utc()
                # Directory paths are looked up again right before uploading.
                dagster_log_dir = get_dir_from_dagster_home("logs")
                dagster_log_queue_dir = get_dir_from_dagster_home(".logs_queue")
                _upload_logs(dagster_log_dir, log_size, dagster_log_queue_dir)
                in_progress = False

            stop_event.wait(600)  # Sleep for 10 minutes
    except Exception:  # pylint: disable=broad-except
        # Best effort only: a failure here is deliberately ignored.
        pass
Show All 36 Lines |