Commit d5912b00 authored by Timothy Ardha

clean data ds_cbr

parent dc8256c8
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
# from acme.operators.mssql_import_operator import MsSqlImportOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# ---------------------------------------------------------------------------
# Run configuration. Everything below is evaluated each time this file is
# parsed; the Airflow Variables are read at that moment.
# ---------------------------------------------------------------------------

# Data date as YYYYMMDD: yesterday when DATE_OF_DATA is 'today', otherwise the
# literal value stored in the Variable.
if Variable.get("DATE_OF_DATA") == 'today':
    yesterday_nodash = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
else:
    yesterday_nodash = Variable.get("DATE_OF_DATA")

# The same date formatted as YYYY-MM-DD.
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')

# The day before the data date, as YYYYMMDD.
yesterday_lusa = (
    datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(days=1)
).strftime('%Y%m%d')

# Timestamp taken when the file is parsed.
d = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# Schedule for the DAG: None when CRON_JOB is 'manual', else the stored value.
if Variable.get("CRON_JOB") == 'manual':
    cron_tab = None
else:
    cron_tab = Variable.get("CRON_JOB")

# Second name for the parse-time timestamp above.
today = d

# Target PostgreSQL connection id and schema name.
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_cbr' + Variable.get("ENV_T24")

# Source connection ids.
DS_CONN_ID = Variable.get("DS_CBR")
DS_CONN_ID_INTER = Variable.get("DS_CBR_INTER")

# Static names for this data source.
DS_FOLDER = 'cbr'
DS_DB = 'cbr'
DS_SCHEMA = 'cbr'
DS_CREATE_TABLE = ''
# def create_ddl(schema,column):
# return f"""SELECT 'CREATE TABLE {POSTGRES_SCHEMA}.cbr_' + lower(table_name) + ' ( ' +
# STUFF(
# (
# SELECT ', ' + lower(column_name) + ' ' + replace(case when DATA_TYPE = 'char' then DATA_TYPE + '(' + CONVERT(varchar(10),CHARACTER_MAXIMUM_LENGTH)+ ')' else DATA_TYPE end, 'datetime','timestamp')
# FROM information_schema.columns c2
# WHERE c2.table_schema = c.table_schema
# AND c2.table_name = c.table_name
# ORDER BY ordinal_position
# FOR XML PATH(''), TYPE
# ).value('.', 'NVARCHAR(MAX)'), 1, 2, ''
# ) + ' );' AS create_table_statement
# FROM information_schema.columns c
# WHERE table_schema = 'dbo'
# AND table_name in ({column})
# GROUP BY table_schema, table_name;"""
def create_ddl(schema, column):
    """Build the SQL Server query that generates PostgreSQL CREATE TABLE DDL.

    The returned T-SQL reads ``information_schema.columns`` and produces one
    row per matching table. Each row has a single ``create_table_statement``
    column holding ``CREATE TABLE <POSTGRES_SCHEMA>."<table>" ( ... );`` with
    every column name lower-cased, double-quoted and typed as ``text``.

    Args:
        schema: Source schema whose tables are inspected (callers in this
            file pass ``'dbo'``).
        column: Comma-separated list of table names, each already wrapped in
            single quotes; it is placed verbatim inside ``IN (...)``.

    Returns:
        The query text as a string.
    """
    # Both arguments are spliced into the SQL text unescaped, so they must
    # only ever come from trusted configuration.
    return f"""SELECT 'CREATE TABLE {POSTGRES_SCHEMA}."' + lower(table_name) + '" ( ' +
STUFF(
(
SELECT ', "' + lower(column_name) + '" text'
FROM information_schema.columns c2
WHERE c2.table_schema = c.table_schema
AND c2.table_name = c.table_name
ORDER BY ordinal_position
FOR XML PATH(''), TYPE
).value('.', 'NVARCHAR(MAX)'), 1, 2, ''
) + ' );' AS create_table_statement
FROM information_schema.columns c
WHERE table_schema = '{schema}'
AND table_name in ({column})
GROUP BY table_schema, table_name;"""
def _start():
    """Print the marker line that opens the extractor run."""
    banner = "Start :: Extractor "
    print(banner)
def check_done_ext(**kwargs):
    """Report whether extraction still has to run for the current data date.

    Looks for ``done_ext.csv`` inside the dated folder under ``LOCAL_PATH``.
    This DAG wires the function into a ShortCircuitOperator, so the boolean
    decides whether the downstream tasks run.

    Args:
        **kwargs: Accepted for compatibility with the operator; not used.

    Returns:
        ``False`` when the marker file already exists, ``True`` otherwise.
    """
    done_marker = f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''
    if os.path.isfile(done_marker):
        # Marker found: this date was already extracted, so stop here.
        print("A")
        return False
    # No marker yet: let the rest of the DAG run.
    print("B")
    return True
