ExternalTaskSensor

Airflow
Jul 03, 2024
ExternalTaskSensor

Sensors


센서는 오직 하나의 목적을 위해 설계된 특별한 오퍼레이터이다.
여기서 하나의 목적은 무언가 발생할 때 까지 기다리는 것을 말한다.
파일, 외부 이벤트, 시간 등 기다릴 수 있는 무엇이든 성공할 때까지 기다렸다가 다운스트림을 실행한다.
 

Sensors의 두 가지 타입

  • poke(default) : 센서가 주기적으로 외부 조건을 체크한다.
    • 초 단위로 업스트림을 확인하고 싶을 때 사용한다.
    • 대신, 리소스를 많이 소비한다.
  • reschedule : 센서가 외부 조건을 체크하고, 조건이 만족되지 않으면 작업을 재스케줄링하여 슬리핑 상태가 된다.
    • 분 단위로 업스트림을 확인하고 싶을 때 사용한다.
    • 리소스 사용을 절약할 수 있다.
 

ExternalTaskSensor


외부에 존재하는 특정 DAG와 해당 DAG의 Task를 주기적으로 감지하는 오퍼레이터이다.
 

사용법

from airflow import DAG from airflow.utils.dates import days_ago sensor = ExternalTaskSensor( task_id='wait_for_DAG_A', external_dag_id='DAG_A', external_task_id='end', start_date=days_ago(1), execution_date_fn=lambda x: x, # 동일한 execution_date 사용 mode='reschedule', # or poke timeout=120, )
위 코드를 해석하면 다음과 같다.
  1. ExternalTaskSensor의 태스크 ID는 wait_for_DAG_A다.
  1. 주기적으로 감지할 외부 DAG의 ID는 DAG_A다.
  1. 감지한 외부 DAG의 end 태스크가 성공적으로 끝나면 센서 오퍼레이터도 성공한다.
  1. timeout은 주기적으로 감지하는 총 시간을 120초로 한정한다는 뜻이다.
  1. 나머지는 아래에서 더 자세히 설명한다.
 

start_date와 execution_date_fn

  • start_date는 해당 센서가 실행되어야 하는 날짜를 나타낸다.
  • execution_date_fn은 센서의 execution_date을 조작할 수 있도록 도와준다.
 
그럼 왜 execution_date_fn이 필요할까?
감지할 대상이 되는 DAG의 execution_date과 센서의 execution_date이 동일해야 센서가 동작한다.
하지만 만약 감지할 대상이 되는 DAG이 2024-07-01에 실행되고, 센서가 2024-07-02에 실행되어야 한다면? 둘의 execution_date은 어쩔 수 없이 서로 다를 수 밖에 없다.
따라서 둘의 execution_date을 동일하게 만들기 위해 timedelta와 같은 모듈을 사용하여 시간을 조작하고, execution_date_fn에 지정한다.
 

TriggerDagRunOperator와 ExternalTaskSensor의 조합


notion image
나는 위와 같은 흐름을 가진 프로세스에서 DAG BDAG D가 모두 실행 완료되면 DAG E를 실행하도록 하고 싶었다.
하지만 센서를 포함하고 있는 DAG EDAG BDAG Dexecution_date과 일치하지 않았다.
그 이유는 DAG A와 DAG C만 동일한 schedule_interval을 가지고 있을 뿐, DAG B, DAG D 모두 TriggerDagRunOperator에 의해 트리거가 되므로 schedule_interval=None 이기 때문이다.
그럼 센서가 포함된 DAG E는 어떻게 업스트림 DAG의 execution_date과 동일하게 만들 수 있을까?
 
이를 해결하기 위해 다음과 같이 TriggerDagRunOperator를 실행할 때 execution_date을 명시적으로 지정하여 다음과 같이 넘겨줬다.
trigger_dag = TriggerDagRunOperator( task_id='trigger_dag', trigger_dag_id='DAG_B', # 트리거할 DAG의 ID execution_date='{{ execution_date }}', # 현재 DAG의 execution_date을 사용 dag=dag )
여기서 주의할 점은 execution_date='{{ ds }}'를 사용하면 안된다.
{{ ds }}는 2024-07-01와 같은 형식을 반환하고, {{ execution_date }}은 2024-07-01T00:00:00와 같은 형식을 반환하기 때문이다.
 
그리고 DAG BDAG D 중 하나의 DAG에 TriggerDagRunOperator를 생성하여 센서가 포함된 DAG E를 마찬가지로 트리거하여 해결할 수 있다.
 
Share article

zjacom