Skip to content

Commit

Permalink
Merge branch 'release-1.11'
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Feb 29, 2016
2 parents fe32196 + 690652b commit fde133e
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ static void PerformScenarios(RunDescriptor runDescriptor, IEnumerable<ActiveRunn
var startTime = DateTime.UtcNow;
var maxTime = runDescriptor.TestExecutionTimeout;

Task.WaitAll(endpoints.Select(endpoint => Task.Factory.StartNew(() => SpinWait.SpinUntil(done, maxTime))).Cast<Task>().ToArray(), maxTime);
Task.WaitAll(Task.Run(() => SpinWait.SpinUntil(done, maxTime)));

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static void DeleteQueues(string name)
if (messageQueue.QueueName.StartsWith(nameFilter, StringComparison.OrdinalIgnoreCase))
{
queuesToBeDeleted.Add(messageQueue.Path);
Console.WriteLine("Deleted '{0}' queue", messageQueue.Path);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/ServiceControl/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public Bootstrapper(ServiceBase host = null, HostArguments hostArguments = null,
configuration.DisableFeature<Audit>();
configuration.DisableFeature<AutoSubscribe>();
configuration.DisableFeature<SecondLevelRetries>();
configuration.DisableFeature<TimeoutManager>();

configuration.UseSerialization<JsonSerializer>();

Expand Down
127 changes: 81 additions & 46 deletions src/ServiceControl/HeartbeatMonitoring/SaveHeartbeatHandler.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
namespace ServiceControl.HeartbeatMonitoring
{
using System;
using Contracts.HeartbeatMonitoring;
using System.Linq;
using Contracts.Operations;
using Infrastructure;
using NServiceBus;
using NServiceBus.Logging;
using Plugin.Heartbeat.Messages;
using Raven.Abstractions.Data;
using Raven.Client;
using Raven.Json.Linq;
using ServiceControl.Contracts.HeartbeatMonitoring;

class SaveHeartbeatHandler : IHandleMessages<EndpointHeartbeat>
{
public IDocumentSession Session { get; set; }
public IBus Bus { get; set; }
public HeartbeatStatusProvider HeartbeatStatusProvider { get; set; }
private readonly IBus bus;
private readonly HeartbeatStatusProvider statusProvider;
private readonly IDocumentStore store;

public SaveHeartbeatHandler(IBus bus, HeartbeatStatusProvider statusProvider, IDocumentStore store)
{
this.bus = bus;
this.statusProvider = statusProvider;
this.store = store;
}

public void Handle(EndpointHeartbeat message)
{
Expand All @@ -31,69 +40,95 @@ public void Handle(EndpointHeartbeat message)
{
throw new Exception("Received an EndpointHeartbeat message without proper initialization of the HostId in the schema");
}


var id = DeterministicGuid.MakeId(message.EndpointName, message.HostId.ToString());
var key = store.Conventions.DefaultFindFullDocumentKeyFromNonStringIdentifier(id, typeof(Heartbeat), false);

var heartbeat = Session.Load<Heartbeat>(id);

if (heartbeat != null)
var endpointDetails = new EndpointDetails
{
if (heartbeat.Disabled)
{
return;
}
}
HostId = message.HostId,
Host = message.Host,
Name = message.EndpointName
};

var isNew = false;
var patchResult = store.DatabaseCommands.Patch(key, new ScriptedPatchRequest
{
Script = @"
if(new Date(lastReported) <= new Date(this.LastReportAt)) {
return;
}
if (heartbeat == null)
if(this.ReportedStatus === deadStatus) {
output('wasDead');
}
this.LastReportAt = lastReported;
this.ReportedStatus = reportedStatus;
",
Values =
{
{"lastReported", message.ExecutedAt},
{"reportedStatus", (int) Status.Beating},
{"deadStatus", (int) Status.Dead},
}
}, new ScriptedPatchRequest
{
isNew = true;
heartbeat = new Heartbeat
Script = @"
this.LastReportAt = lastReported;
this.ReportedStatus = reportedStatus;
this.EndpointDetails = {
'Host': endpointDetails_Host,
'HostId': endpointDetails_HostId,
'Name': endpointDetails_Name
};
this.Disabled = false;
output('isNew');
",
Values =
{
Id = id,
ReportedStatus = Status.Beating
};
Session.Store(heartbeat);
}
{"lastReported", message.ExecutedAt},
{"reportedStatus", (int) Status.Beating},
{"endpointDetails_Host", endpointDetails.Host},
{"endpointDetails_HostId", endpointDetails.HostId.ToString()},
{"endpointDetails_Name", endpointDetails.Name}
}
}, RavenJObject.Parse(String.Format(@"
{{
""Raven-Entity-Name"": ""{0}"",
""Raven-Clr-Type"": ""{1}""
}}",
store.Conventions.GetTypeTagName(typeof(Heartbeat)),
typeof(Heartbeat).AssemblyQualifiedName)));

if (message.ExecutedAt <= heartbeat.LastReportAt)
{
Logger.InfoFormat("Out of sync heartbeat received for endpoint {0}", message.EndpointName);
return;
}
var debugStatements = patchResult.Value<RavenJArray>("Debug");
var ravenJToken = debugStatements.SingleOrDefault();
bool isNew = false, wasDead = false;

heartbeat.LastReportAt = message.ExecutedAt;
heartbeat.EndpointDetails = new EndpointDetails
if (ravenJToken != null)
{
HostId = message.HostId,
Host = message.Host,
Name = message.EndpointName
};
var result = ravenJToken.Value<string>();
isNew = result == "isNew";
wasDead = result == "wasDead";
}

if (isNew) // New endpoint heartbeat
{
Bus.Publish(new HeartbeatingEndpointDetected
bus.Publish(new HeartbeatingEndpointDetected
{
Endpoint = heartbeat.EndpointDetails,
DetectedAt = heartbeat.LastReportAt,
Endpoint = endpointDetails,
DetectedAt = message.ExecutedAt
});
}

if (heartbeat.ReportedStatus == Status.Dead)
else if (wasDead)
{
heartbeat.ReportedStatus = Status.Beating;
Bus.Publish(new EndpointHeartbeatRestored
bus.Publish(new EndpointHeartbeatRestored
{
Endpoint = heartbeat.EndpointDetails,
RestoredAt = heartbeat.LastReportAt
Endpoint = endpointDetails,
RestoredAt = message.ExecutedAt
});
}

HeartbeatStatusProvider.RegisterHeartbeatingEndpoint(heartbeat.EndpointDetails, heartbeat.LastReportAt);
statusProvider.RegisterHeartbeatingEndpoint(endpointDetails, message.ExecutedAt);
}

static readonly ILog Logger = LogManager.GetLogger(typeof(SaveHeartbeatHandler));
}
}
22 changes: 22 additions & 0 deletions src/ServiceControl/Infrastructure/OWIN/ApiLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ServiceControl.Infrastructure.OWIN
{
using System.Threading.Tasks;
using Microsoft.Owin;
using NServiceBus.Logging;

class LogApiCalls : OwinMiddleware
{
public LogApiCalls(OwinMiddleware next) : base(next) { }

public override async Task Invoke(IOwinContext context)
{
log.DebugFormat("Begin {0}: {1}", context.Request.Method, context.Request.Uri.ToString());

await Next.Invoke(context);

log.DebugFormat("End {0}: {1}", context.Request.Method, context.Request.Uri.ToString());
}

static ILog log = LogManager.GetLogger<LogApiCalls>();
}
}
3 changes: 3 additions & 0 deletions src/ServiceControl/Infrastructure/OWIN/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using ServiceControl.Infrastructure.SignalR;
using Autofac;
using Microsoft.AspNet.SignalR.Json;
using ServiceControl.Infrastructure.OWIN;
using JsonNetSerializer = Microsoft.AspNet.SignalR.Json.JsonNetSerializer;

public class Startup
Expand All @@ -30,6 +31,8 @@ public void Configuration(IAppBuilder app)
return func();
});

