diff --git a/src/ServiceControl.AcceptanceTesting/Support/ScenarioRunner.cs b/src/ServiceControl.AcceptanceTesting/Support/ScenarioRunner.cs index a9d1fd16d8..605c74145a 100644 --- a/src/ServiceControl.AcceptanceTesting/Support/ScenarioRunner.cs +++ b/src/ServiceControl.AcceptanceTesting/Support/ScenarioRunner.cs @@ -253,7 +253,7 @@ static void PerformScenarios(RunDescriptor runDescriptor, IEnumerable Task.Factory.StartNew(() => SpinWait.SpinUntil(done, maxTime))).Cast().ToArray(), maxTime); + Task.WaitAll(Task.Run(() => SpinWait.SpinUntil(done, maxTime))); try { diff --git a/src/ServiceControl.AcceptanceTests/Contexts/TransportIntegration/MsmqTransportIntegration.cs b/src/ServiceControl.AcceptanceTests/Contexts/TransportIntegration/MsmqTransportIntegration.cs index e13625bbfb..f01540c6c0 100644 --- a/src/ServiceControl.AcceptanceTests/Contexts/TransportIntegration/MsmqTransportIntegration.cs +++ b/src/ServiceControl.AcceptanceTests/Contexts/TransportIntegration/MsmqTransportIntegration.cs @@ -41,6 +41,7 @@ static void DeleteQueues(string name) if (messageQueue.QueueName.StartsWith(nameFilter, StringComparison.OrdinalIgnoreCase)) { queuesToBeDeleted.Add(messageQueue.Path); + Console.WriteLine("Deleted '{0}' queue", messageQueue.Path); } } } diff --git a/src/ServiceControl/Bootstrapper.cs b/src/ServiceControl/Bootstrapper.cs index ad6178cf09..dba62be4bd 100644 --- a/src/ServiceControl/Bootstrapper.cs +++ b/src/ServiceControl/Bootstrapper.cs @@ -66,6 +66,7 @@ public Bootstrapper(ServiceBase host = null, HostArguments hostArguments = null, configuration.DisableFeature(); configuration.DisableFeature(); configuration.DisableFeature(); + configuration.DisableFeature(); configuration.UseSerialization(); diff --git a/src/ServiceControl/HeartbeatMonitoring/SaveHeartbeatHandler.cs b/src/ServiceControl/HeartbeatMonitoring/SaveHeartbeatHandler.cs index 521b40f304..37d9501735 100644 --- a/src/ServiceControl/HeartbeatMonitoring/SaveHeartbeatHandler.cs +++ b/src/ServiceControl/HeartbeatMonitoring/SaveHeartbeatHandler.cs @@ -1,19 +1,28 @@ namespace ServiceControl.HeartbeatMonitoring { using System; - using Contracts.HeartbeatMonitoring; + using System.Linq; using Contracts.Operations; using Infrastructure; using NServiceBus; - using NServiceBus.Logging; using Plugin.Heartbeat.Messages; + using Raven.Abstractions.Data; using Raven.Client; + using Raven.Json.Linq; + using ServiceControl.Contracts.HeartbeatMonitoring; class SaveHeartbeatHandler : IHandleMessages { - public IDocumentSession Session { get; set; } - public IBus Bus { get; set; } - public HeartbeatStatusProvider HeartbeatStatusProvider { get; set; } + private readonly IBus bus; + private readonly HeartbeatStatusProvider statusProvider; + private readonly IDocumentStore store; + + public SaveHeartbeatHandler(IBus bus, HeartbeatStatusProvider statusProvider, IDocumentStore store) + { + this.bus = bus; + this.statusProvider = statusProvider; + this.store = store; + } public void Handle(EndpointHeartbeat message) { @@ -31,69 +40,95 @@ public void Handle(EndpointHeartbeat message) { throw new Exception("Received an EndpointHeartbeat message without proper initialization of the HostId in the schema"); } - + var id = DeterministicGuid.MakeId(message.EndpointName, message.HostId.ToString()); + var key = store.Conventions.DefaultFindFullDocumentKeyFromNonStringIdentifier(id, typeof(Heartbeat), false); - var heartbeat = Session.Load(id); - - if (heartbeat != null) + var endpointDetails = new EndpointDetails { - if (heartbeat.Disabled) - { - return; - } - } + HostId = message.HostId, + Host = message.Host, + Name = message.EndpointName + }; - var isNew = false; + var patchResult = store.DatabaseCommands.Patch(key, new ScriptedPatchRequest + { + Script = @" +if(new Date(lastReported) <= new Date(this.LastReportAt)) { + return; +} - if (heartbeat == null) +if(this.ReportedStatus === deadStatus) { + output('wasDead'); +} +this.LastReportAt = lastReported; +this.ReportedStatus = reportedStatus; +", + Values = + { + {"lastReported", message.ExecutedAt}, + {"reportedStatus", (int) Status.Beating}, + {"deadStatus", (int) Status.Dead}, + } + }, new ScriptedPatchRequest { - isNew = true; - heartbeat = new Heartbeat + Script = @" +this.LastReportAt = lastReported; +this.ReportedStatus = reportedStatus; +this.EndpointDetails = { + 'Host': endpointDetails_Host, + 'HostId': endpointDetails_HostId, + 'Name': endpointDetails_Name +}; +this.Disabled = false; +output('isNew'); +", + Values = { - Id = id, - ReportedStatus = Status.Beating - }; - Session.Store(heartbeat); - } + {"lastReported", message.ExecutedAt}, + {"reportedStatus", (int) Status.Beating}, + {"endpointDetails_Host", endpointDetails.Host}, + {"endpointDetails_HostId", endpointDetails.HostId.ToString()}, + {"endpointDetails_Name", endpointDetails.Name} + } + }, RavenJObject.Parse(String.Format(@" + {{ + ""Raven-Entity-Name"": ""{0}"", + ""Raven-Clr-Type"": ""{1}"" + }}", + store.Conventions.GetTypeTagName(typeof(Heartbeat)), + typeof(Heartbeat).AssemblyQualifiedName))); - if (message.ExecutedAt <= heartbeat.LastReportAt) - { - Logger.InfoFormat("Out of sync heartbeat received for endpoint {0}", message.EndpointName); - return; - } + var debugStatements = patchResult.Value("Debug"); + var ravenJToken = debugStatements.SingleOrDefault(); + bool isNew = false, wasDead = false; - heartbeat.LastReportAt = message.ExecutedAt; - heartbeat.EndpointDetails = new EndpointDetails + if (ravenJToken != null) { - HostId = message.HostId, - Host = message.Host, - Name = message.EndpointName - }; + var result = ravenJToken.Value(); + isNew = result == "isNew"; + wasDead = result == "wasDead"; + } if (isNew) // New endpoint heartbeat { - Bus.Publish(new HeartbeatingEndpointDetected + bus.Publish(new HeartbeatingEndpointDetected { - Endpoint = heartbeat.EndpointDetails, - DetectedAt = heartbeat.LastReportAt, + Endpoint = endpointDetails, + DetectedAt = message.ExecutedAt }); } - - if (heartbeat.ReportedStatus == Status.Dead) + else if (wasDead) { - heartbeat.ReportedStatus = Status.Beating; - Bus.Publish(new EndpointHeartbeatRestored + bus.Publish(new EndpointHeartbeatRestored { - Endpoint = heartbeat.EndpointDetails, - RestoredAt = heartbeat.LastReportAt + Endpoint = endpointDetails, + RestoredAt = message.ExecutedAt }); } - HeartbeatStatusProvider.RegisterHeartbeatingEndpoint(heartbeat.EndpointDetails, heartbeat.LastReportAt); + statusProvider.RegisterHeartbeatingEndpoint(endpointDetails, message.ExecutedAt); } - - static readonly ILog Logger = LogManager.GetLogger(typeof(SaveHeartbeatHandler)); } } \ No newline at end of file diff --git a/src/ServiceControl/Infrastructure/OWIN/ApiLogger.cs b/src/ServiceControl/Infrastructure/OWIN/ApiLogger.cs new file mode 100644 index 0000000000..bc6205bfe4 --- /dev/null +++ b/src/ServiceControl/Infrastructure/OWIN/ApiLogger.cs @@ -0,0 +1,22 @@ +namespace ServiceControl.Infrastructure.OWIN +{ + using System.Threading.Tasks; + using Microsoft.Owin; + using NServiceBus.Logging; + + class LogApiCalls : OwinMiddleware + { + public LogApiCalls(OwinMiddleware next) : base(next) { } + + public override async Task Invoke(IOwinContext context) + { + log.DebugFormat("Begin {0}: {1}", context.Request.Method, context.Request.Uri.ToString()); + + await Next.Invoke(context); + + log.DebugFormat("End {0}: {1}", context.Request.Method, context.Request.Uri.ToString()); + } + + static ILog log = LogManager.GetLogger(); + } +} diff --git a/src/ServiceControl/Infrastructure/OWIN/Startup.cs b/src/ServiceControl/Infrastructure/OWIN/Startup.cs index b166969967..78f76b9060 100644 --- a/src/ServiceControl/Infrastructure/OWIN/Startup.cs +++ b/src/ServiceControl/Infrastructure/OWIN/Startup.cs @@ -11,6 +11,7 @@ using ServiceControl.Infrastructure.SignalR; using Autofac; using Microsoft.AspNet.SignalR.Json; + using ServiceControl.Infrastructure.OWIN; using JsonNetSerializer = Microsoft.AspNet.SignalR.Json.JsonNetSerializer; public class Startup @@ -30,6 +31,8 @@ public void Configuration(IAppBuilder app) return func(); }); + app.Use(); + ConfigureSignalR(app); app.UseNancy(new NancyOptions { Bootstrapper = new NServiceBusContainerBootstrapper() }); } diff --git a/src/ServiceControl/Infrastructure/TimeKeeper.cs b/src/ServiceControl/Infrastructure/TimeKeeper.cs index 059fe1a9cd..ce5562ad57 100644 --- a/src/ServiceControl/Infrastructure/TimeKeeper.cs +++ b/src/ServiceControl/Infrastructure/TimeKeeper.cs @@ -10,21 +10,23 @@ public class TimeKeeper : IDisposable ConcurrentDictionary timers = new ConcurrentDictionary(); private ILog log = LogManager.GetLogger(); - public Timer New(Action callback, TimeSpan dueTime, TimeSpan period) + public Timer NewTimer(Func callback, TimeSpan dueTime, TimeSpan period) { Timer timer = null; timer = new Timer(_ => { + var reschedule = false; + try { - callback(); + reschedule = callback(); } catch (Exception ex) { log.Error("Reoccurring timer task failed.", ex); } - if (timers.ContainsKey(timer)) + if (reschedule && timers.ContainsKey(timer)) { try { @@ -41,6 +43,15 @@ public Timer New(Action callback, TimeSpan dueTime, TimeSpan period) return timer; } + public Timer New(Action callback, TimeSpan dueTime, TimeSpan period) + { + return NewTimer(() => + { + callback(); + return true; + }, dueTime, period); + } + public void Release(Timer timer) { object _; diff --git a/src/ServiceControl/Operations/SatelliteImportFailuresHandler.cs b/src/ServiceControl/Operations/SatelliteImportFailuresHandler.cs index c125f00288..3acee158e2 100644 --- a/src/ServiceControl/Operations/SatelliteImportFailuresHandler.cs +++ b/src/ServiceControl/Operations/SatelliteImportFailuresHandler.cs @@ -36,31 +36,10 @@ public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception Handle(e, messageBuilder(message), logPath); } - public void FailedToReceive(Exception exception) - { - try - { - var id = Guid.NewGuid(); - - var filePath = Path.Combine(logPath, id + ".txt"); - File.WriteAllText(filePath, exception.ToFriendlyString()); - WriteEvent("A message import has failed. A log file has been written to " + filePath); - } - finally - { - failureCircuitBreaker.Increment(exception); - } - } - public void Init(Address address) { } - public void Log(TransportMessage message, Exception e) - { - DoLogging(e, messageBuilder(message), logPath); - } - void Handle(Exception exception, dynamic failure, string logDirectory) { try diff --git a/src/ServiceControl/Recoverability/Grouping/Archiving/ArchiveAllInGroupHandler.cs b/src/ServiceControl/Recoverability/Grouping/Archiving/ArchiveAllInGroupHandler.cs index ff918071cc..e39c865ed2 100644 --- a/src/ServiceControl/Recoverability/Grouping/Archiving/ArchiveAllInGroupHandler.cs +++ b/src/ServiceControl/Recoverability/Grouping/Archiving/ArchiveAllInGroupHandler.cs @@ -6,18 +6,10 @@ namespace ServiceControl.Recoverability using Raven.Abstractions.Data; using Raven.Abstractions.Extensions; using Raven.Client; - using ServiceControl.Infrastructure; using ServiceControl.MessageFailures; public class ArchiveAllInGroupHandler : IHandleMessages { - private bool abort; - - public ArchiveAllInGroupHandler(ShutdownNotifier notifier) - { - notifier.Register(() => { abort = true; }); - } - public void Handle(ArchiveAllInGroup message) { var result = Session.Advanced.DocumentStore.DatabaseCommands.UpdateByIndex( diff --git a/src/ServiceControl/Recoverability/Grouping/Retries/RetryAllInGroupHandler.cs b/src/ServiceControl/Recoverability/Grouping/Retries/RetryAllInGroupHandler.cs index ba07dea450..618c49a780 100644 --- a/src/ServiceControl/Recoverability/Grouping/Retries/RetryAllInGroupHandler.cs +++ b/src/ServiceControl/Recoverability/Grouping/Retries/RetryAllInGroupHandler.cs @@ -2,6 +2,7 @@ namespace ServiceControl.Recoverability { using System.Linq; using NServiceBus; + using NServiceBus.Logging; using Raven.Client; public class RetryAllInGroupHandler : IHandleMessages @@ -10,6 +11,7 @@ public void Handle(RetryAllInGroup message) { if (Retries == null) { + log.WarnFormat("Attempt to retry a group ({0}) when retries are disabled", message.GroupId); return; } @@ -28,5 +30,7 @@ public void Handle(RetryAllInGroup message) public RetriesGateway Retries { get; set; } public IDocumentSession Session { get; set; } + + static ILog log = LogManager.GetLogger(typeof(RetryAllInGroupHandler)); } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retries/FailedMessageRetries.cs b/src/ServiceControl/Recoverability/Retries/FailedMessageRetries.cs index 0d6057ec20..5c738dba96 100644 --- a/src/ServiceControl/Recoverability/Retries/FailedMessageRetries.cs +++ b/src/ServiceControl/Recoverability/Retries/FailedMessageRetries.cs @@ -76,20 +76,23 @@ public AdoptOrphanBatchesFromPreviousSession(RetryDocumentManager retryDocumentM this.timeKeeper = timeKeeper; } - private void AdoptOrphanedBatches() + private bool AdoptOrphanedBatches() { - var allDone = retryDocumentManager.AdoptOrphanedBatches(); + bool hasMoreWorkToDo; + retryDocumentManager.AdoptOrphanedBatches(out hasMoreWorkToDo); - if (allDone) + if (!hasMoreWorkToDo) { //Disable timeout timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } + + return hasMoreWorkToDo; } protected override void OnStart() { - timer = timeKeeper.New(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2)); + timer = timeKeeper.NewTimer(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2)); } protected override void OnStop() diff --git a/src/ServiceControl/Recoverability/Retries/RetriesGateway.cs b/src/ServiceControl/Recoverability/Retries/RetriesGateway.cs index dc409e8b1a..724fe4c09d 100644 --- a/src/ServiceControl/Recoverability/Retries/RetriesGateway.cs +++ b/src/ServiceControl/Recoverability/Retries/RetriesGateway.cs @@ -6,6 +6,7 @@ namespace ServiceControl.Recoverability using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; + using NServiceBus.Logging; using Raven.Abstractions.Data; using Raven.Client; using Raven.Client.Indexes; @@ -94,6 +95,8 @@ public void StartRetryForIndex(Expression> filt where TIndex : AbstractIndexCreationTask, new() where TType : IHaveStatus { + log.InfoFormat("Enqueuing index based bulk retry `{0}`", context); + var request = new IndexBasedBulkRetryRequest(context, filter); _bulkRequests.Enqueue(request); @@ -103,17 +106,21 @@ public void StageRetryByUniqueMessageIds(string[] messageIds, string context = n { if (messageIds == null || !messageIds.Any()) { + log.DebugFormat("Context `{0}` contains no messages", context); return; } var batchDocumentId = RetryDocumentManager.CreateBatchDocument(context); + log.InfoFormat("Created Batch {0} with {1} messages for context `{2}`", batchDocumentId, messageIds.Length, context); + var retryIds = new ConcurrentSet(); Parallel.ForEach( messageIds, id => retryIds.Add(RetryDocumentManager.CreateFailedMessageRetryDocument(batchDocumentId, id))); RetryDocumentManager.MoveBatchToStaging(batchDocumentId, retryIds.ToArray()); + log.InfoFormat("Moved Batch {0} to Staging", batchDocumentId); } internal bool ProcessNextBulkRetry() @@ -137,5 +144,7 @@ void ProcessRequest(IBulkRetryRequest request) StageRetryByUniqueMessageIds(batches[i], request.GetBatchName(i + 1, batches.Count)); } } + + static ILog log = LogManager.GetLogger(typeof(RetriesGateway)); } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retries/RetryDocumentManager.cs b/src/ServiceControl/Recoverability/Retries/RetryDocumentManager.cs index c55bb2272d..eeed9bd1f8 100644 --- a/src/ServiceControl/Recoverability/Retries/RetryDocumentManager.cs +++ b/src/ServiceControl/Recoverability/Retries/RetryDocumentManager.cs @@ -5,6 +5,7 @@ namespace ServiceControl.Recoverability using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; + using NServiceBus.Logging; using Raven.Abstractions.Data; using Raven.Abstractions.Exceptions; using Raven.Client; @@ -97,7 +98,7 @@ public void MoveBatchToStaging(string batchDocumentId, string[] failedMessageRet } catch (ConcurrencyException) { - // Ignore concurrency exceptions + log.DebugFormat("Ignoring concurrency exception while moving batch to staging {0}", batchDocumentId); } } @@ -106,7 +107,7 @@ public void RemoveFailedMessageRetryDocument(string uniqueMessageId) Store.DatabaseCommands.Delete(FailedMessageRetry.MakeDocumentId(uniqueMessageId), null); } - internal bool AdoptOrphanedBatches() + internal void AdoptOrphanedBatches(out bool hasMoreWorkToDo) { using (var session = Store.OpenSession()) { @@ -118,10 +119,17 @@ internal bool AdoptOrphanedBatches() .Select(b => b.Id) .ToArray(); + log.InfoFormat("Found {0} orphaned retry batches from previous sessions", orphanedBatchIds.Length); + AdoptBatches(session, orphanedBatchIds); - var moreToDo = stats.IsStale || orphanedBatchIds.Any(); - return abort || !moreToDo; + if (abort) + { + hasMoreWorkToDo = false; + return; + } + + hasMoreWorkToDo = stats.IsStale || orphanedBatchIds.Any(); } } @@ -147,8 +155,11 @@ void AdoptBatch(IDocumentSession session, string batchId) if (!abort) { + log.InfoFormat("Adopting retry batch {0} from previous session with {1} messages", batchId, messageIds.Count); MoveBatchToStaging(batchId, messageIds.ToArray()); } } + + static ILog log = LogManager.GetLogger(typeof(RetryDocumentManager)); } } \ No newline at end of file diff --git a/src/ServiceControl/ServiceControl.csproj b/src/ServiceControl/ServiceControl.csproj index d960ef49d3..3f893bbec9 100644 --- a/src/ServiceControl/ServiceControl.csproj +++ b/src/ServiceControl/ServiceControl.csproj @@ -305,6 +305,7 @@ +