Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update default pipeline when inputs are renamed or deleted #21541

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.graylog.plugins.pipelineprocessor.rest;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.swrve.ratelimitedlogger.RateLimitedLog;
Expand Down Expand Up @@ -49,22 +48,20 @@
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.PaginatedList;
import org.graylog2.inputs.InputRoutingService;
import org.graylog2.plugin.rest.PluginRestResource;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.rest.models.PaginatedResponse;
import org.graylog2.search.SearchQuery;
import org.graylog2.search.SearchQueryField;
import org.graylog2.search.SearchQueryParser;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

Expand All @@ -73,7 +70,6 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -99,27 +95,23 @@ public class PipelineResource extends RestResource implements PluginRestResource

private final SearchQueryParser searchQueryParser;
private final PaginatedPipelineService paginatedPipelineService;

private final PipelineService pipelineService;
private final PipelineRuleParser pipelineRuleParser;
private final PipelineStreamConnectionsService connectionsService;
private final RuleService ruleService;
private final StreamService streamService;
private final InputRoutingService inputRoutingService;

@Inject
public PipelineResource(PipelineService pipelineService,
PaginatedPipelineService paginatedPipelineService,
PipelineRuleParser pipelineRuleParser,
PipelineStreamConnectionsService connectionsService,
RuleService ruleService,
StreamService streamService) {
InputRoutingService inputRoutingService) {
this.pipelineService = pipelineService;
this.pipelineRuleParser = pipelineRuleParser;
this.paginatedPipelineService = paginatedPipelineService;
this.searchQueryParser = new SearchQueryParser(PipelineDao.FIELD_TITLE, SEARCH_FIELD_MAPPING);
this.connectionsService = connectionsService;
this.ruleService = ruleService;
this.streamService = streamService;
this.inputRoutingService = inputRoutingService;
}

@ApiOperation(value = "Create a processing pipeline from source")
Expand Down Expand Up @@ -245,30 +237,7 @@ public PipelineSource get(@ApiParam(name = "id") @PathParam("id") String id) thr
public PipelineSource update(@ApiParam(name = "id") @PathParam("id") String id,
@ApiParam(name = "pipeline", required = true) @NotNull PipelineSource update) throws NotFoundException {
checkPermission(PipelineRestPermissions.PIPELINE_EDIT, id);

final PipelineDao dao = pipelineService.load(id);
final Pipeline pipeline;
try {
pipeline = pipelineRuleParser.parsePipeline(update.id(), update.source());
} catch (ParseException e) {
throw new BadRequestException(Response.status(Response.Status.BAD_REQUEST).entity(e.getErrors()).build());
}
final PipelineDao toSave = dao.toBuilder()
.title(pipeline.name())
.description(update.description())
.source(update.source())
.modifiedAt(DateTime.now(DateTimeZone.UTC))
.build();

final PipelineDao savedPipeline;
try {
savedPipeline = pipelineService.save(toSave);
} catch (IllegalArgumentException e) {
log.error(e.getMessage(), e);
throw new BadRequestException(e.getMessage());
}

return PipelineSource.fromDao(pipelineRuleParser, savedPipeline);
return PipelineUtils.update(pipelineService, pipelineRuleParser, id, update);
}

