Differential D8122 Diff 38494 python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py
import sqlalchemy as db | import sqlalchemy as db | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.storage.runs import DaemonHeartbeatsTable, RunStorageSqlMetadata, SqlRunStorage | from dagster.core.storage.runs import RunStorageSqlMetadata, SqlRunStorage | ||||
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev | from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev | ||||
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple | from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple | ||||
from dagster.utils import utc_datetime_from_timestamp | from dagster.utils import utc_datetime_from_timestamp | ||||
from ..utils import ( | from ..utils import ( | ||||
create_pg_connection, | create_pg_connection, | ||||
pg_alembic_config, | pg_alembic_config, | ||||
pg_config, | pg_config, | ||||
Show All 18 Lines | .. literalinclude:: ../../../../../../examples/docs_snippets/docs_snippets/deploying/dagster-pg.yaml | ||||
:caption: dagster.yaml | :caption: dagster.yaml | ||||
:lines: 1-10 | :lines: 1-10 | ||||
:language: YAML | :language: YAML | ||||
Note that the fields in this config are :py:class:`~dagster.StringSource` and | Note that the fields in this config are :py:class:`~dagster.StringSource` and | ||||
:py:class:`~dagster.IntSource` and can be configured from environment variables. | :py:class:`~dagster.IntSource` and can be configured from environment variables. | ||||
""" | """ | ||||
def __init__(self, postgres_url, should_autocreate_tables=True, inst_data=None): | def __init__( | ||||
self, postgres_url, should_autocreate_tables=True, inst_data=None, metadata_class=None | |||||
): | |||||
self._metadata_class = metadata_class if metadata_class else RunStorageSqlMetadata | |||||
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) | self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) | ||||
self.postgres_url = postgres_url | self.postgres_url = postgres_url | ||||
self.should_autocreate_tables = check.bool_param( | self.should_autocreate_tables = check.bool_param( | ||||
should_autocreate_tables, "should_autocreate_tables" | should_autocreate_tables, "should_autocreate_tables" | ||||
) | ) | ||||
# Default to not holding any connections open to prevent accumulating connections per DagsterInstance | # Default to not holding any connections open to prevent accumulating connections per DagsterInstance | ||||
self._engine = create_engine( | self._engine = create_engine( | ||||
self.postgres_url, | self.postgres_url, | ||||
isolation_level="AUTOCOMMIT", | isolation_level="AUTOCOMMIT", | ||||
poolclass=db.pool.NullPool, | poolclass=db.pool.NullPool, | ||||
) | ) | ||||
self._index_migration_cache = {} | self._index_migration_cache = {} | ||||
table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names()) | table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names()) | ||||
# Stamp and create tables if there's no previously stamped revision and the main table | # Stamp and create tables if there's no previously stamped revision and the main table | ||||
# doesn't exist (since we used to not stamp postgres storage when it was first created) | # doesn't exist (since we used to not stamp postgres storage when it was first created) | ||||
if self.should_autocreate_tables and "runs" not in table_names: | if self.should_autocreate_tables and "runs" not in table_names: | ||||
with self.connect() as conn: | with self.connect() as conn: | ||||
retry_pg_creation_fn(lambda: RunStorageSqlMetadata.create_all(conn)) | retry_pg_creation_fn(lambda: metadata_class.create_all(conn)) | ||||
# This revision may be shared by any other dagster storage classes using the same DB | # This revision may be shared by any other dagster storage classes using the same DB | ||||
stamp_alembic_rev(pg_alembic_config(__file__), conn) | stamp_alembic_rev(pg_alembic_config(__file__), conn) | ||||
# mark all secondary indexes as built | # mark all secondary indexes as built | ||||
self.build_missing_indexes() | self.build_missing_indexes() | ||||
super().__init__() | super().__init__(metadata_class=metadata_class) | ||||
def optimize_for_dagit(self, statement_timeout): | def optimize_for_dagit(self, statement_timeout): | ||||
# When running in dagit, hold 1 open connection and set statement_timeout | # When running in dagit, hold 1 open connection and set statement_timeout | ||||
self._engine = create_engine( | self._engine = create_engine( | ||||
self.postgres_url, | self.postgres_url, | ||||
isolation_level="AUTOCOMMIT", | isolation_level="AUTOCOMMIT", | ||||
pool_size=1, | pool_size=1, | ||||
connect_args={"options": pg_statement_timeout(statement_timeout)}, | connect_args={"options": pg_statement_timeout(statement_timeout)}, | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def mark_index_built(self, migration_name): | ||||
if migration_name in self._index_migration_cache: | if migration_name in self._index_migration_cache: | ||||
del self._index_migration_cache[migration_name] | del self._index_migration_cache[migration_name] | ||||
def add_daemon_heartbeat(self, daemon_heartbeat): | def add_daemon_heartbeat(self, daemon_heartbeat): | ||||
with self.connect() as conn: | with self.connect() as conn: | ||||
# insert or update if already present, using postgres specific on_conflict | # insert or update if already present, using postgres specific on_conflict | ||||
conn.execute( | conn.execute( | ||||
db.dialects.postgresql.insert(DaemonHeartbeatsTable) | db.dialects.postgresql.insert(self._daemon_heartbeats_table) | ||||
.values( # pylint: disable=no-value-for-parameter | .values( # pylint: disable=no-value-for-parameter | ||||
timestamp=utc_datetime_from_timestamp(daemon_heartbeat.timestamp), | timestamp=utc_datetime_from_timestamp(daemon_heartbeat.timestamp), | ||||
daemon_type=daemon_heartbeat.daemon_type, | daemon_type=daemon_heartbeat.daemon_type, | ||||
daemon_id=daemon_heartbeat.daemon_id, | daemon_id=daemon_heartbeat.daemon_id, | ||||
body=serialize_dagster_namedtuple(daemon_heartbeat), | body=serialize_dagster_namedtuple(daemon_heartbeat), | ||||
) | ) | ||||
.on_conflict_do_update( | .on_conflict_do_update( | ||||
index_elements=[DaemonHeartbeatsTable.c.daemon_type], | index_elements=[self._daemon_heartbeats_table.c.daemon_type], | ||||
set_={ | set_={ | ||||
"timestamp": utc_datetime_from_timestamp(daemon_heartbeat.timestamp), | "timestamp": utc_datetime_from_timestamp(daemon_heartbeat.timestamp), | ||||
"daemon_id": daemon_heartbeat.daemon_id, | "daemon_id": daemon_heartbeat.daemon_id, | ||||
"body": serialize_dagster_namedtuple(daemon_heartbeat), | "body": serialize_dagster_namedtuple(daemon_heartbeat), | ||||
}, | }, | ||||
) | ) | ||||
) | ) |