Commit 795f9315 authored by Margrenzo Gunawan's avatar Margrenzo Gunawan 💬

backdate

parent 1f5c8440
......@@ -191,13 +191,13 @@ with DAG("APJ_1_T24_EO",
# METADATA
# ###########################################
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
# history_start = PostgresOperator(
# sql=f"""INSERT INTO ds_conf."Datasource_history"
# (source, start_time, finish_time, status, env, date_of_data) VALUES
# ('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
# task_id="history_start",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
# Copt Source dari file EXTRACTOR
file_sc_cp = BashOperator(
......@@ -310,10 +310,10 @@ with DAG("APJ_1_T24_EO",
postgres_conn_id=POSTGRES_CONN_ID,
)
# ds_nominatif = TriggerDagRunOperator(
# task_id=f'ds_nominatif',
# trigger_dag_id="APJ_1_t24_interface",
# )
ds_nominatif = TriggerDagRunOperator(
task_id=f'ds_nominatif',
trigger_dag_id="APJ_1_t24_interface_backdate",
)
ds_remove_bnk = BashOperator(
......@@ -342,4 +342,4 @@ with DAG("APJ_1_T24_EO",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_remove_bnk >> ds_ext_done
begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
......@@ -280,22 +280,22 @@ with DAG("APJ_1_t24_interface_backdate",
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/t24/{yesterday_lusa};
""",
)
# delete_before = BashOperator(
# task_id="delete_before",
# bash_command=f"""
# rm -r /opt/airflow/dags/DFE/t24/{yesterday_lusa};
# """,
# )
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS' and date_of_data = '{yesterday_nodash}';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
# history_finish = PostgresOperator(
# sql=f"""
# UPDATE ds_conf."Datasource_history"
# SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
# WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS' and date_of_data = '{yesterday_nodash}';
# """,
# task_id="history_finish",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
begin >> sftp_xx >> sftp_ppap >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> zip_today >> delete_before >> history_finish
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment