diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 63088715d867..1c3782595337 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -210,7 +210,7 @@ public String create(PipelineOptions options) { } catch (Exception e) { throw new IllegalArgumentException( "Error constructing default value for stagingLocation: failed to retrieve gcpTempLocation. " - + "Either stagingLocation must be set explicitly or a valid value must be provided" + + "Either stagingLocation must be set explicitly or a valid value must be provided " + "for gcpTempLocation.", e); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index d9c6c5f16764..225b3d045d20 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -181,7 +181,7 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { @Description( "If non-null, StreamingDataflowWorkerHarness will periodically snapshot it's status pages" - + "and thread stacks to a file in this directory. Generally only set for tests.") + + " and thread stacks to a file in this directory. Generally only set for tests.") @Default.InstanceFactory(PeriodicStatusPageDirectoryFactory.class) String getPeriodicStatusPageOutputDirectory(); diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java index a14fbbc7514d..13d744fd3950 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java @@ -49,7 +49,7 @@ public State cancel() throws IOException { @Override public State waitUntilFinish(Duration duration) { Log.debug( - "Twister2 runner does not currently support wait with duration" + "Twister2 runner does not currently support wait with duration " + "default waitUntilFinish will be executed"); return waitUntilFinish(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index a597013c1c19..9bace292d6ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -732,7 +732,7 @@ private void validateErrorHandlers() { for (ErrorHandler errorHandler : errorHandlers) { if (!errorHandler.isClosed()) { throw new IllegalStateException( - "One or more ErrorHandlers aren't closed, and this pipeline" + "One or more ErrorHandlers aren't closed, and this pipeline " + "cannot be run. See the ErrorHandler documentation for expected usage"); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/UnknownCoderWrapper.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/UnknownCoderWrapper.java index e72b99f2fd8e..8510d8dd0c12 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/UnknownCoderWrapper.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/UnknownCoderWrapper.java @@ -46,7 +46,7 @@ public void encode(Object value, OutputStream outStream) throws CoderException, "`UnknownCoderWrapper` was used to perform an actual encoding in the Java SDK. " + "Potentially a `PCollection` that was generated by a cross-language transform, " + "that uses a coder that is not available in the Java SDK, is being consumed by a Java" - + "transform. Please make sure that cross-language transforms at the language" + + " transform. Please make sure that cross-language transforms at the language " + "boundary use Beam portable coders."); } @@ -54,7 +54,7 @@ public void encode(Object value, OutputStream outStream) throws CoderException, public Object decode(InputStream inStream) throws CoderException, IOException { throw new CoderException( "`UnknownCoderWrapper` was used to perform an actual decoding in the Java SDK. " - + "Potentially a Java transform is being followed by a cross-language transform that" + + "Potentially a Java transform is being followed by a cross-language transform that " + "uses a coder that is not available in the Java SDK. Please make sure that Python " + "transforms at the multi-language boundary use Beam portable coders."); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java index dd24b6bef8f8..1b809c5d170a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java @@ -145,7 +145,7 @@ public void testParDoRequiresStableInput() { value -> { throw new RuntimeException( "Deliberate failure: should happen only once for each application of the DoFn" - + "within the transform graph."); + + " within the transform graph."); }; PCollection singleton = p.apply("CreatePCollectionOfOneValue", Create.of(VALUE)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/MorePipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/MorePipelineTest.java index 50b1aacdcd00..3616b20cf849 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/MorePipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/MorePipelineTest.java @@ -156,7 +156,7 @@ public void visitPrimitiveTransform(Node node) { checkState( viewRef.compareAndSet(null, createViewTransform.getView()), "Found more than one instance of a CreatePCollectionView when" - + "attempting to replace %s, found [%s, %s]", + + " attempting to replace %s, found [%s, %s]", transform.getTransform(), viewRef.get(), createViewTransform.getView()); diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java index 855434c7cd21..e0336518a412 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java @@ -160,9 +160,9 @@ public FileReadSchemaTransformConfiguration build() { if (terminateAfterSecondsSinceNewOutput != null && terminateAfterSecondsSinceNewOutput > 0L) { checkArgument( pollIntervalMillis != null && pollIntervalMillis > 0L, - "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive" + "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive " + "value for pollIntervalMillis. Please set pollIntervalMillis as well to enable" - + "watching for new files."); + + " watching for new files."); } return config; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 4fdd3513c806..0206d48b813c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1080,7 +1080,7 @@ Optional> getImportGcsDeadLetterPath() { public Write.Result expand(PCollection input) { checkState( input.isBounded() == IsBounded.BOUNDED, - "FhirIO.Import should only be used on bounded PCollections as it is" + "FhirIO.Import should only be used on bounded PCollections as it is " + "intended for batch use only."); // fall back on pipeline's temp location. diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 392069f1926e..231a1b9e49e1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1554,7 +1554,7 @@ public PCollection> expand(PBegin input) { if (kafkaRead.getBadRecordErrorHandler() != null) { LOG.warn( "The Legacy implementation of Kafka Read does not support writing malformed" - + "messages to an error handler. Use the SDF implementation instead."); + + " messages to an error handler. Use the SDF implementation instead."); } // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. Unbounded> unbounded = diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index ce5ccb559870..4cbed36c697f 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -209,7 +209,7 @@ public Row toConfigRow(Read transform) { if (transform.getBadRecordErrorHandler() != null) { throw new RuntimeException( "Upgrading KafkaIO read transforms that have `withBadRecordErrorHandler` property set" - + "is not supported yet."); + + " is not supported yet."); } return Row.withSchema(schema).withFieldValues(fieldValues).build(); @@ -505,7 +505,7 @@ public Row toConfigRow(Write transform) { instanceof ErrorHandler.DefaultErrorHandler)) { throw new RuntimeException( "Upgrading KafkaIO write transforms that have `withBadRecordErrorHandler` property set" - + "is not supported yet."); + + " is not supported yet."); } return Row.withSchema(schema).withFieldValues(fieldValues).build(); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 70e3f7168b5a..de61b94e68be 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -438,7 +438,7 @@ void setPubsubMessageSerializationMethod( @Description( "If non-negative, events from the Kafka topic will get their timestamps from the Kafka createtime, with the maximum delay for" - + "disorder as specified.") + + " disorder as specified.") @Default.Integer(60) int getKafkaTopicCreateTimeMaxDelaySec();