Differential D4718 Diff 24113 integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
Show First 20 Lines • Show All 148 Lines • ▼ Show 20 Lines | def local_port_forward_postgres(namespace): | ||||
finally: | finally: | ||||
print("Terminating port-forwarding") | print("Terminating port-forwarding") | ||||
p.terminate() | p.terminate() | ||||
@pytest.fixture | @pytest.fixture | ||||
def dagster_instance_with_k8s_scheduler( | def dagster_instance_with_k8s_scheduler( | ||||
helm_namespace, run_launcher, k8s_scheduler, schedule_tempdir | helm_namespace_for_k8s_run_launcher, run_launcher, k8s_scheduler, schedule_tempdir | ||||
): | ): | ||||
with local_port_forward_postgres(namespace=helm_namespace) as local_forward_port: | with local_port_forward_postgres( | ||||
postgres_url = "postgresql://test:test@localhost:{local_forward_port}/test".format( | namespace=helm_namespace_for_k8s_run_launcher | ||||
) as local_forward_port: | |||||
postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format( | |||||
local_forward_port=local_forward_port | local_forward_port=local_forward_port | ||||
) | ) | ||||
print("Local Postgres forwarding URL: ", postgres_url) | print("Local Postgres forwarding URL: ", postgres_url) | ||||
instance = DagsterInstance( | instance = DagsterInstance( | ||||
instance_type=InstanceType.EPHEMERAL, | instance_type=InstanceType.EPHEMERAL, | ||||
local_artifact_storage=LocalArtifactStorage(schedule_tempdir), | local_artifact_storage=LocalArtifactStorage(schedule_tempdir), | ||||
run_storage=SqliteRunStorage.from_local(os.path.join(schedule_tempdir, "runs")), | run_storage=SqliteRunStorage.from_local(os.path.join(schedule_tempdir, "runs")), | ||||
Show All 28 Lines | ) as local_forward_port: | ||||
run_storage=PostgresRunStorage(postgres_url), | run_storage=PostgresRunStorage(postgres_url), | ||||
event_storage=PostgresEventLogStorage(postgres_url), | event_storage=PostgresEventLogStorage(postgres_url), | ||||
compute_log_manager=NoOpComputeLogManager(), | compute_log_manager=NoOpComputeLogManager(), | ||||
run_launcher=run_launcher, | run_launcher=run_launcher, | ||||
) | ) | ||||
yield instance | yield instance | ||||
@pytest.fixture(scope="session") | @pytest.fixture(scope='session') | ||||
def dagster_instance_for_k8s_run_launcher(helm_namespace_for_k8s_run_launcher, run_launcher): | |||||
tempdir = DagsterInstance.temp_storage() | |||||
with local_port_forward_postgres( | |||||
namespace=helm_namespace_for_k8s_run_launcher | |||||
) as local_forward_port: | |||||
postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format( | |||||
local_forward_port=local_forward_port | |||||
) | |||||
print('Local Postgres forwarding URL: ', postgres_url) | |||||
instance = DagsterInstance( | |||||
instance_type=InstanceType.EPHEMERAL, | |||||
local_artifact_storage=LocalArtifactStorage(tempdir), | |||||
run_storage=PostgresRunStorage(postgres_url), | |||||
event_storage=PostgresEventLogStorage(postgres_url), | |||||
compute_log_manager=NoOpComputeLogManager(), | |||||
run_launcher=run_launcher, | |||||
) | |||||
yield instance | |||||
@pytest.fixture(scope='session') | |||||
def dagster_instance(helm_namespace, run_launcher): # pylint: disable=redefined-outer-name | def dagster_instance(helm_namespace, run_launcher): # pylint: disable=redefined-outer-name | ||||
tempdir = DagsterInstance.temp_storage() | tempdir = DagsterInstance.temp_storage() | ||||
with local_port_forward_postgres(namespace=helm_namespace) as local_forward_port: | with local_port_forward_postgres(namespace=helm_namespace) as local_forward_port: | ||||
postgres_url = "postgresql://test:test@localhost:{local_forward_port}/test".format( | postgres_url = "postgresql://test:test@localhost:{local_forward_port}/test".format( | ||||
local_forward_port=local_forward_port | local_forward_port=local_forward_port | ||||
) | ) | ||||
print("Local Postgres forwarding URL: ", postgres_url) | print("Local Postgres forwarding URL: ", postgres_url) | ||||
Show All 10 Lines |