Changeset View
Changeset View
Standalone View
Standalone View
docs/content/guides/dagster/run-attribution.mdx
Show All 16 Lines | |||||
To accomplish this, we'll use a custom Run Coordinator to read Flask HTTP headers (from Dagster's [GraphQL server](/concepts/dagit/graphql)) and parse the headers to get an email which we'll attach as a tag. | To accomplish this, we'll use a custom Run Coordinator to read Flask HTTP headers (from Dagster's [GraphQL server](/concepts/dagit/graphql)) and parse the headers to get an email which we'll attach as a tag. | ||||
### Custom Run Coordinator | ### Custom Run Coordinator | ||||
In this use case, we'd like to add a hook to customize submitted runs while still using a queue to submit runs to the [Dagster Daemon](/deployment/dagster-daemon). To accomplish this, we can use the <Link href="/deployment/run-coordinator#queuedruncoordinator">Queued Run Coordinator</Link> as follows: | In this use case, we'd like to add a hook to customize submitted runs while still using a queue to submit runs to the [Dagster Daemon](/deployment/dagster-daemon). To accomplish this, we can use the <Link href="/deployment/run-coordinator#queuedruncoordinator">Queued Run Coordinator</Link> as follows: | ||||
```python file=/guides/dagster/run-attribution/custom_run_coordinator_skeleton.py startafter=start_custom_run_coordinator_marker endbefore=end_custom_run_coordinator_marker | ```python file=/guides/dagster/run-attribution/custom_run_coordinator_skeleton.py startafter=start_custom_run_coordinator_marker endbefore=end_custom_run_coordinator_marker | ||||
from dagster.core.run_coordinator import QueuedRunCoordinator, SubmitRunContext | from dagster.core.run_coordinator import ( | ||||
QueuedRunCoordinator, | |||||
SubmitRunContext, | |||||
) | |||||
from dagster.core.storage.pipeline_run import PipelineRun | from dagster.core.storage.pipeline_run import PipelineRun | ||||
class CustomRunCoordinator(QueuedRunCoordinator): | class CustomRunCoordinator(QueuedRunCoordinator): | ||||
def submit_run(self, context: SubmitRunContext) -> PipelineRun: | def submit_run(self, context: SubmitRunContext) -> PipelineRun: | ||||
pass | pass | ||||
``` | ``` | ||||
This allows us to insert custom hooks in `submit_run` to execute when runs are submitted. | This allows us to insert custom hooks in `submit_run` to execute when runs are submitted. | ||||
We can first read HTTP headers with the following code snippet: | We can first read HTTP headers with the following code snippet: | ||||
```python file=/guides/dagster/run-attribution/custom_run_coordinator_skeleton.py startafter=start_flask_header_marker endbefore=end_flask_header_marker | ```python file=/guides/dagster/run-attribution/custom_run_coordinator_skeleton.py startafter=start_flask_header_marker endbefore=end_flask_header_marker | ||||
from flask import has_request_context, request | from flask import has_request_context, request | ||||
desired_header = request.headers.get(CUSTOM_HEADER_NAME) if has_request_context() else None | desired_header = ( | ||||
request.headers.get(CUSTOM_HEADER_NAME) if has_request_context() else None | |||||
) | |||||
``` | ``` | ||||
Then we can parse the relevant header (in this case, called the `jwt_claims_header`) with any custom hook. In the following example, we're decoding a JWT header which contains the user's email. | Then we can parse the relevant header (in this case, called the `jwt_claims_header`) with any custom hook. In the following example, we're decoding a JWT header which contains the user's email. | ||||
```python file=../../run_attribution_example/run_attribution_example/custom_run_coordinator.py startafter=start_email_marker endbefore=end_email_marker dedent=4 | ```python file=../../run_attribution_example/run_attribution_example/custom_run_coordinator.py startafter=start_email_marker endbefore=end_email_marker dedent=4 | ||||
def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]: | def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]: | ||||
if not jwt_claims_header: | if not jwt_claims_header: | ||||
return None | return None | ||||
▲ Show 20 Lines • Show All 56 Lines • Show Last 20 Lines |