Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming operator and API #858

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions src/edu/washington/escience/myria/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,6 @@ public Response searchDataset(@QueryParam("q") final String searchTerm) throws D
return Response.ok().cacheControl(MyriaApiUtils.doNotCache()).entity(relationKeys).build();
}

/**
 * Normalizes a requested output format and checks that it is one of the supported values.
 *
 * <p>A {@code null} format is treated as {@code "csv"}. The value is trimmed and lower-cased
 * (using the default locale) before it is compared against the supported formats.
 *
 * @param format the requested format; may be {@code null}, in which case "csv" is used.
 * @return the normalized format string: one of "csv", "tsv", or "json".
 * @throws MyriaApiException with status {@code BAD_REQUEST} if the format is not supported.
 */
private String validateFormat(final String format) {
  /* A missing format falls back to CSV before normalization. */
  final String normalized = (format == null ? "csv" : format).trim().toLowerCase();

  /* Only CSV, TSV and JSON are legal. */
  final boolean legal =
      normalized.equals("csv") || normalized.equals("tsv") || normalized.equals("json");
  if (!legal) {
    throw new MyriaApiException(Status.BAD_REQUEST, "format must be 'csv', 'tsv', or 'json'");
  }
  return normalized;
}

/**
* @param userName the user who owns the target relation.
* @param programName the program to which the target relation belongs.
Expand Down Expand Up @@ -182,7 +155,7 @@ public Response getDatasetData(
RelationKey relationKey = RelationKey.of(userName, programName, relationName);

/* Validate the request format. This will throw a MyriaApiException if format is invalid. */
String validFormat = validateFormat(format);
String validFormat = MyriaApiUtils.validateFormat(format);

/* Allocate the pipes by which the {@link DataOutput} operator will talk to the {@link StreamingOutput} object that
* will stream data to the client. */
Expand Down Expand Up @@ -244,7 +217,7 @@ public Response getQueryData(
ResponseBuilder response = Response.ok();

/* Validate the request format. This will throw a MyriaApiException if format is invalid. */
String validFormat = validateFormat(format);
String validFormat = MyriaApiUtils.validateFormat(format);

/* Allocate the pipes by which the {@link DataOutput} operator will talk to the {@link StreamingOutput} object that
* will stream data to the client. */
Expand Down Expand Up @@ -322,7 +295,7 @@ public Response replaceDataset(
Status.NOT_FOUND, "The dataset was not found: " + relationKey.toString());
}

String validFormat = validateFormat(format);
String validFormat = MyriaApiUtils.validateFormat(format);
Character delimiter;
if (validFormat.equals("csv")) {
delimiter = ',';
Expand Down
2 changes: 2 additions & 0 deletions src/edu/washington/escience/myria/api/MasterApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public MasterApplication(final Server server, final String adminPassword) {
* Disable WADL - throws error messages when using Swagger, and not needed.
*/
property(ServerProperties.WADL_FEATURE_DISABLE, true);
// Disable buffering of server-side response entity
property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);

/* Enable Jackson's JSON Serialization/Deserialization. */
register(JacksonJsonProvider.class);
Expand Down
28 changes: 28 additions & 0 deletions src/edu/washington/escience/myria/api/MyriaApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

/**
* Helper functions for the API.
Expand Down Expand Up @@ -30,4 +31,31 @@ public static CacheControl doNotCache() {
public static ResponseBuilder doNotCache(final ResponseBuilder response) {
return response.cacheControl(doNotCache());
}

/**
 * Normalizes a requested output format and checks that it is one of the supported values.
 *
 * <p>A {@code null} format is treated as {@code "csv"}. The value is trimmed and lower-cased
 * (using the default locale) before it is compared against the supported formats.
 *
 * @param format the requested format; may be {@code null}, in which case "csv" is used.
 * @return the normalized format string: one of "csv", "tsv", or "json".
 * @throws MyriaApiException with status {@code BAD_REQUEST} if the format is not supported.
 */
public static String validateFormat(final String format) {
  String requested = format;
  if (requested == null) {
    /* No format supplied: default to CSV. */
    requested = "csv";
  }
  final String normalized = requested.trim().toLowerCase();

  /* The complete set of legal formats. */
  final String[] legalFormats = {"csv", "tsv", "json"};
  for (final String legalFormat : legalFormats) {
    if (normalized.equals(legalFormat)) {
      return normalized;
    }
  }
  throw new MyriaApiException(Status.BAD_REQUEST, "format must be 'csv', 'tsv', or 'json'");
}
}
78 changes: 78 additions & 0 deletions src/edu/washington/escience/myria/api/QueryResource.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package edu.washington.escience.myria.api;

