Skip to content

Commit

Permalink
Clean up unclosed resources in DBService classes (#21455)
Browse files Browse the repository at this point in the history
* Clean up unclosed resources in DBService classes
* Remove methods instead of deprecating. Add upgrading notes.
  • Loading branch information
kingzacko1 authored Feb 6, 2025
1 parent af91915 commit adf979f
Show file tree
Hide file tree
Showing 27 changed files with 686 additions and 530 deletions.
21 changes: 18 additions & 3 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,24 @@ packages, most prominently `com.mongodb.client.model.Filters`.

Additionally, the following Java Code API changes are included in this release:

| File/method | Description |
|------------------------------------------------|-------------|
| `org.graylog.scheduler.JobSchedule#toDBUpdate` | removed |
| File/method | Description |
|-----------------------------------------------------------------------------------|------------------------------------------|
| `org.graylog.scheduler.JobSchedule#toDBUpdate` | removed |
| `org.graylog.scheduler.DBJobTriggerService#all` | replaced by streamAll |
| `org.graylog.scheduler.DBJobTriggerService#getAllForJob` | replaced by streamAllForJob |
| `org.graylog.scheduler.DBJobTriggerService#findByQuery` | replaced by streamByQuery |
| `org.graylog.events.processor.DBEventDefinitionService#getByNotificationId` | replaced by streamByNotificationId |
| `org.graylog.events.processor.DBEventDefinitionService#getSystemEventDefinitions` | replaced by streamSystemEventDefinitions |
| `org.graylog.events.processor.DBEventDefinitionService#getByArrayValue` | replaced by streamByArrayValue |
| `org.graylog2.lookup.db.DBCacheService#findByIds` | replaced by streamByIds |
| `org.graylog2.lookup.db.DBCacheService#findAll` | replaced by streamAll |
| `org.graylog2.lookup.db.DBDataAdapterService#findByIds` | replaced by streamByIds |
| `org.graylog2.lookup.db.DBDataAdapterService#findAll` | replaced by streamAll |
| `org.graylog2.lookup.db.DBLookupTableService#findByCacheIds` | replaced by streamByCacheIds |
| `org.graylog2.lookup.db.DBLookupTableService#findByDataAdapterIds` | replaced by streamByDataAdapterIds |
| `org.graylog2.lookup.db.DBLookupTableService#findAll` | replaced by streamAll |

DBService classes' new streaming methods require streams to be closed after using - recommend using try-with-resource statements.

## REST API Endpoint Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.ImmutableGraph;
import com.google.common.graph.MutableGraph;
import jakarta.inject.Inject;
import org.graylog.events.contentpack.entities.EventDefinitionEntity;
import org.graylog.events.processor.DBEventDefinitionService;
import org.graylog.events.processor.EventDefinitionDto;
Expand All @@ -50,8 +51,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -172,10 +171,12 @@ public EntityExcerpt createExcerpt(EventDefinitionDto nativeEntity) {

@Override
public Set<EntityExcerpt> listEntityExcerpts() {
return eventDefinitionService.streamAll()
.filter(ed -> ed.config().isContentPackExportable())
.map(this::createExcerpt)
.collect(Collectors.toSet());
try (var stream = eventDefinitionService.streamAll()) {
return stream
.filter(ed -> ed.config().isContentPackExportable())
.map(this::createExcerpt)
.collect(Collectors.toSet());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.graylog.events.notifications;

import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.ws.rs.InternalServerErrorException;
import jakarta.ws.rs.NotFoundException;
import org.graylog.events.processor.DBEventDefinitionService;
import org.graylog.events.processor.EventDefinitionDto;
import org.graylog.scheduler.DBJobDefinitionService;
Expand All @@ -25,13 +28,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import jakarta.ws.rs.InternalServerErrorException;
import jakarta.ws.rs.NotFoundException;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

public class NotificationResourceHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationResourceHandler.class);
Expand Down Expand Up @@ -168,20 +167,21 @@ public boolean delete(String dtoId) {
});

// Delete notification from existing events
eventDefinitionService.getByNotificationId(dtoId)
.forEach(eventDefinition -> {
LOG.debug("Removing notification <{}/{}> from event definition <{}/{}>",
dto.get().id(), dto.get().title(),
eventDefinition.id(), eventDefinition.title());
final ImmutableList<EventNotificationHandler.Config> notifications = eventDefinition.notifications().stream()
.filter(entry -> !entry.notificationId().equals(dtoId))
.collect(ImmutableList.toImmutableList());
EventDefinitionDto updatedEventDto = eventDefinition.toBuilder()
.notifications(notifications)
.build();
eventDefinitionService.save(updatedEventDto);

});
try (Stream<EventDefinitionDto> eventDefinitionStream = eventDefinitionService.streamByNotificationId(dtoId)) {
eventDefinitionStream
.forEach(eventDefinition -> {
LOG.debug("Removing notification <{}/{}> from event definition <{}/{}>",
dto.get().id(), dto.get().title(),
eventDefinition.id(), eventDefinition.title());
final ImmutableList<EventNotificationHandler.Config> notifications = eventDefinition.notifications().stream()
.filter(entry -> !entry.notificationId().equals(dtoId))
.collect(ImmutableList.toImmutableList());
EventDefinitionDto updatedEventDto = eventDefinition.toBuilder()
.notifications(notifications)
.build();
eventDefinitionService.save(updatedEventDto);
});
}
LOG.debug("Deleting notification definition <{}/{}>", dto.get().id(), dto.get().title());
return notificationService.delete(dtoId) > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.graylog.events.processor;

import com.google.errorprone.annotations.MustBeClosed;
import com.mongodb.client.MongoCollection;
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -176,39 +177,43 @@ private long doDeleteUnregister(String id, Supplier<Long> deleteSupplier) {
}

/**
* Returns the list of event definitions that is using the given notification ID.
* Returns the stream of event definitions that is using the given notification ID.
*
* @param notificationId the notification ID
* @return the event definitions with the given notification ID
* @return stream of the event definitions with the given notification ID
*/
public List<EventDefinitionDto> getByNotificationId(String notificationId) {
@MustBeClosed
public Stream<EventDefinitionDto> streamByNotificationId(String notificationId) {
final String field = String.format(Locale.US, "%s.%s",
EventDefinitionDto.FIELD_NOTIFICATIONS,
EventNotificationConfig.FIELD_NOTIFICATION_ID);
return stream(collection.find(eq(field, notificationId))).toList();
return stream(collection.find(eq(field, notificationId)));
}

/**
* Returns the list of system event definitions
* Returns the stream of system event definitions
*
* @return the matching event definitions
* @return stream of the matching event definitions
*/
public List<EventDefinitionDto> getSystemEventDefinitions() {
return stream(collection.find(eq(EventDefinitionDto.FIELD_SCOPE, SystemNotificationEventEntityScope.NAME))).toList();
@MustBeClosed
public Stream<EventDefinitionDto> streamSystemEventDefinitions() {
return stream(collection.find(eq(EventDefinitionDto.FIELD_SCOPE, SystemNotificationEventEntityScope.NAME)));
}

/**
* Returns the list of event definitions that contain the given value in the specified array field
* Returns the stream of event definitions that contain the given value in the specified array field.
*/
@NotNull
public List<EventDefinitionDto> getByArrayValue(String arrayField, String field, String value) {
return stream(collection.find(elemMatch(arrayField, eq(field, value)))).toList();
@MustBeClosed
public Stream<EventDefinitionDto> streamByArrayValue(String arrayField, String field, String value) {
return stream(collection.find(elemMatch(arrayField, eq(field, value))));
}

public boolean isMutable(EventDefinitionDto eventDefinition) {
return scopedEntityMongoUtils.isMutable(eventDefinition);
}

@MustBeClosed
public Stream<EventDefinitionDto> streamAll() {
return stream(collection.find());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.primitives.Ints;
import com.google.errorprone.annotations.MustBeClosed;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.mongodb.client.model.Filters.and;
Expand Down Expand Up @@ -144,12 +146,13 @@ public DBJobTriggerService(MongoConnection mongoConnection,
}

/**
* Loads all existing records and returns them.
* Streams all existing records and returns the stream.
*
* @return list of records
* @return stream of records
*/
public List<JobTriggerDto> all() {
return stream(collection.find().sort(descending(FIELD_ID))).toList();
@MustBeClosed
public Stream<JobTriggerDto> streamAll() {
return stream(collection.find().sort(descending(FIELD_ID)));
}

/**
Expand All @@ -170,24 +173,27 @@ public Optional<JobTriggerDto> get(String id) {
* @return One found job trigger
*/
public Optional<JobTriggerDto> getOneForJob(String jobDefinitionId) {
final List<JobTriggerDto> triggers = getAllForJob(jobDefinitionId);
// We are currently expecting only one trigger per job definition. This will most probably change in the
// future once we extend our scheduler usage.
// TODO: Don't throw exception when there is more than one trigger for a job definition.
// To be able to do this, we need some kind of label system to make sure we can differentiate between
// automatically created triggers (e.g. by event definition) and manually created ones.
if (triggers.size() > 1) {
throw new IllegalStateException("More than one trigger for job definition <" + jobDefinitionId + ">");
try (final Stream<JobTriggerDto> triggerStream = streamAllForJob(jobDefinitionId)) {
final List<JobTriggerDto> triggers = triggerStream.toList();
// We are currently expecting only one trigger per job definition. This will most probably change in the
// future once we extend our scheduler usage.
// TODO: Don't throw exception when there is more than one trigger for a job definition.
// To be able to do this, we need some kind of label system to make sure we can differentiate between
// automatically created triggers (e.g. by event definition) and manually created ones.
if (triggers.size() > 1) {
throw new IllegalStateException("More than one trigger for job definition <" + jobDefinitionId + ">");
}
return triggers.stream().findFirst();
}
return triggers.stream().findFirst();
}

public List<JobTriggerDto> getAllForJob(String jobDefinitionId) {
@MustBeClosed
public Stream<JobTriggerDto> streamAllForJob(String jobDefinitionId) {
if (isNullOrEmpty(jobDefinitionId)) {
throw new IllegalArgumentException("jobDefinitionId cannot be null or empty");
}

return stream(collection.find(eq(FIELD_JOB_DEFINITION_ID, jobDefinitionId))).toList();
return stream(collection.find(eq(FIELD_JOB_DEFINITION_ID, jobDefinitionId)));
}

/**
Expand Down Expand Up @@ -533,13 +539,14 @@ public Optional<JobTriggerDto> cancelTriggerByQuery(Bson query) {
}

/**
* Find triggers by using the provided query. Use judiciously!
* Stream triggers by using the provided query. Use judiciously!
*
* @param query The query
* @return All found JobTriggers
* @return Stream of all found JobTriggers
*/
public List<JobTriggerDto> findByQuery(Bson query) {
return stream(collection.find(query).sort(descending(FIELD_UPDATED_AT))).toList();
@MustBeClosed
public Stream<JobTriggerDto> streamByQuery(Bson query) {
return stream(collection.find(query).sort(descending(FIELD_UPDATED_AT)));
}

private record OverdueTrigger(@JsonProperty("_id") String type, @JsonProperty("count") long count) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
import org.graylog2.contentpacks.EntityDescriptorIds;
import org.graylog2.contentpacks.model.ModelId;
import org.graylog2.contentpacks.model.ModelType;
Expand All @@ -40,12 +41,11 @@
import org.graylog2.plugin.PluginMetaData;
import org.graylog2.plugin.lookup.LookupCacheConfiguration;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.graylog2.contentpacks.model.entities.references.ReferenceMapUtils.toReferenceMap;
import static org.graylog2.contentpacks.model.entities.references.ReferenceMapUtils.toValueMap;
Expand Down Expand Up @@ -163,9 +163,11 @@ public EntityExcerpt createExcerpt(CacheDto cacheDto) {

@Override
public Set<EntityExcerpt> listEntityExcerpts() {
return cacheService.findAll().stream()
.map(this::createExcerpt)
.collect(Collectors.toSet());
try (Stream<CacheDto> cacheStream = cacheService.streamAll()) {
return cacheStream
.map(this::createExcerpt)
.collect(Collectors.toSet());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
import org.graylog2.contentpacks.EntityDescriptorIds;
import org.graylog2.contentpacks.model.ModelId;
import org.graylog2.contentpacks.model.ModelType;
Expand All @@ -40,12 +41,11 @@
import org.graylog2.plugin.PluginMetaData;
import org.graylog2.plugin.lookup.LookupDataAdapterConfiguration;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.graylog2.contentpacks.model.entities.references.ReferenceMapUtils.toReferenceMap;
import static org.graylog2.contentpacks.model.entities.references.ReferenceMapUtils.toValueMap;
Expand Down Expand Up @@ -163,9 +163,11 @@ public EntityExcerpt createExcerpt(DataAdapterDto dataAdapterDto) {

@Override
public Set<EntityExcerpt> listEntityExcerpts() {
return dataAdapterService.findAll().stream()
.map(this::createExcerpt)
.collect(Collectors.toSet());
try (Stream<DataAdapterDto> dataAdapterStream = dataAdapterService.streamAll()) {
return dataAdapterStream
.map(this::createExcerpt)
.collect(Collectors.toSet());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.ImmutableGraph;
import com.google.common.graph.MutableGraph;
import jakarta.inject.Inject;
import org.graylog2.contentpacks.EntityDescriptorIds;
import org.graylog2.contentpacks.exceptions.ContentPackException;
import org.graylog2.contentpacks.exceptions.DivergingEntityConfigurationException;
Expand All @@ -45,12 +46,11 @@
import org.graylog2.lookup.dto.DataAdapterDto;
import org.graylog2.lookup.dto.LookupTableDto;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LookupTableFacade implements EntityFacade<LookupTableDto> {
public static final ModelType TYPE_V1 = ModelTypes.LOOKUP_TABLE_V1;
Expand Down Expand Up @@ -202,9 +202,11 @@ public EntityExcerpt createExcerpt(LookupTableDto lookupTableDto) {

@Override
public Set<EntityExcerpt> listEntityExcerpts() {
return lookupTableService.findAll().stream()
.map(this::createExcerpt)
.collect(Collectors.toSet());
try (Stream<LookupTableDto> lookupTableStream = lookupTableService.streamAll()) {
return lookupTableStream
.map(this::createExcerpt)
.collect(Collectors.toSet());
}
}

@Override
Expand Down
Loading

0 comments on commit adf979f

Please sign in to comment.