
Sensors
센서는 오직 하나의 목적을 위해 설계된 특별한 오퍼레이터이다.
여기서 하나의 목적은 무언가 발생할 때 까지 기다리는 것을 말한다.
파일, 외부 이벤트, 시간 등 기다릴 수 있는 무엇이든 성공할 때까지 기다렸다가 다운스트림을 실행한다.
Sensors의 두 가지 타입
- poke(default) : 센서가 주기적으로 외부 조건을 체크한다.
- 초 단위로 업스트림을 확인하고 싶을 때 사용한다.
- 대신, 리소스를 많이 소비한다.
- reschedule : 센서가 외부 조건을 체크하고, 조건이 만족되지 않으면 작업을 재스케줄링하여 슬리핑 상태가 된다.
- 분 단위로 업스트림을 확인하고 싶을 때 사용한다.
- 리소스 사용을 절약할 수 있다.
ExternalTaskSensor
외부에 존재하는 특정 DAG와 해당 DAG의 Task를 주기적으로 감지하는 오퍼레이터이다.
사용법
from airflow import DAG
from airflow.utils.dates import days_ago
sensor = ExternalTaskSensor(
task_id='wait_for_DAG_A',
external_dag_id='DAG_A',
external_task_id='end',
start_date=days_ago(1),
execution_date_fn=lambda x: x, # 동일한 execution_date 사용
mode='reschedule', # or poke
timeout=120,
)
위 코드를 해석하면 다음과 같다.
ExternalTaskSensor
의 태스크 ID는wait_for_DAG_A
다.
- 주기적으로 감지할 외부 DAG의 ID는
DAG_A
다.
- 감지한 외부 DAG의
end
태스크가 성공적으로 끝나면 센서 오퍼레이터도 성공한다.
timeout
은 주기적으로 감지하는 총 시간을 120초로 한정한다는 뜻이다.
- 나머지는 아래에서 더 자세히 설명한다.
start_date와 execution_date_fn
start_date
는 해당 센서가 실행되어야 하는 날짜를 나타낸다.
execution_date_fn
은 센서의execution_date
을 조작할 수 있도록 도와준다.
그럼 왜
execution_date_fn
이 필요할까?감지할 대상이 되는 DAG의
execution_date
과 센서의 execution_date
이 동일해야 센서가 동작한다.하지만 만약 감지할 대상이 되는 DAG이 2024-07-01에 실행되고, 센서가 2024-07-02에 실행되어야 한다면? 둘의
execution_date
은 어쩔 수 없이 서로 다를 수 밖에 없다.따라서 둘의
execution_date
을 동일하게 만들기 위해 timedelta
와 같은 모듈을 사용하여 시간을 조작하고, execution_date_fn
에 지정한다.TriggerDagRunOperator와 ExternalTaskSensor의 조합

나는 위와 같은 흐름을 가진 프로세스에서
DAG B
와 DAG D
가 모두 실행 완료되면 DAG E를 실행하도록 하고 싶었다.하지만 센서를 포함하고 있는 DAG E는
DAG B
와 DAG D
의 execution_date
과 일치하지 않았다.그 이유는 DAG A와 DAG C만 동일한
schedule_interval
을 가지고 있을 뿐, DAG B
, DAG D
모두 TriggerDagRunOperator
에 의해 트리거가 되므로 schedule_interval=None
이기 때문이다.그럼 센서가 포함된 DAG E는 어떻게 업스트림 DAG의
execution_date
과 동일하게 만들 수 있을까?이를 해결하기 위해 다음과 같이
TriggerDagRunOperator
를 실행할 때 execution_date
을 명시적으로 지정하여 다음과 같이 넘겨줬다.trigger_dag = TriggerDagRunOperator(
task_id='trigger_dag',
trigger_dag_id='DAG_B', # 트리거할 DAG의 ID
execution_date='{{ execution_date }}', # 현재 DAG의 execution_date을 사용
dag=dag
)
여기서 주의할 점은
execution_date='{{ ds }}'
를 사용하면 안된다.{{ ds }}
는 2024-07-01와 같은 형식을 반환하고, {{ execution_date }}
은 2024-07-01T00:00:00와 같은 형식을 반환하기 때문이다.그리고
DAG B
나 DAG D
중 하나의 DAG에 TriggerDagRunOperator
를 생성하여 센서가 포함된 DAG E를 마찬가지로 트리거하여 해결할 수 있다.Share article