import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.util.Collections;
import java.util.List;

import javax.ws.rs.Consumes;
Expand All @@ -20,15 +23,22 @@
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

import org.glassfish.jersey.media.multipart.ContentDisposition;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import edu.washington.escience.myria.CsvTupleWriter;
import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.JsonTupleWriter;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.api.encoding.QueryEncoding;
import edu.washington.escience.myria.api.encoding.QuerySearchResults;
import edu.washington.escience.myria.api.encoding.QueryStatusEncoding;
import edu.washington.escience.myria.coordinator.CatalogException;
import edu.washington.escience.myria.io.PipeSink;
import edu.washington.escience.myria.operator.TupleSink;
import edu.washington.escience.myria.parallel.QueryFuture;
import edu.washington.escience.myria.parallel.Server;
import edu.washington.escience.myria.parallel.SubQueryId;
Expand Down Expand Up @@ -118,6 +128,74 @@ public Response postNewQuery(final QueryEncoding query, @Context final UriInfo u
.build();
}

/**
 * Submits a query and returns a response whose entity streams the query's output.
 *
 * <p>The query encoding and the requested format are validated first, then the query is
 * submitted to the server's query manager. The response entity is a
 * {@link PipedStreamingOutput} wrapping the output that the server has registered for the new
 * query ID. For "csv" and "tsv" the response is sent as an {@code application/octet-stream}
 * attachment named {@code query_<id>.<format>}; for "json" it is sent as JSON.
 *
 * <p>NOTE(review): in this method {@code format} only selects the response headers. Per the
 * TODO below, the tuple writer is presumably still hardcoded to CSV elsewhere -- confirm before
 * advertising "tsv" or "json" to clients.
 *
 * @param query the query to be executed; must not be {@code null}.
 * @param uriInfo the URI of the current request (not used by this method).
 * @param format the requested output format; validated by
 *     {@link MyriaApiUtils#validateFormat(String)}.
 * @return a 200 response whose body streams the query output.
 * @throws CatalogException declared by the signature; a {@code CatalogException} raised while
 *     submitting the query is rethrown as a {@link MyriaApiException} with status
 *     {@code INTERNAL_SERVER_ERROR}.
 */
@POST
@Path("stream")
public Response postNewStreamingQuery(
    final QueryEncoding query,
    @Context final UriInfo uriInfo,
    @QueryParam("format") final String format)
    throws CatalogException {
  /* Validate the input. */
  Preconditions.checkArgument(query != null, "Missing query encoding.");
  query.validate();
  /* Validate the request format. This will throw a MyriaApiException if format is invalid. */
  final String outputFormat = MyriaApiUtils.validateFormat(format);
  // TODO: figure out how to choose the TupleWriter class corresponding to the format (hardcode to CSV for now)

  /* Start the query, and get its Server-assigned Query ID */
  final QueryFuture submitted;
  try {
    submitted = server.getQueryManager().submitQuery(query, query.plan.getPlan());
  } catch (final MyriaApiException apiError) {
    /* Passthrough MyriaApiException. */
    throw apiError;
  } catch (final CatalogException catalogError) {
    throw new MyriaApiException(Status.INTERNAL_SERVER_ERROR, catalogError);
  } catch (final Throwable other) {
    /* Other exceptions mean that the request itself was likely bad. */
    throw new MyriaApiException(Status.BAD_REQUEST, other);
  }

  /* A null future means the query was not accepted. */
  if (submitted == null) {
    throw new MyriaApiException(
        Status.SERVICE_UNAVAILABLE, "The server cannot accept new queries right now.");
  }

  final long queryId = submitted.getQueryId();

  /* Start building the response: the entity streams whatever the query writes to its output. */
  final ResponseBuilder builder =
      Response.ok().entity(new PipedStreamingOutput(server.getQueryOutput(queryId)));

  final boolean delimitedText = outputFormat.equals("csv") || outputFormat.equals("tsv");
  if (!delimitedText && !outputFormat.equals("json")) {
    /* Should not be possible to get here. */
    throw new IllegalStateException(
        "format should have been validated by now, and yet we got here");
  }

  if (delimitedText) {
    /* CSV or TSV : set application/octet-stream, attachment, and filename. */
    final ContentDisposition attachment =
        ContentDisposition.type("attachment")
            .fileName("query_" + queryId + '.' + outputFormat)
            .build();
    builder.header("Content-Disposition", attachment);
    builder.type(MediaType.APPLICATION_OCTET_STREAM);
  } else {
    /* JSON: set application/json. */
    builder.type(MyriaApiConstants.JSON_UTF_8);
  }
  return builder.build();
}

