Next Stop - Ihcblog!

Some creations and thoughts sharing | sub site:ihc.im

0%

Rust Runtime Design and Implementation - Component Part

This article also has a Chinese version.

This series of articles mainly introduces how to design and implement a Runtime based on the io-uring and Thread-per-core model.

Our final Runtime product Monoio is now open source, and you can find it at github.com/bytedance/monoio.

  1. Rust Runtime Design and Implementation - General Introduction
  2. Rust Runtime Design and Implementation - Design Part 1
  3. Rust Runtime Design and Implementation - Design Part 2
  4. Rust Runtime Design and Implementation - Component Part
  5. Rust Runtime Design and Implementation - IO Compatibility Part

This article is the fourth in the series, and we have mostly covered the design aspects previously. In this part, we will focus on components such as channels.

Channel in Go and in Rust

In the Go language that we are familiar with, all tasks are definitely goroutines, and communication is mainly dependent on channels. However, in Rust, Futures and Tasks are not necessarily the same thing; multiple Futures can be nested through means such as select, and fundamentally they are still a single Task, which performance-wise is also better than Go’s model.

Even so, in Rust, there is still a strong need to use channels. Under the thread-per-core model, we can use channels that are local to only one thread to boost performance.

This content has been open-sourced as the local-sync crate, which does not depend on our Runtime, and you can use it independently in appropriate scenarios.

https://github.com/monoio-rs/local-sync

MPSC Channel

MPSC = Multiple Producer Single Consumer.

Our mpsc channel needs to support two modes, one is bounded and the other is unbounded.

In bounded mode, our send might await because there might not be enough capacity to write, necessitating waiting for consumption.

In unbounded mode, send does not need to wait. Because there is no limit on storage space, even if the consumption endpoint gets stuck, we can freely create new space for storage when we run out of space.

Of course, since there may not be any data in the channel, the recv function is asynchronous in both modes.

The final implementation:

1
2
3
4
5
6
7
8
9
#[monoio::test]
async fn test_unbounded_channel() {
let (tx, mut rx) = channel();
tx.send(1).unwrap();
assert_eq!(rx.recv().await.unwrap(), 1);

drop(tx);
assert_eq!(rx.recv().await, None);
}

Data Storage Structure

We need to design an underlying data storage structure first. In order to expand at a low cost and prevent frequent memory allocations and deallocations, we designed two possible solutions.

Linked List + Fixed-size Block

storage1

In this design, each block is of a consistent size, and the blocks are maintained using a singly linked list.

Read data from the head’s begin position and write data to the tail’s end position. When a block of data is completely read, it will be inserted after the tail so it can be recycled, avoiding repeated release and allocation.
As such, when the cache data volume increases, it will incur memory allocation maximum data peak / BLOCK_SIZE times.

An additional benefit is that we can globally cache blocks, thus further optimizing performance.

Linked List + Exponentially Growing Block

storage1

This solution also uses a linked list to maintain blocks. It can be understood as smooth expansion, which means that expansion only entails writing to the new ring while the old ring is released after being consumed.

Each time the ring is expanded, it can double in size, hence only log(maximum data peak) memory allocations are needed during peak traffic.

Taking into account the complexity of implementation, we decided to go for solution 1: Linked List + Fixed-size Block. The mpsc of Tokio also has this structure, but their reason for doing so is more about considerations for lock-free concurrency.

Multiple Blocks form a Queue through a singly linked list, and the Queue provides push and pop capabilities (with no size limit).

The Queue ensures the legality of pointer-directed content during push and pop operations, and will move block pointers at the appropriate time.

Waiting Mechanism

We need two types of waiting mechanisms:

  1. In the bounded implementation, senders need to wait for empty slots.
  2. Receivers need to wait for data.

The second type of wait is very easy to implement: since mpsc inherently only has one consumer, when it needs to wait for data, it simply needs to set its corresponding waker in the channel. When data is written, an additional check for this waker will be performed to wake it.

