From 7e29aad29518ca9940bd4eefe19baae442c4b27b Mon Sep 17 00:00:00 2001 From: Madhu Patel Date: Fri, 17 Mar 2023 20:59:02 +0530 Subject: [PATCH 1/4] TimespanLogging Remove worker Id Nesting --- lib/TimespanLogging/src/core.jl | 40 ++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/lib/TimespanLogging/src/core.jl b/lib/TimespanLogging/src/core.jl index 72717acf9..792a36eb2 100644 --- a/lib/TimespanLogging/src/core.jl +++ b/lib/TimespanLogging/src/core.jl @@ -233,28 +233,35 @@ function write_event(ml::MultiEventLog, event::Event) end end -function get_logs!(ml::MultiEventLog; only_local=false) - logs = Dict{Int,Dict{Symbol,Vector}}() - wkrs = only_local ? myid() : procs() - # FIXME: Log this logic - @sync for p in wkrs - @async begin - logs[p] = remotecall_fetch(p, ml) do ml - mls = get_state(ml) - lock(event_log_lock) do - sublogs = Dict{Symbol,Vector}() - for name in keys(mls.consumers) - sublogs[name] = mls.consumer_logs[name] - mls.consumer_logs[name] = [] +function get_logs!(ml::MultiEventLog; only_local=true) + if only_local + # Only return logs from current process + mls = get_state(ml) + return mls.consumer_logs + else + # Return logs from all processes + logs = Dict{Int,Dict{Symbol,Vector}}() + wkrs = procs() + @sync for p in wkrs + @async begin + logs[p] = remotecall_fetch(p, ml) do ml + mls = get_state(ml) + lock(event_log_lock) do + sublogs = Dict{Symbol,Vector}() + for name in keys(mls.consumers) + sublogs[name] = mls.consumer_logs[name] + mls.consumer_logs[name] = [] + end + sublogs end - sublogs end end end - end - return logs + return logs + end end + # Core logging operations empty_prof() = ProfilerResult(UInt[], Dict{UInt64, Vector{Base.StackTraces.StackFrame}}(), UInt[]) @@ -471,3 +478,4 @@ function summarize_events(time_spent, gc_diff, profiler_samples) end summarize_events(xs) = summarize_events(aggregate_events(xs)...) + From dde757dbe299e095130d0c309e6f40bbe3dcbc4d Mon Sep 17 00:00:00 2001 From: Madhu Patel <54492585+Madhupatel08@users.noreply.github.com> Date: Tue, 21 Mar 2023 23:39:21 +0530 Subject: [PATCH 2/4] Update lib/TimespanLogging/src/core.jl Co-authored-by: Julian Samaroo --- lib/TimespanLogging/src/core.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/TimespanLogging/src/core.jl b/lib/TimespanLogging/src/core.jl index 792a36eb2..63b1d07ff 100644 --- a/lib/TimespanLogging/src/core.jl +++ b/lib/TimespanLogging/src/core.jl @@ -258,7 +258,7 @@ function get_logs!(ml::MultiEventLog; only_local=true) end end return logs - end + end end From ceb75f669324657c50c5576eca935f351d142c49 Mon Sep 17 00:00:00 2001 From: Madhu Patel <54492585+Madhupatel08@users.noreply.github.com> Date: Thu, 23 Mar 2023 22:31:35 +0530 Subject: [PATCH 3/4] Updated get_logs! --- lib/TimespanLogging/src/core.jl | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/lib/TimespanLogging/src/core.jl b/lib/TimespanLogging/src/core.jl index 63b1d07ff..87d0974d8 100644 --- a/lib/TimespanLogging/src/core.jl +++ b/lib/TimespanLogging/src/core.jl @@ -235,21 +235,33 @@ end function get_logs!(ml::MultiEventLog; only_local=true) if only_local - # Only return logs from current process - mls = get_state(ml) - return mls.consumer_logs + # Only return logs from the current process + logs = Dict{Int, Dict{Symbol,Vector}}() + pid = myid() + logs[pid] = remotecall_fetch(pid, ml) do ml + mls = get_state(ml) + lock(event_log_lock) do + sublogs = Dict{Symbol,Vector}() + for name in keys(mls.consumers) + sublogs[name] = copy(mls.consumer_logs[name]) + mls.consumer_logs[name] = [] + end + sublogs + end + end + return logs else # Return logs from all processes - logs = Dict{Int,Dict{Symbol,Vector}}() + logs = Dict{Int, Dict{Symbol,Vector}}() wkrs = procs() - @sync for p in wkrs + @sync for p in wkrs @async begin - logs[p] = remotecall_fetch(p, ml) do ml + logs[p] = remotecall_fetch(p, ml) do ml mls = get_state(ml) lock(event_log_lock) do sublogs = Dict{Symbol,Vector}() for name in keys(mls.consumers) - sublogs[name] = mls.consumer_logs[name] + sublogs[name] = copy(mls.consumer_logs[name]) mls.consumer_logs[name] = [] end sublogs @@ -478,4 +490,3 @@ function summarize_events(time_spent, gc_diff, profiler_samples) end summarize_events(xs) = summarize_events(aggregate_events(xs)...) - From 06def0b52e81fc62fd38b3dd8ffc5ba6d1238e8e Mon Sep 17 00:00:00 2001 From: Madhu Patel <54492585+Madhupatel08@users.noreply.github.com> Date: Thu, 23 Mar 2023 22:32:23 +0530 Subject: [PATCH 4/4] Updating Tests --- test/logging.jl | 148 ++++++++++++++++++++++++++---------------------- 1 file changed, 80 insertions(+), 68 deletions(-) diff --git a/test/logging.jl b/test/logging.jl index c4431bba5..8e5e9a184 100644 --- a/test/logging.jl +++ b/test/logging.jl @@ -1,4 +1,5 @@ import TimespanLogging +using TimespanLogging: get_logs! import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog @testset "Logging" begin @@ -60,74 +61,85 @@ import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog end end @testset "MultiEventLog" begin - ctx = Context() - ml = MultiEventLog() - ml[:core] = Events.CoreMetrics() - ml[:id] = Events.IDMetrics() - ml[:timeline] = Events.TimelineMetrics() - ml[:wsat] = Dagger.Events.WorkerSaturation() - ml[:loadavg] = Events.CPULoadAverages() - ml[:bytes] = Dagger.Events.BytesAllocd() - ml[:mem] = Events.MemoryFree() - ml[:esat] = Events.EventSaturation() - ml[:psat] = Dagger.Events.ProcessorSaturation() - ctx.log_sink = ml - - A = rand(Blocks(4, 4), 16, 16) - collect(ctx, A*A) - - logs = TimespanLogging.get_logs!(ml) - for w in keys(logs) - len = length(logs[w][:core]) - if w == 1 - @test len > 1 - end - for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat) - @test haskey(logs[w], c) - @test length(logs[w][c]) == len - end - end - @test length(keys(logs)) > 1 - - l1 = logs[1] - core = l1[:core] - @test !any(isnothing, core) - esat = l1[:esat] - @test any(e->haskey(e, :scheduler_init), esat) - @test any(e->haskey(e, :schedule), esat) - @test any(e->haskey(e, :fire), esat) - @test any(e->haskey(e, :take), esat) - @test any(e->haskey(e, :finish), esat) - # Note: May one day be true as scheduler evolves - @test !any(e->haskey(e, :compute), esat) - @test !any(e->haskey(e, :move), esat) - psat = l1[:psat] - # Note: May become false - @test all(e->length(e) == 0, psat) - - had_psat_proc = 0 - for wo in filter(w->w != 1, keys(logs)) - lo = logs[wo] - esat = lo[:esat] - @test !any(e->haskey(e, :scheduler_init), esat) - @test !any(e->haskey(e, :schedule), esat) - @test !any(e->haskey(e, :fire), esat) - @test !any(e->haskey(e, :take), esat) - @test !any(e->haskey(e, :finish), esat) - psat = lo[:psat] - if any(e->length(e) > 0, psat) - had_psat_proc += 1 - @test any(e->haskey(e, :compute), esat) - @test any(e->haskey(e, :move), esat) - end - end - @test had_psat_proc > 0 - - logs = TimespanLogging.get_logs!(ml) - for w in keys(logs) - for c in keys(logs[w]) - @test isempty(logs[w][c]) - end + @testset "Get logs for only_local = true and only_local = false" begin + ctx = Context() + ml = MultiEventLog() + ml[:core] = Events.CoreMetrics() + ml[:id] = Events.IDMetrics() + ml[:timeline] = Events.TimelineMetrics() + ml[:wsat] = Dagger.Events.WorkerSaturation() + ml[:loadavg] = Events.CPULoadAverages() + ml[:bytes] = Dagger.Events.BytesAllocd() + ml[:mem] = Events.MemoryFree() + ml[:esat] = Events.EventSaturation() + ml[:psat] = Dagger.Events.ProcessorSaturation() + ctx.log_sink = ml + + A = rand(Blocks(4, 4), 16, 16) + collect(ctx, A*A) + for only_local in (true, false) + logs = TimespanLogging.get_logs!(ml) + if only_local + logs = TimespanLogging.get_logs!(ml, only_local=true) + @test logs isa Dict{Symbol, Vector} + len = length(logs) + @test len > 1 + @test !isempty(logs) + else + logs = TimespanLogging.get_logs!(ml, only_local=false) + @test logs isa Dict{Int, Dict{Symbol, Vector}} + for w in keys(logs) + len = length(logs[w][:core]) + if w == 1 + @test len > 1 + end + for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat) + @test haskey(logs[w], c) + @test length(logs[w][c]) == len + end + end + @test length(keys(logs)) > 1 + logs = TimespanLogging.get_logs!(ml, only_local=false) + for w in keys(logs) + for c in keys(logs[w]) + @test isempty(logs[w][c]) + end + end + end + l1 = logs[1] + core = l1[:core] + @test !any(isnothing, core) + esat = l1[:esat] + @test any(e->haskey(e, :scheduler_init), esat) + @test any(e->haskey(e, :schedule), esat) + @test any(e->haskey(e, :fire), esat) + @test any(e->haskey(e, :take), esat) + @test any(e->haskey(e, :finish), esat) + # Note: May one day be true as scheduler evolves + @test !any(e->haskey(e, :compute), esat) + @test !any(e->haskey(e, :move), esat) + psat = l1[:psat] + # Note: May become false + @test all(e->length(e) == 0, psat) + had_psat_proc = 0 + + for wo in filter(w->w != 1, keys(logs)) + lo = logs[wo] + esat = lo[:esat] + @test !any(e->haskey(e, :scheduler_init), esat) + @test !any(e->haskey(e, :schedule), esat) + @test !any(e->haskey(e, :fire), esat) + @test !any(e->haskey(e, :take), esat) + @test !any(e->haskey(e, :finish), esat) + psat = lo[:psat] + if any(e->length(e) > 0, psat) + had_psat_proc += 1 + @test any(e->haskey(e, :compute), esat) + @test any(e->haskey(e, :move), esat) + end + end + @test had_psat_proc > 0 + end end end end