Skip to content

Commit

Permalink
Merge branch 'genomic-v2' into ALS-6330
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Oct 3, 2024
2 parents 8626102 + 3f604b4 commit 5d27e75
Show file tree
Hide file tree
Showing 21 changed files with 919 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ public String toString() {
writePartFormat("Observation Count Fields", fields, builder, true);
break;
case DATAFRAME:
case DATAFRAME_MERGED:
case SECRET_ADMIN_DATAFRAME:
writePartFormat("Data Export Fields", fields, builder, true);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ public enum ResultType {
* Return the number of observations for included patients and
* included fields, broken up across the included cross count fields.
*/
OBSERVATION_CROSS_COUNT,
/**
* This was developed for UDN, but is completely useless and should
* be deleted.
*/
DATAFRAME_MERGED,
OBSERVATION_CROSS_COUNT,
/**
* Not completely implemented and currently dead code. Someone with
* statistics experience needs to develop a p-value based filter for
Expand Down Expand Up @@ -94,5 +89,10 @@ public enum ResultType {
* is suitable to time series analysis and/or loading into another
* instance of HPDS.
*/
DATAFRAME_TIMESERIES
DATAFRAME_TIMESERIES,
/**
* Exports data as PFB, using avro
* <a href="https://uc-cdis.github.io/pypfb/">https://uc-cdis.github.io/pypfb/</a>
*/
DATAFRAME_PFB
}
11 changes: 11 additions & 0 deletions data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<exclusions>
<!--Spring boot will complain about this dep on startup. It's not needed (we use SLF4J + Logback)-->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
26 changes: 24 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dockerfile-maven-version>1.4.10</dockerfile-maven-version>
<aws.version>2.20.153</aws.version>
</properties>
<repositories>
<repository>
Expand Down Expand Up @@ -199,7 +200,7 @@
<dependency>
<groupId>edu.harvard.hms.dbmi.avillach</groupId>
<artifactId>pic-sure-resource-api</artifactId>
<version>2.1.0-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
Expand Down Expand Up @@ -310,7 +311,28 @@
<version>1.18.30</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws.version}</version>
<exclusions>
<!--Spring boot will complain about this dep on startup. It's not needed (we use SLF4J + Logback)-->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
10 changes: 10 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,15 @@
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,11 +19,34 @@
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.exception.NotEnoughMemoryException;
import org.springframework.http.MediaType;

public class AsyncResult implements Runnable, Comparable<AsyncResult>{

private static Logger log = LoggerFactory.getLogger(AsyncResult.class);


public byte[] readAllBytes() {
try {
return stream.readAllBytes();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void closeWriter() {
stream.closeWriter();
}

private MediaType responseType;

public MediaType getResponseType() {
return responseType;
}

public File getFile() {
return stream.getFile();
}

public static enum Status{
SUCCESS {
@Override
Expand Down Expand Up @@ -52,29 +81,82 @@ public PicSureStatus toPicSureStatus() {
public abstract PicSureStatus toPicSureStatus();
}

public Query query;

public Status status;

public long queuedTime;

public long completedTime;

public int retryCount;

public int queueDepth;

public int positionInQueue;

public int numRows;
private Query query;

public Query getQuery() {
return query;
}

private Status status;

public Status getStatus() {
return status;
}

public AsyncResult setStatus(Status status) {
this.status = status;
return this;
}

private long queuedTime;

public long getQueuedTime() {
return queuedTime;
}

public AsyncResult setQueuedTime(long queuedTime) {
this.queuedTime = queuedTime;
return this;
}

private long completedTime;

public long getCompletedTime() {
return completedTime;
}

private int retryCount;

public int numColumns;
private int queueDepth;

public int getQueueDepth() {
return queueDepth;
}

public AsyncResult setQueueDepth(int queueDepth) {
this.queueDepth = queueDepth;
return this;
}

private int positionInQueue;

public AsyncResult setPositionInQueue(int positionInQueue) {
this.positionInQueue = positionInQueue;
return this;
}

private int numRows;

public String id;
private int numColumns;

private String id;

public String getId() {
return id;
}

public AsyncResult setId(String id) {
this.id = id;
return this;
}

@JsonIgnore
public ResultStoreStream stream;

private ResultStoreStream stream;

public ResultStoreStream getStream() {
return stream;
}

@JsonIgnore
private String[] headerRow;

Expand All @@ -86,21 +168,48 @@ public PicSureStatus toPicSureStatus() {
* The actual exception is thrown in @see ResultStore#constructor
*/
@JsonIgnore
public ExecutorService jobQueue;
private ExecutorService jobQueue;

public ExecutorService getJobQueue() {
return jobQueue;
}

public AsyncResult setJobQueue(ExecutorService jobQueue) {
this.jobQueue = jobQueue;
return this;
}

@JsonIgnore
public HpdsProcessor processor;
private HpdsProcessor processor;

public HpdsProcessor getProcessor() {
return processor;
}

public AsyncResult(Query query, String[] headerRow) {
public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
this.query = query;
this.headerRow = headerRow;
this.processor = processor;
this.headerRow = processor.getHeaderRow(query);
this.responseType = writer.getResponseType();
try {
stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED);
stream = new ResultStoreStream(headerRow, writer);
} catch (IOException e) {
log.error("Exception creating result stream", e);
}
}

public void appendResults(List<String[]> dataEntries) {
stream.appendResults(dataEntries);
}
public void appendMultiValueResults(List<List<List<String>>> dataEntries) {
stream.appendMultiValueResults(dataEntries);
}

public void appendResultStore(ResultStore resultStore) {
stream.appendResultStore(resultStore);
}


@Override
public void run() {
status = AsyncResult.Status.RUNNING;
Expand All @@ -127,9 +236,15 @@ public void enqueue() {
}
}

public void open() {
stream.open();
}

@Override
public int compareTo(AsyncResult o) {
return this.query.getId().compareTo(o.query.getId());
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ public Optional<VariableVariantMasks> getMasks(String path, VariantBucketHolder<

@Override
public Set<String> getInfoStoreColumns() {
return null;
return Set.of();
}

@Override
public Set<String> getInfoStoreValues(String conceptPath) {
return null;
return Set.of();
}

@Override
public List<InfoColumnMeta> getInfoColumnMeta() {
return null;
return List.of();
}

@Override
Expand Down
Loading

0 comments on commit 5d27e75

Please sign in to comment.