Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.declarative_automation.automation_condition

import datetime
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Union

from dagster._annotations import experimental
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_automation.serialized_objects import (
    AssetSubsetWithMetadata,
    AutomationConditionCursor,
    AutomationConditionEvaluation,
    AutomationConditionNodeCursor,
    AutomationConditionSnapshot,
    get_serializable_candidate_subset,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._record import copy
from dagster._time import get_current_timestamp
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import disable_dagster_warnings

if TYPE_CHECKING:
    from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

    from .automation_context import AutomationContext
    from .operands import (
        CodeVersionChangedCondition,
        CronTickPassedCondition,
        FailedAutomationCondition,
        InLatestTimeWindowCondition,
        InProgressAutomationCondition,
        MissingAutomationCondition,
        NewlyRequestedCondition,
        NewlyUpdatedCondition,
        WillBeRequestedCondition,
    )
    from .operators import (
        AllDepsCondition,
        AndAutomationCondition,
        AnyDepsCondition,
        AnyDownstreamConditionsCondition,
        NewlyTrueCondition,
        NotAutomationCondition,
        OrAutomationCondition,
        SinceCondition,
    )


class AutomationCondition(ABC):
    """Abstract base class for a node in a declarative automation condition tree.

    Subclasses must provide ``description``, ``label`` and ``evaluate``. Conditions can
    be composed with ``&``, ``|`` and ``~``, and the static constructors on this class
    build the individual operands and operators as well as the composite ``eager`` and
    ``on_cron`` conditions.
    """

    @property
    def requires_cursor(self) -> bool:
        """Whether a node cursor should be stored for this condition's results.

        ``AutomationResult._create`` only builds a node cursor when this is True.
        """
        return False

    @property
    def children(self) -> Sequence["AutomationCondition"]:
        """Direct sub-conditions of this condition (none by default)."""
        return []

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable description of when this condition is true."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def label(self) -> Optional[str]:
        """User-provided label subjectively describing the purpose of this condition in the
        broader evaluation tree.
        """
        raise NotImplementedError()

    @property
    def name(self) -> Optional[str]:
        """Formal name of this specific condition, generally aligning with its static
        constructor.
        """
        return None

    def get_snapshot(self, unique_id: str) -> AutomationConditionSnapshot:
        """Returns a snapshot of this condition that can be used for serialization.

        Args:
            unique_id (str): The unique id of this condition within its condition tree.
        """
        return AutomationConditionSnapshot(
            class_name=self.__class__.__name__,
            description=self.description,
            unique_id=unique_id,
            label=self.label,
            name=self.name,
        )

    def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
        """Returns a unique identifier for this condition within the broader condition tree.

        The id is an md5 digest over the parent's id, this node's index, its class name and
        its description.
        """
        # NOTE: the parts are joined without a separator on purpose -- changing the exact
        # string that is hashed would change every unique id derived from it.
        parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
        return non_secure_md5_hash_str("".join(parts).encode())

    def get_hash(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> int:
        """Generates a hash based off of the unique ids of this condition and all children."""
        unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        hashes = [hash(unique_id)]
        for i, child in enumerate(self.children):
            hashes.append(child.get_hash(parent_unique_id=unique_id, index=i))
        return hash(tuple(hashes))

    def __hash__(self) -> int:
        return self.get_hash()

    def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
        """Returns an AutoMaterializePolicy which contains this condition."""
        # imported lazily; the module is only imported under TYPE_CHECKING at file level
        from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

        return AutoMaterializePolicy.from_automation_condition(self)

    def is_rule_condition(self) -> bool:
        """Returns True if this condition or any of its descendants is a RuleCondition."""
        from .legacy import RuleCondition

        if isinstance(self, RuleCondition):
            return True
        return any(child.is_rule_condition() for child in self.children)

    @abstractmethod
    def evaluate(self, context: "AutomationContext") -> "AutomationResult":
        """Evaluates this condition against the provided context."""
        raise NotImplementedError()

    def with_label(self, label: Optional[str]) -> "AutomationCondition":
        """Returns a copy of this AutomationCondition with a human-readable label."""
        return copy(self, label=label)

    def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
        """Returns a condition that combines this condition and ``other`` with AND."""
        from .operators import AndAutomationCondition

        # Group AndAutomationConditions together, but only when this one is unlabeled:
        # flattening builds a brand new node, which would silently discard a label that
        # was attached via `with_label`.
        if isinstance(self, AndAutomationCondition) and self.label is None:
            return AndAutomationCondition(operands=[*self.operands, other])
        return AndAutomationCondition(operands=[self, other])

    def __or__(self, other: "AutomationCondition") -> "OrAutomationCondition":
        """Returns a condition that combines this condition and ``other`` with OR."""
        from .operators import OrAutomationCondition

        # Group OrAutomationConditions together, but only when this one is unlabeled
        # (see `__and__` for why a labeled node is kept intact).
        if isinstance(self, OrAutomationCondition) and self.label is None:
            return OrAutomationCondition(operands=[*self.operands, other])
        return OrAutomationCondition(operands=[self, other])

    def __invert__(self) -> "NotAutomationCondition":
        """Returns a condition that negates this condition."""
        from .operators import NotAutomationCondition

        return NotAutomationCondition(operand=self)

    def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
        """Returns an AutomationCondition that is true if this condition has become true since
        the last time the reference condition became true.
        """
        from .operators import SinceCondition

        return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

    def newly_true(self) -> "NewlyTrueCondition":
        """Returns an AutomationCondition that is true only on the tick that this condition
        goes from false to true for a given asset partition.
        """
        from .operators import NewlyTrueCondition

        return NewlyTrueCondition(operand=self)

    @experimental
    @staticmethod
    def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if at least one
        partition of any of its dependencies evaluates to True for the given condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated
                against this asset's dependencies.
        """
        from .operators import AnyDepsCondition

        return AnyDepsCondition(operand=condition)

    @experimental
    @staticmethod
    def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if at least one
        partition of all of its dependencies evaluates to True for the given condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated
                against this asset's dependencies.
        """
        from .operators import AllDepsCondition

        return AllDepsCondition(operand=condition)

    @experimental
    @staticmethod
    def missing() -> "MissingAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has never
        been materialized or observed.
        """
        from .operands import MissingAutomationCondition

        return MissingAutomationCondition()

    @experimental
    @staticmethod
    def in_progress() -> "InProgressAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it is part of
        an in-progress run.
        """
        from .operands import InProgressAutomationCondition

        return InProgressAutomationCondition()

    @experimental
    @staticmethod
    def failed() -> "FailedAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if its latest
        run failed.
        """
        from .operands import FailedAutomationCondition

        return FailedAutomationCondition()

    @experimental
    @staticmethod
    def in_latest_time_window(
        lookback_delta: Optional[datetime.timedelta] = None,
    ) -> "InLatestTimeWindowCondition":
        """Returns an AutomationCondition that is true for an asset partition when it is within
        the latest time window.

        Args:
            lookback_delta (Optional, datetime.timedelta): If provided, the condition will
                return all partitions within the provided delta of the end of the latest time
                window. For example, if this is used on a daily-partitioned asset with a
                lookback_delta of 48 hours, this will return the latest two partitions.
        """
        from .operands import InLatestTimeWindowCondition

        return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

    @experimental
    @staticmethod
    def will_be_requested() -> "WillBeRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it will be
        requested this tick.
        """
        from .operands import WillBeRequestedCondition

        return WillBeRequestedCondition()

    @experimental
    @staticmethod
    def newly_updated() -> "NewlyUpdatedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has been
        updated since the previous tick.
        """
        from .operands import NewlyUpdatedCondition

        return NewlyUpdatedCondition()

    @experimental
    @staticmethod
    def newly_requested() -> "NewlyRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it was
        requested on the previous tick.
        """
        from .operands import NewlyRequestedCondition

        return NewlyRequestedCondition()

    @experimental
    @staticmethod
    def code_version_changed() -> "CodeVersionChangedCondition":
        """Returns an AutomationCondition that is true for an asset partition if its asset's
        code version has been changed since the previous tick.
        """
        from .operands import CodeVersionChangedCondition

        return CodeVersionChangedCondition()

    @experimental
    @staticmethod
    def cron_tick_passed(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> "CronTickPassedCondition":
        """Returns an AutomationCondition that is true for all asset partitions whenever a cron
        tick of the provided schedule is passed.
        """
        from .operands import CronTickPassedCondition

        return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

    @experimental
    @staticmethod
    def eager() -> "AutomationCondition":
        """Returns a condition which will "eagerly" fill in missing partitions as they are
        created, and ensures unpartitioned assets are updated whenever their dependencies are
        updated (either via scheduled execution or ad-hoc runs).

        Specifically, this is a composite AutomationCondition which is true for an asset
        partition if all of the following are true:

        - The asset partition is within the latest time window
        - At least one of its parents has been updated more recently than it has been
          requested, or the asset partition has never been materialized
        - None of its parent partitions are missing
        - None of its parent partitions are currently part of an in-progress run
        - The asset partition is not itself part of an in-progress run
        """
        # the static constructors are marked @experimental; suppress their warnings while
        # the composite condition is assembled
        with disable_dagster_warnings():
            became_missing_or_any_parents_updated = (
                AutomationCondition.missing().newly_true().with_label("became missing")
                | AutomationCondition.any_deps_match(
                    AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
                ).with_label("any parents updated")
            )

            # a missing parent that will be requested this tick does not block the asset
            any_parents_missing = AutomationCondition.any_deps_match(
                AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
            ).with_label("any parents missing")

            any_parents_in_progress = AutomationCondition.any_deps_match(
                AutomationCondition.in_progress()
            ).with_label("any parents in progress")

            return (
                AutomationCondition.in_latest_time_window()
                & became_missing_or_any_parents_updated.since(
                    AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
                )
                & ~any_parents_missing
                & ~any_parents_in_progress
                & ~AutomationCondition.in_progress()
            ).with_label("eager")

    @experimental
    @staticmethod
    def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
        """Returns a condition which will materialize asset partitions within the latest time
        window on a given cron schedule, after their parents have been updated. For example, if
        the cron_schedule is set to "0 0 * * *" (every day at midnight), then this rule will not
        become true on a given day until all of its parents have been updated during that same
        day.

        Specifically, this is a composite AutomationCondition which is true for an asset
        partition if all of the following are true:

        - The asset partition is within the latest time window
        - All parent asset partitions have been updated since the latest tick of the provided
          cron schedule, or will be requested this tick
        - The asset partition has not been requested since the latest tick of the provided cron
          schedule
        """
        # see `eager` for why warnings are disabled here
        with disable_dagster_warnings():
            cron_label = f"'{cron_schedule}' ({cron_timezone})"
            cron_tick_passed = AutomationCondition.cron_tick_passed(
                cron_schedule, cron_timezone
            ).with_label(f"tick of {cron_label} passed")
            all_deps_updated_since_cron = AutomationCondition.all_deps_match(
                AutomationCondition.newly_updated().since(cron_tick_passed)
                | AutomationCondition.will_be_requested()
            ).with_label(f"all parents updated since {cron_label}")
            return (
                AutomationCondition.in_latest_time_window()
                & cron_tick_passed.since(AutomationCondition.newly_requested())
                & all_deps_updated_since_cron
            ).with_label(f"on cron {cron_label}")

    @experimental
    @staticmethod
    def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
        """Returns a condition which will represent the union of all distinct downstream
        conditions.
        """
        from .operators import AnyDownstreamConditionsCondition

        return AnyDownstreamConditionsCondition()
class AutomationResult(NamedTuple):
    """The result of evaluating a single AutomationCondition against an AutomationContext.

    Instances are intended to be built through ``create`` or ``create_from_children``,
    which derive the value hash, node cursor and serializable evaluation from the context.
    """

    # the condition that was evaluated, and its unique id within the condition tree
    condition: AutomationCondition
    condition_unique_id: str
    # digest of the values in this result (see `_compute_value_hash`)
    value_hash: str
    start_timestamp: float
    end_timestamp: float
    temporal_context: TemporalContext
    # the slice for which the condition evaluated to true, and the slice it was evaluated on
    true_slice: AssetSlice
    candidate_slice: AssetSlice
    child_results: Sequence["AutomationResult"]
    # only populated when the condition reports `requires_cursor`
    node_cursor: Optional[AutomationConditionNodeCursor]
    serializable_evaluation: AutomationConditionEvaluation
    extra_state: Any
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata]

    @property
    def asset_key(self) -> AssetKey:
        """The asset key of the true slice."""
        return self.true_slice.asset_key

    @property
    def true_subset(self) -> AssetSubset:
        """The true slice converted to an AssetSubset."""
        return self.true_slice.convert_to_valid_asset_subset()

    @staticmethod
    def _create(
        context: "AutomationContext",
        true_slice: AssetSlice,
        subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]],
        child_results: Sequence["AutomationResult"],
    ) -> "AutomationResult":
        """Builds an AutomationResult and all of its derived fields from the context.

        Args:
            context (AutomationContext): The context the condition was evaluated with.
            true_slice (AssetSlice): The slice for which the condition is true.
            subsets_with_metadata (Sequence[AssetSubsetWithMetadata]): Subsets annotated
                with metadata to store alongside the result.
            extra_state: Additional state to store in the node cursor.
            child_results (Sequence[AutomationResult]): Results of the child conditions.
        """
        start_timestamp = context.create_time.timestamp()
        end_timestamp = get_current_timestamp()

        # a node cursor is only stored for conditions that ask for one
        node_cursor = (
            _create_node_cursor(
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                extra_state=extra_state,
            )
            if context.condition.requires_cursor
            else None
        )

        return AutomationResult(
            condition=context.condition,
            condition_unique_id=context.condition_unique_id,
            value_hash=_compute_value_hash(
                condition_unique_id=context.condition_unique_id,
                condition_description=context.condition.description,
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                child_results=child_results,
            ),
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
            temporal_context=context.new_temporal_context,
            true_slice=true_slice,
            candidate_slice=context.candidate_slice,
            # store the values that were actually provided, so these fields agree with the
            # node cursor, value hash and serializable evaluation built from the same inputs
            subsets_with_metadata=subsets_with_metadata,
            child_results=child_results,
            extra_state=extra_state,
            node_cursor=node_cursor,
            serializable_evaluation=_create_serializable_evaluation(
                context=context,
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                start_timestamp=start_timestamp,
                end_timestamp=end_timestamp,
                child_results=child_results,
            ),
        )

    @staticmethod
    def create_from_children(
        context: "AutomationContext",
        true_slice: AssetSlice,
        child_results: Sequence["AutomationResult"],
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]] = None,
    ) -> "AutomationResult":
        """Returns a new AutomationResult from the given child results."""
        return AutomationResult._create(
            context=context,
            true_slice=true_slice,
            subsets_with_metadata=[],
            extra_state=extra_state,
            child_results=child_results,
        )

    @staticmethod
    def create(
        context: "AutomationContext",
        true_slice: AssetSlice,
        subsets_with_metadata: Optional[Sequence[AssetSubsetWithMetadata]] = None,
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]] = None,
    ) -> "AutomationResult":
        """Returns a new AutomationResult from the given parameters.

        Omitting ``subsets_with_metadata`` (or passing None) is equivalent to passing an
        empty sequence.
        """
        return AutomationResult._create(
            context=context,
            true_slice=true_slice,
            # None sentinel instead of a shared mutable `[]` default
            subsets_with_metadata=[] if subsets_with_metadata is None else subsets_with_metadata,
            extra_state=extra_state,
            child_results=[],
        )

    def get_child_node_cursors(self) -> Mapping[str, AutomationConditionNodeCursor]:
        """Returns the node cursors of this result and all of its descendants, keyed by
        condition unique id. Results without a node cursor are omitted.
        """
        node_cursors = {self.condition_unique_id: self.node_cursor} if self.node_cursor else {}
        for child_result in self.child_results:
            node_cursors.update(child_result.get_child_node_cursors())
        return node_cursors

    def get_new_cursor(self) -> AutomationConditionCursor:
        """Returns the AutomationConditionCursor capturing the state of this result."""
        return AutomationConditionCursor(
            previous_requested_subset=self.serializable_evaluation.true_subset,
            effective_timestamp=self.temporal_context.effective_dt.timestamp(),
            last_event_id=self.temporal_context.last_event_id,
            node_cursors_by_unique_id=self.get_child_node_cursors(),
            result_value_hash=self.value_hash,
        )


