본문 바로가기
Auto Build(CI SCM GIT)/Airflow

Airflow XCOM 예제

by 아이티.파머 2022. 9. 6.
반응형

2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow

2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow concepts (주요 용어)

2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow Operator 종류

2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow XCOM 예제

2022.09.06 - [분류 전체보기] - Airflow Variable 사용법

2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow connection 예제

 

XCOM

Xcom은 Airflow 의 task 간에 값을 전달하기 위해 사용된다. pull, push 방식으로 데이터가 공유된다. valiable과 같이 key-value 형식으로 사용된다.

주의할 점은 같은 DAG 안에서만 공유되며 다른 DAG에서도 공유하려면 Valiable과 같은 다른 방식을 사용하여야 한다.

Jinja template 을 사용한 XCom예제

  • URI parameter 에 init_transaction 메소드의 값이 들어가도록 한다.
  • push-pull 을 이용한 Xcom 사용시 context[’task_instance’]와 context[’ti’]는 같은 의미로써 단축된 언어로 사용할 수 있다.
  • Python Task를 사용하는 경우 python_callable 로 호출한 함수에서 return 되는 값이 자동으로 xcom_push()가 실행된다.
  • provide_context=*True*, 로 설정해야한다.
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from random import randrange
from airflow import AirflowException
import os
import requests

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization

API_CONN_URL = 'http://127.0.0.1:8081'
API_METHOD = 'GET'

## 디폴트 설정값
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

### DAG 생성
dag = DAG(
    'advertisement-synchronized',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    tags=['skan'],
)
#####################################################################################################################################################
## Python Code

'''
    xcom Push 처리
    ex)  {{ ti.xcom_pull(task_ids='init_transaction') }}
'''

def init_transaction(**kwargs):
    return str(randrange(1,10))

'''
    분기 및 예외 처리
'''
def advertisement_synccall(**kwargs):
    try:
        if kwargs['params']['method'] !='':
            API_METHOD = kwargs['params']['method']
        # Python Http 처리
        print('API_METHOD = ' + API_METHOD)

        response = requests.get(API_CONN_URL + "/campaign/batch/sync/media"+ "?param1={{kwargs['ti'].xcom_pull(task_ids='init_transaction'))}}"

        if response.status_code == 200:
            return 'success_operator_01'
        else:
            raise Exception(' 호출 실패 ')

    except Exceptions as e :
        print("exception" , e)
        return 'fail_operator_02'

# 단일 Task PythonOperater
# init_transaction 을 호출한다.
init = PythonOperator(
    task_id='init_transaction',
    provide_context=True,
    python_callable=init_transaction,
    dag=dag,
)

data_sync = BranchPythonOperator(
        task_id='data_sync',
        retries=0,
        python_callable=advertisement_synccall,
        params={"method": "GET", "adgroup_method": "{{dag_run.conf.adgroup_method}}", "ad_method": "{{dag_run.conf.ad_method}}"},
        provide_context=True,
        dag=dag
)

init >> data_sync

예제 코드의 Xcom순서

  1. init task 를 실행한다. python_callable 의 메소드를 실행한다. provide_context 값이 ture 임으로 xcom_push 가 메소드에서 return 시 자동으로 등록된다.
  2. data_sync가 실행된다. advertisement_synccall 메소드를 수행한다.
  3. kwargs['ti'].xcom_pull(task_ids='init_transaction') 에서 init_transaction 타스크 아이디의 리턴값으로 생성된 값을 불러온다.
반응형

'Auto Build(CI SCM GIT) > Airflow' 카테고리의 다른 글

Airflow connection 예제  (0) 2022.09.06
Airflow Variable 사용법  (0) 2022.09.06
Airflow Operator 종류  (0) 2022.09.06
Airflow concepts (주요 용어)  (0) 2022.09.06
Airflow  (0) 2022.09.06