namespace cpp {}

C++ lernen, kennen, anwenden

Benutzer-Werkzeuge

Webseiten-Werkzeuge


kennen:parallelverarbeitung

Unterschiede

Hier werden die Unterschiede zwischen zwei Versionen angezeigt.


kennen:parallelverarbeitung [2020-01-03 16:30] (aktuell) – angelegt - Externe Bearbeitung 127.0.0.1
Zeile 1: Zeile 1:
 +====== Parallelverarbeitung ======
 +
 +Hinweis:
 +[[kennen:parallel_algorithms]] sind ab C++17 in [[.:include:algorithm]] und [[.:include:numeric]] deklariert.
 +Im folgenden geht es um //nebenläufige// Abarbeitung.
 +
 +
 +===== Leichtgewichtige Prozesse =====
 +==== Threads starten ====
 +Mit ''std::thread'' starten zeitlich parallel laufende Arbeitsstränge durch Übernahme einer Funktion.
 +Deren genaue Reihenfolge ist unbestimmt. Sie können sich auch zeitlich überlappen.
 +<code cpp>
 +#include <chrono>
 +#include <iostream>
 +#include <sstream>
 +#include <string>
 +#include <thread>
 + 
 +void ausgabe(std::string msg) 
 +
 +  std::ostringstream out;
 +  out << std::this_thread::get_id() << " : " << msg;
 +  std::cout << out.str(); // get message out in one piece
 +}
 + 
 +void nebeneinander()
 +{
 +  std::thread t1(ausgabe, "Gleichzeitig?\n");
 +  std::thread t2(ausgabe, "Kann ich nicht.\n");
 +  ausgabe("Foyer des Arts\n");
 +  // ...
 +</code>
 +Die parallel abzuarbeitenden Funktionen können auch Funktoren oder Lambda-Ausdrücke sein.
 +Dem Konstruktor können nach dem Funktionsnamen Wertparameter für deren Aufruf mitgegeben werden.
 +Referenzen auf gemeinsam genutzte Ressourcen werden mit ''std::ref(variable)'' verpackt.
 +==== Threads beenden ====
 +Ein Thread endet, sobald die übernommene Funktion verlassen wurde. 
 +Der aufrufende Thread (//Besitzer//) kann warten, 
 +bis der von ihm gestartete parallele Ablauffaden abgearbeitet ist 
 +und ihn dann zusammenführen oder ihn vorher abkoppeln.
 +
 +<code cpp>
 +  // ...
 +  t1.join();
 +  t2.detach();
 +  std::this_thread::sleep_for(std::chrono::seconds(42));
 +}  
 +</code>
 +''std::this_thread'' erlaubt, den aktiven Thread mit ''sleep_for(duration)'' für eine bestimmte Zeitspanne
 +oder mit ''sleep_until(time_point)'' bis zu einem bestimmten Zeitpunkt schlafen zu legen.
 +Mit der Methode ''yield()'' kann der auf den Rest seiner Zeitscheibe verzichten,
 +damit andere Threads aktiv werden können.
 +
 +Threads besitzen Verschiebesemantik: sie lassen sich nicht kopieren.
 +Mit der Methode ''joinable()'' kann erfragt werden, ob der Thread noch zusammengeführt werden kann.
 +
 +Mitunter ist es sinnvoll, die Anzahl der Prozessoren, Kerne oder Hyperthreads des Systems mit
 +''std::thread::hardware_concurrency()'' zu ermitteln.
 +Wenn dies scheitert, liefert die Funktion ''0''.
 +
 +
 +
 +===== Gemeinsame Ressourcen =====
 +==== Wettrennen ====
 +//Wettrennen// (race conditions) beim Zugriff auf gemeinsam genutzte, 
 +veränderbare Ressourcen führen zu undefiniertem Programmverhalten.
 +Kritische Bereiche werden durch gegenseitigen Ausschluss (//mutual exclusion//) mit
 +''std::mutex''-Variablen vor dem gleichzeitigen Zugriff durch parallel laufende Threads gesichert.
 +Am Ende des kritischen Bereichs wird die Mutex-Variable wieder freigegeben.
 +Nun kann der nächste Thread diesem Block betreten.
 +
 +Der ''std::recursive_mutex'' wurde für rekursive Aufrufe entworfen.
 +Die Typen ''std::timed_mutex'' und ''std::recursive_timed_mutex''
 +verfügen über Methoden
 +''trylock_for(duration)'' und ''try_lock_until(time_point)'',
 +die wie auch ''try_lock()'' einen Wahrheitswert liefern, ob der Mutex gesperrt werden konnte.
 +Mutex-Methoden kann man selbst aufrufen --- es ist aber einfacher, Mutexe über Sperren zu steuern.
 +==== Sperren ====
 +Sperren ''std__lock_guard<Mutex>'' bedienen die Methoden ''lock()'' und ''unlock()'' 
 +der Mutex-Variable in ihrem Konstruktor bzw. Destruktor.
 +<code cpp>
 +#include <mutex>
 +
 +void ausgabe(std::string msg)
 +{
 +  static std::mutex mutex;
 +  auto myLock = std::lock_guard(mutex); // C++17 syntax
 +  std::cout << msg << '\n';
 +}
 +</code>
 +Die Sperre ''std::unique_lock<Mutex>'' bietet größere Freiheiten beim Einrichten der Sperre (siehe Tabelle),
 +die Methoden ''lock()'', ''unlock()'' und ''try_lock()'' sind zugänglich.
 +
 +Bei ''timed_mutex''-Variablen sind ''try_lock_for(duration)'' und ''try_lock_until(time_point)'' nutzbar.
 +
 +Der Sperrenzustand lässt sich mit ''if (myLock)'' und mit der Methode ''owns_lock()'' erfragen.
 +Der Besitz an einem Mutex wird durch ''release()'' freigegeben und liefert einen Zeiger auf den Mutex.
 +
 +^ ^Konstruktorparameter ^ ^
 +|auch ''lock_guard''|''(mutex)''|übernehmen und sperren|
 +| |''(mutex, std::adopt_lock)''|schon gesperrtes Mutex übernehmen|
 +|nur ''unique_lock''|''(mutex, std::defer_lock)''|noch nicht sperren|
 +| |''(mutex, std::try_to_lock)''|sperren, wenn Mutex frei ist|
 +| |''(timed_mutex, time_point)''|maximal bis zu diesem Zeitpunkt auf Sperre warten|
 +| |''(timed_mutex, duration)''|maximal Zeitdauer auf Sperre warten|
 +| |''(unique_lock)''|Sperre übernehmen (Verschiebesemantik)|
 +
 +
 +==== Bedingungen ====
 +Bedingungsvariablen ''std::condition_variable''
 +können belegte Sperren zeitweise wieder freigeben,
 +damit andere Threads weiterarbeiten können.
 +Ist die zur Fortführung des Threads notwendige Bedingung erfüllt,
 +kann dieser von einem anderen durch die Methoden ''notify_one()'' bzw. ''notify_all()'' wieder erweckt werden,
 +die Sperre zurück erlangen und seine Arbeit fortsetzen.
 +
 +<code cpp>
 +#include <condition_variable>
 +
 +// double accounts[BANKSIZE]; simuliert Geldbeträge 
 +std::mutex mutex;
 +std::condition_variable sufficientFunds;
 +
 +void transfer(int from, int to, double amount)
 +{
 +  std::unique_lock<std::mutex> myLock(mutex);
 +  sufficientFunds.wait(myLock, [=]{ return amount <= accounts[from]; });
 + 
 +  accounts[from] -= amount;
 +  accounts[to]   += amount;
 +  
 +  sufficientFunds.notifyAll();
 +}
 +</code> 
 +Dabei ist ''variable.wait(lock, predicate)'' gleichwertig zu
 +<code cpp>
 +  while (!predicate()) variable.wait(lock);
 +</code>
 +Für ''timed_mutex''-Bedingungen sind auch ''wait_for(lock, duration, predicate)'' und  ''wait_until(lock, time_point, predicate)'' zulässig.
 +==== Verklemmung ====
 +Eine Verklemmung (//deadlock//) entsteht, wenn mehrere Sperren benötigt werden, die jedoch verschiedenen Threads zugeteilt wurden
 +und durch zyklische Abhängigkeit keiner der beteiligten Threads fortfahren kann:
 +<code cpp>
 +Account a, b;
 +std::thread t1(give, std::ref(a), std::ref(b), 5); // sperrt a, dann b
 +std::thread t2(give, std::ref(b), std::ref(a), 5); // sperrt b, dann a
 +</code>
 +Die Funktionen ''std::lock(sperrenliste)'' und ''std::try_lock(sperrenliste)''
 +erlangen alle Sperren der Liste unabhängig von der Reihenfolge ohne Verklemmungsrisiko.
 +Mit der Klasse ''std::scoped_lock<Mutexes...>'' gelingt dies noch einfacher:
 +<code cpp>
 +void give(Account& from, Account& to, double money)
 +{
 +  // std::unique_lock<std::mutex> fromLock(from.mutex, std::defer_lock);
 +  // std::unique_lock<std::mutex> toLock  (to.mutex,   std::defer_lock);
 +  // std::lock(fromLock, toLock);
 +  std::scoped_lock lock{from.mutex, to.mutex};  // ab c++17
 +  from.take(money);
 +  to.add(money);
 +}
 +</code>
 +Damit sind nicht alle möglichen Ursachen von Verklemmungen beseitigt.
 +Sie können ebenso eintreten, wenn die erwartete Bedingung nie eintritt
 +oder der wartende Thread nicht benachrichtigt wird.
 +Eine "Liquiditätsklemme" im obigen Beispiel ist nur durch Bedingungen an das Gesamtsystem vermeidbar.
 +
 +==== Threadsichere Initialisierung ====
 +Globale Daten werden mit ''const_expr''-Konstruktor threadsicher initialisiert.
 +Lokale ''static''-Variable erhalten ebenfalls threadsicher ihre Anfangswerte beim ersten Funktionsaufruf.
 +Betritt ein zweiter Thread die Funktion vor Abschluss der Initialisierung, wird er bis zu deren Ende blockiert.
 +
 +Als ''thread_local'' gekennzeichnete Variablen exitieren unabhängig voneinander in jedem Thread einmal.
 +Auch sie werden beim ersten Aufruf im Thread sicher initialisiert.
 +
 +Mit ''std::call_once(once_flag, funktor, parameter)'' lassen sich Anfangswertbelegungen auch verzögert vornehmen:
 +
 +<code cpp>
 +Account a, b;
 +std::once_flag geschenkt;
 +
 +void erstausstattung(double amount)
 +{
 +  a.add(amount);
 +  b.add(amount);
 +}
 +
 +void work()
 +{
 +  std::call_once(geschenkt, erstausstattung, 100.0);
 +  
 +  // ... jetzt kann Zahlungsverkehr beginnen
 +}
 +</code>
 +Die optionalen Argumente werden wie bei ''std::thread'' als Werte übergeben, Referenzen sind mit ''std::ref(variable)'' zu kapseln.
 +Die Aufgabe wird für das gleiche ''std::once_flag'' nur einmal erfolgreich ausgeführt.
 +Wirft die einmalig aufzurufende Funktion allerdings eine Ausnahme, 
 +gilt die Aufgabe als nicht erledigt, die Ausnahme wird an den Aufrufer weitergereicht.
 +Ein weiterer Aufruf von ''std::call_once()'' kann erfolgen.((
 +Leider bleibt durch einen [[https://stackoverflow.com/questions/41717579/stdcall-once-hangs-on-second-call-after-callable-threw-on-first-call|Fehler in der Implementierung von libstdc++]] ein mit g++ übersetztes Programm dann hängen. (Stand: Januar 2017) 
 +))
 +===== Zeitversetzte Auswertung =====
 +==== Künftige Ergebnisse ====
 +Durch ''std::future<ResultType>'' werden langwierige Berechnungen in einen nebenläufigen Prozess verpackt:
 +
 +<code cpp>
 +#include <future>
 +#include <iostream>
 +
 +int frage() { /* es dauert ... */ return 42; }
 +
 +void liefern_auf_bestellung()
 +{
 +  std::future<int> antwort = std::async(frage);
 +  std::cout << "Die Antwort lautet: ...";
 +  std::cout << antwort.get() <<'\n';
 +}
 +</code>
 +
 +Der Aufruf ''std::async(funktor, argumentliste)'' ist die einfachste Art, einen asynchronen Ablauf-Faden zu starten.
 +Die Abfrage des Ergebnisses blockiert, falls die Berechnung noch andauert.
 +Vor dem Funktor kann als Startverhalten ''std::launch::async()'' oder ''std::launch::deferred'' angegeben werden.
 +Ohne diese Angabe ist das Startverhalten der Implementierung überlassen.
 +
 +Die Methode ''valid()'' erfragt, ob das Ergebnis schon vorliegt. 
 +Der Wert kann einmalig gelesen werden,
 +danach liefert ''valid()'' wieder ''false'',
 +''wait()'' blockiert bis zur Fertigstellung der Berechnung, 
 +''wait_for(duration)'' und ''wait_until(time_point)'' liefern einen der Werte 
 +''ready'', ''deferred'' oder ''timeout'' aus ''std::future_status''.
 +
 +Ein ''std::shared_future<ResultType>'' erlaubt mehrmaligen ''get()''-Zugriff. Die Methode ''share()'' erzeugt ein solches Objekt.
 +
 +==== Aufträge ====
 +Funktoren werden in 
 +''std::packaged_task<Result(Parameter)>''
 +verpackt und durch einen Klammeroperator mit geeigneten Parametern gestartet.
 +Ein Auftrag kann zur Ausführung an einen anderen Thread übertragen werden.
 +Das Ergebnis oder eine geworfene Ausnahme werden über ein mit ''get_future()'' erzeugtes ''std::future<Result>'' abgeholt:
 +
 +<code cpp>
 +int frage1(int n) { /* schwer zu tun */ return 6*n; }
 +
 +void schick_mir_eine_antwort()
 +{
 +  std::packaged_task<int(int)> auftrag(frage1);
 +  std::future<int> antwort = auftrag.get_future();
 +  std::thread t(std::move(auftrag), 7);  // startet auftrag(7)
 +  t.detach();  
 +  std::cout << antwort.get() <<'\n';
 +}
 +</code>
 +==== Versprechen ====
 +Versprechen ''std::promise<ResultType>'' bilden im Verbund mit ''std::future<ResultType>'' 
 +den flexibelsten Kommunikationskanal zwischen den nebenläufigen Prozessen.
 +Der Zugriff auf das künftige Resultat blockiert, solange kein Wert oder keine Ausnahme hinterlegt wurde.
 +Danach kann die aufgerufene Prozedur noch weiterlaufen.
 +<code cpp>
 +void ansage(std::promise<int>& p)
 +{
 +  try
 +  {
 +    std::cout << "Die Antwort wird euch nicht gefallen ... ";
 +    p.set_value(42);
 +    // ... weitere 6,5 Mio Jahre nachdenken ...
 +  }
 +  catch(...)
 +  {
 +    p.set_exception(std::current_exception());  
 +  }
 +}
 +
 +void zur_ablage()
 +{
 +  std::promise<int> briefkasten;
 +  std::future<int> antwort = briefkasten.get_future();
 +  std::thread t(&ansage, std::ref(briefkasten));
 +  std::cout << antwort.get() << '\n';
 +  t.join();
 +}
 +</code>
 +===== Sperrenfreie Kommunikation =====
 +Sperren sind eine kostspielige Angelegenheit, ''std::atomic<T>'' erlauben einen sicheren, sperrenfreien Datenaustausch //für einzelne Werte// zwischen Threads:
 +
 +<code cpp>
 +#include <atomic>
 +
 +int countdown(std::atomic<int>& jobs)
 +{
 +  int done = 0, nr;
 +  while ( (nr = int(--jobs)) >= 0) ++done;
 +  return done;
 +}
 +
 +void gemeinsam_gehts_besser()
 +{
 +  int n = 1000000;
 +  std::atomic<int> jobs(n);
 +  auto mode = std::launch::async;
 +  auto cnt1 = std::async(mode, countdown, std::ref(jobs));
 +  auto cnt2 = std::async(mode, countdown, std::ref(jobs));
 +  int a = cnt1.get(), b = cnt2.get();
 +
 +  std::cout << a << " + " << b << " = " << a+b << '\n';
 +  assert(a+b == n);
 +}
 +</code>
  
kennen/parallelverarbeitung.txt · Zuletzt geändert: 2020-01-03 16:30 von 127.0.0.1

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki