Skip to content

Commit

Permalink
changed property name + removed method
Browse files Browse the repository at this point in the history
  • Loading branch information
ranim-n committed Sep 16, 2024
1 parent 394ad00 commit 8fc2631
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ private void processProvenanceEvents(ReportingContext context) {
}

try {
indexEvent(allSources, context);
indexEvents(allSources, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", ex);
}
}));
}

public abstract void indexEvent(final List<Map<String, Object>> events, final ReportingContext context) throws IOException;
public abstract void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException;

@Override
public void onTrigger(final ReportingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

public void indexEvent(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,16 @@ public class EmailProvenanceReporter extends AbstractProvenanceReporter {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_EMAIL = new PropertyDescriptor.Builder()
.name("Batch Email")
.displayName("Batch Email")
public static final PropertyDescriptor GROUP_SIMILAR_ERRORS = new PropertyDescriptor.Builder()
.name("Group Similar Errors")
.displayName("Group Similar Errors")
.description("Specifies whether to group multiple error events into a single email or not." +
" Set to true to receive an email with grouped errors. " +
"Set to false to receive individual emails for each error." +
" The grouping is by process group, process id and error information")
" The grouping is by process and error information")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

@Override
Expand All @@ -180,7 +180,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors.add(SPECIFIC_RECIPIENT_ATTRIBUTE_NAME);
descriptors.add(INPUT_CHARACTER_SET);
descriptors.add(EMAIL_SUBJECT_PREFIX);
descriptors.add(BATCH_EMAIL);
descriptors.add(GROUP_SIMILAR_ERRORS);

return descriptors;
}
Expand Down Expand Up @@ -320,15 +320,15 @@ private String getSpecificRecipientValue(final ReportingContext context, final M
return eventPreviousAttributes.get(specificRecipientAttributeName);
}

private String composeMessageContent(final Map<String, Object> event, String batchEmail, int groupedEventsSize) {
private String composeMessageContent(final Map<String, Object> event, Boolean groupSimilarErrors, int groupedEventsSize) {
final StringBuilder message = new StringBuilder();

message.append("Affected processor:\n")
.append("\tProcessor name: ").append(event.get("component_name")).append("\n")
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n");

if(batchEmail.equals("true")) {
if (groupSimilarErrors) {
message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
}

Expand Down Expand Up @@ -366,12 +366,20 @@ private String composeMessageContent(final Map<String, Object> event, String bat
}

@Override
public void indexEvent(final List<Map<String, Object>> events, final ReportingContext context) {
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) {
List<Map<String, Object>> errorEvents = filterErrorEvents(events);

if (context.getProperty(BATCH_EMAIL).getValue().equals("true")) {
if (context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean()) {
// Group all error events to send in a single batch email
groupErrorEvents(errorEvents, context);
Map<Map<String, String>, List<Map<String, Object>>> batchGroupedEvents = events.stream()
.collect(Collectors.groupingBy(event -> groupingKeys(event)));
batchGroupedEvents.forEach((groupingKeys, groupedEvents) -> {
try {
sendErrorEmail(groupedEvents.get(0), context, groupedEvents.size());
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
});
} else {
// Send individual emails for each error event
for (Map<String, Object> event : errorEvents) {
Expand All @@ -390,21 +398,9 @@ private List<Map<String, Object>> filterErrorEvents(final List<Map<String, Objec
.collect(Collectors.toList());
}

public void groupErrorEvents(final List<Map<String, Object>> events, final ReportingContext context) {
Map<Map<String, String>, List<Map<String, Object>>> batchGroupedEvents = events.stream()
.collect(Collectors.groupingBy(event -> groupingKeys(event)));
batchGroupedEvents.forEach((groupingKeys, groupedEvents) -> {
try {
sendErrorEmail(groupedEvents.getFirst(), context, groupedEvents.size());
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
});
}

private Map<String, String> groupingKeys(Map<String, Object> event) {
return Map.of(
"process_group_id", event.get("process_group_id").toString(),
"component_id", event.get("component_id").toString(),
"details", event.get("details").toString(),
"event_type", event.get("event_type").toString()
Expand All @@ -414,14 +410,14 @@ private Map<String, String> groupingKeys(Map<String, Object> event) {
public void sendErrorEmail(Map<String, Object> event, ReportingContext context, int groupedEventsSize) throws MessagingException {

String subjectPrefix = context.getProperty(EMAIL_SUBJECT_PREFIX).getValue();
String batchEmail = context.getProperty(BATCH_EMAIL).getValue();
Boolean groupSimilarErrors = context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean();
StringBuilder emailSubjectBuilder = new StringBuilder();

if (subjectPrefix != null) {
emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
}

if (batchEmail.equals("true")) {
if (groupSimilarErrors) {
emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));
Expand Down Expand Up @@ -460,7 +456,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context,
this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).getValue(), message);
message.setSubject(emailSubject);

final String messageText = composeMessageContent(event, batchEmail, groupedEventsSize);
final String messageText = composeMessageContent(event, groupSimilarErrors, groupedEventsSize);

final String contentType = context.getProperty(CONTENT_TYPE).getValue();
final Charset charset = getCharset(context);
Expand Down

0 comments on commit 8fc2631

Please sign in to comment.