Programmieren in Rust

Nebenläufigkeit mit Strängen

Inhaltsverzeichnis

  1. Nebenläufigkeit
  2. Threads starten
  3. Kanäle
  4. Strang-lokale Variablen
  5. Wechselseitiger Ausschluss
  6. Verklemmungen
  7. Atomare Operationen
  8. Multiprocessing

Nebenläufigkeit

Bei einem nebenläufigen System sind Programmabläufe in Ausführungsstränge unterteilt, die gleichzeitig ausgeführt werden können.

Nebenläufigkeit lässt sich unterscheiden nach der Art der Unterbrechung. Es gibt präemptive und kooperative Unterbrechung.

Bei der präemptiven Unterbrechung werden nebenläufige Ausführungsstränge in einem bestimmten Zeitintervall von Außen durch einen Hypervisor unterbrochen, man spricht von einem sogenannten Interrupt. Daraufhin wird die Kontrolle an den Scheduler übergeben, der den nächsten abzuarbeitenden Ausführungsstrang auswählt. Die Ausführungsstränge werden hier als Threads bezeichnet.

Bei der kooperativen Unterbrechung sind die nebenläufigen Ausführungsstränge selbst für die Unterbrechung zuständig. Die Ausführungsstränge werden hier als Koroutinen oder asynchrone Funktionen bezeichnet. Auch hier lässt sich die Kontrolle an einen Scheduler übergeben, oder aber die aktuelle Koroutine wählt selbst aus, in welcher Koroutine es weitergeht.

Präemptive Unterbrechung hat den Vorteil, Ausführung auf mehreren Prozessorkernen bzw. Prozessoren (Multiprocessing) zu erlauben. Das ist ein Schlüsselkonzept zur Erhöhung der Rechenleistung, da sich Prozessoren nicht mit beliebig hoher Frequenz takten lassen.

Kooperative Unterbrechung hat den Vorteil, dass Race Conditions nicht implizit auftreten können, man daher keine erweiterten technischen Mittel zur Verhinderung dieser benötigt.

Naive Programmierung mit Strängen kann zur Konstruktion sogenannter Wettlauf-Umstände (race conditions) führen. Fehler dieser Art sind dafür berüchtigt, besonders schwierig aufspürbar zu sein. Aus diesem Grund gilt die uneingeschränkte nebenläufige Programmierung in Sprachen mit unzureichendem Typsystem als eines der anspruchvollsten Probleme, sie ist hochgradig tückisch.

Ein Wettlauf-Umstand ist wie folgt zu erklären. Ein Programm besitze zwei oder mehr nebenläufige Ausführungstränge, welche auf einen gemeinsamen Speicherbereich zugreifen. Solange nun ein Schreib- oder Lesevorgang in einem der Stränge nicht vollständig abgeschlossen ist, befindet sich das Laufzeitsystem bezüglich diesem Speicherbereich für alle anderen Stränge in einem ungültigen Zustand. Ein Wettlauf-Umstand besteht, wenn einer der anderen Stränge Zugriff auf den im ungültigen Zustand befindlichen Speicherbereich erhalten kann.

Threads starten

Ein Metronom gestattet eine recht gute Verdeutlichung von Strängen und wie sie gestartet werden. Zunächst gibt es zur Veranschaulichung eine Funktion metronome, in welcher in einer endlosen Schleife in einem bestimmten Zeitintervall ein Klang bzw. eine Nachricht ausgegeben wird.

Wir wollen nun, dass zwei dieser Schleifen nebeneinanderher laufen. Dazu startet man zwei Stränge und ruft in beiden jeweils metronome auf.

use std::{thread, thread::sleep, time::Duration};

fn metronome(msg: &str, time: u64) {
    loop {
        println!("{}", msg);
        sleep(Duration::from_millis(time));
    }
}

fn main() {
    let t1 = thread::spawn(|| {
        metronome("tick", 500);
    });
    let t2 = thread::spawn(|| {
        metronome("*ping*", 2000);
    });
    t1.join().unwrap();
    t2.join().unwrap();
}

Kanäle

