Skip to content

Commit acb7966

Browse files
author
arnett, stu
committed
v2.1.3
1 parent 306c66f commit acb7966

17 files changed

+351
-227
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515
allprojects {
1616
group = 'com.emc.ecs'
17-
version = '2.1.2'
17+
version = '2.1.3'
1818
}
1919

2020
ext.mainClass = 'com.emc.ecs.sync.EcsSync'

script/ova/configure-centos.sh

+6-4
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ yum -y install iperf telnet
2828
# apache
2929
yum -y install httpd mod_ssl
3030
# configure proxy and auth
31-
cp "${DIST_DIR}/ova/httpd/.htaccess" /etc/httpd
31+
cp "${DIST_DIR}/ova/httpd/.htpasswd" /etc/httpd
3232
cp "${DIST_DIR}/ova/httpd/conf.d/ecs-sync.conf" /etc/httpd/conf.d
3333
systemctl enable httpd
3434
systemctl start httpd
@@ -42,15 +42,17 @@ if [ -f /etc/my.cnf.d/server.cnf ]; then
4242
sed -i '/\[server\]/a\
4343
innodb_file_format=Barracuda\
4444
innodb_large_prefix=1\
45-
innodb_file_per_table=1' /etc/my.cnf.d/server.cnf
45+
innodb_file_per_table=1\
46+
bind-address=127.0.0.1' /etc/my.cnf.d/server.cnf
4647
fi
4748
systemctl daemon-reload
48-
systemctl enable mariadb-server
49-
systemctl start mariadb-server
49+
systemctl enable mariadb.service
50+
systemctl start mariadb.service
5051
# remove test DBs and set root PW
5152
mysql_secure_installation
5253
# create database for ecs-sync
5354
MYSQL_DIR="$(cd "$(dirname $0)/../mysql" && pwd)"
55+
echo 'Please enter the mySQL/mariaDB root password'
5456
mysql -u root -p < "${MYSQL_DIR}/utf8/create_mysql_user_db.sql"
5557

5658
# sysctl tweaks
File renamed without changes.

src/main/java/com/emc/ecs/sync/EcsSync.java

+1
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ public void run() {
639639
}
640640
if (dbTable != null) dbService.setObjectsTableName(dbTable);
641641
}
642+
dbService.setTimingPlugin(source);
642643

643644
// create thread pools
644645
queryExecutor = new EnhancedThreadPoolExecutor(queryThreadCount, new LinkedBlockingDeque<Runnable>(), "query-pool");

src/main/java/com/emc/ecs/sync/service/DbService.java

+8
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,14 @@ public SyncRecord mapRow(ResultSet rs, int rowNum) throws SQLException {
304304
}
305305
}
306306

