diff --git a/lib/TimespanLogging/src/core.jl b/lib/TimespanLogging/src/core.jl index 72717acf9..87d0974d8 100644 --- a/lib/TimespanLogging/src/core.jl +++ b/lib/TimespanLogging/src/core.jl @@ -233,28 +233,47 @@ function write_event(ml::MultiEventLog, event::Event) end end -function get_logs!(ml::MultiEventLog; only_local=false) - logs = Dict{Int,Dict{Symbol,Vector}}() - wkrs = only_local ? myid() : procs() - # FIXME: Log this logic - @sync for p in wkrs - @async begin - logs[p] = remotecall_fetch(p, ml) do ml - mls = get_state(ml) - lock(event_log_lock) do - sublogs = Dict{Symbol,Vector}() - for name in keys(mls.consumers) - sublogs[name] = mls.consumer_logs[name] - mls.consumer_logs[name] = [] +function get_logs!(ml::MultiEventLog; only_local=true) + if only_local + # Only return logs from the current process + logs = Dict{Int, Dict{Symbol,Vector}}() + pid = myid() + logs[pid] = remotecall_fetch(pid, ml) do ml + mls = get_state(ml) + lock(event_log_lock) do + sublogs = Dict{Symbol,Vector}() + for name in keys(mls.consumers) + sublogs[name] = copy(mls.consumer_logs[name]) + mls.consumer_logs[name] = [] + end + sublogs + end + end + return logs + else + # Return logs from all processes + logs = Dict{Int, Dict{Symbol,Vector}}() + wkrs = procs() + @sync for p in wkrs + @async begin + logs[p] = remotecall_fetch(p, ml) do ml + mls = get_state(ml) + lock(event_log_lock) do + sublogs = Dict{Symbol,Vector}() + for name in keys(mls.consumers) + sublogs[name] = copy(mls.consumer_logs[name]) + mls.consumer_logs[name] = [] + end + sublogs end - sublogs end end end + return logs end - return logs end + # Core logging operations empty_prof() = ProfilerResult(UInt[], Dict{UInt64, Vector{Base.StackTraces.StackFrame}}(), UInt[]) diff --git a/test/logging.jl b/test/logging.jl index c4431bba5..8e5e9a184 100644 --- a/test/logging.jl +++ b/test/logging.jl @@ -1,4 +1,5 @@ import TimespanLogging +using TimespanLogging: get_logs! import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog @testset "Logging" begin @@ -60,74 +61,85 @@ import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog end end @testset "MultiEventLog" begin - ctx = Context() - ml = MultiEventLog() - ml[:core] = Events.CoreMetrics() - ml[:id] = Events.IDMetrics() - ml[:timeline] = Events.TimelineMetrics() - ml[:wsat] = Dagger.Events.WorkerSaturation() - ml[:loadavg] = Events.CPULoadAverages() - ml[:bytes] = Dagger.Events.BytesAllocd() - ml[:mem] = Events.MemoryFree() - ml[:esat] = Events.EventSaturation() - ml[:psat] = Dagger.Events.ProcessorSaturation() - ctx.log_sink = ml - - A = rand(Blocks(4, 4), 16, 16) - collect(ctx, A*A) - - logs = TimespanLogging.get_logs!(ml) - for w in keys(logs) - len = length(logs[w][:core]) - if w == 1 - @test len > 1 - end - for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat) - @test haskey(logs[w], c) - @test length(logs[w][c]) == len - end - end - @test length(keys(logs)) > 1 - - l1 = logs[1] - core = l1[:core] - @test !any(isnothing, core) - esat = l1[:esat] - @test any(e->haskey(e, :scheduler_init), esat) - @test any(e->haskey(e, :schedule), esat) - @test any(e->haskey(e, :fire), esat) - @test any(e->haskey(e, :take), esat) - @test any(e->haskey(e, :finish), esat) - # Note: May one day be true as scheduler evolves - @test !any(e->haskey(e, :compute), esat) - @test !any(e->haskey(e, :move), esat) - psat = l1[:psat] - # Note: May become false - @test all(e->length(e) == 0, psat) - - had_psat_proc = 0 - for wo in filter(w->w != 1, keys(logs)) - lo = logs[wo] - esat = lo[:esat] - @test !any(e->haskey(e, :scheduler_init), esat) - @test !any(e->haskey(e, :schedule), esat) - @test !any(e->haskey(e, :fire), esat) - @test !any(e->haskey(e, :take), esat) - @test !any(e->haskey(e, :finish), esat) - psat = lo[:psat] - if any(e->length(e) > 0, psat) - had_psat_proc += 1 - @test any(e->haskey(e, :compute), esat) - @test any(e->haskey(e, :move), esat) - end - end - @test had_psat_proc > 0 - - logs = TimespanLogging.get_logs!(ml) - for w in keys(logs) - for c in keys(logs[w]) - @test isempty(logs[w][c]) - end + @testset "Get logs for only_local = true and only_local = false" begin + ctx = Context() + ml = MultiEventLog() + ml[:core] = Events.CoreMetrics() + ml[:id] = Events.IDMetrics() + ml[:timeline] = Events.TimelineMetrics() + ml[:wsat] = Dagger.Events.WorkerSaturation() + ml[:loadavg] = Events.CPULoadAverages() + ml[:bytes] = Dagger.Events.BytesAllocd() + ml[:mem] = Events.MemoryFree() + ml[:esat] = Events.EventSaturation() + ml[:psat] = Dagger.Events.ProcessorSaturation() + ctx.log_sink = ml + + A = rand(Blocks(4, 4), 16, 16) + collect(ctx, A*A) + for only_local in (true, false) + logs = TimespanLogging.get_logs!(ml) + if only_local + logs = TimespanLogging.get_logs!(ml, only_local=true) + @test logs isa Dict{Symbol, Vector} + len = length(logs) + @test len > 1 + @test !isempty(logs) + else + logs = TimespanLogging.get_logs!(ml, only_local=false) + @test logs isa Dict{Int, Dict{Symbol, Vector}} + for w in keys(logs) + len = length(logs[w][:core]) + if w == 1 + @test len > 1 + end + for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat) + @test haskey(logs[w], c) + @test length(logs[w][c]) == len + end + end + @test length(keys(logs)) > 1 + logs = TimespanLogging.get_logs!(ml, only_local=false) + for w in keys(logs) + for c in keys(logs[w]) + @test isempty(logs[w][c]) + end + end + end + l1 = logs[1] + core = l1[:core] + @test !any(isnothing, core) + esat = l1[:esat] + @test any(e->haskey(e, :scheduler_init), esat) + @test any(e->haskey(e, :schedule), esat) + @test any(e->haskey(e, :fire), esat) + @test any(e->haskey(e, :take), esat) + @test any(e->haskey(e, :finish), esat) + # Note: May one day be true as scheduler evolves + @test !any(e->haskey(e, :compute), esat) + @test !any(e->haskey(e, :move), esat) + psat = l1[:psat] + # Note: May become false + @test all(e->length(e) == 0, psat) + had_psat_proc = 0 + + for wo in filter(w->w != 1, keys(logs)) + lo = logs[wo] + esat = lo[:esat] + @test !any(e->haskey(e, :scheduler_init), esat) + @test !any(e->haskey(e, :schedule), esat) + @test !any(e->haskey(e, :fire), esat) + @test !any(e->haskey(e, :take), esat) + @test !any(e->haskey(e, :finish), esat) + psat = lo[:psat] + if any(e->length(e) > 0, psat) + had_psat_proc += 1 + @test any(e->haskey(e, :compute), esat) + @test any(e->haskey(e, :move), esat) + end + end + @test had_psat_proc > 0 + end end end end