Source code for bumps.mapper

"""
Parallel and serial mapper implementations.
"""
import sys
import os

# {{{ http://code.activestate.com/recipes/496767/ (r1)
# Converted to use ctypes by Paul Kienzle


PROCESS_ALL_ACCESS = 0x1F0FFF


[docs]def setpriority(pid=None, priority=1): """ Set The Priority of a Windows Process. Priority is a value between 0-5 where 2 is normal priority and 5 is maximum. Default sets the priority of the current python process but can take any valid process ID. """ #import win32api,win32process,win32con from ctypes import windll priorityclasses = [0x40, # IDLE_PRIORITY_CLASS, 0x4000, # BELOW_NORMAL_PRIORITY_CLASS, 0x20, # NORMAL_PRIORITY_CLASS, 0x8000, # ABOVE_NORMAL_PRIORITY_CLASS, 0x80, # HIGH_PRIORITY_CLASS, 0x100, # REALTIME_PRIORITY_CLASS ] if pid is None: pid = windll.kernel32.GetCurrentProcessId() handle = windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, True, pid) windll.kernel32.SetPriorityClass(handle, priorityclasses[priority])
# end of http://code.activestate.com/recipes/496767/ }}}
[docs]def nice(): if os.name == 'nt': setpriority(priority=1) else: os.nice(5)
[docs]class SerialMapper(object):
[docs] @staticmethod def start_worker(problem): pass
[docs] @staticmethod def start_mapper(problem, modelargs, cpus=0): # Note: map is n iterator in python 3.x return lambda points: list(map(problem.nllf, points))
[docs] @staticmethod def stop_mapper(mapper): pass
# Load the problem in the remote process rather than pickling #def _MP_load_problem(*modelargs): # from .fitproblem import load_problem # _MP_set_problem(load_problem(*modelargs)) def _MP_set_problem(problem): global _problem nice() _problem = problem def _MP_run_problem(point): global _problem return _problem.nllf(point)
[docs]class MPMapper(object): pool = None
[docs] @staticmethod def start_worker(problem): pass
[docs] @staticmethod def start_mapper(problem, modelargs, cpus=0): import multiprocessing if cpus==0: cpus = multiprocessing.cpu_count() if MPMapper.pool is not None: MPMapper.pool.terminate() #MPMapper.pool = multiprocessing.Pool(cpus,_MP_load_problem,modelargs) MPMapper.pool = multiprocessing.Pool(cpus, _MP_set_problem, (problem,)) mapper = lambda points: MPMapper.pool.map(_MP_run_problem, points) return mapper
[docs] @staticmethod def stop_mapper(mapper): pass
def _MPI_set_problem(comm, problem, root=0): global _problem _problem = comm.bcast(problem) def _MPI_run_problem(point): global _problem return _problem.nllf(point) def _MPI_map(comm, points, root=0): import numpy as np from mpi4py import MPI # Send number of points and number of variables per point npoints, nvars = comm.bcast( points.shape if comm.rank == root else None, root=root) if npoints == 0: raise StopIteration # Divvy points equally across all processes whole = points if comm.rank == root else None idx = np.arange(comm.size) size = np.ones(comm.size, idx.dtype) * \ (npoints // comm.size) + (idx < npoints % comm.size) offset = np.cumsum(np.hstack((0, size[:-1]))) part = np.empty((size[comm.rank], nvars), dtype='d') comm.Scatterv((whole, (size * nvars, offset * nvars), MPI.DOUBLE), (part, MPI.DOUBLE), root=root) # Evaluate models assigned to each processor partial_result = np.array([_MPI_run_problem(pi) for pi in part], dtype='d') # Collect results result = np.empty(npoints, dtype='d') if comm.rank == root else None comm.Barrier() comm.Gatherv((partial_result, MPI.DOUBLE), (result, (size, offset), MPI.DOUBLE), root=root) comm.Barrier() return result
[docs]class MPIMapper(object):
[docs] @staticmethod def start_worker(problem): global _problem _problem = problem from mpi4py import MPI root = 0 # If master, then return to main program if MPI.COMM_WORLD.rank == root: return # If slave, then set problem and wait in map loop #_MPI_set_problem(MPI.COMM_WORLD, None, root=root) try: while True: _MPI_map(MPI.COMM_WORLD, None, root=root) except StopIteration: pass MPI.Finalize() sys.exit(0)
[docs] @staticmethod def start_mapper(problem, modelargs, cpus=0): # Slave started from start_worker, so it never gets here # Slave expects _MPI_set_problem followed by a series # of map requests from mpi4py import MPI #_MPI_set_problem(MPI.COMM_WORLD, problem) return lambda points: _MPI_map(MPI.COMM_WORLD, points)
[docs] @staticmethod def stop_mapper(mapper): from mpi4py import MPI import numpy as np # Send an empty point list to stop the iteration try: mapper(np.empty((0, 0), 'd')) raise RuntimeException("expected StopIteration") except StopIteration: pass MPI.Finalize()
[docs]class AMQPMapper(object):
[docs] @staticmethod def start_worker(problem): #sys.stderr = open("bumps-%d.log"%os.getpid(),"w") #print >>sys.stderr,"worker is starting"; sys.stdout.flush() from amqp_map.config import SERVICE_HOST from amqp_map.core import connect, start_worker as serve server = connect(SERVICE_HOST) #os.system("echo 'serving' > /tmp/map.%d"%(os.getpid())) # print "worker is serving"; sys.stdout.flush() serve(server, "bumps", problem.nllf)
#print >>sys.stderr,"worker ended"; sys.stdout.flush()
[docs] @staticmethod def start_mapper(problem, modelargs, cpus=0): import sys import multiprocessing import subprocess from amqp_map.config import SERVICE_HOST from amqp_map.core import connect, Mapper server = connect(SERVICE_HOST) mapper = Mapper(server, "bumps") cpus = multiprocessing.cpu_count() pipes = [] for _ in range(cpus): cmd = [sys.argv[0], "--worker"] + modelargs # print "starting",sys.argv[0],"in",os.getcwd(),"with",cmd pipe = subprocess.Popen(cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) pipes.append(pipe) for pipe in pipes: if pipe.poll() > 0: raise RuntimeError("subprocess returned %d\nout: %s\nerr: %s" % (pipe.returncode, pipe.stdout, pipe.stderr)) #os.system(" ".join(cmd+["&"])) import atexit def exit_fun(): for p in pipes: p.terminate() atexit.register(exit_fun) # print "returning mapper",mapper return mapper
[docs] @staticmethod def stop_mapper(mapper): for pipe in mapper.pipes: pipe.terminate()