Source code for mbproj2d.forkparallel

# Copyright (C) 2020 Jeremy Sanders <jeremy@jeremysanders.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Module to run a forked parallel process for evalulating functions,
one at a time (ForkParallel) or using a queue of input data
(ForkQueue).
"""

import os
import socket
import struct
import select
import signal
import pickle
import traceback

# special exit code to break out of child
exitcode = b'*[EX!T}*FORK'

# type used to send object size
sizesize = struct.calcsize('L')

[docs] def recvLen(sock, length): """Receive exactly length bytes from socket.""" retn = b'' while len(retn) < length: retn += sock.recv(length-len(retn)) return retn
[docs] def sendItem(sock, item): """Pickle and send item to socket using size + pickled protocol.""" pickled = pickle.dumps(item, -1) size = struct.pack('L', len(pickled)) sock.sendall(size + pickled)
[docs] def recvItem(sock): """Receive pickled item from socket.""" retn = sock.recv(64*1024) size = struct.unpack('L', retn[:sizesize])[0] retn = retn[sizesize:] while len(retn) < size: retn += sock.recv(size-len(retn)) return pickle.loads(retn)
[docs] class ForkQueuePool: """Execute function in multiple forked processes.""" def __init__(self, func, ninstances, initfunc=None, argc=None, argv=None): """Initialise queue for func and with number of instances given. if initfunc is set, run this at first """ self.func = func self.argc = () if argc is None else argc self.argv = {} if argv is None else argv self.socks = [] self.pids = [] for i in range(ninstances): parentsock, childsock = socket.socketpair() pid = os.fork() if pid == 0: # child process parentsock.close() self.sock = childsock self.amparent = False # close other children (we don't need to talk to them) del self.socks # call the initialise function, if required if initfunc is not None: initfunc() # wait for commands from parent self.childLoop() # return here, or we get a fork bomb! return else: # parent process - keep track of children self.socks.append(parentsock) self.pids.append(pid) childsock.close() self.amparent = True
[docs] def childLoop(self): """Wait for commands on the socket and execute.""" if self.amparent: raise RuntimeError('Not child, or not started') # ignore ctrl+c signal.signal(signal.SIGINT, signal.SIG_IGN) # repeat until exit code or socket breaks try: while True: # get data to process args = recvItem(self.sock) #print('received', args) # exit if parent is done if args == exitcode: break retn = [] # presumably no socket error in below try: # iterate over input and add result with index key for arg in args: res = self.func(arg, *self.argc, **self.argv) retn.append(res) except Exception as e: # send back an exception e.backtrace = traceback.format_exc() retn = e # send back the result sendItem(self.sock, retn) except socket.error: #print('Breaking on socket error') pass #print('Exiting child') os._exit(os.EX_OK)
[docs] def execute(self, argslist): """Execute the list of items on the queue. This version cheats by just splitting the input up into equal-sized chunks. Maybe the chunksize should be smaller, if some chunks would take much less time """ if not self.amparent: raise RuntimeError('Not parent, or not started') # calculate number sent to each sock (making sure that the # number of items is <= than the number of sockets argslist = list(argslist) numargs = len(argslist) if numargs < len(self.socks): socks = self.socks[:numargs] else: socks = self.socks # round up chunk size chunksize = -(-numargs//len(socks)) # send chunks to each forked process sockchunks = {} for idx, sock in enumerate(socks): sendItem(sock, argslist[idx*chunksize:(idx+1)*chunksize]) sockchunks[sock] = idx # wait and collect responses retn = [None]*numargs while sockchunks: read, write, err = select.select(list(sockchunks), [], []) for sock in read: res = recvItem(sock) if isinstance(res, Exception): raise RuntimeError( 'Exception inside parallel section:\n%s' % ( res.backtrace)) idx = sockchunks[sock] retn[idx*chunksize:(idx+1)*chunksize] = res del sockchunks[sock] return retn
[docs] def finish(self): """Close child forks and close sockets.""" if self.amparent: for sock in self.socks: try: sendItem(sock, exitcode) sock.close() except socket.error: pass self.socks.clear() # this avoids zombie processes for pid in self.pids: os.waitpid(pid, 0) self.pids.clear() else: try: if self.sock is not None: self.sock.close() except socket.error: pass self.sock = None
def __del__(self): self.finish()
[docs] def map(self, func, args): """ Allow Pool to be used like multiprocessing.Pool Note func is ignored here. """ return self.execute(args)
def __enter__(self): """Return context manager for queue.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Exit context manager for queue.""" self.finish()