>

목록에서 다그를 동적으로 생성하고 있으며 작업 중 하나에 on_failure_callback을 추가하고 싶습니다. 다음 코드를 시도했지만 콜백이 실행되지 않는 것 같습니다.

dag_ids = ['dag_a', 'dag_b', 'dag_c']
for dag_id in dag_ids:
    def failure_callback():
        logging.info('Inside failure callback for {}'.format(dag_id))
    def python_callable(dag_id):
        logging.info('Inside python callable for {}'.format(dag_id))
        raise Exception('Exception raised for dag_id {}'.format(dag_id))
    yesterday = datetime.datetime.combine(
        datetime.datetime.today() - datetime.timedelta(1),
        datetime.datetime.min.time())
    default_args = {
        'start_date': yesterday
    }    
    dag = models.DAG(
            dag_id,
            schedule_interval=None,
            catchup=False,
            default_args=default_args)
    with dag:
        python_task = PythonOperator(
            task_id='python_task',
            python_callable=python_callable,
            op_kwargs={'dag_id': dag_id},
            on_failure_callback=failure_callback,
            dag=dag)
        python_task
    globals()[dag_id] = dag

여기서 내가 뭘 잘못하고 있는지 아십니까?

수정 :

제안에 따라 dag_id를 실패 콜백에 전달했습니다. 그러나 dag_id 대신 airflow가 컨텍스트 사전을 전달합니다. 컨텍스트 딕트 외에도 실패 콜백에 추가 인수를 전달하는 방법에 대한 아이디어가 있습니까?

ERROR - Inside failure callback for {u'next_execution_date': None, u'dag_run': <DagRun dag_a @ 2019-02-19 19:23:54.006241: manual__2019-02-19T19:23:54.006241, externally triggered: True>, u'tomorrow_ds_nodash': u'20190220', u'run_id': 'manual__2019-02-19T19:23:54.006241', u'test_mode': False, u'prev_execution_date': None, u'conf': <module 'airflow.configuration' from '/usr/local/lib/airflow/airflow/configuration.py'>, u'tables': None, u'task_instance_key_str': u'dag_a__python_task__20190219', u'END_DATE': '2019-02-19', u'execution_date': datetime.datetime(2019, 2, 19, 19, 23, 54, 6241), u'ts': '2019-02-19T19:23:54.006241', u'macros': <module 'airflow.macros' from '/usr/local/lib/airflow/airflow/macros/__init__.py'>, u'params': {}, u'ti': <TaskInstance: dag_a.python_task 2019-02-19 19:23:54.006241 [failed]>, u'var': {u'json': None, u'value': None}, u'ds_nodash': u'20190219', u'dag': <DAG: dag_a>, u'end_date': '2019-02-19', u'latest_date': '2019-02-19', u'ds': '2019-02-19', u'task_instance': <TaskInstance: dag_a.python_task 2019-02-19 19:23:54.006241 [failed]>, u'yesterday_ds_nodash': u'20190218', u'task': <Task(PythonOperator): python_task>, u'yesterday_ds': '2019-02-18', u'ts_nodash': u'20190219T192354.006241', u'tomorrow_ds': '2019-02-20'}

여기 질문을 참조하여 작동 시켰습니다!


  • 답변 # 1

    for 루프 안에서 dag_id를 failure_callback으로 전달하면 failure_callback ()에서 로그를 볼 수 있습니다.

    def failure_callback(dag_id):
        logging.info('Inside failure callback for {}'.format(dag_id))
    
    

  • 답변 # 2

    부분 패키지를 사용하여 작동했습니다. 업데이트 된 코드는 다음과 같습니다.

    from functools import partial
    dag_ids = ['dag_a', 'dag_b', 'dag_c']
    for dag_id in dag_ids:
        def failure_callback(dag_id, context):
            logging.error('Inside failure callback for {}'.format(dag_id))
        def python_callable(dag_id):
            logging.error('Inside python callable for {}'.format(dag_id))
            raise Exception('Exception raised for dag_id {}'.format(dag_id))
        yesterday = datetime.datetime.combine(
            datetime.datetime.today() - datetime.timedelta(1),
            datetime.datetime.min.time())
        default_args = {
            'start_date': yesterday
        }    
        dag = models.DAG(
                dag_id,
                # Continue to run DAG once per day
                schedule_interval=None,
                catchup=False,
                default_args=default_args)
        with dag:
            python_task = PythonOperator(
                task_id='python_task',
                python_callable=python_callable,
                op_kwargs={'dag_id': dag_id},
                on_failure_callback=partial(failure_callback, dag_id),
                dag=dag)
            python_task
        globals()[dag_id] = dag
    
    

  • 이전 prometheus - 일정 기간 동안 총 요청 수
  • 다음 javascript - reactjs - 하위 구성 요소에 대한 prop trought route를 전달할 수 없습니다