Skip to content

Commit

Permalink
runtime: add elementary thread management
Browse files Browse the repository at this point in the history
  • Loading branch information
mertcandav committed Jan 1, 2025
1 parent 1ce1de4 commit 712d90f
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 13 deletions.
31 changes: 30 additions & 1 deletion src/julec/obj/cxx/object.jule
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const coSpawnMethodSuffix = "_method" // method suffix for coSpawn data prefix
const coSpawnArgDataSuffix = "_argdata" // argdata suffix for coSpawn data prefix
const coSpawnArgDataParam = "__juleCoParam"
const coSpawnArgDataFunc = "__juleCoFunc"
const coSpawnThreadData = "__juleThread"

const structDefaultEqMethodSuffix = "_eq"
const indentKind = '\t'
Expand Down Expand Up @@ -183,6 +184,14 @@ impl ObjectCoder {
self.coSpawnObj.WriteStr(is)!
self.coSpawnObj.WriteStr("{\n")!

// setup thread data
self.coSpawnObj.WriteByte(indentKind)!
writeThreadType(self.coSpawnObj)
self.coSpawnObj.WriteStr(" *")!
self.coSpawnObj.WriteStr(coSpawnThreadData)!
self.coSpawnObj.WriteByte(';')!
self.coSpawnObj.WriteByte('\n')!

// setup func data
self.coSpawnObj.WriteByte(indentKind)!
self.tc.func(self.coSpawnObj, f)
Expand All @@ -206,7 +215,7 @@ impl ObjectCoder {
self.coSpawnObj.WriteByte('\n')!

match {
| build::OS == build::DistOS.Windows:
| build::IsWindows(build::OS):
self.coSpawnObj.WriteStr("unsigned long ")!
| build::IsUnix(build::OS):
self.coSpawnObj.WriteStr("void *")!
Expand Down Expand Up @@ -248,6 +257,13 @@ impl ObjectCoder {
}
self.coSpawnObj.WriteStr(");\n")!

self.coSpawnObj.WriteByte(indentKind)!
identCoder.funcIns(self.coSpawnObj, meta::Program.Runtime.CloseThread)
self.coSpawnObj.WriteStr("(")!
self.coSpawnObj.WriteStr(argData)!
self.coSpawnObj.WriteStr("->")!
self.coSpawnObj.WriteStr(coSpawnThreadData)!
self.coSpawnObj.WriteStr(");\n")!
self.coSpawnObj.WriteByte(indentKind)!
self.coSpawnObj.WriteStr("delete ")!
self.coSpawnObj.WriteStr(argData)!
Expand Down Expand Up @@ -1415,4 +1431,17 @@ fn concatAllParts(parts: ...&token::Token): []byte {
s.WriteStr(p.Kind)!
}
ret unsafe { s.Buf() }
}

// Writes thread data type for Jule's runtime thread handling.
// See runtime::coSpawn function for documentation.
fn writeThreadType(mut &b: strings::Builder) {
match {
| build::IsWindows(build::OS):
b.WriteStr("HANDLE")!
| build::IsUnix(build::OS):
b.WriteStr("pthread_t")!
|:
panic("unreachable")
}
}
2 changes: 2 additions & 0 deletions src/julec/obj/meta/meta.jule
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct Runtime {
SliceBytePtr: &sema::FuncIns
StrAsSlice: &sema::FuncIns
SliceAsStr: &sema::FuncIns
CloseThread: &sema::FuncIns

Map: &sema::Struct
MapIterator: &sema::Struct
Expand Down Expand Up @@ -108,6 +109,7 @@ fn CollectRuntime(mut &ir: &obj::IR): &Runtime {
meta.SliceBytePtr = obj::RuntimeFindFunc(p, "sliceBytePtr").Instances[0]
meta.StrAsSlice = obj::RuntimeFindFunc(p, "strAsSlice").Instances[0]
meta.SliceAsStr = obj::RuntimeFindFunc(p, "sliceAsStr").Instances[0]
meta.CloseThread = obj::RuntimeFindFunc(p, "closeThread").Instances[0]

// Structs.
meta.Map = obj::RuntimeFindStruct(p, "_Map")
Expand Down
4 changes: 4 additions & 0 deletions src/julec/opt/deadcode/define.jule
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ impl ObjectDeadCode {
}

fn collectLive(mut &self) {
// Special cases.
self.pushLive[&sema::FuncIns](meta::Program.Runtime.CloseThread)
self.setReferencesAsLive(meta::Program.Runtime.CloseThread.Refers)

for (_, mut used) in self.ir.Used {
if !used.Binded {
self.collectLivePackage(used.Package)
Expand Down
2 changes: 1 addition & 1 deletion std/runtime/chan.jule
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ fn chanpark(ch: &hchan, &n: int, &c: int) {
// The lock is acquired. This means no changes can occur in the channel.
// We must release the lock and immediately switch to a different thread.
ch.lock.unlock()
osyield()
yield()
// After the thread wakes up, we must acquire the lock
// before reaching the critical section again.
ch.lock.lock()
Expand Down
3 changes: 2 additions & 1 deletion std/runtime/sema.jule
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ fn cansemacquire(mut &sema: u32): bool {
fn semapark(&lock: fmutex, &deq: bool) {
lock.unlock()
for !deq {
sleep(1e6)
yield()
}
}

Expand Down Expand Up @@ -257,6 +257,7 @@ fn notifyListWait(mut &l: notifyList, t: u32) {
ret
}
l.lock.unlock()
yield()
}
}

Expand Down
106 changes: 106 additions & 0 deletions std/runtime/thread.jule
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2024 The Jule Programming Language.
// Use of this source code is governed by a BSD 3-Clause
// license that can be found in the LICENSE file.

// State flags of threads.
const threadRunning = 1 << 0
const threadSuspended = 1 << 1
const threadClosed = 1 << 2

// A thread instance is represents a spawned thread.
// Used by the Jule runtime to manage threads.
struct thread {
os: osthread
state: u32
}

// A thread stack and associated lock.
// All spawned threads are stored in the threads.
// When a thread completed, it will be marked as closed.
// A closed thread instance will not be released, remains allocated and placed
// in the threads. Subsequent thread generations may use the same allocation
// of closed threads for the new spawned threads.
static threadMutex = fmutex{}
static mut threads = ([]&thread)(nil)

// Pushes a new thread to the main thread stack and sets state as running.
// Returns the thread representing the created thread.
// Locks the threadMutex and will not release before return,
// should be released after pushNewThread.
fn pushNewThread(): &thread {
threadMutex.lock()
// Lookup for empty threads to caught ready to reuse thread if exist.
for (_, mut t) in threads {
if t.state&threadClosed == threadClosed {
t.state &= ^threadClosed
t.state |= threadRunning
ret t
}
}
// We have not any reusable thread, so create a new one.
mut t := new(thread)
t.state |= threadRunning
threads = append(threads, t)
ret t
}

// Suspends the current thread and yields the CPU.
fn yield() {
threadMutex.lock()
id := currentThreadID()
mut i := -1
mut runs := false
for (j, mut thread) in threads {
if thread.os.equal(id) {
thread.state |= threadSuspended
i = j
break
}
runs = runs ||
thread.state&threadRunning == threadRunning &&
thread.state&threadSuspended != threadSuspended
}
if i == -1 {
panic("runtime: thread is not exist")
}
// There is no running thread found for [0, i].
// Lookup to (i, len) for running thread.
if !runs {
for (_, mut thread) in threads[i+1:] {
if thread.state&threadRunning == threadRunning &&
thread.state&threadSuspended != threadSuspended {
runs = true
break
}
}
// We have not any running thread, all of them suspended or closed.
// So all threads are locked now, we have a deadlock.
if !runs {
panic("runtime: all threads are asleep - deadlock!")
}
}
// Unlock the mutex becase other threads may need to lock.
// There is nothing to do for this thread, so release.
threadMutex.unlock()
// Yield the CPU if possible, it may return immediately for the same thread.
// However, this part of thread management belongs to the operating system.
osyield()
// CPU is back for this thread.
// Lock mutex again and wake up.
threadMutex.lock()
threads[i].state &= ^threadSuspended
threadMutex.unlock()
}

// Closes the thread associated with t, if exist.
fn closeThread(t: *unsafe) {
threadMutex.lock()
for (_, mut thread) in threads {
if &thread.os.handle == t {
thread.state &= ^(threadRunning | threadSuspended)
thread.state |= threadClosed
break
}
}
threadMutex.unlock()
}
44 changes: 39 additions & 5 deletions std/runtime/thread_unix.jule
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,63 @@ cpp use "<pthread.h>"
cpp use "<sched.h>"

cpp unsafe fn pthread_create(*cpp.pthread_t, *unsafe, *unsafe, *unsafe): int
cpp fn pthread_equal(cpp.pthread_t, cpp.pthread_t): bool
cpp fn pthread_detach(cpp.pthread_t): int
cpp fn pthread_self(): cpp.pthread_t
cpp fn sched_yield(): int

#typedef
cpp struct pthread_t{}

// Wrapper for operating system thread.
struct osthread {
handle: cpp.pthread_t
}

impl osthread {
// Reports whether the threads are equal.
fn equal(self, other: cpp.pthread_t): bool {
ret cpp.pthread_equal(self.handle, other)
}
}

// Common head fields for a thread data.
struct threadData {
handle: *cpp.pthread_t
}

// A low level API function for threads.
// It doesn't provide much abstraction.
// It just creates and detaches a thread using API.
// Reports whether the thread created successfully.
// The created thread is a native-thread.
// The |func| parameter should point to the valid function for operating system thread API.
// The |args| parameter may be nil and should point to the argument data.
// The |args| parameter may be nil and should point to the thread data.
// The thread data, should be fit into the threadData struct.
// So, the head fields of the thread data should be matched fields of the threadData.
#export "__jule_coSpawn"
unsafe fn coSpawn(func: *unsafe, args: *unsafe): bool {
let t: cpp.pthread_t
if cpp.pthread_create(&t, nil, integ::Emit[*unsafe]("(void*(*)(void*))({})", func), args) != 0 {
unsafe fn coSpawn(func: *unsafe, mut args: *unsafe): bool {
mut thread := pushNewThread()
(*threadData)(args).handle = &thread.os.handle
if cpp.pthread_create(&thread.os.handle, nil, integ::Emit[*unsafe]("(void*(*)(void*))({})", func), args) != 0 {
ret false
}
cpp.pthread_detach(t)
threadMutex.unlock()
cpp.pthread_detach(thread.os.handle)
ret true
}

fn currentThreadID(): cpp.pthread_t {
ret cpp.pthread_self()
}

fn osyield() {
cpp.sched_yield()
}

fn init() {
// Push the main thread to threads.
mut thread := pushNewThread()
thread.os.handle = currentThreadID()
threadMutex.unlock()
}
51 changes: 46 additions & 5 deletions std/runtime/thread_windows.jule
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,66 @@ use integ "std/jule/integrated"
use "std/sys"

cpp unsafe fn CreateThread(*unsafe, int, *unsafe, *unsafe, int, *unsafe): *unsafe
cpp fn GetCurrentThreadId(): _DWORD
cpp fn SwitchToThread(): bool

// Wrapper for operating system thread.
struct osthread {
id: _DWORD
handle: *unsafe
}

impl osthread {
// Reports whether the threads are equal.
fn equal(self, other: _DWORD): bool {
ret self.id == other
}
}

// Common head fields for a thread data.
struct threadData {
handle: **unsafe
}

// A low level API function for threads.
// It doesn't provide much abstraction.
// It just creates and detaches a thread using API.
// Reports whether the thread created successfully.
// The created thread is a native-thread.
// The |func| parameter should point to the valid function for operating system thread API.
// The |args| parameter may be nil and should point to the argument data.
// The |args| parameter may be nil and should point to the thread data.
// The thread data, should be fit into the threadData struct.
// So, the head fields of the thread data should be matched fields of the threadData.
#export "__jule_coSpawn"
unsafe fn coSpawn(func: *unsafe, args: *unsafe): bool {
handle := cpp.CreateThread(nil, 0, integ::Emit[*unsafe]("(unsigned long(*)(void*))({})", func), args, 0, nil)
if handle == nil {
unsafe fn coSpawn(func: *unsafe, mut args: *unsafe): bool {
mut thread := pushNewThread()
(*threadData)(args).handle = &thread.os.handle
thread.os.handle = cpp.CreateThread(
nil,
0,
integ::Emit[*unsafe]("(unsigned long(*)(void*))({})", func),
args,
0,
integ::Emit[*unsafe]("(LPDWORD)({})", &thread.os.id))
if thread.os.handle == nil {
ret false
}
sys::CloseHandle(sys::Handle(handle))
threadMutex.unlock()
sys::CloseHandle(sys::Handle(thread.os.handle))
ret true
}

fn currentThreadID(): _DWORD {
ret cpp.GetCurrentThreadId()
}

fn osyield() {
cpp.SwitchToThread()
}

fn init() {
// Push the main thread to threads.
mut thread := pushNewThread()
thread.os.id = currentThreadID()
threadMutex.unlock()
}
2 changes: 2 additions & 0 deletions std/sync/cond_test.jule
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD 3-Clause
// license that can be found in the LICENSE file.

use "std/runtime"
use "std/sync/atomic"
use "std/testing"

Expand All @@ -21,6 +22,7 @@ fn testCond(t: &testing::T) {
for atomic::Load(cond.notify.wait, atomic::SeqCst) == 0 {
// Wait until thread starting to wait for condition.
}
runtime::sleep(runtime::_Second * 1)
cond.Lock()
*ready = true
cond.Signal()
Expand Down

0 comments on commit 712d90f

Please sign in to comment.