반응형
DAG 생성 및 BashOperator 예제 (test.py)
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# Defaults handed to every operator in the DAG; any task can override
# them through its own constructor arguments.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['yje14800@gmail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Optional settings, left disabled:
    # queue='bash_queue',
    # pool='backfill',
    # priority_weight=10,
    # end_date=datetime(2016, 1, 1),
    # wait_for_downstream=False,
    # dag=dag,
    # sla=timedelta(hours=2),
    # execution_timeout=timedelta(seconds=300),
    # on_failure_callback=some_function,
    # on_success_callback=some_other_function,
    # on_retry_callback=another_function,
    # sla_miss_callback=yet_another_function,
    # trigger_rule='all_success',
)
# Tutorial DAG: a `date` task that fans out to a sleep task and a
# Jinja-templated echo task.
with DAG(
    dag_id='tutorial_01_skan',
    description='A simple tutorial DAG',
    default_args=default_args,
    start_date=days_ago(2),
    schedule_interval=timedelta(days=1),
    tags=['skan'],
) as dag:
    # Jinja template for the third task: each of the five loop iterations
    # echoes `ds`, `macros.ds_add(ds, 7)` and `params.my_param`.
    # The literal is kept flush-left so its content stays exactly as written.
    templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

    t1 = BashOperator(task_id='print_date', bash_command='date')
    # Markdown documentation attached to the first task.
    t1.doc_md = dedent(
        """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
    )

    # Per-task override: three retries for this task.
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        depends_on_past=False,
        retries=3,
    )

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        depends_on_past=False,
        dag=dag,
    )

    # print_date runs first; sleep and templated both follow it.
    t1.set_downstream([t2, t3])
PythonOperator 예제 (hello.py)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,timedelta
import requests
# DAG for the hello-airflow example, scheduled with the cron expression
# '0 */6 * * *'; catchup is disabled.
dag = DAG(
    dag_id='hello-airflow',
    description='Hello airflow DAG',
    start_date=datetime(2022, 7, 19),
    schedule_interval='0 */6 * * *',
    catchup=False,
    tags=['skan'],
)
def print_hello():
    """Return the fixed greeting string for the hello example."""
    greeting = 'Hello Airflow'
    return greeting
def call_myApi():
    """POST to the local access-token refresh endpoint and print the outcome.

    A start message is printed, the refresh endpoint is called with an
    empty POST, and either a success line or an error line (status code
    plus response body) is printed, followed by a completion message.
    A non-200 reply is only reported, not raised.

    Returns:
        None.
    """
    print("액세스 토큰 갱신 시작")
    refresh_url = "http://localhost:8080/campaign/media/kakao/auth/refreshAccessToken"
    response = requests.post(refresh_url)
    if response.status_code == 200:
        print("Access Token Refresh Success")
    else:
        # status_code is an int, so it cannot be joined to a str with `+`;
        # an f-string formats it and keeps the same message layout.
        print(f"ERROR!![{response.status_code}]{response.text}")
    print("액세스 토큰 갱신 완료")
# Task that prints the current date.
bash_task = BashOperator(task_id='print_date', bash_command='date', dag=dag)

# Task that runs the token-refresh callable.
python_task = PythonOperator(
    task_id='python_operator',
    python_callable=call_myApi,
    dag=dag,
)

# Order: print_date first, then python_operator.
bash_task >> python_task
BranchPythonOperator 예제 (advertisement-synchronized.py)
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from random import randrange
from airflow import AirflowException
import os
import requests
# Base URL and HTTP method name for the campaign API used by this DAG.
API_CONN_URL = 'http://192.168.70.55:8081'
API_METHOD = 'GET'

# These args are passed on to each operator; they can be overridden on a
# per-task basis when an operator is initialized.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=days_ago(0),
    retries=3,
    retry_delay=timedelta(minutes=5),
)
# DAG object for the advertisement synchronisation example; scheduled once
# per day and configured with the shared default_args.
dag = DAG(
    dag_id='advertisement-synchronized',
    description='A simple tutorial DAG',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    tags=['skan'],
)
#####################################################################################################################################################
## Python Code
def init_transaction(**kwargs):
    """Return a random integer from 1 to 9 as a string (XCom push).

    The returned value is meant to be read downstream via XCom, e.g.
    ``{{ ti.xcom_pull(task_ids='init_transaction') }}``.

    Args:
        **kwargs: Accepted but not used.

    Returns:
        str: One of '1' .. '9'.
    """
    transaction_no = randrange(1, 10)
    return str(transaction_no)
def advertisement_synccall(**kwargs):
    """Call the media-sync API and choose the branch to follow.

    Branching and error handling: the function returns the task_id of the
    task that should run next.

    Args:
        **kwargs: Task context. Reads ``kwargs['params']['method']`` and
            uses ``kwargs['ti']`` to pull the value pushed by the
            'init_transaction' task.

    Returns:
        str: 'success_operator_01' when the API answers with HTTP 200,
        'fail_operator_02' for any other status or any exception.
    """
    try:
        # Use a local name: assigning to API_METHOD here would make it a
        # function-local variable, hiding the module-level default and
        # leaving it unbound when params['method'] is empty.
        method = kwargs['params']['method'] or API_METHOD
        print('API_METHOD = ' + method)
        # NOTE(review): the request is always sent with GET; `method` is
        # only logged — confirm whether it should select the HTTP verb.
        response = requests.get(
            API_CONN_URL
            + "/campaign/batch/sync/media"
            + "?param1="
            + kwargs['ti'].xcom_pull(task_ids='init_transaction')
        )
        if response.status_code == 200:
            return 'success_operator_01'
        raise Exception(' 호출 실패 ')
    except Exception as e:
        # Any failure (bad context, network error, non-200 reply) is routed
        # to the failure branch instead of failing the branching task.
        print("exception", e)
        return 'fail_operator_02'
# HTTP call through SimpleHttpOperator.
# Alternative target, left disabled:
#   http_conn_id='WEATHER_SERVICE', endpoint='/weather/Dallas'
init_log = SimpleHttpOperator(
    task_id='init_log',
    http_conn_id='LOCAL_SERVICE',
    endpoint='/campaign/test/api-1',
    method='GET',
    headers={},
    dag=dag,
)
# Single task using PythonOperator; runs init_transaction.
init = PythonOperator(
    task_id='init_transaction',
    python_callable=init_transaction,
    provide_context=True,
    dag=dag,
)
# Branching task: advertisement_synccall returns the task_id to run next.
# No retries for this task.
# NOTE(review): the callable only reads params['method']; confirm whether
# the two Jinja strings below are actually rendered and used.
data_sync = BranchPythonOperator(
    task_id='data_sync',
    python_callable=advertisement_synccall,
    provide_context=True,
    params={
        "method": "GET",
        "adgroup_method": "{{dag_run.conf.adgroup_method}}",
        "ad_method": "{{dag_run.conf.ad_method}}",
    },
    retries=0,
    dag=dag,
)
# Branch targets; their task_ids match the strings returned by
# advertisement_synccall.
success_operator = BashOperator(
    bash_command='echo success',
    task_id='success_operator_01',
    dag=dag,
)
fail_operator = BashOperator(
    bash_command='echo fail',
    task_id='fail_operator_02',
    dag=dag,
)
# init -> logging -> BranchPythonOperator -> [success operator, failure operator]
init.set_downstream(init_log)
init_log.set_downstream(data_sync)
data_sync.set_downstream([success_operator, fail_operator])
반응형
'Auto Build(CI SCM GIT) > Airflow' 카테고리의 다른 글
Airflow Variable 사용법 (0) | 2022.09.06 |
---|---|
Airflow XCOM 예제 (0) | 2022.09.06 |
Airflow Operator 종류 (0) | 2022.09.06 |
Airflow concepts (주요 용어) (0) | 2022.09.06 |
Airflow (0) | 2022.09.06 |