app.Use<LogApiCalls>();

ConfigureSignalR(app);
app.UseNancy(new NancyOptions { Bootstrapper = new NServiceBusContainerBootstrapper() });
}
Expand Down
17 changes: 14 additions & 3 deletions src/ServiceControl/Infrastructure/TimeKeeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ public class TimeKeeper : IDisposable
ConcurrentDictionary<Timer, object> timers = new ConcurrentDictionary<Timer, object>();
private ILog log = LogManager.GetLogger<TimeKeeper>();

public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
public Timer NewTimer(Func<bool> callback, TimeSpan dueTime, TimeSpan period)
{
Timer timer = null;

timer = new Timer(_ =>
{
var reschedule = false;

try
{
callback();
reschedule = callback();
}
catch (Exception ex)
{
log.Error("Reoccurring timer task failed.", ex);
}
if (timers.ContainsKey(timer))
if (reschedule && timers.ContainsKey(timer))
{
try
{
Expand All @@ -41,6 +43,15 @@ public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
return timer;
}

public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
{
return NewTimer(() =>
{
callback();
return true;
}, dueTime, period);
}

public void Release(Timer timer)
{
object _;
Expand Down
21 changes: 0 additions & 21 deletions src/ServiceControl/Operations/SatelliteImportFailuresHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,10 @@ public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception
Handle(e, messageBuilder(message), logPath);
}

public void FailedToReceive(Exception exception)
{
try
{
var id = Guid.NewGuid();

var filePath = Path.Combine(logPath, id + ".txt");
File.WriteAllText(filePath, exception.ToFriendlyString());
WriteEvent("A message import has failed. A log file has been written to " + filePath);
}
finally
{
failureCircuitBreaker.Increment(exception);
}
}

public void Init(Address address)
{
}

public void Log(TransportMessage message, Exception e)
{
DoLogging(e, messageBuilder(message), logPath);
}

void Handle(Exception exception, dynamic failure, string logDirectory)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,10 @@ namespace ServiceControl.Recoverability
using Raven.Abstractions.Data;
using Raven.Abstractions.Extensions;
using Raven.Client;
using ServiceControl.Infrastructure;
using ServiceControl.MessageFailures;

public class ArchiveAllInGroupHandler : IHandleMessages<ArchiveAllInGroup>
{
private bool abort;

public ArchiveAllInGroupHandler(ShutdownNotifier notifier)
{
notifier.Register(() => { abort = true; });
}

public void Handle(ArchiveAllInGroup message)
{
var result = Session.Advanced.DocumentStore.DatabaseCommands.UpdateByIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace ServiceControl.Recoverability
{
using System.Linq;
using NServiceBus;
using NServiceBus.Logging;
using Raven.Client;

public class RetryAllInGroupHandler : IHandleMessages<RetryAllInGroup>
Expand All @@ -10,6 +11,7 @@ public void Handle(RetryAllInGroup message)
{
if (Retries == null)
{
log.WarnFormat("Attempt to retry a group ({0}) when retries are disabled", message.GroupId);
return;
}

Expand All @@ -28,5 +30,7 @@ public void Handle(RetryAllInGroup message)

public RetriesGateway Retries { get; set; }
public IDocumentSession Session { get; set; }

static ILog log = LogManager.GetLogger(typeof(RetryAllInGroupHandler));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,23 @@ public AdoptOrphanBatchesFromPreviousSession(RetryDocumentManager retryDocumentM
this.timeKeeper = timeKeeper;
}

private void AdoptOrphanedBatches()
private bool AdoptOrphanedBatches()
{
var allDone = retryDocumentManager.AdoptOrphanedBatches();
bool hasMoreWorkToDo;
retryDocumentManager.AdoptOrphanedBatches(out hasMoreWorkToDo);

if (allDone)
if (!hasMoreWorkToDo)
{
//Disable timeout
timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}

return hasMoreWorkToDo;
}

protected override void OnStart()
{
timer = timeKeeper.New(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2));
timer = timeKeeper.NewTimer(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2));
}

protected override void OnStop()
Expand Down
Loading

0 comments on commit fde133e

Please sign in to comment.