[Python] Multithreading

Roin

Freier Denker
Registriert
22 Juli 2013
Beiträge
581
Hallo Leute,

für meine Bachelorarbeit habe ich ein Programm geschrieben, welches einige Emails einliest, Zeilen daraus extrahiert und dann mit diesen diverse Berechnungen anstellt:
  • Koordinatensystem erstellen (einfach eine sehr große Liste und diese anschließend in ein ndarray (numpy) umwandelt)
  • Entfernungen (haversine-Funktion) zwischen allen Matrix-Elementen bestimmt (ca. 20.000 Elemente)
  • Diese Entfernungen in eine von mir übergebene Funktion einsetzt (Entfernung ist im Exponenten einer e-Funktion (wieder Numpy))
  • Eine Matrix invertiert und 3 Matrizen miteinander multipliziert (Numpy)
  • Aus einer Datei vorgegebene Werte ausließt und die dazugehörigen Werte in der Matrix suchst (GPS-Koordinaten werden verglichen)
  • Differenzen zwischen Vorgabewert und errechnetem Wert bestimmt
  • Und dann mit der nächsten Email beginnt
Dieser Vorgang läuft etwa 20 Mal durch und braucht auf meiner CPU (i7 2,4GHz) einige Minuten.

Die Funktion, die ich übergebe habe ich allerdings noch nicht optimiert (ich kenne die perfekten Parameter noch nicht) - Daher möchte ich den ganzen Vorgang mit verschiedenen Parametern durchführen lassen und ein Ergebnis zwischenspeichern, welches mir angibt, welche Parameter zu welcher Abweichung geführt haben. Diese Abweichung bestimmt sich aus der oben bestimmten Differenz zum Quadrat.

Da Python bei mir lediglich einen Kern nutzt und maximal 18% CPU-Last erreicht, habe ich nun an Multi-Threading gedacht.

Mit meinem Code werden allerdings lediglich mehrere Threads ausgeführt, nutzen aber weiterhin max. 18% CPU.

Vielleicht kann mir jemand einen offensichtlichen Fehler aufzeigen?

[src=python]import numpy as np
from main3 import mainFunc as mF
#import thread
from multiprocessing.dummy import Pool as ThreadPool
import traceback


def C1(distance, a = 1000000, b = 20):
if distance <= a:
#return b*(1.5*(distance/a - 0.5*(distance/a)**3))
#return b*(np.exp((-3*(distance/a)/10)))
return b*(np.exp((-0.1*(distance/a))))
#return b*np.exp((-3*(distance/a)/11))
#return b*np.exp(-0.05*(distance/a))
#return b*np.exp(-0.01*(distance/a))
#return b*(pow(1.1, -5*(distance/a)))
#return b*np.exp(-0.005*(distance/a))
#return b*np.exp(-0.05*np.sqrt((distance/a)))
else:
return b


def mainFunc1(list1):
try:
mF(func=list1[0], a=list1[1], b=list1[2], additional_str=list1[3])
return True
except Exception as e:
print(e)
traceback.print_exc()
print("An Exception occured - ignored it")
return False

arg_list = []
##for a in range(250000, 1500000, 25000):
## for b in range(12, 60, 3):
for a in range(1500000, 1550000, 25000):
for b in range(12, 18, 3):
arg_list.append([C1, a, b, 'Func3'])

# Make the Pool of workers
pool = ThreadPool(4)

# Open the Functions in their own threads
# and return the results
results = pool.map(mainFunc1, arg_list)

#close the pool and wait for the work to finish
pool.close()
pool.join() [/src]

EDIT:
Mod, kannst du den Thread bitte sinngemäß in Multithreading / Multiprozessing umbenennen?
 
Zuletzt bearbeitet:
Von Multithreading ohne Parallelität hast Du nichts.
Dein Problem könnte hier liegen:

 
  • Thread Starter Thread Starter
  • #3
Ok, also ist Python allgemein eher für Single-Thread-Aufgaben optimiert. Soweit verstehe ich das. Doch meine Aufgabe, die ich dem Programm übergebe ist ja unabhängig von den anderen Threads, die gerade rumrechnen. Da habe ich gedacht, dass ich durch dieses Wrap-Arround das umgehen kann.
Hast du eine Idee, wie ich dennoch mein Programm "parallelisieren" kann?
 
