"""
Parallel and serial mapper implementations.
The API is a bit crufty since interprocess communication has evolved from
the original implementation. And the names are misleading.
Usage::
Mapper.start_worker(problem)
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
...
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
Mapper.stop_mapper(mapper)
"""
import sys
import os
# {{{ http://code.activestate.com/recipes/496767/ (r1)
# Converted to use ctypes by Paul Kienzle
PROCESS_ALL_ACCESS = 0x1F0FFF


def can_pickle(problem, check=False):
    """
    Returns True if *problem* can be pickled.

    If this method returns False then MPMapper cannot be used and
    SerialMapper should be used instead.

    If *check* is True then call *nllf()* on the duplicated object as a
    "smoke test" to verify that the function will run after copying.  This
    is not foolproof.  For example, access to a database may work in the
    duplicated object because the connection is open and available in the
    current process, but it will fail when trying to run on a remote machine.
    """
    try:
        import dill
    except ImportError:
        dill = None
    import pickle
    try:
        if dill is not None:
            dup = dill.loads(dill.dumps(problem, recurse=True))
        else:
            dup = pickle.loads(pickle.dumps(problem))
        if check:
            dup.nllf()
        return True
    except Exception:
        return False
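

# A usage sketch (not part of the module): use can_pickle() to choose between
# the multiprocessing and serial mappers.  *problem*, *cpus* and *points* are
# assumed to be supplied by the caller.
#
#     mapper_class = MPMapper if can_pickle(problem, check=True) else SerialMapper
#     mapper = mapper_class.start_mapper(problem, None, cpus)
#     result = mapper(points)
#     mapper_class.stop_mapper(mapper)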


def setpriority(pid=None, priority=1):
    """
    Set the priority of a Windows process.  Priority is a value between 0 and
    5 where 2 is normal priority and 5 is maximum.  By default this sets the
    priority of the current python process, but it can take any valid
    process ID.
    """
    #import win32api, win32process, win32con
    from ctypes import windll
    priorityclasses = [
        0x40,    # IDLE_PRIORITY_CLASS
        0x4000,  # BELOW_NORMAL_PRIORITY_CLASS
        0x20,    # NORMAL_PRIORITY_CLASS
        0x8000,  # ABOVE_NORMAL_PRIORITY_CLASS
        0x80,    # HIGH_PRIORITY_CLASS
        0x100,   # REALTIME_PRIORITY_CLASS
    ]
    if pid is None:
        pid = windll.kernel32.GetCurrentProcessId()
    handle = windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, True, pid)
    windll.kernel32.SetPriorityClass(handle, priorityclasses[priority])
# end of http://code.activestate.com/recipes/496767/ }}}


def nice():
    if os.name == 'nt':
        setpriority(priority=1)
    else:
        os.nice(5)


class SerialMapper(object):

    @staticmethod
    def start_worker(problem):
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        # Note: map is an iterator in python 3.x
        return lambda points: list(map(problem.nllf, points))

    @staticmethod
    def stop_mapper(mapper):
        pass


# Load the problem in the remote process rather than pickling
#def _MP_load_problem(*modelargs):
#    from .fitproblem import load_problem
#    _MP_set_problem(load_problem(*modelargs))


def _MP_setup(namespace):
    # Using MPMapper class variables to store worker globals.
    # It doesn't matter if they conflict with the controller values since
    # they are in a different process.
    MPMapper.namespace = namespace
    nice()


def _MP_run_problem(problem_point_pair):
    problem_id, point = problem_point_pair
    if problem_id != MPMapper.problem_id:
        #print(f"Fetching problem {problem_id} from namespace")
        # The problem is pickled using dill when it is available.
        try:
            import dill
            MPMapper.problem = dill.loads(MPMapper.namespace.pickled_problem)
        except ImportError:
            MPMapper.problem = MPMapper.namespace.problem
        MPMapper.problem_id = problem_id
    return MPMapper.problem.nllf(point)


class MPMapper(object):
    # Note: subprocesses are using the same variables
    pool = None
    manager = None
    namespace = None
    problem_id = 0

    @staticmethod
    def start_worker(problem):
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        import multiprocessing

        # Set up the process pool on the first call.
        if MPMapper.pool is None:
            # Create a sync namespace to distribute the problem description.
            MPMapper.manager = multiprocessing.Manager()
            MPMapper.namespace = MPMapper.manager.Namespace()
            # Start the process pool, sending the namespace handle.
            if cpus == 0:
                cpus = multiprocessing.cpu_count()
            MPMapper.pool = multiprocessing.Pool(cpus, _MP_setup, (MPMapper.namespace,))

        # Increment the problem number and store the problem in the namespace.
        # The store action uses pickle to transfer python objects to the
        # manager process.  Since this may fail for lambdas and for functions
        # defined within the model file, instead use dill (if available)
        # to pickle the problem before storing.
        MPMapper.problem_id += 1
        try:
            import dill
            MPMapper.namespace.pickled_problem = dill.dumps(problem, recurse=True)
        except ImportError:
            MPMapper.namespace.problem = problem
        ## Store the modelargs and the problem name if pickling doesn't work
        #MPMapper.namespace.modelargs = modelargs

        # Set the mapper to send problem_id/point value pairs.
        mapper = lambda points: MPMapper.pool.map(
            _MP_run_problem, ((MPMapper.problem_id, p) for p in points))
        return mapper

    @staticmethod
    def stop_mapper(mapper):
        # Reset the pool and manager.
        MPMapper.pool.terminate()
        MPMapper.manager.shutdown()
        MPMapper.pool = None
        MPMapper.manager = None
        MPMapper.namespace = None
        # Don't reset problem_id; it keeps counting even when the mapper is restarted.
        ##MPMapper.problem_id = 0
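

# A usage sketch (not part of the module) of the MPMapper lifecycle.  The pool
# is created on the first call and reused for later fits; each call to
# start_mapper re-sends the problem through the manager namespace.  *problem*,
# *cpus* and *points* are assumed to be supplied by the caller.
#
#     mapper = MPMapper.start_mapper(problem, None, cpus)  # spawn pool, send problem
#     result = mapper(points)                              # nllf evaluated in workers
#     MPMapper.stop_mapper(mapper)                         # terminate pool and manager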


def _MPI_set_problem(problem, comm, root=0):
    import dill
    pickled_problem = dill.dumps(problem, recurse=True) if comm.rank == root else None
    pickled_problem = comm.bcast(pickled_problem, root=root)
    return problem if comm.rank == root else dill.loads(pickled_problem)


def _MPI_map(problem, points, comm, root=0):
    #print(f"{comm.rank}: mapping points")
    import numpy as np
    from mpi4py import MPI

    # Send the number of points and the number of variables per point.
    # root: return the result if there are points, otherwise return False.
    # worker: return True if there are points, otherwise return False.
    npoints, nvars = comm.bcast(
        points.shape if comm.rank == root else None, root=root)
    if npoints == 0:
        return False

    # Divvy points equally across all processes.
    whole = points if comm.rank == root else None
    idx = np.arange(comm.size)
    size = (np.ones(comm.size, idx.dtype) * (npoints // comm.size)
            + (idx < npoints % comm.size))
    offset = np.cumsum(np.hstack((0, size[:-1])))
    part = np.empty((size[comm.rank], nvars), dtype='d')
    comm.Scatterv((whole, (size * nvars, offset * nvars), MPI.DOUBLE),
                  (part, MPI.DOUBLE),
                  root=root)

    # Evaluate models assigned to each processor.
    partial_result = np.array([problem.nllf(pk) for pk in part], dtype='d')

    # Collect results.
    result = np.empty(npoints, dtype='d') if comm.rank == root else True
    comm.Barrier()
    comm.Gatherv((partial_result, MPI.DOUBLE),
                 (result, (size, offset), MPI.DOUBLE),
                 root=root)
    comm.Barrier()
    return result
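

# The partitioning arithmetic in _MPI_map, shown standalone as a minimal
# sketch: npoints rows are split across comm.size ranks, with the first
# npoints % comm.size ranks receiving one extra row.
#
#     import numpy as np
#     npoints, nranks = 10, 4
#     idx = np.arange(nranks)
#     size = np.ones(nranks, idx.dtype)*(npoints // nranks) + (idx < npoints % nranks)
#     offset = np.cumsum(np.hstack((0, size[:-1])))
#     # size   -> [3 3 2 2]   rows per rank
#     # offset -> [0 3 6 8]   starting row for each rank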


class MPIMapper(object):
    _first_fit = True  # The first problem is set when the worker starts

    @staticmethod
    def start_worker(problem):
        """
        Start the worker process.

        For the main process this does nothing and returns immediately.  The
        worker processes never return.

        Each worker sits in a loop waiting for the next batch of points
        for the problem, or for the next problem.  If the problem is set
        to None, then exit the process and never return.
        """
        from mpi4py import MPI
        comm, root = MPI.COMM_WORLD, 0
        # If worker, sit in a loop waiting for the next point.
        # If the point is empty, then wait for a new problem.
        # If the problem is None then we are done, otherwise wait for the next point.
        if comm.rank != root:
            #print(f"{comm.rank}: looping")
            while True:
                result = _MPI_map(problem, None, comm, root)
                if not result:
                    problem = _MPI_set_problem(None, comm, root)
                    if problem is None:
                        break
                    #print(f"{comm.rank}: changing problem")
            #print(f"{comm.rank}: finalizing")
            MPI.Finalize()
            # Exit the program after the worker is done.  Don't return
            # to the caller since that is continuing on with the main
            # thread, and in particular, attempting to rerun the fit on
            # each worker.
            sys.exit(0)

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        # Only root can get here---the workers are stuck in start_worker.
        from mpi4py import MPI
        comm, root = MPI.COMM_WORLD, 0
        import numpy as np

        # Signal a new problem and then send it, but not on the first fit.
        # We do this so that we can still run MPI fits even if the problem
        # itself cannot be pickled, but only the first one.  (You can still
        # fit a series even if the problem can't be pickled, but you will
        # need to restart the MPI job separately for each fit.)
        # Note: setting problem to None stops the program, so call finalize().
        mapper = lambda points: _MPI_map(problem, points, comm, root)
        if not MPIMapper._first_fit:
            #print(f"{comm.rank}: replacing problem")
            # Send an empty set of points to signal a new problem is coming.
            mapper(np.empty((0, 0), 'd'))
            _MPI_set_problem(problem, comm, root)
            if problem is None:
                #print(f"{comm.rank}: finalizing root")
                MPI.Finalize()
        MPIMapper._first_fit = False
        return mapper

    @staticmethod
    def stop_mapper(mapper):
        # Set problem=None to stop the program.
        MPIMapper.start_mapper(None, None)
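

# A sketch of the intended launch pattern (the script name and rank count are
# illustrative, not part of the module): every rank runs the same script under
# mpirun.  Worker ranks block in start_worker() and never return; the root
# rank builds the mapper and drives the fit.
#
#     # mpirun -n 8 python fit_script.py
#     MPIMapper.start_worker(problem)            # workers loop here forever
#     mapper = MPIMapper.start_mapper(problem, None)
#     result = mapper(points)                    # root scatters points, gathers nllf
#     MPIMapper.stop_mapper(mapper)              # broadcasts problem=None to stop workers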


class AMQPMapper(object):

    @staticmethod
    def start_worker(problem):
        #sys.stderr = open("bumps-%d.log" % os.getpid(), "w")
        #print >>sys.stderr, "worker is starting"; sys.stdout.flush()
        from amqp_map.config import SERVICE_HOST
        from amqp_map.core import connect, start_worker as serve
        server = connect(SERVICE_HOST)
        #os.system("echo 'serving' > /tmp/map.%d" % (os.getpid()))
        #print "worker is serving"; sys.stdout.flush()
        serve(server, "bumps", problem.nllf)
        #print >>sys.stderr, "worker ended"; sys.stdout.flush()

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        import sys
        import multiprocessing
        import subprocess
        from amqp_map.config import SERVICE_HOST
        from amqp_map.core import connect, Mapper

        server = connect(SERVICE_HOST)
        mapper = Mapper(server, "bumps")
        cpus = multiprocessing.cpu_count()
        pipes = []
        for _ in range(cpus):
            cmd = [sys.argv[0], "--worker"] + modelargs
            #print "starting", sys.argv[0], "in", os.getcwd(), "with", cmd
            pipe = subprocess.Popen(cmd, universal_newlines=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            pipes.append(pipe)
        for pipe in pipes:
            # poll() returns None while the subprocess is still running.
            if pipe.poll() is not None and pipe.returncode > 0:
                raise RuntimeError("subprocess returned %d\nout: %s\nerr: %s"
                                   % (pipe.returncode, pipe.stdout, pipe.stderr))
        #os.system(" ".join(cmd + ["&"]))

        import atexit

        def exit_fun():
            for p in pipes:
                p.terminate()
        atexit.register(exit_fun)

        #print "returning mapper", mapper
        return mapper

    @staticmethod
    def stop_mapper(mapper):
        for pipe in mapper.pipes:
            pipe.terminate()