Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] p2p: add AsyncSecretConnection based on tokio #1464

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ test = false
[features]
default = ["flex-error/std", "flex-error/eyre_tracer"]
amino = ["prost-derive"]
async = ["tokio"]

[dependencies]
chacha20poly1305 = { version = "0.10", default-features = false, features = ["reduced-round"] }
Expand All @@ -50,3 +51,4 @@ tendermint-std-ext = { path = "../std-ext", version = "0.39.1", default-features

# optional dependencies
prost-derive = { version = "0.13", optional = true }
tokio = { version = "1", optional = true, features = ["io-util", "net"] }
7 changes: 7 additions & 0 deletions p2p/src/secret_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@
protocol::Version,
public_key::PublicKey,
};

#[cfg(feature = "async")]
pub use self::async_connection::AsyncSecretConnection;

Check failure on line 35 in p2p/src/secret_connection.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] p2p/src/secret_connection.rs#L35

error: item name ends with its containing module's name --> p2p/src/secret_connection.rs:35:33 | 35 | pub use self::async_connection::AsyncSecretConnection; | ^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#module_name_repetitions = note: `-D clippy::module-name-repetitions` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::module_name_repetitions)]`
Raw output
p2p/src/secret_connection.rs:35:33:e:error: item name ends with its containing module's name
  --> p2p/src/secret_connection.rs:35:33
   |
35 | pub use self::async_connection::AsyncSecretConnection;
   |                                 ^^^^^^^^^^^^^^^^^^^^^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#module_name_repetitions
   = note: `-D clippy::module-name-repetitions` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(clippy::module_name_repetitions)]`


__END__

use crate::error::Error;

#[cfg(feature = "amino")]
mod amino_types;

#[cfg(feature = "async")]
mod async_connection;

mod kdf;
mod nonce;
mod protocol;
Expand Down
193 changes: 193 additions & 0 deletions p2p/src/secret_connection/async_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use super::{
decrypt, encrypt, proto, EphemeralPublic, Error, Handshake, Nonce, PublicKey, ReceiveState,
SendState, Version, DATA_LEN_SIZE, DATA_MAX_SIZE, TAG_SIZE, TOTAL_FRAME_SIZE,
};
use std::slice;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpStream, ToSocketAddrs},
};

/// Encrypted connection between peers in a Tendermint network, implemented asynchronously using
/// Tokio as the underlying async runtime.
pub struct AsyncSecretConnection {
tcp_stream: TcpStream,
protocol_version: Version,
remote_pubkey: Option<PublicKey>,
send_state: SendState,
recv_state: ReceiveState,
}

impl AsyncSecretConnection {
/// Open a TCP connection to the given socket address, performing a `SecretConnection` handshake
/// and returning a new client upon success.
///
/// # Errors
///
/// * if TCP connection fails
/// * if sharing of the pubkey fails
/// * if sharing of the signature fails
/// * if receiving the signature fails
pub async fn connect_tcp<A: ToSocketAddrs>(

Check failure on line 31 in p2p/src/secret_connection/async_connection.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] p2p/src/secret_connection/async_connection.rs#L31

error: future cannot be sent between threads safely --> p2p/src/secret_connection/async_connection.rs:31:5 | 31 | / pub async fn connect_tcp<A: ToSocketAddrs>( 32 | | addr: A, 33 | | local_privkey: ed25519_consensus::SigningKey, 34 | | protocol_version: Version, 35 | | ) -> Result<Self, Error> { | |____________________________^ future returned by `connect_tcp` is not `Send` | note: captured value is not `Send` --> p2p/src/secret_connection/async_connection.rs:32:9 | 32 | addr: A, | ^^^^ has type `A` which is not `Send` = note: `A` doesn't implement `std::marker::Send` = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#future_not_send = note: `-D clippy::future-not-send` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::future_not_send)]`
Raw output
p2p/src/secret_connection/async_connection.rs:31:5:e:error: future cannot be sent between threads safely
  --> p2p/src/secret_connection/async_connection.rs:31:5
   |