def _create_node_cursor(
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]],
) -> AutomationConditionNodeCursor:
    """Builds the node cursor for a single condition from its evaluation outputs."""
    return AutomationConditionNodeCursor(
        true_subset=true_slice.convert_to_valid_asset_subset(),
        candidate_subset=get_serializable_candidate_subset(
            candidate_slice.convert_to_valid_asset_subset()
        ),
        subsets_with_metadata=subsets_with_metadata,
        extra_state=extra_state,
    )


def _create_serializable_evaluation(
    context: "AutomationContext",
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    start_timestamp: float,
    end_timestamp: float,
    child_results: Sequence[AutomationResult],
) -> AutomationConditionEvaluation:
    """Builds the serializable evaluation for a condition, nesting the evaluations that
    were already built for its child results.
    """
    return AutomationConditionEvaluation(
        condition_snapshot=context.condition.get_snapshot(context.condition_unique_id),
        true_subset=true_slice.convert_to_valid_asset_subset(),
        candidate_subset=get_serializable_candidate_subset(
            candidate_slice.convert_to_valid_asset_subset()
        ),
        subsets_with_metadata=subsets_with_metadata,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        child_evaluations=[child_result.serializable_evaluation for child_result in child_results],
    )


def _compute_value_hash(
    condition_unique_id: str,
    condition_description: str,
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    child_results: Sequence[AutomationResult],
) -> str:
    """Computes a unique hash representing the values contained within an evaluation result.
    This string will be identical for results which have identical values, allowing us to
    detect changes without serializing the entire value.
    """
    # NOTE: the components are concatenated without a separator; keep the composition
    # stable, since a different string would produce a different hash for the same values.
    components: Sequence[str] = [
        condition_unique_id,
        condition_description,
        _compute_subset_value_str(true_slice.convert_to_valid_asset_subset()),
        _compute_subset_value_str(candidate_slice.convert_to_valid_asset_subset()),
        *(_compute_subset_with_metadata_value_str(swm) for swm in subsets_with_metadata),
        *(child_result.value_hash for child_result in child_results),
    ]
    return non_secure_md5_hash_str("".join(components).encode("utf-8"))


def _compute_subset_value_str(subset: AssetSubset) -> str:
    """Computes a unique string representing a given AssetSubset. This string will be equal
    for equivalent AssetSubsets.
    """
    if isinstance(subset.value, bool):
        return str(subset.value)
    elif isinstance(subset.value, AllPartitionsSubset):
        return AllPartitionsSubset.__name__
    elif isinstance(subset.value, BaseTimeWindowPartitionsSubset):
        # sort the windows so the string does not depend on their original order
        return str(
            [
                (tw.start.timestamp(), tw.end.timestamp())
                for tw in sorted(subset.value.included_time_windows)
            ]
        )
    else:
        return str(sorted(subset.asset_partitions))


def _compute_subset_with_metadata_value_str(subset_with_metadata: AssetSubsetWithMetadata) -> str:
    """Computes a string representing a subset together with its (sorted) frozen metadata."""
    return _compute_subset_value_str(subset_with_metadata.subset) + str(
        sorted(subset_with_metadata.frozen_metadata)
    )