initial commit
This commit is contained in:
commit
01ab74a1e1
6 changed files with 133 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
/target
|
||||
7
Cargo.lock
generated
Normal file
7
Cargo.lock
generated
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "project"
|
||||
version = "0.1.0"
|
||||
6
Cargo.toml
Normal file
6
Cargo.toml
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
[package]
|
||||
name = "project"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
19
README.md
Normal file
19
README.md
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
# TCP Sprint
|
||||
|
||||
This script demonstrates the risk of interleaving writes when cloning
|
||||
`TcpStream` handles across threads, re:
|
||||
[this discussion in the Servo Zulip](https://servo.zulipchat.com/#narrow/channel/263398-general/topic/TcpStream.3A.3Atry_clone.28.29.20thread.20safety.20in.20devtools/with/578172763).
|
||||
|
||||
It works by spawning 3 threads: 2 TCP clients, each of which sends predictable
|
||||
chunks of bytes over a shared connection with `TcpStream::write_all()`, and 1
|
||||
TCP listener, which tests whether messages arrive contiguously or scrambled
|
||||
together.
|
||||
|
||||
Messages consist of the range `0x00..=0xff`, repeated to fill the desired
|
||||
message size.
|
||||
|
||||
## Usage
|
||||
|
||||
```sh
|
||||
cargo run <message size in bytes>
|
||||
```
|
||||
2
mise.toml
Normal file
2
mise.toml
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
[tools]
|
||||
rust = { version = "1.94", components = "rust-analyzer,rust-docs,rustfmt,clippy" }
|
||||
98
src/main.rs
Normal file
98
src/main.rs
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
use std::{
|
||||
env::args,
|
||||
io::{Read, Write},
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::mpsc::{Sender, channel},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
const DURATION_MS: u64 = 5000;
|
||||
const BIND_ADDR: &str = "127.0.0.1:7000";
|
||||
|
||||
fn main() {
|
||||
// Parse message size from CLI argument.
|
||||
let args: Vec<String> = args().collect();
|
||||
let message_size: usize = if let Some(str_value) = args.get(1)
|
||||
&& let Ok(size_value) = str_value.parse()
|
||||
{
|
||||
size_value
|
||||
} else {
|
||||
println!("Expects 1 argument with the number of bytes to send in each message.");
|
||||
return;
|
||||
};
|
||||
if !message_size.is_multiple_of(256) {
|
||||
println!("Message size must be a multiple of 256.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Start TCP server.
|
||||
let (ready_sender, ready_receiver) = channel();
|
||||
let listener = thread::spawn(move || listen(ready_sender));
|
||||
ready_receiver.recv().unwrap();
|
||||
println!("Listener is ready.");
|
||||
|
||||
// Create a TCP client connection and share it across threads.
|
||||
let stream_handle_a = TcpStream::connect(BIND_ADDR).unwrap();
|
||||
let stream_handle_b = stream_handle_a.try_clone().unwrap();
|
||||
println!("Connected. Sending {message_size}-byte messages from 2 threads for {DURATION_MS}ms.");
|
||||
for joiner in [
|
||||
thread::spawn(move || {
|
||||
stream_for(
|
||||
stream_handle_a,
|
||||
message_size,
|
||||
Duration::from_millis(DURATION_MS),
|
||||
)
|
||||
}),
|
||||
thread::spawn(move || {
|
||||
stream_for(
|
||||
stream_handle_b,
|
||||
message_size,
|
||||
Duration::from_millis(DURATION_MS),
|
||||
)
|
||||
}),
|
||||
] {
|
||||
joiner.join().unwrap();
|
||||
}
|
||||
|
||||
// Clean up.
|
||||
println!("Disconnected.");
|
||||
listener.join().unwrap();
|
||||
println!("Exiting.");
|
||||
}
|
||||
|
||||
/// Starts a TCP listener, accepts a single connection, and prints a warning if
|
||||
/// any received byte increments by something other than 1. Observing this
|
||||
/// condition implies that [`stream_for`] threads are interleaving data.
|
||||
fn listen(ready_sender: Sender<()>) {
|
||||
let listener = TcpListener::bind(BIND_ADDR).unwrap();
|
||||
ready_sender.send(()).unwrap();
|
||||
|
||||
let mut stream = listener.incoming().next().unwrap().unwrap();
|
||||
|
||||
let mut last_byte: u8 = 255;
|
||||
let mut buf = [0; 1024];
|
||||
while let size = stream.read(&mut buf).unwrap()
|
||||
&& size > 0
|
||||
{
|
||||
for value in buf.iter().take(size) {
|
||||
if *value != last_byte.wrapping_add(1) {
|
||||
println!("OUT OF ORDER! Received 0x{last_byte:02x} followed by 0x{value:02x}");
|
||||
}
|
||||
last_byte = *value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Repeatedly sends buffers in which each successive byte increments by 1 over
|
||||
/// a TCP stream until the specified duration is reached.
|
||||
fn stream_for(mut stream: TcpStream, message_size: usize, duration: Duration) {
|
||||
let start_time = Instant::now();
|
||||
while Instant::now().duration_since(start_time) < duration {
|
||||
let mut buf = vec![0; message_size];
|
||||
for (i, value) in buf.iter_mut().enumerate() {
|
||||
*value = u8::try_from(i % 256).unwrap();
|
||||
}
|
||||
stream.write_all(&buf).unwrap();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue