namespace cpp

C++ lernen, kennen, anwenden

Benutzer-Werkzeuge

Webseiten-Werkzeuge


kennen:parallelverarbeitung

Parallelverarbeitung

Leichtgewichtige Prozesse

Threads starten

Mit std::thread starten parallel laufende Arbeitsstränge durch Übernahme einer Funktion.

#include <chrono>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
 
void ausgabe(std::string msg) 
{ 
  std::ostringstream out;
  out << std::this_thread::get_id() << " : " << msg;
  std::cout << out.str(); // get message out in one piece
}
 
void nebeneinander()
{
  std::thread t1(ausgabe, "Gleichzeitig?\n");
  std::thread t2(ausgabe, "Kann ich nicht.\n");
  ausgabe("Foyer des Arts\n");
  // ...

Die parallel abzuarbeitende Funktion kann auch ein Funktor oder Lambda-Ausdruck sein. Dem Konstruktor können nach dem Funktionsnamen Wertparameter für deren Aufruf mitgegeben werden. Referenzen auf gemeinsam genutzte Ressourcen werden mit std::ref(variable) verpackt.

Threads beenden

Ein Thread endet, sobald die übernommene Funktion verlassen wurde. Der aufrufende Thread (Besitzer) kann warten, bis der von ihm gestartete parallele Ablauffaden abgearbeitet ist und ihn dann zusammenführen oder ihn vorher abkoppeln.

  // ...
  t1.join();
  t2.detach();
  std::this_thread::sleep_for(std::chrono::seconds(42));
}  

std::this_thread erlaubt, den aktiven Thread mit sleep_for(duration) für eine bestimmte Zeitspanne oder mit sleep_until(time_point) bis zu einem bestimmten Zeitpunkt schlafen zu legen. Mit der Methode yield() kann der auf den Rest seiner Zeitscheibe verzichten, damit andere Threads aktiv werden können.

Threads besitzen Verschiebesemantik: sie lassen sich nicht kopieren. Mit der Methode joinable() kann erfragt werden, ob der Thread noch zusammengeführt werden kann.

Mitunter ist es sinnvoll, die Anzahl der Prozessoren, Kerne oder Hyperthreads des Systems mit std::thread::hardware_concurrency() zu ermitteln. Wenn dies scheitert, liefert die Funktion 0.

Gemeinsame Ressourcen

Wettrennen

Wettrennen (race conditions) beim Zugriff auf gemeinsam genutzte, veränderbare Ressourcen führen zu undefiniertem Programmverhalten. Kritische Bereiche werden durch gegenseitigen Ausschluss (mutual exclusion) mit std::mutex-Variablen vor dem gleichzeitigen Zugriff durch parallel laufende Threads gesichert. Am Ende des kritischen Bereichs wird die Mutex-Variable wieder freigegeben. Nun kann der nächste Thread diesem Block betreten.

Der std::recursive_mutex wurde für rekursive Aufrufe entworfen. Die Typen std::timed_mutex und std::recursive_timed_mutex verfügen über Methoden trylock_for(duration) und try_lock_until(time_point), die wie auch try_lock() einen Wahrheitswert liefern, ob der Mutex gesperrt werden konnte. Mutex-Methoden kann man selbst aufrufen — es ist aber einfacher, Mutexe über Sperren zu steuern.

Sperren

Sperren std__lock_guard<Mutex> bedienen die Methoden lock() und unlock() der Mutex-Variable in ihrem Konstruktor bzw. Destruktor.

#include <mutex>
 
void ausgabe(std::string msg)
{
  static std::mutex mutex;
  auto myLock = std::lock_guard(mutex); // C++17 syntax
  std::cout << msg << '\n';
}

Die Sperre std::unique_lock<Mutex> bietet größere Freiheiten beim Einrichten der Sperre (siehe Tabelle), die Methoden lock(), unlock() und try_lock() sind zugänglich.

Bei timed_mutex-Variablen sind try_lock_for(duration) und try_lock_until(time_point) nutzbar.

Der Sperrenzustand lässt sich mit if (myLock) und mit der Methode owns_lock() erfragen. Der Besitz an einem Mutex wird durch release() freigegeben und liefert einen Zeiger auf den Mutex.

