top of page
Writer's pictureMircea Teodor Oprea

Building a custom protocol over TCP with Rust and Tokio

Introduction

Certain applications can have specific requirements regarding network communication that make HTTP, or other existing high-level protocols, unsuitable options. This article goes over the process of building a custom protocol on top of TCP for client-server communication.

By the end of it, you should have a better understanding of how a custom protocol over TCP works and what its building blocks are, as well as feel more comfortable with building such a project yourself.

All the code written in this article can be found in the associated GitHub repository. You can refer to it if you have difficulties with any of the code.

 

Prerequisites

This article supposes that you have some prior knowledge of Rust – working with modules, structs, slices, and async. If you do not, it is a good idea to look into the Rust book before proceeding with this guide.

The only thing that you need to have installed on your system is Rust (and Cargo, which comes by default if you install it with rustup). You can follow the official installation guide and check that everything works as intended by running:

rustc -V
cargo -V

Finally, you can create a new project by running:

cargo new rust-tcp

How does TCP work?

There’s a lot of reading you can do on all the details of how TCP works. For the purposes of this article, you only need to understand a few of its characteristics and the situations when you can use it.

First and foremost, TCP is a transport layer protocol on top of which many higher-level protocols are built – HTTP, SSH, SMTP, etc. Further into this article, we will explore building a custom application layer protocol on top of TCP.

A normal client-server interaction in TCP starts by establishing a connection between the two, by using a 3-way handshake to negotiate the parameters of the connection and ensure its reliability. For each connection between the server and a client, a so-called “socket” is created, that represents the communication channel between the two.

Furthermore, TCP splits any message into packets that it numbers and then sends to the other party, which can then put together the original message even if the packets somehow arrived in a different order.

While these may seem to be desired characteristics of any network communication protocol, that is not always the case. When working with livestream data, for example, losing a couple of video frames due to connection issues is not too problematic, and it’s better for the clients to receive the most recent data, rather than the dropped frames from a few seconds ago. UDP would be the way to go for such a use case. However, if your application requires reliable and in-order messaging, TCP is the perfect solution.

 

Structuring the project

You should have created your cargo project earlier, so now it’s time to structure it properly. Since the purpose of this tutorial is to build a client-server communication protocol, it’s a good idea to create 2 separate binaries. And since they will make use of common functionality (such as creating and sending messages to each other), a common library that defines this functionality makes sense.

In the src directory of the project, create a lib.rs file, and a bins directory with a client.rs and a server.rs file. You can add some placeholder code in each of the bins:

pub async fn main() -> anyhow::Result<()> {
	println!(“Hello, world!”);
}

Then, in the Cargo.toml file, define the 3 packages:

[package]
name = "rust-tcp"
version = "0.1.0"
edition = "2021"

[lib]
name = "rust_tcp"
path = "src/lib.rs"

[[bin]]
name = "server"
path = "src/bins/server.rs"

[[bin]]
name = "client"
path = "src/bins/client.rs"

You can run cargo build to make sure everything is properly set up, as well as cargo run –bin server (or client) to see the Hello, world! message printed out.

To finish things off with Cargo.toml, you can also add the dependencies needed for this project, since they’re only 2 crates. First, there’s tokio, which is going to be used as the async runtime, and anyhow, which will be used to simplify error handling. You can install them by running;

cargo add anyhow tokio –features full

Note that, when building software for production, using anyhow for all error handling is not desirable, as you usually want specific information about what and where went wrong. Using thiserror is a better option for this, but since this tutorial does not focus on error handling, anyhow is used to reduce the amount of boilerplate code.

 

Implementing basic TCP communication

In the src folder, create a new server.rs file where the server logic will be defined. Define this module in the lib.rs file as well, so it can be used from the clients:

pub mod server;

 Back in the server.rs file, let’s define a structure that represents the server entity and contains the IP address and port that it will be running on.

pub struct Server {
    pub host: String,
    pub port: u16,
}

Then, let’s define a simple constructor that takes in the IP and the port:

impl Server {
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        Server {
            host: host.into(),
            port,
        }
    }
}

Now, let’s think about what the server should behave like. You would expect a run method that allows you to start the server, which should continuously listen for connections on the IP and port that it holds. The skeleton of this would look as follows:

impl Server {
    // previous code…

