I process a bunch of log files, generate the output RDDs, and write them to my MongoDB collection through the mongo-spark connector. I want to set the 'upsert' option to false (0): I don't want a new document to be inserted when the document does not already exist.

For my case, filtering did not give the expected performance, because all of the filtering happened in Spark and not in Mongo. To improve performance, I had to pass a manual aggregation pipeline when loading the data. This can be a bit difficult to find, since the official documentation only covers how to do it with RDDs.

After a lot of trials I managed to do this with Scala dataframes: `val pipeLine = "…"`, `val readConfig: ReadConfig = ReadConfig(…)`, `val df: DataFrame = MongoSpark.load(sparkSession, readConfig)`.

The alternative, using filters and no pipeline (`df.filter("… = 'ACTIVE' AND …")`), fetches all data to Spark. That should not be the case, but I presume it has to do with the query used.

I ran some tests fetching 400K documents from a localhost MongoDB holding 1.4M documents in total:

1. RDD pipeline, as per the official documentation: 144 seconds.
2. DF pipeline, as per the example above: 260 seconds.
3. DF with filters, as per the example above: 560 seconds.

We finally went for the second option, because of some other high-level benefits of working with dataframes vs RDDs.

Finally, don't forget to create the correct indexes in MongoDB!

Edit: I am using spark-sql 2.3.1, mongo-spark-connector 2.3.2 and mongo-java-driver 3.12.3.
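As a rough illustration of the two pipeline-based loads described above, here is a minimal sketch. The connection URI, the database and collection names, and the `status` field in the `$match` stage are all hypothetical placeholders. It also assumes that the connector version in use accepts a `pipeline` read option for the DataFrame route. The RDD route uses `withPipeline`, which is the approach shown in the official connector documentation.

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.Document

object PipelineLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mongo-pipeline-sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical $match stage; "status" is a placeholder field name.
    val matchStage = "{ $match: { status: 'ACTIVE' } }"

    // Placeholder connection settings shared by both routes.
    val baseOptions = Map(
      "uri"        -> "mongodb://localhost:27017",
      "database"   -> "mydb",
      "collection" -> "mycollection"
    )

    // DataFrame route: pass the pipeline as a read option so that MongoDB
    // applies it before any data reaches Spark.
    // Assumption: this connector version honours the "pipeline" option.
    val dfConfig = ReadConfig(baseOptions + ("pipeline" -> s"[$matchStage]"))
    val df: DataFrame = MongoSpark.load(spark, dfConfig)

    // RDD route: attach the same stage with withPipeline.
    val rdd = MongoSpark
      .load(spark.sparkContext, ReadConfig(baseOptions))
      .withPipeline(Seq(Document.parse(matchStage)))

    // Both counts should agree if the pipeline is applied on both routes.
    println(s"DataFrame rows: ${df.count()}")
    println(s"RDD documents:  ${rdd.count()}")

    spark.stop()
  }
}
```

Running both routes against the same collection and timing them is one way to reproduce a comparison like the one above. Actual timings will depend on the indexes, the partitioner, and the pipeline itself.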