/**
* Get information about a query. This includes when it started, when it finished, its URL, etc.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
@Type(name = "ShuffleProducer", value = GenericShuffleProducerEncoding.class),
@Type(name = "Singleton", value = SingletonEncoding.class),
@Type(name = "StatefulApply", value = StatefulApplyEncoding.class),
@Type(name = "StreamingSink", value = StreamingSinkEncoding.class),
@Type(name = "SymmetricHashJoin", value = SymmetricHashJoinEncoding.class),
@Type(name = "SymmetricHashCountingJoin", value = SymmetricHashCountingJoinEncoding.class),
@Type(name = "TableScan", value = TableScanEncoding.class),
Expand Down
48 changes: 37 additions & 11 deletions src/edu/washington/escience/myria/api/encoding/QueryConstruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,34 +355,60 @@ private static void setAndVerifySingletonConstraints(
final List<PlanFragmentEncoding> fragments, final ConstructArgs args) {
List<Integer> singletonWorkers =
ImmutableList.of(args.getServer().getAliveWorkers().iterator().next());
List<Integer> masterWorkers = ImmutableList.of(MyriaConstants.MASTER_ID);

for (PlanFragmentEncoding fragment : fragments) {
boolean hasSingletonOp = false;
boolean runOnMaster = false;
for (OperatorEncoding<?> operator : fragment.operators) {
if (operator instanceof CollectConsumerEncoding
if (operator instanceof TupleSinkEncoding
|| operator instanceof StreamingSinkEncoding
|| operator instanceof CollectConsumerEncoding
|| operator instanceof SingletonEncoding
|| operator instanceof EOSControllerEncoding
|| operator instanceof TupleSourceEncoding
|| operator instanceof NChiladaFileScanEncoding
|| operator instanceof SeaFlowFileScanEncoding
|| operator instanceof TipsyFileScanEncoding) {

hasSingletonOp = true;
if (operator instanceof StreamingSinkEncoding) {
runOnMaster = true;
}
String encodingTypeName = operator.getClass().getSimpleName();
String operatorTypeName =
encodingTypeName.substring(0, encodingTypeName.indexOf("Encoding"));

if (fragment.workers == null) {
String encodingTypeName = operator.getClass().getSimpleName();
String operatorTypeName =
encodingTypeName.substring(0, encodingTypeName.indexOf("Encoding"));
LOGGER.warn(
"{} operator can only be instantiated on a single worker, assigning to random worker",
operatorTypeName);
fragment.workers = singletonWorkers;
if (operator instanceof StreamingSinkEncoding) {
LOGGER.warn("{} operator can only be instantiated on master", operatorTypeName);
} else {
LOGGER.warn(
"{} operator can only be instantiated on a single worker", operatorTypeName);
}
} else {
Preconditions.checkArgument(
fragment.workers.size() == 1,
"Fragment %s has a singleton operator %s, but workers %s",
fragment.fragmentIndex,
operator.opId,
operatorTypeName,
fragment.workers);
Preconditions.checkArgument(
!runOnMaster || fragment.workers.equals(masterWorkers),
"Fragment %s has a master-only operator %s, but workers %s",
fragment.fragmentIndex,
operatorTypeName,
fragment.workers);
}
/* We only need to verify singleton-ness once per fragment. */
break;
}
}
if (fragment.workers == null && hasSingletonOp) {
if (runOnMaster) {
LOGGER.warn("Assigning unassigned fragment {} to master", fragment.fragmentIndex);
fragment.workers = masterWorkers;
} else {
LOGGER.warn("Assigning unassigned fragment {} to random worker", fragment.fragmentIndex);
fragment.workers = singletonWorkers;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package edu.washington.escience.myria.api.encoding;

import java.io.IOException;

import javax.ws.rs.core.Response.Status;

import edu.washington.escience.myria.CsvTupleWriter;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.api.MyriaApiException;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.io.PipeSink;
import edu.washington.escience.myria.operator.TupleSink;

/**
 * Encoding for a streaming sink: constructs a {@link TupleSink} that writes CSV-formatted tuples
 * into a {@link PipeSink}, and registers the read end of that pipe with the server as the output
 * of the query being constructed.
 */
public class StreamingSinkEncoding extends UnaryOperatorEncoding<TupleSink> {

  /**
   * Builds the sink operator and registers its pipe as the query's output.
   *
   * @param args construction arguments; supplies the server and the ID of the query.
   * @return a {@link TupleSink} writing through a {@link CsvTupleWriter} into a new pipe.
   * @throws MyriaApiException with status {@code INTERNAL_SERVER_ERROR} if the pipe cannot be
   *     created.
   */
  @Override
  public TupleSink construct(final ConstructArgs args) throws MyriaApiException {
    // TODO: dynamically select TupleWriter impl from API format parameter
    final TupleWriter csvWriter = new CsvTupleWriter();
    final PipeSink pipe = openPipe();
    /* Hand the read end of the pipe to the server, keyed by the query ID. */
    args.getServer().registerQueryOutput(args.getQueryId(), pipe.getInputStream());
    /* First argument is null here; presumably the child operator is attached later -- confirm. */
    return new TupleSink(null, csvWriter, pipe);
  }

  /**
   * Creates a new {@link PipeSink}, converting an I/O failure into an API error.
   *
   * @return the newly created pipe sink.
   */
  private static PipeSink openPipe() {
    try {
      return new PipeSink();
    } catch (IOException e) {
      throw new MyriaApiException(Status.INTERNAL_SERVER_ERROR, e);
    }
  }
}
4 changes: 3 additions & 1 deletion src/edu/washington/escience/myria/io/FileSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import edu.washington.escience.myria.coordinator.CatalogException;
Expand All @@ -14,8 +15,9 @@ public class FileSink implements DataSink {
/** Required for Java serialization. */
private static final long serialVersionUID = 1L;

@JsonProperty private String filename;
private String filename;

@JsonCreator
public FileSink(@JsonProperty(value = "filename", required = true) final String filename)
throws CatalogException {
this.filename = filename;
Expand Down
20 changes: 11 additions & 9 deletions src/edu/washington/escience/myria/io/PipeSink.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
/**
*
*/
package edu.washington.escience.myria.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.fasterxml.jackson.annotation.JsonCreator;

import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.api.PipedStreamingOutput;

/**
*
*/
public class PipeSink implements DataSink {
/** Required for Java serialization. */
private static final long serialVersionUID = 1L;

final PipedOutputStream writerOutput;
final PipedInputStream input;
final PipedStreamingOutput responseEntity;
private final PipedOutputStream writerOutput;
private final PipedInputStream input;
private final PipedStreamingOutput responseEntity;

@JsonCreator
public PipeSink() throws IOException {
writerOutput = new PipedOutputStream();
input = new PipedInputStream(writerOutput, MyriaConstants.DEFAULT_PIPED_INPUT_STREAM_SIZE);
Expand All @@ -36,4 +34,8 @@ public OutputStream getOutputStream() {
public PipedStreamingOutput getResponse() {
return responseEntity;
}

/**
 * Returns the read end of this sink's pipe.
 *
 * @return the {@code PipedInputStream} that the constructor connected to this sink's
 *     {@code PipedOutputStream}.
 */
public InputStream getInputStream() {
return input;
}
}
Loading