Commit 07f3c839 authored by Timothy Ardhaneswara

Update script airflow BOII

parent 76209c8a
import json import json, os
from datetime import date from datetime import date
import calendar import calendar
from airflow import DAG from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta from datetime import datetime, timedelta
...@@ -19,30 +19,50 @@ from airflow.decorators import task ...@@ -19,30 +19,50 @@ from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get( yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA") "DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d') yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB") POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_ebanking' + Variable.get("ENV_T24") POSTGRES_SCHEMA = 'ds_ebanking' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_EBANKING") DS_CONN_ID = Variable.get("DS_EBANKING")
DS_FOLDER = 'ebanking' DS_FOLDER = 'ebanking'
DS_DB = 'ebanking' DS_DB = 'ebanking'
DS_SCHEMA = 'ebanking' DS_SCHEMA = 'public'
DS_CREATE_TABLE = '' DS_CREATE_TABLE = ''
def create_ddl(schema,column):
def create_ddl(schema, column):
return f"""select string_agg(x,'') from (select return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,') || ' text'|| ' ); ' as x 'create table {POSTGRES_SCHEMA}.'||table_catalog||'_'||table_name|| ' ( ' || string_agg('"'||column_name||'"', ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}' from information_schema.columns where table_schema = '{schema}'
and table_name in ({column}) and table_name in ({column})
group by table_schema, table_name) as x ;""" group by table_catalog, table_schema, table_name) as x ;"""
def _start(): def _start():
print("Start :: Extractor ") print(f"""Start :: Extractor :: {yesterday_nodash}""")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor(): def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('ebanking', '{yesterday_strip}');""" sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('ebanking', '{yesterday_strip}');"""
...@@ -55,22 +75,31 @@ def ds_list_extractor(): ...@@ -55,22 +75,31 @@ def ds_list_extractor():
files = cursor.fetchall() files = cursor.fetchall()
return files return files
def ds_get_syntax(ti): def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris: if not iris:
raise Exception('No data.') raise Exception('No data.')
return [{"op_kwargs": { return [{"op_kwargs": {
"copy_sql": f"""COPY {DS_SCHEMA}.{table['file_id'].replace(".", "_")} to STDOUT delimiter '|' CSV header""", "copy_sql": f"""COPY ( select * from {DS_SCHEMA}.{table['file_id'].replace(".", "_")}{table['partition_table_format']} {table['condition']}) to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}} "file_id": f"""{table['table_source']}_{table['file_id']}""",
"database": f"""{table['table_source']}"""}
}
for for
table in json.loads(iris[0][0][0])] table in json.loads(iris[0][0][0])]
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID) def ds_get_csv(ti, copy_sql, file_id, database):
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
schema=database
)
print('INFO::',pg_hook)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + ( pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get( f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""") "WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
# #
# def ds_drop_syntax(ti): # def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor']) # iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
...@@ -85,48 +114,49 @@ def ds_get_csv(ti, copy_sql, file_id): ...@@ -85,48 +114,49 @@ def ds_get_csv(ti, copy_sql, file_id):
def ds_get_ddl(ti): def ds_get_ddl(ti):
global DS_CREATE_TABLE global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = [] sql_table = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""") print(create_ddl(DS_SCHEMA, f"""'{table['file_id']}'"""))
print(create_ddl(DS_SCHEMA, ','.join(column))) sql_stmt = create_ddl(DS_SCHEMA, f"""'{table['file_id']}'""")
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column)) pg_hook = PostgresHook(
pg_hook = PostgresHook( postgres_conn_id=DS_CONN_ID,
postgres_conn_id=DS_CONN_ID, schema=table['table_source']
) )
pg_conn = pg_hook.get_conn() pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor() cursor = pg_conn.cursor()
cursor.execute(sql_stmt) cursor.execute(sql_stmt)
return cursor.fetchall()[0][0] sql_table.append(cursor.fetchall()[0][0])
return sql_table
def pg_ddl_syntax(ti): def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0]) print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}] return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti): def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris: if not iris:
raise Exception('No data.') raise Exception('No data.')
return [{"op_kwargs": { return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""", "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_source']}_{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}} "file_id": f"""{table['table_source']}_{table['file_id']}""",}}
for for
table in json.loads(iris[0][0][0])] table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id): def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID) pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + ( pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get( f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""") "WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_ebanking", with DAG("APJ_1_ebanking",
start_date=datetime(2021, 1, 1), start_date=datetime(2021, 1, 1),
schedule_interval='0 * * * *', schedule=cron_tab,
catchup=False, catchup=False,
concurrency=3) as dag: concurrency=3) as dag:
begin = PythonOperator( begin = PythonOperator(
task_id=f"Begin", task_id=f"Begin",
python_callable=_start, python_callable=_start,
...@@ -135,6 +165,20 @@ with DAG("APJ_1_ebanking", ...@@ -135,6 +165,20 @@ with DAG("APJ_1_ebanking",
} }
) )
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
create_folder = BashOperator( create_folder = BashOperator(
task_id='create_folder', task_id='create_folder',
...@@ -172,13 +216,13 @@ with DAG("APJ_1_ebanking", ...@@ -172,13 +216,13 @@ with DAG("APJ_1_ebanking",
# ) # )
pg_drop_schema = PostgresOperator( pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""", sql=f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema", task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
pg_create_schema = PostgresOperator( pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""", sql=f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema", task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
...@@ -208,18 +252,47 @@ with DAG("APJ_1_ebanking", ...@@ -208,18 +252,47 @@ with DAG("APJ_1_ebanking",
XComArg(pg_push_syntax), XComArg(pg_push_syntax),
) )
set_access_schemma = PostgresOperator( set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""", sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO ebanking;""",
task_id="set_access_schemma", task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
set_access_all_table = PostgresOperator( set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""", sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO ebanking;""",
task_id="set_access_all_table", task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history_surrounding('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> ds_to_history >> delete_before >> history_finish
\ No newline at end of file \ No newline at end of file
This diff is collapsed.
import json, os import json, os
import shutil
from datetime import date from datetime import date
import calendar import calendar
...@@ -25,7 +24,6 @@ from airflow import XComArg ...@@ -25,7 +24,6 @@ from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get( yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA") "DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d') yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1))
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}""" d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:] today = d[:]
...@@ -46,7 +44,7 @@ and table_name in ({column}) ...@@ -46,7 +44,7 @@ and table_name in ({column})
group by table_schema, table_name) as x ;""" group by table_schema, table_name) as x ;"""
def _start(): def _start():
print("Start :: Extractor ") print(f"""Start :: Extractor :: {yesterday_nodash}""")
def check_done_ext(**kwargs): def check_done_ext(**kwargs):
from random import randint from random import randint
...@@ -137,11 +135,6 @@ def pg_push_csv(ti, copy_sql, file_id): ...@@ -137,11 +135,6 @@ def pg_push_csv(ti, copy_sql, file_id):
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get( f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""") "WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
def to_zip():
path_folder = f"{Variable.get("LOCAL_PATH")}{yesterday_lusa}"
shutil.make_archive(path_folder, 'zip', path_folder)
shutil.rmtree(path_folder)
with DAG("APJ_1_mgate", with DAG("APJ_1_mgate",
start_date=datetime(2021, 1, 1), start_date=datetime(2021, 1, 1),
schedule_interval='10 0 * * *', schedule_interval='10 0 * * *',
...@@ -273,12 +266,7 @@ with DAG("APJ_1_mgate", ...@@ -273,12 +266,7 @@ with DAG("APJ_1_mgate",
task_id="history_finish", task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
to_zip = PythonOperator(
task_id='to_zip',
python_callable=to_zip
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish >> to_zip begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish
\ No newline at end of file \ No newline at end of file
...@@ -44,7 +44,7 @@ and table_name in ({column}) ...@@ -44,7 +44,7 @@ and table_name in ({column})
group by table_schema, table_name) as x ;""" group by table_schema, table_name) as x ;"""
def _start(): def _start():
print("Start :: Extractor ") print(f"""Start :: Extractor :: {yesterday_nodash}""")
def check_done_ext(**kwargs): def check_done_ext(**kwargs):
from random import randint from random import randint
...@@ -256,6 +256,12 @@ with DAG("APJ_1_SWITCHING_DB", ...@@ -256,6 +256,12 @@ with DAG("APJ_1_SWITCHING_DB",
""", """,
) )
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history_surrounding('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
history_finish = PostgresOperator( history_finish = PostgresOperator(
sql=f""" sql=f"""
UPDATE ds_conf."Datasource_history" UPDATE ds_conf."Datasource_history"
...@@ -269,4 +275,4 @@ with DAG("APJ_1_SWITCHING_DB", ...@@ -269,4 +275,4 @@ with DAG("APJ_1_SWITCHING_DB",
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> ds_to_history >> history_finish
\ No newline at end of file \ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# Run-date and connection settings. All of these are module-level statements,
# so they are evaluated (including the Airflow Variable reads) whenever this
# module is imported.

# Date of the data being processed, formatted YYYYMMDD: yesterday's date when
# the Airflow Variable DATE_OF_DATA equals 'today', otherwise the Variable's
# value is used as-is (it must then parse with '%Y%m%d', see the next line).
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
# The same date re-formatted as YYYY-MM-DD.
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
# Current local time as 'YYYY-MM-DD HH:MM:SS'.
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
# Copy of d; interpolated as start_time in the history_start INSERT.
today = d[:]
# datetime exactly one day from now; read by check_last_month().
tomorrow = (datetime.now() + timedelta(1))
# Airflow connection id used for the ds_conf queries and for every
# PostgresOperator / pg_push_csv call in this file.
POSTGRES_CONN_ID = Variable.get("DS_DB")
# Schema that is dropped, re-created and loaded by this DAG; the name embeds
# the data date.
POSTGRES_SCHEMA = f"""ds_switching_{yesterday_nodash}"""
# Airflow connection id used by ds_get_csv and ds_get_ddl.
DS_CONN_ID = Variable.get("DS_SWITCHING")
# Sub-folder (below LOCAL_PATH) that holds the CSV files and marker files.
DS_FOLDER = 'switching'
# NOTE(review): not referenced anywhere in the visible part of this file.
DS_DB = 'switching'
# Schema whose tables are inspected by create_ddl and read by ds_get_syntax.
DS_SCHEMA = 'public'
# Declared `global` inside ds_get_ddl but never assigned there.
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
    """Return a SQL query that generates CREATE TABLE statements.

    The returned query reads ``information_schema.columns`` and produces a
    single string with one ``create table {POSTGRES_SCHEMA}.<table> (...)``
    statement per matching table, every column typed ``text`` and ordered by
    ``ordinal_position``.

    Column names are passed through ``quote_ident(lower(...))``: names that
    were valid before still come out as the same lower-case identifier, while
    reserved words or names with special characters are now double-quoted
    instead of producing invalid DDL.

    Args:
        schema: Source schema name, interpolated as a SQL string literal.
        column: Comma-separated, already single-quoted table names that are
            placed verbatim inside ``table_name in (...)``.

    Returns:
        The SQL text of the generator query.
    """
    # Both arguments are interpolated verbatim; callers in this file build
    # them from DAG configuration, not from end-user input.
    return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(quote_ident(lower(column_name::text)), ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
    """Print the extractor start banner together with the data date."""
    banner = f"""Start :: Extractor :: {yesterday_nodash}"""
    print(banner)
def check_last_month():
    """Report whether ``tomorrow`` falls on the first day of a month.

    Prints "OK" or "NOT OK" followed by the day number of ``tomorrow``.

    Returns:
        True when ``tomorrow.day`` is 1, otherwise False.
    """
    day_of_month = tomorrow.day
    is_first_day = day_of_month == 1
    print("OK" if is_first_day else "NOT OK")
    print(day_of_month)
    return is_first_day
def check_done_ext(**kwargs):
    """Tell whether the ``done_ext.csv`` marker exists for the data date.

    The marker is looked up under ``LOCAL_PATH`` (Airflow Variable), inside
    ``DS_FOLDER/<yesterday_nodash>/``. Prints "A" when found, "B" otherwise.

    Args:
        **kwargs: Accepted and ignored.

    Returns:
        True when the marker file exists, False when it does not (which makes
        the ShortCircuitOperator using this callable skip downstream tasks).
    """
    marker_path = f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''
    if not os.path.isfile(marker_path):
        print("B")
        # STOP DAG
        return False
    print("A")
    return True
def check_monthly_done(**kwargs):
    """Tell whether the monthly load still has to run for the data date.

    Looks for ``monthly_done.csv`` under ``LOCAL_PATH`` (Airflow Variable),
    inside ``DS_FOLDER/<yesterday_nodash>/``. Prints "A" when the marker is
    present, "B" otherwise.

    Args:
        **kwargs: Accepted and ignored.

    Returns:
        False when the marker already exists, True when it is absent.
    """
    marker_path = f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/monthly_done.csv'''
    if not os.path.isfile(marker_path):
        print("B")
        return True
    print("A")
    return False
def ds_list_extractor():
    """Fetch the extractor list for 'switching' and the current data date.

    Runs ``ds_conf.ds_extractor_list_extractor('switching', <yesterday_strip>)``
    on the ``POSTGRES_CONN_ID`` connection.

    Returns:
        All rows returned by the query (``cursor.fetchall()``). The other
        callables in this file read ``rows[0][0]`` as a JSON document.
    """
    # Local import so this block does not depend on the file's import header.
    from contextlib import closing

    sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('switching', '{yesterday_strip}');"""
    pg_hook = PostgresHook(
        postgres_conn_id=POSTGRES_CONN_ID,
    )
    # Close cursor and connection explicitly; the original left both open.
    # No commit is issued, exactly as before.
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt)
        return cursor.fetchall()
def ds_get_syntax(ti):
    """Build the per-table ``COPY ... TO STDOUT`` arguments for ds_get_csv.

    Reads the XCom of task ``ds_list_extractor`` and parses its first cell as
    a JSON list. Each entry must provide ``file_id``,
    ``partition_table_format``, ``condition`` and ``delimiter``.

    Args:
        ti: Task instance used for ``xcom_pull``.

    Returns:
        A list of ``{"op_kwargs": {"copy_sql": ..., "file_id": ...}}`` dicts,
        one per table.

    Raises:
        Exception: 'No data.' when the XCom is missing or holds no rows.
    """
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    # xcom_pull with a list of task ids returns a list, which can be
    # non-empty while holding None or an empty row set; check the payload
    # itself so the failure is reported as 'No data.' rather than a
    # TypeError/IndexError on the subscripts below.
    if not iris or not iris[0]:
        raise Exception('No data.')
    tables = json.loads(iris[0][0][0])
    return [{"op_kwargs": {
        "copy_sql": f"""COPY ( select * from {DS_SCHEMA}.{table['file_id'].replace(".", "_")}{table['partition_table_format']} {table['condition']}) to STDOUT delimiter '{table['delimiter']}' CSV header""",
        "file_id": table['file_id']}}
        for table in tables]
def ds_get_csv(ti, copy_sql, file_id):
    """Run ``copy_sql`` on the ``DS_CONN_ID`` connection into a CSV file.

    The target file is ``<LOCAL_PATH><DS_FOLDER>/<file_id>.csv``; when the
    Airflow Variable ``WITH_DATE`` equals 'Y' the file is placed in a
    ``<yesterday_nodash>/`` sub-folder and its name is prefixed with
    ``<yesterday_nodash>.``.

    Args:
        ti: Task instance (unused).
        copy_sql: COPY statement handed to ``copy_expert``.
        file_id: Base name of the CSV file.
    """
    source_hook = PostgresHook.get_hook(DS_CONN_ID)
    base_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/"""
    if Variable.get("WITH_DATE") == 'Y':
        dated_part = f"""{yesterday_nodash}/{yesterday_nodash}."""
    else:
        dated_part = ''
    csv_path = base_dir + dated_part + f"""{file_id}.csv"""
    source_hook.copy_expert(copy_sql, filename=csv_path)
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
    """Ask the source database for the CREATE TABLE DDL of all listed tables.

    Reads the XCom of task ``ds_list_extractor``, collects every entry's
    ``file_id`` as a quoted table name, builds the generator query with
    ``create_ddl`` and executes it on the ``DS_CONN_ID`` connection.

    Args:
        ti: Task instance used for ``xcom_pull``.

    Returns:
        The first column of the first result row: the concatenated DDL
        string produced by the generator query.

    Raises:
        Exception: 'No data.' when the XCom is missing or holds no rows.
    """
    # Local import so this block does not depend on the file's import header.
    from contextlib import closing

    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    # Same guard as the sibling *_syntax callables: fail with a clear message
    # instead of a TypeError/IndexError on the subscripts below.
    if not iris or not iris[0]:
        raise Exception('No data.')
    # NOTE(review): ds_get_syntax/pg_push_syntax replace "." with "_" in
    # file_id before using it as a table name; here it is used unchanged.
    column = [f"""'{table['file_id']}'""" for table in json.loads(iris[0][0][0])]
    sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
    print(sql_stmt)
    pg_hook = PostgresHook(
        postgres_conn_id=DS_CONN_ID,
    )
    # Close cursor and connection explicitly; the original left both open.
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt)
        return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
    """Wrap the DDL produced by task ``ds_get_ddl`` for a mapped operator.

    Args:
        ti: Task instance used for ``xcom_pull``.

    Returns:
        A one-element list ``[{"sql": <ddl>}]``.

    Raises:
        Exception: 'No data.' when ``ds_get_ddl`` pushed nothing or an empty
            value, instead of handing ``sql=None`` to the downstream task.
    """
    # Pull once and reuse; the original fetched the same XCom twice.
    pulled = ti.xcom_pull(task_ids=['ds_get_ddl'])
    if not pulled or not pulled[0]:
        raise Exception('No data.')
    ddl = pulled[0]
    print(ddl)
    return [{"sql": ddl}]
def pg_push_syntax(ti):
    """Build the per-table ``COPY ... FROM STDOUT`` arguments for pg_push_csv.

    Reads the XCom of task ``ds_list_extractor`` and parses its first cell as
    a JSON list. For every entry, the target table is
    ``{POSTGRES_SCHEMA}.<file_id with "." replaced by "_">`` and the CSV is
    read with a '|' delimiter and a header row.

    Args:
        ti: Task instance used for ``xcom_pull``.

    Returns:
        A list of ``{"op_kwargs": {"copy_sql": ..., "file_id": ...}}`` dicts,
        one per table.

    Raises:
        Exception: 'No data.' when the XCom is missing or holds no rows.
    """
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    # xcom_pull with a list of task ids returns a list, which can be
    # non-empty while holding None or an empty row set; check the payload
    # itself so the failure is reported as 'No data.' rather than a
    # TypeError/IndexError on the subscripts below.
    if not iris or not iris[0]:
        raise Exception('No data.')
    tables = json.loads(iris[0][0][0])
    return [{"op_kwargs": {
        "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
        "file_id": table['file_id']}}
        for table in tables]
def pg_push_csv(ti, copy_sql, file_id):
    """Run ``copy_sql`` on the ``POSTGRES_CONN_ID`` connection from a CSV file.

    The source file is ``<LOCAL_PATH><DS_FOLDER>/<file_id>.csv``; when the
    Airflow Variable ``WITH_DATE`` equals 'Y' the file is expected in a
    ``<yesterday_nodash>/`` sub-folder with its name prefixed by
    ``<yesterday_nodash>.``.

    Args:
        ti: Task instance (unused).
        copy_sql: COPY statement handed to ``copy_expert``.
        file_id: Base name of the CSV file.
    """
    target_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    base_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/"""
    if Variable.get("WITH_DATE") == 'Y':
        dated_part = f"""{yesterday_nodash}/{yesterday_nodash}."""
    else:
        dated_part = ''
    csv_path = base_dir + dated_part + f"""{file_id}.csv"""
    target_hook.copy_expert(copy_sql, filename=csv_path)
# Monthly load of the 'switching' CSV extracts into a dated Postgres schema.
# Cron '0 5 1 * *' = 05:00 on the first day of every month.
#
# Uses `schedule` / `max_active_tasks` in place of the deprecated
# `schedule_interval` / `concurrency`; this file already depends on
# `expand_kwargs`, which needs the same Airflow release line. The deprecated
# `provide_context=True` flag is dropped from the ShortCircuitOperators.
#
# Inside the block, several names that referred to the callables defined
# above are rebound to the operator wrapping them (python_callable is
# evaluated before the assignment, so this works).
with DAG("APJ_1_SWITCHING_MONTHLY",
         start_date=datetime(2021, 1, 1),
         schedule='0 5 1 * *',
         catchup=False,
         max_active_tasks=3) as dag:
    begin = PythonOperator(
        task_id="Begin",
        python_callable=_start,
        op_kwargs={
        }
    )

    # Skips everything downstream unless done_ext.csv exists for the data date.
    check_done_ext = ShortCircuitOperator(
        task_id="check_done_ext",
        python_callable=check_done_ext,
        op_kwargs={},
    )

    # Skips everything downstream when monthly_done.csv already exists.
    check_monthly_done = ShortCircuitOperator(
        task_id="check_monthly_done",
        python_callable=check_monthly_done,
        op_kwargs={},
    )

    # Records an ONPROCESS row; the timestamps are rendered into the SQL text
    # when this module is imported, not by the database.
    history_start = PostgresOperator(
        sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('ds_switching_monthly'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
        task_id="history_start",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    ds_list_extractor = PythonOperator(
        task_id='ds_list_extractor',
        python_callable=ds_list_extractor,
        do_xcom_push=True
    )

    # Disabled: the CSV export from the source database is not part of this
    # DAG's task chain.
    #ds_get_syntax = PythonOperator(
    #    task_id='ds_syntax_get',
    #    python_callable=ds_get_syntax
    #)
    #ds_table_to_csv = PythonOperator.partial(
    #    task_id="ds_table_to_csv",
    #    python_callable=ds_get_csv,
    #    dag=dag
    #).expand_kwargs(
    #    XComArg(ds_get_syntax),
    #)

    ds_get_ddl = PythonOperator(
        task_id='ds_get_ddl',
        python_callable=ds_get_ddl,
        do_xcom_push=True
    )

    # ds_drop_syntax = PythonOperator(
    #     task_id='ds_drop_syntax',
    #     python_callable=ds_drop_syntax
    # )

    # The dated target schema is rebuilt from scratch on every run.
    pg_drop_schema = PostgresOperator(
        sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
        task_id="pg_drop_schema",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    pg_create_schema = PostgresOperator(
        sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
        task_id="pg_create_schema",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    pg_ddl_syntax = PythonOperator(
        task_id='pg_ddl_syntax',
        python_callable=pg_ddl_syntax
    )

    # One mapped task per dict returned by pg_ddl_syntax.
    pg_create_table = PostgresOperator.partial(
        task_id="pg_create_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    ).expand_kwargs(
        XComArg(pg_ddl_syntax),
    )

    pg_push_syntax = PythonOperator(
        task_id='pg_syntax_push',
        python_callable=pg_push_syntax
    )

    # One mapped task per table returned by pg_push_syntax.
    pg_csv_to_table = PythonOperator.partial(
        task_id="pg_csv_to_table",
        python_callable=pg_push_csv,
        dag=dag
    ).expand_kwargs(
        XComArg(pg_push_syntax),
    )

    set_access_schemma = PostgresOperator(
        sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_schemma",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_all_table = PostgresOperator(
        sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_all_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # NOTE(review): check_monthly_done looks for this marker under the
    # LOCAL_PATH Variable, while the path here is hard-coded; confirm both
    # resolve to the same directory.
    ds_monthly_done = BashOperator(
        task_id="ds_monthly_done",
        bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/monthly_done.csv;
""",
    )

    history_finish = PostgresOperator(
        sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = 'ds_switching_monthly' and status = 'ONPROCESS';
""",
        task_id="history_finish",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # Strictly linear task chain.
    (
        begin
        >> check_done_ext
        >> check_monthly_done
        >> history_start
        >> ds_list_extractor
        >> ds_get_ddl
        >> pg_drop_schema
        >> pg_create_schema
        >> pg_ddl_syntax
        >> pg_create_table
        >> pg_push_syntax
        >> pg_csv_to_table
        >> set_access_schemma
        >> set_access_all_table
        >> ds_monthly_done
        >> history_finish
    )
\ No newline at end of file
...@@ -148,8 +148,8 @@ def ds_csv_to_table(ti, copy_sql, file_id, **context): ...@@ -148,8 +148,8 @@ def ds_csv_to_table(ti, copy_sql, file_id, **context):
with DAG("APJ_1_T24", with DAG("APJ_1_T24",
start_date=datetime(2021, 1, 1), start_date=datetime(2021, 1, 1),
schedule_interval='0/30 2-9 * * *', schedule_interval='0/30 2-8 * * *',
# schedule_interval=None, # schedule_interval=None,
catchup=False, catchup=False,
concurrency=10) as dag: concurrency=10) as dag:
# ########################################### # ###########################################
...@@ -284,7 +284,7 @@ with DAG("APJ_1_T24", ...@@ -284,7 +284,7 @@ with DAG("APJ_1_T24",
task_id="clean_csv", task_id="clean_csv",
# bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""", # bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
).expand_kwargs( ).expand_kwargs(
XComArg(csv_clean_syntax), XComArg(csv_clean_syntax),
) )
ds_syntax_copy = PythonOperator( ds_syntax_copy = PythonOperator(
...@@ -352,4 +352,4 @@ with DAG("APJ_1_T24", ...@@ -352,4 +352,4 @@ with DAG("APJ_1_T24",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
begin >> check_done_ext >> sftp_xx >> check_file_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> ds_nominatif >> ds_remove_bnk >> history_finish begin >> check_done_ext >> sftp_xx >> check_file_done >> ds_ext_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> ds_syntax_copy >> ds_csv_to_table >> ds_create_table_history >> ds_create_metadata_file >> ds_create_metadata_file_sys >> ds_nominatif >> ds_remove_bnk >> history_finish
\ No newline at end of file
...@@ -342,4 +342,4 @@ with DAG("APJ_1_T24_EO", ...@@ -342,4 +342,4 @@ with DAG("APJ_1_T24_EO",
# postgres_conn_id=POSTGRES_CONN_ID, # postgres_conn_id=POSTGRES_CONN_ID,
# ) # )
begin >> check_done_ext >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_remove_bnk >> ds_ext_done begin >> check_done_ext >> check_file_done >> history_start >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_remove_bnk >> ds_ext_done
\ No newline at end of file
...@@ -342,4 +342,4 @@ with DAG("APJ_FDS_T24", ...@@ -342,4 +342,4 @@ with DAG("APJ_FDS_T24",
# postgres_conn_id=POSTGRES_CONN_ID, # postgres_conn_id=POSTGRES_CONN_ID,
# ) # )
begin >> check_done_ext >> check_file_done >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done begin >> check_done_ext >> check_file_done >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
\ No newline at end of file
...@@ -2,6 +2,7 @@ import json, os ...@@ -2,6 +2,7 @@ import json, os
import csv import csv
from datetime import date from datetime import date
import calendar import calendar
import time
from airflow import DAG from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
...@@ -67,17 +68,17 @@ def pg_ddl_syntax(ti): ...@@ -67,17 +68,17 @@ def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""",encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""}) arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').replace('%', '').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""})
return arr return arr
def csv_clean_syntax(ti): def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""", encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")}) arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")})
...@@ -92,7 +93,7 @@ def sql_clean_syntax(ti): ...@@ -92,7 +93,7 @@ def sql_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""", encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""}) arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""})
...@@ -101,6 +102,39 @@ def sql_clean_syntax(ti): ...@@ -101,6 +102,39 @@ def sql_clean_syntax(ti):
def stop_task(**kwargs): def stop_task(**kwargs):
return True return True
def check_backdate(**kwargs):
    """Tell whether this run processes the live date rather than a backdate.

    Reads the ``DATE_OF_DATA`` Airflow Variable, logs its value, and
    reports whether it is the literal ``'today'``.  The DAG wires this
    function in as the callable of a ``ShortCircuitOperator``, so the
    boolean result is what that operator evaluates.

    Args:
        **kwargs: Accepted so the operator can pass context; not read here.

    Returns:
        bool: ``True`` when ``DATE_OF_DATA`` equals ``'today'``; ``False``
        for any other value (i.e. an explicit backdate).
    """
    is_live_run = Variable.get("DATE_OF_DATA") == 'today'
    print(f"""Date: {Variable.get("DATE_OF_DATA")}""")
    print("OK!" if is_live_run else "This is Backdate!")
    return is_live_run
def check_surrounding_done(**kwargs):
    """Block until every surrounding extractor has written its done marker.

    For each source folder listed in ``paths_labels`` this looks for
    ``<LOCAL_PATH><folder>/<yesterday_nodash>/done_ext.csv``.  The check is
    repeated once a minute until all markers exist, logging the status of
    every folder on each pass.

    Args:
        **kwargs: Accepted so the operator can pass context; not read here.

    Returns:
        bool: Always ``True``, returned once every marker file is present.
        The function never returns ``False``; it keeps polling instead, so
        any upper bound on the wait has to come from outside this function
        (NOTE(review): e.g. a task-level timeout -- confirm one is set).
    """
    paths_labels = [
        ("mgate", "MGate"),
        ("switching", "Switching"),
        ("ibb", "IBB"),
        ("ebanking", "Ebanking"),
    ]
    base_path = f"""{Variable.get("LOCAL_PATH")}"""
    done_ext = f"""{yesterday_nodash}/done_ext.csv"""
    while True:
        i = 0
        for path, label in paths_labels:
            file_path = f"""{base_path}{path}/{done_ext}"""
            # Probe the filesystem once per folder so the logged status and
            # the counter can never disagree if the marker appears mid-pass.
            is_done = os.path.isfile(file_path)
            # 'SELESAI' / 'BELUM SELESAI' are Indonesian for
            # "finished" / "not finished yet".
            print(f"{label} {'SELESAI' if is_done else 'BELUM SELESAI'} - Path : {file_path}")
            i += 1 if is_done else 0
        print(i)
        # Compare against the list length rather than a literal count so
        # adding or removing a folder above keeps the check correct.
        if i == len(paths_labels):
            return True
        print("===================================")
        time.sleep(60)
with DAG("APJ_1_t24_interface", with DAG("APJ_1_t24_interface",
start_date=datetime(2021, 1, 1), start_date=datetime(2021, 1, 1),
schedule_interval=None, schedule_interval=None,
...@@ -119,6 +153,11 @@ with DAG("APJ_1_t24_interface", ...@@ -119,6 +153,11 @@ with DAG("APJ_1_t24_interface",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NOM/TREASURY/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""", bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NOM/TREASURY/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
) )
sftp_ppap_t24 = BashOperator(
task_id="sftp_ppap_t24",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/PPAP.NOMINATIF/*{yesterday_nodash}*.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
)
# sftp_neraca = BashOperator( # sftp_neraca = BashOperator(
# task_id="sftp_xx", # task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""", # bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
...@@ -248,6 +287,20 @@ with DAG("APJ_1_t24_interface", ...@@ -248,6 +287,20 @@ with DAG("APJ_1_t24_interface",
postgres_conn_id=POSTGRES_CONN_ID, postgres_conn_id=POSTGRES_CONN_ID,
) )
check_backdate = ShortCircuitOperator(
task_id="check_backdate",
provide_context=True,
python_callable=check_backdate,
op_kwargs={}
)
check_surrounding_done = ShortCircuitOperator(
task_id="check_surrounding_done",
provide_context=True,
python_callable=check_surrounding_done,
op_kwargs={}
)
pentaho = BashOperator( pentaho = BashOperator(
task_id='pentaho', task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/apj/files/scripts/etl/D_REPORT.kjb'""" bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/apj/files/scripts/etl/D_REPORT.kjb'"""
...@@ -276,4 +329,4 @@ with DAG("APJ_1_t24_interface", ...@@ -276,4 +329,4 @@ with DAG("APJ_1_t24_interface",
) )
begin >> sftp_xx >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> ds_create_table_history_nominatif >> ds_to_history >> set_access_schemma >> set_access_all_table >> pentaho >> rep_nominatif_customer >> zip_today >> delete_before begin >> sftp_xx >> sftp_ppap_t24 >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> ds_create_table_history_nominatif >> ds_to_history >> set_access_schemma >> set_access_all_table >> check_backdate >> check_surrounding_done >> pentaho >> rep_nominatif_customer >> zip_today >> delete_before
\ No newline at end of file
...@@ -298,4 +298,4 @@ with DAG("APJ_FDS_T24_Interface", ...@@ -298,4 +298,4 @@ with DAG("APJ_FDS_T24_Interface",
# ) # )
begin >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> zip_today begin >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> zip_today
\ No newline at end of file
...@@ -40,7 +40,7 @@ DS_CREATE_TABLE = '' ...@@ -40,7 +40,7 @@ DS_CREATE_TABLE = ''
def _start(): def _start():
print("Start :: Extractor ") print(f"Start :: Extractor :: {yesterday_nodash}")
def ds_list_extractor(): def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('t24_interface_monthly', '{yesterday_strip}');""" sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('t24_interface_monthly', '{yesterday_strip}');"""
...@@ -68,17 +68,17 @@ def pg_ddl_syntax(ti): ...@@ -68,17 +68,17 @@ def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""", encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"sql": f"""create table {table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""}) arr.append({"sql": f"""drop table if exists {table['table_name']}; create table {table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').replace('%','').lower().replace((table['delimiter']+'limit'+table['delimiter']), (table['delimiter']+'limit_reff'+table['delimiter'])) for w in field_names_list])} text);"""})
return arr return arr
def csv_clean_syntax(ti): def csv_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""", encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")}) arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil' else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|')) + f""" {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")})
...@@ -93,7 +93,7 @@ def sql_clean_syntax(ti): ...@@ -93,7 +93,7 @@ def sql_clean_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor']) iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = [] arr = []
for table in json.loads(iris[0][0][0]): for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile: with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""", encoding = "ISO-8859-1") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""") reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__() field_names_list = reader.__next__()
arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""}) arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil' else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""})
...@@ -131,7 +131,7 @@ with DAG("APJ_1_t24_interface_monthly", ...@@ -131,7 +131,7 @@ with DAG("APJ_1_t24_interface_monthly",
) )
sftp_tax_t24 = BashOperator( sftp_tax_t24 = BashOperator(
task_id="sftp_tax_t24", task_id="sftp_tax_t24",
bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/TAX.REPORT.FDS/ID0010001/*{yesterday_nodash}*.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""", bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/TAX.REPORT/ID0010001/*{yesterday_nodash}*.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
) )
sftp_de_o_history = BashOperator( sftp_de_o_history = BashOperator(
task_id="sftp_de_o_history", task_id="sftp_de_o_history",
...@@ -246,4 +246,4 @@ with DAG("APJ_1_t24_interface_monthly", ...@@ -246,4 +246,4 @@ with DAG("APJ_1_t24_interface_monthly",
) )
begin >> sftp_ppap_t24 >> sftp_tax_t24 >> sftp_de_o_history >> sftp_de_i_history >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task begin >> sftp_ppap_t24 >> sftp_tax_t24 >> sftp_de_o_history >> sftp_de_i_history >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment