Commit e6540b4c authored by Margrenzo Gunawan's avatar Margrenzo Gunawan 💬

Clean Data unicode

Clean unicode csv
parent 08e6ef4a
......@@ -117,6 +117,17 @@ def ds_list_extractor(**context):
return files
def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
# with open(f"""{Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}""") as csvFile:
# reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
# field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24_bnk/{yesterday_nodash}/{table['file_id']}""")})
return arr
def ds_syntax_copy(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
......@@ -138,7 +149,7 @@ def ds_csv_to_table(ti, copy_sql, file_id, **context):
with DAG("APJ_1_T24",
start_date=datetime(2021, 1, 1),
schedule='0/30 0-12 * * *',
schedule='0/10 0-12 * * *',
# schedule_interval=None,
catchup=False,
concurrency=10) as dag:
......@@ -264,12 +275,18 @@ with DAG("APJ_1_T24",
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_syntax_copy = PythonOperator(
task_id='ds_syntax_copy',
python_callable=ds_syntax_copy
csv_clean_syntax = PythonOperator(
task_id='csv_clean_syntax',
python_callable=csv_clean_syntax
)
clean_csv = BashOperator.partial(
task_id="clean_csv",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs(
XComArg(csv_clean_syntax),
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_csv_to_table,
......@@ -342,4 +359,4 @@ with DAG("APJ_1_T24",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
begin >> check_done_ext >> sftp_xx >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
begin >> check_done_ext >> sftp_xx >> check_file_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment