Skip to content

Commit

Permalink
RANGER-4375: updated to log plugin activities asynchronously - #2
Browse files Browse the repository at this point in the history
  • Loading branch information
mneethiraj committed Aug 28, 2023
1 parent b876ae7 commit 2da8a1f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 42 deletions.
93 changes: 53 additions & 40 deletions security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.ranger.common.DateUtil;
import org.apache.ranger.common.JSONUtil;
import org.apache.ranger.common.MessageEnums;
import org.apache.ranger.common.PropertiesUtil;
import org.apache.ranger.common.RangerCommonEnums;
import org.apache.ranger.common.RangerConstants;
import org.apache.ranger.common.SearchCriteria;
Expand Down Expand Up @@ -75,6 +74,9 @@

@Component
public class AssetMgr extends AssetMgrBase {
private static final String PROP_RANGER_LOG_SC_NOT_MODIFIED = "ranger.log.SC_NOT_MODIFIED";
private static final String PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED = "ranger.plugin.activity.audit.not.modified";
private static final String PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE = "ranger.plugin.activity.audit.commit.inline";

@Autowired
XPermMapService xPermMapService;
Expand Down Expand Up @@ -136,7 +138,9 @@ public class AssetMgr extends AssetMgrBase {
@Autowired
ServiceMgr serviceMgr;

boolean pluginActivityAuditCommitInline = false;
boolean rangerLogNotModified = false;
boolean pluginActivityAuditLogNotModified = false;
boolean pluginActivityAuditCommitInline = false;

private static final Logger logger = LoggerFactory.getLogger(AssetMgr.class);

Expand All @@ -146,9 +150,13 @@ public class AssetMgr extends AssetMgrBase {
public void init() {
logger.info("==> AssetMgr.init()");

pluginActivityAuditCommitInline = RangerAdminConfig.getInstance().getBoolean("ranger.plugin.activity.audit.commit.inline", false);
rangerLogNotModified = RangerAdminConfig.getInstance().getBoolean(PROP_RANGER_LOG_SC_NOT_MODIFIED, false);
pluginActivityAuditLogNotModified = RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED, false);
pluginActivityAuditCommitInline = RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE, false);

logger.info("ranger.plugin.activity.audit.commit.inline={}", pluginActivityAuditCommitInline);
logger.info("{}={}", PROP_RANGER_LOG_SC_NOT_MODIFIED, rangerLogNotModified);
logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED, pluginActivityAuditLogNotModified);
logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE, pluginActivityAuditCommitInline);

