TimespanLogging: Remove worker Id Nesting #387

Closed
lib/TimespanLogging/src/core.jl: 49 changes (34 additions, 15 deletions)
@@ -233,28 +233,47 @@ function write_event(ml::MultiEventLog, event::Event)
end
end

function get_logs!(ml::MultiEventLog; only_local=false)
logs = Dict{Int,Dict{Symbol,Vector}}()
wkrs = only_local ? myid() : procs()
# FIXME: Log this logic
@sync for p in wkrs
@async begin
logs[p] = remotecall_fetch(p, ml) do ml
mls = get_state(ml)
lock(event_log_lock) do
sublogs = Dict{Symbol,Vector}()
for name in keys(mls.consumers)
sublogs[name] = mls.consumer_logs[name]
mls.consumer_logs[name] = []
function get_logs!(ml::MultiEventLog; only_local=true)
if only_local
# Only return logs from the current process
logs = Dict{Int, Dict{Symbol,Vector}}()
pid = myid()
logs[pid] = remotecall_fetch(pid, ml) do ml
Review comment (Member): Why are you remotecall_fetch-ing here? The worker calling get_logs! with only_local=true is by definition the same worker that you need to fetch logs from, so you don't need that level of indirection.

mls = get_state(ml)
lock(event_log_lock) do
sublogs = Dict{Symbol,Vector}()
for name in keys(mls.consumers)
sublogs[name] = copy(mls.consumer_logs[name])
mls.consumer_logs[name] = []
end
sublogs
end
end
return logs
Review comment (Member): This is wrong, it's now returning logs with an extra layer of nesting.

else
# Return logs from all processes
logs = Dict{Int, Dict{Symbol,Vector}}()
wkrs = procs()
@sync for p in wkrs
@async begin
logs[p] = remotecall_fetch(p, ml) do ml
mls = get_state(ml)
lock(event_log_lock) do
sublogs = Dict{Symbol,Vector}()
for name in keys(mls.consumers)
sublogs[name] = copy(mls.consumer_logs[name])
mls.consumer_logs[name] = []
end
sublogs
end
sublogs
end
end
end
return logs
end
return logs
end


# Core logging operations

empty_prof() = ProfilerResult(UInt[], Dict{UInt64, Vector{Base.StackTraces.StackFrame}}(), UInt[])
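For reference, a minimal sketch of what get_logs! might look like once both review comments above are folded in: the only_local=true path reads the local log state directly instead of going through remotecall_fetch, and it returns a flat Dict{Symbol,Vector} rather than nesting it under the worker id. This is an illustration only, not the code that was merged; it assumes the get_state, event_log_lock, and consumer fields shown in the diff above, plus Distributed's myid/procs/remotecall_fetch being in scope as in the original file.

# Sketch only -- not the merged implementation. Assumes get_state,
# event_log_lock, and the MultiEventLog fields from the diff above.
function get_logs!(ml::MultiEventLog; only_local=true)
    if only_local
        # The caller already runs on the worker that owns these logs, so
        # read the state directly and return a flat Dict{Symbol,Vector}.
        mls = get_state(ml)
        return lock(event_log_lock) do
            sublogs = Dict{Symbol,Vector}()
            for name in keys(mls.consumers)
                sublogs[name] = copy(mls.consumer_logs[name])
                mls.consumer_logs[name] = []
            end
            sublogs
        end
    else
        # Collect logs from every process, keyed by worker id.
        logs = Dict{Int,Dict{Symbol,Vector}}()
        @sync for p in procs()
            @async begin
                logs[p] = remotecall_fetch(p, ml) do ml
                    mls = get_state(ml)
                    lock(event_log_lock) do
                        sublogs = Dict{Symbol,Vector}()
                        for name in keys(mls.consumers)
                            sublogs[name] = copy(mls.consumer_logs[name])
                            mls.consumer_logs[name] = []
                        end
                        sublogs
                    end
                end
            end
        end
        return logs
    end
end
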
test/logging.jl: 148 changes (80 additions, 68 deletions)
@@ -1,4 +1,5 @@
import TimespanLogging
using TimespanLogging: get_logs!
import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog

@testset "Logging" begin
@@ -60,74 +61,85 @@ import TimespanLogging: Timespan, Event, Events, LocalEventLog, MultiEventLog
end
end
@testset "MultiEventLog" begin
ctx = Context()
ml = MultiEventLog()
ml[:core] = Events.CoreMetrics()
ml[:id] = Events.IDMetrics()
ml[:timeline] = Events.TimelineMetrics()
ml[:wsat] = Dagger.Events.WorkerSaturation()
ml[:loadavg] = Events.CPULoadAverages()
ml[:bytes] = Dagger.Events.BytesAllocd()
ml[:mem] = Events.MemoryFree()
ml[:esat] = Events.EventSaturation()
ml[:psat] = Dagger.Events.ProcessorSaturation()
ctx.log_sink = ml

A = rand(Blocks(4, 4), 16, 16)
collect(ctx, A*A)

logs = TimespanLogging.get_logs!(ml)
for w in keys(logs)
len = length(logs[w][:core])
if w == 1
@test len > 1
end
for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat)
@test haskey(logs[w], c)
@test length(logs[w][c]) == len
end
end
@test length(keys(logs)) > 1

l1 = logs[1]
core = l1[:core]
@test !any(isnothing, core)
esat = l1[:esat]
@test any(e->haskey(e, :scheduler_init), esat)
@test any(e->haskey(e, :schedule), esat)
@test any(e->haskey(e, :fire), esat)
@test any(e->haskey(e, :take), esat)
@test any(e->haskey(e, :finish), esat)
# Note: May one day be true as scheduler evolves
@test !any(e->haskey(e, :compute), esat)
@test !any(e->haskey(e, :move), esat)
psat = l1[:psat]
# Note: May become false
@test all(e->length(e) == 0, psat)

had_psat_proc = 0
for wo in filter(w->w != 1, keys(logs))
lo = logs[wo]
esat = lo[:esat]
@test !any(e->haskey(e, :scheduler_init), esat)
@test !any(e->haskey(e, :schedule), esat)
@test !any(e->haskey(e, :fire), esat)
@test !any(e->haskey(e, :take), esat)
@test !any(e->haskey(e, :finish), esat)
psat = lo[:psat]
if any(e->length(e) > 0, psat)
had_psat_proc += 1
@test any(e->haskey(e, :compute), esat)
@test any(e->haskey(e, :move), esat)
end
end
@test had_psat_proc > 0

logs = TimespanLogging.get_logs!(ml)
for w in keys(logs)
for c in keys(logs[w])
@test isempty(logs[w][c])
end
@testset "Get logs for only_local = true and only_local = false" begin
Review comment (Member): This extra level of testset nesting isn't necessary.

ctx = Context()
ml = MultiEventLog()
ml[:core] = Events.CoreMetrics()
ml[:id] = Events.IDMetrics()
ml[:timeline] = Events.TimelineMetrics()
ml[:wsat] = Dagger.Events.WorkerSaturation()
ml[:loadavg] = Events.CPULoadAverages()
ml[:bytes] = Dagger.Events.BytesAllocd()
ml[:mem] = Events.MemoryFree()
ml[:esat] = Events.EventSaturation()
ml[:psat] = Dagger.Events.ProcessorSaturation()
ctx.log_sink = ml

A = rand(Blocks(4, 4), 16, 16)
collect(ctx, A*A)
Review comment (Member) on lines +78 to +79: These lines need to go inside the loop, to ensure we're generating logs for each of only_local=true and only_local=false.

for only_local in (true, false)
logs = TimespanLogging.get_logs!(ml)
if only_local
logs = TimespanLogging.get_logs!(ml, only_local=true)
@test logs isa Dict{Symbol, Vector}
len = length(logs)
@test len > 1
@test !isempty(logs)
else
logs = TimespanLogging.get_logs!(ml, only_local=false)
@test logs isa Dict{Int, Dict{Symbol, Vector}}
for w in keys(logs)
len = length(logs[w][:core])
if w == 1
@test len > 1
end
for c in (:core, :id, :timeline, :wsat, :loadavg, :bytes, :mem, :esat, :psat)
@test haskey(logs[w], c)
@test length(logs[w][c]) == len
end
end
@test length(keys(logs)) > 1
logs = TimespanLogging.get_logs!(ml, only_local=false)
for w in keys(logs)
for c in keys(logs[w])
@test isempty(logs[w][c])
end
end
end
l1 = logs[1]
core = l1[:core]
@test !any(isnothing, core)
esat = l1[:esat]
@test any(e->haskey(e, :scheduler_init), esat)
@test any(e->haskey(e, :schedule), esat)
@test any(e->haskey(e, :fire), esat)
@test any(e->haskey(e, :take), esat)
@test any(e->haskey(e, :finish), esat)
# Note: May one day be true as scheduler evolves
@test !any(e->haskey(e, :compute), esat)
@test !any(e->haskey(e, :move), esat)
psat = l1[:psat]
# Note: May become false
@test all(e->length(e) == 0, psat)
had_psat_proc = 0

for wo in filter(w->w != 1, keys(logs))
lo = logs[wo]
esat = lo[:esat]
@test !any(e->haskey(e, :scheduler_init), esat)
@test !any(e->haskey(e, :schedule), esat)
@test !any(e->haskey(e, :fire), esat)
@test !any(e->haskey(e, :take), esat)
@test !any(e->haskey(e, :finish), esat)
psat = lo[:psat]
if any(e->length(e) > 0, psat)
had_psat_proc += 1
@test any(e->haskey(e, :compute), esat)
@test any(e->haskey(e, :move), esat)
end
end
@test had_psat_proc > 0
end
end
end
end
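
Reading the two test review comments together, a rough sketch of how the loop in the new test might be restructured: the extra @testset wrapper is dropped and the matrix multiply moves inside the loop, so each of only_local=true and only_local=false runs against freshly generated logs. This assumes the same ctx, ml, Blocks, and Dagger setup as the MultiEventLog testset above and only illustrates the suggested shape, not the final test.

# Sketch only -- reuses the ctx/ml configured in the MultiEventLog testset above.
for only_local in (true, false)
    # Generate fresh logs for this iteration (per the review comment, the
    # workload has to run inside the loop).
    A = rand(Blocks(4, 4), 16, 16)
    collect(ctx, A*A)

    if only_local
        logs = TimespanLogging.get_logs!(ml, only_local=true)
        @test logs isa Dict{Symbol, Vector}
        @test !isempty(logs)
    else
        logs = TimespanLogging.get_logs!(ml, only_local=false)
        @test logs isa Dict{Int, Dict{Symbol, Vector}}
        @test length(keys(logs)) > 1
        # Calling get_logs! again should return emptied logs for every worker.
        logs = TimespanLogging.get_logs!(ml, only_local=false)
        for w in keys(logs), c in keys(logs[w])
            @test isempty(logs[w][c])
        end
    end
end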