Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/grpc/server.py
Show First 20 Lines • Show All 183 Lines • ▼ Show 20 Lines | ): | ||||
self._server_termination_event = check.inst_param( | self._server_termination_event = check.inst_param( | ||||
server_termination_event, "server_termination_event", seven.ThreadingEventType | server_termination_event, "server_termination_event", seven.ThreadingEventType | ||||
) | ) | ||||
self._loadable_target_origin = check.opt_inst_param( | self._loadable_target_origin = check.opt_inst_param( | ||||
loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin | loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin | ||||
) | ) | ||||
# Each server is initialized with a unique UUID. This UUID is used by clients to track when | |||||
# servers are replaced and is used for cache invalidation and reloading. | |||||
self._server_id = str(uuid.uuid4()) | |||||
# Client tells the server to shutdown by calling ShutdownServer (or by failing to send a | # Client tells the server to shutdown by calling ShutdownServer (or by failing to send a | ||||
# hearbeat, at which point this event is set. The cleanup thread will then set the server | # hearbeat, at which point this event is set. The cleanup thread will then set the server | ||||
# termination event once all current executions have finished, which will stop the server) | # termination event once all current executions have finished, which will stop the server) | ||||
self._shutdown_once_executions_finish_event = threading.Event() | self._shutdown_once_executions_finish_event = threading.Event() | ||||
# Dict[str, (multiprocessing.Process, DagsterInstance)] | # Dict[str, (multiprocessing.Process, DagsterInstance)] | ||||
self._executions = {} | self._executions = {} | ||||
# Dict[str, multiprocessing.Event] | # Dict[str, multiprocessing.Event] | ||||
▲ Show 20 Lines • Show All 109 Lines • ▼ Show 20 Lines | def StreamingPing(self, request, _context): | ||||
for sequence_number in range(sequence_length): | for sequence_number in range(sequence_length): | ||||
yield api_pb2.StreamingPingEvent(sequence_number=sequence_number, echo=echo) | yield api_pb2.StreamingPingEvent(sequence_number=sequence_number, echo=echo) | ||||
def Heartbeat(self, request, _context): | def Heartbeat(self, request, _context): | ||||
self.__last_heartbeat_time = time.time() | self.__last_heartbeat_time = time.time() | ||||
echo = request.echo | echo = request.echo | ||||
return api_pb2.PingReply(echo=echo) | return api_pb2.PingReply(echo=echo) | ||||
def GetServerId(self, _request, _context): | |||||
return api_pb2.GetServerIdReply(server_id=self._server_id) | |||||
def ExecutionPlanSnapshot(self, request, _context): | def ExecutionPlanSnapshot(self, request, _context): | ||||
execution_plan_args = deserialize_json_to_dagster_namedtuple( | execution_plan_args = deserialize_json_to_dagster_namedtuple( | ||||
request.serialized_execution_plan_snapshot_args | request.serialized_execution_plan_snapshot_args | ||||
) | ) | ||||
check.inst_param(execution_plan_args, "execution_plan_args", ExecutionPlanSnapshotArgs) | check.inst_param(execution_plan_args, "execution_plan_args", ExecutionPlanSnapshotArgs) | ||||
recon_pipeline = self._recon_pipeline_from_origin(execution_plan_args.pipeline_origin) | recon_pipeline = self._recon_pipeline_from_origin(execution_plan_args.pipeline_origin) | ||||
execution_plan_snapshot_or_error = get_external_execution_plan_snapshot( | execution_plan_snapshot_or_error = get_external_execution_plan_snapshot( | ||||
▲ Show 20 Lines • Show All 786 Lines • Show Last 20 Lines |