Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore fsync behavior in FSDirectory via P/Invoke, #933 #938

Merged
merged 17 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 87 additions & 63 deletions src/Lucene.Net.Tests/Index/TestIndexWriterOnJRECrash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
using NUnit.Framework;
using RandomizedTesting.Generators;
using System;
using System.Data;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading;
using BaseDirectoryWrapper = Lucene.Net.Store.BaseDirectoryWrapper;
using Assert = Lucene.Net.TestFramework.Assert;
using Console = Lucene.Net.Util.SystemConsole;

namespace Lucene.Net.Index
Expand Down Expand Up @@ -42,7 +43,7 @@ namespace Lucene.Net.Index
[TestFixture]
public class TestIndexWriterOnJRECrash : TestNRTThreads
{
// LUCENENET: Setup unnecessary because we create a new temp directory
// LUCENENET: Setup of directory unnecessary because we create a new temp directory
// in each iteration of the test.

[Test]
Expand All @@ -51,7 +52,7 @@ public class TestIndexWriterOnJRECrash : TestNRTThreads
public override void TestNRTThreads_Mem()
{
//if we are not the fork
if (SystemProperties.GetProperty("tests:crashmode") is null)
if (!SystemProperties.GetPropertyAsBoolean("tests:crashmode", false))
{
// try up to 10 times to create an index
for (int i = 0; i < 10; i++)
Expand All @@ -63,34 +64,45 @@ public override void TestNRTThreads_Mem()
// lexicographical order rather than checking the one we create in the current iteration.
DirectoryInfo tempDir = CreateTempDir("netcrash");

FileInfo tempProcessToKillFile = CreateTempFile(prefix: "netcrash-processToKill", suffix: ".txt");
tempProcessToKillFile.Delete(); // We use the creation of this file as a signal to parse it.
// Set up a TCP listener to receive the process ID
TcpListener listener = SetupSocketListener();
Process p = null;
try
{
// Get the port that we picked at random.
int port = ((IPEndPoint)listener.LocalEndpoint).Port;

// Note this is the vstest.console process we are tracking here.
Process p = ForkTest(tempDir.FullName, tempProcessToKillFile.FullName);
// Note this is the vstest.console process we are tracking here.
p = ForkTest(tempDir.FullName, port);

TextWriter childOut = BeginOutput(p, out ThreadJob stdOutPumper, out ThreadJob stdErrPumper);
TextWriter childOut = BeginOutput(p, out ThreadJob stdOutPumper, out ThreadJob stdErrPumper);

// LUCENENET: Note that ForkTest() creates the vstest.console.exe process.
// This spawns testhost.exe, which runs our test. We wait until
// the process starts and logs its own Id so we know who to kill later.
int processIdToKill = WaitForProcessToKillLogFile(tempProcessToKillFile.FullName);
// LUCENENET: Note that ForkTest() creates the vstest.console.exe process.
// This spawns testhost.exe, which runs our test. We wait until
// the process starts and transmits its own PID so we know who to kill later.
int processIdToKill = WaitForProcessId(listener);

// Setup a time to crash the forked thread
int crashTime = TestUtil.NextInt32(Random, 4000, 5000); // LUCENENET: Adjusted these up by 1 second to give our tests some more time to spin up
ThreadJob t = new ThreadAnonymousClass(this, crashTime, processIdToKill);
// Setup a time to crash the forked thread
int crashTime = TestUtil.NextInt32(Random, 4000, 5000); // LUCENENET: Adjusted these up by 1 second to give our tests some more time to spin up
ThreadJob t = new ThreadAnonymousClass(this, crashTime, processIdToKill);

t.Priority = ThreadPriority.Highest;
t.Start();
t.Join(); // Wait for our thread to kill the other process
t.Priority = ThreadPriority.Highest;
t.Start();
t.Join(); // Wait for our thread to kill the other process

// if we succeeded in finding an index, we are done.
if (CheckIndexes(tempDir))
{
// if we succeeded in finding an index, we are done.
if (CheckIndexes(tempDir))
{
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
return;
}
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
return;
}
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
finally
{
listener.Stop();
p?.Dispose();
}
}
}
else
Expand All @@ -100,13 +112,12 @@ public override void TestNRTThreads_Mem()

// we are the fork, log our processId so the original test can kill us.
int processIdToKill = Process.GetCurrentProcess().Id;
string processIdToKillFile = SystemProperties.GetProperty("tests:tempProcessToKillFile");
int port = SystemProperties.GetPropertyAsInt32("tests:crashtestport");

assertNotNull("No tests:tempProcessToKillFile value was passed to the fork. This is a required system property.", processIdToKillFile);
assertTrue("No tests:crashtestport value was passed to the fork. This is a required system property.", port > 0);

// Writing this file will kick off the thread that crashes us.
using (var writer = new StreamWriter(processIdToKillFile, append: false, Encoding.UTF8, bufferSize: 32))
writer.WriteLine(processIdToKill.ToString(CultureInfo.InvariantCulture));
// Sending the process id will kick off the thread that crashes us.
SendProcessId(processIdToKill, port);

// run the test until we crash.
for (int i = 0; i < 100; i++)
Expand Down Expand Up @@ -145,7 +156,7 @@ public override void Run()
}
}

