Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Batch email #68

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void processProvenanceEvents(ReportingContext context) {
final List<String> detailsAsError =
Arrays.asList(context.getProperty(DETAILS_AS_ERROR).getValue().toLowerCase().split(","));
final String nifiUrl = context.getProperty(NIFI_URL).getValue();
final List<Map<String, Object>> allSources = new ArrayList<>();

consumer.consumeEvents(context, ((componentMapHolder, provenanceEventRecords) -> {
getLogger().debug("Starting to consume events");
Expand Down Expand Up @@ -229,16 +230,18 @@ private void processProvenanceEvents(ReportingContext context) {
source.put("view_input_content_uri", viewContentUri + "/input");
source.put("view_output_content_uri", viewContentUri + "/output");

try {
indexEvent(source, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", e);
}
allSources.add(source);
}

try {
indexEvents(allSources, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", ex);
}
}));
}

/**
 * Publishes a single provenance event to the concrete reporter's destination.
 * NOTE(review): this change set introduces the batch variant below and the consumer now
 * calls it once per batch - confirm whether this single-event hook is still meant to exist.
 *
 * @param event   the event fields, keyed by field name
 * @param context the reporting context the reporter reads its properties from
 * @throws IOException if the event cannot be published
 */
public abstract void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException;

/**
 * Publishes a batch of provenance events to the concrete reporter's destination.
 *
 * @param events  the events to publish; each entry is one event's fields keyed by field name
 * @param context the reporting context the reporter reads its properties from
 * @throws IOException if the batch cannot be published
 */
public abstract void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException;

@Override
public void onTrigger(final ReportingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
Expand Down Expand Up @@ -87,46 +88,55 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

public void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException {
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}
events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));

} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}

});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"email", "provenance", "smtp"})
Expand Down Expand Up @@ -148,6 +149,17 @@ public class EmailProvenanceReporter extends AbstractProvenanceReporter {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Optional boolean property (default {@code "false"}) that selects between one email per
 * group of similar error events and one email per error event.
 */
public static final PropertyDescriptor GROUP_SIMILAR_ERRORS = new PropertyDescriptor.Builder()
.name("Group Similar Errors")
.displayName("Group Similar Errors")
.description("Specifies whether to group multiple error events into a single email or not." +
" Set to true to receive an email with grouped errors. " +
"Set to false to receive individual emails for each error." +
" The grouping is by process and error information")
.required(false)
// A default is supplied so the property still has a value when it is left unset.
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
Expand All @@ -168,6 +180,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors.add(SPECIFIC_RECIPIENT_ATTRIBUTE_NAME);
descriptors.add(INPUT_CHARACTER_SET);
descriptors.add(EMAIL_SUBJECT_PREFIX);
descriptors.add(GROUP_SIMILAR_ERRORS);

return descriptors;
}
Expand Down Expand Up @@ -307,68 +320,114 @@ private String getSpecificRecipientValue(final ReportingContext context, final M
return eventPreviousAttributes.get(specificRecipientAttributeName);
}