    pub async fn run(&mut self) -> anyhow::Result<()> {
        let listner = tokio::net::TcpListener::bind(format!("{}:{}", self.host, self.port)).await?;
        println!("Server is running on {}:{}", self.host, self.port);

        loop  {
            // The actual server logic would go here…
        }
        Ok(())
    }
}

The run method tries to bind a TCP listener to the specified address and starts a loop where the actual listening logic will go. This loop should run endlessly, since breaking it means that the server will not be working anymore, but you can add custom breaking conditions if you application needs to support that.

Next, let’s look at how to handle the incoming connections. The server should be able to respond to clients concurrently; for that, a tokio task (essentially, a green thread) can be spawned for each connection that the server receives. Inside that task, the actual communication with the client will be handled. So, the loop from the above snippet should now look something like this:

loop {
    let (mut socket, addr) = listner.accept().await?;
    println!("Connection received from {}", addr);

    tokio::task::spawn(async move {
// Handle the client message
    });
}

Here, the listener waits for a client to connect, and then spawns a task to handle that connection. The loop then continues to listen for another connection and repeats the process.

Finally, let’s read messages that come in through the socket. Inside the task, you will need a buffer to store the incoming messages. Then, in another loop, you will read the incoming data into the buffer and interpret it. The task code will now look like this:

tokio::spawn(async move {
	let mut buffer = vec![0; 256];
	loop {
		let bytes_read = socket.read(&mut buffer).await?;
		if bytes_read == 0 {
			break;
        }
		let message = std::str::from_utf8&(&buffer[0..bytes_read])?;
		println!(“Received message: {}”, message);
	}
	Ok::<(), anyhow::Error>(())
});

Note: the task is forcibly returning an Ok response so that the ? operator can be used inside it. As an alternative, you can use pattern matching and handle the errors yourself.


For now, you can consider that reading 0 bytes from the socket represents the end of the message, for simplicity. A more intentional way of closing the connection will be developed later in the tutorial.

The last bit for the server is the src/bins/server.rs file, that needs a proper main function that starts the server. In this case, it will start on 127.0.0.1:8080:

use rust_tcp::server::Server;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let server = Server::new("127.0.0.1", 8080);
    server.run().await?;
    Ok(())
}

Before testing this implementation, you need one for the client as well. In the src directory, create a client.rs file. First, let’s define a simple structure to hold the data the client requires – the IP and port of the server, and the TcpStream object that it will use to send messages:

pub struct Client {
    pub server_host: String,
    pub server_port: u16,
    pub stream: TcpStream,
}

A method can then be defined to take in the IP and port, create the stream and connect to it:

impl Client {
    pub async fn connect(host: impl Into<String>, port: u16) -> anyhow::Result<Self> {
        let host = host.into();
        let address = format!("{}:{}", host, port);
        let stream = TcpStream::connect(address).await?; 

        Ok(Self {
            server_host: host,
            server_port: port,
            stream,
        })
    }
}

Then, you can define a small method to take in a string, convert it to bytes and writes it to the stream:

    pub async fn send_message(&mut self, message: impl Into<String>) -> anyhow::Result<()> {
        self.stream.write_all(&message.into().as_bytes()).await?;
        Ok(())
    }

And finally, modify the src/bins/client.rs file to use a Client object to send messages over to the server:

use rust_tcp::client::Client;

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
    let mut client = Client::connect("127.0.0.1", 8080).await?;
    for _ in 0..100 {
        client.send_message("Hello, world!").await?;
    }
    Ok(())
}

Remember to also define the client and server modules in lib.rs, so you can use them from the binaries:

pub mod client;
pub mod server;

You can now open 2 separate terminals to test that the connection works. In the first one, run:

cargo run --bin server

This will start the server and wait for incoming connections. In the second terminal, run:

cargo run --bin client

Which should send 100 “Hello, world!” messages to the server. If everything worked fine, you should see something similar to the following in the server terminal:

Message: "Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello,
world!Hello, world!"                                                            
Message: "Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello,
world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!Hello, world!"

This is great! You can now send messages from the client to the server and interpret them. However, there is clearly an issue: even though the client is technically sending 100 separate messages, the server is now aware of that. From its perspective, it’s just reading bytes from the socket until there is nothing else to read. In some cases, it might even chunk a “Hello, world!” message in the middle because there was no more space in the buffer. This is not that big of a deal for a bunch of text messages, but it clearly won’t work if the client wants to send something more meaningful to the server, like a command that it should run. So something needs to be done about that.

 

