↑Programmieren in Rust
Einige Anwendungen erfordern den Umgang mit Prozeduren, die erst nach einer gewissen Zeit abgearbeitet sind. Das kann bspw. sein
Ein recht interessantes Beispiel ist für mich ein Funktionenplotter. Erfordert eine Funktion einen hohen Rechenaufwand, dauert es ein wenig bis der Graph vollständig gezeichnet ist. Würde das Programm solange warten, würde es um so schlimmer ruckeln, je höher die Rechenzeit ist. Die Idee ist nun, die Prozedur zu pausieren und nach neuer Abfrage von Nutzereingabe zu entscheiden ob die Prozedur abgebrochen oder weitergeführt werden soll.
Weil diese Beispiele alle recht komplizierte Technik enthalten können, simulieren wir so einen Ablauf zunächst am besten möglichst einfach mit einer Prozedur, die intern nichts anderes tut als eine bestimmte Zeit warten.
Wie modelliert man solche Prozeduren nun? Eine Möglichkeit wären Threads, die sich vor allem auch bei langwierigen Berechnungen wie dem Funktionenplotter anbieten würden. Der Grund dafür liegt darin, dass übermäßig lange Rechenzeit in einem Thread nicht dazu führt, dass der Hauptthread blockiert wird.
Um Threads geht es nun aber schon in einem anderen Kapitel.
Die Prozedur muss pausieren können. Demzufolge muss man die
Prozedur so oft aufrufen können, bis sie fertig ist. Den
Rückgabewert der Prozedur formulieren wir daher als eine
Enumeration Poll
in den Varianten Ready
und Pending
. Ist die Prozedur fertig, hat sie
den Wert Ready
, andernfalls Pending
.
Eine solche Prozedur bezeichnet man als Future.
Nun bedarf es auch noch einer Hilfsfunktion block_on
,
die die Prozedur solange in einer Schleife aufruft, bis sie
abgearbeitet ist. Damit der Prozessor dabei nicht heiß läuft,
fügen wir noch einen Aufruf sleep
ein, der das
Hauptprogramm für einen Sekundenbruchteil schlafen legt.
Die folgende Umsetzung gestaltet nun die bisherigen Überlegungen aus.
use std::time::{Duration, Instant}; use std::thread::sleep; enum Poll<T> {Ready(T), Pending} struct Future<T> {poll: Box<dyn FnMut() -> Poll<T>>} fn block_on<T>(mut f: Future<T>) -> T { loop { match (f.poll)() { Poll::Ready(x) => return x, Poll::Pending => {} } sleep(Duration::from_millis(100)); } } fn new_task(time: u64) -> Future<()> { let clock = Instant::now(); Future {poll: Box::new(move || { if clock.elapsed().as_secs() < time { Poll::Pending } else { Poll::Ready(()) } })} } fn main() { let f1 = new_task(2); block_on(f1); }
Gibt es nun zwei Aufgaben zu bewältigen, die jeweils zwei Sekunden dauern, braucht das Programm bei sequentieller Ausführung schon vier Sekunden.
fn main() { let f1 = new_task(2); block_on(f1); let f2 = new_task(2); block_on(f2); }
Um die beiden Aufgaben parallel ausführen zu können, schreiben
wir eine Funktion join_all
, die eine Liste von Aufgaben
entgegennimmt und einen Scheduler zurückgibt. Der Scheduler kümmert
sich solange um die Ausführung der Aufgaben, bis alle Aufgaben erledigt
sind. Wir formulieren auch den Scheduler als Future, denn dann können
wir block_on
wiederbenutzen.
fn join_all(a: Vec<Future<()>>) -> Future<()> { let mut a: Vec<_> = a.into_iter().map(Some).collect(); Future {poll: Box::new(move || { let mut empty = true; for x in &mut a { if let Some(f) = x { empty = false; if let Poll::Ready(()) = (f.poll)() {*x = None;} } } if empty {Poll::Ready(())} else {Poll::Pending} })} } fn main() { let f1 = new_task(2); let f2 = new_task(2); let s = join_all(vec![f1, f2]); block_on(s); }
Die Funktion join_all
ist so verallgemeinerbar,
dass sie auch die Werte der Futures zurückgeben kann.
fn join_all<T: 'static>(a: Vec<Future<T>>) -> Future<Vec<T>> { enum Either<X, Y> {Left(X), Right(Y)} let mut a: Vec<_> = a.into_iter().map(Either::Left).collect(); Future {poll: Box::new(move || { let mut empty = true; for x in &mut a { if let Either::Left(f) = x { empty = false; if let Poll::Ready(value) = (f.poll)() { *x = Either::Right(value); } } } match empty { false => Poll::Pending, true => Poll::Ready(std::mem::take(&mut a) .into_iter().map(|x| match x { Either::Right(value) => value, _ => unreachable!() }).collect()), } })} }
Wurde bisher eine gewöhnliche Funktion zur äußerlichen
Darstellung des Pausierens genutzt, wollen wir das nun
eigentlich innerhalb der Prozedur durchführen. Dafür bedarf es
allerdings der Semantik einer Koroutine, die man nicht über gewöhnliche
Funktionen ausdrücken kann. Eine Koroutine ist eine Prozedur, die
mit yield
an einer Stelle im Ablauf unterbrochen wird
und sich später an dieser Stelle fortführen lässt.
Asynchrone Funktionen bieten eine Abwandlung des Konzeptes einer
Koroutine. Anstelle von yield
gibt es bei einer
asynchronen Funktion eine Anweisung await
, die die
Fortführung des Programmablaufs der Funktion solange unterbricht, bis
vom Argument von await
ein Wert erhalten wurde.
Ab nun erforderlich ist die Einbindung der Bibliotheken ›futures‹ und ›async-std‹.
use std::time::Duration; use async_std::task; use futures::{future::join_all, executor::block_on}; async fn new_task(time: u64) { task::sleep(Duration::from_secs(time)).await; } fn main() { let f1 = new_task(2); let f2 = new_task(2); block_on(join_all(vec![f1, f2])); }
Eine asynchrone Funktion ist äquivalent zu einer gewöhnlichen
Funktion, die ein Future als asynchronen Closure-Block zurückgibt.
Außerdem ist das Argument von await
selbst ein
Future.
use std::future::Future; fn new_task(time: u64) -> impl Future<Output = ()> { async move {task::sleep(Duration::from_secs(time)).await;} }
Die Funktion join_all
ist leider mit zwei Problemen
behaftet. Zum einen kommt dort mit der Erzeugung von
vec![f1,f2]
offenbar eine unnötige Heap-Allokation vor.
Zum anderen müssen die Rückgabewerte von f1
und
f2
den gleichen Datentyp besitzen. Aus diesen Gründen gibt
es noch die Funktion join
.
use std::time::Duration; use async_std::task; use futures::{future::join, executor::block_on}; struct S1 {value: &'static str} struct S2 {value: &'static str} async fn task1(time: u64) -> S1 { task::sleep(Duration::from_secs(time)).await; S1 {value: "Wert 1"} } async fn task2(time: u64) -> S2 { task::sleep(Duration::from_secs(time)).await; S2 {value: "Wert 2"} } fn main() { let f1 = task1(2); let f2 = task2(2); let (s1, s2) = block_on(join(f1, f2)); println!("{}, {}", s1.value, s2.value); }
Analog gibt es join3
, join4
,
join5
und das variadische Makro join
,
das auch mehr als fünf Futures verbinden kann.
Beim Makro muss man
block_on(async {join!(f1, f2)})
schreiben.
Anders als eine gewöhnliche Funktion muss eine Koroutine ihre Variablenzustände auch während Unterbrechungen speichern. Eine gewöhnliche Funktion besitzt das Konzept der Unterbrechung nicht, – sie wird aufgerufen, für die Variablenzustände und für Verwaltungsinformation wird hierbei ein neuer Aktivierungsrecord als Stack frame im Aufrufstapel alloziert, und beim Verlassen der Funktion wird das Stack frame wieder freigegeben. Bei der Koroutine muss der Speicherbereich für die Variablenzustände solange zur Verfügung stehen, wie die Instanz der Koroutine existiert. Dieser Speicherbereich kann auf dem Aufrufstapel oder auf dem Haldenspeicher liegen.
Wie wir wissen besitzt Rust Zeigervariablen bzw. Referenzvariablen als Sprachmittel. Im Zusammenhang mit dem Speicherbereich der Variablenzustände tut sich nun ein Problem auf. Nämlich kann es ja sein, dass der Wert einer lokalen Variablen ein Zeiger auf eine andere lokale Variable in diesem Speicherbereich ist. Wir haben es in einem solchen Fall mit einer Selbstreferenz zu tun. Diese Situation kann man sich vorstellen als eine Struktur, die einen Zeiger enthält der gerade auf eine andere Variable der Struktur oder die Struktur selbst zeigt.
Würden wir die Struktur im Speicher verschieben, würde der Zeiger ungültig werden, – eine allgemeine Problematik bei Zeigern, bekannt als hängender Zeiger. Infolge dürften wir keinen Besitz über eine solche selbstreferenzielle Struktur bekommen, denn sonst wären wie bei jedem Wert Verschiebungen erlaubt. Bei gewöhnlichen Strukturen ist ja die Situation herstellbar, dass keine Zeiger auf die Struktur mehr vorliegen. Bei einer selbstreferenziellen Struktur ist dies ausgeschlossen, weil mit dem Besitz der Struktur der Zugriff auf die Zeigervariable einhergeht.
Es gibt mehrere technische Möglichkeiten, wie man dieser Misere entgehen kann:
Man hat sich für das Konzept der Anheftung entschieden, für die
der Typ Pin
geschaffen wurde.
Zur Untersuchung, wie die Ausführung asynchroner Funktionen
tatsächlich abläuft, gehen wir am besten schrittweise vor und
passen die zuvor dargestellte Konstruktion von block_on
zunächst an die echten Schnittstellen an. Diese Schnittstellen
sind technisch ein wenig komplizierter ausgestaltet, da sie
zusätzlich die Programmierung eines Aufweck-Systems erlauben.
Das Aufweck-System können wir erst einmal außen vor lassen. Erwirkt wird dies durch die Festlegung eines Dummy-Wakers, dessen Aufweck-Operation wirkungslos bleibt.
use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Poll, Context, Wake, Waker}; use std::time::{Duration, Instant}; use std::thread::sleep; struct DummyWaker; impl Wake for DummyWaker { fn wake(self: Arc<Self>) {} } fn block_on<T>(f: impl Future<Output = T>) -> T { let mut f = Box::pin(f); let waker = Waker::from(Arc::new(DummyWaker)); let context = &mut Context::from_waker(&waker); loop { match f.as_mut().poll(context) { Poll::Ready(x) => return x, Poll::Pending => {} } sleep(Duration::from_millis(100)); println!("\t(tick)"); } } struct Sleep {clock: Instant, time: u64} impl Sleep { fn new_task(time: u64) -> Self { Self {clock: Instant::now(), time} } } impl Future for Sleep { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { if self.clock.elapsed().as_secs() < self.time { Poll::Pending } else { Poll::Ready(()) } } } fn main() { let f1 = Sleep::new_task(2); block_on(f1); }
Die Mechanismen sind bis auf die Schnittstellen bisher
im Wesentlichen gleich geblieben. Gleichwohl liegt nun ein
echtes, zur Ausführung asynchroner Funktionen befähigtes
block_on
vor:
async fn task1() { println!("Anfang"); Sleep::new_task(2).await; println!("Fortsetzung"); Sleep::new_task(2).await; println!("Ende"); } fn main() { let f1 = task1(); block_on(f1); }
Als nächstes portieren wir die Konstruktion von
join_all
.
struct JoinAll<F: Future>(Vec<Option<Pin<Box<F>>>>); fn join_all<I>(i: I) -> impl Future<Output = ()> where I: IntoIterator, I::Item: Future<Output = ()> { JoinAll(i.into_iter().map(|x| Some(Box::pin(x))).collect()) } impl<F: Future<Output = ()>> Future for JoinAll<F> { type Output = (); fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> { let mut empty = true; for x in &mut self.0 { if let Some(f) = x { empty = false; match f.as_mut().poll(context) { Poll::Ready(()) => {*x = None;}, Poll::Pending => {} } } } if empty {Poll::Ready(())} else {Poll::Pending} } } async fn task(index: u32) { println!("Anfang ({})", index); Sleep::new_task(2).await; println!("Fortsetzung ({})", index); Sleep::new_task(2).await; println!("Ende ({})", index); } fn main() { let f1 = task(1); let f2 = task(2); block_on(join_all(vec![f1, f2])); }