Skip to content

Commit 615cddf

Browse files
authored
airflow.models.taskinstance deprecations removed (#41784)
1 parent a0baa68 commit 615cddf

File tree

12 files changed

+26
-188
lines changed

12 files changed

+26
-188
lines changed

airflow/models/taskinstance.py

Lines changed: 0 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import jinja2
3939
import lazy_object_proxy
4040
import pendulum
41-
from deprecated import deprecated
4241
from jinja2 import TemplateAssertionError, UndefinedError
4342
from sqlalchemy import (
4443
Column,
@@ -80,7 +79,6 @@
8079
AirflowSkipException,
8180
AirflowTaskTerminated,
8281
AirflowTaskTimeout,
83-
DagRunNotFound,
8482
RemovedInAirflow3Warning,
8583
TaskDeferralError,
8684
TaskDeferred,
@@ -425,7 +423,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance | TaskInstancePydantic,
425423
def clear_task_instances(
426424
tis: list[TaskInstance],
427425
session: Session,
428-
activate_dag_runs: None = None,
429426
dag: DAG | None = None,
430427
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
431428
) -> None:
@@ -443,7 +440,6 @@ def clear_task_instances(
443440
:param dag_run_state: state to set finished DagRuns to.
444441
If set to False, DagRuns state will not be changed.
445442
:param dag: DAG object
446-
:param activate_dag_runs: Deprecated parameter, do not pass
447443
"""
448444
job_ids = []
449445
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
@@ -521,16 +517,6 @@ def clear_task_instances(
521517

522518
session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
523519

524-
if activate_dag_runs is not None:
525-
warnings.warn(
526-
"`activate_dag_runs` parameter to clear_task_instances function is deprecated. "
527-
"Please use `dag_run_state`",
528-
RemovedInAirflow3Warning,
529-
stacklevel=2,
530-
)
531-
if not activate_dag_runs:
532-
dag_run_state = False
533-
534520
if dag_run_state is not False and tis:
535521
from airflow.models.dagrun import DagRun # Avoid circular import
536522

@@ -1922,7 +1908,6 @@ class TaskInstance(Base, LoggingMixin):
19221908
def __init__(
19231909
self,
19241910
task: Operator,
1925-
execution_date: datetime | None = None,
19261911
run_id: str | None = None,
19271912
state: str | None = None,
19281913
map_index: int = -1,
@@ -1938,42 +1923,7 @@ def __init__(
19381923
# init_on_load will config the log
19391924
self.init_on_load()
19401925

1941-
if run_id is None and execution_date is not None:
1942-
from airflow.models.dagrun import DagRun # Avoid circular import
1943-
1944-
warnings.warn(
1945-
"Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
1946-
RemovedInAirflow3Warning,
1947-
# Stack level is 4 because SQLA adds some wrappers around the constructor
1948-
stacklevel=4,
1949-
)
1950-
# make sure we have a localized execution_date stored in UTC
1951-
if execution_date and not timezone.is_localized(execution_date):
1952-
self.log.warning(
1953-
"execution date %s has no timezone information. Using default from dag or system",
1954-
execution_date,
1955-
)
1956-
if self.task.has_dag():
1957-
if TYPE_CHECKING:
1958-
assert self.task.dag
1959-
execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
1960-
else:
1961-
execution_date = timezone.make_aware(execution_date)
1962-
1963-
execution_date = timezone.convert_to_utc(execution_date)
1964-
with create_session() as session:
1965-
run_id = (
1966-
session.query(DagRun.run_id)
1967-
.filter_by(dag_id=self.dag_id, execution_date=execution_date)
1968-
.scalar()
1969-
)
1970-
if run_id is None:
1971-
raise DagRunNotFound(
1972-
f"DagRun for {self.dag_id!r} with date {execution_date} not found"
1973-
) from None
1974-
19751926
self.run_id = run_id
1976-
19771927
self.try_number = 0
19781928
self.max_tries = self.task.retries
19791929
self.unixname = getuser()
@@ -1989,26 +1939,6 @@ def __init__(
19891939
def __hash__(self):
19901940
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
19911941

1992-
@property
1993-
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
1994-
def _try_number(self):
1995-
"""
1996-
Do not use. For semblance of backcompat.
1997-
1998-
:meta private:
1999-
"""
2000-
return self.try_number
2001-
2002-
@_try_number.setter
2003-
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
2004-
def _try_number(self, val):
2005-
"""
2006-
Do not use. For semblance of backcompat.
2007-
2008-
:meta private:
2009-
"""
2010-
self.try_number = val
2011-
20121942
@property
20131943
def stats_tags(self) -> dict[str, str]:
20141944
"""Returns task instance tags."""
@@ -2051,23 +1981,6 @@ def init_on_load(self) -> None:
20511981
"""Initialize the attributes that aren't stored in the DB."""
20521982
self.test_mode = False # can be changed when calling 'run'
20531983

2054-
@property
2055-
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
2056-
def prev_attempted_tries(self) -> int:
2057-
"""
2058-
Calculate the total number of attempted tries, defaulting to 0.
2059-
2060-
This used to be necessary because try_number did not always tell the truth.
2061-
2062-
:meta private:
2063-
"""
2064-
return self.try_number
2065-
2066-
@property
2067-
def next_try_number(self) -> int:
2068-
# todo (dstandish): deprecate this property; we don't need a property that is just + 1
2069-
return self.try_number + 1
2070-
20711984
@property
20721985
def operator_name(self) -> str | None:
20731986
"""@property: use a more friendly display name for the operator, if set."""
@@ -2498,40 +2411,6 @@ def get_previous_ti(
24982411
"""
24992412
return _get_previous_ti(task_instance=self, state=state, session=session)
25002413

2501-
@property
2502-
def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None:
2503-
"""
2504-
This attribute is deprecated.
2505-
2506-
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
2507-
"""
2508-
warnings.warn(
2509-
"""
2510-
This attribute is deprecated.
2511-
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
2512-
""",
2513-
RemovedInAirflow3Warning,
2514-
stacklevel=2,
2515-
)
2516-
return self.get_previous_ti()
2517-
2518-
@property
2519-
def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic | None:
2520-
"""
2521-
This attribute is deprecated.
2522-
2523-
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
2524-
"""
2525-
warnings.warn(
2526-
"""
2527-
This attribute is deprecated.
2528-
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
2529-
""",
2530-
RemovedInAirflow3Warning,
2531-
stacklevel=2,
2532-
)
2533-
return self.get_previous_ti(state=DagRunState.SUCCESS)
2534-
25352414
@provide_session
25362415
def get_previous_execution_date(
25372416
self,
@@ -2558,23 +2437,6 @@ def get_previous_start_date(
25582437
"""
25592438
return _get_previous_start_date(task_instance=self, state=state, session=session)
25602439

2561-
@property
2562-
def previous_start_date_success(self) -> pendulum.DateTime | None:
2563-
"""
2564-
This attribute is deprecated.
2565-
2566-
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
2567-
"""
2568-
warnings.warn(
2569-
"""
2570-
This attribute is deprecated.
2571-
Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
2572-
""",
2573-
RemovedInAirflow3Warning,
2574-
stacklevel=2,
2575-
)
2576-
return self.get_previous_start_date(state=DagRunState.SUCCESS)
2577-
25782440
@provide_session
25792441
def are_dependencies_met(
25802442
self, dep_context: DepContext | None = None, session: Session = NEW_SESSION, verbose: bool = False
@@ -4115,21 +3977,6 @@ def __eq__(self, other):
41153977
return self.__dict__ == other.__dict__
41163978
return NotImplemented
41173979

4118-
def as_dict(self):
4119-
warnings.warn(
4120-
"This method is deprecated. Use BaseSerialization.serialize.",
4121-
RemovedInAirflow3Warning,
4122-
stacklevel=2,
4123-
)
4124-
new_dict = dict(self.__dict__)
4125-
for key in new_dict:
4126-
if key in ["start_date", "end_date"]:
4127-
val = new_dict[key]
4128-
if not val or isinstance(val, str):
4129-
continue
4130-
new_dict.update({key: val.isoformat()})
4131-
return new_dict
4132-
41333980
@classmethod
41343981
def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
41353982
return cls(
@@ -4150,24 +3997,6 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
41503997
priority_weight=ti.priority_weight if hasattr(ti, "priority_weight") else None,
41513998
)
41523999

4153-
@classmethod
4154-
def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance:
4155-
warnings.warn(
4156-
"This method is deprecated. Use BaseSerialization.deserialize.",
4157-
RemovedInAirflow3Warning,
4158-
stacklevel=2,
4159-
)
4160-
ti_key = TaskInstanceKey(*obj_dict.pop("key"))
4161-
start_date = None
4162-
end_date = None
4163-
start_date_str: str | None = obj_dict.pop("start_date")
4164-
end_date_str: str | None = obj_dict.pop("end_date")
4165-
if start_date_str:
4166-
start_date = timezone.parse(start_date_str)
4167-
if end_date_str:
4168-
end_date = timezone.parse(end_date_str)
4169-
return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key)
4170-
41714000

41724001
class TaskInstanceNote(TaskInstanceDependencies):
41734002
"""For storage of arbitrary notes concerning the task instance."""

airflow/providers/amazon/aws/executors/batch/batch_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
448448
airflow_cmd=ti.command_as_list(),
449449
queue=ti.queue,
450450
exec_config=ti.executor_config,
451-
attempt_number=ti.prev_attempted_tries,
451+
attempt_number=ti.try_number,
452452
)
453453
adopted_tis.append(ti)
454454

airflow/utils/log/file_task_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ def read(self, task_instance, try_number=None, metadata=None):
462462
# try number gets incremented in DB, i.e logs produced the time
463463
# after cli run and before try_number + 1 in DB will not be displayed.
464464
if try_number is None:
465-
next_try = task_instance.next_try_number
465+
next_try = task_instance.try_number + 1
466466
try_numbers = list(range(1, next_try))
467467
elif try_number < 1:
468468
logs = [

airflow/utils/log/log_reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
7474
:param metadata: A dictionary containing information about how to read the task log
7575
"""
7676
if try_number is None:
77-
next_try = ti.next_try_number
77+
next_try = ti.try_number + 1
7878
try_numbers = list(range(1, next_try))
7979
else:
8080
try_numbers = [try_number]

newsfragments/41784.significant.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Removed a set of deprecations in from ``airflow.models.taskinstance``.
2+
3+
- Removed deprecated arg ``activate_dag_runs`` from ``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead.
4+
- Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``. Please use ``run_id`` instead.
5+
- Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead.
6+
- Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead.
7+
- Removed deprecated property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead.
8+
- Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please use ``get_previous_ti`` instead.
9+
- Removed deprecated property ``previous_ti_success`` from ``TaskInstance``. Please use ``get_previous_ti`` instead.
10+
- Removed deprecated property ``previous_start_date_success`` from ``TaskInstance``. Please use ``get_previous_start_date`` instead.
11+
- Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.serialize`` instead.
12+
- Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.deserialize`` instead.

tests/jobs/test_triggerer_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None):
103103
operator.dag = dag
104104
else:
105105
operator = BaseOperator(task_id="test_ti", dag=dag)
106-
task_instance = TaskInstance(operator, execution_date=run.execution_date, run_id=run.run_id)
106+
task_instance = TaskInstance(operator, run_id=run.run_id)
107107
task_instance.trigger_id = trigger_orm.id
108108
session.add(dag_model)
109109
session.add(run)

tests/models/test_baseoperator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,11 +1094,11 @@ def test_get_task_instances(session):
10941094
"run_type": DagRunType.MANUAL,
10951095
}
10961096
dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1", **common_dr_kwargs)
1097-
ti_1 = TaskInstance(run_id=dr1.run_id, task=task, execution_date=first_execution_date)
1097+
ti_1 = TaskInstance(run_id=dr1.run_id, task=task)
10981098
dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2", **common_dr_kwargs)
1099-
ti_2 = TaskInstance(run_id=dr2.run_id, task=task, execution_date=second_execution_date)
1099+
ti_2 = TaskInstance(run_id=dr2.run_id, task=task)
11001100
dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3", **common_dr_kwargs)
1101-
ti_3 = TaskInstance(run_id=dr3.run_id, task=task, execution_date=third_execution_date)
1101+
ti_3 = TaskInstance(run_id=dr3.run_id, task=task)
11021102
session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3])
11031103
session.commit()
11041104

tests/models/test_dagrun.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1422,7 +1422,7 @@ def task_2(arg2): ...
14221422
assert len(decision.schedulable_tis) == 2
14231423

14241424
# We insert a faulty record
1425-
session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date, dr.run_id))
1425+
session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id))
14261426
session.flush()
14271427

14281428
decision = dr.task_instance_scheduling_decisions()

tests/providers/microsoft/conftest.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,11 @@ class MockedTaskInstance(TaskInstance):
124124
def __init__(
125125
self,
126126
task,
127-
execution_date: datetime | None = None,
128127
run_id: str | None = "run_id",
129128
state: str | None = TaskInstanceState.RUNNING,
130129
map_index: int = -1,
131130
):
132-
super().__init__(
133-
task=task, execution_date=execution_date, run_id=run_id, state=state, map_index=map_index
134-
)
131+
super().__init__(task=task, run_id=run_id, state=state, map_index=map_index)
135132
self.values: dict[str, Any] = {}
136133

137134
def xcom_pull(

tests/ti_deps/deps/test_dag_ti_slots_available_dep.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_concurrency_reached(self):
3434
"""
3535
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=True))
3636
task = Mock(dag=dag, pool_slots=1)
37-
ti = TaskInstance(task, execution_date=None)
37+
ti = TaskInstance(task)
3838

3939
assert not DagTISlotsAvailableDep().is_met(ti=ti)
4040

@@ -44,6 +44,6 @@ def test_all_conditions_met(self):
4444
"""
4545
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=False))
4646
task = Mock(dag=dag, pool_slots=1)
47-
ti = TaskInstance(task, execution_date=None)
47+
ti = TaskInstance(task)
4848

4949
assert DagTISlotsAvailableDep().is_met(ti=ti)

tests/ti_deps/deps/test_dag_unpaused_dep.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_concurrency_reached(self):
3434
"""
3535
dag = Mock(**{"get_is_paused.return_value": True})
3636
task = Mock(dag=dag)
37-
ti = TaskInstance(task=task, execution_date=None)
37+
ti = TaskInstance(task=task)
3838

3939
assert not DagUnpausedDep().is_met(ti=ti)
4040

@@ -44,6 +44,6 @@ def test_all_conditions_met(self):
4444
"""
4545
dag = Mock(**{"get_is_paused.return_value": False})
4646
task = Mock(dag=dag)
47-
ti = TaskInstance(task=task, execution_date=None)
47+
ti = TaskInstance(task=task)
4848

4949
assert DagUnpausedDep().is_met(ti=ti)

tests/ti_deps/deps/test_not_in_retry_period_dep.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
class TestNotInRetryPeriodDep:
3535
def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)):
3636
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
37-
ti = TaskInstance(task=task, state=state, execution_date=None)
37+
ti = TaskInstance(task=task, state=state)
3838
ti.end_date = end_date
3939
return ti
4040

0 commit comments

Comments
 (0)