31 | /     pub async fn connect_tcp<A: ToSocketAddrs>(
32 | |         addr: A,
33 | |         local_privkey: ed25519_consensus::SigningKey,
34 | |         protocol_version: Version,
35 | |     ) -> Result<Self, Error> {
   | |____________________________^ future returned by `connect_tcp` is not `Send`
   |
note: captured value is not `Send`
  --> p2p/src/secret_connection/async_connection.rs:32:9
   |
32 |         addr: A,
   |         ^^^^ has type `A` which is not `Send`
   = note: `A` doesn't implement `std::marker::Send`
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#future_not_send
   = note: `-D clippy::future-not-send` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(clippy::future_not_send)]`


__END__
addr: A,
local_privkey: ed25519_consensus::SigningKey,
protocol_version: Version,
) -> Result<Self, Error> {
let mut tcp_stream = TcpStream::connect(addr).await?;

// Start a handshake process.
let local_pubkey = PublicKey::from(&local_privkey);
let (mut h, local_eph_pubkey) = Handshake::new(local_privkey, protocol_version);

// Write local ephemeral pubkey and receive one too.
let remote_eph_pubkey =
exchange_eph_pubkey(&mut tcp_stream, &local_eph_pubkey, protocol_version).await?;

// Compute a local signature (also recv_cipher & send_cipher)
let h = h.got_key(remote_eph_pubkey)?;

let mut sc = Self {
tcp_stream,
protocol_version,
remote_pubkey: None,
send_state: SendState {
cipher: h.state.send_cipher.clone(),
nonce: Nonce::default(),
},
recv_state: ReceiveState {
cipher: h.state.recv_cipher.clone(),
nonce: Nonce::default(),
buffer: vec![],
},
};

// Share each other's pubkey & challenge signature.
// NOTE: the data must be encrypted/decrypted using ciphers.
let auth_sig_msg = match local_pubkey {
PublicKey::Ed25519(ref pk) => {
sc.share_auth_signature(pk, &h.state.local_signature)
.await?
},
};

// Authenticate remote pubkey.
let remote_pubkey = h.got_signature(auth_sig_msg)?;

// All good!
sc.remote_pubkey = Some(remote_pubkey);
Ok(sc)
}

/// Returns the remote pubkey. Panics if there's no key.
///
/// # Panics
/// * if the remote pubkey is not initialized.
pub fn remote_pubkey(&self) -> PublicKey {
self.remote_pubkey.expect("remote_pubkey uninitialized")
}

async fn share_auth_signature(
&mut self,
pubkey: &ed25519_consensus::VerificationKey,
local_signature: &ed25519_consensus::Signature,
) -> Result<proto::p2p::AuthSigMessage, Error> {
let buf = self
.protocol_version
.encode_auth_signature(pubkey, local_signature);

self.write_all(&buf).await?;

let auth_sig = self.read_chunk().await?;
debug_assert_eq!(
auth_sig.len(),
self.protocol_version.auth_sig_msg_response_len()
);
self.protocol_version.decode_auth_signature(&buf)
}

async fn read_chunk<'a>(&'a mut self) -> Result<Vec<u8>, Error> {

Check failure on line 108 in p2p/src/secret_connection/async_connection.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] p2p/src/secret_connection/async_connection.rs#L108

error: the following explicit lifetimes could be elided: 'a --> p2p/src/secret_connection/async_connection.rs:108:25 | 108 | async fn read_chunk<'a>(&'a mut self) -> Result<Vec<u8>, Error> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes = note: `-D clippy::needless-lifetimes` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::needless_lifetimes)]`
Raw output
p2p/src/secret_connection/async_connection.rs:108:25:e:error: the following explicit lifetimes could be elided: 'a
   --> p2p/src/secret_connection/async_connection.rs:108:25
    |
