Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
Airflow
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Metrics
Incidents
Environments
Packages & Registries
Packages & Registries
Package Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Margrenzo Gunawan
Airflow
Commits
560cd95f
Commit
560cd95f
authored
Dec 28, 2023
by
Margrenzo Gunawan
💬
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Create 1_t24_eo.py
parent
54f577d6
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
345 additions
and
0 deletions
+345
-0
1_t24_eo.py
1_t24_eo.py
+345
-0
No files found.
1_t24_eo.py
0 → 100644
View file @
560cd95f
import
json
,
os
from
datetime
import
date
import
calendar
from
airflow
import
DAG
from
airflow.operators.python
import
PythonOperator
,
BranchPythonOperator
,
ShortCircuitOperator
from
airflow.operators.bash
import
BashOperator
from
datetime
import
datetime
,
timedelta
from
airflow.providers.sftp.operators.sftp
import
SFTPOperator
from
airflow.models
import
Variable
from
airflow.providers.postgres.operators.postgres
import
PostgresOperator
from
airflow.operators.trigger_dagrun
import
TriggerDagRunOperator
from
airflow.utils.trigger_rule
import
TriggerRule
from
airflow.models.xcom
import
XCom
from
airflow.decorators
import
task
from
airflow.providers.postgres.hooks.postgres
import
PostgresHook
from
airflow
import
XComArg
# yesterday_nodash = (datetime.now() - timedelta(1)).strftime('%Y%m%d')
yesterday_nodash
=
(
datetime
.
now
()
-
timedelta
(
1
)).
strftime
(
'%Y%m%d'
)
if
Variable
.
get
(
"DATE_OF_DATA"
)
==
'today'
else
Variable
.
get
(
"DATE_OF_DATA"
)
yesterday_strip
=
datetime
.
strptime
(
yesterday_nodash
,
'%Y%m%d'
).
strftime
(
'%Y-%m-%d'
)
yesterday_lusa
=
(
datetime
.
strptime
(
yesterday_nodash
,
'%Y%m%d'
)
-
timedelta
(
1
)).
strftime
(
'%Y%m%d'
)
d
=
f"""
{
datetime
.
now
().
strftime
(
'%Y-%m-%d %H:%M:%S'
)
}
"""
today
=
d
[:]
POSTGRES_CONN_ID
=
Variable
.
get
(
"DS_DB"
)
POSTGRES_ENV
=
Variable
.
get
(
"ENV_T24"
)
POSTGRES_SCHEMA
=
'ds_t24'
+
Variable
.
get
(
"ENV_T24"
)
DS_FOLDER
=
't24'
def
_start
():
print
(
f"""Start :: Extractor ::
{
yesterday_nodash
}
"""
)
print
(
f"""Start :: Extractor ::
{
yesterday_strip
}
"""
)
def
check_file_done
(
**
kwargs
):
from
random
import
randint
# number = randint(0, 10)
print
(
f'''
{
Variable
.
get
(
"LOCAL_PATH"
)
}
t24/
{
yesterday_nodash
}
/DONE.2.csv'''
)
if
os
.
path
.
isfile
(
f'''
{
Variable
.
get
(
"LOCAL_PATH"
)
}
t24/
{
yesterday_nodash
}
/DONE.2.csv'''
):
print
(
"FILE DONE.2.csv sudah ada"
)
return
True
else
:
print
(
"FILE DONE.2.csv tidak ada"
)
# STOP DAG
return
False
def
check_done_ext
(
**
kwargs
):
from
random
import
randint
# number = randint(0, 10)
if
os
.
path
.
isfile
(
f'''
{
Variable
.
get
(
"LOCAL_PATH"
)
}
t24/
{
yesterday_nodash
}
/done_ext.csv'''
):
print
(
"FILE done_ext sudah ada"
)
return
False
else
:
print
(
"FILE done_ext tidak ada"
)
# STOP DAG
return
True
# ##############
# METADATA
# ##############
def
metadata_list_extractor
():
sql_stmt
=
f"""select * from ds_conf.ds_t24_meta_file_list();"""
pg_hook
=
PostgresHook
(
postgres_conn_id
=
POSTGRES_CONN_ID
)
pg_conn
=
pg_hook
.
get_conn
()
cursor
=
pg_conn
.
cursor
()
cursor
.
execute
(
sql_stmt
)
files
=
cursor
.
fetchall
()
return
files
def
metadata_syntax_copy
(
ti
):
iris
=
ti
.
xcom_pull
(
task_ids
=
[
'metadata_list_extractor'
])
if
not
iris
:
raise
Exception
(
'No data.'
)
return
[{
"op_kwargs"
:
{
"copy_sql"
:
f"""COPY ds_conf.
\"
Datasource_
{
table
[
'file_id'
].
replace
(
"."
,
""
).
replace
(
"ST"
,
""
).
lower
()
}
temp
\"
FROM STDOUT delimiter '
{
table
[
'field_delimiter'
]
}
' CSV QUOTE E'
\b
'"""
,
"file_id"
:
table
[
'file_id'
]}}
for
table
in
json
.
loads
(
iris
[
0
][
0
][
0
])]
def
meta_extract
(
ti
,
copy_sql
,
file_id
):
pg_hook
=
PostgresHook
.
get_hook
(
POSTGRES_CONN_ID
)
pg_hook
.
copy_expert
(
copy_sql
,
filename
=
f"""
{
Variable
.
get
(
"LOCAL_PATH"
)
}
t24_bnk/"""
+
(
f"""
{
yesterday_nodash
}
/
{
yesterday_nodash
}
."""
if
Variable
.
get
(
"WITH_DATE"
)
==
'Y'
else
''
)
+
f"""
{
file_id
}
.csv"""
)
# ##############
# DATASOURCE
# ##############
def
ds_list_extractor
(
**
context
):
sql_stmt
=
f"""select * from ds_conf.ds_t24_list_extractor();"""
pg_hook
=
PostgresHook
(
postgres_conn_id
=
POSTGRES_CONN_ID
)
pg_conn
=
pg_hook
.
get_conn
()
cursor
=
pg_conn
.
cursor
()
cursor
.
execute
(
sql_stmt
)
files
=
cursor
.
fetchall
()
return
files
def
ds_syntax_copy
(
ti
):
iris
=
ti
.
xcom_pull
(
task_ids
=
[
'ds_list_extractor'
])
if
not
iris
:
raise
Exception
(
'No data.'
)
return
[{
"op_kwargs"
:
{
"copy_sql"
:
f"""COPY
{
POSTGRES_SCHEMA
}
.
{
table
[
'file_id'
].
replace
(
"."
,
"_"
)
}
FROM STDOUT delimiter '
{
table
[
'field_delimiter'
]
}
' CSV QUOTE E'
\b
'"""
,
"file_id"
:
table
[
'file_id'
]}}
for
table
in
json
.
loads
(
iris
[
0
][
0
][
0
])]
def
ds_csv_to_table
(
ti
,
copy_sql
,
file_id
,
**
context
):
print
(
file_id
)
pg_hook
=
PostgresHook
.
get_hook
(
POSTGRES_CONN_ID
)
pg_hook
.
copy_expert
(
copy_sql
,
filename
=
f"""
{
Variable
.
get
(
"LOCAL_PATH"
)
}
t24_bnk/"""
+
(
f"""
{
yesterday_nodash
}
/
{
yesterday_nodash
}
."""
if
Variable
.
get
(
"WITH_DATE"
)
==
'Y'
else
''
)
+
f"""
{
file_id
}
.csv"""
)
with
DAG
(
"APJ_1_T24_EO"
,
start_date
=
datetime
(
2021
,
1
,
1
),
# schedule='0/30 * * * *',
schedule_interval
=
None
,
catchup
=
False
,
concurrency
=
10
)
as
dag
:
# ###########################################
# START
# ###########################################
begin
=
PythonOperator
(
task_id
=
f"Begin"
,
python_callable
=
_start
,
op_kwargs
=
{
}
)
check_done_ext
=
ShortCircuitOperator
(
task_id
=
"check_done_ext"
,
provide_context
=
True
,
python_callable
=
check_done_ext
,
op_kwargs
=
{},
)
# sftp_xx = BashOperator(
# task_id="sftp_xx",
# bash_command=f"""sshpass -p {Variable.get("SFTP_T24_PASSWORD")} sftp -o StrictHostKeyChecking=no -r {Variable.get("SFTP_T24_USER")}@{Variable.get("SFTP_T24_HOST")}:DFE/{yesterday_nodash}/ {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""",
# )
# ds_get_t24_extractor = SFTPOperator(
# task_id="ds_get_t24_extractor",
# ssh_conn_id="ssh_default",
# local_filepath=f"""{Variable.get("LOCAL_PATH")}t24/{yesterday_nodash}/""",
# remote_filepath=f"""DFE/{yesterday_nodash}/""",
# operation="get",
# create_intermediate_dirs=True,
# dag=dag
# )
check_file_done
=
ShortCircuitOperator
(
task_id
=
"check_file_done"
,
provide_context
=
True
,
python_callable
=
check_file_done
,
op_kwargs
=
{},
)
# ###########################################
# METADATA
# ###########################################
history_start
=
PostgresOperator
(
sql
=
f"""INSERT INTO ds_conf."Datasource_history"
(source, start_time, finish_time, status, env, date_of_data) VALUES
('
{
POSTGRES_SCHEMA
}
'::varchar(255), '
{
today
}
'::timestamp with time zone, '
{
datetime
.
now
().
strftime
(
'%Y-%m-%d %H:%M:%S'
)
}
'::timestamp with time zone, 'ONPROCESS'::varchar(100), '', '
{
yesterday_nodash
}
');"""
,
task_id
=
"history_start"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
# Copt Source dari file EXTRACTOR
file_sc_cp
=
BashOperator
(
task_id
=
"file_sc_cp"
,
bash_command
=
f"""
cp -r /opt/airflow/dags/DFE/t24/
{
yesterday_nodash
}
/opt/airflow/dags/DFE/t24_bnk/
{
yesterday_nodash
}
;
"""
,
)
# Menghapus hapus temporary METADATA
metadata_truncate
=
PostgresOperator
(
sql
=
f"""select ds_conf.ds_t24_truncate_metadata();"""
,
task_id
=
"metadata_truncate"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
# Menghapus ST.META.DATA dari file EXTRACTOR
metadata_sed
=
BashOperator
(
task_id
=
"metadata_sed"
,
bash_command
=
f"""sed -i '/ST.META.DATA/d' /opt/airflow/dags/DFE/t24_bnk/
{
yesterday_nodash
}
/
{
yesterday_nodash
}
.ST.META.DATA.csv"""
,
)
# list file METADATA
metadata_list_extractor
=
PythonOperator
(
task_id
=
'metadata_list_extractor'
,
python_callable
=
metadata_list_extractor
,
do_xcom_push
=
True
)
# syntax copy METADATA
metadata_syntax_copy
=
PythonOperator
(
task_id
=
'metadata_syntax_copy'
,
python_callable
=
metadata_syntax_copy
)
# file metadata to DWHHub
metadata_import
=
PythonOperator
.
partial
(
task_id
=
"metadata_import"
,
python_callable
=
meta_extract
,
dag
=
dag
).
expand_kwargs
(
XComArg
(
metadata_syntax_copy
),
)
# file refresh metadata
metadata_refresh
=
PostgresOperator
(
sql
=
f"""select ds_conf.ds_t24_create_metadata();"""
,
task_id
=
"metadata_refresh"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
# ###########################################
# DATA SOURCE
# ###########################################
ds_create_table
=
PostgresOperator
(
sql
=
f"""select ds_conf.ds_t24_create_table('
{
POSTGRES_ENV
}
');"""
,
task_id
=
"ds_create_table"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
ds_list_extractor
=
PythonOperator
(
task_id
=
'ds_list_extractor'
,
python_callable
=
ds_list_extractor
,
do_xcom_push
=
True
)
ds_syntax_copy
=
PythonOperator
(
task_id
=
'ds_syntax_copy'
,
python_callable
=
ds_syntax_copy
)
ds_csv_to_table
=
PythonOperator
.
partial
(
task_id
=
"ds_csv_to_table"
,
python_callable
=
ds_csv_to_table
,
dag
=
dag
).
expand_kwargs
(
XComArg
(
ds_syntax_copy
),
)
# ds_create_table_history = PostgresOperator(
# sql=f"""select ds_conf.ds_t24_create_table_history();""",
# task_id="ds_create_table_history",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
ds_create_metadata_file
=
PostgresOperator
(
sql
=
f"""create table
{
POSTGRES_SCHEMA
}
.a_metadata as (select * from ds_conf.fx_metadata_file());"""
,
task_id
=
"ds_create_metadata_file"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
ds_create_metadata_file_sys
=
PostgresOperator
(
sql
=
f"""create table
{
POSTGRES_SCHEMA
}
.a_metadata_sys as (select * from ds_conf.fx_metadata_file_system());"""
,
task_id
=
"ds_create_metadata_file_sys"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
set_access_schemma
=
PostgresOperator
(
sql
=
f"""GRANT USAGE ON SCHEMA
{
POSTGRES_SCHEMA
}
TO readaccess;"""
,
task_id
=
"set_access_schemma"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
set_access_all_table
=
PostgresOperator
(
sql
=
f"""GRANT SELECT ON ALL TABLES IN SCHEMA
{
POSTGRES_SCHEMA
}
TO readaccess;"""
,
task_id
=
"set_access_all_table"
,
postgres_conn_id
=
POSTGRES_CONN_ID
,
)
# ds_nominatif = TriggerDagRunOperator(
# task_id=f'ds_nominatif',
# trigger_dag_id="APJ_1_t24_interface",
# )
ds_remove_bnk
=
BashOperator
(
task_id
=
"ds_remove_bnk"
,
bash_command
=
f"""
rm -r /opt/airflow/dags/DFE/t24_bnk/
{
yesterday_nodash
}
;
"""
,
)
ds_ext_done
=
BashOperator
(
task_id
=
"ds_ext_done"
,
bash_command
=
f"""
touch /opt/airflow/dags/DFE/t24/
{
yesterday_nodash
}
/done_ext.csv;
"""
,
)
# history_finish = PostgresOperator(
# sql=f"""
# UPDATE ds_conf."Datasource_history"
# SET status = 'WAITING.NOMINATIF', finish_time = '{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'::timestamp with time zone
# WHERE source = '{POSTGRES_SCHEMA}' and status = 'ONPROCESS';
# """,
# task_id="history_finish",
# postgres_conn_id=POSTGRES_CONN_ID,
# )
begin
>>
check_done_ext
>>
check_file_done
>>
history_start
>>
file_sc_cp
>>
metadata_truncate
>>
metadata_sed
>>
metadata_list_extractor
>>
metadata_syntax_copy
>>
metadata_import
>>
metadata_refresh
>>
ds_create_table
>>
ds_list_extractor
>>
ds_syntax_copy
>>
ds_csv_to_table
>>
ds_create_metadata_file
>>
ds_create_metadata_file_sys
>>
set_access_schemma
>>
set_access_all_table
>>
ds_remove_bnk
>>
ds_ext_done
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment