Skip to content

Commit

Permalink
v3.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
arnett, stu committed Jul 31, 2017
1 parent 1c73f42 commit 99581da
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 92 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
allprojects {
group = 'com.emc.ecs'
version = '3.2'
version = '3.2.1'
}

ext.mainClass = 'com.emc.ecs.sync.EcsSync'
Expand Down
67 changes: 35 additions & 32 deletions src/main/java/com/emc/ecs/sync/EcsSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static void main(String[] args) {
exitCode = 2;
}

// 0 = completed with no failures, 1 = invalid options, 2 = unexpected error, 3 = completed with some object failures
System.exit(exitCode);
}

Expand All @@ -149,11 +150,12 @@ private static String versionLine() {
private EnhancedThreadPoolExecutor listExecutor;
private EnhancedThreadPoolExecutor syncExecutor;
private EnhancedThreadPoolExecutor queryExecutor;
private EnhancedThreadPoolExecutor estimateQueryExecutor;
private EnhancedThreadPoolExecutor estimateExecutor;
private EnhancedThreadPoolExecutor retrySubmitter;
private SyncFilter firstFilter;
private SyncEstimate syncEstimate;
private volatile boolean paused, terminated;
private volatile boolean terminated;
private SyncStats stats = new SyncStats();

private SyncConfig syncConfig;
Expand Down Expand Up @@ -264,9 +266,11 @@ public void run() {
// create thread pools
listExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount(),
new LinkedBlockingDeque<Runnable>(1000), "list-pool");
estimateQueryExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount(),
new LinkedBlockingDeque<Runnable>(), "estimate-q-pool");
estimateExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount(),
new LinkedBlockingDeque<Runnable>(1000), "estimate-pool");
queryExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount() * 2,
queryExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount(),
new LinkedBlockingDeque<Runnable>(), "query-pool");
syncExecutor = new EnhancedThreadPoolExecutor(options.getThreadCount(),
new LinkedBlockingDeque<Runnable>(1000), "sync-pool");
Expand Down Expand Up @@ -352,21 +356,12 @@ public void run() {
} finally {
if (!syncControl.isRunning()) log.warn("terminated early!");
syncControl.setRunning(false);
if (paused) {
paused = false;
// must interrupt the threads that are blocked
if (listExecutor != null) listExecutor.shutdownNow();
if (estimateExecutor != null) estimateExecutor.shutdownNow();
if (queryExecutor != null) queryExecutor.shutdownNow();
if (retrySubmitter != null) retrySubmitter.shutdownNow();
if (syncExecutor != null) syncExecutor.shutdownNow();
} else {
if (listExecutor != null) listExecutor.shutdown();
if (estimateExecutor != null) estimateExecutor.shutdown();
if (queryExecutor != null) queryExecutor.shutdown();
if (retrySubmitter != null) retrySubmitter.shutdown();
if (syncExecutor != null) syncExecutor.shutdown();
}
if (listExecutor != null) listExecutor.shutdown();
if (estimateQueryExecutor != null) estimateQueryExecutor.shutdown();
if (estimateExecutor != null) estimateExecutor.shutdown();
if (queryExecutor != null) queryExecutor.shutdown();
if (retrySubmitter != null) retrySubmitter.shutdown();
if (syncExecutor != null) syncExecutor.shutdown();
if (stats != null) stats.setStopTime(System.currentTimeMillis());

// clean up any resources in the plugins
Expand Down Expand Up @@ -399,38 +394,45 @@ public void run() {
* Stops the underlying executors from executing new tasks. Currently running tasks will complete and all threads
* will then block until resumed
*
* @return true if the state was changed from running to pause; false if already paused
* @throws IllegalStateException if the sync is complete or was terminated
*/
public boolean pause() {
public void pause() {
if (!syncControl.isRunning()) throw new IllegalStateException("sync is not running");
boolean changed = queryExecutor.pause() && syncExecutor.pause();
paused = true;
listExecutor.pause();
estimateQueryExecutor.pause();
estimateExecutor.pause();
queryExecutor.pause();
retrySubmitter.pause();
syncExecutor.pause();
stats.pause();
return changed;
}

/**
* Resumes the underlying executors so they may continue to execute tasks
*
* @return true if the state was changed from paused to running; false if already running
* @throws IllegalStateException if the sync is complete or was terminated
* @see #pause()
*/
public boolean resume() {
public void resume() {
if (!syncControl.isRunning()) throw new IllegalStateException("sync is not running");
boolean changed = queryExecutor.resume() && syncExecutor.resume();
paused = false;
listExecutor.resume();
estimateQueryExecutor.resume();
estimateExecutor.resume();
queryExecutor.resume();
retrySubmitter.resume();
syncExecutor.resume();
stats.resume();
return changed;
}

public void terminate() {
syncControl.setRunning(false);
terminated = true;
if (queryExecutor != null) queryExecutor.getQueue().clear();
if (retrySubmitter != null) retrySubmitter.getQueue().clear();
if (syncExecutor != null && syncExecutor.resume()) paused = false;
if (listExecutor != null) listExecutor.stop();
if (estimateQueryExecutor != null) estimateQueryExecutor.stop();
if (estimateExecutor != null) estimateExecutor.stop();
if (queryExecutor != null) queryExecutor.stop();
if (retrySubmitter != null) retrySubmitter.stop();
if (syncExecutor != null) syncExecutor.stop();
}

public String summarizeConfig() {
Expand Down Expand Up @@ -527,6 +529,7 @@ private void safeClose(AutoCloseable closeable) {
public void setThreadCount(int threadCount) {
syncConfig.getOptions().setThreadCount(threadCount);
if (listExecutor != null) listExecutor.resizeThreadPool(threadCount);
if (estimateQueryExecutor != null) estimateQueryExecutor.resizeThreadPool(threadCount);
if (estimateExecutor != null) estimateExecutor.resizeThreadPool(threadCount);
if (queryExecutor != null) queryExecutor.resizeThreadPool(threadCount);
if (syncExecutor != null) syncExecutor.resizeThreadPool(threadCount);
Expand Down Expand Up @@ -555,7 +558,7 @@ public boolean isRunning() {
}

public boolean isPaused() {
return paused;
return syncExecutor != null && syncExecutor.isPaused();
}

public boolean isTerminated() {
Expand Down Expand Up @@ -719,7 +722,7 @@ public void run() {
if (summary == null) summary = storage.parseListLine(listLine);
syncEstimate.incTotalObjectCount(1);
if (syncConfig.getOptions().isRecursive() && summary.isDirectory()) {
queryExecutor.blockingSubmit(new Runnable() {
estimateQueryExecutor.blockingSubmit(new Runnable() {
@Override
public void run() {
log.debug("[est.]>>>> querying children of {}", summary.getIdentifier());
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/emc/ecs/sync/rest/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.emc.ecs.sync.config.ConfigurationException;
import com.emc.ecs.sync.config.SyncConfig;
import com.emc.ecs.sync.service.JobNotFoundException;
import com.emc.ecs.sync.service.SyncJobService;
import com.sun.jersey.api.NotFoundException;
import com.sun.jersey.api.client.ClientResponse;
Expand Down Expand Up @@ -62,7 +63,7 @@ public Response delete(@PathParam("jobId") int jobId, @QueryParam("keepDatabase"
try {
SyncJobService.getInstance().deleteJob(jobId, keepDatabase);
return Response.ok().build();
} catch (IllegalArgumentException e) { // job not found
} catch (JobNotFoundException e) { // job not found
throw new NotFoundException(e.getMessage());
} catch (UnsupportedOperationException e) { // job is running or paused (can't be deleted)
return Response.status(ClientResponse.Status.CONFLICT).type(MediaType.TEXT_PLAIN).entity(e.toString()).build();
Expand All @@ -85,7 +86,7 @@ public Response setControl(@PathParam("jobId") int jobId, JobControl jobControl)
try {
SyncJobService.getInstance().setJobControl(jobId, jobControl);
return Response.ok().build();
} catch (IllegalArgumentException e) { // job not found
} catch (JobNotFoundException e) { // job not found
throw new NotFoundException(e.getMessage());
} catch (IllegalStateException e) { // job is stopped and cannot be restarted
return Response.status(ClientResponse.Status.CONFLICT).type(MediaType.TEXT_PLAIN).entity(e.toString()).build();
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/emc/ecs/sync/service/JobNotFoundException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.emc.ecs.sync.service;

public class JobNotFoundException extends RuntimeException {
public JobNotFoundException() {
}

public JobNotFoundException(String message) {
super(message);
}

public JobNotFoundException(String message, Throwable cause) {
super(message, cause);
}

public JobNotFoundException(Throwable cause) {
super(cause);
}

public JobNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/emc/ecs/sync/service/SqliteDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class SqliteDbService extends AbstractDbService {

public SqliteDbService(String dbFile) {
this.dbFile = dbFile;
if (!dbFile.contains(":")) { // don't validate non-file locations (like :memory:)
if (!dbFile.startsWith(":")) { // don't validate non-file locations (like :memory:)
File file = new File(dbFile);
if (!file.getParentFile().exists() || (!file.exists() && !file.getParentFile().canWrite())
if ((!file.exists() && file.getParentFile() != null && !file.getParentFile().canWrite())
|| (file.exists() && !file.canWrite()))
throw new IllegalArgumentException("Cannot write to " + dbFile);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/emc/ecs/sync/service/SyncJobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public SyncConfig getJob(int jobId) {
public void deleteJob(int jobId, boolean keepDatabase) {
EcsSync sync = syncCache.get(jobId);

if (sync == null) throw new IllegalArgumentException("the specified job ID does not exist");
if (sync == null) throw new JobNotFoundException("the specified job ID does not exist");

if (!getJobStatus(sync).isFinalState())
throw new UnsupportedOperationException("the job must be stopped before it can be deleted");
Expand Down Expand Up @@ -187,7 +187,7 @@ public JobControl getJobControl(int jobId) {
public void setJobControl(int jobId, JobControl jobControl) {
EcsSync sync = syncCache.get(jobId);

if (sync == null) throw new IllegalArgumentException("the specified job ID does not exist");
if (sync == null) throw new JobNotFoundException("the specified job ID does not exist");

if (jobControl.getThreadCount() > 0) {
sync.setThreadCount(jobControl.getThreadCount());
Expand Down
19 changes: 8 additions & 11 deletions src/main/java/com/emc/ecs/sync/storage/TestStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -243,22 +242,22 @@ private String randChars(int count, boolean alphaNumOnly) {
return new String(chars);
}

private void mkdirs(File path) {
File parent = path.getParentFile();
private void mkdirs(String path) {
String parent = SyncUtil.parentPath(path);
// don't need to create the root path
if (ROOT_PATH.equals(parent.getPath()) || ROOT_PATH.equals(path.getPath())) return;
if (ROOT_PATH.equals(parent) || ROOT_PATH.equals(path)) return;
mkdirs(parent);
synchronized (this) {
String parentParent = parent.getParent();
String parentParent = SyncUtil.parentPath(parent);
if (parentParent == null) parentParent = "";
// find parent among grandparent's children
for (TestSyncObject object : getChildren(parentParent)) {
if (parent.getPath().equals(getIdentifier(object.getRelativePath(), true))) return;
if (parent.equals(getIdentifier(object.getRelativePath(), true))) return;
}
// create parent
ObjectMetadata metadata = new ObjectMetadata();
metadata.setDirectory(true);
addChild(parentParent, new TestSyncObject(this, getRelativePath(parent.getPath(), true), metadata, null));
addChild(parentParent, new TestSyncObject(this, getRelativePath(parent, true), metadata, null));
}
}

Expand All @@ -281,16 +280,14 @@ public void ingest(TestStorage source, String identifier) {
}

private void ingest(String identifier, TestSyncObject testObject) {
File file = new File(identifier);

// equivalent of mkdirs()
mkdirs(file);
mkdirs(identifier);

// add to lookup
idMap.put(identifier, testObject);

// add to parent
addChild(file.getParent(), testObject);
addChild(SyncUtil.parentPath(identifier), testObject);
}

private synchronized void addChild(String parentPath, TestSyncObject object) {
Expand Down
Loading

0 comments on commit 99581da

Please sign in to comment.