def ds_list_extractor():
    """Fetch the extractor configuration rows for the current data date.

    Calls ``ds_conf.ds_extractor_list_extractor('cbr_db', <data date>)`` on
    the PostgreSQL connection named by ``POSTGRES_CONN_ID``.

    Returns:
        Every row returned by the function, as given by ``cursor.fetchall()``.
        Downstream tasks read the first column of the first row as a JSON
        document.
    """
    from contextlib import closing

    # Values are bound by the driver rather than formatted into the SQL text.
    sql_stmt = "select * from ds_conf.ds_extractor_list_extractor(%s, %s);"
    pg_hook = PostgresHook(
        postgres_conn_id=POSTGRES_CONN_ID,
    )
    # closing() guarantees the cursor and connection are released even when
    # the query raises.
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt, ('cbr_db', yesterday_strip))
        return cursor.fetchall()
def ds_get_syntax(ti):
    """Build one ``bcp`` export command per configured table or query.

    Reads the JSON configuration pushed by the ``ds_list_extractor`` task and
    returns a list of ``{"bash_command": ...}`` dicts, the shape consumed by
    ``BashOperator.partial(...).expand_kwargs(...)``.

    For entries whose ``file_type`` is ``'query'`` the command exports
    ``query_out`` with ``queryout``; otherwise it exports
    ``<table_source>.dbo.<file_id>`` with ``out``. Dots in ``file_id`` are
    replaced by underscores in both the table and the CSV file name.

    Args:
        ti: Task instance used to pull the XCom of ``ds_list_extractor``.

    Returns:
        List of dicts, each holding a single ``bash_command`` string.

    Raises:
        Exception: When the XCom pull yields nothing.
    """
    from shlex import quote

    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    if not iris:
        raise Exception('No data.')

    # Server address and credentials come from the Airflow connection rather
    # than being committed to the repository.
    # NOTE(review): assumes DS_CONN_ID points at the SQL Server instance that
    # bcp must export from - confirm before deploying.
    source = MsSqlHook.get_connection(DS_CONN_ID)
    server = f"""{source.host},{source.port or 1433}"""
    login = quote(source.login or '')
    password = quote(source.password or '')

    # Loop-invariant: resolve the output folder once instead of per table.
    out_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
        f"""{yesterday_nodash}""" if Variable.get("WITH_DATE") == 'Y' else '')

    commands = []
    for table in json.loads(iris[0][0][0]):
        csv_name = table['file_id'].replace(".", "_")
        if table['file_type'] == 'query':
            source_clause = '"' + table['query_out'] + '" queryout '
        else:
            source_clause = table['table_source'] + '.dbo.' + csv_name + ' out'
        commands.append({
            "bash_command": f"""bcp {source_clause} {out_dir}/{csv_name}.csv -c -t'{table['delimiter']}' -S{server} -U{login} -P{password} -u""",
        })
    return commands
