job_stream.inline
An intuitive way of specifying data flow using imperative programming techniques.
For example, to count the average word length in a string:
    from job_stream import inline

    # Begin a new job_stream, treating each word in the string as a piece
    # of work.
    with inline.Work("Hello there world, how are you?".split()) as w:
        @w.job
        def getLen(w):
            # Transform each word into its length.
            return len(w)

        # A reducer combines the above answers; the result from the reducer
        # will be the stored cumulative length divided by the number of words,
        # or in other words the mean length.
        @w.reduce(emit = lambda store: store.len / store.count)
        def findAverage(store, inputs, others):
            # Initialize the data store if it hasn't been.
            if not hasattr(store, 'init'):
                store.init = True
                store.count = 0
                store.len = 0.0

            # Add each word's length to ours, and increment count.
            for i in inputs:
                store.len += i
                store.count += 1

            # Merge with another reducer, adding their count and length to
            # our own.
            for o in others:
                store.len += o.len
                store.count += o.count

        @w.result
        def printer(result):
            # Print out the emit'd average.
            print(result)
The main class used in inline code is job_stream.inline.Work, which provides methods for decorating Python functions and remembers how the code was organized so that things are executed in the right order. See the documentation for job_stream.inline.Work for more information.
Hierarchy of job_stream.inline Pipes
Sometimes, code follows a common skeleton with some parallel code in the middle. The job_stream.baked library is an example of this; job_stream.baked.sweep() returns a context manager that yields a job_stream.inline.Work object used to tell sweep what operations the user wishes to perform:
    from job_stream.baked import sweep
    import numpy as np

    with sweep({ 'a': np.arange(10) }) as w:
        @w.job
        def square(id, trial, a):
            return { 'value': a*a + np.random.random() }
It is recommended that if a user class has functionality that can be added within a job_stream pipe, it should expose that functionality in an @classmethod as jobs_{name}:
    from job_stream.inline import Args, Work

    class MyClass:
        def classify(self, inputs):
            return inputs[0]

        @classmethod
        def jobs_classify(cls, w):
            # Register this class's classification step on the pipeline w.
            @w.job
            def _classify(classifier, inputs):
                return classifier.classify(inputs)

    with Work([ Args(MyClass(), [i]) for i in range(10) ]) as w:
        MyClass.jobs_classify(w)
Members
class job_stream.inline.Args(*args, **kwargs)
    Bases: object

    Automatically fans out args and kwargs to the caller. For example, the below will print out "Got 1, 2, 3, 4":

        from job_stream.inline import Args, Work

        with Work([ Args(1, 2, d=4, c=3) ]) as w:
            @w.job
            def handle(a, b, c, d):
                print("Got {}, {}, {}, {}".format(a, b, c, d))
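The fan-out described for Args can be pictured with plain Python argument unpacking. The following is only a sketch of the idea, not job_stream's implementation: `FanOutArgs` and `call_with` are hypothetical names introduced here for illustration.

```python
class FanOutArgs:
    """Hypothetical stand-in for inline.Args: stores positional and keyword arguments."""

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


def call_with(func, work):
    """Unpack a FanOutArgs into func's parameters; pass any other work as one argument."""
    if isinstance(work, FanOutArgs):
        return func(*work.args, **work.kwargs)
    return func(work)


def handle(a, b, c, d):
    return "Got {}, {}, {}, {}".format(a, b, c, d)


# Keyword arguments may be given in any order; they are matched by name.
message = call_with(handle, FanOutArgs(1, 2, d=4, c=3))
print(message)
```

Work that is not wrapped this way reaches the function as a single argument, which is why the wrapper is needed when a job takes several parameters.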
class job_stream.inline.Multiple(workList)
    Bases: object

    If some inline code wants to emit or recur multiple pieces of work, an instance of this class is the way to do it. For instance:

        return Multiple([ 1, 2, 3 ])
class job_stream.inline.Work(initialWork=[], useMultiprocessing=True, **runKwargs)
    Bases: object

    Represents a job_stream pipeline. Similar to traditional imperative coding practices, Work passes work in the same direction as the source file. In other words, the system starts with something like:

        import job_stream.inline as inline
        with inline.Work([1, 2, 3]) as w:
            # ...

    This code will distribute the numbers 1, 2, and 3 into the system. Once in the system, any decorated methods will process the data in some way. The ordering of the decorated methods matters! For instance, running:

        with inline.Work([1]) as w:
            @w.job
            def first(w):
                print("a: {}".format(w))
                return w+1

            @w.job
            def second(w):
                print("b: {}".format(w))

    will print a: 1 and b: 2. If the list passed to Work were longer, then more a: and b: lines would be printed, one pair for each input to the system.

    Note: Multiprocessing
        Python has the GIL in the default implementation, which typically limits pure-Python code to a single thread. To get around this, the job_stream module by default uses multiprocessing for all jobs - that is, your Python code will run in parallel on all cores, in different processes.

        If this behavior is not desired, particularly if your application loads a lot of data in memory that you would rather not duplicate, passing useMultiprocessing=False to the Work object's initializer will force all job_stream activity to happen within the original process:

            Work([ 1, 2 ], useMultiprocessing=False)

        This may also be done to individual jobs / frames / reducers:

            @w.job(useMultiprocessing=False)
            def doSomething(work):
                pass

    May be passed a list or other iterable of initial work, as well as any of the following flags:

    Parameters:
        initialWork (list) – The token(s) or bits of work that will be processed. Each will be processed independently of the others; results might be joined late in the pipeline depending on its configuration.
        useMultiprocessing (bool) – If True (the default), then all job_stream work will be handled in child processes spawned via Python's multiprocessing module. This is used to get around the limitations of the GIL, but if your application's setup (not the jobs themselves) uses a lot of memory and your computation is handled by either non-GIL protected code or in another process anyway, it is appropriate to turn this off.
        checkpointFile (str) – Obsolete; use the job_stream binary. If specified, job_stream will run in checkpoint mode, periodically dumping progress to this file. If this file exists when run() is called, then simulation will be resumed from the state stored in this file.
        checkpointInterval (float) – Obsolete; use the job_stream binary. Time, in seconds, between the completion of one checkpoint and the beginning of the next.
        checkpointSyncInterval (float) – Debug / test usage only. Time, in seconds, to wait for sync when taking a checkpoint.
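The rule that decorated methods run in source order can be illustrated without job_stream at all. The sketch below uses a hypothetical, single-process `MiniPipeline` class (not part of the library) that simply remembers each decorated function and applies them in the order they were registered; it assumes nothing about how job_stream distributes work.

```python
class MiniPipeline:
    """Hypothetical single-process stand-in for a pipeline; not job_stream code."""

    def __init__(self, initial_work):
        self.initial_work = list(initial_work)
        self.jobs = []

    def job(self, func):
        # Remember jobs in the order in which they were decorated.
        self.jobs.append(func)
        return func

    def run(self):
        results = []
        for work in self.initial_work:
            # Each piece of work flows through every job, top to bottom.
            for func in self.jobs:
                work = func(work)
            results.append(work)
        return results


log = []
p = MiniPipeline([1])


@p.job
def first(w):
    log.append("a: {}".format(w))
    return w + 1


@p.job
def second(w):
    log.append("b: {}".format(w))
    return w


results = p.run()
print(log)
```

Swapping the two decorated functions in the source would swap the order in which they see each piece of work, which is the property the ordering rule relies on.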
_assertFuncArgs(func, i, isMin=False)
    Ensures that the function func takes exactly i args. Makes error messages more friendly.

_listCall(boundMethod, results)
    Calls boundMethod (which is something like Job.emit) for each member of results, if results is Multiple.

    Returns True if the boundMethod was called at all (non-None result), False otherwise.
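One way to read the behavior described for _listCall is sketched below. This is an interpretation of the docstring only, not the library's source: `MultipleSketch` and `list_call` are hypothetical names, and the handling of a non-Multiple, non-None result (calling the method once) is an assumption.

```python
class MultipleSketch:
    """Hypothetical stand-in for inline.Multiple: wraps a list of work."""

    def __init__(self, workList):
        self.workList = list(workList)


def list_call(bound_method, results):
    """Call bound_method once per piece of work; report whether it was called."""
    called = False
    if isinstance(results, MultipleSketch):
        for r in results.workList:
            bound_method(r)
            called = True
    elif results is not None:
        # Assumption: a single non-None result is passed through as-is.
        bound_method(results)
        called = True
    return called


emitted = []
was_called_many = list_call(emitted.append, MultipleSketch([1, 2, 3]))
was_called_none = list_call(emitted.append, None)
was_called_one = list_call(emitted.append, 7)
print(emitted)
```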
finish(func=None)
    Decorates a method that is called only once on the primary host, after all jobs have finished.

    The decorated method takes a single argument, which is a list of all results returned from the jobs within the stream.

    Anything returned within the decorated method will be emitted from the job_stream, meaning it will be returned by run(). User code should not typically rely on this; it is used mainly for e.g. IPython notebooks.

    Cannot be used if Work.result is used. Work.result is preferred if your job stream's output can be processed one piece at a time (it is more efficient in that results can be discarded after they are processed).
frame(func=None, **kwargs)
    Decorates a function that begins a frame. A frame is started with a single piece of work, and then recurses other pieces of work into itself until some stopping condition.

    Frames (and their cousins Reducers) are the most complicated feature in job_stream. A frame is appropriate if:

    - A while loop would be used in non-parallelizable code
    - Individual pieces of work need fan-out and fan-in

    Frames have three parts - an "all outstanding work is finished" handler, an aggregator, and everything in between, which is used to process recurred work.
    For example, suppose we want to sum all integers between 1 and our work, and report the result. The best way to design this type of system is with a Frame, implemented in inline through frame() and frameEnd(). The easiest way to think of these is as the two ends of a while loop - frame() is evaluated as a termination condition, and is also evaluated before anything happens. frameEnd() exists to aggregate logic from within the while loop into something that frame can look at.

        from job_stream.inline import Work, Multiple
        w = Work([ 4, 5, 8 ])

        @w.frame
        def sumThrough(store, first):
            # Remember, this is called like the header of a while statement: once at
            # the beginning, and each time our recurred work finishes. Anything
            # returned from this function will keep the loop running.
            if not hasattr(store, 'value'):
                # Store hasn't been initialized yet, meaning that this is the first
                # evaluation
                store.first = first
                store.value = 0
                return first

            # If we reach here, we're done. By not returning anything, job_stream knows
            # to exit the loop (finish the reduction). The default behavior of frame is
            # to emit the store object itself, which is fine.

        # Anything between an @frame decorated function and @frameEnd will be executed
        # for anything returned by the @frame or @frameEnd functions. We could have
        # returned multiple from @frame as well, but this is a little more fun
        @w.job
        def countTo(w):
            # Generate and emit as work all integers ranging from 1 to w, inclusive
            return Multiple(range(1, w + 1))

        @w.frameEnd
        def handleNext(store, next):
            # next is any work that made it through the stream between @frame and
            # @frameEnd. In our case, it is one of the integers between 1 and our
            # initial work.
            store.value += next

        @w.result
        def printMatchup(w):
            print("{}: {}".format(w.first, w.value))

        w.run()
    Running the above code will print:

        $ python script.py
        4: 10
        8: 36
        5: 15

    Note that the original work is out of order, but the sums line up. This is because a frame starts a new reduction for each individual piece of work entering the @frame decorated function.
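The while-loop analogy for frames can be written out serially in plain Python. The sketch below is not job_stream code: `sum_through` is a hypothetical function, `SimpleNamespace` stands in for the store object, and a local `pending` list plays the role of work recurred into the frame. It processes the inputs one at a time, so unlike the parallel version its output is always in input order.

```python
from types import SimpleNamespace


def sum_through(first):
    """Serial analogue of a frame: one store and one loop per piece of incoming work."""
    store = SimpleNamespace()
    pending = []  # work "recurred" into the loop
    while True:
        # Frame step: runs once at the start and again whenever pending work drains.
        if not hasattr(store, 'value'):
            store.first = first
            store.value = 0
            pending.append(first)
        if not pending:
            # Nothing was recurred, so the loop (the reduction) is finished.
            break
        # Job step: fan the work out into the integers 1..w.
        w = pending.pop()
        for n in range(1, w + 1):
            # FrameEnd step: fold each result back into the store.
            store.value += n
    return store


for work in [4, 5, 8]:
    s = sum_through(work)
    print("{}: {}".format(s.first, s.value))
```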
    Parameters:
        store – The storage object used to remember results from the frame.
        first – The first work that began this frame.

    Any results returned are recursed into the frame.

    Decorator kwargs:
        store – The constructor for a storage object. Defaults to inline.Object.
        emit – A function taking a storage object and returning the work that should be forwarded to the next member of the stream. Defaults to emitting the store itself.
frameEnd(func=None)
    Ends a frame. The decorated function should accept two arguments:

    Parameters:
        store – The storage object used to remember results from the frame (same as in the frame() decorated method).
        next – The next result object that ran through the frame.

    Any results returned are recursed into the frame.
init(func=None)
    Decorates a method that is called only once on the primary host before any jobs run. The method is never run when continuing from a checkpoint.

    Useful for e.g. creating folders, deleting temporary files from previous runs, etc.
job(func=None, **kwargs)
    Decorates a job. The decorated function must take one argument, which is the work coming into the job. Anything returned is passed along to the next member of the stream.

    You may also call job() as w.job(useMultiprocessing=False) to disable multiprocessing for this job.

    Warning: I/O Safety
        It is not safe to write non-unique external I/O (such as a file) within a job. This is because jobs have no parallelism guards - that is, two jobs executing concurrently might open and append to a file at the same time. On some filesystems, this results in e.g. two lines of a csv being combined into a single, invalid line. To work around this, see result().
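The pattern behind this warning, computing in parallel but writing from a single place, can be shown with the standard library alone. The sketch below is an analogy rather than job_stream code: `square_row` and `write_rows` are hypothetical names, a thread pool stands in for concurrently running jobs, and an in-memory buffer stands in for the output file.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def square_row(n):
    # Plays the role of a job: compute only, never touch the shared output.
    return [n, n * n]


def write_rows(rows, handle):
    # Plays the role of a result handler: the only place that writes.
    writer = csv.writer(handle)
    for row in rows:
        writer.writerow(row)


with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map returns results in input order, even if work finishes out of order.
    rows = list(pool.map(square_row, range(5)))

buffer = io.StringIO()
write_rows(rows, buffer)
print(buffer.getvalue())
```

Because only one writer ever touches the output, rows cannot be interleaved, no matter how the computations overlap.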
reduce(func=None, **kwargs)
    Decorates a reducer. Reducers are distributed (function does not run on only one machine per reduction). Typically this is used to gather and aggregate results. Any set of work coming into a reducer will be emitted as a single piece of work.

    The decorated function should take three arguments:
        store – The storage object on this machine.
        works – A list of work coming into the reducer.
        others – A list of storage objects being joined into this reducer from other sources.

    Any return value is recurred as work into the reducer.

    Decorator kwargs:
        store – Constructor for the storage element. Defaults to inline.Object.
        emit – Function that takes the store and returns work to be forwarded to the next element in the stream.
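The three-argument contract can be exercised by hand to see why both the work list and the list of other stores are needed. In the sketch below, which does not use job_stream, two `SimpleNamespace` stores stand in for the stores on two machines; each receives part of the word lengths from the example at the top of this page, and then one store is merged into the other. How job_stream actually partitions work between machines is not specified here, so the split is an arbitrary assumption.

```python
from types import SimpleNamespace


def find_average(store, inputs, others):
    # Initialize the store the first time this reducer sees it.
    if not hasattr(store, 'init'):
        store.init = True
        store.count = 0
        store.len = 0.0
    # Fold in new work arriving at this store.
    for i in inputs:
        store.len += i
        store.count += 1
    # Fold in stores that were accumulated elsewhere.
    for o in others:
        store.len += o.len
        store.count += o.count


lengths = [len(word) for word in "Hello there world, how are you?".split()]

# Pretend two machines each handled part of the work.
store_a, store_b = SimpleNamespace(), SimpleNamespace()
find_average(store_a, lengths[:3], [])
find_average(store_b, lengths[3:], [])

# Join the second store into the first, then apply the emit step.
find_average(store_a, [], [store_b])
average = store_a.len / store_a.count
print(average)
```

The result is the same regardless of how the lengths are split between the two stores, which is what makes the reduction safe to distribute.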
result(func=None, **kwargs)
    Decorates a result handler, which is called only on the primary host exactly once for each piece of work that exits the system. This is the safest place for I/O!

    The handler receives a single argument, which is the piece of work exiting the system.

    If no result handlers are decorated, then inline will use a collector that gathers the results and returns them in a list from the run() method (run() will return None on machines that are not the primary host).
class job_stream.inline._ForceCheckpointAndGoJob((object)arg1) → None
    Bases: job_stream.common.Job

    Used for tests; execution continues after checkpoint.

    C++ signature: void __init__(_object*)

_forceCheckpoint((Job)arg1, (bool)arg2) → None
    Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

    C++ signature: void _forceCheckpoint(PyJob {lvalue},bool)

emit((Job)arg1, (object)arg2) → None
    Emit to only target.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object)

emit((Job)arg1, (object)arg2, (str)arg3) → None
    Emit to specific target out of list.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)

handleWork(work)
    Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()
    Called when self.config is set and the Job is fully ready for work, but before any work is accepted.
class job_stream.inline._ForceCheckpointJob((object)arg1) → None
    Bases: job_stream.common.Job

    Used for tests; raises exception after checkpoint.

    C++ signature: void __init__(_object*)

_forceCheckpoint((Job)arg1, (bool)arg2) → None
    Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

    C++ signature: void _forceCheckpoint(PyJob {lvalue},bool)

emit((Job)arg1, (object)arg2) → None
    Emit to only target.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object)

emit((Job)arg1, (object)arg2, (str)arg3) → None
    Emit to specific target out of list.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)

handleWork(work)
    Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()
    Called when self.config is set and the Job is fully ready for work, but before any work is accepted.
class job_stream.inline._UnwrapperJob((object)arg1) → None
    Bases: job_stream.common.Job

    Job to make processing after a reducer possible and sensible for inline.

    C++ signature: void __init__(_object*)

_forceCheckpoint((Job)arg1, (bool)arg2) → None
    Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

    C++ signature: void _forceCheckpoint(PyJob {lvalue},bool)

emit((Job)arg1, (object)arg2) → None
    Emit to only target.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object)

emit((Job)arg1, (object)arg2, (str)arg3) → None
    Emit to specific target out of list.

    C++ signature: void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)

handleWork(work)
    Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()
    Called when self.config is set and the Job is fully ready for work, but before any work is accepted.
job_stream.inline.map(func, *sequence, **kwargs)
    Returns [ func(*a) for a in sequence ] in a parallel manner. Identical to the builtin map(), except parallelized.

    kwargs – Passed to job_stream.inline.Work().

    This implementation differs from job_stream.map() so that it can demonstrate the functionality of the inline module. The behavior is identical.
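Since the documented behavior is that of the builtin map(), the expected result for multiple sequences can be checked against the builtin itself. The short sketch below uses only the standard library; that the parallel version pairs arguments position by position, as the builtin does, is an assumption based on the "identical to the builtin" statement above.

```python
def add(x, y):
    return x + y


xs = [1, 2, 3]
ys = [10, 20, 30]

# The builtin pairs up the sequences element by element...
serial = list(map(add, xs, ys))

# ...which is the same as unpacking each zipped tuple into func.
by_zip = [add(*a) for a in zip(xs, ys)]

print(serial)
```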