Konstruktorparameter
auch lock_guard(mutex)übernehmen und sperren
(mutex, std::adopt_lock)schon gesperrtes Mutex übernehmen
nur unique_lock(mutex, std::defer_lock)noch nicht sperren
(mutex, std::try_to_lock)sperren, wenn Mutex frei ist
(timed_mutex, time_point)maximal bis zu diesem Zeitpunkt auf Sperre warten
(timed_mutex, duration)maximal Zeitdauer auf Sperre warten
(unique_lock)Sperre übernehmen (Verschiebesemantik)

Bedingungen

Bedingungsvariablen std::condition_variable können belegte Sperren zeitweise wieder freigeben, damit andere Threads weiterarbeiten können. Ist die zur Fortführung des Threads notwendige Bedingung erfüllt, kann dieser von einem anderen durch die Methoden notify_one() bzw. notify_all() wieder erweckt werden, die Sperre zurück erlangen und seine Arbeit fortsetzen.

#include <condition_variable>
 
// double accounts[BANKSIZE]; simuliert Geldbeträge 
std::mutex mutex;
std::condition_variable sufficientFunds;
 
void transfer(int from, int to, double amount)
{
  std::unique_lock<std::mutex> myLock(mutex);
  sufficientFunds.wait(myLock, [=]{ return amount <= accounts[from]; });
 
  accounts[from] -= amount;
  accounts[to]   += amount;
 
  sufficientFunds.notifyAll();
}

Dabei ist variable.wait(lock, predicate) gleichwertig zu

  while (!predicate()) variable.wait(lock);

Für timed_mutex-Bedingungen sind auch wait_for(lock, duration, predicate) und wait_until(lock, time_point, predicate) zulässig.

Verklemmung

Eine Verklemmung (deadlock) entsteht, wenn mehrere Sperren benötigt werden, die jedoch verschiedenen Threads zugeteilt wurden und durch zyklische Abhängigkeit keiner der beteiligten Threads fortfahren kann:

Account a, b;
std::thread t1(give, std::ref(a), std::ref(b), 5); // sperrt a, dann b
std::thread t2(give, std::ref(b), std::ref(a), 5); // sperrt b, dann a

Die Funktionen std::lock(sperrenliste) und std::try_lock(sperrenliste) erlangen alle Sperren der Liste unabhängig von der Reihenfolge ohne Verklemmungsrisiko. Mit der Klasse std::scoped_lock<Mutexes…> gelingt dies noch einfacher:

void give(Account& from, Account& to, double money)
{
  // std::unique_lock<std::mutex> fromLock(from.mutex, std::defer_lock);
  // std::unique_lock<std::mutex> toLock  (to.mutex,   std::defer_lock);
  // std::lock(fromLock, toLock);
  std::scoped_lock lock{from.mutex, to.mutex};  // ab c++17
  from.take(money);
  to.add(money);
}

Damit sind nicht alle möglichen Ursachen von Verklemmungen beseitigt. Sie können ebenso eintreten, wenn die erwartete Bedingung nie eintritt oder der wartende Thread nicht benachrichtigt wird. Eine "Liquiditätsklemme" im obigen Beispiel ist nur durch Bedingungen an das Gesamtsystem vermeidbar.

Threadsichere Initialisierung

Globale Daten werden mit const_expr-Konstruktor threadsicher initialisiert. Lokale static-Variable erhalten ebenfalls threadsicher ihre Anfangswerte beim ersten Funktionsaufruf. Betritt ein zweiter Thread die Funktion vor Abschluss der Initialisierung, wird er bis zu deren Ende blockiert.

Als thread_local gekennzeichnete Variablen exitieren unabhängig voneinander in jedem Thread einmal. Auch sie werden beim ersten Aufruf im Thread sicher initialisiert.

Mit std::call_once(once_flag, funktor, parameter) lassen sich Anfangswertbelegungen auch verzögert vornehmen:

Account a, b;
std::once_flag geschenkt;
 
void erstausstattung(double amount)
{
  a.add(amount);
  b.add(amount);
}
 
void work()
{
  std::call_once(geschenkt, erstausstattung, 100.0);
 
  // ... jetzt kann Zahlungsverkehr beginnen
}

Die optionalen Argumente werden wie bei std::thread als Werte übergeben, Referenzen sind mit std::ref(variable) zu kapseln. Die Aufgabe wird für das gleiche std::once_flag nur einmal erfolgreich ausgeführt. Wirft die einmalig aufzurufende Funktion allerdings eine Ausnahme, gilt die Aufgabe als nicht erledigt, die Ausnahme wird an den Aufrufer weitergereicht. Ein weiterer Aufruf von std::call_once() kann erfolgen.1)