def ds_get_csv(ti, copy_sql, file_id):
    """Run ``copy_sql`` through the ``DS_CONN_ID`` hook against a local CSV.

    The file lives under ``LOCAL_PATH``/``DS_FOLDER``. When the ``WITH_DATE``
    Variable is ``'Y'`` it sits in the dated sub-folder and its name is
    prefixed with the data date and a dot.

    Args:
        ti: Task instance; not used here.
        copy_sql: COPY statement handed to ``copy_expert``.
        file_id: Base name of the CSV file, without extension.
    """
    hook = PostgresHook.get_hook(DS_CONN_ID)
    base_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/"""
    if Variable.get("WITH_DATE") == 'Y':
        dated_prefix = f"""{yesterday_nodash}/{yesterday_nodash}."""
    else:
        dated_prefix = ''
    csv_path = base_dir + dated_prefix + f"""{file_id}.csv"""
    hook.copy_expert(copy_sql, filename=csv_path)
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
    """Fetch generated CREATE TABLE statements for every configured table.

    Collects each ``file_id`` from the JSON configuration pushed by
    ``ds_list_extractor``, builds the metadata query with ``create_ddl`` and
    runs it on the SQL Server connection named by ``DS_CONN_ID``.

    Args:
        ti: Task instance used to pull the XCom of ``ds_list_extractor``.

    Returns:
        The rows from ``cursor.fetchall()``; the first element of each row is
        one ``CREATE TABLE`` statement.

    Raises:
        Exception: When the XCom pull yields nothing.
    """
    from contextlib import closing

    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    if not iris:
        raise Exception('No data.')

    # Quote each table name for the IN (...) list expected by create_ddl.
    tables = json.loads(iris[0][0][0])
    quoted_names = ','.join(f"""'{table['file_id']}'""" for table in tables)

    # Build the statement once and log exactly what will be executed.
    sql_stmt = create_ddl('dbo', quoted_names)
    print(sql_stmt)

    mssql_hook = MsSqlHook(
        mssql_conn_id=DS_CONN_ID,
    )
    # closing() releases the cursor and connection even if the query fails.
    with closing(mssql_hook.get_conn()) as ms_conn, closing(ms_conn.cursor()) as cursor:
        cursor.execute(sql_stmt)
        x = cursor.fetchall()
    print(x)
    return x
def pg_ddl_syntax(ti):
    """Turn the rows pushed by ``ds_get_ddl`` into kwargs for table creation.

    Takes the first element of each row as one SQL statement and wraps the
    whole list in a single ``{"sql": [...]}`` dict, so the mapped
    ``pg_create_table`` operator receives every statement in one task.

    Args:
        ti: Task instance used to pull the XCom of ``ds_get_ddl``.

    Returns:
        A one-element list: ``[{"sql": [statement, ...]}]``.

    Raises:
        Exception: When ``ds_get_ddl`` pushed no XCom value.
    """
    # A plain string task id returns that task's value directly.
    rows = ti.xcom_pull(task_ids='ds_get_ddl')
    print(rows)
    if rows is None:
        raise Exception('No data.')
    colus = [f"""{row[0]}""" for row in rows]
    print(colus)
    return [{"sql": colus}]
def pg_csv_clean_syntax(ti):
    """Build one shell command per table that cleans its exported CSV file.

    For a table whose ``sed_command`` is ``'nil'`` the command is a no-op
    ``echo``. Otherwise ``sed_command`` is split on ``'|;|;|'`` and each piece
    is run against the table's CSV with output redirected to a ``_bk.csv``
    sibling, which is then moved back over the original; the steps are chained
    with ``&&``.

    Args:
        ti: Task instance used to pull the XCom of ``ds_list_extractor``.

    Returns:
        List of ``{"bash_command": ...}`` dicts, one per configured table.
    """
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    # Loop-invariant: one Variable lookup per run instead of eight per table.
    base_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/"""
    arr = []
    for table in json.loads(iris[0][0][0]):
        if table['sed_command'] == 'nil':
            arr.append({"bash_command": """echo 'OK' """})
            continue
        csv_path = f"""{base_dir}{table['file_id']}.csv"""
        backup_path = f"""{base_dir}{table['file_id']}_bk.csv"""
        # "<csv> > <backup> && mv <backup> <csv>": appended to each command.
        rewrite = f"""{csv_path} > {backup_path} && mv {backup_path} {csv_path}"""
        steps = table['sed_command'].split('|;|;|')
        # NOTE(review): every step except the last is joined to the file path
        # with no separating space, so the stored commands presumably end
        # with a trailing space - confirm against the configuration data.
        arr.append({"bash_command": f"""{rewrite} && """.join(steps) + f""" {rewrite}"""})
    return arr
def pg_push_syntax(ti):
    """Build the COPY statement and file id for loading each table's CSV.

    The target table is ``table_name`` with dots replaced by underscores and
    the ``<table_source>_`` prefix removed, inside ``POSTGRES_SCHEMA``. The
    statement expects a ``|``-delimited CSV with a header row.

    Args:
        ti: Task instance used to pull the XCom of ``ds_list_extractor``.

    Returns:
        List of ``{"op_kwargs": {"copy_sql": ..., "file_id": ...}}`` dicts,
        one per configured table.

    Raises:
        Exception: When the XCom pull yields nothing.
    """
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    if not iris:
        raise Exception('No data.')
    mapped_kwargs = []
    for table in json.loads(iris[0][0][0]):
        target_table = table['table_name'].replace(".", "_").replace(table['table_source'] + '_', "")
        mapped_kwargs.append({"op_kwargs": {
            # Loading data into a table reads from STDIN.
            "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{target_table} from STDIN delimiter '|' CSV header""",
            "file_id": table['file_id']}})
    return mapped_kwargs
