From 01ab74a1e1ee83608d804494f7b60fa9b53ac29d Mon Sep 17 00:00:00 2001 From: Brent Schroeter Date: Fri, 13 Mar 2026 22:04:26 +0000 Subject: [PATCH] initial commit --- .gitignore | 1 + Cargo.lock | 7 ++++ Cargo.toml | 6 ++++ README.md | 19 +++++++++++ mise.toml | 2 ++ src/main.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 133 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 mise.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..f63cbca --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "project" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..627c2e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "project" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/README.md b/README.md new file mode 100644 index 0000000..7b23a15 --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# TCP Sprint + +This script demonstrates the risk of interleaving writes when cloning +`TcpStream` handles across threads, re: +[this discussion in the Servo Zulip](https://servo.zulipchat.com/#narrow/channel/263398-general/topic/TcpStream.3A.3Atry_clone.28.29.20thread.20safety.20in.20devtools/with/578172763). + +It works by spawning 3 threads: 2 TCP clients, each of which sends predictable +chunks of bytes over a shared connection with `TcpStream::write_all()`, and 1 +TCP listener, which tests whether messages arrive contiguously or scrambled +together. + +Messages consist of the range `0x00..=0xff`, repeated to fill the desired +message size. + +## Usage + +```sh +cargo run +``` diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000..201c577 --- /dev/null +++ b/mise.toml @@ -0,0 +1,2 @@ +[tools] +rust = { version = "1.94", components = "rust-analyzer,rust-docs,rustfmt,clippy" } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..8c5f994 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,98 @@ +use std::{ + env::args, + io::{Read, Write}, + net::{TcpListener, TcpStream}, + sync::mpsc::{Sender, channel}, + thread, + time::{Duration, Instant}, +}; + +const DURATION_MS: u64 = 5000; +const BIND_ADDR: &str = "127.0.0.1:7000"; + +fn main() { + // Parse message size from CLI argument. + let args: Vec = args().collect(); + let message_size: usize = if let Some(str_value) = args.get(1) + && let Ok(size_value) = str_value.parse() + { + size_value + } else { + println!("Expects 1 argument with the number of bytes to send in each message."); + return; + }; + if !message_size.is_multiple_of(256) { + println!("Message size must be a multiple of 256."); + return; + } + + // Start TCP server. + let (ready_sender, ready_receiver) = channel(); + let listener = thread::spawn(move || listen(ready_sender)); + ready_receiver.recv().unwrap(); + println!("Listener is ready."); + + // Create a TCP client connection and share it across threads. + let stream_handle_a = TcpStream::connect(BIND_ADDR).unwrap(); + let stream_handle_b = stream_handle_a.try_clone().unwrap(); + println!("Connected. Sending {message_size}-byte messages from 2 threads for {DURATION_MS}ms."); + for joiner in [ + thread::spawn(move || { + stream_for( + stream_handle_a, + message_size, + Duration::from_millis(DURATION_MS), + ) + }), + thread::spawn(move || { + stream_for( + stream_handle_b, + message_size, + Duration::from_millis(DURATION_MS), + ) + }), + ] { + joiner.join().unwrap(); + } + + // Clean up. + println!("Disconnected."); + listener.join().unwrap(); + println!("Exiting."); +} + +/// Starts a TCP listener, accepts a single connection, and prints a warning if +/// any received byte increments by something other than 1. Observing this +/// condition implies that [`stream_for`] threads are interleaving data. +fn listen(ready_sender: Sender<()>) { + let listener = TcpListener::bind(BIND_ADDR).unwrap(); + ready_sender.send(()).unwrap(); + + let mut stream = listener.incoming().next().unwrap().unwrap(); + + let mut last_byte: u8 = 255; + let mut buf = [0; 1024]; + while let size = stream.read(&mut buf).unwrap() + && size > 0 + { + for value in buf.iter().take(size) { + if *value != last_byte.wrapping_add(1) { + println!("OUT OF ORDER! Received 0x{last_byte:02x} followed by 0x{value:02x}"); + } + last_byte = *value; + } + } +} + +/// Repeatedly sends buffers in which each successive byte increments by 1 over +/// a TCP stream until the specified duration is reached. +fn stream_for(mut stream: TcpStream, message_size: usize, duration: Duration) { + let start_time = Instant::now(); + while Instant::now().duration_since(start_time) < duration { + let mut buf = vec![0; message_size]; + for (i, value) in buf.iter_mut().enumerate() { + *value = u8::try_from(i % 256).unwrap(); + } + stream.write_all(&buf).unwrap(); + } +}