Vielleicht mit ?

Grundsätzlich kann es passieren, daß der Verwaltungsaufwand bei solchen (Multi)Sachen größer ist, als der Gewinn.
 
  • Thread Starter Thread Starter
  • #5
Vielleicht mit ?

Ich dachte, genau das habe ich getan, aber durch das .dummy beim Import wurde es ja nur ein ThreadPool.
Ich habe das mal entfernt und somit ja einen "ProcessPool". Wenn ich das nun aber genau so mache, geht die CPU-Last für den Taskmanager Firefox und und und hoch, und es entstehen anscheinend auch mehrere Python-Prozesse. Aber es passiert einfach genau nix.

[src=python]import numpy as np
from main3 import mainFunc as mF
#import thread
#from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool as ProcessPool
import traceback


def C1(distance, a = 1000000, b = 20):
if distance <= a:
#return b*(1.5*(distance/a - 0.5*(distance/a)**3))
#return b*(np.exp((-3*(distance/a)/10)))
return b*(np.exp((-0.1*(distance/a))))
#return b*np.exp((-3*(distance/a)/11))
#return b*np.exp(-0.05*(distance/a))
#return b*np.exp(-0.01*(distance/a))
#return b*(pow(1.1, -5*(distance/a)))
#return b*np.exp(-0.005*(distance/a))
#return b*np.exp(-0.05*np.sqrt((distance/a)))
else:
return b


def mainFunc1(list1):
print("Start to run with a=%d and b=%d" % (list1[1], list1[2]))
try:
mF(func=list1[0], a=list1[1], b=list1[2], additional_str=list1[3])
return True
except Exception as e:
print(e)
traceback.print_exc()
print("An Exception occured - ignored it")
return False

arg_list = []
##for a in range(300000, 1500000, 50000):
## for b in range(12, 60, 3):
for a in range(1500000, 1550000, 25000):
for b in range(12, 18, 3):
arg_list.append([C1, a, b, 'Func3'])

# Make the Pool of workers
#pool = ThreadPool(4)
if __name__ == '__main__':
pool = ProcessPool(2)

print("Created 2 Prozesses")
# Open the Functions in their own threads
# and return the results
results = pool.map(mainFunc1, arg_list)

print("run the Prozesses")

#close the pool and wait for the work to finish
pool.close()
pool.join() [/src]

Die Zeile "Created 2 Prozesses" wird geschrieben, aber die Zeile "run with ..." wird nicht geschrieben.

EDIT:
Nach etwas über einer Minute starten die Prozesse ihre Aufgaben...
Wenn ich nun die main.py allerdings beende, laufen die Prozesse weiter. Das würde ich gerne unterbinden. Wie?

Das die Prozesse sich nicht mit beendet haben, hat daran gelegen, dass die Python IDLE Shell bei einem Neustart keine laufenden Prozesse unterbindet. Beim Ausführen über die Console funktioniert alles wie gewollt und die Zuweisung der Aufgaben dauert auch nur wenige Millisekunden.
 
Zuletzt bearbeitet:
Moin Roin,

Ich denke das was du brauchst um die Prozesse aufräumen zu können ist atexit():


Damit kannst du beim Beenden/Shutdown des Interpreters eine oder mehrere Funktion aufrufen lassen, bevor er komplett beendet wurde.
In diesem kannst du dann die Prozesse über terminate() zwingen zu beenden/abzubrechen:


Ich bin mir aber nicht 100% sicher in weit das funktioniert, wenn du die den Interpreter mit Ctrl-C abwürgst oder die Konsole schließt... das müßtest du mal testen.

---

Ich hab mich eben nur mal in die Doku von Multiprocessingen eingelesen und scheinbar brauchst du nicht zwinegen das Close, da dies automatisch gemacht wird sobald die Garbage Collection läuft, vermutlich auch der Cleanup beim beenden nach atexit() um die Ressourcen der Anwendung wieder frei zu geben. Dafür sorgt auch das "join" welches so lange Blockt, bis alle Prozesse ohne Arbeit sind.

So ließt sich das jedenfalls für mich :)
 
Zurück
Oben