
chore(sinks): Refactor internal sinks and simplify #2111

Merged · 19 commits · Apr 1, 2020

Conversation

LucioFranco
Contributor

@LucioFranco LucioFranco commented Mar 20, 2020

This is a decently large refactor PR of our internal sink utilities. See #1666 for the motivation behind this change. At a high level, this change introduces three new types: StreamSink, BatchSink, and PartitionBatchSink. These types are similar to our current ones, except that instead of wrapping sinks (StreamSink still takes an inner sink) and causing a large type mess, we expose all trait bounds up front and do not allow users to nest things. This achieves the same goals as before but with much better compiler errors and a much simpler implementation.

Both batching sinks now take a Service instead of an inner sink, which allows us to surface trait bounds at the root level. Before, we used extension traits that would cause crazy error messages if you missed one slight bound.

Another benefit of this change is that all requests can now be dispatched within their own task, which should allow them to be driven independently of the sink. This should solve #299.
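As a rough illustration of the new shape (a hypothetical, std-only sketch, not Vector's actual API; the `Service` trait here is a synchronous stand-in for tower's), the batch sink takes the service directly, so every bound sits on one impl:

```rust
// Toy stand-in for tower's Service trait, synchronous for brevity.
trait Service<Request> {
    type Response;
    type Error;
    fn call(&mut self, req: Request) -> Result<Self::Response, Self::Error>;
}

// A batch sink that owns its service instead of wrapping an inner sink,
// so every trait bound is visible right here rather than buried in layers.
struct BatchSink<S> {
    service: S,
    batch: Vec<String>,
    max_batch: usize,
}

impl<S> BatchSink<S>
where
    S: Service<Vec<String>, Response = (), Error = String>,
{
    fn new(service: S, max_batch: usize) -> Self {
        BatchSink { service, batch: Vec::new(), max_batch }
    }

    // Buffer an event; once the batch is full, flush it through the service.
    fn push(&mut self, event: String) -> Result<(), String> {
        self.batch.push(event);
        if self.batch.len() >= self.max_batch {
            let full = std::mem::take(&mut self.batch);
            self.service.call(full)?;
        }
        Ok(())
    }
}
```

Missing a bound in this style produces one error on the impl, instead of an extension-trait error several layers deep.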

Other additions: improved testing around acking, lingers, and batching; improved tracing; and docs explaining what is going on under the hood. We have also unified all sinks to live in one file instead of across three different places. This should clean up our sink util modules.

Closes #299, #1666, and #2049.

Signed-off-by: Lucio Franco <[email protected]>
@binarylogic
Contributor

Thanks. Would you mind elaborating on the bugs you discovered? I'd like to open issues detailing them for posterity; this helps when users search for issues in our repo. This PR would then close those issues.

@LucioFranco
Contributor Author

@binarylogic Besides the IO issue, which should be fixed in this PR, there were just a couple of deep implementation issues with our batching. I don't think we have seen any of them in the wild, since no one has reported hanging tasks. So I don't think it's worth writing issues for them just yet; they are far from user-facing bugs and more potential things that could happen.

Once I get a bit further with this PR, I'll write up a proper description and some better docs around batching.

@Hoverbear Hoverbear added the type: enhancement A value-adding code change that enhances its existing functionality. label Mar 23, 2020
@LucioFranco
Contributor Author

LucioFranco commented Mar 24, 2020

Update, there are a few things left to do:

  • Subtle bug with the loki and logdna sinks
  • Vendor our own version of tower-timeout that uses an Into bound
  • Implement a custom downcast_ref for RetryLogic
  • Nix old code
  • Add docs for sinks

Signed-off-by: Lucio Franco <[email protected]>
@LucioFranco LucioFranco marked this pull request as ready for review March 26, 2020 15:45
@LucioFranco LucioFranco requested a review from a user March 26, 2020 15:45
@LucioFranco
Contributor Author

@lukesteensen @Hoverbear this should be ready for review! I've updated the main PR description.

@LucioFranco LucioFranco requested a review from Hoverbear March 26, 2020 15:51
Member

@bruceg bruceg left a comment


I see nothing majorly objectionable here, though it's much too large a change for me to grasp all of its implications.

Comment on lines 104 to 105
BatchedHttpSink::new(common, Buffer::new(gzip), request, batch, tls_settings, &cx);
BatchedHttpSink::new(common, Buffer::new(gzip), request, batch, tls_settings, &cx)
.sink_map_err(|e| error!("Fatal elasticsearch sink error: {}", e));
Member


I notice a number of the new sinks add this error handler. Would it make sense to have that within the generic sink impls?

Contributor Author


Possibly, though it doesn't really hurt here. I think in the future we want to take a different approach; at least this forces the error to be handled per sink, so it's fine for now.

@Hoverbear Hoverbear self-assigned this Mar 30, 2020
/// batches have been acked. This means if sequential requests r1, r2,
/// and r3 are dispatched and r2 and r3 complete, all events contained
/// in all requests will not be acked until r1 has completed.
pub struct BatchSink<S, B, Request, E = DefaultExecutor> {
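The acking rule in the doc comment above can be modeled with a small stand-alone sketch (hypothetical and std-only, not the actual implementation): completed requests are held back until every earlier request has also completed.

```rust
use std::collections::BTreeMap;

// Tracks request completions and only acks contiguous runs from the front,
// so out-of-order completions (r2, r3 before r1) stay pending.
struct OrderedAcker {
    next_to_ack: u64,           // lowest sequence number not yet acked
    done: BTreeMap<u64, usize>, // completed but unacked: seq -> event count
    acked_events: usize,
}

impl OrderedAcker {
    fn new() -> Self {
        OrderedAcker { next_to_ack: 0, done: BTreeMap::new(), acked_events: 0 }
    }

    // Record that request `seq`, carrying `events` events, has completed,
    // then ack every contiguous completed request starting at the front.
    fn complete(&mut self, seq: u64, events: usize) {
        self.done.insert(seq, events);
        while let Some(count) = self.done.remove(&self.next_to_ack) {
            self.acked_events += count;
            self.next_to_ack += 1;
        }
    }
}
```

With r1 as seq 0, completing seq 1 and seq 2 first acks nothing; the moment seq 0 completes, all three requests are acked at once.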
Contributor

@Hoverbear Hoverbear Mar 30, 2020


I would be totally okay with these being separate files, but it's not a blocker.

Contributor Author


I am happy to do that; the big reason I decided to keep it in one file is that this is just utility code. @lukesteensen, any preference?

Contributor


It's just a lot of code!

Member


I agree it's a lot of code, but I don't feel too strongly either way.

Contributor

@Hoverbear Hoverbear left a comment


Rest looks good!

Member

@lukesteensen lukesteensen left a comment


Seems good! One thing that made this a little difficult to review was figuring out what in StreamSink, BatchSink, and PartitionedBatchSink is new versus just moved or renamed. It's a lot to review in depth, so I want to make sure we focus on what's actually changing.

/// We require our own timeout layer because the current
/// 0.1 version uses From instead of Into bounds for errors;
/// this causes the whole stack to not align and not compile.
/// In future versions of tower this should be fixed.
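To illustrate the bound issue, here is a simplified std-only sketch (`Elapsed` and `SinkError` are hypothetical stand-ins, not tower's types): bounding the layer's error as `Elapsed: Into<E>` lets the caller pick the stack-wide error type, which is what keeps the whole stack aligned.

```rust
// The timeout layer's own error type.
struct Elapsed;

// The error type the rest of the sink stack uses.
#[derive(Debug, PartialEq)]
enum SinkError {
    Timeout,
}

impl From<Elapsed> for SinkError {
    fn from(_: Elapsed) -> Self {
        SinkError::Timeout
    }
}

// Bounding on `Elapsed: Into<E>` (rather than fixing a `From` bound on the
// layer's side) means any caller-chosen error type with a conversion works.
fn run_with_timeout<E>(timed_out: bool) -> Result<(), E>
where
    Elapsed: Into<E>,
{
    if timed_out {
        Err(Elapsed.into())
    } else {
        Ok(())
    }
}
```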
Member


Is there an issue we can follow for this? Would be great not to have to maintain this ourselves.

Contributor Author


This has been fixed in the newer tower releases that we have not yet upgraded to.

The closest issue I can find is this one tower-rs/tower#387

Comment on lines +444 to +447
// Apply back pressure if we are buffering more than
// 5 batches; this should only happen if the inner sink
// is applying back pressure.
if self.sending.len() > 5 {
Member


Wouldn't this cause backpressure very quickly if we have data with more than 5 partitions? I'm not quite sure what the goal is here.

Contributor Author


This was set before. I was thinking about this, but I actually don't think it's even possible to go beyond 1 here, given the semantics of Service. I will take a look tomorrow.

Contributor Author


Ok, so I looked into this more. The reason I added it originally was to work around multiple batches being ready while the service was applying back pressure. Since we can fill multiple batches at once, we want to buffer the complete ones until we can send them, so we can continue to fill. Since the service should in theory only apply back pressure for a short amount of time, batches should only live here for a short period.

I also want to avoid refactoring this piece, since it seems to be working correctly and we have not noticed issues. I suspect a lot of this code will go away in the future, replaced by nicer async/await, so I want to avoid investing too much. A lot of these workarounds are for the sink trait.
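A rough std-only model of the behavior being discussed (the names and the limit of 5 are illustrative, taken from the snippet above, not the real code): completed batches queue while the service is busy, and the sink reports backpressure once the queue passes the limit.

```rust
use std::collections::VecDeque;

// Mirror of the `self.sending.len() > 5` check: buffer completed batches,
// but stop accepting new ones once too many are waiting on the service.
const MAX_SENDING: usize = 5;

struct SendQueue<T> {
    sending: VecDeque<T>,
}

impl<T> SendQueue<T> {
    fn new() -> Self {
        SendQueue { sending: VecDeque::new() }
    }

    // A completed batch waits here until the service can take it.
    fn enqueue(&mut self, batch: T) {
        self.sending.push_back(batch);
    }

    // Backpressure signal: the caller should stop filling new batches
    // while more than MAX_SENDING completed batches are queued.
    fn poll_ready(&self) -> bool {
        self.sending.len() <= MAX_SENDING
    }

    // Called when the service accepts the oldest queued batch.
    fn dispatch_one(&mut self) -> Option<T> {
        self.sending.pop_front()
    }
}
```

Because the service drains the queue quickly in the common case, the buffer only grows past the limit while the service is genuinely busy.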

@LucioFranco
Contributor Author

Thanks for the reviews, @Hoverbear and @lukesteensen. I'll fix this up with the feedback first thing tomorrow morning.

Signed-off-by: Lucio Franco <[email protected]>
Labels
type: enhancement A value-adding code change that enhances its existing functionality.
Development

Successfully merging this pull request may close these issues.

BatchSink does not properly drive the inner IO resources
5 participants