job_stream.inline

An intuitive way of specifying data flow using imperative programming techniques.

For example, to compute the average word length in a string:

from job_stream import inline

# Begin a new job_stream, treating each word in the string as a piece
# of work.
with inline.Work("Hello there world, how are you?".split()) as w:
    @w.job
    def getLen(w):
        # Transform each word into its length.
        return len(w)

    # A reducer combines the above answers; the result from the reducer
    # will be the stored cumulative length divided by the number of words,
    # or in other words the mean length.
    @w.reduce(emit = lambda store: store.len / store.count)
    def findAverage(store, inputs, others):
        # Initialize the data store if it hasn't been.
        if not hasattr(store, 'init'):
            store.init = True
            store.count = 0
            store.len = 0.0

        # Add each word's length to ours, and increment count.
        for i in inputs:
            store.len += i
            store.count += 1
        # Merge with another reducer, adding their count and length to
        # our own.
        for o in others:
            store.len += o.len
            store.count += o.count

    @w.result
    def printer(result):
        # Print out the emit'd average.
        print(result)

The main class used in inline code is job_stream.inline.Work, which provides decorators for Python functions and remembers how the code was organized so that everything executes in the right order. See job_stream.inline.Work’s documentation for more information.

Hierarchy of job_stream.inline Pipes

Sometimes, code follows a common skeleton with some parallel code in the middle. The job_stream.baked library is an example of this; job_stream.baked.sweep() returns a context manager that yields a job_stream.inline.Work object used to tell sweep what operations the user wishes to perform:

from job_stream.baked import sweep
import numpy as np

with sweep({ 'a': np.arange(10) }) as w:
    @w.job
    def square(id, trial, a):
        return { 'value': a*a + np.random.random() }

If a user class has functionality that can be run within a job_stream pipe, it is recommended that the class expose that functionality via an @classmethod named jobs_{name}:

from job_stream.inline import Args, Work

class MyClass:
    def classify(self, inputs):
        return inputs[0]

    @classmethod
    def jobs_classify(cls, w):
        @w.job
        def _classify(classifier, inputs):
            return classifier.classify(inputs)

with Work([ Args(MyClass(), [i]) for i in range(10) ]) as w:
    MyClass.jobs_classify(w)

Members

class job_stream.inline.Args(*args, **kwargs)[source]

Bases: object

Automatically fans out args and kwargs to the function that receives them. For example, the below will print out “Got 1, 2, 3, 4”:

from job_stream.inline import Args, Work

with Work([ Args(1, 2, d=4, c=3) ]) as w:
    @w.job
    def handle(a, b, c, d):
        print("Got {}, {}, {}, {}".format(a, b, c, d))

class job_stream.inline.Multiple(workList)[source]

Bases: object

If some inline code wants to emit or recur multiple pieces of work, an instance of this class is the way to do it. For instance:

return Multiple([ 1, 2, 3 ])
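
For instance, a minimal sketch (names are illustrative) of a job that splits each incoming sentence into words, so that every word flows downstream as its own piece of work:

from job_stream.inline import Multiple, Work

with Work([ "hello world", "how are you" ]) as w:
    @w.job
    def splitWords(sentence):
        # Each word becomes a separate piece of work for the next stage.
        return Multiple(sentence.split())

    @w.result
    def show(word):
        print(word)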

class job_stream.inline.Work(initialWork=[], useMultiprocessing=True, **runKwargs)[source]

Bases: object

Represents a job_stream pipeline. In keeping with traditional imperative coding practices, work flows through a Work pipeline in the same order that the decorated methods appear in the source file. In other words, the system starts with something like:

import job_stream.inline as inline
with inline.Work([1, 2, 3]) as w:
    # ...

This code will distribute the numbers 1, 2, and 3 into the system. Once in the system, any decorated methods will process the data in some way. The ordering of the decorated methods matters! For instance, running:

with inline.Work([1]) as w:
    @w.job
    def first(w):
        print("a: {}".format(w))
        return w+1
    @w.job
    def second(w):
        print("b: {}".format(w))

This will print a: 1 and b: 2. If the list passed to Work were longer, then more a: and b: lines would be printed, one pair for each input to the system.

Note

Multiprocessing

Python has the GIL in its default implementation, which typically limits pure-Python code to a single thread. To get around this, the job_stream module by default uses multiprocessing for all jobs - that is, your Python code will run in parallel on all cores, in different processes.

If this behavior is not desired, particularly if your application loads a lot of data in memory that you would rather not duplicate, passing useMultiprocessing=False to the Work object’s initializer will force all job_stream activity to happen within the original process:

Work([ 1, 2 ], useMultiprocessing=False)

This may also be done to individual jobs / frames / reducers:

@w.job(useMultiprocessing=False)
def doSomething(work):
    pass

May be passed a list or other iterable of initial work, as well as any of the following keyword arguments:

Parameters:
  • initialWork (list) – The token(s) or bits of work that will be processed. Each will be processed independently of the others; results might be joined later in the pipeline depending on the pipeline’s configuration.
  • useMultiprocessing (bool) – If True (the default), then all job_stream work will be handled in child processes spawned via Python’s multiprocessing module. This is used to get around the limitations of the GIL, but if your application’s setup (not the jobs themselves) uses a lot of memory and your computation is handled by either non-GIL protected code or in another process anyway, it is appropriate to turn this off.
  • checkpointFile (str) – Obsolete; use the job_stream binary. If specified, job_stream will run in checkpoint mode, periodically dumping progress to this file. If this file exists when run() is called, then simulation will be resumed from the state stored in this file.
  • checkpointInterval (float) – Obsolete; use the job_stream binary. Time, in seconds, between the completion of one checkpoint and the beginning of the next.
  • checkpointSyncInterval (float) – Debug / test usage only. Time, in seconds, to wait for sync when taking a checkpoint.

_assertFuncArgs(func, i, isMin=False)[source]

Ensures that the function func takes exactly i arguments, or at least i arguments if isMin is True. Makes error messages more friendly.

_assertNoResult()[source]

Ensures that @w.result hasn’t been used yet; raises an error otherwise.

_listCall(boundMethod, results)[source]

Calls boundMethod (which is something like Job.emit) for each member of results, if results is Multiple.

Returns True if the boundMethod was called at all (non-None result), False otherwise.

finish(func=None)[source]

Decorates a method that is called only once on the primary host, after all jobs have finished.

The decorated method takes a single argument, which is a list of all results returned from the jobs within the stream.

Anything returned within the decorated method will be emitted from the job_stream, meaning it will be returned by run(). User code should not typically rely on this; it is used mainly for interactive environments such as IPython notebooks.

Cannot be used if Work.result is used. Work.result is preferred when your job stream’s output can be processed one piece at a time, since results can then be discarded as soon as they have been processed (which is more memory efficient).
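
As a minimal sketch, assuming no other result handling is needed (the ordering of the results list is not guaranteed):

from job_stream.inline import Work

with Work([ 1, 2, 3 ]) as w:
    @w.job
    def double(x):
        return x * 2

    @w.finish
    def summarize(results):
        # results is a list such as [ 2, 4, 6 ] (order not guaranteed).
        print("Sum of all results: {}".format(sum(results)))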

frame(func=None, **kwargs)[source]

Decorates a function that begins a frame. A frame is started with a single piece of work, and then recurses other pieces of work into itself until some stopping condition is met.

Frames (and their cousins Reducers) are the most complicated feature in job_stream. A frame is appropriate if:

  • A while loop would be used in non-parallelizable code
  • Individual pieces of work need fan-out and fan-in

Frames have three parts - an “all outstanding work is finished” handler, an aggregator, and everything in between, which is used to process recurred work.

For example, suppose we want to sum all integers between 1 and our work, and report the result. The best way to design this type of system is with a Frame, implemented in inline through frame() and frameEnd(). The easiest way to think of these is as the two ends of a while loop - frame() is evaluated as the termination condition, and is also evaluated before anything happens. frameEnd() exists to aggregate logic from within the while loop into something that frame() can look at.

from job_stream.inline import Work, Multiple
w = Work([ 4, 5, 8 ])

@w.frame
def sumThrough(store, first):
    # Remember, this is called like the header of a while statement: once at
    # the beginning, and each time our recurred work finishes.  Anything
    # returned from this function will keep the loop running.
    if not hasattr(store, 'value'):
        # Store hasn't been initialized yet, meaning that this is the first
        # evaluation
        store.first = first
        store.value = 0
        return first

    # If we reach here, we're done.  By not returning anything, job_stream knows
    # to exit the loop (finish the reduction).  The default behavior of frame is
    # to emit the store object itself, which is fine.

# Anything between an @frame decorated function and @frameEnd will be executed
# for anything returned by the @frame or @frameEnd functions.  We could have
# returned a Multiple from @frame as well, but this is a little more fun.

@w.job
def countTo(w):
    # Generate and emit as work all integers ranging from 1 to w, inclusive
    return Multiple(range(1, w + 1))

@w.frameEnd
def handleNext(store, next):
    # next is any work that made it through the stream between @frame and
    # @frameEnd.  In our case, it is one of the integers between 1 and our
    # initial work.
    store.value += next

@w.result
def printMatchup(w):
    print("{}: {}".format(w.first, w.value))

w.run()

Running the above code will print:

$ python script.py
4: 10
8: 36
5: 15

Note that the original work is out of order, but the sums line up. This is because a frame starts a new reduction for each individual piece of work entering the @frame decorated function.

Parameters:
  • store – The storage object used to remember results from the frame.
  • first – The first work that began this frame.

Any results returned are recursed into the frame.

Decorator kwargs:

  • store – The constructor for a storage object. Defaults to inline.Object.
  • emit – A function taking a storage object and returning the work that should be forwarded to the next member of the stream. Defaults to emitting the store itself.

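For instance, a hedged sketch of the sum example above rewritten to use the emit kwarg, so that only the accumulated total (rather than the whole store object) is forwarded:

from job_stream.inline import Work, Multiple

w = Work([ 4 ])

@w.frame(emit = lambda store: store.value)
def sumThrough(store, first):
    # Loop header: runs once up front and again whenever recurred work finishes.
    if not hasattr(store, 'value'):
        store.value = 0
        return first

@w.job
def countTo(n):
    # Emit every integer from 1 to n, inclusive.
    return Multiple(range(1, n + 1))

@w.frameEnd
def addNext(store, next):
    store.value += next

@w.result
def printTotal(total):
    print(total)  # Prints 10 for the initial work item 4.

w.run()
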
frameEnd(func=None)[source]

Ends a frame. The decorated function should accept two arguments:

Parameters:
  • store – The storage object used to remember results from the frame (same as in the frame() decorated method).
  • next – The next result object that ran through the frame.

Any results returned are recursed into the frame.

init(func=None)[source]

Decorates a method that is called only once on the primary host before any jobs run. The method is never run when continuing from a checkpoint.

Useful for e.g. creating folders, deleting temporary files from previous runs, etc.
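
A minimal sketch, assuming the decorated function takes no arguments (the directory name is illustrative):

import os

from job_stream.inline import Work

with Work([ 1, 2, 3 ]) as w:
    @w.init
    def setUp():
        # Runs exactly once on the primary host, before any jobs.
        os.makedirs('output', exist_ok=True)

    @w.job
    def process(x):
        return x * 10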

job(func=None, **kwargs)[source]

Decorates a job. The decorated function must take one argument, which is the work coming into the job. Anything returned is passed along to the next member of the stream.

You may also call job() as w.job(useMultiprocessing=False) to disable multiprocessing for this job.

Warning

I/O Safety

It is not safe to perform non-unique external I/O (such as appending to a shared file) within a job. This is because jobs have no parallelism guards - that is, two jobs executing concurrently might open and append to the same file at the same time. On some filesystems, this results in e.g. two lines of a CSV being combined into a single, invalid line. To work around this, see result().

reduce(func=None, **kwargs)[source]

Decorates a reducer. Reducers are distributed (the function does not necessarily run on only one machine per reduction). Typically this is used to gather and aggregate results. Any set of work coming into a reducer will be emitted as a single piece of work.

The decorated function should take three arguments:

Parameters:
  • store – The storage object on this machine.
  • works – A list of work coming into the reducer.
  • others – A list of storage objects being joined into this reducer from other sources.

Any return value is recurred as work into the reducer.

Decorator kwargs:

  • store – Constructor for the storage element. Defaults to inline.Object.
  • emit – Function that takes the store and returns work to be forwarded to the next element in the stream.

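For instance, a minimal sketch of a reducer that sums all incoming work and emits only the total:

from job_stream.inline import Work

with Work([ 1, 2, 3, 4 ]) as w:
    @w.reduce(emit = lambda store: store.total)
    def sumAll(store, works, others):
        if not hasattr(store, 'total'):
            store.total = 0
        # Add work that arrived at this reducer.
        for x in works:
            store.total += x
        # Merge partial sums joined from other machines.
        for o in others:
            store.total += o.total

    @w.result
    def show(total):
        print(total)  # Prints 10.
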
result(func=None, **kwargs)[source]

Decorates a result handler, which is called only on the primary host exactly once for each piece of work that exits the system. This is the safest place for I/O!

The handler receives a single argument, which is the piece of work exiting the system.

If no result handlers are decorated, then inline will use a collector that gathers the results and returns them in a list from the run() method (run() will return None on machines that are not the primary host).
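
As a sketch, a result handler is a reasonable place to append to a file, since it runs on the primary host for each piece of work exiting the system (the filename is illustrative):

from job_stream.inline import Work

with Work([ 1, 2, 3 ]) as w:
    @w.job
    def square(x):
        return x * x

    @w.result
    def appendRow(value):
        # I/O here avoids the concurrent-write hazard described under job().
        with open('results.csv', 'a') as f:
            f.write('{}\n'.format(value))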

run()[source]

Runs the Work, executing all decorated methods in the order they were specified. A few kwargs passed to Work() will be forwarded to job_stream.common.run().
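
A minimal sketch of calling run() directly rather than relying on the context manager; with no result handler decorated, the default collector returns the results as a list on the primary host:

from job_stream.inline import Work

w = Work([ 1, 2, 3 ])

@w.job
def double(x):
    return x * 2

results = w.run()
# On the primary host, results is a list such as [ 2, 4, 6 ] (ordering not
# guaranteed); other hosts receive None.
print(results)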

class job_stream.inline._ForceCheckpointAndGoJob((object)arg1) → None :[source]

Bases: job_stream.common.Job

Used for tests; execution continues after checkpoint.

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Job)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

C++ signature :
void _forceCheckpoint(PyJob {lvalue},bool)
emit((Job)arg1, (object)arg2) → None :

Emit to only target

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object)
emit( (Job)arg1, (object)arg2, (str)arg3) -> None :

Emit to specific target out of list

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
handleWork(work)[source]

Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()

Called when self.config is set and the Job is fully ready for work, but before any work is accepted.

class job_stream.inline._ForceCheckpointJob((object)arg1) → None :[source]

Bases: job_stream.common.Job

Used for tests; raises exception after checkpoint.

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Job)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

C++ signature :
void _forceCheckpoint(PyJob {lvalue},bool)
emit((Job)arg1, (object)arg2) → None :

Emit to only target

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object)
emit( (Job)arg1, (object)arg2, (str)arg3) -> None :

Emit to specific target out of list

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
handleWork(work)[source]

Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()

Called when self.config is set and the Job is fully ready for work, but before any work is accepted.

class job_stream.inline._UnwrapperJob((object)arg1) → None :[source]

Bases: job_stream.common.Job

Job to make processing after a reducer possible and sensical for inline.

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Job)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, cause the program to crash afterwards.

C++ signature :
void _forceCheckpoint(PyJob {lvalue},bool)
emit((Job)arg1, (object)arg2) → None :

Emit to only target

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object)
emit( (Job)arg1, (object)arg2, (str)arg3) -> None :

Emit to specific target out of list

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
handleWork(work)[source]

Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()

Called when self.config is set and the Job is fully ready for work, but before any work is accepted.

job_stream.inline.map(func, *sequence, **kwargs)[source]

Returns [ func(*a) for a in sequence ] in a parallel manner. Identical to the builtin map(), except parallelized.

kwargs - Passed to job_stream.inline.Work().

This implementation differs from job_stream.map() in that it is written using the inline module, in order to demonstrate its functionality; the behavior is identical.

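A hedged usage sketch; since the behavior matches the builtin map(), each result should correspond to its input:

from job_stream import inline

def square(x):
    return x * x

print(inline.map(square, [ 1, 2, 3, 4 ]))
# Expected output: [1, 4, 9, 16]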