Bisher sind alle Threads parallel gelaufen ohne sich zu beeinflussen. In komplizierteren nebenläufigen Programmen kann so ein Einfluss aber vorkommen. Das typische einführende Beispiel besteht aus einem Produzenten und einem Konsumenten. Der Produzent sendet regelmäßig Daten, welche der Konsument empfängt und verarbeitet.

Zur Umsetzung bedient man sich eines sogenannten Kanals. Ein Kanal ist eine gerichtete Verbindung zur Übertragung von Daten zwischen zwei Threads.

use std::{thread, thread::sleep, time::Duration};
use std::sync::mpsc::{channel, Sender, Receiver};

fn produce(transmitter: Sender<String>) {
    for count in 0.. {
        sleep(Duration::from_millis(500));
        let value = format!("Daten Nr. {}", count);
        transmitter.send(value).unwrap();
    }
}

fn consume(receiver: Receiver<String>) {
    loop {
        sleep(Duration::from_millis(100));
        let value = receiver.recv().unwrap();
        println!("Erhalte: {}", value);
    }
}

fn main() {
    let (transmitter, receiver) = channel();
    let t1 = thread::spawn(|| {
        produce(transmitter);
    });
    let t2 = thread::spawn(|| {
        consume(receiver);
    });
    t1.join().unwrap();
    t2.join().unwrap();
}

Die Schleife des Empfängers arbeitet hier in viel kürzeren Zeitintervallen. Der aufmerksame Leser stellt sich nun natürlich die Frage, was dann beim Aufruf von recv geschieht. Mit dem Wissen, dass das Programm korrekt arbeitet, ist die einzig mögliche Antwort darauf, dass recv solange den Thread t2 blockiert, bis neue Daten von t1 gekommen sind. Den Aufruf von sleep im Empfänger könnte man sich hier also auch sparen, da der Thread durch recv hinreichend blockiert würde.

Strang-lokale Variablen

Die Realisierung globaler Variablen ist in Rust mit dezenten Schwierigkeiten verbunden. Zunächst muss man bedenken, dass eine solche Variable nicht ohne weiteres veränderbar sein darf, denn ein Unterprogramm kann nicht wissen, ob es die erste und einzige alleinige Leihgabe der globalen Variable bezieht. Ergo muss man die Variable in Cell oder RefCell einhüllen.

Obendrein kann der Zugriff auf die globale Variable durch zwei oder mehr Stränge erfolgen. Die Typen Rc, Cell und RefCell gestatten den Zugriff durch mehr als einen Strang jedoch nicht, was durch das Abhandensein des Marker-Traits Sync zum Ausdruck kommt. Bei Rc und RefCell besteht dieses Problem deshalb, weil ihre internen Referenzzähler nicht-atomar arbeiten.

Man kann dieser Problematik aus dem Weg gehen, indem man für jeden Strang eine separate Instanz der globalen Variable existieren lässt. Eine solche Variable wird Strang-lokal genannt. Das folgende Programm zeigt die Vorgehensweise anhand eines globalen Zählers.

use std::{thread, cell::Cell};

thread_local! {
    static COUNTER: Cell<u32> = Cell::new(0);
}

fn count(thread_id: u32) {
    COUNTER.with(|c| {
        println!("Strang {}: {}", thread_id, c.get());
        c.set(c.get() + 1);
    });
}

fn main() {
    let t1 = thread::spawn(|| {
        for _ in 0..4 {count(1);}
    });
    let t2 = thread::spawn(|| {
        for _ in 0..4 {count(2);}
    });
    t1.join().unwrap();
    t2.join().unwrap();
}

Die Ausgabe dieses Programms ist

Strang 1: 0
Strang 1: 1
Strang 1: 2
Strang 1: 3
Strang 2: 0
Strang 2: 1
Strang 2: 2
Strang 2: 3

wobei die Stränge allerdings in einer beliebigen Reihenfolge auftreten können. Ich meine damit, die Zähler verbleiben in der Reihenfolge von null bis drei rauf, aber welcher der Zähler zuerst oder wann aufgerufen wird, ist unbestimmt.

Wie man an der Ausgabe sieht, liegt der Zähler COUNTER nicht nur einmal im Programm vor, sondern einmal pro Strang.