wxSimple_MP

August 17th, 2010

Here’s a simple wxPython multiprocessing example. The actual calculation is trivial – the hard bit is making sure all queues are emptied and processes terminated properly upon exit.

For an easier introduction, there’s a much-simplified version in the wxPyWiki Cookbook.

There are a few key aspects to the design:

  • On Windows, it is important to protect the “entry point” of the program with if __name__ == '__main__':, as recommended in the official documentation, because each child process re-imports the main module.
  • The worker target must not depend on any object instance (i.e. it should be a class method or similar), and the worker processes must be started before the wx.App is instantiated (i.e. in the __main__ block).
  • The task queue should be fed tasks only as workers become free, so that no residual tasks are left in the queue when processing stops.

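Outside wx, the three rules can be illustrated with a minimal, stdlib-only sketch for Python 3. It is an illustration, not the author's listing, and it differs from the listing in two assumed ways: the worker is a plain module-level function rather than a classmethod, and shutdown uses a STOP sentinel (a name invented for this sketch) followed by join() instead of terminate(). The multiprocessing documentation warns that calling terminate() on a process that is using a queue can leave the queue corrupted, which is why letting workers exit on their own may be preferable.

```python
#!/usr/bin/env python3
"""Stdlib-only sketch of the three design rules (no wx)."""
import itertools
import math
from multiprocessing import Process, Queue

STOP = None  # sentinel asking a worker to exit (assumption of this sketch)


def worker(task_queue, result_queue):
    """Module-level target: it needs no TaskServerMP-like instance to run."""
    for angle_deg in iter(task_queue.get, STOP):
        angle_rad = math.radians(angle_deg)
        result_queue.put((angle_deg, math.tanh(angle_rad) / math.cosh(angle_rad)))


def run(tasks, numproc=2):
    """Process tasks with numproc workers and return the (unordered) results."""
    task_queue, result_queue = Queue(), Queue()
    procs = [Process(target=worker, args=(task_queue, result_queue))
             for _ in range(numproc)]
    for p in procs:
        p.start()

    # Prime one task per worker, then submit one new task per result, so the
    # task queue never holds a backlog that would be left over after a stop.
    pending = iter(tasks)
    in_flight = 0
    for task in itertools.islice(pending, numproc):
        task_queue.put(task)
        in_flight += 1

    results = []
    while in_flight:
        results.append(result_queue.get())  # blocks until a worker finishes
        in_flight -= 1
        for task in itertools.islice(pending, 1):  # at most one replacement
            task_queue.put(task)
            in_flight += 1

    # Both queues are now empty: tell each worker to exit, then join() it
    # instead of calling terminate().
    for _ in procs:
        task_queue.put(STOP)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":  # rule 1: under spawn (the Windows default) children re-import this module
    for angle, value in sorted(run([0, 15, 30, 45])):
        print("calculate(%d) = %.4f" % (angle, value))
```

Because each result triggers at most one new submission, stopping early only requires skipping the refill step while still draining the in_flight results, which is what processStop() does in the listing.
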
#!/usr/bin/env python

"""
Simple wxPython Multiprocessing Example
---------------------------------------

**Command Line Usage**

    wxsimple_mp [options]

**Options**

    -h, --help            Print this help and exit.
    -c, --cmd             Run from the command line (no GUI).
    -n, --numproc <n>     Use n (supplementary) worker processes. Zero means
                          only the main process is used for task calculation.
                          Default: the number of CPUs, but no more than t.
    -t, --numtasks <t>    Process t tasks (default: 20).

This simple example uses a wx.App to control and monitor a pool of workers
instructed to carry out a list of tasks.

The program creates the GUI plus a list of tasks, then starts a pool of workers
(processes) implemented with a classmethod. Within the GUI, the user can start
and stop processing the tasks at any time.

Copyright (c) 2010, Roger Stuckey. All rights reserved.
"""

import getopt, math, random, sys, time, types, wx

from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support

class TaskProcessor:
    """
    The TaskProcessor class provides the functions necessary to process each task.
    """
    def __init__(self, numcalcs):
        """
        Initialise the TaskProcessor.
        """
        self.numcalcs = numcalcs

    def calculate(self, angle_deg):
        """
        Calculate the result of a task.
        """
        result = 0
        for i in range(self.numcalcs):
            angle_rad = math.radians(angle_deg)
            result += math.tanh(angle_rad)/math.cosh(angle_rad)/self.numcalcs
        return ( angle_deg, result )

class Dispatcher:
    """
    The Dispatcher class manages the task and result queues.
    """
    def __init__(self):
        """
        Initialise the Dispatcher.
        """
        self.taskQueue = Queue()
        self.resultQueue = Queue()

    def putTask(self, task):
        """
        Put a task on the task queue.
        """
        self.taskQueue.put(task)

    def getTask(self):
        """
        Get a task from the task queue.
        """
        return self.taskQueue.get()

    def putResult(self, output):
        """
        Put a result on the result queue.
        """
        self.resultQueue.put(output)

    def getResult(self):
        """
        Get a result from the result queue.
        """
        return self.resultQueue.get()

class TaskServerMP:
    """
    The TaskServerMP class provides a target worker class method for queued processes.
    """
    def __init__(self, numprocesses=1, tasks=None):
        """
        Initialise the TaskServerMP and create the dispatcher and processes.
        """
        self.numprocesses = numprocesses
        # Avoid a shared mutable default argument
        self.Tasks = list(tasks) if tasks is not None else [ ]
        self.numtasks = len(self.Tasks)

        # Create the dispatcher
        self.dispatcher = Dispatcher()

        self.Processes = [ ]

        # The worker processes must be started here!
        for n in range(numprocesses):
            process = Process(target=TaskServerMP.worker, args=(self.dispatcher,))
            process.start()
            self.Processes.append(process)

        self.timeStart = 0.0
        self.timeElapsed = 0.0
        self.timeRemain = 0.0
        self.processTime = { }

        # Set some program flags
        self.keepgoing = True
        self.i = 0
        self.j = 0

    def run(self):
        """
        Run the TaskServerMP - start, stop & terminate processes.
        """
        sys.stdout.write('Number of processes = %d' % self.numprocesses)
        if (self.numprocesses == 0):
            sys.stdout.write(' (no extra processes)')
        sys.stdout.write('\nUnordered results...\n')
        self.processTasks(self.update)
        if (self.keepgoing):
            sys.stdout.write('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.timeElapsed)))
        if (self.numprocesses > 0):
            sys.stdout.write("Waiting for processes to terminate...\n")
            self.processTerm()

    def processTasks(self, resfunc=None):
        """
        Start the execution of tasks by the processes.
        """
        self.keepgoing = True

        self.timeStart = time.time()
        # Set the initial process time for each
        for n in range(self.numprocesses):
            pid_str = '%d' % self.Processes[n].pid
            self.processTime[pid_str] = 0.0

        # Submit the first set of tasks: one per worker (or one for the
        # single-process mode), but never more than the number of tasks
        numprocstart = min(max(self.numprocesses, 1), self.numtasks)
        for n in range(numprocstart):
            self.dispatcher.putTask(self.Tasks[n])

        self.j = -1
        self.i = numprocstart - 1
        while (self.j < self.i):
            # Get and print results
            output = self.getOutput()
            # Execute some function (Yield to a wx.Button event)
            if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
                resfunc(output)
            if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
                # Submit another task
                self.i += 1
                self.dispatcher.putTask(self.Tasks[self.i])

    def processStop(self, resfunc=None):
        """
        Stop the execution of tasks by the processes.
        """
        self.keepgoing = False

        while (self.j < self.i):
            # Get and print any results remaining in the result queue
            output = self.getOutput()
            if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
                resfunc(output)

    def processTerm(self):
        """
        Terminate the worker processes and wait for them to exit.
        """
        for n in range(self.numprocesses):
            # Terminate any running processes
            self.Processes[n].terminate()

        # Wait for all processes to stop
        while (self.anyAlive()):
            time.sleep(0.5)

    def anyAlive(self):
        """
        Check if any processes are alive.
        """
        isalive = False
        for n in range(self.numprocesses):
            isalive = (isalive or self.Processes[n].is_alive())
        return isalive

    def getOutput(self):
        """
        Get the output from one completed task.
        """
        self.j += 1

        if (self.numprocesses == 0):
            # Use the single-process method
            self.worker_sp()

        output = self.dispatcher.getResult()
        # Calculate the time remaining
        self.timeRemaining(self.j + 1, self.numtasks, output['process']['pid'])

        return(output)

    def timeRemaining(self, tasknum, numtasks, pid):
        """
        Calculate the time remaining for the processes to complete N tasks.
        """
        timeNow = time.time()
        self.timeElapsed = timeNow - self.timeStart

        pid_str = '%d' % pid
        self.processTime[pid_str] = self.timeElapsed

        # Calculate the average time elapsed for all of the processes
        timeElapsedAvg = 0.0
        numprocesses = self.numprocesses
        if (numprocesses == 0): numprocesses = 1
        for pid_str in self.processTime.keys():
            timeElapsedAvg += self.processTime[pid_str]/numprocesses
        self.timeRemain = timeElapsedAvg*(float(numtasks)/float(tasknum) - 1.0)

    def update(self, output):
        """
        Get and print the results from one completed task.
        """
        sys.stdout.write('%s [%d] calculate(%d) = %.2f' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
#        sys.stdout.write('  [Complete: %2d / %2d  Time Elapsed: %s  Remaining: %s]' % (self.j + 1, self.numtasks, time.strftime('%M:%S', time.gmtime(self.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.timeRemain))))
        sys.stdout.write('\n')

    def worker(cls, dispatcher):
        """
        The worker creates a TaskProcessor object to calculate the result.
        """
        while True:
            args = dispatcher.getTask()
            taskproc = TaskProcessor(args[0])
            result = taskproc.calculate(args[1])
            output = { 'process' : { 'name' : current_process().name, 'pid' : current_process().pid }, 'result' : result }
            # Put the result on the output queue
            dispatcher.putResult(output)

    # The multiprocessing worker must not require any existing object for execution!
    worker = classmethod(worker)

    def worker_sp(self):
        """
        A single-process version of the worker method.
        """
        args = self.dispatcher.getTask()
        taskproc = TaskProcessor(args[0])
        result = taskproc.calculate(args[1])
        output = { 'process' : { 'name' : 'Process-0', 'pid' : 0 }, 'result' : result }
        # Put the result on the output queue
        self.dispatcher.putResult(output)

class MyFrame(wx.Frame):
    """
    A simple Frame class.
    """
    def __init__(self, parent, id, title, taskserver):
        """
        Initialise the Frame.
        """
        self.taskserver = taskserver

        wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))

        # Create the panel, sizer and controls
        self.panel = wx.Panel(self, wx.ID_ANY)
        self.sizer = wx.GridBagSizer(5, 5)

        self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
        self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
        self.start_bt.SetDefault()
        self.start_bt.SetToolTipString('Start the execution of tasks')
        self.start_bt.ToolTip.Enable(True)

        self.stop_bt = wx.Button(self.panel, wx.ID_ANY, "Stop")
        self.Bind(wx.EVT_BUTTON, self.OnStop, self.stop_bt)
        self.stop_bt.SetToolTipString('Stop the execution of tasks')
        self.stop_bt.ToolTip.Enable(True)

        self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)

        self.prog_st = wx.StaticText(self.panel, wx.ID_ANY, 'Complete:')

        self.prog_gg = wx.Gauge(self.panel, id=wx.ID_ANY, range=self.taskserver.numtasks, size=(-1, 15))
        self.prog_gg.SetBezelFace(3)
        self.prog_gg.SetShadowWidth(3)

        # Add the controls to the sizer
        self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP, border=5)
        self.sizer.Add(self.stop_bt, (0, 1), flag=wx.ALIGN_CENTER|wx.TOP|wx.RIGHT, border=5)
        self.sizer.Add(self.output_tc, (1, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT, border=5)
        self.sizer.Add(self.prog_st, (2, 0), (1, 2), flag=wx.LEFT|wx.RIGHT, border=5)
        self.sizer.Add(self.prog_gg, (3, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
        self.sizer.AddGrowableCol(0)
        self.sizer.AddGrowableCol(1)
        self.sizer.AddGrowableRow(1)

        self.panel.SetSizer(self.sizer)

        self.Bind(wx.EVT_CLOSE, self.OnClose)

        self.output_tc.AppendText('Number of processes = %d' % self.taskserver.numprocesses)
        if (self.taskserver.numprocesses == 0):
            self.output_tc.AppendText(' (no extra processes)')
        self.output_tc.AppendText('\n')

    def OnStart(self, event):
        """
        Start the execution of tasks by the processes.
        """
        self.start_bt.Enable(False)
        self.stop_bt.Enable(True)
        self.output_tc.AppendText('Unordered results...\n')
        # Start processing tasks
        self.taskserver.processTasks(self.update)
        if (self.taskserver.keepgoing):
            self.output_tc.AppendText('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)))
            self.start_bt.Enable(True)
            self.stop_bt.Enable(False)

    def OnStop(self, event):
        """
        Stop the execution of tasks by the processes.
        """
        self.stop_bt.Enable(False)
        if (self.taskserver.j < self.taskserver.i):
            self.output_tc.AppendText('Completing queued tasks...\n')
        # Stop processing tasks
        self.taskserver.processStop(self.update)
        self.start_bt.Enable(True)

    def OnClose(self, event):
        """
        Stop the task queue, terminate processes and close the window.
        """
        self.OnStop(event)
        self.start_bt.Enable(False)
        if ((self.taskserver.numprocesses > 0) and self.taskserver.anyAlive()):
            # Keep a reference so the busy message stays up until this method returns
            busy = wx.BusyInfo("Waiting for processes to terminate...")
            # Terminate the processes
            self.taskserver.processTerm()
        self.Destroy()

    def update(self, output):
        """
        Get and print the results from one completed task.
        """
        self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
        self.prog_st.SetLabel('Complete: %2d / %2d  Time Elapsed: %s  Remaining: %s' % (self.taskserver.j + 1, self.taskserver.numtasks, time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.taskserver.timeRemain))))
        self.prog_gg.SetValue(self.taskserver.j + 1)
        # Give the user an opportunity to interact
        wx.YieldIfNeeded()

class MyApp(wx.App):
    """
    A simple App class, modified to hold the processes and task queues.
    """
    def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, taskserver=None):
        """
        Initialise the App.
        """
        self.taskserver = taskserver
        wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)

    def OnInit(self):
        """
        Initialise the App with a Frame.
        """
        self.frame = MyFrame(None, -1, 'wxSimple_MP', self.taskserver)
        self.frame.Show(True)
        return True

if __name__ == '__main__':

    freeze_support()

    try:
        (optlist, args) = getopt.getopt(sys.argv[1:], 'hcn:t:', ['help', 'cmd', 'numproc=', 'numtasks='])
    except getopt.GetoptError as msg:
        sys.stderr.write("wxsimple_mp: Error: %s\n" % msg)
        sys.stderr.write("See 'wxsimple_mp --help'.\n")
        sys.exit(2)
    else:
        cmdline = False
        numproc = None
        numtasks = 20
        for (opt, optarg) in optlist:
            if opt in ('-h', '--help'):
                sys.stdout.write(__doc__)
                sys.exit(0)
            elif opt in ('-c', '--cmd'):
                cmdline = True
            elif opt in ('-n', '--numproc'):
                numproc = int(optarg)
            elif opt in ('-t', '--numtasks'):
                numtasks = int(optarg)

        if (numproc is None):
            # Determine the number of CPUs/cores
            try:
                numproc = min(cpu_count(), numtasks)
            except NotImplementedError:
                numproc = 1

        # Create the task list
        Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]

        # The worker processes must be started here!
        tsmp = TaskServerMP(numprocesses=numproc, tasks=Tasks)

        if (cmdline):
            tsmp.run()
        else:
            app = MyApp(redirect=True, filename='wxsimple_mp.stderr.log', taskserver=tsmp)
            app.MainLoop()
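
The "Remaining" figure in the GUI comes from timeRemaining(): it averages, over the processes, the elapsed time at which each one last returned a result, then scales that average by the ratio of outstanding to completed tasks. This assumes tasks take roughly equal time, as they do here (each performs int(1e6) iterations). The same arithmetic as a standalone function, where the name time_remaining and its parameters are ours, for illustration only:

```python
def time_remaining(finish_times, numprocesses, tasks_done, numtasks):
    """Estimate seconds left, following TaskServerMP.timeRemaining().

    finish_times maps each process id to the elapsed time at which that
    process last returned a result.
    """
    numprocesses = max(numprocesses, 1)  # single-process mode counts as one
    elapsed_avg = sum(finish_times.values()) / float(numprocesses)
    # Linear extrapolation: the work left is (numtasks / tasks_done - 1)
    # times the work already done
    return elapsed_avg * (float(numtasks) / tasks_done - 1.0)


# Two workers last reported at 8 s and 10 s, after 5 of 20 tasks:
print(time_remaining({"101": 8.0, "102": 10.0}, 2, 5, 20))  # 27.0
```

In the listing, processes that have not yet returned a result still count with their initial 0.0, so the first few estimates tend to run low.
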

  1. Greg
    November 19th, 2010 at 04:41 | #1

    Hi Roger,

    Thanks for this great example. I’m curious though, why do the worker processes have to be started before wx.App instantiation?

    -Greg

  2. November 21st, 2010 at 15:23 | #2

    Hi Greg,
    It’s really just a restriction of the current implementation (of the multiprocessing package). From my understanding, the processes must be started from within the main thread, as opposed to a new one spawned by the wx.App, in order for them to be managed properly. I seem to recall that it was possible to start them from within the GUI, but not terminate them and close the GUI cleanly when finished.
    Hope this helps, and thanks for your comment.

  3. Greg
    November 22nd, 2010 at 07:34 | #3

    That does help, thanks!

    The reason I asked was because I was having problems getting multiprocessing to work with a wx app that uses a 3D library called Panda3D. For a while there I thought it was a problem with wx. Eventually I boiled it down to a simple test case where if I imported anything from the 3D library all processing would only happen on one core, but if I didn’t import then everything went to all my cores as expected. I posted a question along with the test case on the Panda3D forums here:

    http://www.panda3d.org/forums/viewtopic.php?t=10383

    Do you have any inkling on why this would happen? Sorry, I know this isn’t wx app related anymore but I’m desperate. :)

    -Greg

  4. December 6th, 2010 at 12:52 | #4

    Panda3D looks great, but I haven’t had any experience with it to date. I see on the forum that you have solved your problem though. Just out of interest, to what tasks were you allocating the different processes? Cheers.

  5. Alex van Houten
    January 18th, 2011 at 00:24 | #5

    How would one implement another worker process, say worker0, in the above?
    I mean in the case that worker0 has to complete its tasks before worker can be started.
    Normally one would use the join method, but I don’t see how that could be set up from the main thread.
    Thanks in advance for any help.
    Alex.

  6. January 20th, 2011 at 13:54 | #6

    Hi Alex,
    I don’t think you would be able to use a blocking call like join() in your main thread if you wanted the wx.App to fire up straight away. Starting another Process from within the App isn’t an option either, so I would just use the existing Processes and modify the TaskProcessor, workers and Tasks to handle/store more than one type of task. Your Queue could either remain pre-ordered in the main as currently implemented, or on-the-fly, by writing a more intelligent dispatcher.
    Are you able to use this pattern?
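
A sketch of the first option in Python 3: each task carries a type tag and a single worker function dispatches on it through a lookup table. The task types (prepare, calculate), the HANDLERS table and the STOP sentinel are hypothetical; the demo drives the worker with queue.Queue in one process for brevity, but the worker only calls get() and put(), which multiprocessing.Queue also provides.

```python
import math
import queue  # single-process stand-in; multiprocessing.Queue offers the same get()/put()

STOP = None  # sentinel that ends the worker loop (assumption of this sketch)


def prepare(angle_deg):
    """Hypothetical first task type ('worker0' in the question)."""
    return math.radians(angle_deg)


def calculate(angle_rad):
    """Second task type: the expression used by TaskProcessor.calculate()."""
    return math.tanh(angle_rad) / math.cosh(angle_rad)


# Each task is a (task_type, argument) tuple; the worker looks the type up here.
HANDLERS = {"prepare": prepare, "calculate": calculate}


def worker(task_queue, result_queue):
    """Run tasks of any registered type until STOP is received."""
    for task_type, arg in iter(task_queue.get, STOP):
        result_queue.put((task_type, arg, HANDLERS[task_type](arg)))


if __name__ == "__main__":
    tasks, results = queue.Queue(), queue.Queue()
    # Pre-ordered queue: every "prepare" task is dequeued before any "calculate".
    for task in [("prepare", 30), ("prepare", 45), ("calculate", 0.5), STOP]:
        tasks.put(task)
    worker(tasks, results)  # would be a Process target in the real program
    while not results.empty():
        print(results.get())
```

Pre-ordering only controls the order in which tasks are dequeued. With several worker processes a calculate task may start while a prepare task is still running elsewhere, so if one group must finish completely first, the main process would have to withhold the second group until all results of the first have arrived (the "more intelligent dispatcher" mentioned above).
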