public Process ForkTest(string tempDir, string tempProcessToKillFile)
public Process ForkTest(string tempDir, int port)
{
//get the full location of the assembly with DaoTests in it
string testAssemblyPath = Assembly.GetAssembly(typeof(TestIndexWriterOnJRECrash)).Location;
Expand Down Expand Up @@ -174,8 +185,8 @@ public Process ForkTest(string tempDir, string tempProcessToKillFile)
// passing NIGHTLY to this test makes it run for much longer, easier to catch it in the act...
TestRunParameter("tests:nightly", "true"),
TestRunParameter("tempDir", tempDir),
// This file is for passing the process ID of the fork back to the original test so it can kill it.
TestRunParameter("tests:tempProcessToKillFile", tempProcessToKillFile),
// This port is for passing the process ID of the fork back to the original test so it can kill it.
TestRunParameter("tests:crashtestport", port.ToString(CultureInfo.InvariantCulture)),
}),
WorkingDirectory = theDirectory,
RedirectStandardOutput = true,
Expand Down Expand Up @@ -335,48 +346,61 @@ public virtual bool CheckIndexes(FileSystemInfo file)
return false;
}

// LUCENENET: Wait for our test to spin up and log its PID so we can kill it.
private static int WaitForProcessToKillLogFile(string processToKillFile)
private TcpListener SetupSocketListener()
{
bool exists = false;
Thread.Sleep(500);
for (int i = 0; i < 150; i++)
{
if (File.Exists(processToKillFile))
{
exists = true;
break;
}
Thread.Sleep(200);
}
// If the fork didn't log its process id, it is a failure.
assertTrue("The test fork didn't log its process id, so we cannot kill it", exists);
using var reader = new StreamReader(processToKillFile, Encoding.UTF8);
// LUCENENET: Our file only has one line with the process Id in it
return int.Parse(reader.ReadLine().Trim(), CultureInfo.InvariantCulture);
// Pick a random port that is available on the local machine.
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
return listener;
}

// LUCENENET: Wait for our test to spin up and send its process ID so we can kill it.
private int WaitForProcessId(TcpListener listener)
{
using var client = listener.AcceptTcpClient();
using var stream = client.GetStream();
// Directly read the process ID as a 32-bit integer
using var reader = new BinaryReader(stream);
return reader.ReadInt32();
}

private void SendProcessId(int processId, int port)
{
using var client = new TcpClient("127.0.0.1", port);
using var stream = client.GetStream();
// Directly write the process ID as a 32-bit integer
using var writer = new BinaryWriter(stream);
writer.Write(processId);
}

public virtual void CrashDotNet(int processIdToKill)
{
Process process = null;
try
{
process = Process.GetProcessById(processIdToKill);
}
catch (ArgumentException)
{
// We get here if the process wasn't running for some reason.
// We should fix the forked test to make it run longer if we get here.
fail("The test completed before we could kill it.");
}
try
{
process = Process.GetProcessById(processIdToKill);
}
catch (ArgumentException)
{
// We get here if the process wasn't running for some reason.
// We should fix the forked test to make it run longer if we get here.
fail("The test completed before we could kill it.");
}
#if FEATURE_PROCESS_KILL_ENTIREPROCESSTREE
process.Kill(entireProcessTree: true);
process.Kill(entireProcessTree: true);
#else
process.Kill();
process.Kill();
#endif
process.WaitForExit(10000);
// We couldn't get .NET to crash for some reason.
assertTrue(process.HasExited);
process.WaitForExit(10000);
// We couldn't get .NET to crash for some reason.
assertTrue(process.HasExited);
}
finally
{
process?.Dispose();
}
}
}
}
51 changes: 51 additions & 0 deletions src/Lucene.Net.Tests/Support/TestConcurrentHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Lucene.Net.Attributes;
using Lucene.Net.Support;
using NUnit.Framework;
using System.Linq;
using System.Threading.Tasks;

namespace Lucene.Net
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public class TestConcurrentHashSet
NightOwl888 marked this conversation as resolved.
Show resolved Hide resolved
{
[Test, LuceneNetSpecific]
public void TestExceptWith()
{
// Numbers 0-8, 10-80, 99
var initialSet = Enumerable.Range(1, 8)
.Concat(Enumerable.Range(1, 8).Select(i => i * 10))
.Append(99)
.Append(0);

var hashSet = new ConcurrentHashSet<int>(initialSet);

Parallel.ForEach(Enumerable.Range(1, 8), i =>
{
// Remove i and i * 10, i.e. 1 and 10, 2 and 20, etc.
var except = new[] { i, i * 10 };
hashSet.ExceptWith(except);
});

Assert.AreEqual(2, hashSet.Count);
Assert.IsTrue(hashSet.Contains(0));
Assert.IsTrue(hashSet.Contains(99));
}
}
}
Loading
Loading