Commit 21c39f90 authored by Timothy Ardha's avatar Timothy Ardha

Move Check Surrounding task

parent e96b8c23
...@@ -51,27 +51,6 @@ def check_file_done(**kwargs): ...@@ -51,27 +51,6 @@ def check_file_done(**kwargs):
# STOP DAG # STOP DAG
return False return False
def check_surrounding_done(**kwargs):
paths_labels = [
("switching", "Switching"),
("fincloud/cbr", "Fincloud"),
("igate", "Igate"),
("los-fintech", "Los Fintech"),
("los-kpt", "Los KPT"),
("bib", "BIB"),
("ebanking", "Ebanking"),
("mgate", "MGate")
]
base_path = f"""{Variable.get("LOCAL_PATH")}"""
done_ext = f"""{yesterday_nodash}/done_ext.csv"""
for path, label in paths_labels:
file_path = f"""{base_path}{path}/{done_ext}"""
print(f"{label} {'SELESAI' if os.path.isfile(f'{base_path}{path}/{done_ext}') else 'BELUM SELESAI'} - Path : {file_path}")
if not os.path.isfile(f'{base_path}{path}/{done_ext}'):
return False
return True
def check_done_ext(**kwargs): def check_done_ext(**kwargs):
from random import randint from random import randint
...@@ -219,13 +198,6 @@ with DAG("APJ_1_T24", ...@@ -219,13 +198,6 @@ with DAG("APJ_1_T24",
op_kwargs={}, op_kwargs={},
) )
check_surrounding_done = ShortCircuitOperator(
task_id="check_surrounding_done",
provide_context=True,
python_callable=check_surrounding_done,
op_kwargs={}
)
# ########################################### # ###########################################
# METADATA # METADATA
...@@ -394,4 +366,4 @@ with DAG("APJ_1_T24", ...@@ -394,4 +366,4 @@ with DAG("APJ_1_T24",
# postgres_conn_id=POSTGRES_CONN_ID, # postgres_conn_id=POSTGRES_CONN_ID,
# ) # )
begin >> check_done_ext >> sftp_xx >> check_file_done >> check_surrounding_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk begin >> check_done_ext >> sftp_xx >> check_file_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk
...@@ -112,6 +112,29 @@ def rename_reversal(): ...@@ -112,6 +112,29 @@ def rename_reversal():
print("file sudah ada") if os.path.exists(destination_path) else shutil.copy2(source_path, destination_path) print("file sudah ada") if os.path.exists(destination_path) else shutil.copy2(source_path, destination_path)
def check_surrounding_done(**kwargs):
paths_labels = [
("switching", "Switching"),
("fincloud/cbr", "Fincloud"),
("igate", "Igate"),
("los-fintech", "Los Fintech"),
("los-kpt", "Los KPT"),
("bib", "BIB"),
("ebanking", "Ebanking"),
("mgate", "MGate")
]
base_path = f"""{Variable.get("LOCAL_PATH")}"""
done_ext = f"""{yesterday_nodash}/done_ext.csv"""
for path, label in paths_labels:
file_path = f"""{base_path}{path}/{done_ext}"""
print(f"{label} {'SELESAI' if os.path.isfile(f'{base_path}{path}/{done_ext}') else 'BELUM SELESAI'} - Path : {file_path}")
if not os.path.isfile(f'{base_path}{path}/{done_ext}'):
return False
return True
with DAG("APJ_1_t24_interface", with DAG("APJ_1_t24_interface",
start_date=datetime(2021, 1, 1), start_date=datetime(2021, 1, 1),
schedule_interval=None, schedule_interval=None,
...@@ -281,6 +304,13 @@ with DAG("APJ_1_t24_interface", ...@@ -281,6 +304,13 @@ with DAG("APJ_1_t24_interface",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
check_surrounding_done = ShortCircuitOperator(
task_id="check_surrounding_done",
provide_context=True,
python_callable=check_surrounding_done,
op_kwargs={}
)
pentaho = BashOperator( pentaho = BashOperator(
task_id='pentaho', task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/D_REPORTS.kjb'""" bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/D_REPORTS.kjb'"""
...@@ -314,4 +344,4 @@ with DAG("APJ_1_t24_interface", ...@@ -314,4 +344,4 @@ with DAG("APJ_1_t24_interface",
) )
begin >> sftp_xx >> sftp_ppap >> rename_reversal >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> ds_create_table_history_nominatif >> ds_to_history >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> pentaho >> zip_today >> delete_before >> history_finish begin >> sftp_xx >> sftp_ppap >> rename_reversal >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> ds_create_table_history_nominatif >> ds_to_history >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> check_surrounding_done >> pentaho >> zip_today >> delete_before >> history_finish
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment