Skip to content

Commit

Permalink
Add tests for encoding/decoding with a precompiled schema at proc level
Browse files Browse the repository at this point in the history
Also add a test to ensure a correct behaviour when switching between
raw and precompiled schema.
  • Loading branch information
whiver committed Mar 6, 2018
1 parent 5e6f880 commit f82fb71
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,34 @@ public void onTriggerDecodeValidFiles() throws IOException {
}
}

/**
* Test encoding valid files given an already compiled schema specified at processor level
* @throws Exception
*/
@Test
public void onTriggerDecodeValidFilesWithSchemaAtProcessorLevel() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder());
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufDecoderTest.class.getResource("/schemas/Person.desc").getPath());

InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data");
HashMap<String, String> personProperties = new HashMap<>();
personProperties.put("protobuf.messageType", "Person");
runner.enqueue(dataFile, personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS);
MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0);

ObjectMapper mapper = new ObjectMapper();
JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json"));
JsonNode given = mapper.readTree(runner.getContentAsByteArray(result));
Assert.assertEquals("The parsing result of Person.data is not as expected", expected, given);
}

/**
* Test decoding valid files given an uncompiled .proto schema specified at flowfile level
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,31 @@ public void onTriggerEncodeValidFiles() throws IOException {
}
}

/**
* Test encoding valid files given an already compiled schema specified at processor level
* @throws Exception
*/
@Test
public void onTriggerEncodeValidFilesWithSchemaAtProcessorLevel() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder());
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.desc").getPath());

InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json");
HashMap<String, String> personProperties = new HashMap<>();
personProperties.put("protobuf.messageType", "Person");
runner.enqueue(jsonFile, personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS);
Assert.assertEquals("The Person flowfile should be returned to success", 1, results.size());
results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data"));
}

/**
* Test encoding valid files given an uncompiled .proto schema specified at flowfile level
* @throws Exception
Expand Down Expand Up @@ -233,4 +258,35 @@ public void onPropertyModified() throws Exception {
result.assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/AddressBook_basic.data"));

}

/**
* Ensure we can still encode properly when switching between a raw or precompiled schema at processor level
*/
@Test
public void onPropertyModifiedEncodeFileUsingSchemaAtProcessorLevel() {
TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder());
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.proto").getPath());

HashMap<String, String> personProperties = new HashMap<>();
personProperties.put("protobuf.messageType", "Person");
runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"), personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "false");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.desc").getPath());

runner.enqueue(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json"), personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS);
Assert.assertEquals("The 2 flowfiles should be returned to success", 2, results.size());
}
}

0 comments on commit f82fb71

Please sign in to comment.