def pg_push_csv(ti, copy_sql, file_id):
    """Run ``copy_sql`` through the ``POSTGRES_CONN_ID`` hook with a local CSV.

    The file is ``<file_id>.csv`` under ``LOCAL_PATH``/``DS_FOLDER``, inside
    the dated sub-folder when the ``WITH_DATE`` Variable is ``'Y'``.

    Args:
        ti: Task instance; not used here.
        copy_sql: COPY statement handed to ``copy_expert``.
        file_id: Base name of the CSV file, without extension.
    """
    hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    base_dir = f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/"""
    if Variable.get("WITH_DATE") == 'Y':
        dated_dir = f"""{yesterday_nodash}/"""
    else:
        dated_dir = ''
    csv_path = base_dir + dated_dir + f"""{file_id}.csv"""
    hook.copy_expert(copy_sql, filename=csv_path)
# DAG "APJ_1_cbr_db": export the configured CBR tables to CSV with bcp,
# rebuild the target PostgreSQL schema, load the CSVs and record the run in
# ds_conf."Datasource_history".
#
# Several task variables below reuse the name of the function they wrap
# (e.g. ``ds_list_extractor``). The right-hand side is evaluated before the
# name is rebound, so ``python_callable`` still receives the function.
with DAG("APJ_1_cbr_db",
         start_date=datetime(2021, 1, 1),
         schedule_interval=cron_tab,
         catchup=False,
         concurrency=3) as dag:
    begin = PythonOperator(
        task_id="Begin",
        python_callable=_start,
    )

    # Gate for the rest of the DAG, driven by the done_ext.csv marker check.
    check_done_ext = ShortCircuitOperator(
        task_id="check_done_ext",
        python_callable=check_done_ext,
    )

    # NOTE(review): the timestamps in this statement are computed when the
    # file is parsed, not when the task runs - confirm this is intended.
    history_start = PostgresOperator(
        sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
        task_id="history_start",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    create_folder = BashOperator(
        task_id='create_folder',
        bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
    )

    ds_list_extractor = PythonOperator(
        task_id='ds_list_extractor',
        python_callable=ds_list_extractor,
        do_xcom_push=True
    )

    ds_get_syntax = PythonOperator(
        task_id='ds_syntax_get',
        python_callable=ds_get_syntax
    )

    # One mapped bcp export task per dict returned by ds_syntax_get.
    ds_table_to_csv = BashOperator.partial(
        task_id="ds_table_to_csv",
    ).expand_kwargs(
        XComArg(ds_get_syntax),
    )

    ds_get_ddl = PythonOperator(
        task_id='ds_get_ddl',
        python_callable=ds_get_ddl,
        do_xcom_push=True
    )

    # The target schema is dropped and recreated on every run.
    pg_drop_schema = PostgresOperator(
        sql=f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
        task_id="pg_drop_schema",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    pg_create_schema = PostgresOperator(
        sql=f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
        task_id="pg_create_schema",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    pg_ddl_syntax = PythonOperator(
        task_id='pg_ddl_syntax',
        python_callable=pg_ddl_syntax
    )

    pg_create_table = PostgresOperator.partial(
        task_id="pg_create_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    ).expand_kwargs(
        XComArg(pg_ddl_syntax),
    )

    pg_csv_clean_syntax = PythonOperator(
        task_id='pg_csv_clean_syntax',
        python_callable=pg_csv_clean_syntax
    )

    pg_clean_csv = BashOperator.partial(
        task_id="pg_clean_csv",
    ).expand_kwargs(
        XComArg(pg_csv_clean_syntax),
    )

    pg_push_syntax = PythonOperator(
        task_id='pg_syntax_push',
        python_callable=pg_push_syntax
    )

    # One mapped load task per op_kwargs dict returned by pg_syntax_push.
    pg_csv_to_table = PythonOperator.partial(
        task_id="pg_csv_to_table",
        python_callable=pg_push_csv,
    ).expand_kwargs(
        XComArg(pg_push_syntax),
    )

    set_access_schemma = PostgresOperator(
        sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_schemma",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_all_table = PostgresOperator(
        sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_all_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # The marker is written to the same LOCAL_PATH folder that check_done_ext
    # inspects and create_folder creates.
    ds_ext_done = BashOperator(
        task_id="ds_ext_done",
        bash_command=f"""
touch {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
    )

    ds_to_history = PostgresOperator(
        sql=f"""select ds_conf.ds_t24_create_table_history_surrounding('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
        task_id="ds_to_history",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # -f keeps the task green when the previous day's folder does not exist,
    # so history_finish still runs.
    delete_before = BashOperator(
        task_id="delete_before",
        bash_command=f"""
rm -rf {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_lusa};
""",
    )

    history_finish = PostgresOperator(
        sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
        task_id="history_finish",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # Strictly linear pipeline.
    (
        begin
        >> check_done_ext
        >> history_start
        >> create_folder
        >> ds_list_extractor
        >> ds_get_syntax
        >> ds_table_to_csv
        >> ds_get_ddl
        >> pg_drop_schema
        >> pg_create_schema
        >> pg_ddl_syntax
        >> pg_create_table
        >> pg_csv_clean_syntax
        >> pg_clean_csv
        >> pg_push_syntax
        >> pg_csv_to_table
        >> set_access_schemma
        >> set_access_all_table
        >> ds_ext_done
        >> ds_to_history
        >> delete_before
        >> history_finish
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment