Bonobo API

The Bonobo API, available directly under the bonobo package, contains all the tools you need to get started with bonobo.

The bonobo package

Bonobo data-processing toolkit main module.

bonobo.run(graph, strategy=None, plugins=None, services=None)[source]

Main entry point of bonobo. It takes a graph and creates all the plumbing needed to execute it.

The only necessary argument is a Graph instance, containing the logic you actually want to execute.

By default, this graph will be executed using the “threadpool” strategy: each graph node will be wrapped in a thread, and executed in a loop until there is no more input to this node.
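The threadpool behaviour described above can be pictured with a small standard-library sketch. This is not bonobo's implementation: `run_node`, `run_chain`, and the `_END` sentinel are hypothetical names, and the sketch only handles a linear chain, with no error handling or plugins.

```python
import queue
import threading

_END = object()  # hypothetical sentinel meaning "no more input"


def run_node(func, inbox, outbox):
    """Call func on each input until the sentinel arrives, then forward it."""
    while True:
        item = inbox.get()
        if item is _END:
            outbox.put(_END)
            break
        for result in func(item):
            outbox.put(result)


def run_chain(source, *funcs):
    """Run each function in its own thread, connected by FIFO queues."""
    queues = [queue.Queue() for _ in range(len(funcs) + 1)]
    threads = [
        threading.Thread(target=run_node, args=(func, queues[i], queues[i + 1]))
        for i, func in enumerate(funcs)
    ]
    for thread in threads:
        thread.start()
    for item in source:
        queues[0].put(item)
    queues[0].put(_END)
    for thread in threads:
        thread.join()
    results = []
    while True:
        item = queues[-1].get()
        if item is _END:
            break
        results.append(item)
    return results


def double(x):
    yield x * 2


def keep_large(x):
    if x > 2:
        yield x


chain_output = run_chain([1, 2, 3], double, keep_large)
print(chain_output)  # [4, 6]
```

Each node is a generator function, so it may yield zero, one, or several outputs per input.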

You can provide plugin factory objects in the plugins list. This function will also add the plugins needed for interactive console execution and Jupyter notebook execution if it detects that it is running in one of those contexts.

You’ll probably want to provide a services dictionary mapping service names to service instances.

Parameters:
  • graph (Graph) – The Graph to execute.
  • strategy (str) – The bonobo.strategies.base.Strategy to use.
  • plugins (list) – The list of plugins to enhance execution.
  • services (dict) – The implementations of services this graph will use.
Returns:bonobo.execution.graph.GraphExecutionContext
 
class bonobo.Bag(*args, _flags=None, _parent=None, **kwargs)[source]

Bases: object

Bags are simple data structures that hold arguments and keyword arguments together, so that they may be applied to a callable.

Example:

>>> from bonobo import Bag
>>> def myfunc(foo, *, bar):
...     print(foo, bar)
...
>>> bag = Bag('foo', bar='baz')
>>> bag.apply(myfunc)
foo baz

A bag can inherit from another bag, which allows overriding only a few arguments without touching the parent.

Example:

>>> bag2 = Bag(bar='notbaz', _parent=bag)
>>> bag2.apply(myfunc)
foo notbaz
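As a rough mental model of the inheritance shown above, here is a minimal pure-Python sketch. `MiniBag` is a hypothetical stand-in written for illustration, not the code behind bonobo.Bag; it assumes that parent arguments come first and that child keyword arguments win on conflict.

```python
class MiniBag:
    """Hypothetical stand-in for bonobo.Bag (illustration only)."""

    def __init__(self, *args, _parent=None, **kwargs):
        self._args = args
        self._kwargs = kwargs
        self._parent = _parent

    @property
    def args(self):
        # Assumption: parent positional arguments come before the child's.
        if self._parent is None:
            return self._args
        return self._parent.args + self._args

    @property
    def kwargs(self):
        # Assumption: the child's keyword arguments override the parent's.
        if self._parent is None:
            return dict(self._kwargs)
        return {**self._parent.kwargs, **self._kwargs}

    def get(self):
        """Return a 2-element tuple of args and kwargs."""
        return self.args, self.kwargs

    def apply(self, func):
        return func(*self.args, **self.kwargs)


def describe(foo, *, bar):
    return f"{foo} {bar}"


parent_bag = MiniBag("foo", bar="baz")
child_bag = MiniBag(bar="notbaz", _parent=parent_bag)
print(child_bag.apply(describe))   # foo notbaz
print(parent_bag.apply(describe))  # foo baz
```

The parent is never mutated, so several children can share it safely.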
apply(func_or_iter, *args, **kwargs)[source]
args
default_flags = ()
extend(*args, **kwargs)[source]
flags
get()[source]

Get a 2 element tuple of this bag’s args and kwargs.

Returns:tuple
classmethod inherit(*args, **kwargs)[source]
kwargs
set_parent(parent)[source]
class bonobo.ErrorBag(*args, _flags=None, _parent=None, **kwargs)[source]

Bases: bonobo.structs.bags.Bag

class bonobo.Graph(*chain)[source]

Bases: object

Represents a directed graph of nodes.

add_chain(*nodes, _input=<Begin>, _output=None, _name=None)[source]

Add a chain to this graph.

add_node(c)[source]

Add a node without connections to this graph and return its index.

copy()[source]
outputs_of(idx, create=False)[source]

Get a set of the outputs for a given node index.

topologically_sorted_indexes

Iterate in topological order, based on networkx’s topological_sort() function.
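To make the index-based model concrete, here is a minimal sketch of a graph that stores nodes by index and edges as sets of output indexes. `MiniGraph` is hypothetical and simplified: it has no Begin token, and it uses the standard library's graphlib (Python 3.9+) for the ordering instead of networkx, which is what the description above mentions.

```python
from graphlib import TopologicalSorter


class MiniGraph:
    """Hypothetical, simplified stand-in for bonobo.Graph."""

    def __init__(self):
        self.nodes = []
        self.edges = {}  # node index -> set of output node indexes

    def add_node(self, node):
        """Add a node without connections and return its index."""
        self.nodes.append(node)
        idx = len(self.nodes) - 1
        self.edges[idx] = set()
        return idx

    def add_chain(self, *nodes, _input=None):
        """Add nodes linked one after another; return the last index."""
        previous = _input
        for node in nodes:
            idx = self.add_node(node)
            if previous is not None:
                self.edges[previous].add(idx)
            previous = idx
        return previous

    def outputs_of(self, idx):
        return self.edges[idx]

    def topologically_sorted_indexes(self):
        # graphlib expects a mapping of node -> predecessors.
        predecessors = {idx: set() for idx in self.edges}
        for src, targets in self.edges.items():
            for dst in targets:
                predecessors[dst].add(src)
        return tuple(TopologicalSorter(predecessors).static_order())


graph = MiniGraph()
last = graph.add_chain("extract", "transform")
graph.add_chain("load_a", _input=last)
graph.add_chain("load_b", _input=last)
order = graph.topologically_sorted_indexes()
```

Here `transform` (index 1) has two outputs, so the relative order of indexes 2 and 3 is not fixed; only the constraint that every node comes after its inputs is guaranteed.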

class bonobo.Token(name)[source]

Bases: object

Factory for signal-oriented queue messages or other token types.

bonobo.create_strategy(name=None)[source]

Create a strategy, or return the argument unchanged if it is already one.

Parameters:name
Returns:Strategy
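The "create or pass through" behaviour can be sketched as follows. Everything here is hypothetical: the `Strategy` classes, the `STRATEGIES` registry, and `create_strategy_sketch` are illustrative names, and the only detail taken from this page is that "threadpool" is the default.

```python
class Strategy:
    """Hypothetical base class standing in for a bonobo strategy."""


class NaiveStrategy(Strategy):
    pass


class ThreadPoolStrategy(Strategy):
    pass


# Hypothetical registry mapping names to strategy factories.
STRATEGIES = {"naive": NaiveStrategy, "threadpool": ThreadPoolStrategy}
DEFAULT_STRATEGY = "threadpool"


def create_strategy_sketch(name=None):
    # Already a strategy instance: hand it back untouched.
    if isinstance(name, Strategy):
        return name
    if name is None:
        name = DEFAULT_STRATEGY
    try:
        factory = STRATEGIES[name]
    except KeyError:
        raise ValueError(f"unknown strategy {name!r}") from None
    return factory()


existing = NaiveStrategy()
```

Accepting either a name or an instance lets callers pass a preconfigured strategy without a separate code path.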
bonobo.open_fs(fs_url=None, *args, **kwargs)[source]

Wraps the fs.open_fs() function with a few conveniences.

Parameters:
  • fs_url (str) – A filesystem URL
  • parse_result (ParseResult) – A parsed filesystem URL.
  • writeable (bool) – True if the filesystem must be writeable.
  • create (bool) – True if the filesystem should be created if it does not exist.
  • cwd (str) – The current working directory (generally only relevant for OS filesystems).
  • default_protocol (str) – The protocol to use if one is not supplied in the FS URL (defaults to "osfs").
Returns:

FS object

class bonobo.CsvReader(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileReader, bonobo.nodes.io.csv.CsvHandler

Reads a CSV and yields the values as dicts.

skip

The number of lines to skip before it actually yields output.
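The idea of yielding dicts and skipping leading lines can be sketched with the standard csv module. `read_csv_rows` is a hypothetical helper, not CsvReader itself; it assumes the first line holds the headers, that `skip` counts data lines after the header, and that the delimiter is a comma.

```python
import csv
import io


def read_csv_rows(file, skip=0, delimiter=","):
    """Yield one dict per CSV row, keyed by the header line."""
    reader = csv.reader(file, delimiter=delimiter)
    headers = next(reader, None)
    if headers is None:  # empty file: nothing to yield
        return
    # Assumption: skipped lines are data lines that follow the header.
    for _ in range(skip):
        next(reader, None)
    for row in reader:
        yield dict(zip(headers, row))


sample = io.StringIO("id,name\n1,ada\n2,bob\n3,cy\n")
csv_rows = list(read_csv_rows(sample, skip=1))
print(csv_rows)  # [{'id': '2', 'name': 'bob'}, {'id': '3', 'name': 'cy'}]
```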

csv_headers

A ContextProcessor is a kind of transformation decorator that can set up and tear down a transformation and its runtime-related dependencies, at the execution level.

It works like a yielding context manager, and is the recommended way to set up and tear down objects you’ll need in the context of one execution. It’s the way to overcome the stateless nature of transformations.

The yielded values will be passed as positional arguments to the next context processors (order does matter), and finally to the __call__ method of the transformation.

Warning: this may be replaced by a similar but simpler implementation; don’t rely on it too much (yet).

Example:

>>> from bonobo.config import Configurable, ContextProcessor
>>> from bonobo.util.objects import ValueHolder
>>> class Counter(Configurable):
...     @ContextProcessor
...     def counter(self, context):
...         yield ValueHolder(0)
...
...     def __call__(self, counter, *args, **kwargs):
...         counter += 1
...         yield counter.get()
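The "yielding context manager" idea can be illustrated with plain generators. This is a sketch under assumptions, not how bonobo drives its processors: `run_with_context`, `counter_context`, and `count_rows` are hypothetical names.

```python
events = []


def counter_context():
    """Generator-based processor: code before yield is setup, after is teardown."""
    events.append("setup")
    state = {"count": 0}
    yield state
    events.append("teardown")


def count_rows(state, row):
    """Stateless function that receives the yielded state as its first argument."""
    state["count"] += 1
    return (state["count"], row)


def run_with_context(processor, transformation, inputs):
    generator = processor()
    resource = next(generator)  # setup: run up to the first yield
    try:
        return [transformation(resource, item) for item in inputs]
    finally:
        next(generator, None)  # teardown: resume after the yield


numbered = run_with_context(counter_context, count_rows, ["a", "b", "c"])
print(numbered)  # [(1, 'a'), (2, 'b'), (3, 'c')]
print(events)    # ['setup', 'teardown']
```

The state lives in the generator's frame for the duration of one execution, which is how a stateless transformation can still count rows.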
read(fs, file, headers)[source]
skip

An Option is a descriptor for Configurable’s parameters.

type

The option type lets you provide a callable used to cast, clean or validate the option value. If not provided, or None, the option’s value will be the exact value the user provided.

(default: None)

required

If an option is required, an error will be raised if no value is provided (at runtime). If it is not, the option will have the default value unless the user overrides it at runtime.

Ignored if a default is provided, meaning that the option cannot be required.

(default: True)

positional

If this is true, it’ll be possible to provide the option value as a positional argument. Otherwise, it must be provided as a keyword argument.

(default: False)

default

Default value for non-required options.

(default: None)

Example:

from bonobo.config import Configurable, Option

class Example(Configurable):
    title = Option(str, required=True, positional=True)
    keyword = Option(str, default='foo')

    def call(self, s):
        return self.title + ': ' + s + ' (' + self.keyword + ')'

example = Example('hello', keyword='bar')
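For readers unfamiliar with descriptors, the following sketch shows how casting, required checks, and defaults could fit together. `MiniOption` and `MiniConfigurable` are hypothetical and far simpler than bonobo's classes; positional options are left out.

```python
class MiniOption:
    """Hypothetical descriptor with a cast callable, required flag and default."""

    def __init__(self, type=None, *, required=True, default=None):
        self.type = type
        # A provided default means the option cannot be required.
        self.required = required if default is None else False
        self.default = default
        self.name = None

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.__dict__.get(self.name, self.default)

    def __set__(self, instance, value):
        # Cast or validate through the type callable when one is given.
        instance.__dict__[self.name] = self.type(value) if self.type else value


class MiniConfigurable:
    """Hypothetical base class that fills options from keyword arguments."""

    def __init__(self, **options):
        for name, attr in vars(type(self)).items():
            if isinstance(attr, MiniOption):
                if name in options:
                    setattr(self, name, options[name])
                elif attr.required:
                    raise TypeError(f"missing required option {name!r}")


class Greeter(MiniConfigurable):
    title = MiniOption(str)
    repeat = MiniOption(int, default=1)


greeter = Greeter(title="hello", repeat="3")
print(greeter.title, greeter.repeat)  # hello 3
```

Passing `repeat="3"` shows the cast at work: the stored value is the integer 3.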
class bonobo.CsvWriter(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileWriter, bonobo.nodes.io.csv.CsvHandler

write(fs, file, lineno, writer, headers, *args, **kwargs)[source]
writer

class bonobo.FileReader(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.Reader, bonobo.nodes.io.base.FileHandler

Component factory for file-like readers.

On its own, it can be used to read a file and yield one row per line, trimming the “eol” character at the end if present. Extending it is usually the right way to create more specific file readers (like json, csv, etc.)
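A line-oriented read of this kind can be sketched in a few lines of plain Python. `read_lines` is a hypothetical helper, and the default `eol` value is an assumption.

```python
import io


def read_lines(file, eol="\n"):
    """Yield one row per line, trimming a single trailing eol if present."""
    for line in file:
        yield line[: -len(eol)] if line.endswith(eol) else line


line_rows = list(read_lines(io.StringIO("a\nb\n\nc")))
print(line_rows)  # ['a', 'b', '', 'c']
```

Only one trailing `eol` is removed, so empty lines are preserved as empty rows.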

mode

read(fs, file)[source]

Read the given file and yield one row per line, trimming the end-of-line character if present.

class bonobo.FileWriter(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.Writer, bonobo.nodes.io.base.FileHandler

Component factory for file or file-like writers.

On its own, it can be used to write one line to a file for each row that comes into this component. Extending it is usually the right way to create more specific file writers (like json, csv, etc.)

lineno

mode

write(fs, file, lineno, line)[source]

Write a row on the next line of the file opened in the context.
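One way a line counter such as lineno can be used when writing is to prefix every row except the first with the end-of-line string, so the file never ends with a stray newline. The sketch below assumes that approach; `write_rows` is a hypothetical helper, not FileWriter's code.

```python
import io


def write_rows(file, rows, eol="\n"):
    """Write one line per row; rows after the first are prefixed with eol."""
    lineno = 0
    for row in rows:
        file.write((eol if lineno else "") + row)
        lineno += 1
    return lineno


buffer = io.StringIO()
written = write_rows(buffer, ["a", "b", "c"])
print(repr(buffer.getvalue()))  # 'a\nb\nc'
```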

class bonobo.Filter(*args, **kwargs)[source]

Bases: bonobo.config.configurables.Configurable

Filter out hashes from the stream depending on the return value of the filter callable when it is called with the current hash as its parameter.

Can be used as a decorator on a filter callable.

filter

A callable used to filter lines.

If the callable returns a truthy value, the input will be passed unmodified to the next items.

Otherwise, it is dropped.
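The pass-or-drop rule can be sketched as a generator-based node. `make_filter` is a hypothetical helper that only mirrors the rule described above; it is not the Filter class.

```python
def make_filter(predicate):
    """Build a node that yields its input unchanged when predicate is truthy."""

    def node(row):
        if predicate(row):
            yield row

    return node


only_even = make_filter(lambda row: row["n"] % 2 == 0)
stream = [{"n": 1}, {"n": 2}, {"n": 3}, {"n": 4}]
kept = [out for row in stream for out in only_even(row)]
print(kept)  # [{'n': 2}, {'n': 4}]
```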

call(*args, **kwargs)[source]
filter

A Method is a special callable-valued option that can be used in three different ways, all serving the same purpose.

  • Like a normal option, the value can be provided to the Configurable constructor.

    >>> from bonobo.config import Configurable, Method
    
    >>> class MethodExample(Configurable):
    ...     handler = Method()
    
    >>> example1 = MethodExample(handler=str.upper)
    
  • It can be used by a child class that overrides the Method with a normal method.

    >>> class ChildMethodExample(MethodExample):
    ...     def handler(self, s: str):
    ...         return s.upper()
    
    >>> example2 = ChildMethodExample()
    
  • Finally, it also enables the class to be used as a decorator, to generate a subclass providing the Method a value.

    >>> @MethodExample
    ... def OtherChildMethodExample(s):
    ...     return s.upper()
    
    >>> example3 = OtherChildMethodExample()
    
class bonobo.JsonReader(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileReader, bonobo.nodes.io.json.JsonHandler

static loader(fp, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)

Deserialize fp (a .read()-supporting file-like object containing a JSON document) to a Python object.

object_hook is an optional function that will be called with the result of any object literal decode (a dict). The return value of object_hook will be used instead of the dict. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).

object_pairs_hook is an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value of object_pairs_hook will be used instead of the dict. This feature can be used to implement custom decoders that rely on the order that the key and value pairs are decoded (for example, collections.OrderedDict will remember the order of insertion). If object_hook is also defined, the object_pairs_hook takes priority.

To use a custom JSONDecoder subclass, specify it with the cls kwarg; otherwise JSONDecoder is used.
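The signature and description above match json.load from the Python standard library, so the hooks can be tried with the standard module directly. The helper names `tag_dict` and `tag_pairs` are made up for this illustration.

```python
import io
import json
from collections import OrderedDict

# object_pairs_hook receives the key/value pairs in document order.
ordered = json.load(io.StringIO('{"b": 1, "a": 2}'), object_pairs_hook=OrderedDict)
print(list(ordered))  # ['b', 'a']


def tag_dict(decoded):
    return ("dict", decoded)


def tag_pairs(pairs):
    return ("pairs", pairs)


# object_hook alone receives the decoded dict.
only_hook = json.load(io.StringIO('{"x": 1}'), object_hook=tag_dict)
print(only_hook)  # ('dict', {'x': 1})

# When both hooks are given, object_pairs_hook takes priority.
both_hooks = json.load(
    io.StringIO('{"x": 1}'), object_hook=tag_dict, object_pairs_hook=tag_pairs
)
print(both_hooks)  # ('pairs', [('x', 1)])
```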

read(fs, file)[source]
class bonobo.JsonWriter(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileWriter, bonobo.nodes.io.json.JsonHandler

envelope

write(fs, file, lineno, *args, **kwargs)[source]

Write a JSON row on the next line of the file pointed to by ctx.file.

Parameters:
  • ctx
  • row
class bonobo.Limit(*args, **kwargs)[source]

Bases: bonobo.config.configurables.Configurable

Creates a Limit() node that lets only the first n rows (defined by the limit option) pass through, unmodified.

limit

Number of rows to let through.
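A minimal sketch of the same idea uses a closure to hold the row counter. `make_limit` is hypothetical; the signature further down suggests that Limit receives its counter as an argument instead.

```python
def make_limit(limit):
    """Build a node that yields only the first `limit` rows it sees."""
    seen = 0

    def node(row):
        nonlocal seen
        seen += 1
        if seen <= limit:
            yield row

    return node


first_two = make_limit(2)
limited = [out for row in ["a", "b", "c", "d"] for out in first_two(row)]
print(limited)  # ['a', 'b']
```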

call(counter, *args, **kwargs)[source]
counter

limit

class bonobo.PickleReader(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileReader, bonobo.nodes.io.pickle.PickleHandler

Reads a Python pickle object and yields the items in dicts.

mode

pickle_headers

read(fs, file, pickle_headers)[source]
class bonobo.PickleWriter(*args, **kwargs)[source]

Bases: bonobo.nodes.io.base.IOFormatEnabled, bonobo.nodes.io.file.FileWriter, bonobo.nodes.io.pickle.PickleHandler

mode

write(fs, file, lineno, item)[source]

Write a pickled item to the opened file.

class bonobo.PrettyPrinter(*args, **kwargs)[source]

Bases: bonobo.config.configurables.Configurable

call(*args, **kwargs)[source]
class bonobo.RateLimited(*args, **kwargs)[source]

Bases: bonobo.config.configurables.Configurable

amount

bucket

call(bucket, *args, **kwargs)[source]
handler

initial

period

bonobo.Tee(f)[source]
bonobo.arg0_to_kwargs(row)[source]

Transform items in a stream from “arg0” format (each call only has one positional argument, which is a dict-like object) to “kwargs” format (each call only has keyword arguments that represent a row).

Parameters:row
Returns:bonobo.Bag
bonobo.count(counter, *args, **kwargs)[source]
bonobo.identity(x)[source]
bonobo.kwargs_to_arg0(**row)[source]

Transform items in a stream from “kwargs” format (each call only has keyword arguments that represent a row) to “arg0” format (each call only has one positional argument, which is a dict-like object).

Parameters:**row
Returns:bonobo.Bag
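The two call formats can be compared with a pair of plain functions. These sketches are hypothetical: they return an `(args, kwargs)` tuple, the same shape that Bag.get() is documented to return, where the real functions return a bonobo.Bag.

```python
def arg0_to_kwargs_sketch(row):
    """One dict-like positional argument becomes keyword arguments."""
    return (), dict(row)


def kwargs_to_arg0_sketch(**row):
    """Keyword arguments become one positional dict argument."""
    return (row,), {}


def show(*args, **kwargs):
    return args, kwargs


args, kwargs = arg0_to_kwargs_sketch({"id": 1, "name": "ada"})
as_kwargs = show(*args, **kwargs)
print(as_kwargs)  # ((), {'id': 1, 'name': 'ada'})

args, kwargs = kwargs_to_arg0_sketch(id=1, name="ada")
as_arg0 = show(*args, **kwargs)
print(as_arg0)  # (({'id': 1, 'name': 'ada'},), {})
```

The two functions are inverses of each other, so a stream can be converted back and forth without losing data.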
bonobo.noop(*args, **kwargs)[source]
bonobo.get_examples_path(*pathsegments)[source]
bonobo.open_examples_fs(*pathsegments)[source]