108 |     async fn read_chunk<'a>(&'a mut self) -> Result<Vec<u8>, Error> {
    |                         ^^   ^^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes
    = note: `-D clippy::needless-lifetimes` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(clippy::needless_lifetimes)]`


__END__
debug_assert!(self.recv_state.buffer.is_empty());

let mut sealed_frame = [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE];
self.tcp_stream.read_exact(&mut sealed_frame).await?;

// decrypt the frame
let mut frame = [0_u8; TOTAL_FRAME_SIZE];
decrypt(
&sealed_frame,
&self.recv_state.cipher,
&self.recv_state.nonce,
&mut frame,
)?;

self.recv_state.nonce.increment();
// end decryption

let chunk_length = u32::from_le_bytes(frame[..4].try_into().expect("chunk framing failed"));

if chunk_length as usize > DATA_MAX_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("chunk is too big: {chunk_length}! max: {DATA_MAX_SIZE}"),
)
.into());
}

let mut chunk = vec![0; chunk_length as usize];
chunk.clone_from_slice(
&frame[DATA_LEN_SIZE
..(DATA_LEN_SIZE
.checked_add(chunk_length as usize)
.expect("chunk size addition overflow"))],
);

Ok(chunk)
}

/// Write encrypted data to the underlying TCP socket.
pub async fn write_all<'a>(&'a mut self, src: &'a [u8]) -> Result<usize, Error> {

Check failure on line 148 in p2p/src/secret_connection/async_connection.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] p2p/src/secret_connection/async_connection.rs#L148

error: docs for function which may panic missing `# Panics` section --> p2p/src/secret_connection/async_connection.rs:148:5 | 148 | pub async fn write_all<'a>(&'a mut self, src: &'a [u8]) -> Result<usize, Error> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | note: first possible panic found here --> p2p/src/secret_connection/async_connection.rs:164:17 | 164 | n = n | _________________^ 165 | | .checked_add(chunk.len()) 166 | | .expect("overflow when adding chunk lengths"); | |_____________________________________________________________^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_panics_doc = note: `-D clippy::missing-panics-doc` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::missing_panics_doc)]`
Raw output
p2p/src/secret_connection/async_connection.rs:148:5:e:error: docs for function which may panic missing `# Panics` section
   --> p2p/src/secret_connection/async_connection.rs:148:5
    |
148 |     pub async fn write_all<'a>(&'a mut self, src: &'a [u8]) -> Result<usize, Error> {
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
note: first possible panic found here
   --> p2p/src/secret_connection/async_connection.rs:164:17
    |
164 |               n = n
    |  _________________^
165 | |                 .checked_add(chunk.len())
166 | |                 .expect("overflow when adding chunk lengths");
    | |_____________________________________________________________^
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_panics_doc
    = note: `-D clippy::missing-panics-doc` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(clippy::missing_panics_doc)]`


__END__

Check failure on line 148 in p2p/src/secret_connection/async_connection.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] p2p/src/secret_connection/async_connection.rs#L148

error: docs for function returning `Result` missing `# Errors` section --> p2p/src/secret_connection/async_connection.rs:148:5 | 148 | pub async fn write_all<'a>(&'a mut self, src: &'a [u8]) -> Result<usize, Error> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc = note: `-D clippy::missing-errors-doc` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::missing_errors_doc)]`
Raw output
p2p/src/secret_connection/async_connection.rs:148:5:e:error: docs for function returning `Result` missing `# Errors` section
   --> p2p/src/secret_connection/async_connection.rs:148:5
    |
148 |     pub async fn write_all<'a>(&'a mut self, src: &'a [u8]) -> Result<usize, Error> {
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc
    = note: `-D clippy::missing-errors-doc` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(clippy::missing_errors_doc)]`


__END__
let mut n = 0_usize;

for chunk in src.chunks(DATA_MAX_SIZE) {
let mut sealed_frame = [0_u8; TAG_SIZE + TOTAL_FRAME_SIZE];
encrypt(
chunk,
&self.send_state.cipher,
&self.send_state.nonce,
&mut sealed_frame,
)?;

self.send_state.nonce.increment();
// end encryption

self.tcp_stream.write_all(&sealed_frame).await?;
n = n
.checked_add(chunk.len())
.expect("overflow when adding chunk lengths");
}

Ok(n)
}
}

/// Returns `remote_eph_pubkey`
async fn exchange_eph_pubkey(
tcp_stream: &mut TcpStream,
local_eph_pubkey: &EphemeralPublic,
protocol_version: Version,
) -> Result<EphemeralPublic, Error> {
// Send our pubkey and receive theirs in tandem.
// TODO(ismail): on the go side this is done in parallel, here we do send and receive after
tcp_stream
.write_all(&protocol_version.encode_initial_handshake(local_eph_pubkey))
.await?;

let mut response_len = 0_u8;
tcp_stream
.read_exact(slice::from_mut(&mut response_len))
.await?;

let mut buf = vec![0; response_len as usize];
tcp_stream.read_exact(&mut buf).await?;
protocol_version.decode_initial_handshake(&buf)
}
Loading