Commit b1ba6de3 authored by Margrenzo Gunawan

Production

DAGs from production
parent f16c0353
File added
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
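# Runtime configuration comes from Airflow Variables (assumed semantics, inferred from usage):
# DATE_OF_DATA = 'today' means "use yesterday's date", any other value is taken as a fixed
# YYYYMMDD data date; CRON_JOB = 'manual' leaves the DAG unscheduled, otherwise its value is
# used as the cron expression.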
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_ebanking' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_EBANKING")
DS_FOLDER = 'ebanking'
DS_DB = 'ebanking'
DS_SCHEMA = 'ebanking'
DS_CREATE_TABLE = ''
def create_ddl(schema, column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
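# create_ddl() asks information_schema for every column of the listed source tables and emits
# one "create table ..." statement per table, with every column typed as text.
# Illustrative (hypothetical) output for a single table:
#   create table ds_ebanking.sample_table ( col_a text, col_b text );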
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
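# Used by a ShortCircuitOperator: returning False (done_ext.csv already exists for this data
# date) skips all downstream tasks; returning True lets the extraction run proceed.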
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('ebanking', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
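# ds_conf.ds_extractor_list_extractor is expected (assumption based on how the result is
# consumed below) to return a single row whose first column holds a JSON array of table
# descriptors, e.g. [{"file_id": "..."}]; downstream tasks read it via
# ti.xcom_pull(task_ids=['ds_list_extractor'])[0][0][0].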
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {DS_SCHEMA}.{table['file_id'].replace(".", "_")} to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
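# The list of {"op_kwargs": ...} dicts feeds Airflow dynamic task mapping:
# PythonOperator.partial(...).expand_kwargs(XComArg(ds_get_syntax)) below creates one mapped
# ds_table_to_csv task instance per table.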
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_ebanking",
start_date=datetime(2021, 1, 1),
schedule=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql=f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql=f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
import json, os
import csv
from datetime import date
import calendar
import pandas
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_fincloud' + Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_fincloud'
DS_FOLDER = 'fincloud'
DS_DB = 'fincloud'
DS_SCHEMA = 'fincloud'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/cbr/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('fincloud', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote E'\b'""",
"file_id": f"""{table['table_source']}/{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
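# Note: quote E'\b' sets the CSV quote character to backspace, a common way to effectively
# disable quote handling during COPY; inside this f-string Python already turns \b into a
# literal backspace byte, which the Postgres E'...' string accepts as well (assumed intent,
# not stated in the source).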
# CREATE TABLE
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{table['table_source']}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace('.', '_').replace(' ', '_').replace('-', '_').lower() for w in field_names_list])} text);"""})
return arr
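# The DDL is derived from each delivered file itself: the first CSV row (header) supplies the
# column names, which are sanitized ('.', ' ', '-' replaced with '_', lower-cased) and created
# as text columns after dropping any existing table.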
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/{DS_FOLDER}/{file_id}""")
with DAG("APJ_1_fincloud_file",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
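# The two sftp tasks below use sshpass + sftp -r to recursively pull the remote cbr/<date>
# and daily/<date> directories from the Fincloud SFTP host into LOCAL_PATH; host and
# credentials come from Airflow Variables (SFTP_FINCLOUD_*).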
sftp_cbr = BashOperator(
task_id="sftp_cbr",
bash_command=f"""sshpass -p {Variable.get("SFTP_FINCLOUD_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_FINCLOUD_USER")}@{Variable.get("SFTP_FINCLOUD_HOST")}:cbr/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/cbr/""",
)
sftp_daily = BashOperator(
task_id="sftp_daily",
bash_command=f"""sshpass -p {Variable.get("SFTP_FINCLOUD_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_FINCLOUD_USER")}@{Variable.get("SFTP_FINCLOUD_HOST")}:daily/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/daily/""",
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
drop_schema = PostgresOperator(
sql=f"""drop schema if exists {POSTGRES_SCHEMA} cascade;""",
task_id="drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_schema = PostgresOperator(
sql=f"""create schema if not exists {POSTGRES_SCHEMA};""",
task_id="create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
ds_week = TriggerDagRunOperator(
task_id=f'ds_week',
trigger_dag_id="APJ_1_fincloud_obox",
)
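# APJ_1_fincloud_obox (defined later in this commit with schedule_interval=None) only runs
# when triggered here, after the daily fincloud files have been loaded.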
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history_fincloud('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/cbr/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/*/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> sftp_cbr >> sftp_daily >> history_start >> drop_schema >> create_schema >> ds_list_extractor >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> ds_week >> ds_to_history >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
\ No newline at end of file
import json, os
import csv
from datetime import date
import calendar
import pandas
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_strip_md = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(2)).strftime('%Y-%m-%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_fincloud' + Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_fincloud'
DS_FOLDER = 'fincloud'
DS_DB = 'fincloud'
DS_SCHEMA = 'fincloud'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
print(F"""DATE :: {yesterday_nodash} """)
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/obox/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('fincloud-obox', '{yesterday_strip_md}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote E'\b'""",
"file_id": f"""{table['table_source']}/{yesterday_nodash}{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
# CREATE TABLE
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{table['table_source']}/{yesterday_nodash}{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace('.', '_').replace(' ', '_').replace('-', '_').lower() for w in field_names_list])} text);"""})
return arr
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/{DS_FOLDER}/{file_id}""")
with DAG("APJ_1_fincloud_obox",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
sftp_obox = BashOperator(
task_id="sftp_obox",
bash_command=f"""sshpass -p {Variable.get("SFTP_FINCLOUD_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_FINCLOUD_USER")}@{Variable.get("SFTP_FINCLOUD_HOST")}:obox/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/obox/""",
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}_obox'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
# Remove the ST.META.DATA header (the first three lines) from each EXTRACTOR file
obox_sed = BashOperator(
task_id="obox_sed",
bash_command=f"""for FILE in /opt/airflow/dags/DFE/{DS_FOLDER}/obox/{yesterday_nodash}/* ; do sed -i 1,3d $FILE; done""",
)
#
# drop_schema = PostgresOperator(
# sql=f"""drop schema if exists {POSTGRES_SCHEMA} cascade;""",
# task_id="drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# create_schema = PostgresOperator(
# sql=f"""create schema if not exists {POSTGRES_SCHEMA};""",
# task_id="create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
ds_to_history = PostgresOperator(
sql=f"""select ds_conf.ds_t24_create_table_history_fincloud('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
task_id="ds_to_history",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/obox/{yesterday_nodash}/done_ext.csv;
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}_obox' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> sftp_obox >> history_start >> obox_sed >> ds_list_extractor >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> ds_to_history >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish
\ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_fraud' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_FRAUD")
DS_FOLDER = 'fraud'
DS_DB = 'fraud'
DS_SCHEMA = 'public'
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,') || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('fraud', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY (select * from {table['table_source']}.{table['file_id'].replace(".", "_")} {table['condition']}) to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
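# The fraud variant copies from a SELECT, so each descriptor is assumed to also supply the
# source schema (table_source) and an optional filter (condition) in addition to file_id.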
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_fraud",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table
\ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_bib' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_IBB")
DS_FOLDER = 'bib'
DS_DB = 'bib'
DS_SCHEMA = 'bib'
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('ibb', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {DS_SCHEMA}.{table['file_id'].replace(".", "_")} to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_ibb",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
\ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_igate' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_IGATE")
DS_FOLDER = 'igate'
DS_DB = 'igate'
DS_SCHEMA = 'public'
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('igate', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY ( select * from {DS_SCHEMA}.{table['file_id'].replace(".", "_")}{table['partition_table_format']} {table['condition']}) to STDOUT delimiter '{table['delimiter']}' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
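# Unlike the other extractors, igate selects from {file_id}{partition_table_format} with an
# extra {condition}, so each descriptor is assumed to also carry a partition-table suffix and
# an optional WHERE clause.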
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_igate",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
\ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_los_fintech' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_LOS_FINTECH")
DS_FOLDER = 'los-fintech'
DS_DB = 'los-fintech'
DS_SCHEMA = 'public'
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('los-fintech', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {DS_SCHEMA}.{table['file_id'].replace(".", "_")} to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_los_fintech",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
\ No newline at end of file
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_los_kpt' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_LOS_KPT")
DS_FOLDER = 'los-kpt'
DS_DB = 'los-kpt'
DS_SCHEMA = 'public'
DS_CREATE_TABLE = ''
def create_ddl(schema,column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('los-kpt', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {DS_SCHEMA}.{table['file_id'].replace(".", "_")} to STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
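# NOTE (illustrative only): create_ddl() reads the source database's information_schema and
# builds all-text DDL aimed at POSTGRES_SCHEMA, roughly
#   create table ds_los_kpt<ENV>.some_table ( col_a text, col_b text );
# (table/column names hypothetical). The concatenated statements come back as one string,
# which pg_ddl_syntax then hands to the mapped pg_create_table task.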
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_los_kpt",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_SCHEMA = 'ds_mgate' + Variable.get("ENV_T24")
DS_CONN_ID = Variable.get("DS_MGATE")
DS_FOLDER = 'mgate'
DS_DB = 'mgate'
DS_SCHEMA = 'mgate'
DS_CREATE_TABLE = ''
def create_ddl(schema, column):
return f"""select string_agg(x,'') from (select
'create table {POSTGRES_SCHEMA}.'||table_name || ' ( ' || string_agg(column_name, ' text,' order by ordinal_position) || ' text'|| ' ); ' as x
from information_schema.columns where table_schema = '{schema}'
and table_name in ({column})
group by table_schema, table_name) as x ;"""
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('mgate', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_get_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY ( select * from {DS_SCHEMA}.{table['file_id'].replace(".", "_")}{table['partition_table_format']} {table['condition']}) to STDOUT delimiter '{table['delimiter']}' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
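# NOTE (illustrative only): unlike the plain whole-table COPY used by the other extractors,
# each mgate entry also carries 'partition_table_format', 'condition' and 'delimiter' fields,
# so a generated statement could look like (hypothetical names/values):
#   COPY ( select * from mgate.trx_log_202401 where trx_date = '2024-01-31' )
#       to STDOUT delimiter '|' CSV header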
def ds_get_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(DS_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
#
# def ds_drop_syntax(ti):
# iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
# if not iris:
# raise Exception('No data.')
# return [{
# "sql": f"""drop table ds_ebanking.{table['file_id'].replace(".", "_")} CASCADE;"""}
# for
# table in json.loads(iris[0][0][0])]
def ds_get_ddl(ti):
global DS_CREATE_TABLE
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column = []
for table in json.loads(iris[0][0][0]):
column.append(f"""'{table['file_id']}'""")
print(create_ddl(DS_SCHEMA, ','.join(column)))
sql_stmt = create_ddl(DS_SCHEMA, ','.join(column))
pg_hook = PostgresHook(
postgres_conn_id=DS_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()[0][0]
def pg_ddl_syntax(ti):
print(ti.xcom_pull(task_ids=['ds_get_ddl'])[0])
return [{"sql": ti.xcom_pull(task_ids=['ds_get_ddl'])[0]}]
def pg_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} from STDOUT delimiter '|' CSV header""",
"file_id": table['file_id']}}
for
table in json.loads(iris[0][0][0])]
def pg_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""{file_id}.csv""")
with DAG("APJ_1_mgate",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}"""
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
ds_get_syntax = PythonOperator(
task_id='ds_syntax_get',
python_callable=ds_get_syntax
)
ds_table_to_csv = PythonOperator.partial(
task_id="ds_table_to_csv",
python_callable=ds_get_csv,
dag=dag
).expand_kwargs(
XComArg(ds_get_syntax),
)
ds_get_ddl = PythonOperator(
task_id='ds_get_ddl',
python_callable=ds_get_ddl,
do_xcom_push=True
)
# ds_drop_syntax = PythonOperator(
# task_id='ds_drop_syntax',
# python_callable=ds_drop_syntax
# )
pg_drop_schema = PostgresOperator(
sql= f"""drop schema IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
task_id="pg_drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_create_schema = PostgresOperator(
sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
task_id="pg_create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
pg_push_syntax = PythonOperator(
task_id='pg_syntax_push',
python_callable=pg_push_syntax
)
pg_csv_to_table = PythonOperator.partial(
task_id="pg_csv_to_table",
python_callable=pg_push_csv,
dag=dag
).expand_kwargs(
XComArg(pg_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
delete_before = BashOperator(
task_id="delete_before",
bash_command=f"""
rm -r /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_lusa};
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> create_folder >> ds_list_extractor >> ds_get_syntax >> ds_table_to_csv >> ds_get_ddl >> pg_drop_schema >> pg_create_schema >> pg_ddl_syntax >> pg_create_table >> pg_push_syntax >> pg_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> delete_before >> history_finish
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
# from airflow.providers.postgres.operators.postgres import PostgresOperator
# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# from airflow.utils.trigger_rule import TriggerRule
# from airflow.models.xcom import XCom
# from airflow.decorators import task
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow import XComArg
# # yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
# "DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
# yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
# yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
# POSTGRES_CONN_ID = Variable.get("DS_DB")
# POSTGRES_ENV = Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
# DS_CONN_ID = 'ds_t24'
# DS_FOLDER = 't24_interface'
# DS_DB = 't24_interface'
# DS_SCHEMA = 't24_interface'
# DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
with DAG("APJ_1_REPORT_6th_MONTH",
start_date=datetime(2021, 1, 1),
schedule_interval='0 7 7 * *',
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
pentaho = BashOperator(
task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/M_6TH_REPORTS.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
)
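    # NOTE: the monthly report itself is produced by a Pentaho/Carte job; this task only fires
    # the executeJob endpoint with curl, so task success means the HTTP call returned, not
    # necessarily that the report job finished.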
begin >> pentaho
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
# from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
# from airflow.providers.postgres.operators.postgres import PostgresOperator
# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# from airflow.utils.trigger_rule import TriggerRule
# from airflow.models.xcom import XCom
# from airflow.decorators import task
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
# POSTGRES_ENV = Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
# DS_FOLDER = 't24'
def _start():
print("Start :: Extractor ")
def check_dmfa(**context):
sql_stmt = f"""select * from ds_conf."_CBR";"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
print('#####################')
print(files)
print('#####################')
if files == []:
print('BELOM')
return False
else:
print('OK')
return True
def check_run(**context):
sql_stmt = f"""SELECT *FROM ds_conf."Datasource_history" WHERE date_of_data = '{yesterday_nodash}' and source = 'RR_OBOX';"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
print('#####################')
print(files)
print('#####################')
if files == []:
print('BELOM')
return True
else:
print('OK')
return False
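# NOTE: both checks below run as ShortCircuitOperator tasks: check_run lets the DAG proceed
# only when no 'RR_OBOX' history row exists yet for the data date (so the */30 Monday schedule
# does not re-run a finished report), and check_dmfa proceeds only when ds_conf."_CBR"
# already contains rows.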
with DAG("APJ_1_REPORT_WEEKLY_RI",
start_date=datetime(2021, 1, 1),
schedule_interval='*/30 * * * 1',
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_run = ShortCircuitOperator(
task_id='check_run',
provide_context=True,
python_callable=check_run,
do_xcom_push=True
)
check_dmfa = ShortCircuitOperator(
task_id='check_dmfa',
provide_context=True,
python_callable=check_dmfa,
do_xcom_push=True
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('RR_OBOX'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
pentaho = BashOperator(
task_id='pentaho',
bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/W_REPORTS.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/M_6TH_REPORTS.kjb'"""
# bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
)
begin >> check_run >> check_dmfa >> history_start >> pentaho
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
cron_tab = None if Variable.get("CRON_JOB") == 'manual' else Variable.get("CRON_JOB")
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_switching' + Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_switching'
DS_FOLDER = 'switching'
DS_DB = 'switching'
DS_SCHEMA = 'switching'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('switching', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote '"' """,
"file_id": f"""/{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace('.', '_').replace(' ', '_').replace('-', '_').lower() for w in field_names_list])} text);"""})
return arr
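# NOTE (illustrative only): here the DDL is derived from each file's CSV header instead of a
# source database: the header line is read with the configured delimiter and every column name
# is lower-cased with '.', ' ' and '-' replaced by '_'. A hypothetical header "Card No.|Trx-Date"
# would become: create table ds_switching<ENV>.<table_name> (card_no_ text, trx_date text);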
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/{DS_FOLDER}/{file_id}""")
with DAG("APJ_1_switching",
start_date=datetime(2021, 1, 1),
schedule_interval=cron_tab,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
sftp_xx = BashOperator(
task_id="sftp_xx",
bash_command=f"""sshpass -p {Variable.get("SFTP_SWITCHING_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_SWITCHING_USER")}@{Variable.get("SFTP_SWITCHING_HOST")}:/switching/{yesterday_nodash} {Variable.get("LOCAL_PATH")}switching""",
)
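    # NOTE: pulls the data-date folder from the switching SFTP server into LOCAL_PATH/switching;
    # sshpass supplies the password non-interactively and -o StrictHostKeyChecking=no skips
    # host-key verification.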
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
drop_schema = PostgresOperator(
sql=f"""drop schema if exists {POSTGRES_SCHEMA} cascade;""",
task_id="drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_schema = PostgresOperator(
sql=f"""create schema if not exists {POSTGRES_SCHEMA};""",
task_id="create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> sftp_xx >> history_start >> drop_schema >> create_schema >> ds_list_extractor >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get(
"DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:]
POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_wealth_management' + Variable.get("ENV_T24")
# POSTGRES_SCHEMA = 'ds_switching'
DS_FOLDER = 'wm'
DS_DB = 'wm'
DS_SCHEMA = 'wm'
DS_CREATE_TABLE = ''
def _start():
print("Start :: Extractor ")
def check_done_ext(**kwargs):
from random import randint
# number = randint(0, 10)
    print(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv''')
if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{yesterday_nodash}/done_ext.csv'''):
print("A")
return False
else:
print("B")
# STOP DAG
return True
def ds_list_extractor():
sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('wm', '{yesterday_strip}');"""
pg_hook = PostgresHook(
postgres_conn_id=POSTGRES_CONN_ID,
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
files = cursor.fetchall()
return files
def ds_push_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
if not iris:
raise Exception('No data.')
return [{"op_kwargs": {
"copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote '"'""",
"file_id": f"""/{table['file_id']}"""}}
for
table in json.loads(iris[0][0][0])]
def pg_ddl_syntax(ti):
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
arr = []
for table in json.loads(iris[0][0][0]):
with open(f"""{Variable.get("LOCAL_PATH")}{DS_FOLDER}/{table['file_id']}""") as csvFile:
reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
field_names_list = reader.__next__()
arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace('.', '_').replace(' ', '_').replace('-', '_').replace("'", '').replace("(",'').replace(")",'').replace('/','or').lower() for w in field_names_list])} text);"""})
return arr
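# NOTE: same header-driven DDL approach as the switching extractor, with extra sanitisation for
# wealth-management headers: quotes and parentheses are stripped and '/' becomes 'or' before
# the column name is lower-cased.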
def ds_push_csv(ti, copy_sql, file_id):
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/{DS_FOLDER}/{file_id}""")
with DAG("APJ_1_wealth_management",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
concurrency=3) as dag:
begin = PythonOperator(
task_id=f"Begin",
python_callable=_start,
op_kwargs={
}
)
check_done_ext = ShortCircuitOperator(
task_id="check_done_ext",
provide_context=True,
python_callable=check_done_ext,
op_kwargs={},
)
history_start = PostgresOperator(
sql=f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
task_id="history_start",
postgres_conn_id=POSTGRES_CONN_ID,
)
drop_schema = PostgresOperator(
sql=f"""drop schema if exists {POSTGRES_SCHEMA} cascade;""",
task_id="drop_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
create_schema = PostgresOperator(
sql=f"""create schema if not exists {POSTGRES_SCHEMA};""",
task_id="create_schema",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_list_extractor = PythonOperator(
task_id='ds_list_extractor',
python_callable=ds_list_extractor,
do_xcom_push=True
)
# pg_drop_schema = PostgresOperator(
# sql= f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
# task_id="pg_drop_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
#
# pg_create_schema = PostgresOperator(
# sql= f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
# task_id="pg_create_schema",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
pg_ddl_syntax = PythonOperator(
task_id='pg_ddl_syntax',
python_callable=pg_ddl_syntax
)
pg_create_table = PostgresOperator.partial(
task_id="pg_create_table",
postgres_conn_id=POSTGRES_CONN_ID,
).expand_kwargs(
XComArg(pg_ddl_syntax),
)
# ds_truncate_syntax = PythonOperator(
# task_id='ds_truncate_syntax',
# python_callable=ds_truncate_syntax
# )
#
# ds_truncate = PostgresOperator.partial(
# task_id="ds_truncate",
# postgres_conn_id=POSTGRES_CONN_ID,
# ).expand_kwargs(
# XComArg(ds_truncate_syntax)
# )
#
ds_push_syntax = PythonOperator(
task_id='ds_syntax_push',
python_callable=ds_push_syntax
)
ds_csv_to_table = PythonOperator.partial(
task_id="ds_csv_to_table",
python_callable=ds_push_csv,
dag=dag
).expand_kwargs(
XComArg(ds_push_syntax),
)
set_access_schemma = PostgresOperator(
sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_schemma",
postgres_conn_id=POSTGRES_CONN_ID,
)
set_access_all_table = PostgresOperator(
sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
task_id="set_access_all_table",
postgres_conn_id=POSTGRES_CONN_ID,
)
ds_ext_done = BashOperator(
task_id="ds_ext_done",
bash_command=f"""
touch /opt/airflow/dags/DFE/{DS_FOLDER}/{yesterday_nodash}/done_ext.csv;
""",
)
history_finish = PostgresOperator(
sql=f"""
UPDATE ds_conf."Datasource_history"
SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
""",
task_id="history_finish",
postgres_conn_id=POSTGRES_CONN_ID,
)
begin >> check_done_ext >> history_start >> drop_schema >> create_schema >> ds_list_extractor >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> history_finish