# 1_cbi_ri_daily.py -- committed by Margrenzo Gunawan
import json, os
import csv
from datetime import date
import calendar

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

from datetime import datetime, timedelta
# from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
# from airflow.providers.postgres.operators.postgres import PostgresOperator

# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# from airflow.utils.trigger_rule import TriggerRule

# from airflow.models.xcom import XCom
# from airflow.decorators import task
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow import XComArg



# Data date in YYYYMMDD form. The Airflow Variable DATE_OF_DATA either holds
# the literal 'today' (use the day before the current date) or an explicit
# date string, which is taken as-is and must parse with '%Y%m%d' below.
if Variable.get("DATE_OF_DATA") == 'today':
    yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
else:
    yesterday_nodash = Variable.get("DATE_OF_DATA")

# Derived forms of the data date: dashed (YYYY-MM-DD) and the day before it.
_data_date = datetime.strptime(yesterday_nodash, '%Y%m%d')
yesterday_strip = _data_date.strftime('%Y-%m-%d')
yesterday_lusa = (_data_date - timedelta(1)).strftime('%Y%m%d')

# Timestamp taken when this module is parsed; ``today`` is the same string.
d = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
today = d

# Airflow connection id for the Postgres database, read from Variable DS_DB.
POSTGRES_CONN_ID = Variable.get("DS_DB")
# Disabled settings, kept for reference:
# POSTGRES_ENV = Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
# DS_FOLDER = 't24'


def _start():
    """Print the marker line that announces the start of the extractor run."""
    banner = "Start :: Extractor "
    print(banner)

def check_dmfa(**context):
    """Report whether a finished ``ds_t24_his`` load exists for the data date.

    Looks in ``ds_conf."Datasource_history"`` for rows whose ``date_of_data``
    equals the module-level ``yesterday_nodash``, whose ``source`` is
    ``'ds_t24_his'`` and whose ``status`` is ``'DONE'``. The fetched rows are
    printed for the task log.

    Args:
        **context: Accepted so the function can be called with keyword
            arguments by the operator; not used in the body.

    Returns:
        bool: ``True`` when at least one matching row exists, ``False`` when
        the query returns nothing.
    """
    from contextlib import closing

    # The date is bound as a query parameter instead of being formatted into
    # the SQL text.
    sql_stmt = (
        'SELECT * FROM ds_conf."Datasource_history" '
        "WHERE date_of_data = %s and source = 'ds_t24_his' and status = 'DONE';"
    )
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    # closing() releases the cursor and the connection even if the query
    # raises; previously neither was ever closed.
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt, (yesterday_nodash,))
        files = cursor.fetchall()

    print('#####################')
    print(files)
    print('#####################')
    if not files:
        # No DONE row yet for this date.
        print('BELOM')
        return False
    print('OK')
    return True

def check_run(**context):
    """Report whether ``RI_DAILY`` still has no history row for the data date.

    Looks in ``ds_conf."Datasource_history"`` for rows whose ``date_of_data``
    equals the module-level ``yesterday_nodash`` and whose ``source`` is
    ``'RI_DAILY'`` (any status). The fetched rows are printed for the task log.

    Args:
        **context: Accepted so the function can be called with keyword
            arguments by the operator; not used in the body.

    Returns:
        bool: ``True`` when no matching row exists, ``False`` when at least
        one row is already present.
    """
    from contextlib import closing

    # The date is bound as a query parameter instead of being formatted into
    # the SQL text.
    sql_stmt = (
        'SELECT * FROM ds_conf."Datasource_history" '
        "WHERE date_of_data = %s and source = 'RI_DAILY';"
    )
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    # closing() releases the cursor and the connection even if the query
    # raises; previously neither was ever closed.
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt, (yesterday_nodash,))
        files = cursor.fetchall()

    print('#####################')
    print(files)
    print('#####################')
    if not files:
        # Nothing recorded for this date yet.
        print('BELOM')
        return True
    print('OK')
    return False



# DAG "APJ_1_CBI_RI_DAILY": on a 30-minute cron restricted to days-of-week
# 2-7, run Begin -> check_run -> check_dmfa -> history_start -> pentaho.
# The two check tasks are ShortCircuitOperators, so a falsy return from their
# callable skips everything downstream of them.
with DAG("APJ_1_CBI_RI_DAILY",
         start_date=datetime(2021, 1, 1),
         schedule_interval='*/30 * * * 2-7',
         catchup=False,
         concurrency=3) as dag:

    # Log-only marker task.
    begin = PythonOperator(
        task_id="Begin",
        python_callable=_start,
    )

    # Operator variables carry a ``_task`` suffix so they no longer rebind the
    # module-level callables ``check_run`` / ``check_dmfa``. Task ids are
    # unchanged. ``provide_context`` is omitted: it is deprecated in Airflow 2,
    # where the context is passed to the callable automatically.
    check_run_task = ShortCircuitOperator(
        task_id='check_run',
        python_callable=check_run,
        do_xcom_push=True,
    )

    check_dmfa_task = ShortCircuitOperator(
        task_id='check_dmfa',
        python_callable=check_dmfa,
        do_xcom_push=True,
    )

    # Record an ONPROCESS row for RI_DAILY.
    # NOTE(review): '{today}' and the inline datetime.now() are rendered when
    # this file is parsed, not by the database at execution time -- confirm
    # that this is the intended start/finish timestamp.
    history_start = PostgresOperator(
        sql=f"""INSERT INTO ds_conf."Datasource_history"
        (source, start_time, finish_time, status, env, date_of_data) VALUES
        ('RI_DAILY'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
        task_id="history_start",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # Call the Pentaho executeJob endpoint for DAILY_ETL.kjb via curl. The
    # base URL comes from the Airflow Variable PENTAHO_HOST_PASSWORD.
    pentaho = BashOperator(
            task_id='pentaho',
            bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/etl/DAILY_ETL.kjb'"""
            # Alternative jobs, kept for reference:
            # bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/M_6TH_REPORTS.kjb'"""
            # bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
    )

    begin >> check_run_task >> check_dmfa_task >> history_start >> pentaho