Programmieren in Rust

Asynchrone Programmierung

Inhaltsverzeichnis

  1. Grundlagen
    1. Futures
    2. Asynchrone Funktionen
  2. Technische Umsetzung
    1. Koroutinen
    2. Selbstreferenzen und Anheftung
    3. Exekutoren
    4. Waker
  3. Literatur

Grundlagen

Futures

Einige Anwendungen erfordern den Umgang mit Prozeduren, die erst nach einer gewissen Zeit abgearbeitet sind. Das kann bspw. sein

Ein recht interessantes Beispiel ist für mich ein Funktionenplotter. Erfordert eine Funktion einen hohen Rechenaufwand, dauert es ein wenig bis der Graph vollständig gezeichnet ist. Würde das Programm solange warten, würde es um so schlimmer ruckeln, je höher die Rechenzeit ist. Die Idee ist nun, die Prozedur zu pausieren und nach neuer Abfrage von Nutzereingabe zu entscheiden ob die Prozedur abgebrochen oder weitergeführt werden soll.

Weil diese Beispiele alle recht komplizierte Technik enthalten können, simulieren wir so einen Ablauf zunächst am besten möglichst einfach mit einer Prozedur, die intern nichts anderes tut als eine bestimmte Zeit warten.

Wie modelliert man solche Prozeduren nun? Eine Möglichkeit wären Threads, die sich vor allem auch bei langwierigen Berechnungen wie dem Funktionenplotter anbieten würden. Der Grund dafür liegt darin, dass übermäßig lange Rechenzeit in einem Thread nicht dazu führt, dass der Hauptthread blockiert wird.

Um Threads geht es nun aber schon in einem anderen Kapitel.

Die Prozedur muss pausieren können. Demzufolge muss man die Prozedur so oft aufrufen können, bis sie fertig ist. Den Rückgabewert der Prozedur formulieren wir daher als eine Enumeration Poll in den Varianten Ready und Pending. Ist die Prozedur fertig, hat sie den Wert Ready, andernfalls Pending. Eine solche Prozedur bezeichnet man als Future.

Nun bedarf es auch noch einer Hilfsfunktion block_on, die die Prozedur solange in einer Schleife aufruft, bis sie abgearbeitet ist. Damit der Prozessor dabei nicht heiß läuft, fügen wir noch einen Aufruf sleep ein, der das Hauptprogramm für einen Sekundenbruchteil schlafen legt.

Die folgende Umsetzung gestaltet nun die bisherigen Überlegungen aus.

use std::time::{Duration, Instant};
use std::thread::sleep;

enum Poll<T> {Ready(T), Pending}

struct Future<T> {poll: Box<dyn FnMut() -> Poll<T>>}

fn block_on<T>(mut f: Future<T>) -> T {
    loop {
        match (f.poll)() {
            Poll::Ready(x) => return x,
            Poll::Pending => {}
        }
        sleep(Duration::from_millis(100));
    }
}

fn new_task(time: u64) -> Future<()> {
    let clock = Instant::now();
    Future {poll: Box::new(move || {
        if clock.elapsed().as_secs() < time {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    })}
}

fn main() {
    let f1 = new_task(2);
    block_on(f1);
}

Gibt es nun zwei Aufgaben zu bewältigen, die jeweils zwei Sekunden dauern, braucht das Programm bei sequentieller Ausführung schon vier Sekunden.

fn main() {
    let f1 = new_task(2);
    block_on(f1);
    let f2 = new_task(2);
    block_on(f2);
}

Um die beiden Aufgaben parallel ausführen zu können, schreiben wir eine Funktion join_all, die eine Liste von Aufgaben entgegennimmt und einen Scheduler zurückgibt. Der Scheduler kümmert sich solange um die Ausführung der Aufgaben, bis alle Aufgaben erledigt sind. Wir formulieren auch den Scheduler als Future, denn dann können wir block_on wiederbenutzen.

fn join_all(a: Vec<Future<()>>) -> Future<()> {
    let mut a: Vec<_> = a.into_iter().map(Some).collect();
    Future {poll: Box::new(move || {
        let mut empty = true;
        for x in &mut a {
            if let Some(f) = x {
                empty = false;
                if let Poll::Ready(()) =  (f.poll)() {*x = None;}
            }
        }
        if empty {Poll::Ready(())} else {Poll::Pending}
    })}
}

fn main() {
    let f1 = new_task(2);
    let f2 = new_task(2);
    let s = join_all(vec![f1, f2]);
    block_on(s);
}

Die Funktion join_all ist so verallgemeinerbar, dass sie auch die Werte der Futures zurückgeben kann.

fn join_all<T: 'static>(a: Vec<Future<T>>) -> Future<Vec<T>> {
    enum Either<X, Y> {Left(X), Right(Y)}
    let mut a: Vec<_> = a.into_iter().map(Either::Left).collect();
    Future {poll: Box::new(move || {
        let mut empty = true;
        for x in &mut a {
            if let Either::Left(f) = x {
                empty = false;
                if let Poll::Ready(value) = (f.poll)() {
                    *x = Either::Right(value);
                }
            }
        }
        match empty {
            false => Poll::Pending,
            true => Poll::Ready(std::mem::take(&mut a)
                .into_iter().map(|x| match x {
                    Either::Right(value) => value,
                    _ => unreachable!()
                }).collect()),
        }
    })}
}

