MongoSpark Error 28799

Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 28799: 'Received error in response from 192.168.12.161:27018: { $err: "$sample stage could not find a non-duplicate document after 100 while using a random cursor. This is likely a sporadic failure, please try again.", code: 28799 }' on server 192.168.12.161:. The full response is { "ok" : 0.0, "errmsg" : "Received error in response from 192.168.12.161:27018: { $err: \"$sample stage could not find a non-duplicate document after 100 while using a random cursor. This is likely a sporadic failure, please try again.\", code: 28799 }", "code" : 28799, "codeName" : "Location28799" }
at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:)
at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:)
at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:)
at com.mongodb.operation.AggregateOperation$.call(AggregateOperation.java:)
at com.mongodb.operation.AggregateOperation$.call(AggregateOperation.java:)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:)
at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:)
at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:)
at com.mongodb.Mongo.execute(Mongo.java:)
at com.mongodb.Mongo$.execute(Mongo.java:)
at com.mongodb.OperationIterable.iterator(OperationIterable.java:)
at com.mongodb.OperationIterable.forEach(OperationIterable.java:)
at com.mongodb.OperationIterable.into(OperationIterable.java:)
at com.mongodb.AggregateIterableImpl.into(AggregateIterableImpl.java:)
at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner$$anonfun$.apply(MongoSamplePartitioner.scala:)
at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner$$anonfun$.apply(MongoSamplePartitioner.scala:)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$.apply(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$.apply(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$.apply(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$.apply(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:)
at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner.partitions(MongoSamplePartitioner.scala:)
at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:)
at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:)
at org.apache.spark.rdd.RDD$$anonfun$partitions$.apply(RDD.scala:)
at org.apache.spark.rdd.RDD$$anonfun$partitions$.apply(RDD.scala:)
at scala.Option.getOrElse(Option.scala:)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:)
at org.apache.spark.rdd.RDD.count(RDD.scala:)
at org.jh.TestSpark$.doTest(DocHandler.scala:)
at org.jh.TestSpark$.main(DocHandler.scala:)
at org.jh.TestSpark.main(DocHandler.scala)
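
The failing stage is MongoDB's server-side $sample. As a quick sanity check, the same aggregation can be issued directly with the plain Java driver to see whether the server reproduces error 28799 outside of Spark. This is a minimal sketch; the host, port, database, and collection names are placeholders, not from the original setup:

import com.mongodb.MongoClient
import com.mongodb.client.model.Aggregates

import scala.collection.JavaConverters._

object SampleRepro28799 {
  def main(args: Array[String]): Unit = {
    // Placeholders: substitute your own mongos/mongod host, database, and collection.
    val client = new MongoClient("192.168.12.161", 27017)
    try {
      val coll = client.getDatabase("mydb").getCollection("mycoll")
      // The same stage the partitioner runs: a bare $sample. On large, busy
      // collections this sporadically fails server-side with code 28799.
      val cursor = coll.aggregate(List(Aggregates.sample(10000)).asJava)
        .allowDiskUse(true)
        .iterator()
      println(s"sampled ${cursor.asScala.size} documents")
    } finally {
      client.close()
    }
  }
}

Since the failure is sporadic by nature, it may take several runs with a large sample size to trigger.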

The error is shown above; the fix follows. Judging from the connector source (which I have not fully worked through), the problem originates in MongoSamplePartitioner's partitioning logic:

if (numDocumentsPerPartition >= count) {
  // The whole collection fits into a single partition: no sampling needed.
  MongoSinglePartitioner.partitions(connector, readConfig, pipeline)
} else {
  // Otherwise draw numberOfSamples documents via $sample to estimate
  // partition boundaries on partitionKey.
  val samples = connector.withCollectionDo(readConfig, {
    coll: MongoCollection[BsonDocument] =>
      coll.aggregate(List(
        Aggregates.`match`(matchQuery),
        Aggregates.sample(numberOfSamples),
        Aggregates.project(Projections.include(partitionKey)),
        Aggregates.sort(Sorts.ascending(partitionKey))
      ).asJava).allowDiskUse(true).into(new util.ArrayList[BsonDocument]()).asScala
  })
  // Every samplesPerPartition-th sample becomes a right-hand partition boundary.
  def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count - 1
  val rightHandBoundaries = samples.zipWithIndex.collect {
    case (field, i) if collectSplit(i) => field.get(partitionKey)
  }
  PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries, PartitionerHelper.locations(connector))
}

  Since numDocumentsPerPartition < count here, the else branch runs and issues the $sample aggregation shown above, with the two driving quantities computed as:

val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt

  To avoid the error, numberOfSamples has to come down, which means lowering samplesPerPartition and raising numDocumentsPerPartition. The former is set directly via spark.mongodb.input.partitionerOptions.samplesPerPartition; the latter grows with spark.mongodb.input.partitionerOptions.partitionSizeMB. Raising partitionSizeMB has a second benefit: once numDocumentsPerPartition >= count, the if branch takes over (MongoSinglePartitioner) and the $sample stage is skipped entirely.
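
To see the scale of the effect, here is a back-of-the-envelope sketch of the two formulas above. The collection size and average object size are assumptions for illustration; 64 MB and 10 are the connector's default partitionSizeMB and samplesPerPartition:

// Assumed inputs, not from the original post.
val count = 100000000L       // 100 million documents
val avgObjSizeInBytes = 512L // 512-byte average document

// The two formulas from MongoSamplePartitioner, parameterized by the tunables.
def numberOfSamples(partitionSizeMB: Int, samplesPerPartition: Int): Int = {
  val partitionSizeInBytes = partitionSizeMB.toLong * 1024 * 1024
  val numDocumentsPerPartition = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
  math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt
}

println(numberOfSamples(64, 10)) // defaults -> 7629 samples requested
println(numberOfSamples(128, 1)) // tuned    -> 381 samples requested

Cutting the requested sample size by more than an order of magnitude makes the sporadic 28799 failure far less likely.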

  The solution is therefore:

SparkSession.builder()
  // .master("local")
  .master(sparkURI)
  .config(new SparkConf().setJars(Array(
    s"${hdfsURI}/mongolib/mongo-spark-connector_2.11-2.2.1.jar",
    s"${hdfsURI}/mongolib/bson-3.4.2.jar",
    s"${hdfsURI}/mongolib/mongo-java-driver-3.4.2.jar",
    s"${hdfsURI}/mongolib/mongodb-driver-3.4.2.jar",
    s"${hdfsURI}/mongolib/mongodb-driver-core-3.4.2.jar",
    s"${hdfsURI}/mongolib/commons-io-2.5.jar",
    s"${hdfsURI}/mongolib/config-1.2.1.jar",
    s"${hdfsURI}/${jarName}") ++ extJars))
  .config("spark.cores.max", 80)
  .config("spark.executor.cores", 16)
  .config("spark.executor.memory", "32g")
  .config("spark.mongodb.input.uri", inp)
  .config("spark.mongodb.output.uri", oup)
  // Fewer samples per partition and larger partitions => far fewer $sample documents.
  .config("spark.mongodb.input.partitionerOptions.samplesPerPartition", 1)
  .config("spark.mongodb.input.partitionerOptions.partitionSizeMB", 128)
  .getOrCreate()
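
With these options set at session level, any subsequent read picks them up. A minimal usage sketch, assuming spark is the session built above and the input URI points at the problem collection:

import com.mongodb.spark.MongoSpark

// The partitioner now requests 1 sample per 128 MB partition, so the $sample
// stage that previously failed with 28799 asks for far fewer documents.
val rdd = MongoSpark.load(spark.sparkContext)
println(s"partitions: ${rdd.getNumPartitions}, documents: ${rdd.count()}")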

