Commit cb37199d authored by Timothy Ardha

Clean Data Unicode

parent eaaf8cca
...@@ -117,6 +117,17 @@ def ds_list_extractor(**context): ...@@ -117,6 +117,17 @@ def ds_list_extractor(**context):
return files return files
def csv_clean_syntax(ti):
    """Build the ``bash_command`` kwargs consumed by the mapped ``clean_csv`` task.

    The XCom value of the ``ds_list_extractor`` task is pulled and
    ``iris[0][0][0]`` is parsed as a JSON list of table descriptors.  Each
    descriptor must provide ``file_id`` and ``sed_command``.

    For every table one dict ``{"bash_command": ...}`` is produced:

    * ``sed_command == 'nil'`` (or a value containing no actual command)
      yields the no-op ``echo 'OK' ``.
    * otherwise ``sed_command`` is split on ``'|;|;|'`` and every piece is
      turned into ``<piece> <csv> > <csv>_bk && mv <csv>_bk <csv>``; the
      steps are chained with ``&&`` so a failing step stops the chain.

    Args:
        ti: Airflow task instance, used only for ``xcom_pull``.

    Returns:
        list[dict]: one ``{"bash_command": str}`` entry per table, in the
        order of the parsed JSON list.
    """
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])

    # Resolve the working directory once instead of calling Variable.get()
    # for every occurrence of the path in every table.
    base_dir = f"""{Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/"""

    arr = []
    for table in json.loads(iris[0][0][0]):
        if table['sed_command'] == 'nil':
            arr.append({"bash_command": f"""echo 'OK' """})
            continue

        csv_path = f"""{base_dir}{yesterday_nodash}.{table['file_id']}.csv"""
        backup_path = f"""{base_dir}{yesterday_nodash}.{table['file_id']}_bk.csv"""
        # Each command writes to a backup file which then replaces the original.
        rewrite = f"""{csv_path} > {backup_path} && mv {backup_path} {csv_path}"""

        # Strip every piece and always put exactly one space between the
        # command and the file path: previously only the last command was
        # separated from the path, so an intermediate command without a
        # trailing space was concatenated directly with the path.
        # Empty pieces (e.g. from a trailing '|;|;|') are skipped because
        # they would otherwise try to execute the CSV file itself.
        commands = [piece.strip() for piece in table['sed_command'].split('|;|;|')]
        steps = [f"""{command} {rewrite}""" for command in commands if command]

        arr.append({"bash_command": " && ".join(steps) if steps else f"""echo 'OK' """})
    return arr
def ds_syntax_copy(ti): def ds_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris: if not iris:
...@@ -265,6 +276,19 @@ with DAG("APJ_1_T24", ...@@ -265,6 +276,19 @@ with DAG("APJ_1_T24",
do_xcom_push=True do_xcom_push=True
) )
# Task that runs the csv_clean_syntax callable, which returns a list of
# {"bash_command": ...} dicts (one per table).
# NOTE(review): this assignment rebinds the module-level name
# csv_clean_syntax from the function to the operator. It works because
# python_callable is evaluated before the rebinding, but the function is
# no longer reachable under that name afterwards.
csv_clean_syntax = PythonOperator(
task_id='csv_clean_syntax',
python_callable=csv_clean_syntax
)
# Dynamically mapped BashOperator: expand_kwargs() receives the output of the
# csv_clean_syntax task, and each dict in that list supplies the bash_command
# of one mapped clean_csv task instance.
clean_csv = BashOperator.partial(
task_id="clean_csv",
# Earlier hard-coded variant, kept for reference (bash_command now comes from expand_kwargs):
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs(
XComArg(csv_clean_syntax),
)
ds_syntax_copy = PythonOperator( ds_syntax_copy = PythonOperator(
task_id='ds_syntax_copy', task_id='ds_syntax_copy',
python_callable=ds_syntax_copy python_callable=ds_syntax_copy
...@@ -344,4 +368,4 @@ with DAG("APJ_1_T24", ...@@ -344,4 +368,4 @@ with DAG("APJ_1_T24",
# postgres_conn_id=POSTGRES_CONN_ID, # postgres_conn_id=POSTGRES_CONN_ID,
# ) # )
begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done begin >> check_done_ext >> sftp_xx >> check_file_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment