added ability to abort jobs via api
cfagiani committed Oct 10, 2015
1 parent 6157156 commit 85d3301
Showing 12 changed files with 429 additions and 121 deletions.
25 changes: 20 additions & 5 deletions README.md
@@ -2,7 +2,7 @@ datasponge
==========

A toolkit for crawling the web and extracting information that can be run on a single host or distributed across multiple hosts (for scalability).
This is a SpringBoot application that can use an embedded ActiveMQ JMS broker (or optionally an external broker).
This is a Spring Boot application that can use an embedded ActiveMQ JMS broker (or optionally an external broker).

###Overview
The Data Sponge provides a mechanism for performing a targeted crawl of a set of websites rooted at one or more URLs. The bulk of the configuration is stored in a JobDefinition JSON document (a sample of which can be found in the test/resources directory).
@@ -38,6 +38,7 @@ The system has a pluggable architecture that allows users to easily specify thei
* LAST - finds the last occurrence of the string
* FULLTEXT - determines if the page contains the string
* PdfTextExtractor - this extractor will output a single DataRecord per PDF document (the text will be in a field called "text" within the data record)
* DirectoryExtractor - crawls a local file system recursively and emits DataRecords for each file
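The DirectoryExtractor's recursive walk can be sketched roughly as below; the class name, record shape, and field names are illustrative assumptions (the project's actual DataRecord API is not shown in this diff), and modern Java NIO is used for brevity:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class DirectoryWalk {

    // Emit one record (here, a simple field map) per regular file under root,
    // mirroring the "one DataRecord per file" behavior described above.
    static List<Map<String, String>> crawl(Path root) throws IOException {
        List<Map<String, String>> records = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            paths.filter(Files::isRegularFile).forEach(p -> {
                Map<String, String> record = new HashMap<>();
                record.put("path", p.toString());
                record.put("size", String.valueOf(p.toFile().length()));
                records.add(record);
            });
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("sponge-demo");
        Files.write(tmp.resolve("a.txt"), "hello".getBytes());
        System.out.println(crawl(tmp).size()); // prints 1
    }
}
```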

For all modes except FULLTEXT, the DataRecord will contain fields that include the index of the match within the body and the context (the match plus a configurable number of characters before/after). For FULLTEXT, the record will contain the entire body of the page.
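For example, the FIRST mode's index-plus-context output might be computed like this (method and parameter names are illustrative; the real extractor's API is not shown in this excerpt):

```java
public class ContextMatch {

    // FIRST-mode sketch: locate the first occurrence of the search string
    // and return the match plus `window` characters of context on each side.
    static String firstContext(String body, String needle, int window) {
        int idx = body.indexOf(needle);
        if (idx < 0) {
            return null; // no match on this page
        }
        int start = Math.max(0, idx - window);
        int end = Math.min(body.length(), idx + needle.length() + window);
        return body.substring(start, end);
    }

    public static void main(String[] args) {
        // Match at index 4, with up to 4 characters of context on each side.
        System.out.println(firstContext("the quick brown fox", "quick", 4)); // prints "the quick bro"
    }
}
```

LAST would use `lastIndexOf` instead, and FULLTEXT would skip the windowing and return the whole body.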

@@ -50,6 +51,9 @@ For all modes except FULLTEXT, the DataRecord will contain fields that include
* GroovyEnhancer - a shim that allows for the dynamic loading of a Groovy script
* DeduplicationEnhancer - attempts to detect and remove duplicate data records (local to a single node)

###Prerequisites
* JRE 1.7 or higher

###Configuration
####Application Properties
The following properties can be set in an application.properties file:
@@ -61,6 +65,16 @@ The following properties can be set in a application.properties file:
The job configuration file is a well-formed JSON document described by the schema document located in the "doc" directory


###Failures
If a node other than the coordinator for a job fails, the remaining nodes will be notified and will split the responsibility for processing the data that was destined for the failed node. It is possible
that some pages within the crawl (anything accepted by the node prior to failure but not yet processed) will not be crawled.

If the coordinator fails, the job may continue to run on the other nodes in the ensemble. If this is the case, their local output will still be written, but
any data destined for the executor on the coordinator will not be processed. Similarly, if the job was running with a coordinatorDataWriter set, that writer will no longer be running.

As of now, once a node fails, it stays failed; there is no facility for it to re-join a job that is in progress.
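The redistribution described above hinges on renumbering the surviving nodes and shrinking the hash modulus, as in the handleNodeFailure change later in this commit. A self-contained sketch of that logic (the class wrapper, constructor, and the owns helper are assumptions for illustration):

```java
public class NodeAssignment {
    int nodeId;   // this node's index within the ensemble
    int modSize;  // current ensemble size, used to hash-partition work

    NodeAssignment(int nodeId, int modSize) {
        this.nodeId = nodeId;
        this.modSize = modSize;
    }

    // Returns true if the failed node is THIS node. Otherwise, nodes
    // numbered above the failed one shift down to close the gap, and the
    // modulus shrinks so the survivors re-split the failed node's share.
    boolean handleNodeFailure(int failedNodeId) {
        if (nodeId == failedNodeId) {
            return true;
        }
        if (nodeId > failedNodeId) {
            nodeId--;
        }
        modSize--;
        return false;
    }

    // Hash-based ownership test: this node handles a URL when the URL's
    // hash, taken modulo the ensemble size, equals this node's index.
    boolean owns(String url) {
        return Math.floorMod(url.hashCode(), modSize) == nodeId;
    }
}
```

After node 1 of a three-node ensemble fails, node 2 becomes node 1 of a two-node ensemble, so every URL still maps to exactly one surviving node.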


###Differences from Version 1.0
Version 2 is significantly different from the initial release. The highlights of the differences are as follows:
* Jobs are now configured via a JSON file
@@ -71,11 +85,12 @@ Version 2 is significantly different from the initial release. The highlights of
###Potential Enhancements and TODOs
* better error handling for the REST API including validation
* stand-alone job validation tool (schema validation?)
* heartbeats and node-reassignments when nodes fail
* base RSS/Atom extractor
* handle different types of binary content (xlsx, etc) for text search
* UI for submitting/monitoring crawl jobs
* ability to abort jobs
* more tests
* ability to load class files (for custom DataAdapters) from external jars/locations

* ability to join (or re-join) a job in progress
* ability to recover from coordinator failures
* pluggable mechanism to "fetch Pages" in SpiderThread (thus allowing non-web/filesystem extraction jobs)
* piggyback heartbeats on other messages and only send HB if needed
4 changes: 3 additions & 1 deletion doc/jobSchema.json
@@ -92,7 +92,9 @@
"enum": [
"SUBMITTED",
"PROCESSING",
"COMPLETE"
"NODE_COMPLETE",
"COMPLETE",
"ABORTED"
]
},
"description": "Status of this job."
@@ -107,8 +107,12 @@ public static void main(String[] args) {
isSingleJob = true;
}
initialize(useEmbeddedBroker, enableRest, isSingleJob, jobFile, args);
} else {
System.out.println("Bad command line arguments\n");
printHelp();
System.exit(1);
}
System.out.println("Starting app...");


}

@@ -33,6 +33,19 @@ public Job getJob(@PathVariable("id") String id) {
return coordinator.getJob(id);
}

/**
* Aborts the job identified by the given id.
*
* @param id the id of the job to abort
* @return true if the job was aborted, false otherwise
*/
@RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
@ResponseBody
public boolean abortJob(@PathVariable("id") String id) {
return coordinator.abortJob(id);
}


/**
* submits a job for processing
*
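The endpoint above delegates to coordinator.abortJob, whose implementation is not shown in this excerpt. A hedged sketch of what that bookkeeping might look like, reusing the job status values from jobSchema.json (the map-based storage and class name are assumptions):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AbortSketch {
    // Status values taken from the enum added to doc/jobSchema.json.
    enum Status { SUBMITTED, PROCESSING, NODE_COMPLETE, COMPLETE, ABORTED }

    static final Map<String, Status> jobs = new ConcurrentHashMap<>();

    // Mark a running job as ABORTED; return false for unknown or
    // already-finished jobs so the REST layer can report the outcome.
    static boolean abortJob(String id) {
        Status s = jobs.get(id);
        if (s == null || s == Status.COMPLETE || s == Status.ABORTED) {
            return false;
        }
        jobs.put(id, Status.ABORTED);
        return true;
    }
}
```

With the mapping above, a client would abort a job by issuing an HTTP DELETE to the job's resource path.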
@@ -89,7 +89,14 @@ public void initialize(String jobId, Set<String> excludeList,
}
this.nodeId = nodeId;
this.modSize = modSize;
this.selectorVal = jobId + "-" + nodeId;
updateSelector();
}

/**
* Sets the value to use for JMS selectors.
*/
private void updateSelector() {
this.selectorVal = this.jobId + "-" + this.nodeId;
}

/**
@@ -245,6 +252,23 @@ public synchronized String dequeue() {
return item;
}

/**
* Responds to node failures by adjusting the selector used to determine which messages should be handled by this node.
*
* @param failedNodeId the id of the node that failed
* @return true if the failed node is THIS node, false if not
*/
public synchronized boolean handleNodeFailure(int failedNodeId) {
if (this.nodeId == failedNodeId) {
return true;
}
if (this.nodeId > failedNodeId) {
this.nodeId--;
updateSelector();
}
this.modSize--;
return false;
}


/**
* called by the message listener container in response to receipt of a JMS message. This method will compare the value of the