The implementation of the first type of wait is slightly more complex due to the potential for a multitude of waiters. To avoid starvation, it must also wake them up on a first-come, first-served basis. Hence, this structure can be directly implemented using either a VecDeque or a linked list. When waiters are dropped, they also need to remove themselves, making a structure similar to a slab or a linked list necessary for use.

We follow Tokio’s approach here and store waiters in a linked list. We abstract this structure into a Semaphore, which can not only fulfill mpsc requirements but can also be exposed to users for independent use and be utilized to implement other synchronization structures.

Layered Abstraction

layers

With the Block in place, we construct a synchronization storage Queue based on the Block, which is unrelated to async and has infinite capacity.

As mentioned before, there are two types of waits, but the sender’s wait is only used in the bounded scenario.

Therefore, on top of Queue, we abstract out a lower-level structure Chan, which provides common logic and only implements the second type of wait:

  1. Expose the recv interface, allowing for waiting.
  2. Expose a synchronous send interface, with the upper-level callers responsible for maintaining their own capacity limits and sender waits.

On top of Chan, we encapsulate two implementations: bounded and unbounded. Under unbounded conditions, this is roughly equivalent to direct forwarding since our Chan can essentially be considered an unbounded channel.

For bounded channels, we need to implement the capacity limit and waking mechanism for senders. This part will introduce the semaphore mentioned in the previous section. We use the semaphore to manage and wait for the number of empty slots (while also conveniently managing the receiver’s state).

Thus, we integrate semaphores into Chan. Since the upper layers have different requirements, Semaphore here is actually a trait. For bounded channels, we directly use the Semaphore implementation; for unbounded ones, we can implement a dummy Semaphore.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
pub trait Semaphore {
fn add_permits(&self, n: usize);
fn close(&self);
fn is_closed(&self) -> bool;
}

impl Semaphore for crate::semaphore::Inner {
fn add_permits(&self, n: usize) {
self.release(n);
}

fn close(&self) {
crate::semaphore::Inner::close(self);
}

fn is_closed(&self) -> bool {
crate::semaphore::Inner::is_closed(self)
}
}

pub struct Unlimited {
closed: UnsafeCell<bool>,
}

impl Unlimited {
pub fn new() -> Self {
Self{
closed: UnsafeCell::new(false),
}
}
}

impl Semaphore for Unlimited {
fn add_permits(&self, _: usize) {}

fn close(&self) {
unsafe {
*self.closed.get() = true;
}
}

fn is_closed(&self) -> bool {
unsafe { *self.closed.get() }
}
}

Oneshot Channel

TODO

Once Cell

TODO

Semaphore

TODO

Q & A

Q: Why do we need synchronizing capabilities if everything is thread local?

A: Concurrency is not parallelism. Even in a single-threaded environment, multiple tasks can be executed simultaneously, and tasks need to communicate and depend on each other. Hence, a component that supports asynchronous waiting is needed.

Q: Why not use an open-source library?

A: Currently, there is no such comparable open-source component that is fundamentally useable. The mainstream Rust Runtime is Tokio, which is not a thread per core model, so it uses data structures with built-in cross-thread synchronization capabilities.

One somewhat viable open-source library is local-channel but its implementation has performance issues (using Vec for storage, which can cause data copies during resizing), and it is functionally inadequate (only providing unbounded mpsc implementation, unable to perform backpressure, etc.). After consideration, we decided to build our own solution.

Conclusion

This concludes the series of articles. There may be some typographical errors or misunderstandings on my part. If you would like to discuss with me, you can email me directly (contact information can be found on the about page); if it’s a question related to Monoio, you can also open an Issue or Discussion on GitHub.

Monoio is still in a very imperfect stage, and we look forward to your contributions :)

Additionally, we have set up a mirror for crates.io and rustup within China, feel free to use RsProxy!

If you wish to repost articles from this blog, please acknowledge the source. Thank you.

UPDATE: There’s a fifth article; don’t hurry away.

Welcome to my other publishing channels