From 04d68dbf66f9c29bfc9cfe577db875c0ad18201b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Fri, 4 Sep 2015 15:34:13 -0600 Subject: [PATCH 1/7] Configure HTTP DefaultConnectionLimit --- src/ServiceControl/Bootstrapper.cs | 4 ++++ src/ServiceControl/Infrastructure/Settings/Settings.cs | 2 ++ 2 files changed, 6 insertions(+) diff --git a/src/ServiceControl/Bootstrapper.cs b/src/ServiceControl/Bootstrapper.cs index 39fb9c2ae5..d128b3c948 100644 --- a/src/ServiceControl/Bootstrapper.cs +++ b/src/ServiceControl/Bootstrapper.cs @@ -3,6 +3,7 @@ namespace Particular.ServiceControl using System; using System.Diagnostics; using System.IO; + using System.Net; using System.ServiceProcess; using Autofac; using Hosting; @@ -25,6 +26,9 @@ public class Bootstrapper public Bootstrapper(ServiceBase host = null, HostArguments hostArguments = null, Configure configure = null) { + // .NET default limit is 10. RavenDB in conjunction with transports that use HTTP exceeds that limit. + ServicePointManager.DefaultConnectionLimit = Settings.HttpDefaultConnectionLimit; + Settings.ServiceName = DetermineServiceName(host, hostArguments); ConfigureLogging(); var containerBuilder = new ContainerBuilder(); diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs index f4d2be75d1..278519b946 100644 --- a/src/ServiceControl/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs @@ -234,5 +234,7 @@ static string DefaultLogPathForInstance() } return Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), string.Format("Particular\\{0}\\logs", ServiceName)); } + + public static int HttpDefaultConnectionLimit = SettingsReader.Read("HttpDefaultConnectionLimit", 100); } } From bf12252eb825dc1df1579988e234e7de2c7d0336 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 15 Sep 2015 09:59:50 +0800 Subject: [PATCH 2/7] Turn off MSMQ Batch Import --- .../Operations/AuditQueueImport.cs | 9 +- .../Operations/Msmq/MsmqAuditQueueImporter.cs | 924 +++++++++--------- 2 files changed, 463 insertions(+), 470 deletions(-) diff --git a/src/ServiceControl/Operations/AuditQueueImport.cs b/src/ServiceControl/Operations/AuditQueueImport.cs index 84f1d4bed7..58415517c7 100644 --- a/src/ServiceControl/Operations/AuditQueueImport.cs +++ b/src/ServiceControl/Operations/AuditQueueImport.cs @@ -11,7 +11,6 @@ using NServiceBus.Pipeline; using NServiceBus.Satellites; using NServiceBus.Transports; - using NServiceBus.Transports.Msmq; using NServiceBus.Unicast.Messages; using NServiceBus.Unicast.Transport; using Raven.Client; @@ -28,11 +27,6 @@ public class AuditQueueImport : IAdvancedSatellite, IDisposable #pragma warning restore 618 - public AuditQueueImport(IDequeueMessages receiver) - { - disabled = receiver is MsmqDequeueStrategy; - } - public bool Handle(TransportMessage message) { InnerHandle(message); @@ -107,7 +101,7 @@ public Address InputAddress public bool Disabled { - get { return disabled; } + get { return false; } } public Action GetReceiverCustomization() @@ -132,6 +126,5 @@ public void Dispose() SatelliteImportFailuresHandler satelliteImportFailuresHandler; static readonly ILog Logger = LogManager.GetLogger(typeof(AuditQueueImport)); - bool disabled; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs b/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs index adf8b77fbf..e23f0cf0f2 100644 --- a/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs +++ b/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs @@ -1,476 +1,476 @@ -namespace ServiceControl.Operations -{ - using System; - using System.Collections.Generic; - using System.IO; - using System.Linq; - using System.Messaging; - using System.Threading; - using System.Threading.Tasks; - using Contracts.Operations; - using EndpointControl.Handlers; - using MessageAuditing; - using NServiceBus; - using NServiceBus.Logging; - using NServiceBus.ObjectBuilder; - using NServiceBus.Transports; - using NServiceBus.Transports.Msmq; - using NServiceBus.Unicast; - using Raven.Abstractions.Data; - using Raven.Client; - using ServiceBus.Management.Infrastructure.Settings; - - class MsmqAuditQueueImporter : IWantToRunWhenBusStartsAndStops - { - public MsmqAuditQueueImporter(IDocumentStore store, IBuilder builder, IDequeueMessages receiver) - { - this.store = store; - this.builder = builder; - enabled = receiver is MsmqDequeueStrategy; - - importFailuresHandler = new SatelliteImportFailuresHandler(store, - Path.Combine(Settings.LogPath, @"FailedImports\Audit"), tm => new FailedAuditImport - { - Message = tm, - }); - } - - public IBus Bus { get; set; } - public KnownEndpointsCache KnownEndpointsCache { get; set; } - public UnicastBus UnicastBus { get; set; } - public ISendMessages Forwarder { get; set; } - - public void Start() - { - // Any messages that fail conversion to a transportmessage is sent to the particular.servicecontrol.errors queue using low level Api - // The actual queue name is based on service name to support mulitple instances on same host (particular.servicecontrol.errors is the default) - var serviceControlErrorQueueAddress = Address.Parse(string.Format("{0}.errors", Settings.ServiceName)); - serviceControlErrorQueue = new MessageQueue(MsmqUtilities.GetFullPath(serviceControlErrorQueueAddress), false, true, QueueAccessMode.Send); - - if (!enabled) - { - return; - } - - if (Settings.AuditQueue == Address.Undefined) - { - Logger.Info("No Audit queue has been configured. No audit import will be performed. To enable imports add the ServiceBus/AuditQueue appsetting and restart ServiceControl"); - return; - } - - if (TerminateIfForwardingIsEnabledButQueueNotWritable()) - { - return; - } +//namespace ServiceControl.Operations +//{ +// using System; +// using System.Collections.Generic; +// using System.IO; +// using System.Linq; +// using System.Messaging; +// using System.Threading; +// using System.Threading.Tasks; +// using Contracts.Operations; +// using EndpointControl.Handlers; +// using MessageAuditing; +// using NServiceBus; +// using NServiceBus.Logging; +// using NServiceBus.ObjectBuilder; +// using NServiceBus.Transports; +// using NServiceBus.Transports.Msmq; +// using NServiceBus.Unicast; +// using Raven.Abstractions.Data; +// using Raven.Client; +// using ServiceBus.Management.Infrastructure.Settings; + +// class MsmqAuditQueueImporter : IWantToRunWhenBusStartsAndStops +// { +// public MsmqAuditQueueImporter(IDocumentStore store, IBuilder builder, IDequeueMessages receiver) +// { +// this.store = store; +// this.builder = builder; +// //enabled = receiver is MsmqDequeueStrategy; + +// importFailuresHandler = new SatelliteImportFailuresHandler(store, +// Path.Combine(Settings.LogPath, @"FailedImports\Audit"), tm => new FailedAuditImport +// { +// Message = tm, +// }); +// } + +// public IBus Bus { get; set; } +// public KnownEndpointsCache KnownEndpointsCache { get; set; } +// public UnicastBus UnicastBus { get; set; } +// public ISendMessages Forwarder { get; set; } + +// public void Start() +// { +// // Any messages that fail conversion to a transportmessage is sent to the particular.servicecontrol.errors queue using low level Api +// // The actual queue name is based on service name to support mulitple instances on same host (particular.servicecontrol.errors is the default) +// var serviceControlErrorQueueAddress = Address.Parse(string.Format("{0}.errors", Settings.ServiceName)); +// serviceControlErrorQueue = new MessageQueue(MsmqUtilities.GetFullPath(serviceControlErrorQueueAddress), false, true, QueueAccessMode.Send); + +// if (!enabled) +// { +// return; +// } + +// if (Settings.AuditQueue == Address.Undefined) +// { +// Logger.Info("No Audit queue has been configured. No audit import will be performed. To enable imports add the ServiceBus/AuditQueue appsetting and restart ServiceControl"); +// return; +// } + +// if (TerminateIfForwardingIsEnabledButQueueNotWritable()) +// { +// return; +// } - performanceCounters.Initialize(); +// performanceCounters.Initialize(); - queuePeeker = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Peek); - queuePeeker.MessageReadPropertyFilter.ClearAll(); - queuePeeker.PeekCompleted += QueueOnPeekCompleted; +// queuePeeker = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Peek); +// queuePeeker.MessageReadPropertyFilter.ClearAll(); +// queuePeeker.PeekCompleted += QueueOnPeekCompleted; - enrichers = builder.BuildAll().ToList(); +// enrichers = builder.BuildAll().ToList(); - Logger.InfoFormat("MSMQ Audit import is now started, feeding audit messages from: {0}", Settings.AuditQueue); +// Logger.InfoFormat("MSMQ Audit import is now started, feeding audit messages from: {0}", Settings.AuditQueue); - countDownEvent.Idle += OnIdle; +// countDownEvent.Idle += OnIdle; - Logger.Debug("Ready to BeginPeek"); - queuePeeker.BeginPeek(); - } +// Logger.Debug("Ready to BeginPeek"); +// queuePeeker.BeginPeek(); +// } - public void Stop() - { - if (!enabled) - { - return; - } +// public void Stop() +// { +// if (!enabled) +// { +// return; +// } - stopping = true; +// stopping = true; - queuePeeker.PeekCompleted -= QueueOnPeekCompleted; - - stopResetEvent.Wait(); - - performanceCounters.Dispose(); - - queuePeeker.Dispose(); - - stopResetEvent.Dispose(); - } - - bool TerminateIfForwardingIsEnabledButQueueNotWritable() - { - if (Settings.ForwardAuditMessages != true) - { - return false; - } - - try - { - //Send a message to test the forwarding queue - var testMessage = new TransportMessage(Guid.Empty.ToString("N"), new Dictionary()); - Forwarder.Send(testMessage, Settings.AuditLogQueue); - return false; - } - catch (Exception messageForwardingException) - { - //This call to RaiseCriticalError has to be on a seperate thread otherwise it deadlocks and doesn't stop correctly. - ThreadPool.QueueUserWorkItem(state => Configure.Instance.RaiseCriticalError("Audit Import cannot start", messageForwardingException)); - return true; - } - } - - static MessageQueue CreateReceiver() - { - var queue = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Receive); - - var messageReadPropertyFilter = new MessagePropertyFilter - { - Body = true, - TimeToBeReceived = true, - Recoverable = true, - Id = true, - ResponseQueue = true, - CorrelationId = true, - Extension = true, - AppSpecific = true - }; - - queue.MessageReadPropertyFilter = messageReadPropertyFilter; - - return queue; - } - - void OnIdle(object sender, EventArgs eventArgs) - { - stopResetEvent.Set(); - - if (stopping) - { - return; - } - - Logger.Debug("Ready to BeginPeek again"); - queuePeeker.BeginPeek(); - } - - void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs args) - { - stopResetEvent.Reset(); - - TryStartNewBatchImporter(); - } - - bool TryStartNewBatchImporter() - { - lock (lockObj) - { - if (countDownEvent.CurrentCount > UnicastBus.Transport.MaximumConcurrencyLevel) - { - return false; - } - countDownEvent.Add(); - } - - // If batchErrorLockObj can not be locked it means one of the Tasks has had a batch error, and RetryMessageImportById is running +// queuePeeker.PeekCompleted -= QueueOnPeekCompleted; + +// stopResetEvent.Wait(); + +// performanceCounters.Dispose(); + +// queuePeeker.Dispose(); + +// stopResetEvent.Dispose(); +// } + +// bool TerminateIfForwardingIsEnabledButQueueNotWritable() +// { +// if (Settings.ForwardAuditMessages != true) +// { +// return false; +// } + +// try +// { +// //Send a message to test the forwarding queue +// var testMessage = new TransportMessage(Guid.Empty.ToString("N"), new Dictionary()); +// Forwarder.Send(testMessage, Settings.AuditLogQueue); +// return false; +// } +// catch (Exception messageForwardingException) +// { +// //This call to RaiseCriticalError has to be on a seperate thread otherwise it deadlocks and doesn't stop correctly. +// ThreadPool.QueueUserWorkItem(state => Configure.Instance.RaiseCriticalError("Audit Import cannot start", messageForwardingException)); +// return true; +// } +// } + +// static MessageQueue CreateReceiver() +// { +// var queue = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Receive); + +// var messageReadPropertyFilter = new MessagePropertyFilter +// { +// Body = true, +// TimeToBeReceived = true, +// Recoverable = true, +// Id = true, +// ResponseQueue = true, +// CorrelationId = true, +// Extension = true, +// AppSpecific = true +// }; + +// queue.MessageReadPropertyFilter = messageReadPropertyFilter; + +// return queue; +// } + +// void OnIdle(object sender, EventArgs eventArgs) +// { +// stopResetEvent.Set(); + +// if (stopping) +// { +// return; +// } + +// Logger.Debug("Ready to BeginPeek again"); +// queuePeeker.BeginPeek(); +// } + +// void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs args) +// { +// stopResetEvent.Reset(); + +// TryStartNewBatchImporter(); +// } + +// bool TryStartNewBatchImporter() +// { +// lock (lockObj) +// { +// if (countDownEvent.CurrentCount > UnicastBus.Transport.MaximumConcurrencyLevel) +// { +// return false; +// } +// countDownEvent.Add(); +// } + +// // If batchErrorLockObj can not be locked it means one of the Tasks has had a batch error, and RetryMessageImportById is running - lock (batchErrorLockObj) - { - } +// lock (batchErrorLockObj) +// { +// } - if (stopping) - return true; +// if (stopping) +// return true; - batchTaskTracker.Add(Task.Factory - .StartNew(BatchImporter, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default) - .ContinueWith(task => - { - if (task.Exception != null) { - task.Exception.Handle(ex =>{ - Logger.Error("Error processing message.", ex); - return true; - }); - batchTaskTracker.Remove(task); - } - })); - return true; - } - - void BatchImporter() - { - String failedMessageID = null; - try - { - Logger.DebugFormat("Batch job started", Task.CurrentId); +// batchTaskTracker.Add(Task.Factory +// .StartNew(BatchImporter, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default) +// .ContinueWith(task => +// { +// if (task.Exception != null) { +// task.Exception.Handle(ex =>{ +// Logger.Error("Error processing message.", ex); +// return true; +// }); +// batchTaskTracker.Remove(task); +// } +// })); +// return true; +// } + +// void BatchImporter() +// { +// String failedMessageID = null; +// try +// { +// Logger.DebugFormat("Batch job started", Task.CurrentId); - var moreMessages = 0; - - using (var queueReceiver = CreateReceiver()) - { - do - { - if (moreMessages > RampUpConcurrencyMagicNumber) - { - if (TryStartNewBatchImporter()) - { - Logger.Debug("We have too many messages, starting another batch importer"); - moreMessages = 0; //Reset to 0 so we only ramp up once per BatchImporter - } - } - - moreMessages++; - - using (var msmqTransaction = new MessageQueueTransaction()) - { - msmqTransaction.Begin(); - using (var bulkInsert =store.BulkInsert(options:new BulkInsertOptions {CheckForUpdates = true})) - { - for (var idx = 0; idx < BatchSize; idx++) - { - Message message = null; - TransportMessage transportMessage; - try - { - message = queueReceiver.Receive(receiveTimeout, msmqTransaction); - performanceCounters.MessageDequeued(); - transportMessage = MsmqUtilities.Convert(message); - } - catch (MessageQueueException mqe) - { - if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) - { - moreMessages = 0; - break; - } - throw; - } - catch (Exception) - { - if (message != null) { - failedMessageID = message.Id; - } - throw; - } - - try - { - var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); - foreach (var enricher in enrichers) - { - enricher.Enrich(importSuccessfullyProcessedMessage); - } - var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); - bulkInsert.Store(auditMessage); - performanceCounters.MessageProcessed(); - - if (Settings.ForwardAuditMessages == true) - { - Forwarder.Send(transportMessage, Settings.AuditLogQueue); - } - } - catch (Exception) - { - if (message != null) - { - failedMessageID = message.Id; - } - throw; - } - } - } - msmqTransaction.Commit(); - } - } while (moreMessages > 0 && !stopping); - } - Logger.Debug("Stopping batch importer"); - } - finally - { - if (!String.IsNullOrEmpty(failedMessageID)) - { - // Call RetryMessageImportById outside the Task as it checks for running tasks - ThreadPool.QueueUserWorkItem(state => RetryMessageImportById(failedMessageID)); - } - countDownEvent.Decrement(); - } - } - - void RetryMessageImportById(string messageID) - { - // Try to get the batchErrorLock, if we can't then exit, - // the message will trigger a retry next time on the next batch read. - // Retrymessage may be fired again for the same message until the batches drain so this - // prevents the message being processed twice, - if (Monitor.TryEnter(batchErrorLockObj)) - { - try - { - Logger.DebugFormat("Drain stop running batch importers"); - stopping = true; - var runningTasks = batchTaskTracker.Active(); - Task.WaitAll(runningTasks); - - var commitTransaction = false; - using (var queueReceiver = CreateReceiver()) - using (var msmqTransaction = new MessageQueueTransaction()) - { - msmqTransaction.Begin(); - Logger.DebugFormat("Retry import of messageID - {0}", messageID); - try - { - Message message; - TransportMessage transportMessage; - try - { - message = queueReceiver.ReceiveById(messageID); - performanceCounters.MessageDequeued(); - } - catch (Exception exception) - { - importFailuresHandler.FailedToReceive(exception); //logs and increments circuit breaker - return; - } - - try - { - transportMessage = MsmqUtilities.Convert(message); - } - catch (Exception convertException) - { - importFailuresHandler.FailedToReceive(convertException); //logs and increments circuit breaker - serviceControlErrorQueue.Send(message, msmqTransaction); // Send unconvertable message to SC's ErrorQueue so it's not lost - commitTransaction = true; // Can't convert the messsage, so commit to get message out of the queue - return; - } - - try - { - var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); - foreach (var enricher in enrichers) - { - enricher.Enrich(importSuccessfullyProcessedMessage); - } - - using (var session = store.OpenSession()) - { - var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); - session.Store(auditMessage); - session.SaveChanges(); - } - performanceCounters.MessageProcessed(); - - if (Settings.ForwardAuditMessages == true) - { - Forwarder.Send(transportMessage, Settings.AuditLogQueue); - } - - commitTransaction = true; - } - catch (Exception importException) - { - importFailuresHandler.Log(transportMessage, importException); //Logs and Writes failure transport message to Raven - } - } - finally - { - if (commitTransaction) - { - msmqTransaction.Commit(); - } - } - } - } - finally - { - Monitor.Exit(batchErrorLockObj); - //Restart Batch mode - stopping = false; - Logger.Debug("Ready to BeginPeek again"); - queuePeeker.BeginPeek(); - } - } - } - - const int RampUpConcurrencyMagicNumber = 5; //How many batches before we ramp up? - const int BatchSize = 100; - - static readonly ILog Logger = LogManager.GetLogger(typeof(MsmqAuditQueueImporter)); - - readonly IBuilder builder; - readonly CountDownEvent countDownEvent = new CountDownEvent(); - readonly bool enabled; - readonly SatelliteImportFailuresHandler importFailuresHandler; - readonly object lockObj = new object(); - readonly object batchErrorLockObj = new object(); - readonly MsmqAuditImporterPerformanceCounters performanceCounters = new MsmqAuditImporterPerformanceCounters(); - readonly TimeSpan receiveTimeout = TimeSpan.FromSeconds(1); - readonly ManualResetEventSlim stopResetEvent = new ManualResetEventSlim(true); - readonly IDocumentStore store; - - BatchTaskTracker batchTaskTracker = new BatchTaskTracker(); - List enrichers; - MessageQueue queuePeeker; - MessageQueue serviceControlErrorQueue; - - volatile bool stopping; - - class BatchTaskTracker - { - List tasks = new List(); +// var moreMessages = 0; + +// using (var queueReceiver = CreateReceiver()) +// { +// do +// { +// if (moreMessages > RampUpConcurrencyMagicNumber) +// { +// if (TryStartNewBatchImporter()) +// { +// Logger.Debug("We have too many messages, starting another batch importer"); +// moreMessages = 0; //Reset to 0 so we only ramp up once per BatchImporter +// } +// } + +// moreMessages++; + +// using (var msmqTransaction = new MessageQueueTransaction()) +// { +// msmqTransaction.Begin(); +// using (var bulkInsert =store.BulkInsert(options:new BulkInsertOptions {CheckForUpdates = true})) +// { +// for (var idx = 0; idx < BatchSize; idx++) +// { +// Message message = null; +// TransportMessage transportMessage; +// try +// { +// message = queueReceiver.Receive(receiveTimeout, msmqTransaction); +// performanceCounters.MessageDequeued(); +// transportMessage = MsmqUtilities.Convert(message); +// } +// catch (MessageQueueException mqe) +// { +// if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) +// { +// moreMessages = 0; +// break; +// } +// throw; +// } +// catch (Exception) +// { +// if (message != null) { +// failedMessageID = message.Id; +// } +// throw; +// } + +// try +// { +// var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); +// foreach (var enricher in enrichers) +// { +// enricher.Enrich(importSuccessfullyProcessedMessage); +// } +// var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); +// bulkInsert.Store(auditMessage); +// performanceCounters.MessageProcessed(); + +// if (Settings.ForwardAuditMessages == true) +// { +// Forwarder.Send(transportMessage, Settings.AuditLogQueue); +// } +// } +// catch (Exception) +// { +// if (message != null) +// { +// failedMessageID = message.Id; +// } +// throw; +// } +// } +// } +// msmqTransaction.Commit(); +// } +// } while (moreMessages > 0 && !stopping); +// } +// Logger.Debug("Stopping batch importer"); +// } +// finally +// { +// if (!String.IsNullOrEmpty(failedMessageID)) +// { +// // Call RetryMessageImportById outside the Task as it checks for running tasks +// ThreadPool.QueueUserWorkItem(state => RetryMessageImportById(failedMessageID)); +// } +// countDownEvent.Decrement(); +// } +// } + +// void RetryMessageImportById(string messageID) +// { +// // Try to get the batchErrorLock, if we can't then exit, +// // the message will trigger a retry next time on the next batch read. +// // Retrymessage may be fired again for the same message until the batches drain so this +// // prevents the message being processed twice, +// if (Monitor.TryEnter(batchErrorLockObj)) +// { +// try +// { +// Logger.DebugFormat("Drain stop running batch importers"); +// stopping = true; +// var runningTasks = batchTaskTracker.Active(); +// Task.WaitAll(runningTasks); + +// var commitTransaction = false; +// using (var queueReceiver = CreateReceiver()) +// using (var msmqTransaction = new MessageQueueTransaction()) +// { +// msmqTransaction.Begin(); +// Logger.DebugFormat("Retry import of messageID - {0}", messageID); +// try +// { +// Message message; +// TransportMessage transportMessage; +// try +// { +// message = queueReceiver.ReceiveById(messageID); +// performanceCounters.MessageDequeued(); +// } +// catch (Exception exception) +// { +// importFailuresHandler.FailedToReceive(exception); //logs and increments circuit breaker +// return; +// } + +// try +// { +// transportMessage = MsmqUtilities.Convert(message); +// } +// catch (Exception convertException) +// { +// importFailuresHandler.FailedToReceive(convertException); //logs and increments circuit breaker +// serviceControlErrorQueue.Send(message, msmqTransaction); // Send unconvertable message to SC's ErrorQueue so it's not lost +// commitTransaction = true; // Can't convert the messsage, so commit to get message out of the queue +// return; +// } + +// try +// { +// var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); +// foreach (var enricher in enrichers) +// { +// enricher.Enrich(importSuccessfullyProcessedMessage); +// } + +// using (var session = store.OpenSession()) +// { +// var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); +// session.Store(auditMessage); +// session.SaveChanges(); +// } +// performanceCounters.MessageProcessed(); + +// if (Settings.ForwardAuditMessages == true) +// { +// Forwarder.Send(transportMessage, Settings.AuditLogQueue); +// } + +// commitTransaction = true; +// } +// catch (Exception importException) +// { +// importFailuresHandler.Log(transportMessage, importException); //Logs and Writes failure transport message to Raven +// } +// } +// finally +// { +// if (commitTransaction) +// { +// msmqTransaction.Commit(); +// } +// } +// } +// } +// finally +// { +// Monitor.Exit(batchErrorLockObj); +// //Restart Batch mode +// stopping = false; +// Logger.Debug("Ready to BeginPeek again"); +// queuePeeker.BeginPeek(); +// } +// } +// } + +// const int RampUpConcurrencyMagicNumber = 5; //How many batches before we ramp up? +// const int BatchSize = 100; + +// static readonly ILog Logger = LogManager.GetLogger(typeof(MsmqAuditQueueImporter)); + +// readonly IBuilder builder; +// readonly CountDownEvent countDownEvent = new CountDownEvent(); +// readonly bool enabled; +// readonly SatelliteImportFailuresHandler importFailuresHandler; +// readonly object lockObj = new object(); +// readonly object batchErrorLockObj = new object(); +// readonly MsmqAuditImporterPerformanceCounters performanceCounters = new MsmqAuditImporterPerformanceCounters(); +// readonly TimeSpan receiveTimeout = TimeSpan.FromSeconds(1); +// readonly ManualResetEventSlim stopResetEvent = new ManualResetEventSlim(true); +// readonly IDocumentStore store; + +// BatchTaskTracker batchTaskTracker = new BatchTaskTracker(); +// List enrichers; +// MessageQueue queuePeeker; +// MessageQueue serviceControlErrorQueue; + +// volatile bool stopping; + +// class BatchTaskTracker +// { +// List tasks = new List(); - public void Add(Task task) - { - lock (tasks) - { - tasks.Add(task); - } - } - - public void Remove(Task task) - { - lock (tasks) - { - tasks.Remove(task); - } - } - - public Task[] Active() - { - lock (tasks) - { - return tasks.Where(x => !x.IsCompleted).ToArray(); - } - } - } - - class CountDownEvent - { - public int CurrentCount - { - get { return counter; } - } - - public event EventHandler Idle; - - public void Add() - { -#pragma warning disable 420 - Interlocked.Increment(ref counter); -#pragma warning restore 420 - } - - public void Decrement() - { -#pragma warning disable 420 - if (Interlocked.Decrement(ref counter) == 0) -#pragma warning restore 420 - { - Idle(this, EventArgs.Empty); - } - } - - volatile int counter; - } - } -} \ No newline at end of file +// public void Add(Task task) +// { +// lock (tasks) +// { +// tasks.Add(task); +// } +// } + +// public void Remove(Task task) +// { +// lock (tasks) +// { +// tasks.Remove(task); +// } +// } + +// public Task[] Active() +// { +// lock (tasks) +// { +// return tasks.Where(x => !x.IsCompleted).ToArray(); +// } +// } +// } + +// class CountDownEvent +// { +// public int CurrentCount +// { +// get { return counter; } +// } + +// public event EventHandler Idle; + +// public void Add() +// { +//#pragma warning disable 420 +// Interlocked.Increment(ref counter); +//#pragma warning restore 420 +// } + +// public void Decrement() +// { +//#pragma warning disable 420 +// if (Interlocked.Decrement(ref counter) == 0) +//#pragma warning restore 420 +// { +// Idle(this, EventArgs.Empty); +// } +// } + +// volatile int counter; +// } +// } +//} \ No newline at end of file From aa444a35915d470c4041badf9e87bccc67cd83c1 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 15 Sep 2015 10:00:36 +0800 Subject: [PATCH 3/7] Do not put large message bodies into message metadata --- .../Operations/BodyStorage/BodyStorageEnricher.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs b/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs index cb0eff5cd5..abca9a40e0 100644 --- a/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs +++ b/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs @@ -37,14 +37,15 @@ bool TryStoreBody(ImportMessage message, int bodySize, string contentType) var isFailedMessage = message is ImportFailedMessage; var isBinary = contentType.Contains("binary"); var isBelowMaxSize = bodySize <= Settings.MaxBodySizeToStore; + var avoidsLargeObjectHeap = bodySize < LargeObjectHeapThreshold; - if (isFailedMessage || (isBinary && isBelowMaxSize)) + if (isFailedMessage || isBelowMaxSize) { bodyUrl = StoreBodyInBodyStorage(message, bodyId, contentType, bodySize); stored = true; } - if (isBelowMaxSize && !isBinary) + if (isBelowMaxSize && avoidsLargeObjectHeap && !isBinary) { message.Metadata.Add("Body", Encoding.UTF8.GetString(message.PhysicalMessage.Body)); stored = true; @@ -84,5 +85,7 @@ string StoreBodyInBodyStorage(ImportMessage message, string bodyId, string conte return bodyUrl; } } + + static int LargeObjectHeapThreshold = 85*1024; } } \ No newline at end of file From db611300aa50f747cf87db51b9c211756fcffad0 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 15 Sep 2015 11:19:20 +0800 Subject: [PATCH 4/7] Message cannot go into metadata unless it also makes it into body storage --- .../Operations/BodyStorage/BodyStorageEnricher.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs b/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs index abca9a40e0..5b50e48137 100644 --- a/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs +++ b/src/ServiceControl/Operations/BodyStorage/BodyStorageEnricher.cs @@ -32,7 +32,7 @@ public override void Enrich(ImportMessage message) bool TryStoreBody(ImportMessage message, int bodySize, string contentType) { var bodyId = message.MessageId; - var stored = false; + var storedInBodyStorage = false; var bodyUrl = string.Format("/messages/{0}/body", bodyId); var isFailedMessage = message is ImportFailedMessage; var isBinary = contentType.Contains("binary"); @@ -42,18 +42,17 @@ bool TryStoreBody(ImportMessage message, int bodySize, string contentType) if (isFailedMessage || isBelowMaxSize) { bodyUrl = StoreBodyInBodyStorage(message, bodyId, contentType, bodySize); - stored = true; + storedInBodyStorage = true; } if (isBelowMaxSize && avoidsLargeObjectHeap && !isBinary) { - message.Metadata.Add("Body", Encoding.UTF8.GetString(message.PhysicalMessage.Body)); - stored = true; + message.Metadata.Add("Body", Encoding.UTF8.GetString(message.PhysicalMessage.Body)); } message.Metadata.Add("BodyUrl", bodyUrl); - return stored; + return storedInBodyStorage; } static int GetContentLength(ImportMessage message) From 12422495309292da63151e98cad62f84f1c5b839 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 15 Sep 2015 13:27:25 +0800 Subject: [PATCH 5/7] Do not add to dictionary directly after object initializer as code analysis on build server thinks this should be done inline --- .../CompositeViews/MessagesViewTests.cs | 7 ++++--- .../Expiration/CustomExpirationBundleTests.cs | 6 +++--- src/ServiceControl.UnitTests/MessageExtensions.cs | 14 ++++++++++++++ .../ServiceControl.UnitTests.csproj | 1 + 4 files changed, 22 insertions(+), 6 deletions(-) create mode 100644 src/ServiceControl.UnitTests/MessageExtensions.cs diff --git a/src/ServiceControl.UnitTests/CompositeViews/MessagesViewTests.cs b/src/ServiceControl.UnitTests/CompositeViews/MessagesViewTests.cs index 4668932d7a..251d1d0829 100644 --- a/src/ServiceControl.UnitTests/CompositeViews/MessagesViewTests.cs +++ b/src/ServiceControl.UnitTests/CompositeViews/MessagesViewTests.cs @@ -24,15 +24,16 @@ public void Filter_out_system_messages() Id = "1", }; - processedMessage.MessageMetadata["IsSystemMessage"] = true; + processedMessage.MakeSystemMessage(); session.Store(processedMessage); + var processedMessage2 = new ProcessedMessage { Id = "2", }; - - processedMessage2.MessageMetadata["IsSystemMessage"] = false; + processedMessage2.MakeSystemMessage(false); session.Store(processedMessage2); + session.SaveChanges(); } diff --git a/src/ServiceControl.UnitTests/Expiration/CustomExpirationBundleTests.cs b/src/ServiceControl.UnitTests/Expiration/CustomExpirationBundleTests.cs index 6fc5d83b6d..d4fa37cc40 100644 --- a/src/ServiceControl.UnitTests/Expiration/CustomExpirationBundleTests.cs +++ b/src/ServiceControl.UnitTests/Expiration/CustomExpirationBundleTests.cs @@ -30,7 +30,7 @@ public void Processed_messages_are_being_expired() Id = "2", ProcessedAt = DateTime.UtcNow.AddHours(-(Settings.HoursToKeepMessagesBeforeExpiring * 2)), }; - processedMessage2.MessageMetadata["IsSystemMessage"] = true; + processedMessage2.MakeSystemMessage(); using (var session = documentStore.OpenSession()) { @@ -117,7 +117,7 @@ public void Only_processed_messages_are_being_expired() Id = "2", ProcessedAt = DateTime.UtcNow, }; - processedMessage2.MessageMetadata["IsSystemMessage"] = true; + processedMessage2.MakeSystemMessage(); using (var session = documentStore.OpenSession()) { @@ -155,7 +155,7 @@ public void Stored_bodies_are_being_removed_when_message_expires() ProcessedAt = DateTime.UtcNow.AddHours(-(Settings.HoursToKeepMessagesBeforeExpiring * 2)) }; - processedMessage.MessageMetadata["MessageId"] = messageId; + processedMessage.SetMessageId(messageId); using (var session = documentStore.OpenSession()) { diff --git a/src/ServiceControl.UnitTests/MessageExtensions.cs b/src/ServiceControl.UnitTests/MessageExtensions.cs new file mode 100644 index 0000000000..ee278f421e --- /dev/null +++ b/src/ServiceControl.UnitTests/MessageExtensions.cs @@ -0,0 +1,14 @@ +using ServiceControl.MessageAuditing; + +public static class MessageExtensions +{ + public static void MakeSystemMessage(this ProcessedMessage message, bool isSystem = true) + { + message.MessageMetadata["IsSystemMessage"] = isSystem; + } + + public static void SetMessageId(this ProcessedMessage message, string messageId) + { + message.MessageMetadata["MessageId"] = messageId; + } +} \ No newline at end of file diff --git a/src/ServiceControl.UnitTests/ServiceControl.UnitTests.csproj b/src/ServiceControl.UnitTests/ServiceControl.UnitTests.csproj index 859d3a93b9..e98d037b82 100644 --- a/src/ServiceControl.UnitTests/ServiceControl.UnitTests.csproj +++ b/src/ServiceControl.UnitTests/ServiceControl.UnitTests.csproj @@ -103,6 +103,7 @@ + From 09735db96c53d6b50f678ab1494622b1f528bdc0 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 15 Sep 2015 15:54:22 +0800 Subject: [PATCH 6/7] Ignore flakey test on build server --- ...hen_endpoints_heartbeats_are_received_in_a_timely_manner.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/HeartbeatMonitoring/When_endpoints_heartbeats_are_received_in_a_timely_manner.cs b/src/ServiceControl.AcceptanceTests/HeartbeatMonitoring/When_endpoints_heartbeats_are_received_in_a_timely_manner.cs index e9e85a51be..6b98e358e7 100644 --- a/src/ServiceControl.AcceptanceTests/HeartbeatMonitoring/When_endpoints_heartbeats_are_received_in_a_timely_manner.cs +++ b/src/ServiceControl.AcceptanceTests/HeartbeatMonitoring/When_endpoints_heartbeats_are_received_in_a_timely_manner.cs @@ -32,6 +32,7 @@ public class MyContext : ScenarioContext } [Test] + [Ignore("Test is flakey on Azure ServiceBus Transport on build server")] public void Should_be_reflected_as_active_endpoints_in_the_heartbeat_summary() { var context = new MyContext(); @@ -75,7 +76,7 @@ public void Should_be_reflected_as_active_endpoints_in_the_heartbeat_summary() { if(otherEndpoint.MonitorHeartbeat) { - Console.WriteLine("Disabling Heartbeats on {0}", otherEndpoint.MonitorHeartbeat); + Console.WriteLine("Disabling Heartbeats on {0}", otherEndpoint.Name); Patch("/api/endpoints/" + otherEndpoint.Id, new EndpointUpdateModel { MonitorHeartbeat = false From 2683a8a88fb9216fe94c0a72754f56b76a7883ae Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Wed, 16 Sep 2015 09:27:36 +0800 Subject: [PATCH 7/7] Remove MSMQ specific batch importer --- .../Operations/Msmq/MsmqAuditQueueImporter.cs | 476 ------------------ src/ServiceControl/ServiceControl.csproj | 1 - 2 files changed, 477 deletions(-) delete mode 100644 src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs diff --git a/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs b/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs deleted file mode 100644 index e23f0cf0f2..0000000000 --- a/src/ServiceControl/Operations/Msmq/MsmqAuditQueueImporter.cs +++ /dev/null @@ -1,476 +0,0 @@ -//namespace ServiceControl.Operations -//{ -// using System; -// using System.Collections.Generic; -// using System.IO; -// using System.Linq; -// using System.Messaging; -// using System.Threading; -// using System.Threading.Tasks; -// using Contracts.Operations; -// using EndpointControl.Handlers; -// using MessageAuditing; -// using NServiceBus; -// using NServiceBus.Logging; -// using NServiceBus.ObjectBuilder; -// using NServiceBus.Transports; -// using NServiceBus.Transports.Msmq; -// using NServiceBus.Unicast; -// using Raven.Abstractions.Data; -// using Raven.Client; -// using ServiceBus.Management.Infrastructure.Settings; - -// class MsmqAuditQueueImporter : IWantToRunWhenBusStartsAndStops -// { -// public MsmqAuditQueueImporter(IDocumentStore store, IBuilder builder, IDequeueMessages receiver) -// { -// this.store = store; -// this.builder = builder; -// //enabled = receiver is MsmqDequeueStrategy; - -// importFailuresHandler = new SatelliteImportFailuresHandler(store, -// Path.Combine(Settings.LogPath, @"FailedImports\Audit"), tm => new FailedAuditImport -// { -// Message = tm, -// }); -// } - -// public IBus Bus { get; set; } -// public KnownEndpointsCache KnownEndpointsCache { get; set; } -// public UnicastBus UnicastBus { get; set; } -// public ISendMessages Forwarder { get; set; } - -// public void Start() -// { -// // Any messages that fail conversion to a transportmessage is sent to the particular.servicecontrol.errors queue using low level Api -// // The actual queue name is based on service name to support mulitple instances on same host (particular.servicecontrol.errors is the default) -// var serviceControlErrorQueueAddress = Address.Parse(string.Format("{0}.errors", Settings.ServiceName)); -// serviceControlErrorQueue = new MessageQueue(MsmqUtilities.GetFullPath(serviceControlErrorQueueAddress), false, true, QueueAccessMode.Send); - -// if (!enabled) -// { -// return; -// } - -// if (Settings.AuditQueue == Address.Undefined) -// { -// Logger.Info("No Audit queue has been configured. No audit import will be performed. To enable imports add the ServiceBus/AuditQueue appsetting and restart ServiceControl"); -// return; -// } - -// if (TerminateIfForwardingIsEnabledButQueueNotWritable()) -// { -// return; -// } - -// performanceCounters.Initialize(); - -// queuePeeker = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Peek); -// queuePeeker.MessageReadPropertyFilter.ClearAll(); -// queuePeeker.PeekCompleted += QueueOnPeekCompleted; - -// enrichers = builder.BuildAll().ToList(); - -// Logger.InfoFormat("MSMQ Audit import is now started, feeding audit messages from: {0}", Settings.AuditQueue); - -// countDownEvent.Idle += OnIdle; - -// Logger.Debug("Ready to BeginPeek"); -// queuePeeker.BeginPeek(); -// } - -// public void Stop() -// { -// if (!enabled) -// { -// return; -// } - -// stopping = true; - -// queuePeeker.PeekCompleted -= QueueOnPeekCompleted; - -// stopResetEvent.Wait(); - -// performanceCounters.Dispose(); - -// queuePeeker.Dispose(); - -// stopResetEvent.Dispose(); -// } - -// bool TerminateIfForwardingIsEnabledButQueueNotWritable() -// { -// if (Settings.ForwardAuditMessages != true) -// { -// return false; -// } - -// try -// { -// //Send a message to test the forwarding queue -// var testMessage = new TransportMessage(Guid.Empty.ToString("N"), new Dictionary()); -// Forwarder.Send(testMessage, Settings.AuditLogQueue); -// return false; -// } -// catch (Exception messageForwardingException) -// { -// //This call to RaiseCriticalError has to be on a seperate thread otherwise it deadlocks and doesn't stop correctly. -// ThreadPool.QueueUserWorkItem(state => Configure.Instance.RaiseCriticalError("Audit Import cannot start", messageForwardingException)); -// return true; -// } -// } - -// static MessageQueue CreateReceiver() -// { -// var queue = new MessageQueue(MsmqUtilities.GetFullPath(Settings.AuditQueue), QueueAccessMode.Receive); - -// var messageReadPropertyFilter = new MessagePropertyFilter -// { -// Body = true, -// TimeToBeReceived = true, -// Recoverable = true, -// Id = true, -// ResponseQueue = true, -// CorrelationId = true, -// Extension = true, -// AppSpecific = true -// }; - -// queue.MessageReadPropertyFilter = messageReadPropertyFilter; - -// return queue; -// } - -// void OnIdle(object sender, EventArgs eventArgs) -// { -// stopResetEvent.Set(); - -// if (stopping) -// { -// return; -// } - -// Logger.Debug("Ready to BeginPeek again"); -// queuePeeker.BeginPeek(); -// } - -// void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs args) -// { -// stopResetEvent.Reset(); - -// TryStartNewBatchImporter(); -// } - -// bool TryStartNewBatchImporter() -// { -// lock (lockObj) -// { -// if (countDownEvent.CurrentCount > UnicastBus.Transport.MaximumConcurrencyLevel) -// { -// return false; -// } -// countDownEvent.Add(); -// } - -// // If batchErrorLockObj can not be locked it means one of the Tasks has had a batch error, and RetryMessageImportById is running - -// lock (batchErrorLockObj) -// { -// } - -// if (stopping) -// return true; - -// batchTaskTracker.Add(Task.Factory -// .StartNew(BatchImporter, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default) -// .ContinueWith(task => -// { -// if (task.Exception != null) { -// task.Exception.Handle(ex =>{ -// Logger.Error("Error processing message.", ex); -// return true; -// }); -// batchTaskTracker.Remove(task); -// } -// })); -// return true; -// } - -// void BatchImporter() -// { -// String failedMessageID = null; -// try -// { -// Logger.DebugFormat("Batch job started", Task.CurrentId); - -// var moreMessages = 0; - -// using (var queueReceiver = CreateReceiver()) -// { -// do -// { -// if (moreMessages > RampUpConcurrencyMagicNumber) -// { -// if (TryStartNewBatchImporter()) -// { -// Logger.Debug("We have too many messages, starting another batch importer"); -// moreMessages = 0; //Reset to 0 so we only ramp up once per BatchImporter -// } -// } - -// moreMessages++; - -// using (var msmqTransaction = new MessageQueueTransaction()) -// { -// msmqTransaction.Begin(); -// using (var bulkInsert =store.BulkInsert(options:new BulkInsertOptions {CheckForUpdates = true})) -// { -// for (var idx = 0; idx < BatchSize; idx++) -// { -// Message message = null; -// TransportMessage transportMessage; -// try -// { -// message = queueReceiver.Receive(receiveTimeout, msmqTransaction); -// performanceCounters.MessageDequeued(); -// transportMessage = MsmqUtilities.Convert(message); -// } -// catch (MessageQueueException mqe) -// { -// if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) -// { -// moreMessages = 0; -// break; -// } -// throw; -// } -// catch (Exception) -// { -// if (message != null) { -// failedMessageID = message.Id; -// } -// throw; -// } - -// try -// { -// var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); -// foreach (var enricher in enrichers) -// { -// enricher.Enrich(importSuccessfullyProcessedMessage); -// } -// var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); -// bulkInsert.Store(auditMessage); -// performanceCounters.MessageProcessed(); - -// if (Settings.ForwardAuditMessages == true) -// { -// Forwarder.Send(transportMessage, Settings.AuditLogQueue); -// } -// } -// catch (Exception) -// { -// if (message != null) -// { -// failedMessageID = message.Id; -// } -// throw; -// } -// } -// } -// msmqTransaction.Commit(); -// } -// } while (moreMessages > 0 && !stopping); -// } -// Logger.Debug("Stopping batch importer"); -// } -// finally -// { -// if (!String.IsNullOrEmpty(failedMessageID)) -// { -// // Call RetryMessageImportById outside the Task as it checks for running tasks -// ThreadPool.QueueUserWorkItem(state => RetryMessageImportById(failedMessageID)); -// } -// countDownEvent.Decrement(); -// } -// } - -// void RetryMessageImportById(string messageID) -// { -// // Try to get the batchErrorLock, if we can't then exit, -// // the message will trigger a retry next time on the next batch read. -// // Retrymessage may be fired again for the same message until the batches drain so this -// // prevents the message being processed twice, -// if (Monitor.TryEnter(batchErrorLockObj)) -// { -// try -// { -// Logger.DebugFormat("Drain stop running batch importers"); -// stopping = true; -// var runningTasks = batchTaskTracker.Active(); -// Task.WaitAll(runningTasks); - -// var commitTransaction = false; -// using (var queueReceiver = CreateReceiver()) -// using (var msmqTransaction = new MessageQueueTransaction()) -// { -// msmqTransaction.Begin(); -// Logger.DebugFormat("Retry import of messageID - {0}", messageID); -// try -// { -// Message message; -// TransportMessage transportMessage; -// try -// { -// message = queueReceiver.ReceiveById(messageID); -// performanceCounters.MessageDequeued(); -// } -// catch (Exception exception) -// { -// importFailuresHandler.FailedToReceive(exception); //logs and increments circuit breaker -// return; -// } - -// try -// { -// transportMessage = MsmqUtilities.Convert(message); -// } -// catch (Exception convertException) -// { -// importFailuresHandler.FailedToReceive(convertException); //logs and increments circuit breaker -// serviceControlErrorQueue.Send(message, msmqTransaction); // Send unconvertable message to SC's ErrorQueue so it's not lost -// commitTransaction = true; // Can't convert the messsage, so commit to get message out of the queue -// return; -// } - -// try -// { -// var importSuccessfullyProcessedMessage = new ImportSuccessfullyProcessedMessage(transportMessage); -// foreach (var enricher in enrichers) -// { -// enricher.Enrich(importSuccessfullyProcessedMessage); -// } - -// using (var session = store.OpenSession()) -// { -// var auditMessage = new ProcessedMessage(importSuccessfullyProcessedMessage); -// session.Store(auditMessage); -// session.SaveChanges(); -// } -// performanceCounters.MessageProcessed(); - -// if (Settings.ForwardAuditMessages == true) -// { -// Forwarder.Send(transportMessage, Settings.AuditLogQueue); -// } - -// commitTransaction = true; -// } -// catch (Exception importException) -// { -// importFailuresHandler.Log(transportMessage, importException); //Logs and Writes failure transport message to Raven -// } -// } -// finally -// { -// if (commitTransaction) -// { -// msmqTransaction.Commit(); -// } -// } -// } -// } -// finally -// { -// Monitor.Exit(batchErrorLockObj); -// //Restart Batch mode -// stopping = false; -// Logger.Debug("Ready to BeginPeek again"); -// queuePeeker.BeginPeek(); -// } -// } -// } - -// const int RampUpConcurrencyMagicNumber = 5; //How many batches before we ramp up? -// const int BatchSize = 100; - -// static readonly ILog Logger = LogManager.GetLogger(typeof(MsmqAuditQueueImporter)); - -// readonly IBuilder builder; -// readonly CountDownEvent countDownEvent = new CountDownEvent(); -// readonly bool enabled; -// readonly SatelliteImportFailuresHandler importFailuresHandler; -// readonly object lockObj = new object(); -// readonly object batchErrorLockObj = new object(); -// readonly MsmqAuditImporterPerformanceCounters performanceCounters = new MsmqAuditImporterPerformanceCounters(); -// readonly TimeSpan receiveTimeout = TimeSpan.FromSeconds(1); -// readonly ManualResetEventSlim stopResetEvent = new ManualResetEventSlim(true); -// readonly IDocumentStore store; - -// BatchTaskTracker batchTaskTracker = new BatchTaskTracker(); -// List enrichers; -// MessageQueue queuePeeker; -// MessageQueue serviceControlErrorQueue; - -// volatile bool stopping; - -// class BatchTaskTracker -// { -// List tasks = new List(); - -// public void Add(Task task) -// { -// lock (tasks) -// { -// tasks.Add(task); -// } -// } - -// public void Remove(Task task) -// { -// lock (tasks) -// { -// tasks.Remove(task); -// } -// } - -// public Task[] Active() -// { -// lock (tasks) -// { -// return tasks.Where(x => !x.IsCompleted).ToArray(); -// } -// } -// } - -// class CountDownEvent -// { -// public int CurrentCount -// { -// get { return counter; } -// } - -// public event EventHandler Idle; - -// public void Add() -// { -//#pragma warning disable 420 -// Interlocked.Increment(ref counter); -//#pragma warning restore 420 -// } - -// public void Decrement() -// { -//#pragma warning disable 420 -// if (Interlocked.Decrement(ref counter) == 0) -//#pragma warning restore 420 -// { -// Idle(this, EventArgs.Empty); -// } -// } - -// volatile int counter; -// } -// } -//} \ No newline at end of file diff --git a/src/ServiceControl/ServiceControl.csproj b/src/ServiceControl/ServiceControl.csproj index 21a9fe279f..69a1e03321 100644 --- a/src/ServiceControl/ServiceControl.csproj +++ b/src/ServiceControl/ServiceControl.csproj @@ -436,7 +436,6 @@ -