Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALS-7581: Implement multiple values per patient in CSV output #122

Merged
merged 7 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class PfbProcessor implements HpdsProcessor {
public class MultiValueQueryProcessor implements HpdsProcessor {

public static final String PATIENT_ID_FIELD_NAME = "patient_id";
private final int ID_BATCH_SIZE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Value annotation

private final AbstractProcessor abstractProcessor;

private Logger log = LoggerFactory.getLogger(PfbProcessor.class);
private Logger log = LoggerFactory.getLogger(MultiValueQueryProcessor.class);


@Autowired
public PfbProcessor(AbstractProcessor abstractProcessor) {
public MultiValueQueryProcessor(AbstractProcessor abstractProcessor) {
this.abstractProcessor = abstractProcessor;
ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undo this

ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "1000"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import com.google.common.base.Joiner;
import org.springframework.http.MediaType;

import java.io.File;
Expand All @@ -9,6 +10,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class CsvWriter implements ResultWriter {

Expand Down Expand Up @@ -43,13 +45,27 @@ public void writeEntity(Collection<String[]> data) {
try {
csvWriter.write(fileWriter, data);
} catch (IOException e) {
throw new RuntimeException("IOException while appending to CSV file", e);
throw new UncheckedIOException("IOException while appending to CSV file", e);
}
}

/**
 * Appends rows in which a single cell may hold multiple values.
 * Each inner {@code List<String>} cell is flattened into one
 * tab-delimited string (a {@code null} cell becomes the empty string),
 * giving the {@code String[]} row shape the underlying CSV writer expects.
 *
 * <p>NOTE(review): a {@code null} element <em>inside</em> a cell list is
 * rendered as the literal "null" by {@link String#join}; confirm upstream
 * never produces null values within a cell.
 *
 * @param data rows to append; each row is a list of multi-value cells
 * @throws UncheckedIOException if appending to the CSV file fails
 */
@Override
public void writeMultiValueEntity(Collection<List<List<String>>> data) {
    // Flatten each multi-value cell to a single tab-joined string.
    // String.join (stdlib) replaces the previous Guava Joiner usage.
    List<String[]> rows = data.stream()
        .map(row -> row.stream()
            .map(cell -> cell == null ? "" : String.join("\t", cell))
            .toArray(String[]::new))
        .toList();
    try {
        csvWriter.write(fileWriter, rows);
    } catch (IOException e) {
        throw new UncheckedIOException("IOException while appending to CSV file", e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ public ResponseEntity<QueryStatus> query(@RequestBody QueryRequest queryJson) {
} catch (IOException e) {
log.error("IOException caught in query processing:", e);
return ResponseEntity.status(500).build();
} catch (ClassNotFoundException e) {
return ResponseEntity.status(500).build();
}
} else {
QueryStatus status = new QueryStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

import edu.harvard.dbmi.avillach.util.UUIDv5;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import edu.harvard.hms.dbmi.avillach.hpds.processing.*;
Expand Down Expand Up @@ -48,7 +46,7 @@ public class QueryService {
private final QueryProcessor queryProcessor;
private final TimeseriesProcessor timeseriesProcessor;
private final CountProcessor countProcessor;
private final PfbProcessor pfbProcessor;
private final MultiValueQueryProcessor multiValueQueryProcessor;

HashMap<String, AsyncResult> results = new HashMap<>();

Expand All @@ -58,15 +56,15 @@ public QueryService (AbstractProcessor abstractProcessor,
QueryProcessor queryProcessor,
TimeseriesProcessor timeseriesProcessor,
CountProcessor countProcessor,
PfbProcessor pfbProcessor,
MultiValueQueryProcessor multiValueQueryProcessor,
@Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
@Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
@Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
this.abstractProcessor = abstractProcessor;
this.queryProcessor = queryProcessor;
this.timeseriesProcessor = timeseriesProcessor;
this.countProcessor = countProcessor;
this.pfbProcessor = pfbProcessor;
this.multiValueQueryProcessor = multiValueQueryProcessor;

SMALL_JOB_LIMIT = smallJobLimit;
SMALL_TASK_THREADS = smallTaskThreads;
Expand All @@ -83,7 +81,7 @@ public QueryService (AbstractProcessor abstractProcessor,
smallTaskExecutor = createExecutor(smallTaskExecutionQueue, SMALL_TASK_THREADS);
}

public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOException {
public AsyncResult runQuery(Query query) throws IOException {
// Merging fields from filters into selected fields for user validation of results
mergeFilterFieldsIntoSelectedFields(query);

Expand Down Expand Up @@ -112,11 +110,10 @@ public int runCount(Query query) throws InterruptedException, ExecutionException
return countProcessor.runCounts(query);
}

private AsyncResult initializeResult(Query query) throws ClassNotFoundException, FileNotFoundException, IOException {
private AsyncResult initializeResult(Query query) throws IOException {

HpdsProcessor p;
switch(query.getExpectedResultType()) {
case DATAFRAME :
case SECRET_ADMIN_DATAFRAME:
p = queryProcessor;
break;
Expand All @@ -129,7 +126,8 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,
p = countProcessor;
break;
case DATAFRAME_PFB:
p = pfbProcessor;
case DATAFRAME:
p = multiValueQueryProcessor;
break;
default :
throw new RuntimeException("UNSUPPORTED RESULT TYPE");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package edu.harvard.hms.dbmi.avillach.hpds.service;

import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.processing.AsyncResult;
import edu.harvard.hms.dbmi.avillach.hpds.test.util.BuildIntegrationTestEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Integration test for {@link QueryService}: runs a DATAFRAME query combining
 * a variant-info filter with a phenotypic field and verifies that a non-empty
 * CSV result file is produced.
 */
@ExtendWith(SpringExtension.class)
@EnableAutoConfiguration
@SpringBootTest(classes = edu.harvard.hms.dbmi.avillach.hpds.service.HpdsApplication.class)
@ActiveProfiles("integration-test")
class QueryServiceTest {

    @Autowired
    private QueryService queryService;

    @BeforeAll
    public static void beforeAll() {
        // Referencing INSTANCE triggers construction of the shared
        // integration-test data environment before any test runs.
        BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE;
    }

    @Test
    public void dataframeMulti() throws IOException, InterruptedException {
        Query query = new Query();
        List<Query.VariantInfoFilter> variantInfoFilters = new ArrayList<>();
        Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter();
        variantInfoFilter.categoryVariantInfoFilters =
            Map.of("Gene_with_variant", new String[]{"LOC102723996", "LOC101928576"});
        variantInfoFilters.add(variantInfoFilter);
        query.setVariantInfoFilters(variantInfoFilters);
        query.setFields(List.of("\\open_access-1000Genomes\\data\\SYNTHETIC_AGE\\"));
        query.setExpectedResultType(ResultType.DATAFRAME);

        AsyncResult asyncResult = queryService.runQuery(query);

        // NOTE(review): a fixed sleep is flaky — prefer polling the result's
        // status until it reaches a terminal state; confirm the AsyncResult
        // status API exposes one before changing this.
        Thread.sleep(1000);

        // Validate the outcome instead of printing it: the query must have
        // produced a result object and a non-empty CSV file.
        assertNotNull(asyncResult.getStatus());
        String csv = Files.readString(asyncResult.getFile().toPath(), StandardCharsets.UTF_8);
        assertFalse(csv.isBlank(), "Expected query to produce non-empty CSV output");
    }

}
Loading