From e588d659373886816c7401ba2993a83975467a61 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 21 Oct 2024 12:21:03 -0400 Subject: [PATCH 1/4] ALS-7581: Add support for multi value CSV export --- .../avillach/hpds/data/query/ResultType.java | 3 +- .../hpds/processing/PfbProcessor.java | 2 +- .../hpds/processing/io/CsvWriter.java | 20 ++++++- .../avillach/hpds/service/PicSureService.java | 2 - .../avillach/hpds/service/QueryService.java | 5 +- .../hpds/service/QueryServiceTest.java | 59 +++++++++++++++++++ 6 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java index f7cd8165..36687003 100644 --- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java +++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java @@ -94,5 +94,6 @@ public enum ResultType { * Exports data as PFB, using avro * https://uc-cdis.github.io/pypfb/ */ - DATAFRAME_PFB + DATAFRAME_PFB, + DATAFRAME_MULTI } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java index 6968a587..24dbfb94 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java @@ -28,7 +28,7 @@ public class PfbProcessor implements HpdsProcessor { @Autowired public PfbProcessor(AbstractProcessor abstractProcessor) { this.abstractProcessor = abstractProcessor; - ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0")); + ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", 
"1000")); } @Override diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java index 3302d06e..5da10b36 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java @@ -1,5 +1,6 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing.io; +import com.google.common.base.Joiner; import org.springframework.http.MediaType; import java.io.File; @@ -9,6 +10,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; public class CsvWriter implements ResultWriter { @@ -43,13 +45,27 @@ public void writeEntity(Collection data) { try { csvWriter.write(fileWriter, data); } catch (IOException e) { - throw new RuntimeException("IOException while appending to CSV file", e); + throw new UncheckedIOException("IOException while appending to CSV file", e); } } @Override public void writeMultiValueEntity(Collection>> data) { - throw new RuntimeException("Method not implemented"); + List collect = data.stream().map(line -> { + return line.stream() + .map(cell -> { + if (cell == null) { + return ""; + } + return Joiner.on('\t').join(cell); + }) + .toArray(String[]::new); + }).toList(); + try { + csvWriter.write(fileWriter, collect); + } catch (IOException e) { + throw new UncheckedIOException("IOException while appending to CSV file", e); + } } @Override diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java index 04e5863f..c58705f3 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java @@ -180,8 
+180,6 @@ public ResponseEntity query(@RequestBody QueryRequest queryJson) { } catch (IOException e) { log.error("IOException caught in query processing:", e); return ResponseEntity.status(500).build(); - } catch (ClassNotFoundException e) { - return ResponseEntity.status(500).build(); } } else { QueryStatus status = new QueryStatus(); diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java index a41a94d2..4abcd318 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java @@ -83,7 +83,7 @@ public QueryService (AbstractProcessor abstractProcessor, smallTaskExecutor = createExecutor(smallTaskExecutionQueue, SMALL_TASK_THREADS); } - public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOException { + public AsyncResult runQuery(Query query) throws IOException { // Merging fields from filters into selected fields for user validation of results mergeFilterFieldsIntoSelectedFields(query); @@ -112,7 +112,7 @@ public int runCount(Query query) throws InterruptedException, ExecutionException return countProcessor.runCounts(query); } - private AsyncResult initializeResult(Query query) throws ClassNotFoundException, FileNotFoundException, IOException { + private AsyncResult initializeResult(Query query) throws IOException { HpdsProcessor p; switch(query.getExpectedResultType()) { @@ -129,6 +129,7 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException, p = countProcessor; break; case DATAFRAME_PFB: + case DATAFRAME_MULTI: p = pfbProcessor; break; default : diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java new file mode 100644 index 
00000000..7174ec53 --- /dev/null +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java @@ -0,0 +1,59 @@ +package edu.harvard.hms.dbmi.avillach.hpds.service; + +import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; +import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType; +import edu.harvard.hms.dbmi.avillach.hpds.processing.AsyncResult; +import edu.harvard.hms.dbmi.avillach.hpds.test.util.BuildIntegrationTestEnvironment; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +@ExtendWith(SpringExtension.class) +@EnableAutoConfiguration +@SpringBootTest(classes = edu.harvard.hms.dbmi.avillach.hpds.service.HpdsApplication.class) +@ActiveProfiles("integration-test") +class QueryServiceTest { + + @Autowired + private QueryService queryService; + + @BeforeAll + public static void beforeAll() { + BuildIntegrationTestEnvironment instance = BuildIntegrationTestEnvironment.INSTANCE; + } + + @Test + public void dataframeMulti() throws IOException, InterruptedException { + Query query = new Query(); + List variantInfoFilters = new ArrayList<>(); + Query.VariantInfoFilter variantInfoFilter = new Query.VariantInfoFilter(); + variantInfoFilter.categoryVariantInfoFilters = Map.of("Gene_with_variant", new String[]{"LOC102723996", "LOC101928576"}); + 
variantInfoFilters.add(variantInfoFilter); + query.setVariantInfoFilters(variantInfoFilters); + query.setFields(List.of("\\open_access-1000Genomes\\data\\SYNTHETIC_AGE\\")); + query.setExpectedResultType(ResultType.DATAFRAME_MULTI); + + AsyncResult asyncResult = queryService.runQuery(query); + + Thread.sleep(1000); + + System.out.println(asyncResult.getStatus()); + System.out.println(IOUtils.toString(new FileInputStream(asyncResult.getFile()), StandardCharsets.UTF_8)); + ; + } + +} \ No newline at end of file From 6e8a1a65b357a90048f71714c074ce65886ac143 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Mon, 21 Oct 2024 14:07:05 -0400 Subject: [PATCH 2/4] Remove new expected result type --- .../harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java | 3 +-- .../harvard/hms/dbmi/avillach/hpds/service/QueryService.java | 3 +-- .../hms/dbmi/avillach/hpds/service/QueryServiceTest.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java index 36687003..f7cd8165 100644 --- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java +++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java @@ -94,6 +94,5 @@ public enum ResultType { * Exports data as PFB, using avro * https://uc-cdis.github.io/pypfb/ */ - DATAFRAME_PFB, - DATAFRAME_MULTI + DATAFRAME_PFB } diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java index 4abcd318..64fe9255 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java @@ -116,7 +116,6 @@ private AsyncResult initializeResult(Query query) throws 
IOException { HpdsProcessor p; switch(query.getExpectedResultType()) { - case DATAFRAME : case SECRET_ADMIN_DATAFRAME: p = queryProcessor; break; @@ -129,7 +128,7 @@ private AsyncResult initializeResult(Query query) throws IOException { p = countProcessor; break; case DATAFRAME_PFB: - case DATAFRAME_MULTI: + case DATAFRAME: p = pfbProcessor; break; default : diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java index 7174ec53..62634cc8 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java @@ -45,7 +45,7 @@ public void dataframeMulti() throws IOException, InterruptedException { variantInfoFilters.add(variantInfoFilter); query.setVariantInfoFilters(variantInfoFilters); query.setFields(List.of("\\open_access-1000Genomes\\data\\SYNTHETIC_AGE\\")); - query.setExpectedResultType(ResultType.DATAFRAME_MULTI); + query.setExpectedResultType(ResultType.DATAFRAME); AsyncResult asyncResult = queryService.runQuery(query); From 525086743bd3eeabe465619493c75e9a34cb74af Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Tue, 22 Oct 2024 09:07:17 -0400 Subject: [PATCH 3/4] Rename PFB processor to something more generic --- ...PfbProcessor.java => MultiValueQueryProcessor.java} | 7 +++---- .../hms/dbmi/avillach/hpds/service/QueryService.java | 10 ++++------ 2 files changed, 7 insertions(+), 10 deletions(-) rename processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/{PfbProcessor.java => MultiValueQueryProcessor.java} (95%) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java similarity index 95% rename from 
processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java rename to processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java index 24dbfb94..d734c31b 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java @@ -13,20 +13,19 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import java.util.stream.Stream; @Component -public class PfbProcessor implements HpdsProcessor { +public class MultiValueQueryProcessor implements HpdsProcessor { public static final String PATIENT_ID_FIELD_NAME = "patient_id"; private final int ID_BATCH_SIZE; private final AbstractProcessor abstractProcessor; - private Logger log = LoggerFactory.getLogger(PfbProcessor.class); + private Logger log = LoggerFactory.getLogger(MultiValueQueryProcessor.class); @Autowired - public PfbProcessor(AbstractProcessor abstractProcessor) { + public MultiValueQueryProcessor(AbstractProcessor abstractProcessor) { this.abstractProcessor = abstractProcessor; ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "1000")); } diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java index 64fe9255..a00a8ad0 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java @@ -15,8 +15,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableMap; - import edu.harvard.dbmi.avillach.util.UUIDv5; import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import edu.harvard.hms.dbmi.avillach.hpds.processing.*; @@ -48,7 +46,7 @@ public class 
QueryService { private final QueryProcessor queryProcessor; private final TimeseriesProcessor timeseriesProcessor; private final CountProcessor countProcessor; - private final PfbProcessor pfbProcessor; + private final MultiValueQueryProcessor multiValueQueryProcessor; HashMap results = new HashMap<>(); @@ -58,7 +56,7 @@ public QueryService (AbstractProcessor abstractProcessor, QueryProcessor queryProcessor, TimeseriesProcessor timeseriesProcessor, CountProcessor countProcessor, - PfbProcessor pfbProcessor, + MultiValueQueryProcessor multiValueQueryProcessor, @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit, @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads, @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) { @@ -66,7 +64,7 @@ public QueryService (AbstractProcessor abstractProcessor, this.queryProcessor = queryProcessor; this.timeseriesProcessor = timeseriesProcessor; this.countProcessor = countProcessor; - this.pfbProcessor = pfbProcessor; + this.multiValueQueryProcessor = multiValueQueryProcessor; SMALL_JOB_LIMIT = smallJobLimit; SMALL_TASK_THREADS = smallTaskThreads; @@ -129,7 +127,7 @@ private AsyncResult initializeResult(Query query) throws IOException { break; case DATAFRAME_PFB: case DATAFRAME: - p = pfbProcessor; + p = multiValueQueryProcessor; break; default : throw new RuntimeException("UNSUPPORTED RESULT TYPE"); From 1ae1adcd33070521b3e80cd4424b9281846d71d9 Mon Sep 17 00:00:00 2001 From: Ryan Amari Date: Thu, 24 Oct 2024 11:40:32 -0400 Subject: [PATCH 4/4] Add value annotation, add validation to integration test --- .../processing/MultiValueQueryProcessor.java | 9 +++++---- .../application-integration-test.properties | 1 + .../avillach/hpds/service/QueryServiceTest.java | 16 ++++++++++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java 
b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java index d734c31b..fd931fce 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/MultiValueQueryProcessor.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.*; @@ -18,16 +19,16 @@ public class MultiValueQueryProcessor implements HpdsProcessor { public static final String PATIENT_ID_FIELD_NAME = "patient_id"; - private final int ID_BATCH_SIZE; + private final int idBatchSize; private final AbstractProcessor abstractProcessor; private Logger log = LoggerFactory.getLogger(MultiValueQueryProcessor.class); @Autowired - public MultiValueQueryProcessor(AbstractProcessor abstractProcessor) { + public MultiValueQueryProcessor(AbstractProcessor abstractProcessor, @Value("${ID_BATCH_SIZE:1000}") int idBatchSize) { this.abstractProcessor = abstractProcessor; - ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "1000")); + this.idBatchSize = idBatchSize; } @Override @@ -42,7 +43,7 @@ public String[] getHeaderRow(Query query) { public void runQuery(Query query, AsyncResult result) { Set idList = abstractProcessor.getPatientSubsetForQuery(query); log.info("Processing " + idList.size() + " rows for result " + result.getId()); - Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream() + Lists.partition(new ArrayList<>(idList), idBatchSize).stream() .forEach(patientIds -> { Map>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds)); List>> fieldValuesPerPatient = patientIds.stream().map(patientId -> { diff --git a/service/src/main/resources/application-integration-test.properties 
b/service/src/main/resources/application-integration-test.properties index f85994f6..90547f24 100644 --- a/service/src/main/resources/application-integration-test.properties +++ b/service/src/main/resources/application-integration-test.properties @@ -1,6 +1,7 @@ SMALL_JOB_LIMIT = 100 SMALL_TASK_THREADS = 1 LARGE_TASK_THREADS = 1 +ID_BATCH_SIZE=1000 VCF_EXCERPT_ENABLED=true hpds.genomicProcessor.impl=local diff --git a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java index 62634cc8..f3aa4ef4 100644 --- a/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java +++ b/service/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryServiceTest.java @@ -1,5 +1,7 @@ package edu.harvard.hms.dbmi.avillach.hpds.service; +import de.siegmar.fastcsv.reader.CsvContainer; +import de.siegmar.fastcsv.reader.CsvReader; import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType; import edu.harvard.hms.dbmi.avillach.hpds.processing.AsyncResult; @@ -49,11 +51,17 @@ public void dataframeMulti() throws IOException, InterruptedException { AsyncResult asyncResult = queryService.runQuery(query); - Thread.sleep(1000); + int retries = 0; + while ((AsyncResult.Status.RUNNING.equals(asyncResult.getStatus()) || AsyncResult.Status.PENDING.equals(asyncResult.getStatus())) && retries < 10) { + retries++; + Thread.sleep(200); + } - System.out.println(asyncResult.getStatus()); - System.out.println(IOUtils.toString(new FileInputStream(asyncResult.getFile()), StandardCharsets.UTF_8)); - ; + assertEquals(AsyncResult.Status.SUCCESS, asyncResult.getStatus()); + CsvReader csvReader = new CsvReader(); + CsvContainer csvContainer = csvReader.read(asyncResult.getFile(), StandardCharsets.UTF_8); + // 22 plus header + assertEquals(23, csvContainer.getRows().size()); } } \ No newline at 
end of file