Designing a messaging format

A messaging format is simply a set of rules that all the parties (the server and the clients, in this situation) will adhere to. Since the server will be aware of the format the messages come in, it can interpret them better than just a stream of bytes.

The format of the message depends on your specific use case. However, there are some basic elements that you should consider adding to your data format:

  • The data itself: a bit of a no-brainer, but this is whatever data the client wants to send to the server, such as some simple text or a command that the server should run.

  • The length of the data: it’s a good idea to prepend the data by its length, so the server knows how many bytes to read; otherwise, in a continuous stream of bytes, there is no clue of where a message ends and another begins.

  • The version of the format: this is sometimes overlooked, but it can be a lifesaver to include 1 or 2 bytes representing the version of the messaging format as the first part of the message; your format is likely to change, and if you happen to add or remove certain elements of it, servers and clients running on different versions may not be able to communicate properly.


Since for this tutorial, the client is only sending some strings for the server to print out, these fields should be enough. You can visualize the message like this:


You can make the length component larger than 2 bytes if you want to transmit large messages through your protocol. However, a length of 2 bytes allows messages up to 65535 bytes, which should be enough for the purposes of this article.

As part of the protocol, you can also define the way to close the connection between a client and server. Previously, the server would close the socket after it had no more bytes to read. However, this can become problematic if the client writes data slower than the server reads it. So, you can define a custom message, such as one with the content of “exit”, to represent the end of communication.

 

Implementing the messaging format

In a new src/message.rs file, let’s define a new structure called Message:

#[derive(Debug, Clone)]
pub struct Message {
    pub version: u8,
    pub length: u16,
    pub content: String,
}

For the version, it’s a good idea to define a constant, either in the message.rs file or in a different constants.rs file. In the struct’s constructor, this constant will be used as the value for the version field:

impl Message {
    pub fn new(content: impl Into<String>) -> anyhow::Result<Self> {
        let content = content.into();
        let length = content.len() as u16;
        let version = PROTOCOL_VERSION; 

        Ok(Self {
            version,
            length,
            content,
        })
    }
} 

Furthermore, you need a way to convert your Message object to and from a byte array that can be transmitted over the socket. To encode the object, you can simply take each field and add it to a new vector of bytes (u8):

pub fn encode(&self) -> Vec<u8> {
    let mut buffer = Vec::new();
    buffer.push(self.version);
    buffer.extend(&self.length.to_be_bytes());
    buffer.extend(self.content.as_bytes());

    buffer
}

This first pushes the version (which is already one byte long) to the new vector, then the length field in the big-endian format (you can use little-endian as well, as long as you also interpret is that way when you read it), and finally converts the content String to bytes and pushes it.

The most important thing here is to be consistent with the way you read and write to/from the buffer; otherwise, you won’t be able to interpret your data correctly.

To decode the data, you simply reverse the process from the previous snippet. You read the first byte, which should represent the version – you can also take this opportunity to verify that you are running the same version as the message sender; otherwise, you cannot interpret the message. Then, you read 2 more bytes that represent the length of the message, and based on that, you finally read the actual message content.

Analyzing this logic also tells you that a message should be at least 3 bytes long (1 for the version and 2 for the length of 0) to make sense, so that check can be added in here as well. You can create a constant out of this, called METADATA_SIZE. The decoding method ends up looking like this:

pub fn decode(buffer: &[u8]) -> anyhow::Result<Self> {
    if buffer.len() < METADATA_SIZE {
        return Err(anyhow::anyhow!("Invalid message length"));
    }

    let version = buffer[0];
    if version != PROTOCOL_VERSION {
        return Err(anyhow::anyhow!("Invalid protocol version"));
    }

    let length = u16::from_be_bytes([buffer[1], buffer[2]]);
    let content = String::from_utf8(buffer[3..3 + length as usize].to_vec())?;

    Ok(Self {
        version,
        length,
        content,
    })
} 

The client can now take in a Message and use the encode method to convert it before writing to the stream:

pub async fn send_message(&mut self, message: Message) -> anyhow::Result<()> {
    self.stream.write_all(&message.encode()).await?;
    Ok(())
}

Then, in the client binary, you can construct messages and pass them to the send_message method; you can try sending more messages of different lengths and at different intervals to see how the server reacts to that:

