Margrenzo Gunawan / Airflow

Commit b549740e authored Dec 30, 2023 by Margrenzo Gunawan

FDSSUPPORT

parent 560cd95f
Showing 2 changed files with 646 additions and 0 deletions (+646 −0):
  1_t24_fdssupport.py            +345 −0
  1_t24_interface_fdssupport.py  +301 −0
1_t24_fdssupport.py  (new file, mode 100644) @ b549740e
import json, os
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow import XComArg

# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get("DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')
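# Note: the three date variables above hang off the DATE_OF_DATA Airflow
# Variable: 'today' means "use yesterday's calendar date" (the usual COB
# convention), while any other value is taken verbatim as a YYYYMMDD data
# date, e.g. DATE_OF_DATA='20231229' gives yesterday_strip='2023-12-29'.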
d = f"""{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
today = d[:]

POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_FOLDER = 't24'
def _start():
    print(f"""Start :: Extractor :: {yesterday_nodash}""")
    print(f"""Start :: Extractor :: {yesterday_strip}""")


def check_file_done(**kwargs):
    from random import randint
    # number = randint(0, 10)
    print(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.2.csv''')
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/DONE.2.csv'''):
        print("FILE DONE.2.csv already exists")
        return True
    else:
        print("FILE DONE.2.csv does not exist")
        # STOP DAG
        return False


def check_done_ext(**kwargs):
    from random import randint
    # number = randint(0, 10)
    if os.path.isfile(f'''{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/done_ext.csv'''):
        print("FILE done_ext already exists")
        return False
    else:
        print("FILE done_ext does not exist")
        # STOP DAG
        return True
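# Note: both callables above back ShortCircuitOperators in the DAG below:
# returning True lets the run continue, returning False skips all downstream
# tasks. check_file_done requires the extractor's DONE.2.csv marker, while
# check_done_ext aborts when done_ext.csv shows this date was already loaded.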
# ##############
# METADATA
# ##############
def metadata_list_extractor():
    sql_stmt = f"""select * from ds_conf.ds_t24_meta_file_list();"""
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    files = cursor.fetchall()
    return files
def metadata_syntax_copy(ti):
    iris = ti.xcom_pull(task_ids=['metadata_list_extractor'])
    if not iris:
        raise Exception('No data.')
    return [{"op_kwargs": {
                "copy_sql": f"""COPY ds_conf.\"Datasource_{table['file_id'].replace(".", "").replace("ST", "").lower()}temp\" FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
                "file_id": table['file_id']}}
            for table in json.loads(iris[0][0][0])]
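# Each list entry above becomes one mapped task instance via expand_kwargs;
# an illustrative (hypothetical) entry would look like:
# {"op_kwargs": {"copy_sql": "COPY ds_conf.\"Datasource_accounttemp\" FROM STDOUT delimiter '|' CSV QUOTE E'\b'",
#                "file_id": "ST.ACCOUNT"}}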
def meta_extract(ti, copy_sql, file_id):
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    pg_hook.copy_expert(copy_sql,
                        filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/"""
                                 + (f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get("WITH_DATE") == 'Y' else '')
                                 + f"""{file_id}.csv""")
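# Note: PostgresHook.copy_expert(sql, filename) opens the local file and
# streams it over psycopg2's COPY protocol, so the CSV never has to be
# readable from the database server itself.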
# ##############
# DATASOURCE
# ##############
def ds_list_extractor(**context):
    sql_stmt = f"""select * from ds_conf.ds_t24_list_extractor();"""
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    files = cursor.fetchall()
    return files


def ds_syntax_copy(ti):
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    if not iris:
        raise Exception('No data.')
    return [{"op_kwargs": {
                "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['file_id'].replace(".", "_")} FROM STDOUT delimiter '{table['field_delimiter']}' CSV QUOTE E'\b'""",
                "file_id": table['file_id']}}
            for table in json.loads(iris[0][0][0])]


def ds_csv_to_table(ti, copy_sql, file_id, **context):
    print(file_id)
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    pg_hook.copy_expert(copy_sql,
                        filename=f"""{Variable.get("LOCAL_PATH")}t24_bnk/"""
                                 + (f"""{yesterday_nodash}/{yesterday_nodash}.""" if Variable.get("WITH_DATE") == 'Y' else '')
                                 + f"""{file_id}.csv""")
with DAG(
    "APJ_FDS_T24",
    start_date=datetime(2021, 1, 1),
    # schedule='0/30 * * * *',
    schedule_interval=None,
    catchup=False,
    concurrency=10
) as dag:
    # ###########################################
    # START
    # ###########################################
    begin = PythonOperator(
        task_id=f"Begin",
        python_callable=_start,
        op_kwargs={}
    )

    check_done_ext = ShortCircuitOperator(
        task_id="check_done_ext",
        provide_context=True,
        python_callable=check_done_ext,
        op_kwargs={},
    )

    # sftp_xx = BashOperator(
    #     task_id="sftp_xx",
    #     bash_command=f"""sshpass -p {Variable.get("SFTP_T24_FDS_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_FDS_USER")}@{Variable.get("SFTP_T24_HOST")}:DFE/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""",
    # )

    # ds_get_t24_extractor = SFTPOperator(
    #     task_id="ds_get_t24_extractor",
    #     ssh_conn_id="ssh_default",
    #     local_filepath=f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
    #     remote_filepath=f"""DFE/{yesterday_nodash}/""",
    #     operation="get",
    #     create_intermediate_dirs=True,
    #     dag=dag
    # )

    check_file_done = ShortCircuitOperator(
        task_id="check_file_done",
        provide_context=True,
        python_callable=check_file_done,
        op_kwargs={},
    )
    # ###########################################
    # METADATA
    # ###########################################
    # history_start = PostgresOperator(
    #     sql=f"""INSERT INTO ds_conf."Datasource_history"
    #     (source, start_time, finish_time, status, env, date_of_data) VALUES
    #     ('{POSTGRES_SCHEMA}'::varchar(255), '{today}'::timestamp with time zone, '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '{yesterday_nodash}');""",
    #     task_id="history_start",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    # Copy the source files from the EXTRACTOR directory
    file_sc_cp = BashOperator(
        task_id="file_sc_cp",
        bash_command=f"""
        cp -r /opt/airflow/dags/DFE/t24/{yesterday_nodash} /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
        """,
    )

    # Truncate the temporary METADATA tables
    metadata_truncate = PostgresOperator(
        sql=f"""select ds_conf.ds_t24_truncate_metadata();""",
        task_id="metadata_truncate",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # Remove the ST.META.DATA rows from the EXTRACTOR file
    metadata_sed = BashOperator(
        task_id="metadata_sed",
        bash_command=f"""sed -i '/ST.META.DATA/d' /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash}/{yesterday_nodash}.ST.META.DATA.csv""",
    )

    # List the METADATA files
    metadata_list_extractor = PythonOperator(
        task_id='metadata_list_extractor',
        python_callable=metadata_list_extractor,
        do_xcom_push=True
    )

    # Build the COPY syntax for METADATA
    metadata_syntax_copy = PythonOperator(
        task_id='metadata_syntax_copy',
        python_callable=metadata_syntax_copy
    )

    # Load the metadata files into the DWH hub
    metadata_import = PythonOperator.partial(
        task_id="metadata_import",
        python_callable=meta_extract,
        dag=dag
    ).expand_kwargs(
        XComArg(metadata_syntax_copy),
    )

    # Refresh the metadata
    metadata_refresh = PostgresOperator(
        sql=f"""select ds_conf.ds_t24_create_metadata();""",
        task_id="metadata_refresh",
        postgres_conn_id=POSTGRES_CONN_ID,
    )
    # ###########################################
    # DATA SOURCE
    # ###########################################
    ds_create_table = PostgresOperator(
        sql=f"""select ds_conf.ds_t24_create_table('{POSTGRES_ENV}');""",
        task_id="ds_create_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    ds_list_extractor = PythonOperator(
        task_id='ds_list_extractor',
        python_callable=ds_list_extractor,
        do_xcom_push=True
    )

    ds_syntax_copy = PythonOperator(
        task_id='ds_syntax_copy',
        python_callable=ds_syntax_copy
    )

    ds_csv_to_table = PythonOperator.partial(
        task_id="ds_csv_to_table",
        python_callable=ds_csv_to_table,
        dag=dag
    ).expand_kwargs(
        XComArg(ds_syntax_copy),
    )

    # ds_create_table_history = PostgresOperator(
    #     sql=f"""select ds_conf.ds_t24_create_table_history();""",
    #     task_id="ds_create_table_history",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    ds_create_metadata_file = PostgresOperator(
        sql=f"""create table {POSTGRES_SCHEMA}.a_metadata as (select * from ds_conf.fx_metadata_file());""",
        task_id="ds_create_metadata_file",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    ds_create_metadata_file_sys = PostgresOperator(
        sql=f"""create table {POSTGRES_SCHEMA}.a_metadata_sys as (select * from ds_conf.fx_metadata_file_system());""",
        task_id="ds_create_metadata_file_sys",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_schemma = PostgresOperator(
        sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_schemma",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_all_table = PostgresOperator(
        sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_all_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    ds_nominatif = TriggerDagRunOperator(
        task_id=f'ds_nominatif',
        trigger_dag_id="APJ_FDS_T24_Interface",
    )

    ds_remove_bnk = BashOperator(
        task_id="ds_remove_bnk",
        bash_command=f"""
        rm -r /opt/airflow/dags/DFE/t24_bnk/{yesterday_nodash};
        """,
    )

    ds_ext_done = BashOperator(
        task_id="ds_ext_done",
        bash_command=f"""
        touch /opt/airflow/dags/DFE/t24/{yesterday_nodash}/done_ext.csv;
        """,
    )

    # history_finish = PostgresOperator(
    #     sql=f"""
    #     UPDATE ds_conf."Datasource_history"
    #     SET status = 'WAITING.NOMINATIF', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
    #     WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
    #     """,
    #     task_id="history_finish",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    begin >> check_done_ext >> check_file_done >> file_sc_cp >> metadata_truncate >> metadata_sed >> metadata_list_extractor >> metadata_syntax_copy >> metadata_import >> metadata_refresh >> ds_create_table >> ds_list_extractor >> ds_syntax_copy >> ds_csv_to_table >> ds_create_metadata_file >> ds_create_metadata_file_sys >> set_access_schemma >> set_access_all_table >> ds_nominatif >> ds_remove_bnk >> ds_ext_done
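The metadata_import and ds_csv_to_table tasks above use Airflow 2.3+ dynamic task mapping: an upstream task returns a list of {"op_kwargs": {...}} dicts, and .partial(...).expand_kwargs(XComArg(task)) fans that list out into one mapped task instance per element at run time. A minimal self-contained sketch of the pattern (hypothetical DAG and task names, not part of the commit):

from datetime import datetime

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


def make_kwargs():
    # Pushed to XCom; each dict becomes the kwargs of one mapped instance.
    return [{"op_kwargs": {"name": "alpha"}}, {"op_kwargs": {"name": "beta"}}]


def greet(name):
    print(f"hello {name}")


with DAG("mapping_sketch", start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    build = PythonOperator(task_id="build", python_callable=make_kwargs)
    # Expands into two 'say' instances at run time; referencing XComArg(build)
    # also wires the build >> say dependency automatically.
    say = PythonOperator.partial(
        task_id="say", python_callable=greet
    ).expand_kwargs(XComArg(build))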
1_t24_interface_fdssupport.py  (new file, mode 100644) @ b549740e
import json, os
import csv
from datetime import date
import calendar
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models.xcom import XCom
from airflow.decorators import task
from airflow.hooks.postgres_hook import PostgresHook
from airflow import XComArg

# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d') if Variable.get("DATE_OF_DATA") == 'today' else Variable.get("DATE_OF_DATA")
yesterday_strip = datetime.strptime(yesterday_nodash, '%Y%m%d').strftime('%Y-%m-%d')
yesterday_lusa = (datetime.strptime(yesterday_nodash, '%Y%m%d') - timedelta(1)).strftime('%Y%m%d')

POSTGRES_CONN_ID = Variable.get("DS_DB")
POSTGRES_ENV = Variable.get("ENV_T24")
POSTGRES_SCHEMA = 'ds_t24' + Variable.get("ENV_T24")
DS_CONN_ID = 'ds_t24'
DS_FOLDER = 't24_interface'
DS_DB = 't24_interface'
DS_SCHEMA = 't24_interface'
DS_CREATE_TABLE = ''
def _start():
    print("Start :: Extractor ")


def ds_list_extractor():
    sql_stmt = f"""select * from ds_conf.ds_extractor_list_extractor('t24_interface', '{yesterday_strip}');"""
    pg_hook = PostgresHook(
        postgres_conn_id=POSTGRES_CONN_ID,
    )
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    files = cursor.fetchall()
    return files
def ds_push_syntax(ti):
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    if not iris:
        raise Exception('No data.')
    return [{"op_kwargs": {
                # "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote E'\b'""",
                "copy_sql": f"""COPY {POSTGRES_SCHEMA}.{table['table_name']} from STDOUT delimiter '{table['delimiter']}' CSV HEADER quote '"'""",
                "file_id": f"""{yesterday_nodash}/{table['file_id']}"""}}
            for table in json.loads(iris[0][0][0])]
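# Note: unlike the E'\b' (backspace) pseudo-quote used for the raw T24 dumps,
# these interface files are loaded as regular CSV with HEADER and '"' quoting,
# so the first line is skipped and quoted fields are honoured.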
def pg_ddl_syntax(ti):
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    arr = []
    for table in json.loads(iris[0][0][0]):
        with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
            reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
            field_names_list = reader.__next__()
            arr.append({"sql": f"""drop table if exists {POSTGRES_SCHEMA}.{table['table_name']} cascade; create table {POSTGRES_SCHEMA}.{table['table_name']} ({' text, '.join([w.replace(' ', '_').replace('.', '_').replace('/', '_').replace('(', '_').replace(')', '_').replace('+', '_').replace('___', '_').lower().replace((table['delimiter'] + 'limit' + table['delimiter']), (table['delimiter'] + 'limit_reff' + table['delimiter'])) for w in field_names_list])} text);"""})
    return arr
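# The DDL above is generated from the CSV header row: every column becomes
# text, and unsafe characters are folded to underscores. An illustrative
# result for a hypothetical header "CUSTOMER NO;OPEN.DATE" with
# table_name='accounts' and delimiter ';':
#   drop table if exists ds_t24<ENV>.accounts cascade;
#   create table ds_t24<ENV>.accounts (customer_no text, open_date text);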
def csv_clean_syntax(ti):
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    arr = []
    for table in json.loads(iris[0][0][0]):
        with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
            reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
            field_names_list = reader.__next__()
            arr.append({"bash_command": f"""echo 'OK' """ if table['sed_command'] == 'nil'
                        else (f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} && """.join(table['sed_command'].split('|;|;|'))
                              + f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']} > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}_bk {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""")})
    return arr
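# A 'nil' sed_command becomes a no-op (echo 'OK'). Otherwise the
# |;|;|-separated sed commands are chained so each one rewrites the file in
# place through a _bk copy: sed ... file > file_bk && mv file_bk file && ...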
def ds_push_csv(ti, copy_sql, file_id):
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    pg_hook.copy_expert(copy_sql, filename=f"""/opt/airflow/dags/DFE/t24/{file_id}""")


def sql_clean_syntax(ti):
    iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
    arr = []
    for table in json.loads(iris[0][0][0]):
        with open(f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/{table['file_id']}""") as csvFile:
            reader = csv.reader(csvFile, delimiter=f"""{table['delimiter']}""")
            field_names_list = reader.__next__()
            arr.append({"sql": f"""select 'OK' """ if table['sql_command'] == 'nil'
                        else f"""{";".join(table['sql_command'].split('|;|;|')).replace('T24_SOURCE', POSTGRES_SCHEMA)}"""})
    return arr


def stop_task(**kwargs):
    return True
with DAG(
    "APJ_FDS_T24_Interface",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
    concurrency=3
) as dag:
    begin = PythonOperator(
        task_id=f"Begin",
        python_callable=_start,
        op_kwargs={}
    )

    # sftp_xx = BashOperator(
    #     task_id="sftp_xx",
    #     bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NOM/TREASURY/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
    # )

    # sftp_ppap = BashOperator(
    #     task_id="sftp_ppap",
    #     bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/PPAP.NOMINATIF/*{yesterday_nodash}*COB.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
    # )

    # sftp_neraca = BashOperator(
    #     task_id="sftp_xx",
    #     bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
    # )

    # sftp_neraca_tel = BashOperator(
    #     task_id="sftp_xx",
    #     bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:/REPORT.BP/NERACA/*{yesterday_nodash}* {Variable.get("LOCAL_PATH")}t24_neraca/{yesterday_nodash}/""",
    # )

    ds_list_extractor = PythonOperator(
        task_id='ds_list_extractor',
        python_callable=ds_list_extractor,
        do_xcom_push=True
    )

    # ft_reve = BashOperator(
    #     task_id="ft_reve",
    #     bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
    # )

    csv_clean_syntax = PythonOperator(
        task_id='csv_clean_syntax',
        python_callable=csv_clean_syntax
    )

    clean_csv = BashOperator.partial(
        task_id="clean_csv",
        # bash_command=f"""head -n -2 {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv > {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv && mv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.2.csv {Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/FT.REVERSE.csv""",
    ).expand_kwargs(
        XComArg(csv_clean_syntax),
    )

    # pg_drop_schema = PostgresOperator(
    #     sql=f"""DROP TABLE IF EXISTS {POSTGRES_SCHEMA} CASCADE;""",
    #     task_id="pg_drop_schema",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )
    #
    # pg_create_schema = PostgresOperator(
    #     sql=f"""create schema IF NOT EXISTS {POSTGRES_SCHEMA};""",
    #     task_id="pg_create_schema",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    stop_task = ShortCircuitOperator(
        task_id="stop_task",
        provide_context=True,
        python_callable=stop_task,
        op_kwargs={},
    )

    pg_ddl_syntax = PythonOperator(
        task_id='pg_ddl_syntax',
        python_callable=pg_ddl_syntax
    )

    pg_create_table = PostgresOperator.partial(
        task_id="pg_create_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    ).expand_kwargs(
        XComArg(pg_ddl_syntax),
    )

    # ds_truncate_syntax = PythonOperator(
    #     task_id='ds_truncate_syntax',
    #     python_callable=ds_truncate_syntax
    # )
    #
    # ds_truncate = PostgresOperator.partial(
    #     task_id="ds_truncate",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # ).expand_kwargs(
    #     XComArg(ds_truncate_syntax)
    # )
    #

    ds_push_syntax = PythonOperator(
        task_id='ds_syntax_push',
        python_callable=ds_push_syntax
    )

    ds_csv_to_table = PythonOperator.partial(
        task_id="ds_csv_to_table",
        python_callable=ds_push_csv,
        dag=dag
    ).expand_kwargs(
        XComArg(ds_push_syntax),
    )

    sql_clean_syntax = PythonOperator(
        task_id='sql_clean_syntax',
        python_callable=sql_clean_syntax
    )

    ds_clean_data = PostgresOperator.partial(
        # sql=f"""select ds_conf.ds_t24_create_table_history_wo_t24_dfe('{POSTGRES_SCHEMA}');""",
        task_id="ds_clean_data",
        postgres_conn_id=POSTGRES_CONN_ID,
    ).expand_kwargs(
        XComArg(sql_clean_syntax),
    )

    # ds_create_table_history_nominatif = PostgresOperator(
    #     sql=f"""select ds_conf.ds_t24_create_table_history_wo_t24_dfe('{POSTGRES_SCHEMA}');""",
    #     task_id="ds_create_table_history_nominatif",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    # ds_to_history = PostgresOperator(
    #     sql=f"""select ds_conf.ds_t24_copy_to_history('{yesterday_strip}', '{POSTGRES_SCHEMA}');""",
    #     task_id="ds_to_history",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    set_access_schemma = PostgresOperator(
        sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_schemma",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_all_table = PostgresOperator(
        sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA} TO readaccess;""",
        task_id="set_access_all_table",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_schemma_his = PostgresOperator(
        sql=f"""GRANT USAGE ON SCHEMA {POSTGRES_SCHEMA}_his TO readaccess;""",
        task_id="set_access_schemma_his",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    set_access_all_table_his = PostgresOperator(
        sql=f"""GRANT SELECT ON ALL TABLES IN SCHEMA {POSTGRES_SCHEMA}_his TO readaccess;""",
        task_id="set_access_all_table_his",
        postgres_conn_id=POSTGRES_CONN_ID,
    )

    # pentaho = BashOperator(
    #     task_id='pentaho',
    #     bash_command=f"""curl '{Variable.get("PENTAHO_HOST_PASSWORD")}/kettle/executeJob/?job=/home/oper/files/scripts/etl/reports/D_REPORTS.kjb'"""
    #     # bash_command=f"""curl '{Variable.get("PENTAHO_HOST")}/kettle/executeJob/?rep=test-repo&job=/home/oper/files/f/Untitled'"""
    # )

    zip_today = BashOperator(
        task_id="zip_today",
        bash_command=f"""
        zip -r /opt/airflow/dags/DFE/t24/{yesterday_nodash}.zip /opt/airflow/dags/DFE/t24/{yesterday_nodash};
        """,
    )

    # delete_before = BashOperator(
    #     task_id="delete_before",
    #     bash_command=f"""
    #     rm -r /opt/airflow/dags/DFE/t24/{yesterday_lusa};
    #     """,
    # )

    # history_finish = PostgresOperator(
    #     sql=f"""
    #     UPDATE ds_conf."Datasource_history"
    #     SET status = 'DONE', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
    #     WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS' and date_of_data = '{yesterday_nodash}';
    #     """,
    #     task_id="history_finish",
    #     postgres_conn_id=POSTGRES_CONN_ID,
    # )

    begin >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> sql_clean_syntax >> ds_csv_to_table >> ds_clean_data >> stop_task >> set_access_schemma >> set_access_all_table >> set_access_schemma_his >> set_access_all_table_his >> zip_today
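The load path in both files is the same trick: build one COPY statement per file, then stream the local CSV into Postgres with PostgresHook.copy_expert. A minimal standalone sketch of that load step (hypothetical table, path, and connection id, not part of the commit):

from airflow.providers.postgres.hooks.postgres import PostgresHook


def load_csv(path: str, table: str, delimiter: str = "|") -> None:
    # COPY ... FROM STDIN makes the server read the byte stream that psycopg2
    # pipes from the local file, so the CSV never needs to be visible from the
    # database host. QUOTE E'\b' (backspace) is the same trick the DAGs use to
    # effectively disable CSV quoting for unquoted dump files.
    copy_sql = f"COPY {table} FROM STDIN delimiter '{delimiter}' CSV QUOTE E'\\b'"
    hook = PostgresHook(postgres_conn_id="my_postgres")  # hypothetical conn id
    hook.copy_expert(copy_sql, filename=path)


# Example call (hypothetical file and table):
# load_csv("/data/20231229.ST.ACCOUNT.csv", "ds_t24.st_account")

The DAGs spell it FROM STDOUT rather than FROM STDIN; PostgreSQL's grammar accepts either keyword for the piped form, so both behave the same here.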