Asynchrone Funktionen

Wurde bisher eine gewöhnliche Funktion zur äußerlichen Darstellung des Pausierens genutzt, wollen wir das nun eigentlich innerhalb der Prozedur durchführen. Dafür bedarf es allerdings der Semantik einer Koroutine, die man nicht über gewöhnliche Funktionen ausdrücken kann. Eine Koroutine ist eine Prozedur, die mit yield an einer Stelle im Ablauf unterbrochen wird und sich später an dieser Stelle fortführen lässt.

Asynchrone Funktionen bieten eine Abwandlung des Konzeptes einer Koroutine. Anstelle von yield gibt es bei einer asynchronen Funktion eine Anweisung await, die die Fortführung des Programmablaufs der Funktion solange unterbricht, bis vom Argument von await ein Wert erhalten wurde.

Ab nun erforderlich ist die Einbindung der Bibliotheken ›futures‹ und ›async-std‹.

use std::time::Duration;
use async_std::task;
use futures::{future::join_all, executor::block_on};

async fn new_task(time: u64) {
    task::sleep(Duration::from_secs(time)).await;
}

fn main() {
    let f1 = new_task(2);
    let f2 = new_task(2);
    block_on(join_all(vec![f1, f2]));
}

Eine asynchrone Funktion ist äquivalent zu einer gewöhnlichen Funktion, die ein Future als asynchronen Closure-Block zurückgibt. Außerdem ist das Argument von await selbst ein Future.

use std::future::Future;

fn new_task(time: u64) -> impl Future<Output = ()> {
    async move {task::sleep(Duration::from_secs(time)).await;}
}

Die Funktion join_all ist leider mit zwei Problemen behaftet. Zum einen kommt dort mit der Erzeugung von vec![f1,f2] offenbar eine unnötige Heap-Allokation vor. Zum anderen müssen die Rückgabewerte von f1 und f2 den gleichen Datentyp besitzen. Aus diesen Gründen gibt es noch die Funktion join.

use std::time::Duration;
use async_std::task;
use futures::{future::join, executor::block_on};

struct S1 {value: &'static str}
struct S2 {value: &'static str}

async fn task1(time: u64) -> S1 {
    task::sleep(Duration::from_secs(time)).await;
    S1 {value: "Wert 1"}
}

async fn task2(time: u64) -> S2 {
    task::sleep(Duration::from_secs(time)).await;
    S2 {value: "Wert 2"}
}

fn main() {
    let f1 = task1(2);
    let f2 = task2(2);
    let (s1, s2) = block_on(join(f1, f2));
    println!("{}, {}", s1.value, s2.value);
}

Analog gibt es join3, join4, join5 und das variadische Makro join, das auch mehr als fünf Futures verbinden kann. Beim Makro muss man

block_on(async {join!(f1, f2)})

schreiben.

Technische Umsetzung

Selbstreferenzen und Anheftung

Anders als eine gewöhnliche Funktion muss eine Koroutine ihre Variablenzustände auch während Unterbrechungen speichern. Eine gewöhnliche Funktion besitzt das Konzept der Unterbrechung nicht, – sie wird aufgerufen, für die Variablenzustände und für Verwaltungsinformation wird hierbei ein neuer Aktivierungsrecord als Stack frame im Aufrufstapel alloziert, und beim Verlassen der Funktion wird das Stack frame wieder freigegeben. Bei der Koroutine muss der Speicherbereich für die Variablenzustände solange zur Verfügung stehen, wie die Instanz der Koroutine existiert. Dieser Speicherbereich kann auf dem Aufrufstapel oder auf dem Haldenspeicher liegen.

Wie wir wissen besitzt Rust Zeigervariablen bzw. Referenzvariablen als Sprachmittel. Im Zusammenhang mit dem Speicherbereich der Variablenzustände tut sich nun ein Problem auf. Nämlich kann es ja sein, dass der Wert einer lokalen Variablen ein Zeiger auf eine andere lokale Variable in diesem Speicherbereich ist. Wir haben es in einem solchen Fall mit einer Selbstreferenz zu tun. Diese Situation kann man sich vorstellen als eine Struktur, die einen Zeiger enthält der gerade auf eine andere Variable der Struktur oder die Struktur selbst zeigt.

Würden wir die Struktur im Speicher verschieben, würde der Zeiger ungültig werden, – eine allgemeine Problematik bei Zeigern, bekannt als hängender Zeiger. Infolge dürften wir keinen Besitz über eine solche selbstreferenzielle Struktur bekommen, denn sonst wären wie bei jedem Wert Verschiebungen erlaubt. Bei gewöhnlichen Strukturen ist ja die Situation herstellbar, dass keine Zeiger auf die Struktur mehr vorliegen. Bei einer selbstreferenziellen Struktur ist dies ausgeschlossen, weil mit dem Besitz der Struktur der Zugriff auf die Zeigervariable einhergeht.

Es gibt mehrere technische Möglichkeiten, wie man dieser Misere entgehen kann:

