How to persist a scala list to mongodb using spark

I have a Spark job that fetches some documents from MongoDB, does some transformations, and tries to store them back to MongoDB.
The problem happens when I try to persist a List object, using the following code:
First, I generate some tuples with this line:
val usersRDD = rdd.flatMap( breakoutFileById ).distinct().groupByKey().mapValues(_.toList)
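(For context, breakoutFileById is not shown in the question; a minimal sketch of the assumed shapes, with purely illustrative field names, would be:)

import org.bson.Document

// Sketch only: breakoutFileById is assumed to emit (userId, fileId) pairs,
// so after groupByKey and mapValues(_.toList), usersRDD is an RDD[(String, List[String])].
def breakoutFileById(doc: Document): Seq[(String, String)] = {
  val userId  = doc.getString("userId")                   // assumed field name
  val fileIds = doc.getString("fileIds").split(",").toSeq // assumed field name
  fileIds.map(fileId => (userId, fileId))
}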

Then I convert the tuple fields to Documents using a custom mapToDocument function and call saveToMongoDB:
usersRDD.map( mapToDocument ).saveToMongoDB()
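(mapToDocument is not shown either; a minimal hypothetical version that would reproduce the error below, assuming the (String, List[String]) tuple shape from above, is:)

import org.bson.Document

// Hypothetical mapToDocument, for illustration only.
// Appending the Scala List directly is what the Java driver cannot encode:
// an immutable.List is built from $colon$colon cells, and the DocumentCodec
// has no codec registered for them.
def mapToDocument(user: (String, List[String])): Document =
  new Document("userId", user._1)
    .append("files", user._2) // Scala List -> CodecConfigurationException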

I’m getting the following error message:
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class scala.collection.immutable.$colon$colon.
at org.bson.codecs.configuration.CodecCache.getOrThrow(CodecCache.java:46)
at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:63)
at org.bson.codecs.configuration.ChildCodecRegistry.get(ChildCodecRegistry.java:51)
at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:174)
at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:189)
at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:131)
at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:45)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:101)
at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:43)
at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:212)
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:67)
at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:37)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
at com.mongodb.connection.DefaultServerConnection.insertCommand(DefaultServerConnection.java:115)
at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteCommandProtocol(MixedBulkWriteOperation.java:455)
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:646)
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:401)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:179)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
at com.mongodb.Mongo.execute(Mongo.java:781)
at com.mongodb.Mongo$2.execute(Mongo.java:764)
at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323)
at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:132)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:131)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:131)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


If I remove the list (i.e., do not put it as a field in the document) in the mapToDocument function, everything works. I already searched the internet for similar problems and couldn't find any solution that fits.
Does anyone have a clue how to solve this?
Thanks in advance.

Solutions/Answers:

Solution 1:

From the unsupported types section in the documentation:

Some Scala types (e.g. Lists) are unsupported and should be converted
to their Java equivalent. To convert from Scala into native types
include the following import statement to use the .asJava method.

import scala.collection.JavaConverters._
import com.mongodb.spark.MongoSpark
import org.bson.Document

val documents = sc.parallelize(
  Seq(new Document("fruits", List("apples", "oranges", "pears").asJava))
)
MongoSpark.save(documents)

The reason they are unsupported is that the Mongo Spark Connector uses the Mongo Java Driver underneath, as there's no point in using the Scala async driver in this context. However, it does mean that for RDDs you have to map to supported Java types. When using Datasets, these conversions are done for you automatically.
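Applied to the question's pipeline, that means converting the list inside mapToDocument before appending it. A minimal sketch, reusing the assumed (String, List[String]) tuple shape from above:

import scala.collection.JavaConverters._
import org.bson.Document

// Same hypothetical mapToDocument, but the list is converted to a java.util.List,
// which the DocumentCodec can encode.
def mapToDocument(user: (String, List[String])): Document =
  new Document("userId", user._1)
    .append("files", user._2.asJava)

// usersRDD.map(mapToDocument).saveToMongoDB() should then work unchanged.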

