diff --git a/README.md b/README.md index 4419b29..0cb3b2a 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ datasponge ========== A toolkit for crawling the web and extracting information that can be run on a single host or distributed across multiple hosts (for scalability). -This is a SpringBoot application that can use an embedded ActiveMQ JMS broker (or optionally an external broker). +This is a Spring Boot application that can use an embedded ActiveMQ JMS broker (or optionally an external broker). ###Overview The Data Sponge provides a mechanism for performing a targeted crawl of a set of websites rooted at one or more URLs. The bulk of the configuration is stored in a JobDefinition JSON document (a sample of which can be found in the test/resources directory). @@ -38,6 +38,7 @@ The system has a pluggable architecture that allows users to easily specify thei * LAST - finds the last occurrence of the string * FULLTEXT - determines if the page contains the string * PdfTextExtractor - this extractor will output a single DataRecord per PDF document (the text will be in a field called "text" within the data record) +* DirectoryExtractor - crawls a local file system recursively and emits DataRecords for each file For all modes except FULLTEXT, the DataRecord will contain fields that include the index of the match within the body and the context (the match plus a configurable number of characters before/after). For FULLTEXT, the record will contain the entire body of the page. @@ -50,6 +51,9 @@ For all modes except FULLTEXT, the DataRecord will contain fields that include * GroovyEnhancer - a shim that allows for the dynamic loading of a Groovy script * DeduplicationEnhancer - attempts to detect and remove duplicate data records (local to a single node) +###Prerequisites +* JRE 1.7 or higher + ###Configuration ####Application Properties The following properties can be set in a application.properties file: @@ -61,6 +65,16 @@ The following properties can be set in a application.properties file: The job configuration file is a well-formed JSON document described by the schema document located in the "doc" directory +###Failures +If a node fails, so long as it is not the coordinator for that job, the remaining nodes will be notified and will split the responsibility for processing the data that was destined for the failed node. It is possible +that some pages within the crawl (anything accepted by the node prior to failure but not yet processed) will not be crawled. + +If the coordinator fails, the job may be contining to run on the other nodes in the ensemble. If this is the case, their local output will still be written but +any data destined for the executor on the coordinator will not be processed. Similarly, if the job was running with a coordinatorDataWriter set, this will not be running. + +As of now, once a node is failed, it stays failed. There is no facility to re-join a job that is in progress. + + ###Differences from Version 1.0 Version 2 is significantly different from the initial release. The highlights of the differences are as follows: * Jobs are now configured via a JSON file @@ -71,11 +85,12 @@ Version 2 is significantly different from the initial release. The highlights of ###Potential Enhancements and TODOs * better error handling for the REST API including validation * stand-alone job validation tool (schema validation?) -* heartbeats and node-reassignments when nodes fail * base RSS/Atom extractor * handle different types of binary content (xlsx, etc) for text search * UI for submitting/monitoring crawl jobs -* ability to abort jobs * more tests -* ability to load class files(for custom DataAdatpers) from external jars/locations - +* ability to load class files(for custom DataAdatpers) from external jars/locations +* ability to join (or re-join) a job in progress +* ability to recover from coordinator failures +* pluggable mechanism to "fetch Pages" in SpiderThread (thus allowing non-web/filesystem extraction jobs) +* piggyback heartbeats on other messages and only send HB if needed diff --git a/doc/jobSchema.json b/doc/jobSchema.json index 3d0901c..03d7ced 100644 --- a/doc/jobSchema.json +++ b/doc/jobSchema.json @@ -92,7 +92,9 @@ "enum": [ "SUBMITTED", "PROCESSING", - "COMPLETE" + "NODE_COMPLETE", + "COMPLETE", + "ABORTED" ] }, "description": "Status of this job." diff --git a/src/main/java/org/cataractsoftware/datasponge/DataSponge.java b/src/main/java/org/cataractsoftware/datasponge/DataSponge.java index 609bb44..b0bfa10 100644 --- a/src/main/java/org/cataractsoftware/datasponge/DataSponge.java +++ b/src/main/java/org/cataractsoftware/datasponge/DataSponge.java @@ -107,8 +107,12 @@ public static void main(String[] args) { isSingleJob = true; } initialize(useEmbeddedBroker, enableRest, isSingleJob, jobFile, args); + }else{ + System.out.println("Bad command line arguments\n"); + printHelp(); + System.exit(1); } - System.out.println("Starting app..."); + } diff --git a/src/main/java/org/cataractsoftware/datasponge/api/JobController.java b/src/main/java/org/cataractsoftware/datasponge/api/JobController.java index 506d16d..cdd0289 100644 --- a/src/main/java/org/cataractsoftware/datasponge/api/JobController.java +++ b/src/main/java/org/cataractsoftware/datasponge/api/JobController.java @@ -33,6 +33,19 @@ public Job getJob(@PathVariable("id") String id) { return coordinator.getJob(id); } + /** + * aborts a job identified by the id + * + * @param id + * @return + */ + @RequestMapping(value = "/{id}", method = RequestMethod.DELETE) + @ResponseBody + public boolean abortJob(@PathVariable("id") String id) { + return coordinator.abortJob(id); + } + + /** * submits a job for processing * diff --git a/src/main/java/org/cataractsoftware/datasponge/crawler/CrawlerWorkqueue.java b/src/main/java/org/cataractsoftware/datasponge/crawler/CrawlerWorkqueue.java index b44e42f..c574931 100644 --- a/src/main/java/org/cataractsoftware/datasponge/crawler/CrawlerWorkqueue.java +++ b/src/main/java/org/cataractsoftware/datasponge/crawler/CrawlerWorkqueue.java @@ -89,7 +89,14 @@ public void initialize(String jobId, Set excludeList, } this.nodeId = nodeId; this.modSize = modSize; - this.selectorVal = jobId + "-" + nodeId; + updateSelector(); + } + + /** + * sets the value to use for jms selectors + */ + private void updateSelector(){ + this.selectorVal = this.jobId + "-"+this.nodeId; } /** @@ -245,6 +252,23 @@ public synchronized String dequeue() { return item; } + /** + * respond to node failures by adjusting the selector used to determine which messages should be handled by this node. + * @param failedNodeId + * @return - true if the failed node is THIS node, false if not + */ + public synchronized boolean handleNodeFailure(int failedNodeId) { + if(this.nodeId == failedNodeId){ + return true; + } + if(this.nodeId> failedNodeId){ + this.nodeId--; + updateSelector(); + } + this.modSize--; + return false; + } + /** * called by the message listener container in response to receipt of a JMS message. This method will compare the value of the diff --git a/src/main/java/org/cataractsoftware/datasponge/engine/JobCoordinator.java b/src/main/java/org/cataractsoftware/datasponge/engine/JobCoordinator.java index f00356a..d17a456 100644 --- a/src/main/java/org/cataractsoftware/datasponge/engine/JobCoordinator.java +++ b/src/main/java/org/cataractsoftware/datasponge/engine/JobCoordinator.java @@ -5,7 +5,6 @@ import org.cataractsoftware.datasponge.model.Job; import org.cataractsoftware.datasponge.model.JobEnrollment; import org.cataractsoftware.datasponge.model.ManagementMessage; -import org.cataractsoftware.datasponge.model.ManagementMessage.Type; import org.cataractsoftware.datasponge.util.ComponentFactory; import org.cataractsoftware.datasponge.writer.DataWriter; import org.cataractsoftware.datasponge.writer.JmsDataWriter; @@ -25,6 +24,7 @@ import javax.jms.Message; import javax.jms.Session; import java.util.*; +import java.util.Map.Entry; /** * This component is responsible for coordination of any jobs. If running in a multi-node setup, there could be multiple coordinators @@ -35,29 +35,25 @@ public class JobCoordinator { private static final Logger logger = LoggerFactory .getLogger(JobCoordinator.class); - private static final long OUTPUT_FLUSH_INTERVAL = 3000; + private static final long OUTPUT_FLUSH_INTERVAL = 10000; + private static final long FAILURE_INTERVAL = 60000; private static final ObjectMapper mapper = new ObjectMapper(); private static final int WAIT_TIME_SECS = 10; - private static final String UNKNOWN_JOB_MSG = "UNKNOWN_JOB"; - private static final String HOST_KEY = "host"; - private static final String NODE_KEY = "nodeId"; - private static final String SIZE_KEY = "modSize"; - private static final String HOST_ID = UUID.randomUUID().toString(); @Resource(name = "jobTopicTemplate") private JmsTemplate jobTopicTemplate; - @Resource(name = "managementTopicTemplate") - private JmsTemplate managementTopicTemplate; + @Autowired + private ManagementMessageSender managementMessageSender; @Autowired private ComponentFactory componentFactory; - private Map jobMap = new HashMap(); - private Map dataWriterMap = new HashMap(); - private Map jobExecutorMap = new HashMap(); - private Map> enrollmentMap = new HashMap>(); + private volatile Map jobMap = new HashMap(); + private volatile Map dataWriterMap = new HashMap(); + private volatile Map jobExecutorMap = new HashMap(); + private volatile Map> enrollmentMap = new HashMap>(); private Timer jobProgressTimer; public JobCoordinator() { @@ -65,25 +61,12 @@ public JobCoordinator() { jobProgressTimer.schedule(new TimerTask() { @Override public void run() { - checkJobStatus(); + updateJobStatus(); } }, OUTPUT_FLUSH_INTERVAL, OUTPUT_FLUSH_INTERVAL); } - /** - * returns the status of a job identified by the id passed in as a String. - * - * @param guid - * @return - */ - public String getJobStatus(String guid) { - Job j = jobMap.get(guid); - if (j != null) { - return j.getStatus().toString(); - } else { - return UNKNOWN_JOB_MSG; - } - } + /** * returns the Job domain object for the given guid (or null if not present) @@ -125,7 +108,7 @@ public List getAllJobs(Job.Status status){ public Job submitJob(final Job j) { if (j != null) { j.setGuid(UUID.randomUUID().toString()); - j.setCoordinatorId(HOST_ID); + j.setCoordinatorId(ManagementMessageSender.HOST_ID); if (j.getCoordinatorDataWriter() != null) { dataWriterMap.put(j.getGuid(), (DataWriter) componentFactory.getNewDataAdapter(j.getGuid(), j.getCoordinatorDataWriter())); } @@ -146,16 +129,18 @@ public Message createMessage(Session session) } }; jobTopicTemplate.send(messageCreator); - sendEnrollment(j.getGuid()); + managementMessageSender.sendEnrollment(j.getGuid()); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { - List enrollments = enrollmentMap.get(j - .getGuid()); - if (enrollments != null) { - for (int i = 0; i < enrollments.size(); i++) { - sendAssignment(j.getGuid(), i, enrollments.size()); + synchronized(enrollmentMap) { + List enrollments = enrollmentMap.get(j + .getGuid()); + if (enrollments != null) { + for (int i = 0; i < enrollments.size(); i++) { + managementMessageSender.sendAssignment(j.getGuid(), i, enrollments.size()); + } } } } @@ -169,18 +154,44 @@ public void run() { * flushes output for any coordinator-writers and checks for completed jobs. If a job is completed, its status is * updated and the executor resources are recovered. */ - private void checkJobStatus() { + protected void updateJobStatus() { flushOutput(); + checkLocalCompletion(); + checkGlobalCompletion(); + checkForFailures(); + } + + /** + * if on the job coordinator, checks for any nodes with the lastHearbeat > FAILURE_INTERVAL + */ + protected synchronized void checkForFailures(){ + for(Entry> enrollmentEntry: enrollmentMap.entrySet()){ + if(isJobCoordinator(enrollmentEntry.getKey())) { + for (JobEnrollment enrollment : enrollmentEntry.getValue()) { + if(enrollment.getLastHeartbeat()> 0) { + if(System.currentTimeMillis() - enrollment.getLastHeartbeat() > FAILURE_INTERVAL){ + managementMessageSender.sendFailure(enrollment.getJobId(),enrollment.getHostId()); + } + } + } + } + } + } + + + /** + * checks if the local executor for a job is complete. If so, it sends a complete management message, otherwise it sends + * a heartbeat message. If the executor is complete, the executor bean will be destroyed. + */ + protected void checkLocalCompletion(){ List completedJobs = new ArrayList(); for (Map.Entry executorEntry : jobExecutorMap.entrySet()) { + String jobId = executorEntry.getKey(); if (executorEntry.getValue().isDone()) { - String jobId = executorEntry.getKey(); completedJobs.add(jobId); - DataWriter writer = dataWriterMap.get(jobId); - if (writer != null) { - writer.finish(); - dataWriterMap.remove(jobId); - } + managementMessageSender.sendComplete(jobId); + } else { + managementMessageSender.sendHeartbeat(jobId); } } if (completedJobs.size() > 0) { @@ -189,7 +200,7 @@ private void checkJobStatus() { logger.info("Removed executor for job " + jobId); Job job = jobMap.get(jobId); if (job != null) { - job.setStatus(Job.Status.COMPLETE); + job.setStatus(Job.Status.NODE_COMPLETE); } //TODO: need to unregister the JMS listeners @@ -197,33 +208,58 @@ private void checkJobStatus() { } } + /** - * iterates over all dataWriters and flushes their output + * checks if we have recived COMPLETE messages for all enrollments for a job. If so, the job is marked complete. + * For jobs with a coordinatorDataWriter configured, the finish method will be called on the writer upon executor completion */ - private void flushOutput() { - for (DataWriter writer : dataWriterMap.values()) { - writer.flushBatch(); + protected synchronized void checkGlobalCompletion(){ + List completedJobs = new ArrayList(); + for(Map.Entry> enrollmentEntry: enrollmentMap.entrySet()){ + if(enrollmentEntry.getValue()!=null){ + boolean allComplete = true; + for(JobEnrollment e: enrollmentEntry.getValue()){ + if(!e.isComplete()){ + allComplete = false; + break; + } + } + if(allComplete) { + //if we're here, then all nodes have reported completion; + DataWriter writer = dataWriterMap.get(enrollmentEntry.getKey()); + if (writer != null) { + writer.finish(); + dataWriterMap.remove(enrollmentEntry.getKey()); + } + Job job = jobMap.get(enrollmentEntry.getKey()); + if (job != null) { + job.setStatus(Job.Status.COMPLETE); + } + //can also clean up enrollment map + completedJobs.add(enrollmentEntry.getKey()); + } + } + + } + if(completedJobs.size()>0){ + for(String jobId: completedJobs){ + enrollmentMap.remove(jobId); + } } } + /** - * sends the ASSIGNMENT message on the control topic to assign an ID to each participant. - * - * @param jobId - * @param nodeId - * @param modSize + * iterates over all dataWriters and flushes their output */ - private void sendAssignment(String jobId, int nodeId, int modSize) { - ManagementMessage assignmentMessage = new ManagementMessage(); - assignmentMessage.setJobId(jobId); - assignmentMessage.setType(Type.ASSIGNMENT); - Map data = new HashMap(); - data.put(NODE_KEY, nodeId + ""); - data.put(SIZE_KEY, modSize + ""); - assignmentMessage.setData(data); - managementTopicTemplate.send(buildMessageCreator(assignmentMessage)); + protected void flushOutput() { + for (DataWriter writer : dataWriterMap.values()) { + writer.flushBatch(); + } } + + /** * respond to job messages by enrolling. * @@ -237,7 +273,7 @@ public void handleJobMessage(String message) { // if we don't already know about this job jobMap.put(job.getGuid(), job); //TODO: perform check to ensure this node can handle the job (i.e. no ClassNotFoundException when instantiating DataAdapters) - sendEnrollment(job.getGuid()); + managementMessageSender.sendEnrollment(job.getGuid()); } } catch (Exception e) { throw new RuntimeException("Could not process job message", e); @@ -246,7 +282,6 @@ public void handleJobMessage(String message) { /** * dispatches messages received on the management topic - * * @param message */ @JmsListener(destination = "datasponge.management.topic", containerFactory = "topicContainerFactory") @@ -260,18 +295,24 @@ public void handleManagementMessage(String message) { break; case ASSIGNMENT: initializeEngineForJob(msg.getJobId(), - Integer.parseInt(msg.getData().get(NODE_KEY)), - Integer.parseInt(msg.getData().get(SIZE_KEY)), + Integer.parseInt(msg.getData().get(ManagementMessageSender.NODE_KEY)), + Integer.parseInt(msg.getData().get(ManagementMessageSender.SIZE_KEY)), isJobCoordinator(msg.getJobId())); - break; case HEARTBEAT: - //TODO: track heartbeats and re-assign node keys based on size of ensemble + updateEnrollment(msg, false); break; case ABORT: - //TODO: handle abort + handleAbort(msg.getJobId()); + break; + case COMPLETE: + updateEnrollment(msg,true); + break; + case NODE_FAILURE: + handleFailure(msg.getJobId(),Integer.parseInt(msg.getData().get(ManagementMessageSender.NODE_KEY))); break; default: + logger.error("Unknown control message type: "+msg.getType()); break; } @@ -280,6 +321,64 @@ public void handleManagementMessage(String message) { } } + /** + * handles the failure of a node by updating the executor so it can adjust its share of the workqueue + * @param jobId + * @param nodeId + */ + protected void handleFailure(String jobId, int nodeId){ + JobExecutor executor = jobExecutorMap.get(jobId); + if(executor!=null){ + if(executor.handleNodeFailure(nodeId)){ + //if the node that failed is this node,then the coordinator must not be getting our messages so we can terminates + handleAbort(jobId); + } + } + } + + + /** + * handles an abort by shutting down the executor for the job and flushing anything pending + * @param msg + */ + protected synchronized void handleAbort(String jobId){ + JobExecutor executor = jobExecutorMap.get(jobId); + + if(executor != null){ + logger.info("Aborting job "+jobId); + executor.destroy(); + jobExecutorMap.remove(jobId); + } + if(dataWriterMap.get(jobId) !=null){ + dataWriterMap.get(jobId).finish(); + dataWriterMap.remove(jobId); + } + Job j = jobMap.get(jobId); + if(j != null){ + j.setStatus(Job.Status.ABORTED); + } + } + + /** + * updates the heartbeat timestamp on the enrollment + * @param msg + */ + protected void updateEnrollment(ManagementMessage msg, boolean isComplete){ + String jobId = msg.getJobId(); + List enrollments = enrollmentMap.get(jobId); + if(enrollments != null){ + for(JobEnrollment e: enrollments){ + if(ManagementMessageSender.HOST_ID.equals(e.getHostId())){ + if(isComplete){ + e.setComplete(true); + }else { + e.setLastHeartbeat(System.currentTimeMillis()); + } + } + } + } + } + /** * records an enrollment * @@ -297,8 +396,7 @@ protected void handleEnrollmentMessage(ManagementMessage msg) { enrollments = new ArrayList(); enrollmentMap.put(jobId, enrollments); } - JobEnrollment enrollment = new JobEnrollment(jobId, msg.getData() - .get(HOST_KEY)); + JobEnrollment enrollment = new JobEnrollment(jobId, msg.getSenderHostId()); enrollments.add(enrollment); } } @@ -335,38 +433,7 @@ protected void initializeEngineForJob(String jobId, int nodeId, executor.executeCrawl(); } - /** - * sends the enrollment message on the management topic. - * - * @param jobId - */ - protected void sendEnrollment(final String jobId) { - ManagementMessage msg = new ManagementMessage(); - msg.setType(Type.ENROLLMENT); - msg.setJobId(jobId); - Map payload = new HashMap(); - payload.put(HOST_KEY, HOST_ID); - msg.setData(payload); - managementTopicTemplate.send(buildMessageCreator(msg)); - } - protected MessageCreator buildMessageCreator(final ManagementMessage msg) { - return new MessageCreator() { - @Override - public Message createMessage(Session session) throws JMSException { - try { - Message m = session.createTextMessage(mapper - .writeValueAsString(msg)); - - return m; - } catch (Exception e) { - logger.error("Could not publish json message", e); - throw new JMSException("Could not publish message: " - + e.getMessage()); - } - } - }; - } /** * returns true if this node is the coordinator for the job identified by the id passed in. @@ -378,7 +445,7 @@ private boolean isJobCoordinator(String jobId) { if (jobId != null) { Job j = jobMap.get(jobId); if (j != null && j.getCoordinatorId() != null - && this.HOST_ID.equals(j.getCoordinatorId())) { + && ManagementMessageSender.HOST_ID.equals(j.getCoordinatorId())) { return true; } else { return false; @@ -398,7 +465,7 @@ private boolean isJobCoordinator(String jobId) { public boolean areAllJobsDone() { if (jobMap.size() > 0) { for (Map.Entry jobEntry : jobMap.entrySet()) { - if (Job.Status.COMPLETE != jobEntry.getValue().getStatus()) { + if (Job.Status.COMPLETE != jobEntry.getValue().getStatus() && Job.Status.ABORTED != jobEntry.getValue().getStatus()) { return false; } } @@ -408,6 +475,20 @@ public boolean areAllJobsDone() { } } + /** + * sends the abort control message on the management topic + * @param jobId + * @return + */ + public boolean abortJob(String jobId){ + if(jobMap.get(jobId)!=null){ + managementMessageSender.sendAbort(jobId); + return true; + }else{ + return false; + } + } + @PreDestroy public void destroy() { if (jobProgressTimer != null) { diff --git a/src/main/java/org/cataractsoftware/datasponge/engine/JobExecutor.java b/src/main/java/org/cataractsoftware/datasponge/engine/JobExecutor.java index be8f758..19f5efd 100644 --- a/src/main/java/org/cataractsoftware/datasponge/engine/JobExecutor.java +++ b/src/main/java/org/cataractsoftware/datasponge/engine/JobExecutor.java @@ -16,9 +16,7 @@ import org.springframework.stereotype.Component; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; +import java.util.*; /** * This bean is responsible for executing a crawl job on a single host. It is a prototype bean that is initialized in response @@ -174,6 +172,15 @@ private List spawnThreads(int threadCount, return threadList; } + /** + * handles node failures. Returns true if the nodeId passed in corresponds to this node + * @param nodeId + * @return + */ + public boolean handleNodeFailure(int nodeId){ + return workQueue.handleNodeFailure(nodeId); + } + /** * initializes the crawler program by loading the properties, creating the * common work queue and seeding it with the list of URLs at which to start. diff --git a/src/main/java/org/cataractsoftware/datasponge/engine/ManagementMessageSender.java b/src/main/java/org/cataractsoftware/datasponge/engine/ManagementMessageSender.java new file mode 100644 index 0000000..14ef508 --- /dev/null +++ b/src/main/java/org/cataractsoftware/datasponge/engine/ManagementMessageSender.java @@ -0,0 +1,144 @@ +package org.cataractsoftware.datasponge.engine; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.cataractsoftware.datasponge.model.ManagementMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * Component used to create and send ManagementMessages to the coordination topic. + */ +@Component +public class ManagementMessageSender { + private static final Logger logger = LoggerFactory + .getLogger(ManagementMessageSender.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + public static final String NODE_KEY = "nodeId"; + public static final String SIZE_KEY = "modSize"; + public static final String HOST_ID = UUID.randomUUID().toString(); + + + + @Resource(name = "managementTopicTemplate") + private JmsTemplate managementTopicTemplate; + + + /** + * sends the enrollment message on the management topic. + * + * @param jobId + */ + public void sendEnrollment(String jobId) { + sendBasicMessage(jobId, ManagementMessage.Type.ENROLLMENT); + } + + /** + * sends the ASSIGNMENT message on the control topic to assign an ID to each participant. + * + * @param jobId + * @param nodeId + * @param modSize + */ + public void sendAssignment(String jobId, int nodeId, int modSize) { + ManagementMessage assignmentMessage = constructMessage(jobId, ManagementMessage.Type.ASSIGNMENT); + Map data = new HashMap(); + data.put(NODE_KEY, nodeId + ""); + data.put(SIZE_KEY, modSize + ""); + assignmentMessage.setData(data); + managementTopicTemplate.send(buildMessageCreator(assignmentMessage)); + } + + /** + * sends a status management message on the control topic to broadcast proof of life for this node/job + * @param jobId + */ + protected void sendBasicMessage(String jobId, ManagementMessage.Type type){ + managementTopicTemplate.send(buildMessageCreator(constructMessage(jobId,type))); + } + + /** + * helper method to construct a base ManagementMessage object + * @param jobId + * @param type + * @return + */ + private ManagementMessage constructMessage(String jobId, ManagementMessage.Type type){ + ManagementMessage msg = new ManagementMessage(); + msg.setJobId(jobId); + msg.setSenderHostId(HOST_ID); + msg.setType(type); + return msg; + } + + /** + * sends an ABORT message + * @param jobId + */ + public void sendAbort(String jobId){ + sendBasicMessage(jobId, ManagementMessage.Type.ABORT); + } + + /** + * sends heartbeat messages + */ + public void sendHeartbeat(String jobId){ + sendBasicMessage(jobId, ManagementMessage.Type.HEARTBEAT); + } + + /** + * sends complete message + * @param jobId + */ + public void sendComplete(String jobId){ + sendBasicMessage(jobId, ManagementMessage.Type.COMPLETE); + } + + /** + * sends a message indicating hostId failed for the job identified by jobId + * @param jobId + * @param hostId + */ + public void sendFailure(String jobId, String hostId){ + ManagementMessage failureMessage = constructMessage(jobId, ManagementMessage.Type.NODE_FAILURE); + Map data = new HashMap(); + data.put(NODE_KEY, hostId + ""); + failureMessage.setData(data); + managementTopicTemplate.send(buildMessageCreator(failureMessage)); + } + + /** + * builds a SpringJMS MessageCreator for use with the JMSTemplate when sending messages. + * @param msg + * @return + */ + protected MessageCreator buildMessageCreator(final ManagementMessage msg) { + return new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + try { + Message m = session.createTextMessage(mapper + .writeValueAsString(msg)); + + return m; + } catch (Exception e) { + logger.error("Could not publish json message", e); + throw new JMSException("Could not publish message: " + + e.getMessage()); + } + } + }; + } + +} diff --git a/src/main/java/org/cataractsoftware/datasponge/enhancer/DeduplicationEnhancer.java b/src/main/java/org/cataractsoftware/datasponge/enhancer/DeduplicationEnhancer.java index 98c22d0..5503ad4 100644 --- a/src/main/java/org/cataractsoftware/datasponge/enhancer/DeduplicationEnhancer.java +++ b/src/main/java/org/cataractsoftware/datasponge/enhancer/DeduplicationEnhancer.java @@ -18,6 +18,8 @@ * can be made by changing the implementation of equals for DataRecord and/or by setting the customDetector * property on the plugin config (in the job configuration) to the fully-qualified name of a DuplicateDetector instance. * + * TODO: update to only cache SHA-256 hash of the record instead of the whole thing + * * @author Christopher Fagiani */ public class DeduplicationEnhancer implements DataEnhancer, DuplicateDetector { diff --git a/src/main/java/org/cataractsoftware/datasponge/model/Job.java b/src/main/java/org/cataractsoftware/datasponge/model/Job.java index 6f61b15..e9fa32c 100644 --- a/src/main/java/org/cataractsoftware/datasponge/model/Job.java +++ b/src/main/java/org/cataractsoftware/datasponge/model/Job.java @@ -154,7 +154,7 @@ public void setCoordinatorDataWriter(PluginConfig coordinatorDataWriter) { } public enum Status { - SUBMITTED, PROCESSING, COMPLETE + SUBMITTED, PROCESSING, NODE_COMPLETE, COMPLETE, ABORTED } public enum Mode { diff --git a/src/main/java/org/cataractsoftware/datasponge/model/JobEnrollment.java b/src/main/java/org/cataractsoftware/datasponge/model/JobEnrollment.java index 51cb71e..7762eeb 100644 --- a/src/main/java/org/cataractsoftware/datasponge/model/JobEnrollment.java +++ b/src/main/java/org/cataractsoftware/datasponge/model/JobEnrollment.java @@ -1,7 +1,5 @@ package org.cataractsoftware.datasponge.model; -import java.util.Date; - /** * data structure used to have a server enroll in a crawl job * @@ -11,7 +9,8 @@ public class JobEnrollment { private String jobId; private String hostId; - private Date lastHeartbeat; + private long lastHeartbeat=0l; + private boolean complete = false; public JobEnrollment() { @@ -38,14 +37,22 @@ public void setHostId(String hostId) { this.hostId = hostId; } - public Date getLastHeartbeat() { + public long getLastHeartbeat() { return lastHeartbeat; } - public void setLastHeartbeat(Date lastHeartbeat) { + public void setLastHeartbeat(long lastHeartbeat) { this.lastHeartbeat = lastHeartbeat; } + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + @Override public int hashCode() { final int prime = 31; diff --git a/src/main/java/org/cataractsoftware/datasponge/model/ManagementMessage.java b/src/main/java/org/cataractsoftware/datasponge/model/ManagementMessage.java index 5d31529..bea04b4 100644 --- a/src/main/java/org/cataractsoftware/datasponge/model/ManagementMessage.java +++ b/src/main/java/org/cataractsoftware/datasponge/model/ManagementMessage.java @@ -12,6 +12,7 @@ public class ManagementMessage { private Type type; private Map data; private String jobId; + private String senderHostId; public Type getType() { return type; @@ -37,8 +38,16 @@ public void setJobId(String jobId) { this.jobId = jobId; } + public String getSenderHostId() { + return senderHostId; + } + + public void setSenderHostId(String senderHostId) { + this.senderHostId = senderHostId; + } + public enum Type { - HEARTBEAT, ENROLLMENT, ASSIGNMENT, ACK, ABORT + HEARTBEAT, ENROLLMENT, ASSIGNMENT, NODE_FAILURE,COMPLETE, ABORT }