Skip/Limit Support in MongoInputSplit cursor #56
base: master
Conversation
Can you provide a valid use case for skip/limit? We have internally been discussing deprecating it.
We're internally trying a custom Splitter that cuts down on job execution time when the number of records returned by the query is small relative to the total data size. From what we can see, the adapter creates splits for every chunk/vector in the database, and the query is essentially run on each, correct? Some of our jobs can return few or no records - e.g. "give me the documents that have changed in the last 20 minutes". On our unsharded database of more than 1M records, this created ~6k splits and took ~1hr for 0 actual input records.

Our current solution is to count the number of hits on the query up front - if it's a relatively low percentage of the whole collection, we create splits from the query itself, with limit and skip calculated from the hit count and average document size. I got bitten by passing in limit and skip without effect, so I patched the connector so that they are honored. I know this is far from optimal, and it can (and does) perform worse when crunching a large fraction of the data, but when processing a small subset it brings our total job execution time down drastically because far fewer splits are created. If you have any ideas, I would love to discuss them with you if you have the time.
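For illustration, a minimal sketch of the heuristic described above - the class name, ratio threshold, and split size are invented for the example and are not mongo-hadoop API. It just decides whether skip/limit splitting applies and computes the windows:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipLimitPlanner {
    static final double SMALL_RESULT_RATIO = 0.05; // assumed threshold
    static final long DOCS_PER_SPLIT = 10000;      // assumed per-split size

    /** Returns {skip, limit} windows covering the matching documents,
     *  or an empty list when the default chunk-based splitter should run. */
    static List<long[]> planSplits(long matchingDocs, long totalDocs) {
        List<long[]> windows = new ArrayList<long[]>();
        if (totalDocs == 0 || (double) matchingDocs / totalDocs >= SMALL_RESULT_RATIO) {
            return windows; // too many hits: fall back to chunk-based splits
        }
        for (long skip = 0; skip < matchingDocs; skip += DOCS_PER_SPLIT) {
            long limit = Math.min(DOCS_PER_SPLIT, matchingDocs - skip);
            windows.add(new long[] { skip, limit });
        }
        return windows;
    }
}
```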
Hey, I just saw this - we ran into the exact problem described by bpfoster above a while back. In case it's useful to anyone else who comes upon this discussion, we developed a custom splitter: https://github.com/IKANOW/Infinit.e/tree/master/core/infinit.e.data_model/src/com/ikanow/infinit/e/data_model/custom

Just drop those files (they're Apache-licensed) into a library and use them instead of the default ones, e.g.:

```java
hj.setInputFormatClass((Class<? extends InputFormat>) Class.forName(
    "com.ikanow.infinit.e.data_model.custom.InfiniteMongoInputFormat", true, child));
hj.setOutputFormatClass((Class<? extends OutputFormat>) Class.forName(
    "com.mongodb.hadoop.MongoOutputFormat", true, child));
```

or:

```xml
<property><!-- InputFormat Class -->
  <name>mongo.job.input.format</name>
  <value>com.ikanow.infinit.e.data_model.custom.InfiniteMongoInputFormat</value>
</property>
<property><!-- OutputFormat Class -->
  <name>mongo.job.output.format</name>
  <value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
```

The only additional configuration needed is two XML fields specifying some limits:

```xml
<property><!-- Maximum number of splits [optional] -->
  <name>max.splits</name>
  <value>8</value>
</property>
<property><!-- Maximum number of docs per split [optional] -->
  <name>max.docs.per.split</name>
  <value>12500</value>
</property>
```

Note that if the maximums would be exceeded, the splitter "backs out" to the default MongoInputFormat.
We're running into this issue as well - I'll describe our use case, and maybe someone from the Mongo team can chime in with any thoughts? Basically, our full set of documents is many times larger than the data we want to analyze at any given time. For instance, we might have a collection with 32 million items split across 13,000 shard chunks, but we're only analyzing about 100k documents. We've tried almost every permutation of the mongo config settings in the connector, and as near as I can tell, your only real options are: 1) one split per shard, or 2) one split per shard chunk. Is that a fair assessment? Is there a way (besides using the splitter referenced above - thanks for sharing, BTW) to create a more reasonable number of splits? Happy to provide more information. Thanks!
Pat, in case it's any help - we ended up with two solutions:

And then also:
Hey Alex, Thanks!
No worries - I just wish I'd been less lazy and made the components more re-usable for other people! I also haven't checked the repo in the last year, so it's possible other people have already contributed some of these types of solutions back.
@bwmcadams or @mpobrien or @erh Could one of the maintainers chime in? I'm happy to build this and open a PR, but the last time I made a PR against mongo-hadoop it took 3 months to review and was completely broken by the time you guys looked at it. Do you have any thoughts about how to handle this situation? Why was there even an internal debate about whether or not to support it? Thanks guys! EDIT: I should say, I'm happy to turn some of Alex's ideas into something more reusable, packaged with the connector.
Hi Pat, @llvtt is maintaining this project now - he can take a look at it for you.
Let me see if I'm understanding this correctly: if you want to analyze only a small fraction of the contents of a MongoDB collection, there isn't an efficient way to do that with the current Splitters, since they work based on shard chunks, shards, or $min/$max, and thus create more splits than necessary if that data is spread across many chunks or many parts of an index when calling 'splitVector'. You're proposing to get around this by adding another means of splitting MongoDB collections - by number of documents, done with $skip and $limit in addition to mongo.input.query... is that correct? This sounds fine to me. Make a pull request, and I'll review it.
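To make the proposal concrete, here is a sketch of the per-split read it implies, using the legacy Java driver. `openSplitCursor` is a hypothetical helper, not mongo-hadoop code; only `find`/`skip`/`limit` are real driver calls. Every split evaluates the same `mongo.input.query`, but each reads a different window of the matches:

```java
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class SkipLimitWindow {
    // Hypothetical helper: open the cursor for one skip/limit-based split.
    static DBCursor openSplitCursor(DBCollection coll, DBObject inputQuery,
                                    int splitIndex, int docsPerSplit) {
        return coll.find(inputQuery)
                   .skip(splitIndex * docsPerSplit) // this split's window offset
                   .limit(docsPerSplit);            // this split's window size
    }
}
```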
Thanks so much for chiming in, Lyle! Quick question - I might be missing something, but is there a way to use min/max for split generation against sharded clusters? As near as I could tell, the only ways to create splits are shard chunks or actual shards - is there another way? Then a quick design question: from a config perspective, how would you want a user to enable this split option? Also, I'm not 100% sure I'd use skip/limit, since I'm trying to find a more general way to just limit the number of splits, period.
$min/$max is only used for shard chunks and when splitting an unsharded collection (both per the results of the `splitVector` command). As for configuration, there's already a config option (...)
Pinging this discussion, though I know it's been inactive for a while. I'm working on implementing something like this as part of https://jira.mongodb.org/browse/HADOOP-83. My current plan is to implement a SkipAndLimitSplitter. This Splitter exchanges more load on MongoDB for splits that are all nonempty and (mostly) evenly sized, even after applying mongo.input.query. So as not to duplicate effort or step on anyone's toes, I thought I'd ask here whether someone has already written this. If not, no worries, I'll just use the code I've written. @patwhite, did you make any progress on your Splitter implementation?
If limit or skip is supplied to MongoInputSplit, add it to the query cursor rather than just ignoring it.
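A rough sketch of what that change amounts to - the class, field, and method names here are illustrative, not the actual MongoInputSplit source; the driver calls (`find`, `sort`, `skip`, `limit`) are real. When the split builds its cursor, any configured skip/limit is applied instead of being dropped:

```java
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class SplitCursorSketch {
    // Build the split's query cursor, honoring a configured skip/limit.
    static DBCursor getCursor(DBCollection coll, DBObject query, DBObject fields,
                              DBObject sort, int skip, int limit) {
        DBCursor cursor = coll.find(query, fields).sort(sort);
        if (skip > 0) {
            cursor = cursor.skip(skip);   // previously ignored by the split
        }
        if (limit > 0) {
            cursor = cursor.limit(limit); // previously ignored by the split
        }
        return cursor;
    }
}
```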