Various errors working with DTables.jl #438

Open
StevenWhitaker opened this issue Oct 2, 2023 · 7 comments

@StevenWhitaker

I tried to create an MWE that is closer to my actual workflow. I'm guessing the errors occurring here are related to #437 (one of the four errors reported below is the same as in that issue). I hope this is helpful and not just extra noise!

Contents of mwe.jl:

using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)

@everywhere using DTables, DataFrames, CSV
@everywhere job_channel = Channel(100)

remotecall(2) do
    while true
        job = take!(job_channel)
        try
            func = job[1]
            args = job[2:end]
            func(args...)
        catch ex
            @info "error $ex"
            @info "stacktrace: $(stacktrace(catch_backtrace()))"
        end
    end
end

remotecall_fetch(2) do
    dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame)
    df = fetch(dt)
    cols1 = [df[!, c] for c in 1:48]
    cols2 = [df[!, c] for c in 49:102]
    cols = (cols1, cols2)
    cols_appended = (cols1, (cols2..., rand(length(cols2[1]))))
    df = DataFrame(
        (names(df)[1:48] .=> cols_appended[1])...,
        ((names(df)[49:102]..., "appended") .=> cols_appended[2])...;
        copycols = false,
    )
    dt = DTable(df)
    @info "$(length(dt))"
    @info "$(length(dt))"
    df = fetch(dt)
    cols1 = [df[!, c] for c in 1:48]
    cols2 = [df[!, c] for c in 49:102]
    cols = (cols1, cols2)
    df = fetch(dt)
    foreach((:new1, :new2), (rand(length(dt)), rand(length(dt)))) do name, val
        setproperty!(df, name, val)
    end
    dt = DTable(df)
    i = [6, 12, 48, 93, 94]
    dt = select(dt, i...; copycols = false)
    gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))
    gkeys = sort!(collect(keys(gdt)))
    sums = map(gkeys) do key
        reduce(+, gdt[key]; cols = Symbol.(names(df)[[93, 94]]))
    end .|> fetch
end

I included mwe.jl multiple times, each time in a fresh Julia session, and recorded the following errors. Note that nothing in mwe.jl changed from run to run.

Error 1:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
MethodError: Cannot `convert` an object of type
  Vector{Any} to an object of type
  Union{Dagger.Thunk, Dagger.Chunk}

Closest candidates are:
  convert(::Type{T}, ::T) where T
   @ Base Base.jl:64

Stacktrace:
  [1] get!
    @ ./dict.jl:455
  [2] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [3] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [4] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [5] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [6] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [7] #invokelatest#2
    @ ./essentials.jl:819
  [8] invokelatest
    @ ./essentials.jl:816
  [9] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [10] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [11] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [12] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #39
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] _collect
    @ ./array.jl:802
 [13] collect_similar
    @ ./array.jl:711
 [14] map
    @ ./abstractarray.jl:3263
 [15] map
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35
 [16] _manipulate
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:89
 [17] #manipulate#247
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:48
 [18] #select#258
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:171
 [19] #7
    @ ~/tmp/mwe.jl:47
 [20] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [21] invokelatest
    @ ./essentials.jl:816
 [22] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [23] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [24] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [25] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Error 2:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
UndefRefError: access to undefined reference
Stacktrace:
  [1] getindex
    @ ./essentials.jl:13 [inlined]
  [2] get!
    @ ./dict.jl:465
  [3] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [5] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [6] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [8] #invokelatest#2
    @ ./essentials.jl:819
  [9] invokelatest
    @ ./essentials.jl:816
 [10] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [11] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [12] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [13] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #15
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] collect
    @ ./array.jl:782
 [13] chunk_lengths
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:254
 [14] length
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:258
 [15] #7
    @ ~/tmp/mwe.jl:42
 [16] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [17] invokelatest
    @ ./essentials.jl:816
 [18] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [19] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [20] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [21] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Error 3:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930

[7463] signal (11.1): Segmentation fault
in expression starting at /home/steven/tmp/mwe.jl:22
jl_object_id__cold at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/builtins.c:417
type_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1332
typekey_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1344
jl_precompute_memoized_dt at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1409
inst_datatype_inner at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1731
jl_inst_arg_tuple_type at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1826
arg_type_tuple at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2100 [inlined]
jl_lookup_generic_ at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2884 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2936
collect_task_inputs at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:392
signature at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:256
#99 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:680
lock at ./lock.jl:229
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
scheduler_run at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:508
#compute_dag#82 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:449
compute_dag at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:414 [inlined]
#compute#141 at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:23
compute at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:22 [inlined]
macro expansion at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/eager.jl:28 [inlined]
#50 at ./threadingconstructs.jl:410
unknown function (ip: 0x7efbf8213f8f)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
jl_apply at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/julia.h:1880 [inlined]
start_task at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/task.c:1092
Allocations: 34071935 (Pool: 34043867; Big: 28068); GC: 39
zsh: segmentation fault  julia --project

Error 3b: Occasionally the segfault was preceded by one or more occurrences of:

Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
 [1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
   @ Base ./abstractarray.jl:949
 [2] _collect
   @ ./array.jl:713 [inlined]
 [3] collect
   @ ./array.jl:707 [inlined]
 [4] macro expansion
   @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1189 [inlined]
 [5] (::Dagger.Sch.var"#126#133"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
   @ Dagger.Sch ./task.jl:134

Error 4:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
AssertionError: Multiple concurrent writes to Dict detected!
Stacktrace:
  [1] rehash!
    @ ./dict.jl:208
  [2] _setindex!
    @ ./dict.jl:355 [inlined]
  [3] get!
    @ ./dict.jl:477
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [5] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [6] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [8] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [9] #invokelatest#2
    @ ./essentials.jl:819
 [10] invokelatest
    @ ./essentials.jl:816
 [11] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [12] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [13] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [14] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #48
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] collect_to!
    @ ./array.jl:840 [inlined]
 [13] collect_to_with_first!
    @ ./array.jl:818 [inlined]
 [14] collect
    @ ./array.jl:792
 [15] #reduce#42
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:111
 [16] #14
    @ ~/tmp/mwe.jl:51
 [17] iterate
    @ ./generator.jl:47 [inlined]
 [18] collect_to!
    @ ./array.jl:840 [inlined]
 [19] collect_to_with_first!
    @ ./array.jl:818 [inlined]
 [20] _collect
    @ ./array.jl:812
 [21] collect_similar
    @ ./array.jl:711
 [22] map
    @ ./abstractarray.jl:3263
 [23] #7
    @ ~/tmp/mwe.jl:50
 [24] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [25] invokelatest
    @ ./essentials.jl:816
 [26] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [27] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [28] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [29] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Comments:

  • The segfault was by far the most common error; the others occurred just once each (over the 10-15 trials I ran).
  • In my actual work, I don't think I've ever come across the MethodError with convert (error 1). I most commonly run into the error mentioned in "Multiple concurrent writes to Dict detected!" with DTables.reduce #437 (comment), which I did not see with mwe.jl.
  • "file.csv" is a 157 MB table with 233930 rows and 102 columns of String and Float64 values (a stand-in of similar shape can be generated with the sketch after this list).
  • The remotecall probably isn't necessary for reproducing the bugs, but I included it because that is how my actual code is structured.
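
A note for reproduction (not part of the original report): file.csv itself isn't attached, but a stand-in of similar shape can be generated with something like the hypothetical helper below. The column names and contents are arbitrary, and the exact String/Float64 split is an assumption; it simply puts Float64 values in the columns the MWE reduces over (93 and 94).

using CSV, DataFrames, Random

function make_stand_in_csv(path; nrows = 233_930, nstring = 48, nfloat = 54)
    rng = MersenneTwister(0)
    df = DataFrame()
    for i in 1:nstring
        df[!, "s$i"] = [randstring(rng, 8) for _ in 1:nrows]  # String columns (1:48)
    end
    for i in 1:nfloat
        df[!, "f$i"] = rand(rng, nrows)  # Float64 columns (49:102)
    end
    CSV.write(path, df)
    return path
end

make_stand_in_csv("file.csv")
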
jpsamaroo added the bug label Oct 3, 2023
@jpsamaroo (Member)

@StevenWhitaker can you try reproducing these again on Dagger master?

@StevenWhitaker (Author)

Thanks for getting a patch released!

The issues are different now, so that's something ;)

Now I observe the following behavior (EDIT: when running Julia with multiple threads):

  • Sometimes the code runs to completion (yay!).
  • Sometimes the code hangs when computing sums (I'm not sure whether it hangs in map, reduce, or fetch).
  • Sometimes the following error occurs when computing sums:
    Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
    Stacktrace:
     [1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
       @ Base ./abstractarray.jl:949
     [2] _collect
       @ ./array.jl:713 [inlined]
     [3] collect
       @ ./array.jl:707 [inlined]
     [4] macro expansion
       @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1189 [inlined]
     [5] (::Dagger.Sch.var"#128#135"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
       @ Dagger.Sch ./task.jl:134
    
    After this error, most of the time it hangs, sometimes it runs to completion.

I realized that I start Julia with multiple threads by default, so I also ran the code with a single thread (julia --project -t1). In this case, I saw the Unhandled Task ERROR once (incidentally, the first time), and every time I ran the code (including the first time) it ran to completion.

So, besides the one sporadic error, this issue seems to be addressed, assuming the issues I observed with multiple threads are due to the interplay between Distributed and Threads.

@StevenWhitaker (Author)

Edit to my previous comment:

I'm running my actual code with a single thread now, and it also hangs, so there might be something else still at play.

@jpsamaroo (Member)

I can reproduce the hangs - I'll keep investigating! Thanks for your patience 🙂

@jpsamaroo (Member) commented Oct 23, 2023

Running through your example with Dagger's logging enabled, I find that we spend a good bit of time (about 0.3-0.5 s for me) in the reduce calls at the end, which are running in serial over 233K keys - at this pace, I can see why it looks like it's hanging 😆

A large portion of the time is spent in GC (about 40% of the time, over ~80K allocations totaling ~500 MB), so I suspect allocations are what's killing performance. If I can figure out how to reduce those allocations, it would also be reasonable to parallelize the reduce calls (by doing two maps: one to launch a task per key, and one to fetch the results), and that should give us much better runtimes.

Additionally, the other calls that took a while are select and groupby, so we could probably look into improving those a bit.

EDIT: Those timings and allocations are so high because of logging - they drop significantly when logging is disabled, although then I see a ton of long-lived allocations that threaten to crash Julia. I still need to see if some of those allocations can be reduced.

EDIT 2: Silly me, these reductions are already asynchronous 😄 I guess the task completes before we return from reduce anyway, since we're only running with 1 thread.

@jpsamaroo (Member)

OK, one thing I would recommend: instead of the map -> reduce pattern, just use a single reduce call: reduce(+, gdt; cols=Symbol.(names(df)[[93,94]])). This appears to be much more memory- and time-efficient, which makes sense because it can do more optimizations internally (it already knows that you intend to reduce over each key in the group).
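
For reference, relative to the MWE above the change would look roughly like this (illustrative sketch, not from the original comment; it reuses the same gdt and df as the MWE, with a fetch added since the single call returns one result covering all groups rather than one result per key):

# Before (from the MWE): one reduce task launched per key, then each fetched.
sums = map(gkeys) do key
    reduce(+, gdt[key]; cols = Symbol.(names(df)[[93, 94]]))
end .|> fetch

# After (as suggested above): a single reduce over the whole grouped DTable.
sums = fetch(reduce(+, gdt; cols = Symbol.(names(df)[[93, 94]])))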

Can you test that and confirm whether it speeds your script up sufficiently for it to complete in a reasonable amount of time?

@StevenWhitaker (Author)

Thanks for the tip. I tried it out on my actual project (not the exact example in the OP), and it does seem to help, but I still see the code hang occasionally. I'm pretty sure it's not just taking forever, because when the code does complete it doesn't take that long, and when it hangs the CPU utilization drops to 0.

It actually seems that my code hangs only when I call my main function again after a successful run, or at least the chances of hanging are higher in that case. I'm not really sure why that would be, though.

I also saw a new error (when calling fetch on a DTable, with Dagger v0.18.4 and DTables v0.4.2):

Dagger.ThunkFailedException{Dagger.ThunkFailedException{CapturedException}}(Thunk[3](isnonempty, Any[Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame])]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Dagger.ThunkFailedException{CapturedException}(Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), CapturedException(UndefRefError(), Any[(getindex at essentials.jl:13 [inlined], 1), (get! at dict.jl:465, 1), (OSProc at processor.jl:109 [inlined], 2), (do_task at Sch.jl:1368, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#132 at task.jl:134, 1)])))

It looks like it has to do with file loading, so this is the code I use to load .csv files:

DTable(x -> CSV.File(x), [filepath]; tabletype = DataFrame)

I only saw the error once, though.

And another one-time error (in the function with the reduce call):

      From worker 4:    ┌ 2023-10-24T13:00:07.238 ] pid: 20516 proc: 4 Error:  Error on 4 while connecting to peer 3, exiting
      From worker 4:    │   exception =
      From worker 4:    │    ConcurrencyViolationError("lock must be held")
      From worker 4:    │    Stacktrace:
      From worker 4:    │      [1] concurrency_violation()
      From worker 4:    │        @ Base ./condition.jl:8
      From worker 4:    │      [2] assert_havelock
      From worker 4:    │        @ ./condition.jl:25 [inlined]
      From worker 4:    │      [3] assert_havelock
      From worker 4:    │        @ ./condition.jl:48 [inlined]
      From worker 4:    │      [4] assert_havelock
      From worker 4:    │        @ ./condition.jl:72 [inlined]
      From worker 4:    │      [5] notify(c::Condition, arg::Any, all::Bool, error::Bool)
      From worker 4:    │        @ Base ./condition.jl:150
      From worker 4:    │      [6] #notify#622
      From worker 4:    │        @ ./condition.jl:148 [inlined]
      From worker 4:    │      [7] notify (repeats 2 times)
      From worker 4:    │        @ ./condition.jl:148 [inlined]
      From worker 4:    │      [8] set_worker_state
      From worker 4:    │        @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:148 [inlined]
      From worker 4:    │      [9] Distributed.Worker(id::Int, r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, manager::Distributed.DefaultClusterManager; version::Nothing, config::WorkerConfig)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:126
      From worker 4:    │     [10] Worker
      From worker 4:    │        @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:116 [inlined]
      From worker 4:    │     [11] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int, wconfig::WorkerConfig)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:363
      From worker 4:    │     [12] (::Distributed.var"#121#123"{Int, WorkerConfig})()
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:349
      From worker 4:    │     [13] exec_conn_func(w::Distributed.Worker)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:181
      From worker 4:    │     [14] (::Distributed.var"#21#24"{Distributed.Worker})()
      From worker 4:    └        @ Distributed ./task.jl:514

The above errors occurred when calling my main function the first time.
