Как переписать Python код под DAG Airflow?

Рейтинг: 0Ответов: 1Опубликовано: 15.05.2023

Помогите, пожалуйста.

У меня есть Python код выполняющий следующее действие в обычном Jupiter notebook:

    d = datetime.now().date()
  1. def transform_data(**kwargs):

внутри этой функции осуществляется подключение к PostgreSQL, далее происходит преобразования данных, а затем сохранение результата в витрину MS SQL:

     postgres = PostgresHook(postgres_conn_id='connection_1')
     conn_1 = postgres.get_conn()
     mssql = MsSqlHook(mssql_conn_id='connection_2', schema= 'schema')
     conn_2 = mssql.get_conn()       
     sql = """ преобразования1 """
     zapad = pd.read_sql(sql, con=conn_1)
     sever = zapad.loc[zapad['column_name']=="name",:].reset_index(drop=True)      
     sever.to_sql('NAME',con=conn_2,index=False,if_exists='replace')
  1. def get_name_by_status(status,user,plan):

     sn,su = status+'_by',status+'_by_name'
     user.columns=[sn, su, 'phone', 'email', 'adress', 'role']
     user[sn] = user[sn].apply(lambda x: str(x))
     plan = plan.merge(user.loc[:,[sn, su]], how='left', on=sn)
     return plan
    
  2. def save_tasks(d):

     sql = f"""select * from events where date_start <='{d}' and date_end >='{d}' """
     wt = pd.read_sql(sql, con=conn_1)
     wt['dt'] = d
     conn_2.execute(f"delete from EVENTS where dt='{d}'")
     wt.loc[~wt['column_name_1'] & wt['column_name_2']\
                .isin(sever["column_name_3"].unique()),['column_name_4','column_name_5','dt']]\
                .to_sql("EVENTS",con=conn_2,index=False,if_exists='append')
    
     sql = """ преобразования """
     plan = pd.read_sql(sql, con=conn_1)
     plan = plan.loc[plan['column_name_1'].isin(sever['column_name_2'].unique())]
     for status in ['planed','inwork','finished']:
         plan = get_name_by_status(status,user,plan)
     plan["dt"]=d
     try:
         conn_2.execute(f"delete from WORKS where dt='{d}'")
     except:
         print('no table WORKS')
     plan.to_sql("WORKS",con=conn_2,index=False,if_exists='append')
    

Итого получается 3 функции. В Jupiter notebook все работает без проблем, но как только пытаюсь обернуть этот код в DAG - ошибка на ошибке. Я уже все перепробовал, не понимаю как сделать так, чтобы DAG в Airflow заработал, при этом выполнился весь код, в результате работы которого должны обновиться все таблиц в БД MS SQL - NAME, EVENTS, WORKS. Благодарю за любую помощь и подсказку.

Ответы

▲ 0

Заработало вот так:

from datetime import datetime
import datetime as dt
import pandas as pd

from airflow import DAG
from airflow.utils.db import provide_session

from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
    
d = datetime.now().date()

def get_name_by_status(status,user,plan):
 sn,su = status+'_by',status+'_by_name'
 user.columns=[sn, su, 'phone', 'email', 'adress', 'role']
 user[sn] = user[sn].apply(lambda x: str(x))
 plan = plan.merge(user.loc[:,[sn, su]], how='left', on=sn)
 return plan

with DAG(
        dag_id='dag_name',
        default_args = {'owner': 'airflow'},
        schedule_interval='30 3,9,15 * * 1-5',
        start_date=datetime(2023, 4, 28),
        tags=['tag', 'tag', 'tag'],
        catchup=False,

) as dag:

def get_and_save_data(**kwargs):
  def save_tasks(d,conn_1,conn_2,mssql):
   curr = mssql.get_conn().cursor()
   sql = f"""select * from events where date_start <='{d}' and date_end >='{d}' 
   """
   wt = pd.read_sql(sql, con=conn_1)
   wt['dt'] = d
   curr.execute(f"delete from EVENTS where dt='{d}'")
   wt.loc[~wt['column_name_1'] & wt['column_name_2']\
            .isin(sever["column_name_3"].unique()), 
             ['column_name_4','column_name_5','dt']]\
            .to_sql("EVENTS",con=conn_2,index=False,if_exists='append')

   sql = """ преобразования """
   plan = plan.loc[plan['column_name_1'].isin(sever['column_name_2'].unique())]
   plan = pd.read_sql(sql, con=conn_1)
   if plan.shape[0]>0:
       """различные условия"""
      for status in ['planed','inwork','finished']:
          plan = get_name_by_status(status,user,plan)
      plan["dt"]=d
       """различные условия"""
      try:
         curr.execute(f"delete from WORKS where dt='{d}'")
      except:
         print('no table WORKS')
      plan.to_sql("WORKS",con=conn_2,index=False,if_exists='append')

 postgres = PostgresHook(postgres_conn_id='connection_1')
 conn_1 = postgres.get_conn()
 mssql = MsSqlHook(mssql_conn_id='connection_2', schema= 'schema')
 conn_2 = get_conn(mssql,"mssql+pymssql")       
 sql = """ преобразования1 """
 zapad = pd.read_sql(sql, con=conn_1)
 sever = zapad.loc[zapad['column_name']=="name",:].reset_index(drop=True)      
 sever.to_sql('NAME',con=conn_2,index=False,if_exists='replace')
 start = d +dt.timedelta(days=-3)
 stop = d
 while start<=stop:
      save_tasks(start,conn_1,conn_2,mssql)
      start = start + dt.timedelta(days=1)

t_save_tasks = PythonOperator(
    task_id='save_tasks',
    python_callable= get_and_save_data,
    dag=dag   
)

t_save_tasks

Одну функцию вынес за DAG, две другие функции вложил одна в другую. 2 недели ушло на то)