반응형
2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow
2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow concepts (주요 용어)
2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow Operator 종류
2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow XCOM 예제
2022.09.06 - [분류 전체보기] - Airflow Variable 사용법
2022.09.06 - [Auto Build(CI SCM GIT)] - Airflow connection 예제
XCOM
Xcom은 Airflow 의 task 간에 값을 전달하기 위해 사용된다. pull, push 방식으로 데이터가 공유된다. valiable과 같이 key-value 형식으로 사용된다.
주의할 점은 같은 DAG 안에서만 공유되며 다른 DAG에서도 공유하려면 Valiable과 같은 다른 방식을 사용하여야 한다.
Jinja template 을 사용한 XCom예제
- URI parameter 에 init_transaction 메소드의 값이 들어가도록 한다.
- push-pull 을 이용한 Xcom 사용시 context[’task_instance’]와 context[’ti’]는 같은 의미로써 단축된 언어로 사용할 수 있다.
- Python Task를 사용하는 경우 python_callable 로 호출한 함수에서 return 되는 값이 자동으로 xcom_push()가 실행된다.
provide_context=*True*,
로 설정해야한다.
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from random import randrange
from airflow import AirflowException
import os
import requests
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
API_CONN_URL = 'http://127.0.0.1:8081'
API_METHOD = 'GET'
## 디폴트 설정값
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(0),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
### DAG 생성
dag = DAG(
'advertisement-synchronized',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
tags=['skan'],
)
#####################################################################################################################################################
## Python Code
'''
xcom Push 처리
ex) {{ ti.xcom_pull(task_ids='init_transaction') }}
'''
def init_transaction(**kwargs):
return str(randrange(1,10))
'''
분기 및 예외 처리
'''
def advertisement_synccall(**kwargs):
try:
if kwargs['params']['method'] !='':
API_METHOD = kwargs['params']['method']
# Python Http 처리
print('API_METHOD = ' + API_METHOD)
response = requests.get(API_CONN_URL + "/campaign/batch/sync/media"+ "?param1={{kwargs['ti'].xcom_pull(task_ids='init_transaction'))}}"
if response.status_code == 200:
return 'success_operator_01'
else:
raise Exception(' 호출 실패 ')
except Exceptions as e :
print("exception" , e)
return 'fail_operator_02'
# 단일 Task PythonOperater
# init_transaction 을 호출한다.
init = PythonOperator(
task_id='init_transaction',
provide_context=True,
python_callable=init_transaction,
dag=dag,
)
data_sync = BranchPythonOperator(
task_id='data_sync',
retries=0,
python_callable=advertisement_synccall,
params={"method": "GET", "adgroup_method": "{{dag_run.conf.adgroup_method}}", "ad_method": "{{dag_run.conf.ad_method}}"},
provide_context=True,
dag=dag
)
init >> data_sync
예제 코드의 Xcom순서
- init task 를 실행한다. python_callable 의 메소드를 실행한다. provide_context 값이 ture 임으로 xcom_push 가 메소드에서 return 시 자동으로 등록된다.
- data_sync가 실행된다. advertisement_synccall 메소드를 수행한다.
- kwargs['ti'].xcom_pull(task_ids='init_transaction') 에서 init_transaction 타스크 아이디의 리턴값으로 생성된 값을 불러온다.
반응형
'Auto Build(CI SCM GIT) > Airflow' 카테고리의 다른 글
Airflow connection 예제 (0) | 2022.09.06 |
---|---|
Airflow Variable 사용법 (0) | 2022.09.06 |
Airflow Operator 종류 (0) | 2022.09.06 |
Airflow concepts (주요 용어) (0) | 2022.09.06 |
Airflow (0) | 2022.09.06 |