logger.info("<== AssetMgr.init()");
}
Expand Down Expand Up @@ -662,13 +670,10 @@ public void UpdateDefaultPolicyUserAndPerm(VXResource vXResource,
public void createPolicyAudit(final XXPolicyExportAudit xXPolicyExportAudit) {
final Runnable commitWork;
if (xXPolicyExportAudit.getHttpRetCode() == HttpServletResponse.SC_NOT_MODIFIED) {
boolean logNotModified = PropertiesUtil.getBooleanProperty("ranger.log.SC_NOT_MODIFIED", false);
if (!logNotModified) {
commitWork = null;
if (!rangerLogNotModified) {
logger.debug("Not logging HttpServletResponse. SC_NOT_MODIFIED. To enable, set configuration: {}=true", PROP_RANGER_LOG_SC_NOT_MODIFIED);

logger.debug("Not logging HttpServletResponse."
+ "SC_NOT_MODIFIED, to enable, update "
+ ": ranger.log.SC_NOT_MODIFIED");
commitWork = null;
} else {
// Create PolicyExportAudit record after transaction is completed. If it is created in-line here
// then the TransactionManager will roll-back the changes because the HTTP return code is
Expand Down Expand Up @@ -762,34 +767,40 @@ private void createOrUpdatePluginInfo(final RangerPluginInfo pluginInfo, int ent
final Runnable commitWork;

if (httpCode == HttpServletResponse.SC_NOT_MODIFIED) {
// Create or update PluginInfo record after transaction is completed. If it is created in-line here
// then the TransactionManager will roll-back the changes because the HTTP return code is
// HttpServletResponse.SC_NOT_MODIFIED

switch (entityType) {
case RangerPluginInfo.ENTITY_TYPE_POLICIES:
isTagVersionResetNeeded = rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName()) == null;
break;
case RangerPluginInfo.ENTITY_TYPE_TAGS:
isTagVersionResetNeeded = false;
break;
case RangerPluginInfo.ENTITY_TYPE_ROLES:
isTagVersionResetNeeded = false;
break;
case RangerPluginInfo.ENTITY_TYPE_USERSTORE:
isTagVersionResetNeeded = false;
break;
default:
isTagVersionResetNeeded = false;
break;
}
if (!pluginActivityAuditLogNotModified) {
logger.debug("Not logging HttpServletResponse. SC_NOT_MODIFIED. To enable, set configuration: {}=true", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED);

commitWork = new Runnable() {
@Override
public void run() {
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, clusterName);
commitWork = null;
} else {
// Create or update PluginInfo record after transaction is completed. If it is created in-line here
// then the TransactionManager will roll-back the changes because the HTTP return code is
// HttpServletResponse.SC_NOT_MODIFIED

switch (entityType) {
case RangerPluginInfo.ENTITY_TYPE_POLICIES:
isTagVersionResetNeeded = rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName()) == null;
break;
case RangerPluginInfo.ENTITY_TYPE_TAGS:
isTagVersionResetNeeded = false;
break;
case RangerPluginInfo.ENTITY_TYPE_ROLES:
isTagVersionResetNeeded = false;
break;
case RangerPluginInfo.ENTITY_TYPE_USERSTORE:
isTagVersionResetNeeded = false;
break;
default:
isTagVersionResetNeeded = false;
break;
}
};

commitWork = new Runnable() {
@Override
public void run() {
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, clusterName);
}
};
}
} else if (httpCode == HttpServletResponse.SC_NOT_FOUND) {
if ((isPolicyDownloadRequest(entityType) && (pluginInfo.getPolicyActiveVersion() == null || pluginInfo.getPolicyActiveVersion() == -1))
|| (isTagDownloadRequest(entityType) && (pluginInfo.getTagActiveVersion() == null || pluginInfo.getTagActiveVersion() == -1))
Expand Down Expand Up @@ -820,10 +831,12 @@ public void run() {
};
}

if (pluginActivityAuditCommitInline) {
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
} else {
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
if (commitWork != null) {
if (pluginActivityAuditCommitInline) {
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
} else {
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
}
}

if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.ranger.service;

import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -35,22 +36,41 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

@Service
public class RangerTransactionService {
private static final String PROP_THREADPOOL_SIZE = "ranger.admin.transaction.service.threadpool.size";
private static final String PROP_SUMMARY_LOG_INTERVAL_SEC = "ranger.admin.transaction.service.summary.log.interval.sec";

@Autowired
@Qualifier(value = "transactionManager")
PlatformTransactionManager txManager;

private static final Logger LOG = LoggerFactory.getLogger(RangerTransactionService.class);

private ScheduledExecutorService scheduler = null;
private ScheduledExecutorService scheduler = null;
private AtomicLong scheduledTaskCount = new AtomicLong(0);
private AtomicLong executedTaskCount = new AtomicLong(0);
private AtomicLong failedTaskCount = new AtomicLong(0);
private long summaryLogIntervalMs = 5 * 60 * 1000;
private long nextLogSummaryTime = System.currentTimeMillis() + summaryLogIntervalMs;

@PostConstruct
public void init() {
scheduler = Executors.newScheduledThreadPool(1);
RangerAdminConfig config = RangerAdminConfig.getInstance();

int numOfThreads = config.getInt(PROP_THREADPOOL_SIZE, 1);
long summaryLogIntervalSec = config.getInt(PROP_SUMMARY_LOG_INTERVAL_SEC, 5 * 60);

scheduler = Executors.newScheduledThreadPool(numOfThreads);
summaryLogIntervalMs = summaryLogIntervalSec * 1000;
nextLogSummaryTime = System.currentTimeMillis() + summaryLogIntervalSec;

LOG.info("{}={}", PROP_THREADPOOL_SIZE, numOfThreads);
LOG.info("{}={}", PROP_SUMMARY_LOG_INTERVAL_SEC, summaryLogIntervalSec);
}

@PreDestroy
Expand All @@ -59,6 +79,8 @@ public void destroy() {
LOG.info("attempt to shutdown RangerTransactionService");
scheduler.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);

logSummary();
}
catch (InterruptedException e) {
LOG.error("RangerTransactionService tasks interrupted");
Expand Down Expand Up @@ -90,16 +112,47 @@ public Object doInTransaction(TransactionStatus status) {
}
});
} catch (Exception e) {
failedTaskCount.getAndIncrement();

LOG.error("Failed to commit TransactionService transaction", e);
LOG.error("Ignoring...");
} finally {
executedTaskCount.getAndIncrement();
logSummaryIfNeeded();
}
}
}
}, delayInMillis, MILLISECONDS);

scheduledTaskCount.getAndIncrement();

logSummaryIfNeeded();
} catch (Exception e) {
LOG.error("Failed to schedule TransactionService transaction:", e);
LOG.error("Ignroing...");
}
}

private void logSummaryIfNeeded() {
long now = System.currentTimeMillis();

if (summaryLogIntervalMs > 0 && now > nextLogSummaryTime) {
synchronized (this) {
if (now > nextLogSummaryTime) {
nextLogSummaryTime = now + summaryLogIntervalMs;

logSummary();
}
}
}
}

private void logSummary() {
long scheduled = scheduledTaskCount.get();
long executed = executedTaskCount.get();
long failed = failedTaskCount.get();
long pending = scheduled - executed;

LOG.info("RangerTransactionService: tasks(scheduled={}, executed={}, failed={}, pending={})", scheduled, executed, failed, pending);
}
}

0 comments on commit 2da8a1f

Please sign in to comment.