Как переписать Python код под DAG Airflow?
Помогите, пожалуйста.
У меня есть Python код выполняющий следующее действие в обычном Jupiter notebook:
d = datetime.now().date()
def transform_data(**kwargs):
внутри этой функции осуществляется подключение к PostgreSQL, далее происходит преобразования данных, а затем сохранение результата в витрину MS SQL:
postgres = PostgresHook(postgres_conn_id='connection_1')
conn_1 = postgres.get_conn()
mssql = MsSqlHook(mssql_conn_id='connection_2', schema= 'schema')
conn_2 = mssql.get_conn()
sql = """ преобразования1 """
zapad = pd.read_sql(sql, con=conn_1)
sever = zapad.loc[zapad['column_name']=="name",:].reset_index(drop=True)
sever.to_sql('NAME',con=conn_2,index=False,if_exists='replace')
def get_name_by_status(status,user,plan):
sn,su = status+'_by',status+'_by_name' user.columns=[sn, su, 'phone', 'email', 'adress', 'role'] user[sn] = user[sn].apply(lambda x: str(x)) plan = plan.merge(user.loc[:,[sn, su]], how='left', on=sn) return plan
def save_tasks(d):
sql = f"""select * from events where date_start <='{d}' and date_end >='{d}' """ wt = pd.read_sql(sql, con=conn_1) wt['dt'] = d conn_2.execute(f"delete from EVENTS where dt='{d}'") wt.loc[~wt['column_name_1'] & wt['column_name_2']\ .isin(sever["column_name_3"].unique()),['column_name_4','column_name_5','dt']]\ .to_sql("EVENTS",con=conn_2,index=False,if_exists='append') sql = """ преобразования """ plan = pd.read_sql(sql, con=conn_1) plan = plan.loc[plan['column_name_1'].isin(sever['column_name_2'].unique())] for status in ['planed','inwork','finished']: plan = get_name_by_status(status,user,plan) plan["dt"]=d try: conn_2.execute(f"delete from WORKS where dt='{d}'") except: print('no table WORKS') plan.to_sql("WORKS",con=conn_2,index=False,if_exists='append')
Итого получается 3 функции. В Jupiter notebook все работает без проблем, но как только пытаюсь обернуть этот код в DAG - ошибка на ошибке. Я уже все перепробовал, не понимаю как сделать так, чтобы DAG в Airflow заработал, при этом выполнился весь код, в результате работы которого должны обновиться все таблиц в БД MS SQL - NAME, EVENTS, WORKS. Благодарю за любую помощь и подсказку.