Commit 24b1084c authored by Margrenzo Gunawan's avatar Margrenzo Gunawan 💬

good prod

ok
parent b995576d
...@@ -41,13 +41,28 @@ DS_SCHEMA = 'opics' ...@@ -41,13 +41,28 @@ DS_SCHEMA = 'opics'
DS_CREATE_TABLE = '' DS_CREATE_TABLE = ''
# def create_ddl(schema,column):
# return f"""SELECT 'CREATE TABLE {POSTGRES_SCHEMA}.opics_' + lower(table_name) + ' ( ' +
# STUFF(
# (
# SELECT ', ' + lower(column_name) + ' ' + replace(case when DATA_TYPE = 'char' then DATA_TYPE + '(' + CONVERT(varchar(10),CHARACTER_MAXIMUM_LENGTH)+ ')' else DATA_TYPE end, 'datetime','timestamp')
# FROM information_schema.columns c2
# WHERE c2.table_schema = c.table_schema
# AND c2.table_name = c.table_name
# ORDER BY ordinal_position
# FOR XML PATH(''), TYPE
# ).value('.', 'NVARCHAR(MAX)'), 1, 2, ''
# ) + ' );' AS create_table_statement
# FROM information_schema.columns c
# WHERE table_schema = 'dbo'
# AND table_name in ({column})
# GROUP BY table_schema, table_name;"""
def create_ddl(schema,column): def create_ddl(schema,column):
return f"""SELECT 'CREATE TABLE {POSTGRES_SCHEMA}.opics_' + lower(table_name) + ' ( ' + return f"""SELECT 'CREATE TABLE {POSTGRES_SCHEMA}.opics_' + lower(table_name) + ' ( ' +
STUFF( STUFF(
( (
SELECT ', ' + lower(column_name) + ' ' + replace(case when DATA_TYPE = 'char' then DATA_TYPE + '(' + CONVERT(varchar(10),CHARACTER_MAXIMUM_LENGTH)+ ')' else DATA_TYPE end, 'datetime','timestamp') SELECT ', ' + lower(column_name) + ' text'
FROM information_schema.columns c2 FROM information_schema.columns c2
WHERE c2.table_schema = c.table_schema WHERE c2.table_schema = c.table_schema
AND c2.table_name = c.table_name AND c2.table_name = c.table_name
...@@ -93,7 +108,7 @@ def ds_get_syntax(ti): ...@@ -93,7 +108,7 @@ def ds_get_syntax(ti):
raise Exception('No data.') raise Exception('No data.')
return [{ return [{
# "bash_command": f"""which bcp""" # "bash_command": f"""which bcp"""
"bash_command": f"""bcp {table['table_source']}.dbo.{table['file_id'].replace(".", "_")} out {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + ( "bash_command": f"""bcp { '"'+table['query_out']+'" queryout ' if table['file_type'] == 'query' else table['table_source']+'.dbo.'+table['file_id'].replace(".", "_")+' out'} {Variable.get("LOCAL_PATH")}{DS_FOLDER}/""" + (
f"""{yesterday_nodash}""" if Variable.get( f"""{yesterday_nodash}""" if Variable.get(
"WITH_DATE") == 'Y' else '') + f"""/{table['file_id'].replace(".", "_")}.csv -c -t'{table['delimiter']}' -S172.19.3.80,1433 -Udwh_user -PJtrust@123 -u""", "WITH_DATE") == 'Y' else '') + f"""/{table['file_id'].replace(".", "_")}.csv -c -t'{table['delimiter']}' -S172.19.3.80,1433 -Udwh_user -PJtrust@123 -u""",
# "file_id": table['file_id'] # "file_id": table['file_id']
...@@ -135,7 +150,6 @@ def ds_get_ddl(ti): ...@@ -135,7 +150,6 @@ def ds_get_ddl(ti):
x = cursor.fetchall() x = cursor.fetchall()
print(x) print(x)
iris = ti.xcom_pull(task_ids=['ds_list_extractor'])
column_i = [] column_i = []
for table_i in json.loads(iris[0][0][0]): for table_i in json.loads(iris[0][0][0]):
column_i.append(f"""'{table_i['file_id']}'""") column_i.append(f"""'{table_i['file_id']}'""")
...@@ -150,7 +164,14 @@ def ds_get_ddl(ti): ...@@ -150,7 +164,14 @@ def ds_get_ddl(ti):
cursor_i.execute(sql_stmt_i) cursor_i.execute(sql_stmt_i)
x_i = cursor_i.fetchall() x_i = cursor_i.fetchall()
return x + x_i column_q = []
for table_q in json.loads(iris[0][0][0]):
print(table_q['query_table'])
if table_q['query_table'] is not None:
column_q.append([f"""{table_q['query_table']}"""])
return x + x_i + column_q
def pg_ddl_syntax(ti): def pg_ddl_syntax(ti):
colus = [] colus = []
......
...@@ -132,8 +132,20 @@ with DAG("APJ_1_regla_jtrust", ...@@ -132,8 +132,20 @@ with DAG("APJ_1_regla_jtrust",
op_kwargs={}, op_kwargs={},
) )
create_folder = BashOperator(
task_id='create_folder',
bash_command=f"""mkdir -p {Variable.get("LOCAL_PATH")}{DS_FOLDER}/{ddmmyyyy}"""
)
sftp_input = BashOperator(
task_id="sftp_input",
bash_command=f"""sshpass -p '{Variable.get("SFTP_REGLA_PASSWORD")}' sftp -o StrictHostKeyChecking=no -r -P 2222 {Variable.get("SFTP_REGLA_USER")}@{Variable.get("SFTP_REGLA_HOST")}:input/*/*{ddmmyyyy}* {Variable.get("LOCAL_PATH")}regla/{ddmmyyyy}/""",
)
sftp_output = BashOperator( sftp_output = BashOperator(
task_id="sftp_nominatif", task_id="sftp_output",
bash_command=f"""sshpass -p '{Variable.get("SFTP_REGLA_PASSWORD")}' sftp -o StrictHostKeyChecking=no -r -P 2222 {Variable.get("SFTP_REGLA_USER")}@{Variable.get("SFTP_REGLA_HOST")}:output/*/*{ddmmyyyy}* {Variable.get("LOCAL_PATH")}regla/{ddmmyyyy}/""", bash_command=f"""sshpass -p '{Variable.get("SFTP_REGLA_PASSWORD")}' sftp -o StrictHostKeyChecking=no -r -P 2222 {Variable.get("SFTP_REGLA_USER")}@{Variable.get("SFTP_REGLA_HOST")}:output/*/*{ddmmyyyy}* {Variable.get("LOCAL_PATH")}regla/{ddmmyyyy}/""",
) )
...@@ -266,4 +278,4 @@ with DAG("APJ_1_regla_jtrust", ...@@ -266,4 +278,4 @@ with DAG("APJ_1_regla_jtrust",
begin >> check_done_ext >> sftp_output >> history_start >> drop_schema >> create_schema >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> ds_to_history >> history_finish begin >> check_done_ext >> create_folder >> sftp_input >> sftp_output >> history_start >> drop_schema >> create_schema >> ds_list_extractor >> csv_clean_syntax >> clean_csv >> pg_ddl_syntax >> pg_create_table >> ds_push_syntax >> ds_csv_to_table >> set_access_schemma >> set_access_all_table >> ds_ext_done >> ds_to_history >> history_finish
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment