# 1_report_6th_month.py - committed by Margrenzo Gunawan
import json, os
import csv
from datetime import date
import calendar

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
# from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
# from airflow.providers.postgres.operators.postgres import PostgresOperator

# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# from airflow.utils.trigger_rule import TriggerRule

# from airflow.models.xcom import XCom
# from airflow.decorators import task
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow import XComArg



# # yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
#     "DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
# yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
# yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')

# POSTGRES_CONN_ID = Variable.get("DS_DB")
# POSTGRES_ENV = Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
# DS_CONN_ID = 'ds_t24'
# DS_FOLDER = 't24_interface'
# DS_DB = 't24_interface'
# DS_SCHEMA = 't24_interface'
# DS_CREATE_TABLE = ''


def _start():
    """Print the start-of-run banner; used as the callable of the first task."""
    banner = "Start :: Extractor "
    print(banner)


# Monthly DAG: the cron expression '0 7 7 * *' fires at 07:00 on the 7th day of
# every month. It runs two tasks in sequence: a start marker, then an HTTP call
# to the Pentaho host's /kettle/executeJob endpoint for M_6TH_REPORTS.kjb.
with DAG("APJ_1_REPORT_6th_MONTH",
         start_date=datetime(2021, 1, 1),
         schedule_interval='0 7 7 * *',
         catchup=False,
         concurrency=3) as dag:

    # Marker task: only prints the start banner.
    begin = PythonOperator(
        task_id="Begin",
        python_callable=_start,
    )

    # bash_command is a templated field, so the Airflow Variable is resolved by
    # Jinja when the task runs. Calling Variable.get() here instead would hit
    # the metadata database on every parse of this file and would make the
    # whole DAG fail to import if the Variable were missing.
    #
    # NOTE(review): curl exits 0 on HTTP error responses unless --fail is
    # passed, so this task can succeed even when the job request is rejected.
    # Confirm whether that is intended before tightening it.
    pentaho = BashOperator(
        task_id='pentaho',
        bash_command=(
            "curl '{{ var.value.PENTAHO_HOST_PASSWORD }}"
            "/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/M_6TH_REPORTS.kjb'"
        ),
        # Previous repository-based invocation, kept for reference:
        # bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
    )

    begin >> pentaho