  1. Anstelle einer Speicheradresse speichert der Zeiger nur ein Offset. Diese Technik ist analog zum Position independent code.
  2. Das Verschieben wird mit Mitteln des Typsystems verboten, – die Struktur besitzt damit eine feste Anheftung im Speicher.

Man hat sich für das Konzept der Anheftung entschieden, für die der Typ Pin geschaffen wurde.

Exekutoren

Zur Untersuchung, wie die Ausführung asynchroner Funktionen tatsächlich abläuft, gehen wir am besten schrittweise vor und passen die zuvor dargestellte Konstruktion von block_on zunächst an die echten Schnittstellen an. Diese Schnittstellen sind technisch ein wenig komplizierter ausgestaltet, da sie zusätzlich die Programmierung eines Aufweck-Systems erlauben.

Das Aufweck-System können wir erst einmal außen vor lassen. Erwirkt wird dies durch die Festlegung eines Dummy-Wakers, dessen Aufweck-Operation wirkungslos bleibt.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Poll, Context, Wake, Waker};
use std::time::{Duration, Instant};
use std::thread::sleep;

struct DummyWaker;

impl Wake for DummyWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<T>(f: impl Future<Output = T>) -> T {
    let mut f = Box::pin(f);
    let waker = Waker::from(Arc::new(DummyWaker));
    let context = &mut Context::from_waker(&waker);
    loop {
        match f.as_mut().poll(context) {
            Poll::Ready(x) => return x,
            Poll::Pending => {}
        }
        sleep(Duration::from_millis(100));
        println!("\t(tick)");
    }
}

struct Sleep {clock: Instant, time: u64}

impl Sleep {
    fn new_task(time: u64) -> Self {
        Self {clock: Instant::now(), time}
    }
}

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>)
    -> Poll<Self::Output>
    {
        if self.clock.elapsed().as_secs() < self.time {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

fn main() {
    let f1 = Sleep::new_task(2);
    block_on(f1);
}

Die Mechanismen sind bis auf die Schnittstellen bisher im Wesentlichen gleich geblieben. Gleichwohl liegt nun ein echtes, zur Ausführung asynchroner Funktionen befähigtes block_on vor:

async fn task1() {
    println!("Anfang");
    Sleep::new_task(2).await;
    println!("Fortsetzung");
    Sleep::new_task(2).await;
    println!("Ende");
}

fn main() {
    let f1 = task1();
    block_on(f1);
}

Als nächstes portieren wir die Konstruktion von join_all.

struct JoinAll<F: Future>(Vec<Option<Pin<Box<F>>>>);

fn join_all<I>(i: I) -> impl Future<Output = ()>
where I: IntoIterator, I::Item: Future<Output = ()>
{
    JoinAll(i.into_iter().map(|x| Some(Box::pin(x))).collect())
}

impl<F: Future<Output = ()>> Future for JoinAll<F> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>)
    -> Poll<Self::Output>
    {
        let mut empty = true;
        for x in &mut self.0 {
            if let Some(f) = x {
                empty = false;
                match f.as_mut().poll(context) {
                    Poll::Ready(()) => {*x = None;},
                    Poll::Pending => {}
                }
            }
        }
        if empty {Poll::Ready(())} else {Poll::Pending}
    }
}

async fn task(index: u32) {
    println!("Anfang ({})", index);
    Sleep::new_task(2).await;
    println!("Fortsetzung ({})", index);
    Sleep::new_task(2).await;
    println!("Ende ({})", index);
}

fn main() {
    let f1 = task(1);
    let f2 = task(2);
    block_on(join_all(vec![f1, f2]));
}

Literatur

  1. Amos: »Pin and suffering«. (28. März 2021).
  2. Philipp Oppermann: »Async/Await«. In: »Writing an OS in Rust«. (27. März 2020).
  3. Taylor Cramer, Aaron Turon, Lee Bernick et al.: »Asynchronous Programming in Rust«.
  4. Stjepan Glavina, Florian Gilcher, Yoshua Wuyts et al.: »Async programming in Rust with async-std«.