r"""An intuitive way of specifying data flow using imperative programming
techniques.
For example, to count the average word length in a string:
.. code-block:: python
from job_stream import inline
# Begin a new job_stream, treating each word in the string as a piece
# of work.
with inline.Work("Hello there world, how are you?".split()) as w:
@w.job
def getLen(w):
# Transform each word into its length.
return len(w)
# A reducer combines the above answers; the result from the reducer
# will be the stored cumulative length divided by the number of words,
# or in other words the mean length.
@w.reduce(emit = lambda store: store.len / store.count)
def findAverage(store, inputs, others):
# Initialize the data store if it hasn't been.
if not hasattr(store, 'init'):
store.init = True
store.count = 0
store.len = 0.0
# Add each word's length to ours, and increment count.
for i in inputs:
store.len += i
store.count += 1
# Merge with another reducer, adding their count and length to
# our own.
for o in others:
store.len += o.len
store.count += o.count
@w.result
def printer(result):
# Print out the emit'd average.
print(result)
The main class interacted with in ``inline`` code is
:class:`job_stream.inline.Work`, which provides methods for decorating Python
functions and remembers how the code was organized so that things are executed
in the right order. See :class:`job_stream.inline.Work`'s documentation for
more information.
Hierarchy of ``job_stream.inline`` Pipes
========================================
Sometimes, code follows a common skeleton with some parallel code in the
middle. The :mod:`job_stream.baked` library is an example of this;
:meth:`job_stream.baked.sweep` returns a context manager that yields a
:class:`job_stream.inline.Work` object used to tell ``sweep`` what operations
the user wishes to perform:
.. code-block:: python
from job_stream.baked import sweep
import numpy as np
with sweep({ 'a': np.arange(10) }) as w:
@w.job
def square(id, trial, a):
return { 'value': a*a + np.random.random() }
It is recommended that if a user class has functionality that can be added
within a ``job_stream`` pipe, it should expose that functionality in an
``@classmethod`` as ``jobs_{name}``:
.. code-block:: python
from job_stream.inline import Args, Work
class MyClass:
def classify(inputs):
return inputs[0]
@classmethod
def jobs_classify(w):
@w.job
def _classify(classifier, inputs):
return classifier.classify(inputs)
with Work([ Args(MyClass(), [i]) for i in range(10) ]) as w:
MyClass.jobs_classify(w)
"""
import job_stream.common
import pickle
import inspect
import os
# Imports from job_stream
Object = job_stream.Object
getCpuCount = job_stream.getCpuCount
getRank = job_stream.getRank
# Used to register new classes so they can be pickle'd appropriately
_moduleSelf = globals()
_typeCount = [ 0 ]
[docs]class _ForceCheckpointJob(job_stream.common.Job):
"""Used for tests; raises exception after checkpoint."""
[docs] def handleWork(self, work):
self._forceCheckpoint(True)
self.emit(work)
[docs]class _ForceCheckpointAndGoJob(job_stream.common.Job):
"""Used for tests; execution continues after checkpoint."""
[docs] def handleWork(self, work):
self._forceCheckpoint(False)
self.emit(work)
[docs]class _UnwrapperJob(job_stream.common.Job):
"""Job to make processing after a reducer possible and sensical for inline.
"""
USE_MULTIPROCESSING = False
[docs] def handleWork(self, work):
for w in work:
self.emit(w)
[docs]class Args(object):
"""Automatically fans-out args and kwargs to the caller. For example, the
below will print out "Got 1, 2, 3, 4":
.. code-block:: python
from job_stream.inline import Args, Work
with Work([ Args(1, 2, d=4, c=3) ]) as w:
@w.job
def handle(a, b, c, d):
print("Got {}, {}, {}, {}".format(a, b, c, d))
"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
[docs]class Multiple(object):
"""If some inline code wants to emit or recur multiple pieces of work, an instance of
this class is the way to do it. For instance:
return Multiple([ 1, 2, 3 ])
"""
def __init__(self, workList):
self.workList = workList
[docs]class Work(object):
"""Represents a job_stream pipeline. Similar to traditional imperative
coding practices, :class:`Work` passes work in the same direction as the
source file. In other words, the system starts with something like:
.. code-block:: python
import job_stream.inline as inline
with inline.Work([1, 2, 3]) as w:
# ...
This code will distribute the numbers 1, 2, and 3 into the system. Once
in the system, any decorated methods will process the data in some way.
*The ordering of the decorated methods matters!* For instance, running:
.. code-block:: python
with inline.Work([1]) as w:
@w.job
def first(w):
print("a: {}".format(w))
return w+1
@w.job
def second(w):
print("b: {}".format(w))
Will print ``a: 1`` and ``b: 2``. If the list to :class:`Work` were
longer, then more ``a:`` and ``b:`` lines would be printed, one pair for
each input to the system.
.. note:: Multiprocessing
Python has the `GIL <https://wiki.python.org/moin/GlobalInterpreterLock>`_
in the default implementation, which typically limits pure-python code
to a single thread. To get around this, the :mod:`job_stream` module
by default uses multiprocessing for all jobs - that is, your python
code will run in parallel on all cores, in different processes.
If this behavior is not desired, particularly if your application loads
a lot of data in memory that you would rather not duplicate, passing
``useMultiprocessing=False`` to the :class:`Work` object's initializer
will force all job_stream activity to happen within the original
process:
.. code-block:: python
Work([ 1, 2 ], useMultiprocessing=False)
This may also be done to individual jobs / frames / reducers:
.. code-block:: python
@w.job(useMultiprocessing=False)
def doSomething(work):
pass
"""
def __init__(self, initialWork = [], useMultiprocessing = True,
**runKwargs):
"""May be passed a list or other iterable of initial work, as well as
any of the following flags:
Args:
initialWork (list): The token(s) or bits of work that will be
processed. Each will be processed independently of the others;
results might be joined late in the pipeline depending on its
configuration.
useMultiprocessing (bool): If True (the default), then all
job_stream work will be handled in child processes spawned via
Python's multiprocessing module. This is used to get around
the limitations of the GIL, but if your application's setup
(not the jobs themselves) uses a lot of memory and your
computation is handled by either non-GIL protected code or in
another process anyway, it is appropriate to turn this off.
checkpointFile (str): **Obsolete; use the ``job_stream`` binary.**
If specified, job_stream will run in checkpoint
mode, periodically dumping progress to this file. If this file
exists when run() is called, then simulation will be resumed
from the state stored in this file.
checkpointInterval (float): **Obsolete; use the ``job_stream``
binary.** Time, in seconds, between the completion of one
checkpoint and the beginning of the next.
checkpointSyncInterval (float): **Debug / test usage only.**
Time, in seconds, to wait for sync when taking a checkpoint.
"""
self._initialWork = list(initialWork)
self._initialWorkDepth = 0
self._useMultiprocessing = useMultiprocessing
self._runKwargs = dict(runKwargs)
# The YAML config that we're building, essentially
self._config = { 'jobs': [] }
# Functions ran on init
self._inits = []
# Function ran to handle a result. If set, nothing else may be added
# to the pipe!
self._resultHandler = None
# finish() has been called
self._hasFinish = False
# parent node for current scope
self._stack = [ { 'config': self._config } ]
def __enter__(self):
return self
def __exit__(self, errType, value, tb):
if errType is None:
# If no result was specified, we do not want to cache and return
# results either. That is, there is no way to get the result of a
# with block.
if not self._hasFinish and self._resultHandler is None:
@self.result
def handleResult(r):
pass
self.run()
[docs] def init(self, func = None):
"""Decorates a method that is called only once on the primary host
before any jobs run. The method is never run when continuing from
a checkpoint.
Useful for e.g. creating folders, deleting temporary files from previous
runs, etc."""
if func is None:
return lambda func2: self.init(func2)
self._assertNoResult()
self._assertFuncArgs(func, 0)
self._inits.append(func)
return func
[docs] def finish(self, func = None):
"""Decorates a method that is called only once on the primary host,
after all jobs have finished.
The decorated method takes a single argument, which is a list of all
results returned from the jobs within the stream.
Anything returned within the decorated method will be emitted from the
job_stream, meaning it will be returned by run(). User code should not
typically rely on this; it is used mainly for e.g. IPython notebooks.
Cannot be used if Work.result is used. Work.result is preferred, if
your job stream's output can be processed one piece at a time (more
efficient in that results can be discarded after they are processed)."""
if func is None:
return lambda func2: self.finish(func2)
self._assertNoResult()
self._assertFuncArgs(func, 1)
@self.reduce(store = lambda: [], emit = func)
def collectResults(store, works, others):
store.extend(works)
for o in others:
store.extend(o)
self._hasFinish = True
[docs] def frame(self, func = None, **kwargs):
"""Decorates a function that begins a frame. A frame is started with
a single piece of work, and then recurses other pieces of work into
itself until some stopping condition.
Frames (and their cousins Reducers) are the most complicated feature in
`job_stream`. A frame is appropriate if:
* A while loop would be used in non-parallelizable code
* Individual pieces of work need fan-out and fan-in
Frames have three parts - an "all outstanding work is finished"
handler, an aggregator, and everything in between, which is used to
process recurred work.
For example, suppose we want to sum all digits between 1 and our work,
and report the result. The best way to design this type of system is
with a Frame, implemented in `inline` through :meth:`frame` and
:meth:`frameEnd`. The easiest way to think of these is as the two ends
of a ``while`` loop - :meth:`frame` is evaluated as a termination
condition, and is also evaluated before anything happens.
:meth:`frameEnd` exists to aggregate logic from within the ``while``
loop into something that ``frame`` can look at.
.. code-block:: python
from job_stream.inline import Work, Multiple
w = Work([ 4, 5, 8 ])
@w.frame
def sumThrough(store, first):
# Remember, this is called like the header of a while statement: once at
# the beginning, and each time our recurred work finishes. Anything
# returned from this function will keep the loop running.
if not hasattr(store, 'value'):
# Store hasn't been initialized yet, meaning that this is the first
# evaluation
store.first = first
store.value = 0
return first
# If we reach here, we're done. By not returning anything, job_stream knows
# to exit the loop (finish the reduction). The default behavior of frame is
# to emit the store object itself, which is fine.
# Anything between an @frame decorated function and @frameEnd will be executed
# for anything returned by the @frame or @frameEnd functions. We could have
# returned multiple from @frame as well, but this is a little more fun
@w.job
def countTo(w):
# Generate and emit as work all integers ranging from 1 to w, inclusive
return Multiple(range(1, w + 1))
@w.frameEnd
def handleNext(store, next):
# next is any work that made it through the stream between @frame and
# @frameEnd. In our case, it is one of the integers between 1 and our
# initial work.
store.value += next
@w.result
def printMatchup(w):
print("{}: {}".format(w.first, w.value))
w.run()
Running the above code will print:
.. code-block:: sh
$ python script.py
4: 10
8: 36
5: 15
Note that the original work is out of order, but the sums line up. This is
because a frame starts a new reduction for each individual piece of work
entering the `@frame` decorated function.
Args:
store: The storage object used to remember results from the frame.
first: The first work that began this frame.
Any results returned are recursed into the frame.
Decorator kwargs:
store - The constructor for a storage object. Defaults to inline.Object
emit - A function taking a storage object and returning the work that
should be forwarded to the next member of the stream. Defaults
to emitting the store itself.
"""
if func is None:
# Non-decorating form
return lambda func2: self.frame(func2, **kwargs)
self._assertNoResult()
self._assertFuncArgs(func, 2, True)
fargs = Object()
fargs.store = kwargs.pop('store', Object)
fargs.emit = kwargs.pop('emit', lambda store: store)
fargs.useMulti = kwargs.pop('useMultiprocessing', self._useMultiprocessing)
if kwargs:
raise KeyError("Unrecognized arguments: {}".format(kwargs.keys()))
fconf = { 'jobs': [] }
self._stack[-1]['config']['jobs'].append(fconf)
self._stack.append({ 'frameFunc': func, 'frameArgs': fargs, 'config': fconf })
return func
[docs] def frameEnd(self, func = None):
"""Ends a frame. The decorated function should accept two arguments:
Args:
store: The storage object used to remember results from the frame
(same as in the :meth:`frame` decorated method).
next: The next result object that ran through the frame.
Any results returned are recursed into the frame.
"""
if func is None:
# Non-decorating
return lambda func2: self.frameEnd(func2)
if 'frameFunc' not in self._stack[-1]:
raise Exception("frameEnd call does not match up with frame!")
self._assertNoResult()
self._assertFuncArgs(func, 2, True)
fargs = self._stack[-1]['frameArgs']
funcStart = self._stack[-1]['frameFunc']
# Note that the inline version skips straight from handleFirst to handleDone, then
# handleNext based on recurrence.
def handleFirst(s, store, first):
store.first = first
store.obj = fargs.store()
def handleNext(s, store, next):
if isinstance(next, Args):
results = func(store.obj, *next.args, **next.kwargs)
else:
results = func(store.obj, next)
self._listCall(s.recur, results)
def handleDone(s, store):
if isinstance(store.first, Args):
results = funcStart(store.obj, *store.first.args,
**store.first.kwargs)
else:
results = funcStart(store.obj, store.first)
hadRecur = self._listCall(s.recur, results)
if not hadRecur:
# Frame is done, so emit our results
self._listCall(s.emit, fargs.emit(store.obj))
frame = self._newType(self._stack[-1]['frameFunc'].__name__,
job_stream.common.Frame, handleFirst = handleFirst,
handleNext = handleNext, handleDone = handleDone,
useMultiprocessing = fargs.useMulti)
self._stack[-1]['config']['frame'] = frame
self._stack.pop()
# Since we always make a stack entry to start a frame, should never be empty here
assert self._stack
return func
[docs] def job(self, func = None, **kwargs):
"""Decorates a job. The decorated function must take one argument,
which is the work coming into the job. Anything returned is passed
along to the next member of the stream.
You may also call :meth:`job` as ``w.job(useMultiprocessing=False)`` to
disable multiprocessing for this job.
.. warning:: I/O Safety
It is not safe to write non-unique external i/o (such as a file)
within a job. This is because jobs have no parallelism guards -
that is, two jobs executing concurrently might open and append to a
file at the same time. On some filesystems, this results in e.g.
two lines of a csv being combined into a single, invalid line. To
work around this, see :meth:`result`.
"""
if func is None:
# Invocation, not decoration
return lambda func2: self.job(func2)
kw = kwargs.copy()
useMulti = kw.pop('useMultiprocessing', self._useMultiprocessing)
if kw:
raise ValueError("Unrecognized args: {}".format(kw))
self._assertNoResult()
funcCls = func
if not inspect.isclass(funcCls):
self._assertFuncArgs(func, 1, True)
def handle(s, work):
if isinstance(work, Args):
results = func(*work.args, **work.kwargs)
else:
results = func(work)
self._listCall(s.emit, results)
funcCls = self._newType(func.__name__, job_stream.common.Job,
handleWork = handle, useMultiprocessing=useMulti)
self._stack[-1]['config']['jobs'].append(funcCls)
# Return original function for multiplicity
return func
[docs] def reduce(self, func = None, **kwargs):
"""Decorates a reducer. Reducers are distributed (function does not
run on only one machine per reduction). Typically this is used to
gather and aggregate results. Any set of work coming into a reducer
will be emitted as a single piece of work.
The decorated function should take three arguments:
store - The storage object on this machine.
works - A list of work coming into the reducer
others - A list of storage objects being joined into this reducer from
other sources.
Any return value is recurred as work into the reducer.
Decorator kwargs:
store - Constructor for the storage element. Defaults to inline.Object
emit - Function that takes the store and returns work to be forwarded
to the next element in the stream.
"""
if func is None:
# Invocation with kwargs vs paren-less decoration
return lambda func2: self.reduce(func2, **kwargs)
self._assertNoResult()
reducerCls = func
if not inspect.isclass(reducerCls):
self._assertFuncArgs(func, 3)
storeType = kwargs.pop('store', Object)
emitValue = kwargs.pop('emit', lambda store: store)
if kwargs:
raise KeyError("Unknown kwargs: {}".format(kwargs.keys()))
def init(s, store):
store.obj = storeType()
def add(s, store, w):
results = func(store.obj, [ w ], [])
self._listCall(s.recur, results)
def join(s, store, other):
results = func(store.obj, [], [ other ])
self._listCall(s.recur, results)
def done(s, store):
self._listCall(s.emit, emitValue(store.obj))
reducerCls = self._newType(func.__name__,
job_stream.common.Reducer, handleInit = init,
handleAdd = add, handleJoin = join, handleDone = done)
self._stack[-1]['config']['reducer'] = reducerCls
self._stack.pop()
if not self._stack:
# We popped off the last. To allow inline jobs to still be added
# (post processing), we ensure that we still have a stack
self._config['jobs'].insert(0, _UnwrapperJob)
self._initialWorkDepth += 1
self._config = { 'jobs': [ self._config ] }
self._stack.append({ 'config': self._config })
# Return the original function so that the user may call it, if
# desired
return func
[docs] def result(self, func = None, **kwargs):
"""Decorates a result handler, which is called only on the primary host
exactly once for each piece of work that exits the system. **This is
the safest place for I/O!**
The handler receives a single argument, which is the piece of work
exiting the system.
If no result handlers are decorated, then inline will use a collector
that gathers the results and returns them in a list from the run()
method (run() will return None on machines that are not the primary
host)."""
if func is None:
return lambda func2: self.result(func2, **kwargs)
self._assertNoResult()
self._assertFuncArgs(func, 1, True)
if kwargs:
raise KeyError("Unknown kwargs: {}".format(kwargs.keys()))
def realFunc(arg):
if isinstance(arg, Args):
return func(*arg.args, **arg.kwargs)
return func(arg)
self._resultHandler = realFunc
# Return original function so it may be referred to as usual
return func
[docs] def run(self):
"""Runs the Work, executing all decorated methods in the order they
were specified. A few kwargs passed to Work() will be forwarded to
job_stream.common.run().
"""
runKwargs = dict(self._runKwargs)
# Hack in a finish() that returns the results, if no finish or results
# is specified
result = [ None ]
if not self._hasFinish and self._resultHandler is None:
@self.finish
def returnResults(results):
return results
if self._hasFinish:
if self._resultHandler is not None:
raise ValueError("finish() and result()?")
def handleSingleResult(onlyResult):
if result[0] is not None:
raise ValueError("Got multiple results?")
result[0] = onlyResult
runKwargs['handleResult'] = handleSingleResult
else:
if self._resultHandler is None:
raise ValueError("No finish() nor result()?")
runKwargs['handleResult'] = self._resultHandler
# Run init functions, if it's the first execution of this stream
isFirst = True
# Is this a continuation?
if runKwargs.get('checkpointFile'):
if os.path.lexists(runKwargs['checkpointFile']):
isFirst = False
# Init?
if isFirst and job_stream.getRank() == 0:
# Call initial functions
def addToInitial(w):
self._initialWork.append(w)
for init in self._inits:
# Remember, anything returned from init() adds to our initial
# work.
self._listCall(addToInitial, init())
# Bury our initial work appropriately
for i in range(self._initialWorkDepth):
self._initialWork = [ self._initialWork ]
job_stream.common.work = self._initialWork
job_stream.common.run(self._config, **runKwargs)
return result[0]
[docs] def _assertFuncArgs(self, func, i, isMin=False):
"""Ensures that the function func takes exactly i args. Makes error
messages more friendly."""
spec = inspect.getargspec(func)
numArgs = len(spec.args)
# For job_stream purposes, varargs or kwargs count as only one
# additional argument. What we want to avoid is 'store' getting
# wrapped up in *args or **kwargs.
if spec.varargs is not None or spec.keywords is not None:
numArgs += 1
if numArgs != i and not isMin or isMin and numArgs < i:
raise ValueError("Function {} must take {} {} args; takes {}"
.format(func, "at least" if isMin else "exactly", i,
numArgs))
[docs] def _assertNoResult(self):
"""Ensures that @w.result hasn't been used yet, otherwise raises an
error."""
if self._hasFinish:
raise Exception("After Work.finish is used, no other elements may "
"be added to the job stream.")
if self._resultHandler is not None:
raise Exception("After Work.result is used, no other elements may "
"be added to the job stream.")
[docs] def _listCall(self, boundMethod, results):
"""Calls boundMethod (which is something like Job.emit) for each member of results,
if results is Multiple.
Returns True if the boundMethod was called at all (non-None result), False
otherwise."""
called = False
if isinstance(results, Multiple):
for r in results.workList:
if r is None:
continue
boundMethod(r)
called = True
elif results is None:
pass
else:
boundMethod(results)
called = True
return called
def _newType(self, nameBase, clsBase, **funcs):
tname = "_{}_{}".format(nameBase, _typeCount[0])
_typeCount[0] += 1
useMulti = funcs.pop('useMultiprocessing', self._useMultiprocessing)
clsAttrs = dict(funcs)
if not useMulti:
clsAttrs['USE_MULTIPROCESSING'] = False
cls = type(tname, (clsBase,), clsAttrs)
_moduleSelf[tname] = cls
return cls
[docs]def map(func, *sequence, **kwargs):
"""Returns [ func(*a) for a in sequence ] in a parallel manner. Identical
to the builtin map(), except parallelized.
kwargs - Passed to job_stream.inline.Work().
This implementation differs from job_stream.map() so that it can
demonstrate the functionality of the inline module. The behavior is
identical."""
arr = list(enumerate(zip(*sequence)))
result = [ None for _ in range(len(arr)) ]
with Work(arr, **kwargs) as w:
@w.job
def mapWork(w):
i, arg = w
return (i, func(*arg))
@w.result
def storeResult(w):
result[w[0]] = w[1]
return result