use rust_tcp::{client::Client, message::Message};

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
    let mut client = Client::connect("127.0.0.1", 8080).await?;
    for i in 1..11 {
        let message = Message::new(format!("Hello, world x {}!", i));
        client.send_message(message.clone()).await?;
    } 

    client.send_message(Message::new("exit")).await?;

    Ok(())
} 

Back in the Server structure, the final thing that needs to be done is reading messages according to the new format. However, you also need to take into consideration that there’s no guarantee over how messages will come in. Depending on your buffer size, some messages might be split, and you cannot process one message until you read from the buffer again. Other messages might come in bulk. It is, therefore, a good idea to delegate the task of parsing and forming messages to a separate struct, MessageReader. For each spawned task, you can create a new instance of MessageReader. Then, as you read from the buffer, you pass the read bytes to the reader object, which in turn returns the parsed messages, if any are available. This is how the MessageReader would be used:

tokio::task::spawn(async move {
    let mut message_reader = MessageReader::new();
    'handler: loop {
        let mut buffer = [0; 256];
        let bytes_read = socket.read(&mut buffer).await?;

        let messages = message_reader.read(&buffer[..bytes_read])?;
        for message in messages {
            if message.content == "exit" {
                println!("Connection closed by client");
                break 'handler;
            }
            println!("Message: {:?}", message);
        }
    }
    Ok::<(), anyhow::Error>(())
});

Apart from the discussed changes, the main loop has been given an alias of ‘handler, so you can break out of it when receiving an exit message. This will cause the loop to end and the task to finish.

 

Let’s now create the MessageReader struct in a new src/message_read.rs file. The structure will hold a buffer of its own, in which it will store data until it can parse it into a message:

pub struct MessageReader {
    pub buffer: Vec<u8>,
}
 
impl MessageReader {
    pub fn new() -> Self {
        Self { buffer: Vec::new() }
    }
}

Before looking into the read method, let’s define some helpers. You need a way to know whether you can parse the current buffer into (at least) one message, so that can go into a can_parse method. If the current buffer’s length is less than the METADATA_SIZE defined earlier, it’s clear that it cannot be parsed. Otherwise, you can read the length of the message and use it to check whether the whole message has been received yet:

fn can_parse(&self) -> bool {
    if self.buffer.len() < METADATA_SIZE {
        return false;
    }
    let length = u16::from_be_bytes([self.buffer[1], self.buffer[2]]);
    self.buffer.len() >= METADATA_SIZE + length as usize
}

The buffer can contain more data than just one message – either more complete messages, or one messages and part of a next one. A good helper in this case is a method that parses the first message in the buffer. It will read the length of the first message, copy all the message’s data and then remove it from the buffer:

fn parse_first(&mut self) -> anyhow::Result<Message> {
    let length = u16::from_be_bytes([self.buffer[1], self.buffer[2]]);
    let message_length = METADATA_SIZE + length as usize;
    let message = self.buffer[..message_length].to_vec();
    self.buffer = self.buffer[message_length..].to_vec();

    Message::decode(&message)
}

Finally, you can write the read method that takes in some data from the server, appends it to the internal buffer, and then attempts to parse messages from it for as long as it can:

pub fn read(&mut self, data: &[u8]) -> anyhow::Result<Vec<Message>> {
    self.buffer.extend_from_slice(data);
    let mut data = vec![];
    while self.can_parse() {
        let message = self.parse_first()?;
        data.push(message);
    }

    Ok(data)
} 

With this, you can now run the server and client binaries again. In the server terminal, you should see something similar to this:

Connection received from 127.0.0.1:52631
Message: Message { version: 1, length: 17, content: "Hello, world x 1!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 2!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 3!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 4!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 5!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 6!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 7!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 8!" }
Message: Message { version: 1, length: 17, content: "Hello, world x 9!" }
Message: Message { version: 1, length: 18, content: "Hello, world x 10!" }
Connection closed by client

Conclusion

If you have followed the guide, you should now have a working client-server pair that work by communicating on a custom protocol on top of TCP.  You can play around with different variables to check that everything works as intended: send more messages, wait a few seconds between sending them, change the server’s buffer size to a really small or large number, and so on.

This is also extendable in many ways: you can add different elements to the message format, add message types that the server processes in different ways, or let the client receive messages from the server as well. But you should now have a good basis for doing so!

 

507 views0 comments

Recent Posts

See All

© 2024 by Soupdev Solutions

bottom of page