307+
public SyncPlugin getTimingPlugin() {
308+
return timingPlugin;
309+
}
310+
311+
public void setTimingPlugin(SyncPlugin timingPlugin) {
312+
this.timingPlugin = timingPlugin;
313+
}
314+
307315
// purely for timing functions
308316
protected class DatabasePlugin extends SyncFilter {
309317
@Override

src/main/java/com/emc/ecs/sync/source/S3Source.java

+2
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ private void getNextBatch() {
357357
} else {
358358
listing = s3.listNextBatchOfObjects(listing);
359359
}
360+
listing.setMaxKeys(1000); // Google Storage compatibility
360361
objectIterator = listing.getObjectSummaries().iterator();
361362
}
362363
}
@@ -403,6 +404,7 @@ private void getNextVersionBatch() {
403404
if (versionListing == null) {
404405
versionListing = s3.listVersions(bucketName, "".equals(prefix) ? null : prefix);
405406
} else {
407+
versionListing.setMaxKeys(1000); // Google Storage compatibility
406408
versionListing = s3.listNextBatchOfVersions(versionListing);
407409
}
408410
versionIterator = versionListing.getVersionSummaries().iterator();

src/main/java/com/emc/ecs/sync/target/CasTarget.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.emc.ecs.sync.util.ClipTag;
2525
import com.emc.ecs.sync.util.ConfigurationException;
2626
import com.emc.ecs.sync.util.TimingUtil;
27+
import com.emc.object.util.ProgressInputStream;
2728
import com.emc.object.util.ProgressListener;
2829
import com.filepool.fplibrary.*;
2930
import org.apache.commons.cli.CommandLine;
@@ -215,8 +216,12 @@ protected void timedStreamBlob(final FPTag tag, final ClipTag blob) throws Excep
215216
TimingUtil.time(CasTarget.this, CasUtil.OPERATION_STREAM_BLOB, new Callable<Void>() {
216217
@Override
217218
public Void call() throws Exception {
218-
ProgressListener listener = isMonitorPerformance() ? new CasTargetProgress() : null;
219-
blob.writeToTag(tag, listener);
219+
InputStream sourceStream = blob.getBlobInputStream();
220+
if (isMonitorPerformance())
221+
sourceStream = new ProgressInputStream(sourceStream, new CasTargetProgress());
222+
try (InputStream stream = sourceStream) {
223+
tag.BlobWrite(stream);
224+
}
220225
return null;
221226
}
222227
});

src/main/java/com/emc/ecs/sync/target/CuaFilesystemTarget.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.io.File;
3131
import java.io.FileOutputStream;
32+
import java.io.InputStream;
3233
import java.io.OutputStream;
3334
import java.nio.file.Files;
3435
import java.nio.file.Path;
@@ -140,8 +141,13 @@ public void filter(SyncObject obj) {
140141
mkdirs(destFile.getParentFile());
141142

142143
// write the file
143-
try (OutputStream out = new FileOutputStream(destFile)) {
144-
blobTag.writeToStream(out);
144+
try (InputStream in = blobTag.getBlobInputStream();
145+
OutputStream out = new FileOutputStream(destFile)) {
146+
byte[] buffer = new byte[bufferSize];
147+
int read;
148+
while (((read = in.read(buffer)) != -1)) {
149+
out.write(buffer, 0, read);
150+
}
145151
}
146152

147153

src/main/java/com/emc/ecs/sync/target/S3Target.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ protected void putObject(SyncObject obj, String targetKey) {
415415
if (obj.isDirectory()) {
416416
req = new PutObjectRequest(bucketName, targetKey, new ByteArrayInputStream(new byte[0]), om);
417417
} else if (obj instanceof FileSyncObject) {
418-
req = new PutObjectRequest(bucketName, targetKey, ((FileSyncObject) obj).getRawSourceIdentifier());
418+
req = new PutObjectRequest(bucketName, targetKey, ((FileSyncObject) obj).getRawSourceIdentifier()).withMetadata(om);
419419
} else {
420420
req = new PutObjectRequest(bucketName, targetKey, obj.getInputStream(), om);
421421
}

src/main/java/com/emc/ecs/sync/util/AwsS3Util.java

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public static ListIterator<S3ObjectVersion> listVersions(
119119
do {
120120
if (listing == null) listing = s3.listVersions(bucket, key, null, null, "/", null);
121121
else listing = s3.listNextBatchOfVersions(listing);
122+
listing.setMaxKeys(1000); // Google Storage compatibility
122123

123124
for (S3VersionSummary summary : listing.getVersionSummaries()) {
124125

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package com.emc.ecs.sync.util;
2+
3+
import com.emc.object.util.ProgressListener;
4+
import com.emc.object.util.ProgressOutputStream;
5+
import com.filepool.fplibrary.FPTag;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.io.PipedInputStream;
12+
import java.io.PipedOutputStream;
13+
import java.security.DigestInputStream;
14+
import java.security.MessageDigest;
15+
import java.security.NoSuchAlgorithmException;
16+
17+
public class BlobInputStream extends CountingInputStream {
18+
private static final Logger log = LoggerFactory.getLogger(BlobInputStream.class);
19+
20+
private FPTag tag;
21+
BlobReader reader;
22+
Thread readerThread;
23+
24+
public BlobInputStream(FPTag tag, int bufferSize) throws IOException {
25+
this(tag, bufferSize, null);
26+
}
27+
28+
public BlobInputStream(FPTag tag, int bufferSize, ProgressListener listener) throws IOException {
29+
super(null);
30+
this.tag = tag;
31+
32+
// piped streams and a reader task are necessary because of the odd stream handling in the CAS JNI wrapper
33+
PipedInputStream pin = new PipedInputStream(bufferSize);
34+
PipedOutputStream pout = new PipedOutputStream(pin);
35+
36+
try {
37+
in = new DigestInputStream(pin, MessageDigest.getInstance("md5"));
38+
} catch (NoSuchAlgorithmException e) {
39+
throw new RuntimeException("could not initialize MD5 digest", e);
40+
}
41+
42+
OutputStream out = pout;
43+
if (listener != null) out = new ProgressOutputStream(out, listener);
44+
45+
reader = new BlobReader(out);
46+
readerThread = new Thread(reader);
47+
readerThread.start();
48+
}
49+
50+
@Override
51+
public int read() throws IOException {
52+
checkReader();
53+
int result = super.read();
54+
checkReader(); // also check after read in case we were blocked
55+
return result;
56+
}
57+
58+
@Override
59+
public int read(byte[] b) throws IOException {
60+
return read(b, 0, b.length);
61+
}
62+
63+
@Override
64+
public int read(byte[] b, int off, int len) throws IOException {
65+
checkReader();
66+
int result = super.read(b, off, len);
67+
checkReader(); // also check after read in case we were blocked
68+
return result;
69+
}
70+
71+
private void checkReader() throws IOException {
72+
if (reader.isFailed()) throw new IOException("blob reader failed", reader.getError());
73+
}
74+
75+
@Override
76+
public void close() throws IOException {
77+
try {
78+
super.close();
79+
} finally {
80+
81+
// if the reader is active, closing the pipe (above) will throw an exception in PipedOutputStream.write
82+
// if it is waiting for buffer space however, it will need to be interrupted or it will be frozen indefinitely
83+
if (!reader.isComplete() && !reader.isFailed())
84+
readerThread.interrupt();
85+
86+
// if the reader is complete, this does nothing; if close was called early, this will wait until the reader
87+
// thread is notified of the close (an IOException will be thrown from PipedOutputStream.write)
88+
try {
89+
readerThread.join();
90+
} catch (Throwable t) {
91+
log.warn("could not join reader thread", t);
92+
}
93+
}
94+
}
95+
96+
public byte[] getMd5Digest() {
97+
if (!(in instanceof DigestInputStream)) throw new UnsupportedOperationException("MD5 checksum is not enabled");
98+
if (!isClosed()) throw new UnsupportedOperationException("cannot get MD5 until stream is closed");
99+
return ((DigestInputStream) in).getMessageDigest().digest();
100+
}
101+
102+
class BlobReader implements Runnable {
103+
private OutputStream out;
104+
private volatile boolean complete = false;
105+
private volatile boolean failed = false;
106+
private volatile Throwable error;
107+
108+
BlobReader(OutputStream out) {
109+
this.out = out;
110+
}
111+
112+
@Override
113+
public synchronized void run() {
114+
try (OutputStream outputStream = out) {
115+
tag.BlobRead(outputStream);
116+
complete = true;
117+
} catch (Throwable t) {
118+
failed = true;
119+
error = t;
120+
}
121+
}
122+
123+
public boolean isComplete() {
124+
return complete;
125+
}
126+
127+
public boolean isFailed() {
128+
return failed;
129+
}
130+
131+
public Throwable getError() {
132+
return error;
133+
}
134+
}
135+
}

src/main/java/com/emc/ecs/sync/util/CasInputStream.java

-79
This file was deleted.

0 commit comments

Comments
 (0)