Inhaltsverzeichnis
Parallelverarbeitung
Hinweis: Parallele Algorithmen sind ab C++17 in <algorithm> und <numeric> deklariert. Im folgenden geht es um nebenläufige Abarbeitung.
Leichtgewichtige Prozesse
Threads starten
Mit std::thread
starten zeitlich parallel laufende Arbeitsstränge durch Übernahme einer Funktion.
Deren genaue Reihenfolge ist unbestimmt. Sie können sich auch zeitlich überlappen.
#include <chrono> #include <iostream> #include <sstream> #include <string> #include <thread> void ausgabe(std::string msg) { std::ostringstream out; out << std::this_thread::get_id() << " : " << msg; std::cout << out.str(); // get message out in one piece } void nebeneinander() { std::thread t1(ausgabe, "Gleichzeitig?\n"); std::thread t2(ausgabe, "Kann ich nicht.\n"); ausgabe("Foyer des Arts\n"); // ...
Die parallel abzuarbeitenden Funktionen können auch Funktoren oder Lambda-Ausdrücke sein.
Dem Konstruktor können nach dem Funktionsnamen Wertparameter für deren Aufruf mitgegeben werden.
Referenzen auf gemeinsam genutzte Ressourcen werden mit std::ref(variable)
verpackt.
Threads beenden
Ein Thread endet, sobald die übernommene Funktion verlassen wurde. Der aufrufende Thread (Besitzer) kann warten, bis der von ihm gestartete parallele Ablauffaden abgearbeitet ist und ihn dann zusammenführen oder ihn vorher abkoppeln.
// ... t1.join(); t2.detach(); std::this_thread::sleep_for(std::chrono::seconds(42)); }
std::this_thread
erlaubt, den aktiven Thread mit sleep_for(duration)
für eine bestimmte Zeitspanne
oder mit sleep_until(time_point)
bis zu einem bestimmten Zeitpunkt schlafen zu legen.
Mit der Methode yield()
kann der auf den Rest seiner Zeitscheibe verzichten,
damit andere Threads aktiv werden können.
Threads besitzen Verschiebesemantik: sie lassen sich nicht kopieren.
Mit der Methode joinable()
kann erfragt werden, ob der Thread noch zusammengeführt werden kann.
Mitunter ist es sinnvoll, die Anzahl der Prozessoren, Kerne oder Hyperthreads des Systems mit
std::thread::hardware_concurrency()
zu ermitteln.
Wenn dies scheitert, liefert die Funktion 0
.
Gemeinsame Ressourcen
Wettrennen
Wettrennen (race conditions) beim Zugriff auf gemeinsam genutzte,
veränderbare Ressourcen führen zu undefiniertem Programmverhalten.
Kritische Bereiche werden durch gegenseitigen Ausschluss (mutual exclusion) mit
std::mutex
-Variablen vor dem gleichzeitigen Zugriff durch parallel laufende Threads gesichert.
Am Ende des kritischen Bereichs wird die Mutex-Variable wieder freigegeben.
Nun kann der nächste Thread diesem Block betreten.
Der std::recursive_mutex
wurde für rekursive Aufrufe entworfen.
Die Typen std::timed_mutex
und std::recursive_timed_mutex
verfügen über Methoden
trylock_for(duration)
und try_lock_until(time_point)
,
die wie auch try_lock()
einen Wahrheitswert liefern, ob der Mutex gesperrt werden konnte.
Mutex-Methoden kann man selbst aufrufen — es ist aber einfacher, Mutexe über Sperren zu steuern.
Sperren
Sperren std__lock_guard<Mutex>
bedienen die Methoden lock()
und unlock()
der Mutex-Variable in ihrem Konstruktor bzw. Destruktor.
#include <mutex> void ausgabe(std::string msg) { static std::mutex mutex; auto myLock = std::lock_guard(mutex); // C++17 syntax std::cout << msg << '\n'; }
Die Sperre std::unique_lock<Mutex>
bietet größere Freiheiten beim Einrichten der Sperre (siehe Tabelle),
die Methoden lock()
, unlock()
und try_lock()
sind zugänglich.
Bei timed_mutex
-Variablen sind try_lock_for(duration)
und try_lock_until(time_point)
nutzbar.
Der Sperrenzustand lässt sich mit if (myLock)
und mit der Methode owns_lock()
erfragen.
Der Besitz an einem Mutex wird durch release()
freigegeben und liefert einen Zeiger auf den Mutex.
Konstruktorparameter | ||
---|---|---|
auch lock_guard | (mutex) | übernehmen und sperren |
(mutex, std::adopt_lock) | schon gesperrtes Mutex übernehmen | |
nur unique_lock | (mutex, std::defer_lock) | noch nicht sperren |
(mutex, std::try_to_lock) | sperren, wenn Mutex frei ist | |
(timed_mutex, time_point) | maximal bis zu diesem Zeitpunkt auf Sperre warten | |
(timed_mutex, duration) | maximal Zeitdauer auf Sperre warten | |
(unique_lock) | Sperre übernehmen (Verschiebesemantik) |
Bedingungen
Bedingungsvariablen std::condition_variable
können belegte Sperren zeitweise wieder freigeben,
damit andere Threads weiterarbeiten können.
Ist die zur Fortführung des Threads notwendige Bedingung erfüllt,
kann dieser von einem anderen durch die Methoden notify_one()
bzw. notify_all()
wieder erweckt werden,
die Sperre zurück erlangen und seine Arbeit fortsetzen.
#include <condition_variable> // double accounts[BANKSIZE]; simuliert Geldbeträge std::mutex mutex; std::condition_variable sufficientFunds; void transfer(int from, int to, double amount) { std::unique_lock<std::mutex> myLock(mutex); sufficientFunds.wait(myLock, [=]{ return amount <= accounts[from]; }); accounts[from] -= amount; accounts[to] += amount; sufficientFunds.notifyAll(); }
Dabei ist variable.wait(lock, predicate)
gleichwertig zu
while (!predicate()) variable.wait(lock);
Für timed_mutex
-Bedingungen sind auch wait_for(lock, duration, predicate)
und wait_until(lock, time_point, predicate)
zulässig.
Verklemmung
Eine Verklemmung (deadlock) entsteht, wenn mehrere Sperren benötigt werden, die jedoch verschiedenen Threads zugeteilt wurden und durch zyklische Abhängigkeit keiner der beteiligten Threads fortfahren kann:
Account a, b; std::thread t1(give, std::ref(a), std::ref(b), 5); // sperrt a, dann b std::thread t2(give, std::ref(b), std::ref(a), 5); // sperrt b, dann a
Die Funktionen std::lock(sperrenliste)
und std::try_lock(sperrenliste)
erlangen alle Sperren der Liste unabhängig von der Reihenfolge ohne Verklemmungsrisiko.
Mit der Klasse std::scoped_lock<Mutexes…>
gelingt dies noch einfacher:
void give(Account& from, Account& to, double money) { // std::unique_lock<std::mutex> fromLock(from.mutex, std::defer_lock); // std::unique_lock<std::mutex> toLock (to.mutex, std::defer_lock); // std::lock(fromLock, toLock); std::scoped_lock lock{from.mutex, to.mutex}; // ab c++17 from.take(money); to.add(money); }
Damit sind nicht alle möglichen Ursachen von Verklemmungen beseitigt. Sie können ebenso eintreten, wenn die erwartete Bedingung nie eintritt oder der wartende Thread nicht benachrichtigt wird. Eine "Liquiditätsklemme" im obigen Beispiel ist nur durch Bedingungen an das Gesamtsystem vermeidbar.
Threadsichere Initialisierung
Globale Daten werden mit const_expr
-Konstruktor threadsicher initialisiert.
Lokale static
-Variable erhalten ebenfalls threadsicher ihre Anfangswerte beim ersten Funktionsaufruf.
Betritt ein zweiter Thread die Funktion vor Abschluss der Initialisierung, wird er bis zu deren Ende blockiert.
Als thread_local
gekennzeichnete Variablen exitieren unabhängig voneinander in jedem Thread einmal.
Auch sie werden beim ersten Aufruf im Thread sicher initialisiert.
Mit std::call_once(once_flag, funktor, parameter)
lassen sich Anfangswertbelegungen auch verzögert vornehmen:
Account a, b; std::once_flag geschenkt; void erstausstattung(double amount) { a.add(amount); b.add(amount); } void work() { std::call_once(geschenkt, erstausstattung, 100.0); // ... jetzt kann Zahlungsverkehr beginnen }
Die optionalen Argumente werden wie bei std::thread
als Werte übergeben, Referenzen sind mit std::ref(variable)
zu kapseln.
Die Aufgabe wird für das gleiche std::once_flag
nur einmal erfolgreich ausgeführt.
Wirft die einmalig aufzurufende Funktion allerdings eine Ausnahme,
gilt die Aufgabe als nicht erledigt, die Ausnahme wird an den Aufrufer weitergereicht.
Ein weiterer Aufruf von std::call_once()
kann erfolgen.1)
Zeitversetzte Auswertung
Künftige Ergebnisse
Durch std::future<ResultType>
werden langwierige Berechnungen in einen nebenläufigen Prozess verpackt:
#include <future> #include <iostream> int frage() { /* es dauert ... */ return 42; } void liefern_auf_bestellung() { std::future<int> antwort = std::async(frage); std::cout << "Die Antwort lautet: ..."; std::cout << antwort.get() <<'\n'; }
Der Aufruf std::async(funktor, argumentliste)
ist die einfachste Art, einen asynchronen Ablauf-Faden zu starten.
Die Abfrage des Ergebnisses blockiert, falls die Berechnung noch andauert.
Vor dem Funktor kann als Startverhalten std::launch::async()
oder std::launch::deferred
angegeben werden.
Ohne diese Angabe ist das Startverhalten der Implementierung überlassen.
Die Methode valid()
erfragt, ob das Ergebnis schon vorliegt.
Der Wert kann einmalig gelesen werden,
danach liefert valid()
wieder false
,
wait()
blockiert bis zur Fertigstellung der Berechnung,
wait_for(duration)
und wait_until(time_point)
liefern einen der Werte
ready
, deferred
oder timeout
aus std::future_status
.
Ein std::shared_future<ResultType>
erlaubt mehrmaligen get()
-Zugriff. Die Methode share()
erzeugt ein solches Objekt.
Aufträge
Funktoren werden in
std::packaged_task<Result(Parameter)>
verpackt und durch einen Klammeroperator mit geeigneten Parametern gestartet.
Ein Auftrag kann zur Ausführung an einen anderen Thread übertragen werden.
Das Ergebnis oder eine geworfene Ausnahme werden über ein mit get_future()
erzeugtes std::future<Result>
abgeholt:
int frage1(int n) { /* schwer zu tun */ return 6*n; } void schick_mir_eine_antwort() { std::packaged_task<int(int)> auftrag(frage1); std::future<int> antwort = auftrag.get_future(); std::thread t(std::move(auftrag), 7); // startet auftrag(7) t.detach(); std::cout << antwort.get() <<'\n'; }
Versprechen
Versprechen std::promise<ResultType>
bilden im Verbund mit std::future<ResultType>
den flexibelsten Kommunikationskanal zwischen den nebenläufigen Prozessen.
Der Zugriff auf das künftige Resultat blockiert, solange kein Wert oder keine Ausnahme hinterlegt wurde.
Danach kann die aufgerufene Prozedur noch weiterlaufen.
void ansage(std::promise<int>& p) { try { std::cout << "Die Antwort wird euch nicht gefallen ... "; p.set_value(42); // ... weitere 6,5 Mio Jahre nachdenken ... } catch(...) { p.set_exception(std::current_exception()); } } void zur_ablage() { std::promise<int> briefkasten; std::future<int> antwort = briefkasten.get_future(); std::thread t(&ansage, std::ref(briefkasten)); std::cout << antwort.get() << '\n'; t.join(); }
Sperrenfreie Kommunikation
Sperren sind eine kostspielige Angelegenheit, std::atomic<T>
erlauben einen sicheren, sperrenfreien Datenaustausch für einzelne Werte zwischen Threads:
#include <atomic> int countdown(std::atomic<int>& jobs) { int done = 0, nr; while ( (nr = int(--jobs)) >= 0) ++done; return done; } void gemeinsam_gehts_besser() { int n = 1000000; std::atomic<int> jobs(n); auto mode = std::launch::async; auto cnt1 = std::async(mode, countdown, std::ref(jobs)); auto cnt2 = std::async(mode, countdown, std::ref(jobs)); int a = cnt1.get(), b = cnt2.get(); std::cout << a << " + " << b << " = " << a+b << '\n'; assert(a+b == n); }