Hello, I have a use case where I need to run a sequence of transformations on a collection of data.
I grouped the sequence of transformations into a task group so that I can dynamically map the task group over the list of data returned by a previous task.
Here's a snippet similar to what I'm trying to achieve with the DAG, to give you a better idea:
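from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group

with DAG(
    dag_id="sandbox",
    start_date=datetime(2024, 1, 1, 9),
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    default_args={},
) as dag:

    @task
    def return_a_list():
        # Upstream task whose output is mapped over by the task group.
        return [i for i in range(20)]

    @task_group
    def tg(n):
        @task
        def task1(n):
            print("task1 received " + str(n) + " at " + str(datetime.now()))
            return n

        @task
        def task2(n):
            print("task2 received " + str(n) + " at " + str(datetime.now()))

        # Inside the group, task2 depends on task1 for the same n.
        task2(task1(n))

    nbrs = return_a_list()
    # One mapped instance of the task group per element of nbrs.
    tg.expand(n=nbrs)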
The problem is that the first task is executed 20 times (i.e. for all the numbers) before the second task executes for any of them, like this: (task1)x20 >> (task2)x20.
What I actually want is for the tasks to run one after another for each number coming from the expansion, like this: (task1 >> task2)x20.
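To make the two orderings concrete, here is a plain-Python sketch (no Airflow involved) of the order I observe versus the order I want. The function names `observed_order` and `desired_order` are made up for illustration, and the sketch ignores parallelism entirely; it only shows which step follows which for each item.

```python
from typing import List, Tuple


def observed_order(items: List[int]) -> List[Tuple[str, int]]:
    """What I currently see: every task1 runs before any task2."""
    order = [("task1", n) for n in items]
    order += [("task2", n) for n in items]
    return order


def desired_order(items: List[int]) -> List[Tuple[str, int]]:
    """What I want: task1 then task2 for one item before the next item."""
    order = []
    for n in items:
        order.append(("task1", n))
        order.append(("task2", n))
    return order


if __name__ == "__main__":
    print(observed_order([0, 1, 2]))
    print(desired_order([0, 1, 2]))
```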
My doubt is about the use of expansion: is it the correct way to achieve what I described, or should I combine the tasks into one function and go back to a for loop instead of trying to do it the "airflowy" way? If expansion is the right approach, what am I doing wrong here?
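For reference, this is roughly what I mean by the for-loop fallback, written as plain Python rather than Airflow tasks. `process_one` and `process_all` are hypothetical names, and I made `task2` return its input here (it returns nothing in my DAG) only so the result is easy to check. I would rather avoid this, since putting both steps in one function means they would no longer be separate tasks that can be retried or inspected on their own.

```python
from datetime import datetime
from typing import List


def task1(n: int) -> int:
    print(f"task1 received {n} at {datetime.now()}")
    return n


def task2(n: int) -> int:
    print(f"task2 received {n} at {datetime.now()}")
    return n  # returned only for illustration


def process_one(n: int) -> int:
    """Run both transformations back to back for a single item."""
    return task2(task1(n))


def process_all(items: List[int]) -> List[int]:
    """Plain for loop: each item finishes both steps before the next starts."""
    results = []
    for n in items:
        results.append(process_one(n))
    return results


if __name__ == "__main__":
    process_all(list(range(3)))
```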