Commit 03f4a1c0 authored by Margrenzo Gunawan

Production

Dags from Production
parent f16c0353
File added
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
# from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
# from airflow.providers.postgres.operators.postgres import PostgresOperator
# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# from airflow.utils.trigger_rule import TriggerRule
# from airflow.models.xcom import XCom
# from airflow.decorators import task
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
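# DATE_OF_DATA drives the data date: 'today' resolves to yesterday's date, while
# any other value is taken as a fixed YYYYMMDD override (e.g. for reprocessing).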
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
today = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
POSTGRES_CONN_ID = Variable.get("DS_DB")
# POSTGRES_ENV = Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
# DS_FOLDER = 't24'
def _start():
print("Start :: Extractor ")
def check_dmfa(**context):
sql_stmt = f"""SELECT * FROM ds_conf."Datasource_history" WHERE date_of_data = '{yesterday_nodash}' and source = 'ds_t24_his' and status = 'DONE';"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
print('#####################')
print(files)
print('#####################')
    if not files:
        print('NOT YET')
        return False
    else:
        print('OK')
        return True
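# Short-circuit guard: proceed only when no 'RI_DAILY' row exists yet for the data
# date, so the half-hourly schedule does not start the Pentaho job twice.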
def check_run(**context):
sql_stmt = f"""SELECT * FROM ds_conf."Datasource_history" WHERE date_of_data = '{yesterday_nodash}' and source = 'RI_DAILY';"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
print('#####################')
print(files)
print('#####################')
    if not files:
        print('NOT YET')
        return True
    else:
        print('OK')
        return False
with DAG("APJ_1_CBI_RI_DAILY",
start_date=datetime(2021, 1, 1),
schedule_interval='*/30 * * * 2-7',
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_run = ShortCircuitOperator(
task_id='check_run',
provide_context=True,
python_callable=check_run,
do_xcom_push=True
)
check_dmfa = ShortCircuitOperator(
task_id='check_dmfa',
provide_context=True,
python_callable=check_dmfa,
do_xcom_push=True
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('RI_DAILY'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
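    # Trigger the Pentaho job through the Carte HTTP API; PENTAHO_HOST_PASSWORD is
    # assumed to hold the full scheme://user:password@host:port prefix for curl.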
pentaho = BashOperator(
task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/etl/DAILY_ETL.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/M_6TH_REPORTS.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
)
begin >> check_run >> check_dmfa >> history_start >> pentaho
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import XComArg
import boto3
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
today = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
tommorow_date = (1) if datetime.strptime(yesterday_nodash, '%Y%m%d').weekday() < 5 else (2)
tommorow = (datetime.strptime(yesterday_nodash, '%Y%m%d') + timedelta(days=tommorow_date)).strftime('%Y%m%d')
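# The next data date: Mon-Fri dates advance by one day, Sat/Sun dates by two
# (so Saturday rolls to Monday), presumably because there is no Sunday extract.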
# s3 boto client
s3 = boto3.client('s3')
# list objects in the bucket
response = s3.list_objects(Bucket='cbi-aws-dpprd-targetbucket', Prefix=f"""cbi-aws-dpprd-cmit-admin-folder/Staging-DataSource/{yesterday_nodash}/""")
print(response.get('Contents'))
if response.get('Contents') is not None:
for obj in response.get('Contents', []):
print(obj)
else:
print(f"""DATA ::: {yesterday_nodash} :: Not FOUND""")
def download_from_s3():
# get objects in the bucket
    if response.get('Contents') is not None:
for obj in response.get('Contents', [])[1:]:
s3.download_file('cbi-aws-dpprd-targetbucket',obj['Key'], f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{obj['Key'].split('/')[-1]}""")
else:
print(f"""DATA ::: {yesterday_nodash} :: Not FOUND""")
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_FOLDER = 't24'
def _start():
print(f"""Start :: Extractor :: {yesterday_nodash}""")
print(f"""Start :: Extractor :: {yesterday_strip}""")
def check_file_done(**kwargs):
from random import randint
# number = randint(0, 10)
print(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.SBS.2.csv''')
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.SBS.2.csv'''):
        print("FILE DONE.SBS.2.csv already exists")
        return True
    else:
        print("FILE DONE.SBS.2.csv does not exist")
        # STOP DAG
        return False
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/done_ext.csv'''):
        print("FILE done_ext already exists")
        return False
    else:
        print("FILE done_ext does not exist")
        # STOP DAG
        return True
# ##############
# METADATA
# ##############
def metadata_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_t24_meta_file_list();"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
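# Build one COPY ... FROM STDOUT statement per metadata file from the JSON returned
# by ds_t24_meta_file_list(); the dicts feed expand_kwargs() for dynamic task mapping
# (the metadata tasks themselves are commented out further down).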
def metadata_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['metadata_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY ds_conf.\"Datasource_{table['file_id'].replace(".", "").replace("ST","").lower()}temp\" FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def meta_extract(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
# ##############
# DATASOURCE
# ##############
def ds_list_extractor(**context):
sql_stmt = f"""select * from ds_conf.ds_t24_list_extractor();"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
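# Same pattern for the datasource extracts: one COPY statement per file, loading into
# the POSTGRES_SCHEMA table named after the file_id (dots replaced by underscores).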
def ds_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def ds_csv_to_table(ti, copy_sql, file_id, **context):
print(file_id)
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
def set_tommorow():
Variable.set('DATE_OF_DATA', tommorow)
print(tommorow)
with DAG("APJ_1_T24",
start_date=datetime(2021, 1, 1),
         schedule_interval='0 0/1 * * *',
catchup=False,
concurrency=10) as dag:
# ###########################################
# START
# ###########################################
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
create_folder = BashOperator(
task_id="create_folder",
bash_command=f"""
mkdir -p /opt/airflow/dags/DFE/t24/{yesterday_nodash};
""",
)
    # Download the extractor files for the data date from S3
download_from_s3 = PythonOperator(
task_id='download_from_s3',
python_callable=download_from_s3,
do_xcom_push=True
)
#sftp_xx = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:DFE/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""",
#)
#sftp_xx = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""aws s3 cp s3://cbi-aws-dpprd-targetbucket/datasource-extractor/20231001_20231031/{yesterday_nodash}.zip {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}""",
#)
# unzip_sftp = BashOperator(
# task_id="unzip_sftp",
# bash_command=f"""unzip {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/{yesterday_nodash}.zip -d {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}""",
#)
# ds_get_t24_extractor = SFTPOperator(
# task_id="ds_get_t24_extractor",
# ssh_conn_id="ssh_default",
# local_filepath=f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
# remote_filepath=f"""DFE/{yesterday_nodash}/""",
# operation="get",
# create_intermediate_dirs=True,
# dag=dag
# )
check_file_done = ShortCircuitOperator(
task_id="check_file_done",
provide_context=True,
python_callable=check_file_done,
op_kwargs={},
)
# ###########################################
# METADATA
# ###########################################
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
    # Copy the source files from the EXTRACTOR output
file_sc_cp = BashOperator(
task_id="file_sc_cp",
bash_command=f"""
cp -r /opt/airflow/dags/DFE/t24/{yesterday_nodash} /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
)
change_delimiter = BashOperator(
task_id="change_delimiter",
bash_command=f"""
/opt/airflow/dags/DFE/t24_bnk/chdel {yesterday_nodash};
""",
)
clean_ft = BashOperator(
task_id="clean_ft",
bash_command=f"""
/opt/airflow/dags/DFE/t24_bnk/ftx {yesterday_nodash};
""",
)
    # # Delete the temporary METADATA tables
# metadata_truncate = PostgresOperator(
# sql=f"""select ds_conf.ds_t24_truncate_metadata();""",
# task_id="metadata_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
    # # Remove ST.META.DATA rows from the EXTRACTOR file
# metadata_sed = BashOperator(
# task_id="metadata_sed",
# bash_command=f"""sed -i '/ST.META.DATA/d' /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash}/{yesterday_nodash}.ST.META.DATA.csv""",
# )
# # list file METADATA
# metadata_list_extractor = PythonOperator(
# task_id='metadata_list_extractor',
# python_callable=metadata_list_extractor,
# do_xcom_push=True
# )
# # syntax copy METADATA
# metadata_syntax_copy = PythonOperator(
# task_id='metadata_syntax_copy',
# python_callable=metadata_syntax_copy
# )
# # file metadata to DWHHub
# metadata_import = PythonOperator.partial(
# task_id="metadata_import",
# python_callable=meta_extract,
# dag=dag
# ).expand_kwargs(
# XComArg(metadata_syntax_copy),
# )
# # file refresh metadata
# metadata_refresh = PostgresOperator(
# sql=f"""select ds_conf.ds_t24_create_metadata();""",
# task_id="metadata_refresh",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
# ###########################################
# DATA SOURCE
# ###########################################
ds_create_table = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table('{POSTGRES_ENV}');""",
task_id="ds_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_syntax_copy = PythonOperator(
task_id='ds_syntax_copy',
python_callable=ds_syntax_copy
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_csv_to_table,
dag=dag
).expand_kwargs(
XComArg(ds_syntax_copy),
)
ds_create_table_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history();""",
task_id="ds_create_table_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_create_metadata_file = PostgresOperator(
sql=f"""create table {POSTGRES_SCHEMA}.a_metadata as (select * from ds_conf.fx_metadata_file());""",
task_id="ds_create_metadata_file",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_create_metadata_file_sys = PostgresOperator(
sql=f"""create table {POSTGRES_SCHEMA}.a_metadata_sys as (select * from ds_conf.fx_metadata_file_system());""",
task_id="ds_create_metadata_file_sys",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_copy_to_history('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_history_complete = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}_his'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'DONE'::varchar(100), '', '{yesterday_nodash}');""",
task_id="ds_history_complete",
postgres_conn_id=POSTGRES_CONN_ID,
)
# ds_nominatif = TriggerDagRunOperator(
# task_id=f'ds_nominatif',
# trigger_dag_id="APJ_1_t24_interface",
# )
ds_remove_bnk = BashOperator(
task_id="ds_remove_bnk",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/t24/{yesterday_nodash}/done_ext.csv;
""",
)
# set_tommorow = PythonOperator(
# task_id='set_tommorow',
# python_callable=set_tommorow,
# do_xcom_push=True
# )
# ds_nominatif = TriggerDagRunOperator(
# task_id=f'ds_nominatif',
# trigger_dag_id="APJ_1_T24",
# )
#
# history_finish = PostgresOperator(
# sql=f"""
# UPDATE ds_conf."Datasource_history"
# SET status = 'WAITING.NOMINATF', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
# WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
# """,
# task_id="history_finish",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
# begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
begin >> check_done_ext >> create_folder >> download_from_s3 >> check_file_done >> history_start >> file_sc_cp >> change_delimiter >> clean_ft >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_to_history >> ds_history_complete >> ds_remove_bnk >> ds_ext_done
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import XComArg
import boto3
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
today = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
tommorow_date = (1) if datetime.strptime(yesterday_nodash, '%Y%m%d').weekday() < 5 else (2)
tommorow = (datetime.strptime(yesterday_nodash, '%Y%m%d') + timedelta(days=tommorow_date)).strftime('%Y%m%d')
# s3 boto client
s3 = boto3.client('s3')
# list objects in the bucket
response = s3.list_objects(Bucket='cbi-aws-dpprd-targetbucket', Prefix=f"""cbi-aws-dpprd-cmit-admin-folder/Staging-DataSource/{yesterday_nodash}/""")
print(response.get('Contents'))
if response.get('Contents') is not None:
for obj in response.get('Contents', []):
print(obj)
else:
print(f"""DATA ::: {yesterday_nodash} :: Not FOUND""")
def download_from_s3():
# get objects in the bucket
    if response.get('Contents') is not None:
for obj in response.get('Contents', [])[1:]:
s3.download_file('cbi-aws-dpprd-targetbucket',obj['Key'], f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{obj['Key'].split('/')[-1]}""")
else:
print(f"""DATA ::: {yesterday_nodash} :: Not FOUND""")
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_FOLDER = 't24'
def _start():
print(f"""Start :: Extractor :: {yesterday_nodash}""")
print(f"""Start :: Extractor :: {yesterday_strip}""")
def check_file_done(**kwargs):
from random import randint
# number = randint(0, 10)
print(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.SBS.2.csv''')
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.SBS.2.csv'''):
        print("FILE DONE.SBS.2.csv already exists")
        return True
    else:
        print("FILE DONE.SBS.2.csv does not exist")
        # STOP DAG
        return False
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/done_ext.csv'''):
        print("FILE done_ext already exists")
        return False
    else:
        print("FILE done_ext does not exist")
        # STOP DAG
        return True
# ##############
# METADATA
# ##############
def metadata_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_t24_meta_file_list();"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def metadata_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['metadata_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY ds_conf.\"Datasource_{table['file_id'].replace(".", "").replace("ST","").lower()}temp\" FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def meta_extract(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
# ##############
# DATASOURCE
# ##############
def ds_list_extractor(**context):
sql_stmt = f"""select * from ds_conf.ds_t24_list_extractor();"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def ds_csv_to_table(ti, copy_sql, file_id, **context):
print(file_id)
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
def set_tommorow():
Variable.set('DATE_OF_DATA', tommorow)
print(tommorow)
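# APJ_1_T24_BACK appears to be the catch-up variant of APJ_1_T24: the same extract
# pipeline, trigger-only (no schedule), which after loading advances DATE_OF_DATA
# and re-triggers APJ_1_T24 via ds_nominatif.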
with DAG("APJ_1_T24_BACK",
start_date=datetime(2021, 1, 1),
# schedule='0/30 * * * *',
schedule_interval=None,
catchup=False,
concurrency=10) as dag:
# ###########################################
# START
# ###########################################
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
create_folder = BashOperator(
task_id="create_folder",
bash_command=f"""
mkdir -p /opt/airflow/dags/DFE/t24/{yesterday_nodash};
""",
)
    # Download the extractor files for the data date from S3
download_from_s3 = PythonOperator(
task_id='download_from_s3',
python_callable=download_from_s3,
do_xcom_push=True
)
#sftp_xx = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:DFE/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""",
#)
#sftp_xx = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""aws s3 cp s3://cbi-aws-dpprd-targetbucket/datasource-extractor/20231001_20231031/{yesterday_nodash}.zip {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}""",
#)
# unzip_sftp = BashOperator(
# task_id="unzip_sftp",
# bash_command=f"""unzip {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/{yesterday_nodash}.zip -d {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}""",
#)
# ds_get_t24_extractor = SFTPOperator(
# task_id="ds_get_t24_extractor",
# ssh_conn_id="ssh_default",
# local_filepath=f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
# remote_filepath=f"""DFE/{yesterday_nodash}/""",
# operation="get",
# create_intermediate_dirs=True,
# dag=dag
# )
check_file_done = ShortCircuitOperator(
task_id="check_file_done",
provide_context=True,
python_callable=check_file_done,
op_kwargs={},
)
# ###########################################
# METADATA
# ###########################################
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
    # Copy the source files from the EXTRACTOR output
file_sc_cp = BashOperator(
task_id="file_sc_cp",
bash_command=f"""
cp -r /opt/airflow/dags/DFE/t24/{yesterday_nodash} /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
)
change_delimiter = BashOperator(
task_id="change_delimiter",
bash_command=f"""
/opt/airflow/dags/DFE/t24_bnk/chdel {yesterday_nodash};
""",
)
clean_ft = BashOperator(
task_id="clean_ft",
bash_command=f"""
/opt/airflow/dags/DFE/t24_bnk/ftx {yesterday_nodash};
""",
)
    # # Delete the temporary METADATA tables
# metadata_truncate = PostgresOperator(
# sql=f"""select ds_conf.ds_t24_truncate_metadata();""",
# task_id="metadata_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
    # # Remove ST.META.DATA rows from the EXTRACTOR file
# metadata_sed = BashOperator(
# task_id="metadata_sed",
# bash_command=f"""sed -i '/ST.META.DATA/d' /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash}/{yesterday_nodash}.ST.META.DATA.csv""",
# )
# # list file METADATA
# metadata_list_extractor = PythonOperator(
# task_id='metadata_list_extractor',
# python_callable=metadata_list_extractor,
# do_xcom_push=True
# )
# # syntax copy METADATA
# metadata_syntax_copy = PythonOperator(
# task_id='metadata_syntax_copy',
# python_callable=metadata_syntax_copy
# )
# # file metadata to DWHHub
# metadata_import = PythonOperator.partial(
# task_id="metadata_import",
# python_callable=meta_extract,
# dag=dag
# ).expand_kwargs(
# XComArg(metadata_syntax_copy),
# )
# # file refresh metadata
# metadata_refresh = PostgresOperator(
# sql=f"""select ds_conf.ds_t24_create_metadata();""",
# task_id="metadata_refresh",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
# ###########################################
# DATA SOURCE
# ###########################################
ds_create_table = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table('{POSTGRES_ENV}');""",
task_id="ds_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_syntax_copy = PythonOperator(
task_id='ds_syntax_copy',
python_callable=ds_syntax_copy
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_csv_to_table,
dag=dag
).expand_kwargs(
XComArg(ds_syntax_copy),
)
ds_create_table_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history();""",
task_id="ds_create_table_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_create_metadata_file = PostgresOperator(
sql=f"""create table {POSTGRES_SCHEMA}.a_metadata as (select * from ds_conf.fx_metadata_file());""",
task_id="ds_create_metadata_file",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_create_metadata_file_sys = PostgresOperator(
sql=f"""create table {POSTGRES_SCHEMA}.a_metadata_sys as (select * from ds_conf.fx_metadata_file_system());""",
task_id="ds_create_metadata_file_sys",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_copy_to_history('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
# ds_nominatif = TriggerDagRunOperator(
# task_id=f'ds_nominatif',
# trigger_dag_id="APJ_1_t24_interface",
# )
ds_remove_bnk = BashOperator(
task_id="ds_remove_bnk",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/t24/{yesterday_nodash}/done_ext.csv;
""",
)
set_tommorow = PythonOperator(
task_id='set_tommorow',
python_callable=set_tommorow,
do_xcom_push=True
)
ds_nominatif = TriggerDagRunOperator(
task_id=f'ds_nominatif',
trigger_dag_id="APJ_1_T24",
)
#
# history_finish = PostgresOperator(
# sql=f"""
# UPDATE ds_conf."Datasource_history"
# SET status = 'WAITING.NOMINATF', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
# WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
# """,
# task_id="history_finish",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
# begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
begin >> check_done_ext >> create_folder >> download_from_s3 >> check_file_done >> history_start >> file_sc_cp >> change_delimiter >> clean_ft >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_to_history >> ds_remove_bnk >> ds_ext_done >> set_tommorow >> ds_nominatif
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_CONN_ID = 'ds_t24'
DS_FOLDER = 't24_interface'
DS_DB = 't24_interface'
DS_SCHEMA = 't24_interface'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('t24_interface', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
# "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote E'\b'""",
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote '"'""",
"file_id": f"""{yesterday_nodash}/{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
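# Build a CREATE TABLE statement per interface file by reading the CSV header:
# column names are sanitized (spaces, dots, slashes, parentheses and '+' become
# underscores, lower-cased) and every column is created as text; the final replace
# appears intended to rename a bare 'limit' column to limit_reff (reserved word).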
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""})
return arr
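# Build per-file shell commands from the 'sed_command' metadata: 'nil' becomes a
# no-op echo, otherwise the |;|;|-separated sed fragments are chained, each writing
# to a _bk copy that then replaces the original file.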
def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")})
return arr
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/t24/{file_id}""")
def sql_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""})
return arr
def stop_task(**kwargs):
return True
with DAG("APJ_1_t24_interface",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
sftp_xx = BashOperator(
task_id="sftp_xx",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NOM/TREASURY/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
sftp_ppap = BashOperator(
task_id="sftp_ppap",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/PPAP.NOMINATIF/*{yesterday_nodash}*COB.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
# sftp_neraca = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
# )
# sftp_neraca_tel = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
# )
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# ft_reve = BashOperator(
# task_id="ft_reve",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
# )
csv_clean_syntax = PythonOperator(
task_id='csv_clean_syntax',
python_callable=csv_clean_syntax
)
clean_csv = BashOperator.partial(
task_id="clean_csv",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs(
XComArg(csv_clean_syntax),
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
stop_task = ShortCircuitOperator(
task_id="stop_task",
provide_context=True,
python_callable=stop_task,
op_kwargs={},
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
sql_clean_syntax = PythonOperator(
task_id='sql_clean_syntax',
python_callable=sql_clean_syntax
)
ds_clean_data = PostgresOperator.partial(
# sql=f"""select ds_conf.ds_t24_create_table_history_wo_t24_dfe('{POSTGRES_SCHEMA}');""",
task_id="ds_clean_data",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(sql_clean_syntax),
)
ds_create_table_history_nominatif = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history_wo_t24_dfe('{POSTGRES_SCHEMA}');""",
task_id="ds_create_table_history_nominatif",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_copy_to_history('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
pentaho = BashOperator(
task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/D_REPORTS.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
)
zip_today = BashOperator(
task_id="zip_today",
bash_command=f"""
zip -r {yesterday_nodash}.zip /opt/airflow/dags/DFE/t24/{yesterday_nodash};
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/t24/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'WAITING.NOMINATIF';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> sftp_xx >> sftp_ppap >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> ds_create_table_history_nominatif >> ds_to_history >> set_access_schemma >> set_access_all_table >> pentaho >> zip_today >> delete_before >> history_finish