perf(findr): Switched to using channels.
This commit is contained in:
105
walker.odin
105
walker.odin
@@ -4,10 +4,13 @@ import "core:fmt"
|
||||
import "core:os"
|
||||
import "core:strings"
|
||||
import "core:sync"
|
||||
import "core:sync/chan"
|
||||
import "core:sys/linux"
|
||||
import "core:text/regex"
|
||||
import "core:thread"
|
||||
|
||||
BATCH_SIZE :: 256
|
||||
|
||||
IgnoreMode :: enum {
|
||||
Respected, // skip gitignored, prune ignored dirs (fd -H default)
|
||||
All, // ignore .gitignore entirely, descend everywhere (fd -HI)
|
||||
@@ -35,28 +38,39 @@ WorkItem :: struct {
|
||||
}
|
||||
|
||||
WalkerPool :: struct {
|
||||
queue: [dynamic]WorkItem,
|
||||
queue_mutex: sync.Mutex,
|
||||
queue_sema: sync.Atomic_Sema,
|
||||
results: ^[dynamic]string,
|
||||
results_mutex: sync.Mutex,
|
||||
active: i64,
|
||||
done: sync.One_Shot_Event,
|
||||
threads: []^thread.Thread,
|
||||
opts: WalkOptions,
|
||||
pattern_re: regex.Regular_Expression,
|
||||
has_pattern: bool,
|
||||
exclude_gi: ^Gitignore,
|
||||
all_contexts: [dynamic]^GIContext,
|
||||
queue: [dynamic]WorkItem,
|
||||
queue_mutex: sync.Mutex,
|
||||
queue_sema: sync.Atomic_Sema,
|
||||
result_chan: chan.Chan([]string),
|
||||
active: i64,
|
||||
done: sync.One_Shot_Event,
|
||||
threads: []^thread.Thread,
|
||||
opts: WalkOptions,
|
||||
pattern_re: regex.Regular_Expression,
|
||||
has_pattern: bool,
|
||||
exclude_gi: ^Gitignore,
|
||||
all_contexts: [dynamic]^GIContext,
|
||||
contexts_lock: sync.Mutex,
|
||||
}
|
||||
|
||||
walk :: proc(roots: []string, results: ^[dynamic]string, opts: WalkOptions, thread_count: int) {
|
||||
flush_batch :: proc(ch: chan.Chan([]string), local: ^[dynamic]string) {
|
||||
if len(local) == 0 do return
|
||||
batch := local[:]
|
||||
local^ = make([dynamic]string, 0, BATCH_SIZE)
|
||||
chan.send(ch, batch)
|
||||
}
|
||||
|
||||
walk_stream :: proc(
|
||||
roots: []string,
|
||||
result_chan: chan.Chan([]string),
|
||||
opts: WalkOptions,
|
||||
thread_count: int,
|
||||
) {
|
||||
if len(roots) == 0 do return
|
||||
|
||||
pool := new(WalkerPool)
|
||||
pool.queue = make([dynamic]WorkItem)
|
||||
pool.results = results
|
||||
pool.result_chan = result_chan
|
||||
pool.active = i64(len(roots))
|
||||
pool.threads = make([]^thread.Thread, thread_count)
|
||||
pool.all_contexts = make([dynamic]^GIContext)
|
||||
@@ -137,14 +151,59 @@ walk :: proc(roots: []string, results: ^[dynamic]string, opts: WalkOptions, thre
|
||||
free(pool)
|
||||
}
|
||||
|
||||
Collector_Data :: struct {
|
||||
ch: chan.Chan([]string),
|
||||
results: ^[dynamic]string,
|
||||
}
|
||||
|
||||
collect_worker :: proc(t: ^thread.Thread) {
|
||||
data := cast(^Collector_Data)t.data
|
||||
for {
|
||||
batch, ok := chan.recv(data.ch)
|
||||
if !ok do break
|
||||
for s in batch {
|
||||
append(data.results, s)
|
||||
}
|
||||
delete(batch)
|
||||
}
|
||||
}
|
||||
|
||||
walk :: proc(roots: []string, results: ^[dynamic]string, opts: WalkOptions, thread_count: int) {
|
||||
if len(roots) == 0 do return
|
||||
|
||||
ch, _ := chan.create(chan.Chan([]string), max(2 * thread_count, 2), context.allocator)
|
||||
defer chan.destroy(ch)
|
||||
|
||||
data := new(Collector_Data)
|
||||
data.ch = ch
|
||||
data.results = results
|
||||
|
||||
collector := thread.create(collect_worker)
|
||||
collector.data = rawptr(data)
|
||||
collector.init_context = context
|
||||
thread.start(collector)
|
||||
|
||||
walk_stream(roots, ch, opts, thread_count)
|
||||
|
||||
chan.close(ch)
|
||||
thread.join(collector)
|
||||
thread.destroy(collector)
|
||||
free(data)
|
||||
}
|
||||
|
||||
walk_worker :: proc(t: ^thread.Thread) {
|
||||
pool := cast(^WalkerPool)t.data
|
||||
|
||||
prof_thread_init("walker")
|
||||
defer prof_thread_destroy()
|
||||
|
||||
local_results := make([dynamic]string, 0, 256)
|
||||
defer delete(local_results)
|
||||
local_results := make([dynamic]string, 0, BATCH_SIZE)
|
||||
defer {
|
||||
if len(local_results) > 0 {
|
||||
flush_batch(pool.result_chan, &local_results)
|
||||
}
|
||||
delete(local_results)
|
||||
}
|
||||
|
||||
for {
|
||||
sync.atomic_sema_wait(&pool.queue_sema)
|
||||
@@ -166,19 +225,15 @@ walk_worker :: proc(t: ^thread.Thread) {
|
||||
delete(item.path)
|
||||
if len(item.rel) > 0 {delete(item.rel)}
|
||||
|
||||
if len(local_results) >= BATCH_SIZE {
|
||||
flush_batch(pool.result_chan, &local_results)
|
||||
}
|
||||
|
||||
old := sync.atomic_sub_explicit(&pool.active, 1, .Release)
|
||||
if old == 1 {
|
||||
sync.one_shot_event_signal(&pool.done)
|
||||
}
|
||||
}
|
||||
|
||||
if len(local_results) > 0 {
|
||||
sync.mutex_lock(&pool.results_mutex)
|
||||
for res in local_results {
|
||||
append(pool.results, res)
|
||||
}
|
||||
sync.mutex_unlock(&pool.results_mutex)
|
||||
}
|
||||
}
|
||||
|
||||
process_dir :: proc(pool: ^WalkerPool, item: WorkItem, local_results: ^[dynamic]string) {
|
||||
|
||||
Reference in New Issue
Block a user