private String composeMessageContent(final Map<String, Object> event) {
private String composeMessageContent(final Map<String, Object> event, Boolean groupSimilarErrors, int groupedEventsSize) {
final StringBuilder message = new StringBuilder();

message.append("Affected processor:\n")
.append("\tProcessor name: ").append(event.get("component_name")).append("\n")
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n")
.append("\tURL: ").append(event.get("component_url")).append("\n");
.append("\tProcessor name: ").append(event.get("component_name")).append("\n")
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n");

if (groupSimilarErrors) {
message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
}

message.append("\tURL: ").append(event.get("component_url")).append("\n");

message.append("\n");
message.append("Error information:\n")
.append("\tDetails: ").append(event.get("details")).append("\n")
.append("\tEvent type: ").append(event.get("event_type")).append("\n");
.append("\tDetails: ").append(event.get("details")).append("\n")
.append("\tEvent type: ").append(event.get("event_type")).append("\n");

if (event.containsKey("updated_attributes")) {
Map<String, String> updatedAttributes = (Map<String, String>) event.get("updated_attributes");
message.append("\nFlow file - Updated attributes:\n");
updatedAttributes.keySet().stream().sorted().forEach(attributeName ->
message.append(String.format("\t%1$s: %2$s\n", attributeName, updatedAttributes.get(attributeName)))
message.append(String.format("\t%1$s: %2$s\n", attributeName, updatedAttributes.get(attributeName)))
);
}

if (event.containsKey("previous_attributes")) {
Map<String, String> previousAttributes = (Map<String, String>) event.get("previous_attributes");
message.append("\nFlow file - Previous attributes:\n");
previousAttributes.keySet().stream().sorted().forEach(attributeName ->
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
);
}

message.append("\nFlow file - content:\n")
.append("\tDownload input: ").append(event.get("download_input_content_uri")).append("\n")
.append("\tDownload output: ").append(event.get("download_output_content_uri")).append("\n")
.append("\tView input: ").append(event.get("view_input_content_uri")).append("\n")
.append("\tView output: ").append(event.get("view_output_content_uri")).append("\n");
.append("\tDownload input: ").append(event.get("download_input_content_uri")).append("\n")
.append("\tDownload output: ").append(event.get("download_output_content_uri")).append("\n")
.append("\tView input: ").append(event.get("view_input_content_uri")).append("\n")
.append("\tView output: ").append(event.get("view_output_content_uri")).append("\n");

message.append("\n");
return message.toString();
}

@Override
public void indexEvent(final Map<String, Object> event, final ReportingContext context) {
try {
// Send the email message only if it is an error event
if (event.containsKey("status") && event.get("status").equals("Error")) {
sendErrorEmail(event, context);
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) {
List<Map<String, Object>> errorEvents = filterErrorEvents(events);

if (context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean()) {
// Group all error events to send in a single batch email
Map<Map<String, String>, List<Map<String, Object>>> batchGroupedEvents = events.stream()
.collect(Collectors.groupingBy(event -> groupingKeys(event)));
batchGroupedEvents.forEach((groupingKeys, groupedEvents) -> {
try {
sendErrorEmail(groupedEvents.get(0), context, groupedEvents.size());
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
});
} else {
// Send individual emails for each error event
for (Map<String, Object> event : errorEvents) {
try {
sendErrorEmail(event, context, 0);
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context) throws MessagingException {
String emailSubject;
if (context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() != null) {
emailSubject = "[" + context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() + "] "
+ "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
/**
 * Selects the events whose {@code status} field equals {@code "Error"}.
 *
 * @param events the events to inspect
 * @return a new list holding the matching events in their original order
 */
private List<Map<String, Object>> filterErrorEvents(final List<Map<String, Object>> events) {
    final List<Map<String, Object>> failed = new ArrayList<>();
    for (final Map<String, Object> candidate : events) {
        final Object status = candidate.get("status");
        if ("Error".equals(status)) {
            failed.add(candidate);
        }
    }
    return failed;
}


/**
 * Builds the key used to decide which error events are "similar": the component id, the
 * error details and the event type of the event.
 *
 * <p>A missing or null field is represented by the string {@code "null"} so that events
 * lacking, for example, {@code details} can still be grouped instead of failing the batch.
 *
 * @param event the event to derive the key from
 * @return an immutable map of the three grouping fields, rendered as strings
 */
private Map<String, String> groupingKeys(Map<String, Object> event) {
    // String.valueOf tolerates null, whereas toString() on a missing field throws and
    // Map.of rejects null values.
    return Map.of(
            "component_id", String.valueOf(event.get("component_id")),
            "details", String.valueOf(event.get("details")),
            "event_type", String.valueOf(event.get("event_type"))
    );
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context, int groupedEventsSize) throws MessagingException {

String subjectPrefix = context.getProperty(EMAIL_SUBJECT_PREFIX).getValue();
Boolean groupSimilarErrors = context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean();
StringBuilder emailSubjectBuilder = new StringBuilder();

if (subjectPrefix != null) {
emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
}

if (groupSimilarErrors) {
emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));

} else {
emailSubject = "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
emailSubjectBuilder.append("Error occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));
}
String emailSubject = emailSubjectBuilder.toString();


final Properties properties = this.getEmailProperties(context);
Expand Down Expand Up @@ -397,7 +456,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context)
this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).getValue(), message);
message.setSubject(emailSubject);

final String messageText = composeMessageContent(event);
final String messageText = composeMessageContent(event, groupSimilarErrors, groupedEventsSize);

final String contentType = context.getProperty(CONTENT_TYPE).getValue();
final Charset charset = getCharset(context);
Expand Down
Loading