
Hello, I have a use case where I need to run a sequence of transformations on a collection of data.
I grouped the sequence of transformations into a task group in order to dynamically map the task group over the list of data returned by a previous task.

Here's a snippet similar to what I'm trying to achieve with the DAG, so you can get a better idea.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group

with DAG(
    dag_id="sandbox",
    start_date=datetime(2024, 1, 1, 9),
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
) as dag:
    @task
    def return_a_list():
        return list(range(20))

    @task_group
    def tg(n):
        @task
        def task1(n):
            print(f"task1 received {n} at {datetime.now()}")
            return n

        @task
        def task2(n):
            print(f"task2 received {n} at {datetime.now()}")

        # Chain the two tasks inside the group: task1's result feeds task2.
        task2(task1(n))

    nbrs = return_a_list()
    # Map the whole task group over the list returned by the previous task.
    tg.expand(n=nbrs)

The problem is that the first task is executed 20 times (i.e. for all the nbrs) before the second task executes for all the nbrs as well, like this: (task1)x20 >> (task2)x20,
whereas I actually want the tasks to be executed one after the other for each nbr coming from the expansion, like this: (task1 >> task2)x20.

My doubt is around the usage of expansion: is it the correct way to achieve what I described, or should I group the tasks in one function and revert to using a for loop instead of trying to do it the "airflowy" way (roughly like the sketch below)? If it is the correct way, what am I doing wrong here?
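For reference, the non-"airflowy" alternative I have in mind would group both steps into a single mapped task, roughly like this sketch (task and function names are just illustrative):

@task
def transform(n):
    # Both transformation steps run back to back for each mapped n,
    # so each element goes through the whole sequence in one task run.
    print(f"task1 step received {n} at {datetime.now()}")
    print(f"task2 step received {n} at {datetime.now()}")

transform.expand(n=return_a_list())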

1 Answer


You can't do that with Airflow: dynamic task mapping is about having N runs of a task, where N is decided at run time.

So if you have a pipeline task_a >> task_b (where each is a dynamic task), Airflow will run the N task_a instances and then, and only then, the M task_b instances.
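For example, in a plain mapped pipeline (a minimal sketch, with illustrative task names), task_b's expansion depends on the collected output of all task_a instances, so every task_a run has to finish before any task_b run can start:

@task
def task_a(n):
    return n * 2

@task
def task_b(n):
    print(n)

# task_b's expansion input is the XCom output of ALL task_a instances,
# so Airflow must wait for the full list before mapping task_b.
results = task_a.expand(n=[1, 2, 3])
task_b.expand(n=results)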
