Part 2: Writing ETL Jobs¶
In Bonobo, an ETL job is a graph with some logic to execute it, like the file we created in the previous section.
You can learn more about the bonobo.Graph data-structure and its properties in the graphs guide.
Scenario¶
Let’s create a sample application whose goal will be to integrate some data into various systems.
We’ll use an open-data dataset containing all the fablabs in the world.
We will normalize this data using a few different rules, then write it somewhere.
In this step, we’ll focus on getting this data normalized and output to the console. In the next steps, we’ll extend it to other targets, like files and databases.
Setup¶
We’ll change the tutorial.py file created in the last step to handle this new scenario.
First, let’s remove all boilerplate code, so it looks like this:
import bonobo


def get_graph(**options):
    graph = bonobo.Graph()
    return graph


def get_services(**options):
    return {}


if __name__ == '__main__':
    parser = bonobo.get_argument_parser()
    with bonobo.parse_args(parser) as options:
        bonobo.run(get_graph(**options), services=get_services(**options))
Your job now contains the logic for executing an empty graph, and we’ll complete this with our application logic.
Reading the source data¶
Let’s add a simple chain to our get_graph(…) function, so that it reads from the fablabs open-data API.
The source dataset we’ll use can be found on this site. It’s licensed under the Public Domain, which makes it just perfect for our example.
Note
There is a bonobo.contrib.opendatasoft
module that makes reading from OpenDataSoft APIs easier, including
pagination and limits, but for our tutorial, we’ll avoid that and build it manually.
Let’s write our extractor:
import requests

FABLABS_API_URL = 'https://public-us.opendatasoft.com/api/records/1.0/search/?dataset=fablabs&rows=1000'


def extract_fablabs():
    yield from requests.get(FABLABS_API_URL).json().get('records')
This extractor will get called once, query the API URL, parse the response as JSON, and yield the items from the “records” list, one by one.
Note
You’ll probably want to make it a bit more defensive in a real application, to handle all kinds of errors that can happen here. What if the server is down? What if it returns a response that is not JSON? What if the data is not in the expected format?
For simplicity’s sake, we’ll ignore that here, but those are the kinds of questions you should keep in mind when writing pipelines.
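As an illustration only (this is not part of the tutorial’s code), a more defensive extractor could look roughly like the following sketch; the timeout value, the function name and the error message are arbitrary choices:
import requests

FABLABS_API_URL = 'https://public-us.opendatasoft.com/api/records/1.0/search/?dataset=fablabs&rows=1000'


def extract_fablabs_defensively():
    # Fail fast on network problems and non-2xx responses.
    response = requests.get(FABLABS_API_URL, timeout=30)
    response.raise_for_status()

    # A body that is not valid JSON raises a ValueError here.
    payload = response.json()

    # Guard against a payload that does not have the expected shape.
    records = payload.get('records')
    if not isinstance(records, list):
        raise ValueError('Unexpected payload: missing "records" list.')

    yield from records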
To test our pipeline, let’s use a bonobo.Limit and a bonobo.PrettyPrinter, and change our get_graph(…) function accordingly:
import bonobo


def get_graph(**options):
    graph = bonobo.Graph()
    graph.add_chain(
        extract_fablabs,
        bonobo.Limit(10),
        bonobo.PrettyPrinter(),
    )
    return graph
Running this job should output a bit of data, along with some statistics.
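If you kept the tutorial.py file name from the previous step, running the job simply means executing the file, for example:
python tutorial.py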
First, let’s look at the statistics:
- extract_fablabs in=1 out=995 [done]
- Limit in=995 out=10 [done]
- PrettyPrinter in=10 out=10 [done]
It is important to understand that we extracted everything (995 rows) before dropping 99% of the dataset.
This is OK for debugging, but not efficient.
Note
You should always try to limit the amount of data as early as possible, which often means not generating the data you won’t need in the first place. Here, we could have used the rows= query parameter in the API URL to avoid requesting data we would drop anyway.
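For instance, here is a sketch of what that could look like, assuming ten rows are all we want for this debugging run:
# Ask the API for only the rows we actually need, instead of fetching
# everything and then dropping most of it with bonobo.Limit.
FABLABS_API_URL = 'https://public-us.opendatasoft.com/api/records/1.0/search/?dataset=fablabs&rows=10'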
Normalize¶
Warning
This section is missing. Sorry, but stay tuned! It’ll be added soon.
Output¶
We used bonobo.PrettyPrinter to output the data.
It’s a flexible transformation provided by Bonobo that helps you display the content of a stream, and you’ll probably use it a lot, for various reasons.
Moving forward¶
You now know:
How to use a reader node.
How to use the console output.
How to limit the number of elements in a stream.
How to pass data from one node to another.
How to structure a graph using chains.
It’s now time to jump to Part 3: Working with Files.