Zeitversetzte Auswertung

Künftige Ergebnisse

Durch std::future<ResultType> werden langwierige Berechnungen in einen nebenläufigen Prozess verpackt:

#include <future>
#include <iostream>
 
int frage() { /* es dauert ... */ return 42; }
 
void liefern_auf_bestellung()
{
  std::future<int> antwort = std::async(frage);
  std::cout << "Die Antwort lautet: ...";
  std::cout << antwort.get() <<'\n';
}

Der Aufruf std::async(funktor, argumentliste) ist die einfachste Art, einen asynchronen Ablauf-Faden zu starten. Die Abfrage des Ergebnisses blockiert, falls die Berechnung noch andauert. Vor dem Funktor kann als Startverhalten std::launch::async() oder std::launch::deferred angegeben werden. Ohne diese Angabe ist das Startverhalten der Implementierung überlassen.

Die Methode valid() erfragt, ob das Ergebnis schon vorliegt. Der Wert kann einmalig gelesen werden, danach liefert valid() wieder false, wait() blockiert bis zur Fertigstellung der Berechnung, wait_for(duration) und wait_until(time_point) liefern einen der Werte ready, deferred oder timeout aus std::future_status.

Ein std::shared_future<ResultType> erlaubt mehrmaligen get()-Zugriff. Die Methode share() erzeugt ein solches Objekt.

Aufträge

Funktoren werden in std::packaged_task<Result(Parameter)> verpackt und durch einen Klammeroperator mit geeigneten Parametern gestartet. Ein Auftrag kann zur Ausführung an einen anderen Thread übertragen werden. Das Ergebnis oder eine geworfene Ausnahme werden über ein mit get_future() erzeugtes std::future<Result> abgeholt:

int frage1(int n) { /* schwer zu tun */ return 6*n; }
 
void schick_mir_eine_antwort()
{
  std::packaged_task<int(int)> auftrag(frage1);
  std::future<int> antwort = auftrag.get_future();
  std::thread t(std::move(auftrag), 7);  // startet auftrag(7)
  t.detach();  
  std::cout << antwort.get() <<'\n';
}

Versprechen

Versprechen std::promise<ResultType> bilden im Verbund mit std::future<ResultType> den flexibelsten Kommunikationskanal zwischen den nebenläufigen Prozessen. Der Zugriff auf das künftige Resultat blockiert, solange kein Wert oder keine Ausnahme hinterlegt wurde. Danach kann die aufgerufene Prozedur noch weiterlaufen.

void ansage(std::promise<int>& p)
{
  try
  {
    std::cout << "Die Antwort wird euch nicht gefallen ... ";
    p.set_value(42);
    // ... weitere 6,5 Mio Jahre nachdenken ...
  }
  catch(...)
  {
    p.set_exception(std::current_exception());  
  }
}
 
void zur_ablage()
{
  std::promise<int> briefkasten;
  std::future<int> antwort = briefkasten.get_future();
  std::thread t(&ansage, std::ref(briefkasten));
  std::cout << antwort.get() << '\n';
  t.join();
}

Sperrenfreie Kommunikation

Sperren sind eine kostspielige Angelegenheit, std::atomic<T> erlauben einen sicheren, sperrenfreien Datenaustausch für einzelne Werte zwischen Threads:

#include <atomic>
 
int countdown(std::atomic<int>& jobs)
{
  int done = 0, nr;
  while ( (nr = int(--jobs)) >= 0) ++done;
  return done;
}
 
void gemeinsam_gehts_besser()
{
  int n = 1000000;
  std::atomic<int> jobs(n);
  auto mode = std::launch::async;
  auto cnt1 = std::async(mode, countdown, std::ref(jobs));
  auto cnt2 = std::async(mode, countdown, std::ref(jobs));
  int a = cnt1.get(), b = cnt2.get();
 
  std::cout << a << " + " << b << " = " << a+b << '\n';
  assert(a+b == n);
}
1)
Leider bleibt durch einen Fehler in der Implementierung von libstdc++ ein mit g++ übersetztes Programm dann hängen. (Stand: Januar 2017)
kennen/parallelverarbeitung.txt · Zuletzt geändert: 2018-04-27 09:28 von rrichter