public record RoutingRequest(
Expand All @@ -288,19 +257,7 @@ public RoutingResponse routing(@ApiParam(name = "body", required = true) @NotNul
checkPermission(RestPermissions.STREAMS_EDIT, request.streamId());
checkPermission(PipelineRestPermissions.PIPELINE_RULE_CREATE);

Stream stream;
try {
stream = streamService.load(request.streamId());
} catch (NotFoundException e) {
throw new NotFoundException(f("Unable to load stream %s", request.streamId()), e);
}

boolean removeFromDefault = true;
if (request.removeFromDefault() == null) {
removeFromDefault = stream.getRemoveMatchesFromDefaultStream();
}

RuleDao ruleDao = createRoutingRule(request, removeFromDefault, stream.getTitle());
RuleDao ruleDao = inputRoutingService.createRoutingRule(request);
PipelineDao pipelineDao;
try {
pipelineDao = pipelineService.loadByName(GL_INPUT_ROUTING_PIPELINE);
Expand All @@ -317,7 +274,7 @@ public RoutingResponse routing(@ApiParam(name = "body", required = true) @NotNul
if (rules0.stream().filter(ruleRef -> ruleRef.equals(ruleDao.title())).findFirst().isEmpty()) {
rules0.add(ruleDao.title());
pipelineSource = pipelineSource.toBuilder()
.source(createPipelineString(pipelineSource))
.source(PipelineUtils.createPipelineString(pipelineSource))
.build();
update(pipelineDao.id(), pipelineSource);
} else {
Expand Down Expand Up @@ -356,46 +313,6 @@ private void ensurePipelineConnection(String pipelineId, String streamId) {
connectionsService.save(pipelineConnections);
}

private RuleDao createRoutingRule(RoutingRequest request, boolean removeFromDefault, String streamName) {
String ruleName = "route_" + request.inputId() + "_to_" + streamName;
final Optional<RuleDao> ruleDaoOpt = ruleService.findByName(ruleName);
if (ruleDaoOpt.isPresent()) {
log.info(f("Routing rule %s already exists - skipping", ruleName));
return ruleDaoOpt.get();
}

String ruleSource =
"rule \"" + ruleName + "\"\n"
+ "when has_field(\"gl2_source_input\") AND to_string($message.gl2_source_input)==\"" + request.inputId() + "\"\n"
+ "then\n"
+ "route_to_stream(id:\"" + request.streamId() + "\""
+ ", remove_from_default: " + removeFromDefault
+ ");\nend\n";

RuleDao ruleDao = RuleDao.builder()
.title(ruleName)
.description("Input setup wizard routing rule")
.source(ruleSource)
.createdAt(DateTime.now(DateTimeZone.UTC))
.build();
return ruleService.save(ruleDao);
}

@VisibleForTesting
public static String createPipelineString(PipelineSource pipelineSource) {
StringBuilder result = new StringBuilder("pipeline \"" + pipelineSource.title() + "\"\n");
for (int stageNr = 0; stageNr < pipelineSource.stages().size(); stageNr++) {
StageSource currStage = pipelineSource.stages().get(stageNr);
result.append("stage ").append(stageNr).append(" match ").append(currStage.match()).append('\n');
for (String rule : currStage.rules()) {
result.append("rule \"").append(rule).append("\"\n");
}
}
result.append("end");

return result.toString();
}

@ApiOperation(value = "Delete a processing pipeline", notes = "It can take up to a second until the change is applied")
@Path("/{id}")
@DELETE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.plugins.pipelineprocessor.rest;

import com.swrve.ratelimitedlogger.RateLimitedLog;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog2.database.NotFoundException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import static org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.getRateLimitedLog;

/**
* Factor out utility methods that are used in multiple services to avoid circular dependencies.
*/
public class PipelineUtils {
private static final RateLimitedLog log = getRateLimitedLog(PipelineUtils.class);

private PipelineUtils() {
}

public static PipelineSource update(PipelineService pipelineService,
PipelineRuleParser pipelineRuleParser,
String id,
PipelineSource update) throws NotFoundException {
final PipelineDao dao = pipelineService.load(id);
final Pipeline pipeline;
try {
pipeline = pipelineRuleParser.parsePipeline(update.id(), update.source());
} catch (ParseException e) {
throw new BadRequestException(Response.status(Response.Status.BAD_REQUEST).entity(e.getErrors()).build());
}
final PipelineDao toSave = dao.toBuilder()
.title(pipeline.name())
.description(update.description())
.source(update.source())
.modifiedAt(DateTime.now(DateTimeZone.UTC))
.build();

final PipelineDao savedPipeline;
try {
savedPipeline = pipelineService.save(toSave);
} catch (IllegalArgumentException e) {
log.error(e.getMessage(), e);
throw new BadRequestException(e.getMessage());
}

return PipelineSource.fromDao(pipelineRuleParser, savedPipeline);
}

public static String createPipelineString(PipelineSource pipelineSource) {
StringBuilder result = new StringBuilder("pipeline \"" + pipelineSource.title() + "\"\n");
for (int stageNr = 0; stageNr < pipelineSource.stages().size(); stageNr++) {
StageSource currStage = pipelineSource.stages().get(stageNr);
result.append("stage ").append(stageNr).append(" match ").append(currStage.match()).append('\n');
for (String rule : currStage.rules()) {
result.append("rule \"").append(rule).append("\"\n");
}
}
result.append("end");

return result.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
Expand All @@ -36,7 +50,6 @@
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog.plugins.pipelineprocessor.rulebuilder.parser.RuleBuilderService;
import org.graylog.plugins.pipelineprocessor.simulator.RuleSimulator;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.audit.jersey.NoAuditEvent;
Expand All @@ -49,29 +62,10 @@
import org.graylog2.search.SearchQueryField;
import org.graylog2.search.SearchQueryParser;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import javax.annotation.Nonnull;

import jakarta.inject.Inject;

import jakarta.validation.constraints.NotNull;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -105,7 +99,6 @@ public class RuleResource extends RestResource implements PluginRestResource {
private final PaginatedRuleService paginatedRuleService;
private final SearchQueryParser searchQueryParser;
private final PipelineServiceHelper pipelineServiceHelper;
private final RuleBuilderService ruleBuilderService;

@Inject
public RuleResource(RuleService ruleService,
Expand All @@ -114,9 +107,7 @@ public RuleResource(RuleService ruleService,
PipelineRuleService pipelineRuleService,
PaginatedRuleService paginatedRuleService,
FunctionRegistry functionRegistry,
PipelineServiceHelper pipelineServiceHelper,
StreamService streamService,
RuleBuilderService ruleBuilderService) {
PipelineServiceHelper pipelineServiceHelper) {
this.ruleService = ruleService;
this.ruleSimulator = ruleSimulator;
this.pipelineService = pipelineService;
Expand All @@ -125,12 +116,10 @@ public RuleResource(RuleService ruleService,
this.functionRegistry = functionRegistry;
this.paginatedRuleService = paginatedRuleService;
this.pipelineServiceHelper = pipelineServiceHelper;
this.ruleBuilderService = ruleBuilderService;

this.searchQueryParser = new SearchQueryParser(RuleDao.FIELD_TITLE, SEARCH_FIELD_MAPPING);
}


@ApiOperation(value = "Create a processing rule from source", notes = "")
@POST
@RequiresPermissions(PipelineRestPermissions.PIPELINE_RULE_CREATE)
Expand Down Expand Up @@ -260,7 +249,6 @@ Map<String, Object> prepareContextForPaginatedResponse(@Nonnull List<RuleDao> ru
return Map.of("used_in_pipelines", result);
}


@ApiOperation(value = "Get a processing rule", notes = "It can take up to a second until the change is applied")
@Path("/{id}")
@GET
Expand Down
Loading
Loading