
I'm building a family tree from a database on Apache Spark, using a recursive search to find the ultimate parent (i.e. the person at the top of the family tree) for each person in the DB.

It is assumed that the first person returned when searching by id is the correct parent.

val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if(person.id == "0" || person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => findUltimateParentId(f.parentId))

It is giving the following error:

"Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063."

I understand from reading other similar questions that the problem is that I'm calling findUltimateParentId from within the foreach loop. If I call the method from the shell with a person's id, it returns the correct ultimate parent id.

However, none of the other suggested solutions work for me, or at least I can't see how to implement them in my program. Can anyone help?

  • You're taking the wrong approach here. It is unclear whether Spark can be useful for you at all, but if it is, you should consider using the GraphX API.
    – zero323
    Commented Feb 18, 2016 at 0:15
  • sorry, my hands are tied on this one. I have to use Spark.
    – mccarthyj
    Commented Feb 18, 2016 at 0:28
  • GraphX is Spark. One way or another you should at least learn the Spark API first :) There are at least a few things that don't make sense, including the way you use lookup and foreach.
    – zero323
    Commented Feb 18, 2016 at 1:03
  • I've read through some tutorials on GraphX, how would I build the relationship Edge collection from person to person?
    – mccarthyj
    Commented Feb 18, 2016 at 11:32
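
Regarding that last comment, here is a minimal sketch of how parent links might be turned into a GraphX edge collection. The PersonRow shape, field names and edge label below are assumptions for illustration, not the question's actual schema:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// illustrative record shape: GraphX vertex ids must be Longs
case class PersonRow(id: Long, parentId: Option[Long])

// vertices carry the person; each edge points from child to parent
def buildFamilyGraph(people: RDD[PersonRow]): Graph[PersonRow, String] = {
  val vertices: RDD[(Long, PersonRow)] = people.map(p => (p.id, p))
  val edges: RDD[Edge[String]] =
    people.flatMap(p => p.parentId.map(parent => Edge(p.id, parent, "child-of")))
  Graph(vertices, edges)
}

Note that GraphX vertex ids are Longs, so string ids like those in the question would first need to be mapped to numeric ids.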

2 Answers


If I understood you correctly, here's a solution that would work for any size of input (although performance might not be great). It performs N iterations over the RDD, where N is the "deepest family" (largest distance from ancestor to child) in the input:

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {

    // all persons keyed by id
    val byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)

      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - start by assuming each person is its own parent, if it has one:
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
  }

}

Here's a test to better understand how this works:

/**
  *     Example input tree:
  *
  *            1             5
  *            |             |
  *      ----- 2 -----       6
  *      |           |
  *      3           4
  *
  */

val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}

It should be easy to map your input into these Person objects, and to map the output WithAncestor objects into whatever it is you need. Note that this code assumes that if any person has parentId X, another person with that id actually exists in the input.
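
For instance, a rough sketch of plugging the question's string-keyed records into this API. peopleRDD and its id/parentId fields come from the question; everything else here (the names, the toInt conversion, treating "", "0" or a self-reference as "no parent") is an assumption for illustration:

import org.apache.spark.rdd.RDD

// convert the question's records into Person objects
val asPersons: RDD[Person] = peopleRDD.map { p =>
  val parent = Option(p.parentId)
    .filter(id => id.nonEmpty && id != "0" && id != p.id) // "", "0" or self-reference => no parent
    .map(_.toInt)                                         // assumes ids parse as integers
  Person(p.id.toInt, parent)
}

// read the result back as (personId -> ultimateParentId), falling back to the person itself
val ultimateParentIds: RDD[(Int, Int)] = RecursiveParentLookup
  .findUltimateParent(asPersons)
  .map(wa => wa.person.id -> wa.ancestor.map(_.id).getOrElse(wa.person.id))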

  • Exactly what I needed! Thanks loads!
    – mccarthyj
    Commented Feb 22, 2016 at 17:00
  • Is there any way to also get the intermediate parents, not just the root parent? For example (3, (2, 1)), (4, (2, 1)), (6, (5))? Commented Oct 4, 2016 at 11:32
  • I'm sure it's possible; you'll have to change WithAncestor to include some ordered list of ancestors and update it upon each iteration... can't spell out the exact changes required though. Commented Oct 4, 2016 at 11:44
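
A very rough, untested sketch of that direction, showing only the pieces that would change relative to the answer above:

// WithAncestor widened to carry every ancestor found so far, nearest ancestor first
case class WithAncestors(person: Person, ancestors: List[Person]) {
  // we can climb further if the most distant ancestor found so far still has a parent
  def hasGrandparent: Boolean = ancestors.lastOption.exists(_.parentId.isDefined)
}

// the join step inside climbOneGeneration would then append instead of replace:
//   haveGrandparents
//     .keyBy(_.ancestors.last.parentId.get)
//     .join(byId)
//     .values
//     .map { case (wa, next) => WithAncestors(wa.person, wa.ancestors :+ next) }
// and the initial call would need to seed each person's chain with its direct parent
// (one extra join against byId) instead of with the person itself.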

Fixed this by using SparkContext.broadcast:

val peopleById = peopleRDD.keyBy(f => f.id)
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personOption = broadcastedPeople.value.get(personId)
    if(personOption.isEmpty) {

        return "0";

    }
    val person = personOption.get
    if(person.id == "0" || person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => findUltimateParentId(f.parentId))

working great now!

  • Note that this solution is limited to cases where peopleById is small enough to fit into driver memory (a single machine), in which case you don't really need Spark at all... if this collection gets larger, you're likely to get an OutOfMemoryError on the second line, which collects all data from the cluster to the driver machine. Commented Feb 18, 2016 at 20:06
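
As that comment suggests, once collectAsMap() fits on the driver, the climb can also be done in plain Scala with no RDD calls at all. A minimal sketch, assuming the question's Person class with string id/parentId fields and no cycles other than self-references:

// purely local lookup over the collected map; no Spark operations involved
def ultimateParentLocal(peopleMap: Map[String, Person], personId: String): String = {
  @annotation.tailrec
  def climb(id: String): String = peopleMap.get(id) match {
    case None => "0"                                        // unknown id, same sentinel as above
    case Some(p) if p.parentId == null || p.parentId.isEmpty || p.parentId == p.id => p.id
    case Some(p) => climb(p.parentId)                       // keep climbing
  }
  if (personId == null || personId.isEmpty) "-1" else climb(personId)
}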
