job_stream.common

The core job_stream adapter in Python. This module provides the Python-side adaptations for all of the needed C++ functionality.

Warning

New users should look at using the job_stream.inline module instead of the classes in this file.

Example usage:

from job_stream import common
class AddOne(common.Job):
    def handleWork(self, work):
        self.emit(work + 1)

common.work.extend([ 8, 9 ])
common.run({
    'jobs': [
        { 'type': AddOne }
    ]
})
# 9 and 10 will be printed

Or:

r = job_stream.map([8, 9], lambda w: w+1)
print(r)  # [ 9, 10 ] will be printed

Members

class job_stream.common.Frame((object)arg1) → None :[source]

Bases: _job_stream.Frame

Base class for a Frame. A Frame is a special type of reducer that performs some special behavior based on the work that begins the reduction. Typically this is used for checking termination conditions in a recurring algorithm:

import job_stream

class AddAb(job_stream.Job):
    def handleWork(self, w):
        self.emit(w + 'Ab')

class MakeAtLeastTenLetters(job_stream.Frame):
    def handleFirst(self, store, w):
        store.word = w
    def handleNext(self, store, w):
        store.word = w
    def handleDone(self, store):
        if len(store.word) < 10:
            self.recur(store.word)
        else:
            self.emit(store.word)

job_stream.common.work = [ 'abracadabra', 'Hey', 'Apples' ]
# This'll print out the unmodified abracadabra, add two Ab's to Apples, and
# four Ab's to Hey
job_stream.common.run({
    'jobs': [ {
        'frame': MakeAtLeastTenLetters,
        'jobs': [ AddAb ]
    } ]
})

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Frame)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, the program will crash afterwards.

C++ signature :
void _forceCheckpoint(PyFrame {lvalue},bool)
emit((Frame)arg1, (object)arg2) → None :

Emit to only target (outside of frame)

C++ signature :
void emit(PyFrame {lvalue},boost::python::api::object)
handleDone(store)[source]

Called when the reduction is finished. The reduction will be marked as unfinished if a recur() happens.

handleFirst(store, work)[source]

Called for the first work, which starts a reduction. Store is an empty Object() to which this method may assign attributes.

handleNext(store, work)[source]

Called when finished work arrives at the Frame.

postSetup()[source]

Called when self.config is set and the Frame is fully ready for work, but before any work is accepted.

recur((Frame)arg1, (object)arg2) → None :

Recur to only target

C++ signature :
void recur(PyFrame {lvalue},boost::python::api::object)
recur( (Frame)arg1, (object)arg2, (str)arg3) -> None :

Recur to specific target

C++ signature :
void recur(PyFrame {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
class job_stream.common.Job((object)arg1) → None :[source]

Bases: _job_stream.Job

Base class for a standard job (starts with some work, and emits zero or more times). Handles registration of job class with the job_stream system.

Example:

import job_stream

class MyJob(job_stream.Job):
    '''Adds 8 to an integer or floating point number'''
    def handleWork(self, work):
        self.emit(work + 8)

job_stream.common.work = [ 1, 2, 3.0 ]
# This will print 9, 10, and 11.0
job_stream.run({ 'jobs': [ MyJob ] })

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Job)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, the program will crash afterwards.

C++ signature :
void _forceCheckpoint(PyJob {lvalue},bool)
emit((Job)arg1, (object)arg2) → None :

Emit to only target

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object)
emit( (Job)arg1, (object)arg2, (str)arg3) -> None :

Emit to specific target out of list

C++ signature :
void emit(PyJob {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
handleWork(work)[source]

Handle incoming work, maybe call self.emit() to generate more work for jobs further down the pipe.

postSetup()[source]

Called when self.config is set and the Job is fully ready for work, but before any work is accepted.

class job_stream.common.Object(**kwargs)[source]

Bases: object

A generic object with no attributes of its own.
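For illustration, a minimal sketch of direct use (this assumes keyword arguments are assigned as attributes, which matches how job_stream hands an Object() to reducers and frames as their store):

from job_stream.common import Object

o = Object(count=0)     # keyword arguments become attributes (assumed behavior)
o.count += 5
o.note = 'anything'     # new attributes may be added freely
print(o.count, o.note)  # 5 anything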

class job_stream.common.Reducer((object)arg1) → None :[source]

Bases: _job_stream.Reducer

Base class for a Reducer. A Reducer combines work emitted from the last stage of a reduction, eventually emitting its own result to the next link in the processing chain. A reduction starts when a piece of work enters a module guarded by a Reducer.

Example:

import job_stream

class AddLetterA(job_stream.Job):
    def handleWork(self, w):
        self.emit(w + 'A')

class CountLetters(job_stream.Reducer):
    '''Counts the number of letters passed to it'''
    def handleInit(self, store):
        store.count = 0
    def handleAdd(self, store, work):
        store.count += len(work)
    def handleJoin(self, store, other):
        store.count += other.count
    def handleDone(self, store):
        self.emit(store.count)

job_stream.common.work = [ 'Hello', 'World' ]
# Here the reduction starts at the global scope, so it will print out 12,
# which is the original 10 letters plus the two new letter A's.
print("First:")
job_stream.run({
    'reducer': CountLetters,
    'jobs': [ AddLetterA ]
})
# This config has the reduction starting as part of the first job rather
# than the global scope, so this will print 6 twice (once for each work that
# we initially passed in).
print("Second:")
job_stream.run({
    'jobs': [
        {
            'reducer': CountLetters,
            'jobs': [ AddLetterA ]
        }
    ]
})

C++ signature :
void __init__(_object*)

__init__( (object)arg1) -> None :

C++ signature :
void __init__(_object*)
_forceCheckpoint((Reducer)arg1, (bool)arg2) → None :

Force a checkpoint after this work. If True is passed, the program will crash afterwards.

C++ signature :
void _forceCheckpoint(PyReducer {lvalue},bool)
emit((Reducer)arg1, (object)arg2) → None :

Emit to only target (outside of reducer)

C++ signature :
void emit(PyReducer {lvalue},boost::python::api::object)
handleAdd(store, work)[source]

Called when new work arrives at the Reducer.

handleDone(store)[source]

Called when the reduction is finished. The reduction will be marked as unfinished if a recur() happens.

handleInit(store)[source]

Called when a reduction is started. Store is a python object that should be modified to remember information between invocations.

handleJoin(store, other)[source]

Called to merge two stores from the same Reducer.

postSetup()[source]

Called when self.config is set and the Reducer is fully ready for work, but before any work is accepted.

recur((Reducer)arg1, (object)arg2) → None :

Recur to only target

C++ signature :
void recur(PyReducer {lvalue},boost::python::api::object)
recur( (Reducer)arg1, (object)arg2, (str)arg3) -> None :

Recur to specific target

C++ signature :
void recur(PyReducer {lvalue},boost::python::api::object,std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
exception job_stream.common._StackAlreadyPrintedError[source]

Bases: Exception

An exception to be used if the stack trace has already been printed, and an exception needs to be raised just to communicate to job_stream to abort.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class job_stream.common._Work[source]

Bases: list

List of initial work sent into job_stream. If left empty, work comes from stdin.
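For example (a sketch; common.work is the module-level instance of this class, as used in the module example above):

from job_stream import common

# Seed initial work before calling run(); if work is left empty, each
# line of stdin becomes a piece of work instead.
common.work.extend([ 1, 2, 3 ])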

append(object) → None -- append object to end
clear() → None -- remove all items from L
copy() → list -- a shallow copy of L
count(value) → integer -- return number of occurrences of value
extend(iterable) → None -- extend list by appending elements from the iterable
index(value[, start[, stop]]) → integer -- return first index of value.

Raises ValueError if the value is not present.

insert(index, object) → None -- insert object before index

pop([index]) → item -- remove and return item at index (default last).

Raises IndexError if list is empty or index is out of range.

remove(value) → None -- remove first occurrence of value.

Raises ValueError if the value is not present.

reverse() → None -- reverse *IN PLACE*

sort(key=None, reverse=False) → None -- stable sort *IN PLACE*
job_stream.common._cpuThreadTime()[source]

Returns the current thread’s CPU time in seconds. Used for tests of profiling, mostly.

job_stream.common._decode(s)[source]

Decodes an object with cPickle

job_stream.common._encode(o)[source]

Encodes an object with cPickle
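A round-trip sketch of these internal helpers (the exact serialization format is an implementation detail; only that _decode inverts _encode is assumed here):

from job_stream.common import _decode, _encode

payload = _encode({ 'answer': 42 })     # pickle the object into a payload
assert _decode(payload) == { 'answer': 42 }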

job_stream.common._initMultiprocessingPool()[source]

The multiprocessing pool is initialized lazily by default, to avoid overhead if no jobs are using multiprocessing.

job_stream.common.getCpuCount()[source]

Returns the number of CPUs in the cluster. Must be called within a job_stream, or will raise an error.

job_stream.common.getHostCpuCount()[source]

Returns the number of CPUs on this host. Must be called within a job_stream, or will raise an error.

job_stream.common.getRank()[source]

Returns the rank (integer index) of this processor. Typically, this value is checked against 0 for work that should only happen once, e.g. init code.
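A sketch of the typical rank check (this assumes it runs inside a job_stream pipeline, e.g. in postSetup(), since these functions raise an error otherwise):

from job_stream import common

class MyJob(common.Job):
    def postSetup(self):
        # Run one-time initialization on a single processor only.
        if common.getRank() == 0:
            print('Cluster CPUs:', common.getCpuCount())
            print('CPUs on this host:', common.getHostCpuCount())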

job_stream.common.invoke(progAndArgs, transientErrors=[], maxRetries=20)[source]

Since it can be difficult to launch some programs from an MPI-distributed application, job_stream provides invoke() to safely launch an external program (launching an application such as Xyce, for instance, can cause problems if the environment variables are not doctored appropriately).

progAndArgs: list, [ 'path/to/file', *args ]

transientErrors: list of strings; if any of these strings are found in the
    stderr of the program, then any non-zero return code is considered a
    transient error and the application will be re-launched, up to
    maxRetries times.

    Note that "No child processes" is automatically handled as transient.

maxRetries: the maximum number of times to run the application if there are
    transient errors. Only the final (successful) results are returned.

Returns: (contents of stdout, contents of stderr)
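For instance, a sketch (the program path and the transient error string here are hypothetical):

from job_stream import common

# Re-launch the program (up to 5 times) whenever a non-zero exit code
# coincides with a known-flaky message on stderr.
out, err = common.invoke(
        [ '/usr/local/bin/mysolver', '--input', 'model.cir' ],
        transientErrors=[ 'license server busy' ],
        maxRetries=5)
print(out)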

job_stream.common.map(func, *sequence, **kwargs)[source]

Returns [ func(*a) for a in sequence ] in a parallel manner. Identical to the builtin map(), except parallelized.

kwargs - Passed to job_stream.run().
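Usage sketch, following the signature above (func first, then the sequence; extra kwargs pass through to run()):

from job_stream import common

squares = common.map(lambda x: x * x, [ 1, 2, 3, 4 ])
print(squares)  # [ 1, 4, 9, 16 ]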

job_stream.common.run(configDictOrPath, **kwargs)[source]

Runs the given YAML file or config dictionary.

Acceptable kwargs:

checkpointFile - (string) The file to use for checkpoint / restore.

checkpointInterval - (float) The time between the completion of one
    checkpoint and the starting of the next, in seconds.

checkpointSyncInterval - (float) The time between all processors
    thinking they're ready to checkpoint and the actual checkpoint.

handleResult - (callable) The default is to print out repr(result). If
    specified, this function will be called instead with the output work as
    an argument. Note that this goes outside of checkpointing! If you are
    saving work into an array, for example, and want to be checkpoint-safe,
    this method MUST save what it needs to file.
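A sketch combining a config dictionary with the handleResult kwarg (collecting results into an in-memory list like this is illustrative only and is NOT checkpoint-safe, per the note above):

from job_stream import common

results = []

class Double(common.Job):
    def handleWork(self, work):
        self.emit(work * 2)

common.work.extend([ 1, 2, 3 ])
common.run(
        { 'jobs': [ Double ] },
        handleResult=results.append)
print(sorted(results))  # [ 2, 4, 6 ]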