Commit 734cab19 authored by Timothy Ardhaneswara's avatar Timothy Ardhaneswara

Fix script t24

parent 07f3c839
......@@ -19,7 +19,6 @@ from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
# Data date in YYYYMMDD form: yesterday's calendar date when the DATE_OF_DATA
# Airflow Variable is 'today', otherwise the literal value stored in DATE_OF_DATA
# (presumably already in YYYYMMDD form — verify, since strptime below requires it).
# NOTE(review): Variable.get() at module level runs on every DAG-file parse and
# hits the Airflow metadata DB each time — confirm this is acceptable here.
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
# The same date reformatted as YYYY-MM-DD for SQL/timestamp usage.
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
......@@ -37,11 +36,7 @@ def _start():
print(f"""Start :: Extractor :: {yesterday_strip}""")
def check_file_done(**kwargs):
from random import randint
# number = randint(0, 10)
print(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.2.csv''')
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.2.csv'''):
print("A")
return True
......@@ -51,10 +46,6 @@ def check_file_done(**kwargs):
return False
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
......@@ -90,8 +81,6 @@ def metadata_syntax_copy(ti):
table in json.loads(iris[0][0][0])]
def meta_extract(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/""" + (
......@@ -103,7 +92,6 @@ def meta_extract(ti, copy_sql, file_id):
# ##############
def ds_list_extractor(**context):
sql_stmt = f"""select * from ds_conf.ds_t24_list_extractor();"""
pg_hook = PostgresHook(
......@@ -120,9 +108,6 @@ def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
# with open(f"""{Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}""") as csvFile:
# reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
# field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}.csv > {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}_bk.csv && mv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}_bk.csv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}.csv && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}.csv > {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}_bk.csv && mv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}_bk.csv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{yesterday_nodash}.{table['file_id']}.csv""")})
return arr
......@@ -149,7 +134,6 @@ def ds_csv_to_table(ti, copy_sql, file_id, **context):
# T24 extractor DAG: cron '0/30 2-8 * * *' fires at minute 0 and 30 of every
# hour from 02:00 through 08:59; no catchup, at most 10 concurrently running
# task instances within this DAG.
with DAG("APJ_1_T24",
start_date=datetime(2021, 1, 1),
schedule_interval='0/30 2-8 * * *',
# schedule_interval=None,  # alternative kept for manual-trigger testing
catchup=False,
concurrency=10) as dag:
# ###########################################
......@@ -171,24 +155,11 @@ with DAG("APJ_1_T24",
op_kwargs={},
)
# Pull the day's DFE/<yyyymmdd> extract directory from the T24 SFTP host into
# the local staging path, recursively, skipping host-key verification.
# NOTE(review): sshpass passes the password on the command line, so it is
# visible in the process list and task logs — consider switching to an SSH key
# or an Airflow SFTP connection.
sftp_xx = BashOperator(
task_id="sftp_xx",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:DFE/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""",
)
# ds_get_t24_extractor = SFTPOperator(
# task_id="ds_get_t24_extractor",
# ssh_conn_id="ssh_default",
# local_filepath=f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
# remote_filepath=f"""DFE/{yesterday_nodash}/""",
# operation="get",
# create_intermediate_dirs=True,
# dag=dag
# )
check_file_done = ShortCircuitOperator(
task_id="check_file_done",
provide_context=True,
......@@ -282,7 +253,6 @@ with DAG("APJ_1_T24",
# Dynamically mapped BashOperator: csv_clean_syntax returns a list of kwargs
# dicts (each carrying one 'bash_command'), so one clean-up task instance is
# created per extracted table at runtime via expand_kwargs.
clean_csv = BashOperator.partial(
task_id="clean_csv",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs(
XComArg(csv_clean_syntax),
)
......@@ -306,8 +276,6 @@ with DAG("APJ_1_T24",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_create_metadata_file = PostgresOperator(
sql=f"""create table {POSTGRES_SCHEMA}.a_metadata as (select * from ds_conf.fx_metadata_file());""",
task_id="ds_create_metadata_file",
......@@ -320,26 +288,23 @@ with DAG("APJ_1_T24",
postgres_conn_id=POSTGRES_CONN_ID,
)
# Kick off the downstream interface DAG (APJ_1_t24_interface) once the T24
# extract pipeline reaches this point.
ds_nominatif = TriggerDagRunOperator(
# Plain string literal: the original used an f-string with no placeholders,
# which evaluates to the identical value.
task_id='ds_nominatif',
trigger_dag_id="APJ_1_t24_interface",
)
ds_remove_bnk = BashOperator(
task_id="ds_remove_bnk",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
rm -r /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
""",
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/t24/{yesterday_nodash}/done_ext.csv;
""",
touch /opt/airflow/dags/DFE/t24/{yesterday_nodash}/done_ext.csv;
""",
)
history_finish = PostgresOperator(
......@@ -347,7 +312,7 @@ with DAG("APJ_1_T24",
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment