관리 메뉴

IT.FARMER

airflow example 본문

Auto Build(CI SCM GIT)/Airflow

airflow example

아이티.파머 2022. 8. 23. 18:33
반응형

DAG 생성 및 BashOperator 예제 (test.py)

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['yje14800@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'tutorial_01_skan',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2), 
    tags=['skan'],
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
            """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

        """
        )

    templated_command = """

    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}

    """

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag,
    )

    t1 >>  [t2, t3]

PythonOperator 예제 (hello.py)

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,timedelta
import requests

dag = DAG('hello-airflow',
          description='Hello airflow DAG',
          schedule_interval='0 */6 * * *',
          tags=['skan'],
          start_date=datetime(2022,7,19),catchup=False)

def print_hello():
    return 'Hello Airflow'
def call_myApi():
    print("액세스 토큰 갱신 시작")
    URL = "http://localhost:8080/campaign/media/kakao/auth/refreshAccessToken"
    response = requests.post(URL)
    if response.status_code == 200:
        print("Access Token Refresh Success")
    else:
        print("ERROR!!["+response.status_code+"]"+response.text)
    print("액세스 토큰 갱신 완료")

python_task = PythonOperator(
    task_id='python_operator',
    python_callable = call_myApi,
    dag = dag)

bash_task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

bash_task.set_downstream(python_task)

BranchPythonOperator 예제 (advertisement-synchronized.py)

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from random import randrange
from airflow import AirflowException
import os
import requests

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization

API_CONN_URL = 'http://192.168.70.55:8081'
API_METHOD = 'GET'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'advertisement-synchronized',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    tags=['skan'],
)
#####################################################################################################################################################
## Python Code

'''
    xcom Push 처리
    ex)  {{ ti.xcom_pull(task_ids='init_transaction') }}
'''

def init_transaction(**kwargs):
    return str(randrange(1,10))

'''
    분기 및 예외 처리
'''
def advertisement_synccall(**kwargs):
    try:
        if kwargs['params']['method'] !='':
            API_METHOD = kwargs['params']['method']
        # Python Http 처리
        print('API_METHOD = ' + API_METHOD)

        response = requests.get(API_CONN_URL + "/campaign/batch/sync/media"+ "?param1=" + kwargs['ti'].xcom_pull(task_ids='init_transaction'))

        if response.status_code == 200:
            return 'success_operator_01'
        else:
            raise Exception(' 호출 실패 ')

    except Exceptions as e :
        print("exception" , e)
        return 'fail_operator_02'

# Http 호출 SimpleHttpOperater
init_log = SimpleHttpOperator(
    task_id='init_log',
    method='GET',
    #http_conn_id = 'WEATHER_SERVICE',
    #endpoint='/weather/Dallas',
    http_conn_id = 'LOCAL_SERVICE',
    endpoint='/campaign/test/api-1',
    headers={},
    dag=dag,
)

# 단일 Task PythonOperater
init = PythonOperator(
    task_id='init_transaction',
    provide_context=True,
    python_callable=init_transaction,
    dag=dag,
)

data_sync = BranchPythonOperator(
        task_id='data_sync',
        retries=0,
        python_callable=advertisement_synccall,
        params={"method": "GET", "adgroup_method": "{{dag_run.conf.adgroup_method}}", "ad_method": "{{dag_run.conf.ad_method}}"},
        provide_context=True,
        dag=dag
)

success_operator = BashOperator(
    task_id='success_operator_01',
    bash_command='echo success',
    dag=dag,
)

fail_operator = BashOperator(
    task_id='fail_operator_02',
    bash_command='echo fail',
    dag=dag,
)

# init >> loging >> 분기 가능한 BranchPythonOperater >> [성공: operater , 실패: operater]
init >> init_log >> data_sync >> [success_operator, fail_operator]
반응형

'Auto Build(CI SCM GIT) > Airflow' 카테고리의 다른 글

Airflow Variable 사용법  (0) 2022.09.06
Airflow XCOM 예제  (0) 2022.09.06
Airflow Operator 종류  (0) 2022.09.06
Airflow concepts (주요 용어)  (0) 2022.09.06
Airflow  (0) 2022.09.06