I'm building a family tree from a database on Apache Spark, using a recursive search to find the ultimate parent (i.e. the person at the top of the family tree) for each person in the DB.
I assume that the first record returned when looking up a person's id is the correct one.
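The recursive search described above can be sketched without Spark, assuming the id → parentId pairs are available as an ordinary Scala `Map` on one machine. The `Person` case class, the `"0"` root marker and the `"-1"` result for unknown ids mirror the question's code. The cycle guard is an addition so that bad data cannot recurse forever. In Spark, such a table would typically be built with `collectAsMap()` on a pair RDD and shared with `sc.broadcast(...)`, which keeps `lookup` out of the closure.

```scala
// Hypothetical record mirroring the fields used in the question.
final case class Person(id: String, parentId: String)

object UltimateParent {
  // Walks up the parentId chain using an in-memory table instead of RDD.lookup.
  // Returns "-1" for a null/empty id or an id missing from the table,
  // and stops at the current id if it has already been visited (a cycle).
  @annotation.tailrec
  def find(personId: String,
           parentOf: Map[String, String],
           seen: Set[String] = Set.empty): String =
    if (personId == null || personId.isEmpty) "-1"
    else parentOf.get(personId) match {
      case None => "-1"
      case Some(parent) if parent == "0" || parent == personId || seen(personId) =>
        personId // top of the tree (or a cycle was detected)
      case Some(parent) =>
        find(parent, parentOf, seen + personId)
    }

  // Builds the id -> parentId table once, keeping the first record per id,
  // then resolves every person against it.
  // Spark analogue: collectAsMap() on the driver plus sc.broadcast(...),
  // though collectAsMap does not promise to keep the first duplicate.
  def resolveAll(people: Seq[Person]): Map[String, String] = {
    val parentOf = people.foldLeft(Map.empty[String, String]) { (table, p) =>
      if (table.contains(p.id)) table else table + (p.id -> p.parentId)
    }
    people.map(p => p.id -> find(p.id, parentOf)).toMap
  }
}
```

Because `find` only reads a plain immutable `Map`, a function like this can be called from inside an RDD `map` without triggering SPARK-5063.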
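```scala
val peopleById = peopleRDD.keyBy(f => f.id)

def findUltimateParentId(personId: String): String = {
  if (personId == null || personId.isEmpty) {
    "-1"
  } else {
    // lookup is a Spark action, so this only works when called on the driver
    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0) // assume the first match is the correct person
    if (person.parentId == "0" || person.id == person.parentId) {
      person.id // reached the top of the tree
    } else {
      findUltimateParentId(person.parentId)
    }
  }
}

// Fails with SPARK-5063: findUltimateParentId calls lookup inside another RDD operation
val ultimateParentIds = peopleRDD.map(f => findUltimateParentId(f.id))
```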
It gives the following error:

```
Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
```
From reading other similar questions, I understand that the problem is that `findUltimateParentId` calls `peopleById.lookup` while it is itself running inside another RDD operation. If I call the method from the shell with a single person's id, it returns the correct ultimate parent id.
However, none of the other suggested solutions work for me, or at least I can't see how to implement them in my program. Can anyone help?
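A commonly suggested way around SPARK-5063 is to stop calling `lookup` inside the `map` closure. If the id → parentId pairs fit in driver memory, they can be collected with `collectAsMap()` and shared with `sc.broadcast(...)`. If they do not, pointer jumping resolves every root using only whole-dataset operations, one `join` per round. The sketch below runs the pointer-jumping rounds on an in-memory `Map` so the logic can be checked without a cluster. The object and method names are hypothetical, and the `"0"`-or-self root convention is assumed from the question's code.

```scala
object PointerJumping {
  // Root convention taken from the question: parent "0" or a self-reference.
  private def isRoot(id: String, parent: String): Boolean =
    parent == "0" || parent == id

  // parentOf: id -> parentId. Returns id -> ultimate ancestor id.
  // Each round replaces every node's ancestor with that ancestor's ancestor,
  // so a chain of depth d settles in roughly log2(d) rounds. An ancestor id
  // missing from the table is kept as-is; maxRounds guards against cycles.
  def resolve(parentOf: Map[String, String], maxRounds: Int = 64): Map[String, String] = {
    @annotation.tailrec
    def loop(anc: Map[String, String], round: Int): Map[String, String] = {
      // In Spark, this step would be one join of the (id, ancestor) pairs
      // against the same table keyed on the ancestor column.
      val next = anc.map { case (id, a) => id -> anc.getOrElse(a, a) }
      if (next == anc || round + 1 >= maxRounds) next else loop(next, round + 1)
    }
    // Roots point at themselves; everyone else starts at their direct parent.
    loop(parentOf.map { case (id, p) => id -> (if (isRoot(id, p)) id else p) }, 0)
  }
}
```

Each round only combines the table with itself, so no per-record function ever touches another RDD, which is the pattern the error message asks for.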