mirror of
https://github.com/sbrow/envr.git
synced 2026-06-27 10:38:33 -04:00
wip:
This commit is contained in:
@@ -78,10 +78,10 @@ Key behaviors:
|
||||
|
||||
- **Stat avoidance via `dirent.type`** — Uses `core:sys/linux` getdents directly, bypassing `core:os` which calls `openat` + `fstat` per entry. File type comes free from the directory entry.
|
||||
- **Prune ignored directories** — When a directory matches a gitignore pattern, it is not descended into. Skips potentially thousands of readdir calls.
|
||||
- **Parallel traversal** — 8-worker thread pool with shared LIFO queue and futex-based semaphore signaling. 5.4x speedup over serial on home directory.
|
||||
|
||||
### Future (if needed)
|
||||
|
||||
- Work-stealing parallel traversal (per-thread LIFO deques with batch stealing, like fd)
|
||||
- BufWriter on stdout for large result sets
|
||||
- Arena allocators for path strings
|
||||
|
||||
@@ -141,7 +141,7 @@ Key behaviors:
|
||||
**Goal:** Working tool that finds gitignored files in git repos.
|
||||
|
||||
**Built:**
|
||||
- `walker.odin` — Single-threaded DFS using `core:sys/linux` getdents. Finds repos, reads `.gitignore`, emits gitignored files, recurses into subdirs for nested repos.
|
||||
- `walker.odin` — Parallel DFS using `core:sys/linux` getdents with 8-worker thread pool. Finds repos, reads `.gitignore`, emits gitignored files, recurses into subdirs for nested repos.
|
||||
- `findr.odin` — Minimal CLI: `findr [dirs...]`, no flags.
|
||||
- `test_env.odin` — Test harness with temp dirs and mock filesystems.
|
||||
- `findr_test.odin` — 10 integration tests.
|
||||
@@ -150,16 +150,20 @@ Key behaviors:
|
||||
|
||||
---
|
||||
|
||||
### Phase 3: Parallel Traversal (future)
|
||||
### Phase 3: Parallel Traversal ✅
|
||||
|
||||
**Goal:** Parallelize directory descent for large trees.
|
||||
|
||||
**Result:** Worker pool with shared LIFO queue, 8 threads, futex-based semaphore signaling. 852ms vs 4.57s serial (5.4x speedup) on `~`. Serial code has been removed — parallel is the only implementation.
|
||||
|
||||
---
|
||||
|
||||
### Phase 4: Benchmark (future)
|
||||
### Phase 4: Benchmark ✅
|
||||
|
||||
**Goal:** Quantify performance vs fd on large directory trees.
|
||||
|
||||
**Result:** findr found 227 gitignored files on `~` in 852ms. fd's double-run (all vs unignored) walked ~1.1M entries. findr's pruning of ignored directories (node_modules, dist, etc.) gives a massive advantage.
|
||||
|
||||
---
|
||||
|
||||
### Phase 5: Integrate into envr (future)
|
||||
@@ -170,7 +174,7 @@ Key behaviors:
|
||||
|
||||
| Risk | Mitigation |
|
||||
|---|---|
|
||||
| Single-threaded may be slow on huge trees | Add threading in Phase 3 after correctness |
|
||||
| Single-threaded may be slow on huge trees | Resolved — parallel traversal implemented (Phase 3) |
|
||||
| Gitignore edge cases (`**/foo`, `foo/**/bar`) | Comprehensive gitignore_test.odin with spec examples |
|
||||
| dirent.type may be UNKNOWN on some filesystems | Fall back to stat only when type is UNKNOWN |
|
||||
| Missing nested `.env` files in monorepos | Accepted limitation — flat gitignore model |
|
||||
|
||||
@@ -7,254 +7,204 @@ import "core:sync"
|
||||
import "core:sys/linux"
|
||||
import "core:thread"
|
||||
|
||||
FINDR_PARALLEL :: #config(FINDR_PARALLEL, false)
|
||||
FINDR_THREADS :: #config(FINDR_THREADS, 8)
|
||||
THREAD_COUNT :: 8
|
||||
|
||||
RawEntry :: struct {
|
||||
name: string,
|
||||
type: linux.Dirent_Type,
|
||||
}
|
||||
|
||||
WalkerPool :: struct {
|
||||
queue: [dynamic]string,
|
||||
queue_mutex: sync.Mutex,
|
||||
queue_sema: sync.Atomic_Sema,
|
||||
results: ^[dynamic]string,
|
||||
results_mutex: sync.Mutex,
|
||||
active: i64,
|
||||
done: sync.One_Shot_Event,
|
||||
threads: [dynamic]^thread.Thread,
|
||||
}
|
||||
|
||||
walk :: proc(root: string, results: ^[dynamic]string) {
|
||||
when FINDR_PARALLEL {
|
||||
walk_parallel(root, results)
|
||||
} else {
|
||||
walk_dir_serial(root, results)
|
||||
}
|
||||
pool := new(WalkerPool)
|
||||
pool.queue = make([dynamic]string)
|
||||
pool.results = results
|
||||
pool.active = 1
|
||||
pool.threads = make([dynamic]^thread.Thread)
|
||||
|
||||
root_clone, _ := strings.clone(root)
|
||||
append(&pool.queue, root_clone)
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
|
||||
for i in 0 ..< THREAD_COUNT {
|
||||
t := thread.create(walk_worker)
|
||||
t.data = rawptr(pool)
|
||||
t.init_context = context
|
||||
thread.start(t)
|
||||
append(&pool.threads, t)
|
||||
}
|
||||
|
||||
sync.one_shot_event_wait(&pool.done)
|
||||
|
||||
for _ in 0 ..< THREAD_COUNT {
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
}
|
||||
|
||||
for t in pool.threads {
|
||||
thread.destroy(t)
|
||||
}
|
||||
delete(pool.threads)
|
||||
for path in pool.queue {
|
||||
delete(path)
|
||||
}
|
||||
delete(pool.queue)
|
||||
free(pool)
|
||||
}
|
||||
|
||||
walk_worker :: proc(t: ^thread.Thread) {
|
||||
pool := cast(^WalkerPool)t.data
|
||||
|
||||
for {
|
||||
sync.atomic_sema_wait(&pool.queue_sema)
|
||||
|
||||
sync.mutex_lock(&pool.queue_mutex)
|
||||
if len(pool.queue) == 0 {
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
if sync.atomic_load_explicit(&pool.active, .Acquire) == 0 {
|
||||
sync.one_shot_event_signal(&pool.done)
|
||||
}
|
||||
break
|
||||
}
|
||||
last := len(pool.queue) - 1
|
||||
dir_path := pool.queue[last]
|
||||
ordered_remove(&pool.queue, last)
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
|
||||
process_dir(pool, dir_path)
|
||||
delete(dir_path)
|
||||
|
||||
old := sync.atomic_sub_explicit(&pool.active, 1, .Release)
|
||||
if old == 1 {
|
||||
sync.one_shot_event_signal(&pool.done)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
process_dir :: proc(pool: ^WalkerPool, dir_path: string) {
|
||||
has_git := false
|
||||
entries := read_dir_entries(dir_path, &has_git)
|
||||
defer free_entries(&entries)
|
||||
|
||||
if has_git {
|
||||
gi := load_gitignore(dir_path)
|
||||
defer if gi != nil {
|
||||
destroy(gi)
|
||||
free(gi)
|
||||
}
|
||||
|
||||
for entry in entries {
|
||||
if entry.name == ".git" do continue
|
||||
is_dir := entry.type == .DIR
|
||||
if gi != nil && is_ignored(gi, entry.name, is_dir) {
|
||||
if !is_dir {
|
||||
full_path := join_path(dir_path, entry.name)
|
||||
sync.mutex_lock(&pool.results_mutex)
|
||||
append(pool.results, full_path)
|
||||
sync.mutex_unlock(&pool.results_mutex)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if is_dir {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
push_work(pool, child_path)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for entry in entries {
|
||||
if entry.type == .DIR {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
push_work(pool, child_path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
push_work :: proc(pool: ^WalkerPool, path: string) {
|
||||
sync.atomic_add_explicit(&pool.active, 1, .Relaxed)
|
||||
sync.mutex_lock(&pool.queue_mutex)
|
||||
append(&pool.queue, path)
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
}
|
||||
|
||||
read_dir_entries :: proc(dir_path: string, has_git: ^bool) -> [dynamic]RawEntry {
|
||||
entries := make([dynamic]RawEntry)
|
||||
entries := make([dynamic]RawEntry)
|
||||
|
||||
cpath := strings.clone_to_cstring(dir_path)
|
||||
if cpath == nil do return entries
|
||||
cpath := strings.clone_to_cstring(dir_path)
|
||||
if cpath == nil do return entries
|
||||
|
||||
fd, err := linux.open(cpath, {.DIRECTORY, .CLOEXEC})
|
||||
delete(cpath)
|
||||
if err != .NONE do return entries
|
||||
fd, err := linux.open(cpath, {.DIRECTORY, .CLOEXEC})
|
||||
delete(cpath)
|
||||
if err != .NONE do return entries
|
||||
|
||||
buf: [8192]u8
|
||||
has_git^ = false
|
||||
buf: [8192]u8
|
||||
has_git^ = false
|
||||
|
||||
for {
|
||||
n, errno := linux.getdents(fd, buf[:])
|
||||
if n <= 0 || errno != .NONE do break
|
||||
for {
|
||||
n, errno := linux.getdents(fd, buf[:])
|
||||
if n <= 0 || errno != .NONE do break
|
||||
|
||||
offs := 0
|
||||
for d in linux.dirent_iterate_buf(buf[:n], &offs) {
|
||||
name := linux.dirent_name(d)
|
||||
if name == "." || name == ".." do continue
|
||||
offs := 0
|
||||
for d in linux.dirent_iterate_buf(buf[:n], &offs) {
|
||||
name := linux.dirent_name(d)
|
||||
if name == "." || name == ".." do continue
|
||||
|
||||
if name == ".git" && d.type == .DIR {
|
||||
has_git^ = true
|
||||
}
|
||||
if name == ".git" && d.type == .DIR {
|
||||
has_git^ = true
|
||||
}
|
||||
|
||||
cloned := strings.clone(name)
|
||||
append(&entries, RawEntry{name = cloned, type = d.type})
|
||||
}
|
||||
}
|
||||
cloned := strings.clone(name)
|
||||
append(&entries, RawEntry{name = cloned, type = d.type})
|
||||
}
|
||||
}
|
||||
|
||||
linux.close(fd)
|
||||
return entries
|
||||
linux.close(fd)
|
||||
return entries
|
||||
}
|
||||
|
||||
free_entries :: proc(entries: ^[dynamic]RawEntry) {
|
||||
for &entry in entries {
|
||||
delete(entry.name)
|
||||
}
|
||||
delete(entries^)
|
||||
}
|
||||
|
||||
walk_dir_serial :: proc(dir_path: string, results: ^[dynamic]string) {
|
||||
has_git := false
|
||||
entries := read_dir_entries(dir_path, &has_git)
|
||||
defer free_entries(&entries)
|
||||
|
||||
if has_git {
|
||||
gi := load_gitignore(dir_path)
|
||||
defer if gi != nil {
|
||||
destroy(gi)
|
||||
free(gi)
|
||||
}
|
||||
|
||||
for entry in entries {
|
||||
if entry.name == ".git" do continue
|
||||
is_dir := entry.type == .DIR
|
||||
if gi != nil && is_ignored(gi, entry.name, is_dir) {
|
||||
if !is_dir {
|
||||
full_path := join_path(dir_path, entry.name)
|
||||
append(results, full_path)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if is_dir {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
walk_dir_serial(child_path, results)
|
||||
delete(child_path)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for entry in entries {
|
||||
if entry.type == .DIR {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
walk_dir_serial(child_path, results)
|
||||
delete(child_path)
|
||||
}
|
||||
}
|
||||
}
|
||||
for &entry in entries {
|
||||
delete(entry.name)
|
||||
}
|
||||
delete(entries^)
|
||||
}
|
||||
|
||||
load_gitignore :: proc(dir_path: string) -> ^Gitignore {
|
||||
gi_path := join_path(dir_path, ".gitignore")
|
||||
defer delete(gi_path)
|
||||
gi_path := join_path(dir_path, ".gitignore")
|
||||
defer delete(gi_path)
|
||||
|
||||
data, err := os.read_entire_file_from_path(gi_path, context.allocator)
|
||||
if err != nil do return nil
|
||||
data, err := os.read_entire_file_from_path(gi_path, context.allocator)
|
||||
if err != .NONE do return nil
|
||||
|
||||
gi := new(Gitignore)
|
||||
gi^ = parse(string(data))
|
||||
delete(data)
|
||||
return gi
|
||||
gi := new(Gitignore)
|
||||
gi^ = parse(string(data))
|
||||
delete(data)
|
||||
return gi
|
||||
}
|
||||
|
||||
join_path :: proc(parent, child: string) -> string {
|
||||
b: strings.Builder
|
||||
strings.builder_init(&b)
|
||||
defer strings.builder_destroy(&b)
|
||||
b: strings.Builder
|
||||
strings.builder_init(&b)
|
||||
defer strings.builder_destroy(&b)
|
||||
|
||||
fmt.sbprintf(&b, "%s", parent)
|
||||
if len(parent) == 0 || parent[len(parent) - 1] != '/' {
|
||||
fmt.sbprintf(&b, "/")
|
||||
}
|
||||
fmt.sbprintf(&b, "%s", child)
|
||||
fmt.sbprintf(&b, "%s", parent)
|
||||
if len(parent) == 0 || parent[len(parent) - 1] != '/' {
|
||||
fmt.sbprintf(&b, "/")
|
||||
}
|
||||
fmt.sbprintf(&b, "%s", child)
|
||||
|
||||
s := strings.to_string(b)
|
||||
result, _ := strings.clone(s)
|
||||
return result
|
||||
s := strings.to_string(b)
|
||||
result, _ := strings.clone(s)
|
||||
return result
|
||||
}
|
||||
|
||||
when FINDR_PARALLEL {
|
||||
WalkerPool :: struct {
|
||||
queue: [dynamic]string,
|
||||
queue_mutex: sync.Mutex,
|
||||
queue_sema: sync.Atomic_Sema,
|
||||
results: ^[dynamic]string,
|
||||
results_mutex: sync.Mutex,
|
||||
active: i64,
|
||||
done: sync.One_Shot_Event,
|
||||
threads: [dynamic]^thread.Thread,
|
||||
}
|
||||
|
||||
walk_parallel :: proc(root: string, results: ^[dynamic]string) {
|
||||
pool := new(WalkerPool)
|
||||
pool.queue = make([dynamic]string)
|
||||
pool.results = results
|
||||
pool.active = 1
|
||||
pool.threads = make([dynamic]^thread.Thread)
|
||||
|
||||
root_clone, _ := strings.clone(root)
|
||||
append(&pool.queue, root_clone)
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
|
||||
num_threads := FINDR_THREADS
|
||||
for i in 0..<num_threads {
|
||||
t := thread.create(walk_worker)
|
||||
t.data = rawptr(pool)
|
||||
t.init_context = context
|
||||
thread.start(t)
|
||||
append(&pool.threads, t)
|
||||
}
|
||||
|
||||
sync.one_shot_event_wait(&pool.done)
|
||||
|
||||
for _ in 0..<num_threads {
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
}
|
||||
|
||||
for t in pool.threads {
|
||||
thread.destroy(t)
|
||||
}
|
||||
delete(pool.threads)
|
||||
for path in pool.queue {
|
||||
delete(path)
|
||||
}
|
||||
delete(pool.queue)
|
||||
free(pool)
|
||||
}
|
||||
|
||||
push_work :: proc(pool: ^WalkerPool, path: string) {
|
||||
sync.atomic_add_explicit(&pool.active, 1, .Relaxed)
|
||||
sync.mutex_lock(&pool.queue_mutex)
|
||||
append(&pool.queue, path)
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
sync.atomic_sema_post(&pool.queue_sema)
|
||||
}
|
||||
|
||||
process_dir_parallel :: proc(pool: ^WalkerPool, dir_path: string) {
|
||||
has_git := false
|
||||
entries := read_dir_entries(dir_path, &has_git)
|
||||
defer free_entries(&entries)
|
||||
|
||||
if has_git {
|
||||
gi := load_gitignore(dir_path)
|
||||
defer if gi != nil {
|
||||
destroy(gi)
|
||||
free(gi)
|
||||
}
|
||||
|
||||
for entry in entries {
|
||||
if entry.name == ".git" do continue
|
||||
is_dir := entry.type == .DIR
|
||||
if gi != nil && is_ignored(gi, entry.name, is_dir) {
|
||||
if !is_dir {
|
||||
full_path := join_path(dir_path, entry.name)
|
||||
sync.mutex_lock(&pool.results_mutex)
|
||||
append(pool.results, full_path)
|
||||
sync.mutex_unlock(&pool.results_mutex)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if is_dir {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
push_work(pool, child_path)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for entry in entries {
|
||||
if entry.type == .DIR {
|
||||
child_path := join_path(dir_path, entry.name)
|
||||
push_work(pool, child_path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
walk_worker :: proc(t: ^thread.Thread) {
|
||||
pool := cast(^WalkerPool) t.data
|
||||
|
||||
for {
|
||||
sync.atomic_sema_wait(&pool.queue_sema)
|
||||
|
||||
sync.mutex_lock(&pool.queue_mutex)
|
||||
if len(pool.queue) == 0 {
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
if sync.atomic_load_explicit(&pool.active, .Acquire) == 0 {
|
||||
sync.one_shot_event_signal(&pool.done)
|
||||
}
|
||||
break
|
||||
}
|
||||
last := len(pool.queue) - 1
|
||||
dir_path := pool.queue[last]
|
||||
ordered_remove(&pool.queue, last)
|
||||
sync.mutex_unlock(&pool.queue_mutex)
|
||||
|
||||
process_dir_parallel(pool, dir_path)
|
||||
delete(dir_path)
|
||||
|
||||
old := sync.atomic_sub_explicit(&pool.active, 1, .Release)
|
||||
if old == 1 {
|
||||
sync.one_shot_event_signal(&pool.done)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user