Skip to content

Commit

Permalink
Add tests for schema compilation at processor level
Browse files Browse the repository at this point in the history
  • Loading branch information
whiver committed Mar 6, 2018
1 parent c52af89 commit 5e6f880
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ public void onTriggerDecodeValidFiles() throws IOException {
}

/**
* Test decoding valid files given an uncompiled .proto schema
* Test decoding valid files given an uncompiled .proto schema specified at flowfile level
* @throws Exception
*/
@Test
public void onTriggerCompileSchemaAndDecodeValidFiles() throws Exception {
public void onTriggerCompileFlowfileSchemaAndDecodeValidFiles() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder());
runner.setProperty("protobuf.compileSchema", "true");
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true");

InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data");
HashMap<String, String> personProperties = new HashMap<>();
Expand All @@ -115,6 +115,34 @@ public void onTriggerCompileSchemaAndDecodeValidFiles() throws Exception {
Assert.assertEquals("The parsing result of Person.data is not as expected", expected, given);
}

/**
* Test decoding valid files given an uncompiled .proto schema specified at processor level
* @throws Exception
*/
@Test
public void onTriggerCompileProcessorSchemaAndDecodeValidFiles() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufDecoder());
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufDecoderTest.class.getResource("/schemas/Person.proto").getPath());

InputStream dataFile = ProtobufDecoderTest.class.getResourceAsStream("/data/Person.data");
HashMap<String, String> personProperties = new HashMap<>();
personProperties.put("protobuf.messageType", "Person");
runner.enqueue(dataFile, personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.assertAllFlowFilesTransferred(ProtobufDecoder.SUCCESS);
MockFlowFile result = runner.getFlowFilesForRelationship(ProtobufDecoder.SUCCESS).get(0);

ObjectMapper mapper = new ObjectMapper();
JsonNode expected = mapper.readTree(this.getClass().getResourceAsStream("/data/Person.json"));
JsonNode given = mapper.readTree(runner.getContentAsByteArray(result));
Assert.assertEquals("The parsing result of Person.data is not as expected", expected, given);
}

/**
* Test if the per-flowfile schema have priority on the processor-wide one
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ public void onTriggerEncodeValidFiles() throws IOException {
}

/**
* Test encoding valid files given an uncompiled .proto schema
* Test encoding valid files given an uncompiled .proto schema specified at flowfile level
* @throws Exception
*/
@Test
public void onTriggerCompileSchemaAndEncodeValidFiles() throws Exception {
public void onTriggerCompileFlowfileSchemaAndEncodeValidFiles() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder());
runner.setProperty("protobuf.compileSchema", "true");
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true");

InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json");
HashMap<String, String> personProperties = new HashMap<>();
Expand All @@ -112,6 +112,31 @@ public void onTriggerCompileSchemaAndEncodeValidFiles() throws Exception {
results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data"));
}

/**
* Test encoding valid files given an uncompiled .proto schema specified at processor level
* @throws Exception
*/
@Test
public void onTriggerCompileProcessorSchemaAndEncodeValidFiles() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new ProtobufEncoder());
runner.setProperty(ProtobufProcessor.COMPILE_SCHEMA, "true");
runner.setProperty(ProtobufProcessor.PROTOBUF_SCHEMA, ProtobufEncoderTest.class.getResource("/schemas/Person.proto").getPath());

InputStream jsonFile = ProtobufEncoderTest.class.getResourceAsStream("/data/Person.json");
HashMap<String, String> personProperties = new HashMap<>();
personProperties.put("protobuf.messageType", "Person");
runner.enqueue(jsonFile, personProperties);

runner.assertValid();
runner.run(1);
runner.assertQueueEmpty();

runner.assertAllFlowFilesTransferred(ProtobufEncoder.SUCCESS);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(ProtobufEncoder.SUCCESS);
Assert.assertEquals("The Person flowfile should be returned to success", 1, results.size());
results.get(0).assertContentEquals(ProtobufEncoderTest.class.getResourceAsStream("/data/Person.data"));
}

/**
* Test if the per-flowfile schema have priority on the processor-wide one
* @throws IOException
Expand Down

0 comments on commit 5e6f880

Please sign in to comment.