Commit 449697a3 authored by Timothy Ardha's avatar Timothy Ardha

add interface monthly

parent 1ceffdc0
import json, os
import csv
from datetime import date
import shutil
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_CONN_ID = 'ds_t24'
DS_FOLDER = 't24_interface'
DS_DB = 't24_interface'
DS_SCHEMA = 't24_interface'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('t24_interface_monthly', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
# "copy_sql": f"""COPY {table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote E'\b'""",
"copy_sql": f"""COPY {table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote '"'""",
"file_id": f"""{yesterday_nodash}/{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""create table {table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""})
return arr
def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")})
return arr
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/t24/{file_id}""")
def sql_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""})
return arr
def stop_task(**kwargs):
return True
def rename_reversal():
formatted_date_today = datetime.now().strftime('%Y%m%d')
formatted_date_yesterday = yesterday_nodash
files_to_process = [f'T24-FT.REVERSAL-{formatted_date_today}.csv', f'T24-TT.REVERSAL-{formatted_date_today}.csv']
for file_info in files_to_process:
destination_path = f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{file_info.replace(formatted_date_today, formatted_date_yesterday)}"""
source_path = f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{file_info}"""
print("file sudah ada") if os.path.exists(destination_path) else shutil.copy2(source_path, destination_path)
with DAG("APJ_1_t24_interface",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
sftp_ppap_t24 = BashOperator(
task_id="sftp_ppap_t24",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/PPAP.NOMINATIF/*{yesterday_nodash}*.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_tax_t24 = BashOperator(
task_id="sftp_tax_t24",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/TAX.REPORT.FDS/ID0010001/*{yesterday_nodash}*.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_de_o_history = BashOperator(
task_id="sftp_de_o_history",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/DFE/{yesterday_nodash}/*{yesterday_nodash}.ST.DE.O.HISTORY.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_de_i_history = BashOperator(
task_id="sftp_de_i_history",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/DFE/{yesterday_nodash}/*{yesterday_nodash}.ST.DE.I.HISTORY.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_de_i_history = BashOperator(
task_id="sftp_de_i_history",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/DFE/{yesterday_nodash}/*{yesterday_nodash}.ST.DE.I.HISTORY.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_fincloud = BashOperator(
task_id="sftp_fincloud",
bash_command=f"""sshpass -p {Variable.get("SFTP_FINCLOUD_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_FINCLOUD_USER")}@{Variable.get("SFTP_FINCLOUD_HOST")}:/home/fincloud.user/dwh/{yesterday_nodash}/Laporan_PPAP_Akhir_Bulan.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
# sftp_neraca = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
# )
# sftp_neraca_tel = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
# )
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# ft_reve = BashOperator(
# task_id="ft_reve",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
# )
csv_clean_syntax = PythonOperator(
task_id='csv_clean_syntax',
python_callable=csv_clean_syntax
)
clean_csv = BashOperator.partial(
task_id="clean_csv",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs(
XComArg(csv_clean_syntax),
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
stop_task = ShortCircuitOperator(
task_id="stop_task",
provide_context=True,
python_callable=stop_task,
op_kwargs={},
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
sql_clean_syntax = PythonOperator(
task_id='sql_clean_syntax',
python_callable=sql_clean_syntax
)
ds_clean_data = PostgresOperator.partial(
# sql=f"""select ds_conf.ds_t24_create_table_history_wo_t24_dfe('{POSTGRES_SCHEMA}');""",
task_id="ds_clean_data",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(sql_clean_syntax),
)
begin >> sftp_ppap_t24 >> sftp_tax_t24 >> sftp_de_o_history >> sftp_de_i_history >> sftp_fincloud >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment