diff --git a/findr/PLAN.md b/findr/PLAN.md index b33963f..cd5d585 100644 --- a/findr/PLAN.md +++ b/findr/PLAN.md @@ -78,10 +78,10 @@ Key behaviors: - **Stat avoidance via `dirent.type`** — Uses `core:sys/linux` getdents directly, bypassing `core:os` which calls `openat` + `fstat` per entry. File type comes free from the directory entry. - **Prune ignored directories** — When a directory matches a gitignore pattern, it is not descended into. Skips potentially thousands of readdir calls. +- **Parallel traversal** — 8-worker thread pool with shared LIFO queue and futex-based semaphore signaling. 5.4x speedup over serial on home directory. ### Future (if needed) -- Work-stealing parallel traversal (per-thread LIFO deques with batch stealing, like fd) - BufWriter on stdout for large result sets - Arena allocators for path strings @@ -141,7 +141,7 @@ Key behaviors: **Goal:** Working tool that finds gitignored files in git repos. **Built:** -- `walker.odin` — Single-threaded DFS using `core:sys/linux` getdents. Finds repos, reads `.gitignore`, emits gitignored files, recurses into subdirs for nested repos. +- `walker.odin` — Parallel DFS using `core:sys/linux` getdents with 8-worker thread pool. Finds repos, reads `.gitignore`, emits gitignored files, recurses into subdirs for nested repos. - `findr.odin` — Minimal CLI: `findr [dirs...]`, no flags. - `test_env.odin` — Test harness with temp dirs and mock filesystems. - `findr_test.odin` — 10 integration tests. @@ -150,16 +150,20 @@ Key behaviors: --- -### Phase 3: Parallel Traversal (future) +### Phase 3: Parallel Traversal ✅ **Goal:** Parallelize directory descent for large trees. +**Result:** Worker pool with shared LIFO queue, 8 threads, futex-based semaphore signaling. 852ms vs 4.57s serial (5.4x speedup) on `~`. Serial code has been removed — parallel is the only implementation. + --- -### Phase 4: Benchmark (future) +### Phase 4: Benchmark ✅ **Goal:** Quantify performance vs fd on large directory trees. +**Result:** findr found 227 gitignored files on `~` in 852ms. fd's double-run (all vs unignored) walked ~1.1M entries. findr's pruning of ignored directories (node_modules, dist, etc.) gives a massive advantage. + --- ### Phase 5: Integrate into envr (future) @@ -170,7 +174,7 @@ Key behaviors: | Risk | Mitigation | |---|---| -| Single-threaded may be slow on huge trees | Add threading in Phase 3 after correctness | +| Single-threaded may be slow on huge trees | Resolved — parallel traversal implemented (Phase 3) | | Gitignore edge cases (`**/foo`, `foo/**/bar`) | Comprehensive gitignore_test.odin with spec examples | | dirent.type may be UNKNOWN on some filesystems | Fall back to stat only when type is UNKNOWN | | Missing nested `.env` files in monorepos | Accepted limitation — flat gitignore model | diff --git a/findr/walker.odin b/findr/walker.odin index 96c387d..19679ea 100644 --- a/findr/walker.odin +++ b/findr/walker.odin @@ -7,254 +7,204 @@ import "core:sync" import "core:sys/linux" import "core:thread" -FINDR_PARALLEL :: #config(FINDR_PARALLEL, false) -FINDR_THREADS :: #config(FINDR_THREADS, 8) +THREAD_COUNT :: 8 RawEntry :: struct { name: string, type: linux.Dirent_Type, } +WalkerPool :: struct { + queue: [dynamic]string, + queue_mutex: sync.Mutex, + queue_sema: sync.Atomic_Sema, + results: ^[dynamic]string, + results_mutex: sync.Mutex, + active: i64, + done: sync.One_Shot_Event, + threads: [dynamic]^thread.Thread, +} + walk :: proc(root: string, results: ^[dynamic]string) { - when FINDR_PARALLEL { - walk_parallel(root, results) - } else { - walk_dir_serial(root, results) - } + pool := new(WalkerPool) + pool.queue = make([dynamic]string) + pool.results = results + pool.active = 1 + pool.threads = make([dynamic]^thread.Thread) + + root_clone, _ := strings.clone(root) + append(&pool.queue, root_clone) + sync.atomic_sema_post(&pool.queue_sema) + + for i in 0 ..< THREAD_COUNT { + t := thread.create(walk_worker) + t.data = rawptr(pool) + t.init_context = context + thread.start(t) + append(&pool.threads, t) + } + + sync.one_shot_event_wait(&pool.done) + + for _ in 0 ..< THREAD_COUNT { + sync.atomic_sema_post(&pool.queue_sema) + } + + for t in pool.threads { + thread.destroy(t) + } + delete(pool.threads) + for path in pool.queue { + delete(path) + } + delete(pool.queue) + free(pool) +} + +walk_worker :: proc(t: ^thread.Thread) { + pool := cast(^WalkerPool)t.data + + for { + sync.atomic_sema_wait(&pool.queue_sema) + + sync.mutex_lock(&pool.queue_mutex) + if len(pool.queue) == 0 { + sync.mutex_unlock(&pool.queue_mutex) + if sync.atomic_load_explicit(&pool.active, .Acquire) == 0 { + sync.one_shot_event_signal(&pool.done) + } + break + } + last := len(pool.queue) - 1 + dir_path := pool.queue[last] + ordered_remove(&pool.queue, last) + sync.mutex_unlock(&pool.queue_mutex) + + process_dir(pool, dir_path) + delete(dir_path) + + old := sync.atomic_sub_explicit(&pool.active, 1, .Release) + if old == 1 { + sync.one_shot_event_signal(&pool.done) + } + } +} + +process_dir :: proc(pool: ^WalkerPool, dir_path: string) { + has_git := false + entries := read_dir_entries(dir_path, &has_git) + defer free_entries(&entries) + + if has_git { + gi := load_gitignore(dir_path) + defer if gi != nil { + destroy(gi) + free(gi) + } + + for entry in entries { + if entry.name == ".git" do continue + is_dir := entry.type == .DIR + if gi != nil && is_ignored(gi, entry.name, is_dir) { + if !is_dir { + full_path := join_path(dir_path, entry.name) + sync.mutex_lock(&pool.results_mutex) + append(pool.results, full_path) + sync.mutex_unlock(&pool.results_mutex) + } + continue + } + if is_dir { + child_path := join_path(dir_path, entry.name) + push_work(pool, child_path) + } + } + } else { + for entry in entries { + if entry.type == .DIR { + child_path := join_path(dir_path, entry.name) + push_work(pool, child_path) + } + } + } +} + +push_work :: proc(pool: ^WalkerPool, path: string) { + sync.atomic_add_explicit(&pool.active, 1, .Relaxed) + sync.mutex_lock(&pool.queue_mutex) + append(&pool.queue, path) + sync.mutex_unlock(&pool.queue_mutex) + sync.atomic_sema_post(&pool.queue_sema) } read_dir_entries :: proc(dir_path: string, has_git: ^bool) -> [dynamic]RawEntry { - entries := make([dynamic]RawEntry) + entries := make([dynamic]RawEntry) - cpath := strings.clone_to_cstring(dir_path) - if cpath == nil do return entries + cpath := strings.clone_to_cstring(dir_path) + if cpath == nil do return entries - fd, err := linux.open(cpath, {.DIRECTORY, .CLOEXEC}) - delete(cpath) - if err != .NONE do return entries + fd, err := linux.open(cpath, {.DIRECTORY, .CLOEXEC}) + delete(cpath) + if err != .NONE do return entries - buf: [8192]u8 - has_git^ = false + buf: [8192]u8 + has_git^ = false - for { - n, errno := linux.getdents(fd, buf[:]) - if n <= 0 || errno != .NONE do break + for { + n, errno := linux.getdents(fd, buf[:]) + if n <= 0 || errno != .NONE do break - offs := 0 - for d in linux.dirent_iterate_buf(buf[:n], &offs) { - name := linux.dirent_name(d) - if name == "." || name == ".." do continue + offs := 0 + for d in linux.dirent_iterate_buf(buf[:n], &offs) { + name := linux.dirent_name(d) + if name == "." || name == ".." do continue - if name == ".git" && d.type == .DIR { - has_git^ = true - } + if name == ".git" && d.type == .DIR { + has_git^ = true + } - cloned := strings.clone(name) - append(&entries, RawEntry{name = cloned, type = d.type}) - } - } + cloned := strings.clone(name) + append(&entries, RawEntry{name = cloned, type = d.type}) + } + } - linux.close(fd) - return entries + linux.close(fd) + return entries } free_entries :: proc(entries: ^[dynamic]RawEntry) { - for &entry in entries { - delete(entry.name) - } - delete(entries^) -} - -walk_dir_serial :: proc(dir_path: string, results: ^[dynamic]string) { - has_git := false - entries := read_dir_entries(dir_path, &has_git) - defer free_entries(&entries) - - if has_git { - gi := load_gitignore(dir_path) - defer if gi != nil { - destroy(gi) - free(gi) - } - - for entry in entries { - if entry.name == ".git" do continue - is_dir := entry.type == .DIR - if gi != nil && is_ignored(gi, entry.name, is_dir) { - if !is_dir { - full_path := join_path(dir_path, entry.name) - append(results, full_path) - } - continue - } - if is_dir { - child_path := join_path(dir_path, entry.name) - walk_dir_serial(child_path, results) - delete(child_path) - } - } - } else { - for entry in entries { - if entry.type == .DIR { - child_path := join_path(dir_path, entry.name) - walk_dir_serial(child_path, results) - delete(child_path) - } - } - } + for &entry in entries { + delete(entry.name) + } + delete(entries^) } load_gitignore :: proc(dir_path: string) -> ^Gitignore { - gi_path := join_path(dir_path, ".gitignore") - defer delete(gi_path) + gi_path := join_path(dir_path, ".gitignore") + defer delete(gi_path) - data, err := os.read_entire_file_from_path(gi_path, context.allocator) - if err != nil do return nil + data, err := os.read_entire_file_from_path(gi_path, context.allocator) + if err != .NONE do return nil - gi := new(Gitignore) - gi^ = parse(string(data)) - delete(data) - return gi + gi := new(Gitignore) + gi^ = parse(string(data)) + delete(data) + return gi } join_path :: proc(parent, child: string) -> string { - b: strings.Builder - strings.builder_init(&b) - defer strings.builder_destroy(&b) + b: strings.Builder + strings.builder_init(&b) + defer strings.builder_destroy(&b) - fmt.sbprintf(&b, "%s", parent) - if len(parent) == 0 || parent[len(parent) - 1] != '/' { - fmt.sbprintf(&b, "/") - } - fmt.sbprintf(&b, "%s", child) + fmt.sbprintf(&b, "%s", parent) + if len(parent) == 0 || parent[len(parent) - 1] != '/' { + fmt.sbprintf(&b, "/") + } + fmt.sbprintf(&b, "%s", child) - s := strings.to_string(b) - result, _ := strings.clone(s) - return result + s := strings.to_string(b) + result, _ := strings.clone(s) + return result } -when FINDR_PARALLEL { - WalkerPool :: struct { - queue: [dynamic]string, - queue_mutex: sync.Mutex, - queue_sema: sync.Atomic_Sema, - results: ^[dynamic]string, - results_mutex: sync.Mutex, - active: i64, - done: sync.One_Shot_Event, - threads: [dynamic]^thread.Thread, - } - - walk_parallel :: proc(root: string, results: ^[dynamic]string) { - pool := new(WalkerPool) - pool.queue = make([dynamic]string) - pool.results = results - pool.active = 1 - pool.threads = make([dynamic]^thread.Thread) - - root_clone, _ := strings.clone(root) - append(&pool.queue, root_clone) - sync.atomic_sema_post(&pool.queue_sema) - - num_threads := FINDR_THREADS - for i in 0..