Skip to content

Commit

Permalink
RANGER-4400: Implemented processResults(Collection<RangerAccessResult…
Browse files Browse the repository at this point in the history
…> results) for RangerKafkaAuditHandler to prevent audit of cluster resource level topic creation while creating topic (#281)
  • Loading branch information
fateh288 authored Sep 13, 2023
1 parent 7f86e62 commit da68108
Showing 1 changed file with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,70 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;

public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
private static final Logger LOG = LoggerFactory.getLogger(RangerKafkaAuditHandler.class);

private AuthzAuditEvent auditEvent = null;

private ArrayList<AuthzAuditEvent> auditEventList = new ArrayList<>();

public RangerKafkaAuditHandler(){
}

@Override
public void processResult(RangerAccessResult result) {
// If Cluster Resource Level Topic Creation is not Allowed we don't audit.
// Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited.
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.processResult()");
}
if (!isAuditingNeeded(result)) {
return;
}
auditEvent = super.getAuthzEvents(result);
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuditHandler.processResult()");
}
}
@Override
public void processResults(Collection<RangerAccessResult> results) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.processResults(" + results + ")");
}
for(RangerAccessResult res: results){
if (isAuditingNeeded(res)){
AuthzAuditEvent event = super.getAuthzEvents(res);
if(event!=null){
if(LOG.isDebugEnabled()) {
LOG.debug("Got event=" + event + " for RangerAccessResult=" + res);
}
auditEventList.add(event);
}
else{
if(LOG.isDebugEnabled()) {
LOG.debug("No audit event for :" + res);
}
}
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("Auditing not required for :"+res);
}
}
}

if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuditHandler.processResults(" + results + ")");
}
}

private boolean isAuditingNeeded(final RangerAccessResult result) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.isAuditingNeeded()");
}
boolean ret = true;
boolean isAllowed = result.getIsAllowed();
RangerAccessRequest request = result.getAccessRequest();
Expand All @@ -57,18 +102,31 @@ private boolean isAuditingNeeded(final RangerAccessResult result) {
ret = false;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("RangerKafkaAuditHandler: isAuditingNeeded()");
LOG.debug("request:"+request);
LOG.debug("resource:"+resource);
LOG.debug("resourceName:"+resourceName);
LOG.debug("request.getAccessType():"+request.getAccessType());
LOG.debug("isAllowed:"+isAllowed);
LOG.debug("ret="+ret);
LOG.debug("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result);
}
return ret;
}

public void flushAudit() {
if(LOG.isDebugEnabled()) {
LOG.info("==> RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")");
LOG.debug("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent +" list="+ auditEventList+ ")");
}
if (auditEvent != null) {
super.logAuthzAudit(auditEvent);
}
else if (auditEventList.size()>0){
super.logAuthzAudits(auditEventList);
}
if(LOG.isDebugEnabled()) {
LOG.info("<== RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")");
LOG.debug("<== RangerKafkaAuditHandler.flushAudit()");
}
}
}

0 comments on commit da68108

Please sign in to comment.