Airflow v. 2.8.2

I need to check for several files on an SFTP server at a certain time of day. If all the files exist, everything is fine and the DAG should go on to the next task. If some files can't be found, I want to send an email with the text 'Files: file_name1, file_name2 ... were not found' and then still run the next task.

I need something like the flow in the picture.
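To make the intended logic concrete, here is a minimal sketch in plain Python, without any Airflow code. The function names, the `exists` callable and the task ids `send_email` / `next_task` are all hypothetical; in a real DAG the `exists` check would presumably be backed by an SFTP hook call.

```python
from typing import Callable, Iterable, List


def find_missing_files(paths: Iterable[str], exists: Callable[[str], bool]) -> List[str]:
    """Return the paths for which `exists` reports False, in input order."""
    return [p for p in paths if not exists(p)]


def build_message(missing: List[str]) -> str:
    """Format the email text for the missing files."""
    return f"Files: {', '.join(missing)} were not found"


def choose_branch(missing: List[str]) -> str:
    """Pick the downstream task id (hypothetical ids) for a branch operator."""
    return "send_email" if missing else "next_task"


if __name__ == "__main__":
    # A set stands in for the files that are actually on the SFTP server.
    on_server = {"/in/a.csv", "/in/c.csv"}
    wanted = ["/in/a.csv", "/in/b.csv", "/in/c.csv"]
    missing = find_missing_files(wanted, on_server.__contains__)
    print(choose_branch(missing), "-", build_message(missing))
```

The point of the sketch is that the list of missing files has to be collected in one place before the branch decision is made, which is the part I can't get out of separate sensors.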

For this I tried using SFTPSensor together with a branch operator (https://airflow.apache.org/docs/apache-airflow-providers-sftp/stable/_modules/airflow/providers/sftp/sensors/sftp.html#SFTPSensor), but by default the sensor doesn't push anything to XCom, so I can't build a list of the sensors that failed.

I then tried to write my own sensor, based on BaseSensorOperator, that pushes task_id: PokeReturnValue to XCom for the branch, following this guide: https://www.astronomer.io/docs/learn/what-is-a-sensor/?tab=taskflow#sensor-decorator-pythonsensor. I'm new to Python and tried code like the following to push the XCom value, but it doesn't work.

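    from airflow.exceptions import AirflowException
    from airflow.providers.sftp.hooks.sftp import SFTPHook
    from airflow.sensors.base import PokeReturnValue
    from paramiko.sftp import SFTP_NO_SUCH_FILE

    # Body of the poke function; sftp_conn_id, path and log come from the enclosing sensor.
    sftp_hook = SFTPHook(sftp_conn_id)
    files_found = []
    try:
        log.info("Poking for %s", path)
        mod_time = sftp_hook.get_mod_time(path)
    except OSError as e:
        # A missing file is expected; any other error is re-raised.
        if e.errno != SFTP_NO_SUCH_FILE:
            raise AirflowException from e
        log.info("File %s not found", path)
    else:
        files_found.append(path)
    finally:
        sftp_hook.close_conn()
    # An empty list means the file is missing.
    condition_met = bool(files_found)
    # Value to push to XCom (this was option.json(), but option is not defined anywhere).
    operator_return_value = files_found if condition_met else None
    return PokeReturnValue(is_done=condition_met, xcom_value=operator_return_value)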
