import datetime
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Union
from dagster._annotations import experimental
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetSubsetWithMetadata,
AutomationConditionCursor,
AutomationConditionEvaluation,
AutomationConditionNodeCursor,
AutomationConditionSnapshot,
get_serializable_candidate_subset,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._record import copy
from dagster._time import get_current_timestamp
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import disable_dagster_warnings
if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from .automation_context import AutomationContext
from .operands import (
CodeVersionChangedCondition,
CronTickPassedCondition,
FailedAutomationCondition,
InLatestTimeWindowCondition,
InProgressAutomationCondition,
MissingAutomationCondition,
NewlyRequestedCondition,
NewlyUpdatedCondition,
WillBeRequestedCondition,
)
from .operators import (
AllDepsCondition,
AndAutomationCondition,
AnyDepsCondition,
AnyDownstreamConditionsCondition,
NewlyTrueCondition,
NotAutomationCondition,
OrAutomationCondition,
SinceCondition,
)
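# [docs]  NOTE(review): stray marker, apparently a documentation-site "[docs]" source link captured with the code; it is not part of the program.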
class AutomationCondition(ABC):
    """Abstract base class for a single node in a tree of automation conditions.

    Subclasses must supply ``description``, ``label`` and ``evaluate``. Conditions can be
    combined with ``&``, ``|`` and ``~``; the static constructors on this class build the
    operand / operator conditions that live in the sibling ``operands`` and ``operators``
    modules.
    """

    @property
    def requires_cursor(self) -> bool:
        """Whether a node cursor should be built for this condition (off unless overridden)."""
        return False

    @property
    def children(self) -> Sequence["AutomationCondition"]:
        """Direct sub-conditions of this node; the base implementation has none."""
        return []

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable explanation of when this condition holds."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def label(self) -> Optional[str]:
        """User-supplied label describing this condition's role within the larger tree."""
        raise NotImplementedError()

    @property
    def name(self) -> Optional[str]:
        """Formal name of this condition, generally matching its static constructor."""
        return None

    def get_snapshot(self, unique_id: str) -> AutomationConditionSnapshot:
        """Build a serializable snapshot describing this condition.

        Args:
            unique_id (str): The identifier to record for this condition in the snapshot.
        """
        snapshot = AutomationConditionSnapshot(
            class_name=self.__class__.__name__,
            description=self.description,
            unique_id=unique_id,
            label=self.label,
            name=self.name,
        )
        return snapshot

    def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
        """Compute this condition's identifier within the broader condition tree.

        The id is an md5 digest over the parent's id, this node's index, its class name and
        its description, concatenated without a separator.
        """
        id_source = "".join(
            (str(parent_unique_id), str(index), self.__class__.__name__, self.description)
        )
        return non_secure_md5_hash_str(id_source.encode())

    def get_hash(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> int:
        """Hash this condition from its own unique id followed by its children's hashes."""
        own_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        # each child is hashed relative to this node's id and its position among the children
        child_hashes = [
            child.get_hash(parent_unique_id=own_id, index=position)
            for position, child in enumerate(self.children)
        ]
        return hash((hash(own_id), *child_hashes))

    def __hash__(self) -> int:
        # hash as a root node: no parent id and no index
        return self.get_hash()

    def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
        """Wrap this condition in an AutoMaterializePolicy."""
        # imported here rather than at module level (the top-level import is TYPE_CHECKING-only)
        from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

        return AutoMaterializePolicy.from_automation_condition(self)

    def is_rule_condition(self):
        """Return True if this condition, or any condition beneath it, is a RuleCondition."""
        from .legacy import RuleCondition

        return isinstance(self, RuleCondition) or any(
            child.is_rule_condition() for child in self.children
        )

    @abstractmethod
    def evaluate(self, context: "AutomationContext") -> "AutomationResult":
        """Evaluate this condition for the given context; must be implemented by subclasses."""
        raise NotImplementedError()

    def with_label(self, label: Optional[str]) -> "AutomationCondition":
        """Return a copy of this condition that carries the given human-readable label."""
        return copy(self, label=label)

    def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
        from .operators import AndAutomationCondition

        # an existing AND is extended in place of being nested inside a new one
        if isinstance(self, AndAutomationCondition):
            operands = [*self.operands, other]
        else:
            operands = [self, other]
        return AndAutomationCondition(operands=operands)

    def __or__(self, other: "AutomationCondition") -> "OrAutomationCondition":
        from .operators import OrAutomationCondition

        # an existing OR is extended in place of being nested inside a new one
        if isinstance(self, OrAutomationCondition):
            operands = [*self.operands, other]
        else:
            operands = [self, other]
        return OrAutomationCondition(operands=operands)

    def __invert__(self) -> "NotAutomationCondition":
        from .operators import NotAutomationCondition

        return NotAutomationCondition(operand=self)

    def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
        """Return a condition that is true if this condition has become true since the last
        time ``reset_condition`` became true.
        """
        from .operators import SinceCondition

        return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

    def newly_true(self) -> "NewlyTrueCondition":
        """Return a condition that is true only on the tick where this condition flips from
        false to true for a given asset partition.
        """
        from .operators import NewlyTrueCondition

        return NewlyTrueCondition(operand=self)

    @experimental
    @staticmethod
    def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
        """Return a condition that is true for an asset partition if at least one partition of
        any of its dependencies evaluates to True for ``condition``.

        Args:
            condition (AutomationCondition): The condition to evaluate against this asset's
                dependencies.
        """
        from .operators import AnyDepsCondition

        return AnyDepsCondition(operand=condition)

    @experimental
    @staticmethod
    def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
        """Return a condition that is true for an asset partition if at least one partition of
        all of its dependencies evaluates to True for ``condition``.

        Args:
            condition (AutomationCondition): The condition to evaluate against this asset's
                dependencies.
        """
        from .operators import AllDepsCondition

        return AllDepsCondition(operand=condition)

    @experimental
    @staticmethod
    def missing() -> "MissingAutomationCondition":
        """Return a condition that is true for an asset partition that has never been
        materialized or observed.
        """
        from .operands import MissingAutomationCondition

        return MissingAutomationCondition()

    @experimental
    @staticmethod
    def in_progress() -> "InProgressAutomationCondition":
        """Return a condition that is true for an asset partition that is part of an in-progress run."""
        from .operands import InProgressAutomationCondition

        return InProgressAutomationCondition()

    @experimental
    @staticmethod
    def failed() -> "FailedAutomationCondition":
        """Return a condition that is true for an asset partition whose latest run failed."""
        from .operands import FailedAutomationCondition

        return FailedAutomationCondition()

    @experimental
    @staticmethod
    def in_latest_time_window(
        lookback_delta: Optional[datetime.timedelta] = None,
    ) -> "InLatestTimeWindowCondition":
        """Return a condition that is true for an asset partition that falls within the latest
        time window.

        Args:
            lookback_delta (Optional, datetime.timedelta): If provided, the condition returns
                every partition within this delta of the end of the latest time window. For
                example, on a daily-partitioned asset a lookback_delta of 48 hours yields the
                latest two partitions.
        """
        from .operands import InLatestTimeWindowCondition

        return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

    @experimental
    @staticmethod
    def will_be_requested() -> "WillBeRequestedCondition":
        """Return a condition that is true for an asset partition that will be requested this tick."""
        from .operands import WillBeRequestedCondition

        return WillBeRequestedCondition()

    @experimental
    @staticmethod
    def newly_updated() -> "NewlyUpdatedCondition":
        """Return a condition that is true for an asset partition updated since the previous tick."""
        from .operands import NewlyUpdatedCondition

        return NewlyUpdatedCondition()

    @experimental
    @staticmethod
    def newly_requested() -> "NewlyRequestedCondition":
        """Return a condition that is true for an asset partition requested on the previous tick."""
        from .operands import NewlyRequestedCondition

        return NewlyRequestedCondition()

    @experimental
    @staticmethod
    def code_version_changed() -> "CodeVersionChangedCondition":
        """Return a condition that is true for an asset partition whose asset's code version
        has changed since the previous tick.
        """
        from .operands import CodeVersionChangedCondition

        return CodeVersionChangedCondition()

    @experimental
    @staticmethod
    def cron_tick_passed(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> "CronTickPassedCondition":
        """Return a condition that is true for all asset partitions whenever a tick of the
        given cron schedule is passed.
        """
        from .operands import CronTickPassedCondition

        return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

    @experimental
    @staticmethod
    def eager() -> "AutomationCondition":
        """Return a composite condition that "eagerly" fills in missing partitions as they are
        created and keeps unpartitioned assets up to date as their dependencies update (whether
        through scheduled execution or ad-hoc runs).

        The resulting condition is true for an asset partition when all of these hold:

        - The asset partition is within the latest time window
        - Since it was last requested or updated, it either newly became missing or at least
          one parent was newly updated or will be requested this tick
        - No parent partition is missing (unless that parent will be requested this tick)
        - No parent partition is part of an in-progress run
        - The asset partition itself is not part of an in-progress run
        """
        # the static constructors are experimental; silence their warnings while composing
        with disable_dagster_warnings():
            became_missing = AutomationCondition.missing().newly_true().with_label("became missing")
            parents_updated = AutomationCondition.any_deps_match(
                AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
            ).with_label("any parents updated")
            trigger = became_missing | parents_updated

            parents_missing = AutomationCondition.any_deps_match(
                AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
            ).with_label("any parents missing")
            parents_in_progress = AutomationCondition.any_deps_match(
                AutomationCondition.in_progress()
            ).with_label("any parents in progress")

            in_latest_window = AutomationCondition.in_latest_time_window()
            triggered_since_handled = trigger.since(
                AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
            )
            combined = (
                in_latest_window
                & triggered_since_handled
                & ~parents_missing
                & ~parents_in_progress
                & ~AutomationCondition.in_progress()
            )
            return combined.with_label("eager")

    @experimental
    @staticmethod
    def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
        """Return a composite condition that materializes asset partitions within the latest
        time window on the given cron schedule, once their parents have been updated. For
        example, with a cron_schedule of "0 0 * * *" (every day at midnight), the condition
        does not become true on a given day until all parents have been updated that day.

        The resulting condition is true for an asset partition when all of these hold:

        - The asset partition is within the latest time window
        - All parent asset partitions have been updated since the latest tick of the cron
          schedule, or will be requested this tick
        - The asset partition has not been requested since the latest tick of the cron schedule
        """
        # the static constructors are experimental; silence their warnings while composing
        with disable_dagster_warnings():
            cron_label = f"'{cron_schedule}' ({cron_timezone})"

            tick_passed = AutomationCondition.cron_tick_passed(
                cron_schedule, cron_timezone
            ).with_label(f"tick of {cron_label} passed")

            dep_updated_or_requested = (
                AutomationCondition.newly_updated().since(tick_passed)
                | AutomationCondition.will_be_requested()
            )
            all_deps_updated = AutomationCondition.all_deps_match(
                dep_updated_or_requested
            ).with_label(f"all parents updated since {cron_label}")

            in_latest_window = AutomationCondition.in_latest_time_window()
            tick_since_request = tick_passed.since(AutomationCondition.newly_requested())
            combined = in_latest_window & tick_since_request & all_deps_updated
            return combined.with_label(f"on cron {cron_label}")

    @experimental
    @staticmethod
    def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
        """Return a condition representing the union of all distinct downstream conditions."""
        from .operators import AnyDownstreamConditionsCondition

        return AnyDownstreamConditionsCondition()
class AutomationResult(NamedTuple):
    """The outcome of evaluating one AutomationCondition, including its children's results.

    Instances are normally built through ``create`` (leaf conditions) or
    ``create_from_children`` (conditions with sub-conditions), not by calling the
    constructor directly.
    """

    # the condition that was evaluated and its unique id within the condition tree
    condition: AutomationCondition
    condition_unique_id: str
    # digest of the evaluated values, produced by _compute_value_hash
    value_hash: str

    # context creation time and the time the result was built, as timestamps
    start_timestamp: float
    end_timestamp: float

    temporal_context: TemporalContext

    true_slice: AssetSlice
    candidate_slice: AssetSlice

    child_results: Sequence["AutomationResult"]

    # only populated when the condition's requires_cursor is true
    node_cursor: Optional[AutomationConditionNodeCursor]
    serializable_evaluation: AutomationConditionEvaluation

    # the values handed to create / create_from_children
    extra_state: Any
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata]

    @property
    def asset_key(self) -> AssetKey:
        """The asset key of the true slice."""
        return self.true_slice.asset_key

    @property
    def true_subset(self) -> AssetSubset:
        """The true slice converted to an AssetSubset."""
        return self.true_slice.convert_to_valid_asset_subset()

    @staticmethod
    def _create(
        context: "AutomationContext",
        true_slice: AssetSlice,
        subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]],
        child_results: Sequence["AutomationResult"],
    ) -> "AutomationResult":
        """Assemble a result for ``context.condition`` from the evaluated values.

        Args:
            context (AutomationContext): The context the condition was evaluated in; supplies
                the condition, its unique id, the candidate slice and the timing information.
            true_slice (AssetSlice): The slice for which the condition evaluated to true.
            subsets_with_metadata (Sequence[AssetSubsetWithMetadata]): Subsets annotated with
                metadata, included in the value hash, node cursor and serialized evaluation.
            extra_state: Additional state stored on the node cursor.
            child_results (Sequence[AutomationResult]): Results of the condition's children.
        """
        start_timestamp = context.create_time.timestamp()
        end_timestamp = get_current_timestamp()

        # a node cursor is only built for conditions that ask for one
        node_cursor = (
            _create_node_cursor(
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                extra_state=extra_state,
            )
            if context.condition.requires_cursor
            else None
        )

        return AutomationResult(
            condition=context.condition,
            condition_unique_id=context.condition_unique_id,
            value_hash=_compute_value_hash(
                condition_unique_id=context.condition_unique_id,
                condition_description=context.condition.description,
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                child_results=child_results,
            ),
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
            temporal_context=context.new_temporal_context,
            true_slice=true_slice,
            candidate_slice=context.candidate_slice,
            # keep the values that were actually supplied, so these fields agree with the
            # value hash / cursor / evaluation built from the same inputs (previously they
            # were always stored as [] and None)
            subsets_with_metadata=subsets_with_metadata,
            child_results=child_results,
            extra_state=extra_state,
            node_cursor=node_cursor,
            serializable_evaluation=_create_serializable_evaluation(
                context=context,
                true_slice=true_slice,
                candidate_slice=context.candidate_slice,
                subsets_with_metadata=subsets_with_metadata,
                start_timestamp=start_timestamp,
                end_timestamp=end_timestamp,
                child_results=child_results,
            ),
        )

    @staticmethod
    def create_from_children(
        context: "AutomationContext",
        true_slice: AssetSlice,
        child_results: Sequence["AutomationResult"],
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]] = None,
    ) -> "AutomationResult":
        """Returns a new AutomationResult from the given child results.

        No subsets-with-metadata are attached to results created this way.
        """
        return AutomationResult._create(
            context=context,
            true_slice=true_slice,
            subsets_with_metadata=[],
            extra_state=extra_state,
            child_results=child_results,
        )

    @staticmethod
    def create(
        context: "AutomationContext",
        true_slice: AssetSlice,
        subsets_with_metadata: Optional[Sequence[AssetSubsetWithMetadata]] = None,
        extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]] = None,
    ) -> "AutomationResult":
        """Returns a new AutomationResult (with no child results) from the given parameters.

        Args:
            context (AutomationContext): The context the condition was evaluated in.
            true_slice (AssetSlice): The slice for which the condition evaluated to true.
            subsets_with_metadata (Optional[Sequence[AssetSubsetWithMetadata]]): Subsets
                annotated with metadata; omitted or None means none.
            extra_state: Additional state stored on the node cursor.
        """
        # None stands in for "no subsets" so that no list object is shared between calls
        return AutomationResult._create(
            context=context,
            true_slice=true_slice,
            subsets_with_metadata=[] if subsets_with_metadata is None else subsets_with_metadata,
            extra_state=extra_state,
            child_results=[],
        )

    def get_child_node_cursors(self) -> Mapping[str, AutomationConditionNodeCursor]:
        """Collect node cursors, keyed by condition unique id, for this result and all of its
        descendants. Results without a node cursor contribute no entry of their own.
        """
        node_cursors = {self.condition_unique_id: self.node_cursor} if self.node_cursor else {}
        for child_result in self.child_results:
            node_cursors.update(child_result.get_child_node_cursors())
        return node_cursors

    def get_new_cursor(self) -> AutomationConditionCursor:
        """Build the AutomationConditionCursor derived from this result."""
        return AutomationConditionCursor(
            previous_requested_subset=self.serializable_evaluation.true_subset,
            effective_timestamp=self.temporal_context.effective_dt.timestamp(),
            last_event_id=self.temporal_context.last_event_id,
            node_cursors_by_unique_id=self.get_child_node_cursors(),
            result_value_hash=self.value_hash,
        )
def _create_node_cursor(
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    extra_state: Optional[Union[AssetSubset, Sequence[AssetSubset]]],
) -> AutomationConditionNodeCursor:
    """Build the node cursor for one evaluated condition.

    Both slices are converted to asset subsets; the candidate subset is additionally passed
    through ``get_serializable_candidate_subset`` before being stored.
    """
    true_subset = true_slice.convert_to_valid_asset_subset()
    candidate_subset = get_serializable_candidate_subset(
        candidate_slice.convert_to_valid_asset_subset()
    )
    return AutomationConditionNodeCursor(
        true_subset=true_subset,
        candidate_subset=candidate_subset,
        subsets_with_metadata=subsets_with_metadata,
        extra_state=extra_state,
    )
def _create_serializable_evaluation(
    context: "AutomationContext",
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    start_timestamp: float,
    end_timestamp: float,
    child_results: Sequence[AutomationResult],
) -> AutomationConditionEvaluation:
    """Build the AutomationConditionEvaluation for one evaluated condition.

    The evaluation holds a snapshot of the condition, the true and candidate subsets, the
    timing information, and the already-built evaluations of every child result.
    """
    snapshot = context.condition.get_snapshot(context.condition_unique_id)
    true_subset = true_slice.convert_to_valid_asset_subset()
    candidate_subset = get_serializable_candidate_subset(
        candidate_slice.convert_to_valid_asset_subset()
    )
    # children carry their own evaluation, so they are reused as-is
    child_evaluations = [result.serializable_evaluation for result in child_results]
    return AutomationConditionEvaluation(
        condition_snapshot=snapshot,
        true_subset=true_subset,
        candidate_subset=candidate_subset,
        subsets_with_metadata=subsets_with_metadata,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        child_evaluations=child_evaluations,
    )
def _compute_value_hash(
    condition_unique_id: str,
    condition_description: str,
    true_slice: AssetSlice,
    candidate_slice: AssetSlice,
    subsets_with_metadata: Sequence[AssetSubsetWithMetadata],
    child_results: Sequence[AutomationResult],
) -> str:
    """Return a digest of the values held by an evaluation result.

    Results with identical values produce identical strings, so a change can be detected by
    comparing digests without serializing the full value.
    """
    # the order of the pieces is significant: they are concatenated before hashing
    pieces = [condition_unique_id, condition_description]
    pieces.append(_compute_subset_value_str(true_slice.convert_to_valid_asset_subset()))
    pieces.append(_compute_subset_value_str(candidate_slice.convert_to_valid_asset_subset()))
    pieces.extend(_compute_subset_with_metadata_value_str(swm) for swm in subsets_with_metadata)
    pieces.extend(child_result.value_hash for child_result in child_results)
    return non_secure_md5_hash_str("".join(pieces).encode("utf-8"))
def _compute_subset_value_str(subset: AssetSubset) -> str:
    """Return a string that identifies the contents of an AssetSubset.

    Equivalent AssetSubsets yield equal strings. The representation depends on the kind of
    value the subset holds: a bool, an AllPartitionsSubset, a time-window subset, or
    anything else (rendered via its sorted asset partitions).
    """
    value = subset.value
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, AllPartitionsSubset):
        return AllPartitionsSubset.__name__
    if isinstance(value, BaseTimeWindowPartitionsSubset):
        # render each included window as a (start, end) pair of timestamps, in sorted order
        windows = sorted(value.included_time_windows)
        return str([(window.start.timestamp(), window.end.timestamp()) for window in windows])
    return str(sorted(subset.asset_partitions))
def _compute_subset_with_metadata_value_str(subset_with_metadata: AssetSubsetWithMetadata):
    """Return a string identifying a subset together with its (sorted) frozen metadata."""
    subset_part = _compute_subset_value_str(subset_with_metadata.subset)
    metadata_part = str(sorted(subset_with_metadata.frozen_metadata))
    return subset_part + metadata_part