The cleanest solution is to pass additional arguments using a closure:
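from pyspark.sql.functions import col, udf

def make_topic_word(topic_words):
    # topic_words is captured by the lambda, so the UDF itself takes only the column
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())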
This doesn’t require any changes in keyword_list or the function you wrap with the UDF. You can also use this method to pass an arbitrary object, for example a list of sets for efficient lookups.
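As a rough sketch of the list-of-sets idea: the names make_topic_labeler, make_topic_udf and keyword_sets, and the choice to return matching topic indices, are assumptions made for illustration and are not taken from the question. The labeling logic is plain Python, so it can be checked without a Spark session. The pyspark import is deferred until the UDF is actually built.

```python
def make_topic_labeler(topic_sets):
    """Return a plain function that closes over topic_sets (a list of sets)."""
    def label(tokens):
        if tokens is None:
            return []
        # Keep the index of every topic that shares at least one token.
        return [i for i, words in enumerate(topic_sets)
                if not words.isdisjoint(tokens)]
    return label


def make_topic_udf(topic_sets):
    """Wrap the closure as a Spark UDF (hypothetical helper)."""
    # Imported here so the pure-Python part above runs without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType
    return udf(make_topic_labeler(topic_sets), ArrayType(IntegerType()))


# Hypothetical keyword data, only for illustration.
keyword_sets = [{"union", "strike"}, {"budget", "tax"}]
label = make_topic_labeler(keyword_sets)
print(label(["union"]))
```

With a DataFrame like the one above, this could be applied as df.withColumn("topics", make_topic_udf(keyword_sets)(col("tokens"))). The sets are serialized together with the function and shipped to the executors, so this is best suited to lookup structures of moderate size.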
If you want to use your current UDF and pass topic_words directly, you’ll have to convert it to a column literal first:
from pyspark.sql.functions import array, lit
ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
Depending on your data and requirements, there may be alternative, more efficient solutions that don’t require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).