Airflow version: 2.8.2
I need to check for several files on an SFTP server at a certain time of day. If all the files exist, everything is fine and the DAG moves on to the next task. If some files cannot be found, I want to send an email with the text 'Files: file_name1, file_name2 ... were not found' and then still run the next task.
I need something like the flow shown in the picture.
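To make the expected email text concrete, here is a minimal sketch of how it could be assembled from a list of missing files. The function name `missing_files_message` and the sample file names are made up for illustration; nothing here is Airflow-specific.

```python
from typing import List


def missing_files_message(missing: List[str]) -> str:
    """Build the email text for the files that were not found."""
    if not missing:
        return ""  # nothing to report when every file exists
    # Pick singular or plural wording depending on how many files are missing.
    label = "File" if len(missing) == 1 else "Files"
    verb = "was" if len(missing) == 1 else "were"
    return f"{label}: {', '.join(missing)} {verb} not found"


# Hypothetical example input
print(missing_files_message(["file_name1", "file_name2"]))
```

The same string could be used as the body of the email task once the list of missing files is available.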
To do this, I tried using SFTPSensor together with a branch operator (https://airflow.apache.org/docs/apache-airflow-providers-sftp/stable/_modules/airflow/providers/sftp/sensors/sftp.html#SFTPSensor), but by default the sensor does not push anything to XCom, so I cannot build the list of sensors that failed.
I then tried writing my own sensor, based on BaseSensorOperator, that pushes task_id: PokeReturnValue to XCom for the branch to use, following this guide: https://www.astronomer.io/docs/learn/what-is-a-sensor/?tab=taskflow#sensor-decorator-pythonsensor. I am new to Python and tried code like the following to push the XCom, but it does not work.
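import logging
from airflow.exceptions import AirflowException
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.sensors.base import PokeReturnValue
from paramiko.sftp import SFTP_NO_SUCH_FILE

log = logging.getLogger(__name__)

def poke(sftp_conn_id, path):
    # Poke logic of the custom sensor: check one path on the SFTP server.
    sftp_hook = SFTPHook(sftp_conn_id)
    files_found = []
    try:
        mod_time = sftp_hook.get_mod_time(path)
        log.info("Found %s, last modified %s", path, mod_time)
    except OSError as e:
        # Only "no such file" means the file is missing; anything else is a real error.
        if e.errno != SFTP_NO_SUCH_FILE:
            raise AirflowException from e
    else:
        files_found.append(path)
    finally:
        sftp_hook.close_conn()
    if not files_found:
        condition_met = False
        operator_return_value = None
        log.info("File not found: %s", path)
    else:
        condition_met = True
        operator_return_value = files_found  # list of found paths, pushed to XCom
    return PokeReturnValue(is_done=condition_met, xcom_value=operator_return_value)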
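For comparison, here is a rough sketch of one way the whole flow could be wired without a sensor per file: a single task checks every path and returns the missing ones (which pushes them to XCom), a branch task picks the email or the next task, and the next task uses a trigger rule so it runs either way. This is only an assumption about what would fit, not a confirmed solution. The file paths, the `sftp_default` connection id, the email address, and the names `find_missing`, `choose_branch`, and `build_dag` are all hypothetical. The Airflow imports sit inside `build_dag` so the two helper functions can be tried without Airflow installed.

```python
from typing import Callable, List

FILES = ["/in/file_name1", "/in/file_name2"]  # hypothetical remote paths


def find_missing(paths: List[str], path_exists: Callable[[str], bool]) -> List[str]:
    """Return the paths for which path_exists() is False, in input order."""
    return [p for p in paths if not path_exists(p)]


def choose_branch(missing: List[str]) -> str:
    """Return the task_id to follow: email if anything is missing, else next task."""
    return "send_email" if missing else "next_task"


def build_dag():
    # Imports are local so the helpers above work without Airflow installed.
    import pendulum
    from airflow.decorators import dag, task
    from airflow.operators.email import EmailOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.sftp.hooks.sftp import SFTPHook

    @dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
    def check_sftp_files():
        @task
        def check_files() -> List[str]:
            # The returned list is pushed to XCom automatically.
            hook = SFTPHook(ssh_conn_id="sftp_default")  # assumed connection id
            try:
                return find_missing(FILES, hook.path_exists)
            finally:
                hook.close_conn()

        @task.branch
        def branch(missing: List[str]) -> str:
            return choose_branch(missing)

        send_email = EmailOperator(
            task_id="send_email",
            to="someone@example.com",  # placeholder address
            subject="Missing SFTP files",
            html_content=(
                "Files: {{ ti.xcom_pull(task_ids='check_files') | join(', ') }} "
                "were not found"
            ),
        )
        # Runs whether send_email ran or was skipped, as long as nothing failed.
        next_task = EmptyOperator(
            task_id="next_task", trigger_rule="none_failed_min_one_success"
        )

        branch(check_files()) >> [send_email, next_task]
        send_email >> next_task

    return check_sftp_files()


# In a real DAG file the result would be assigned at module level so the
# scheduler can discover it, e.g.: dag = build_dag()
```

The design choice here is to collect the missing files in one place instead of combining the results of many sensors, so the branch only has to look at a single XCom value.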