Defeat the Python GIL with Apache Storm.

Andrew Montalenti, CTO

About Me

  • CTO/co-founder of Parse.ly
  • Hacking in Python for over a decade
  • Fully distributed team

@amontalenti on Twitter:

Python GIL

Python’s GIL does not allow true multi-thread parallelism:


And on multi-core, it even leads to lock contention:


@dabeaz discussed this in a Friday talk on concurrency.
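The claim is easy to see in a few lines: a CPU-bound loop run in two threads is no faster than running it twice sequentially on CPython (a minimal, illustrative benchmark; exact timings vary by machine):

```python
import threading
import time

def countdown(n):
    # CPU-bound busy loop: pure Python bytecode, which the GIL serializes
    while n > 0:
        n -= 1

N = 2_000_000

start = time.perf_counter()
countdown(N)
countdown(N)
sequential = time.perf_counter() - start

start = time.perf_counter()
threads = [threading.Thread(target=countdown, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

# on CPython, `threaded` is not meaningfully faster -- often slower,
# because the two threads also contend for the lock
print("sequential: %.2fs  threaded: %.2fs" % (sequential, threaded))
```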

Queues and workers


Standard way to solve GIL woes.

Queues: ZeroMQ => Redis => RabbitMQ

Workers: Cron Jobs => RQ => Celery

Architecture, 2012


It started to get messy
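The queues-and-workers fix can be sketched with nothing but the standard library (a toy pipeline standing in for the Redis/Celery stack above; `worker` and `run_pipeline` are hypothetical names):

```python
from multiprocessing import Process, Queue

def worker(in_q, out_q):
    # each worker is its own process, so each has its own GIL
    for item in iter(in_q.get, None):  # None acts as a poison pill
        out_q.put(item * item)

def run_pipeline(items, n_workers=4):
    in_q, out_q = Queue(), Queue()
    procs = [Process(target=worker, args=(in_q, out_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    items = list(items)
    for item in items:
        in_q.put(item)
    for _ in procs:
        in_q.put(None)  # one poison pill per worker
    results = sorted(out_q.get() for _ in items)
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(run_pipeline(range(5)))  # [0, 1, 4, 9, 16]
```

Multiply this by every stage, queue, retry policy, and deploy script, and the mess becomes clear.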


As Hettinger Says...

“There must be a better way...”

What is this Storm thing?

We read:

“Storm is a distributed real-time computation system.”

Dramatically simplifies your workers and queues.

“Great,” we thought. “But, what about Python support?”

That’s what streamparse is about.

Our Storm Use Case

What is Parse.ly?

Web content analytics for digital storytellers.

Some of our customers:


Elegant data dashboards

Informing thousands of editors and writers every day:


Powerful data APIs

Powering billions of site visits every month:


Too many datas!


“Python Can’t Do This”

“Free lunch is over.”

“It can’t scale.”

“It’s a toy language.”

“Shoulda used Scala.”

Python Can’t Scale?

Eat that, haters!


Thanks to Storm


streamparse is Pythonic Storm


streamparse lets you parse real-time streams of data.

It smoothly integrates Python code with Apache Storm.

Easy quickstart, good CLI/tooling, production tested.

Good for: Analytics, Logs, Sensors, Low-Latency Stuff.


  • Storm topology concepts
  • Storm internals
  • How does Python work with Storm?
  • streamparse overview
  • pykafka preview

Slides on Twitter; follow @amontalenti.

Storm Topology Concepts

Storm Abstractions

Storm provides abstractions for data processing:

  • Tuple
  • Spout
  • Bolt
  • Topology

Wired Topology



Using Python pseudocode and coroutines!


A single data record that flows through your cluster.

# tuple spec: ["word"]
word = ("dog",)
# tuple spec: ["word", "count"]
word_count = ("dog", 4)


A component that emits raw data into cluster.

class Spout(object):
    def next_tuple(self):
        """Called repeatedly to emit tuples."""

def spout_coroutine(spout, target):
    """Get tuple from spout and send it to target."""
    while True:
        tup = spout.next_tuple()
        if tup is None:
            continue
        if target is not None:
            target.send(tup)


A component that implements one processing stage.

class Bolt(object):
    def process(self, tup):
        """Called repeatedly to process tuples."""

def bolt_coroutine(bolt, target):
    """Get tuple from input, process it in Bolt.
       Then send it to next bolt target, if it exists."""
    while True:
        tup = (yield)
        if tup is None:
            continue
        to_emit = bolt.process(tup)
        if target is not None:
            target.send(to_emit)


Directed Acyclic Graph (DAG) describing it all.

# lay out topology
spout = WordSpout
bolts = [WordCountBolt, DebugPrintBolt]

# wire topology
topology = wire(spout=spout, bolts=bolts)

# start the topology

Storm Internals

Tuple Tree


Streams, Grouping and Parallelism

              word-spout        word-count-bolt
input         None              word-spout
output        word-count-bolt   None
tuple         ("dog",)          ("dog", 4)
stream        ["word"]          ["word", "count"]
grouping      ["word"]          :shuffle
parallelism   2                 8
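The "fields" grouping row can be modeled as a consistent routing function (a hypothetical sketch, not Storm's actual implementation, which does this hashing on the JVM side):

```python
import random

def fields_grouping(tup, field_index, num_tasks):
    # "fields" grouping: hash the grouped field so the same word always
    # routes to the same bolt task -- per-task in-memory counts stay correct
    return hash(tup[field_index]) % num_tasks

def shuffle_grouping(num_tasks):
    # ":shuffle" grouping: random routing for even load, no per-key locality
    return random.randrange(num_tasks)

# a "dog" tuple from the spout and a "dog" count tuple land on the same task
assert fields_grouping(("dog",), 0, 8) == fields_grouping(("dog", 4), 0, 8)
```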

Nimbus and Storm UI


Workers and Zookeeper


Empty Slots


Filled Slots and Rebalancing


BTW, Buy This Book!

Source of these diagrams.

Storm Applied, from Manning Publications.

Reviewed in Storm, The Big Reference.


Network Transfer


So, Storm is Sorta Amazing!


  • will guarantee processing via tuple trees
  • does tuneable parallelism per component
  • implements a high availability model
  • allocates Python process slots on physical nodes
  • helps us rebalance computation across cluster
  • handles network messaging automatically

And, it beats the GIL!

Let’s Do This!


Getting Python on Storm

Multi-Lang Protocol (1)

Storm supports Python through the multi-lang protocol.

  • JSON protocol
  • Works via shell-based components
  • Communicate over STDIN and STDOUT

Clean, UNIX-y.

Can use CPython, PyPy; no need for Jython or Py4J.

Kinda quirky, but also relatively simple to implement.
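The framing itself takes only a few lines (a sketch of the JSON-plus-`end`-line message format, not the full handshake):

```python
import json
import sys

def read_message(stream=sys.stdin):
    # a multi-lang message is a JSON blob followed by a line with just "end"
    lines = []
    for line in stream:
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))

def send_message(msg, stream=sys.stdout):
    stream.write(json.dumps(msg))
    stream.write("\nend\n")
    stream.flush()
```

It round-trips through any file-like stream, which also makes it easy to test.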

Multi-Lang Protocol (2)

Each component of a “Python” Storm topology is either:

  • ShellSpout
  • ShellBolt

Java implementations speak to Python via light JSON.

There’s one sub-process per Storm task.

If p = 8, then 8 Python processes are spawned.

Multi-Lang Protocol (3)

INIT: JVM    => Python   >JSON
XFER: JVM    => JVM      >Kryo
DATA: JVM    => Python   >JSON
EMIT: Python => JVM      >JSON
XFER: JVM    => JVM      >Kryo
 ACK: Python => JVM      >JSON
BEAT: JVM    => Python   >JSON
SYNC: Python => JVM      >JSON

Storm bundles “” (a multi-lang implementation).

But, it’s not Pythonic.

We’ll fix that, we thought!

Storm as Infrastructure

Thought: Storm should be like Cassandra/Elasticsearch.

“Written in Java, but Pythonic nonetheless.”

Need: Python as a first-class citizen.

Must also fix “Javanonic” bits (e.g. packaging).

streamparse overview

Enter streamparse

Initial release Apr 2014; one year of active development.

600+ stars on GitHub; a trending repo in May 2014.

90+ mailing list members and 5 new committers.

3 engineers maintaining it.

Funding from DARPA. (Yes, really!)

streamparse CLI

sparse is the CLI front-end to streamparse, a framework for creating Python projects that run, debug, and submit Storm topologies for data processing.

After installing lein (the only dependency), you can run:

pip install streamparse

This installs a command-line tool, sparse. Use:

sparse quickstart

Running and debugging

You can then run the local Storm topology using:

$ sparse run
Running wordcount topology...
Options: {:spec "topologies/wordcount.clj", ...}
#<StormTopology StormTopology(spouts:{word-spout=...
storm.daemon.nimbus - Starting Nimbus with conf {...
storm.daemon.supervisor - Starting supervisor with id 4960ac74...
storm.daemon.nimbus - Received topology submission with conf {...
... lots of output as topology runs...

See a live demo on YouTube.

Submitting to remote cluster

Single command:

$ sparse submit

Does all the following magic:

  • Makes virtualenvs across cluster
  • Builds a JAR out of your source code
  • Opens reverse tunnel to Nimbus
  • Constructs an in-memory Topology spec
  • Uploads JAR to Nimbus

streamparse supplants


Let’s Make a Topology!

Word Stream Spout (Storm DSL)

{"word-spout" (python-spout-spec options
      "spouts.words.WordSpout" ; class (spout)
      ["word"]                 ; stream (fields)
      )}

Word Stream Spout in Python

import itertools

from streamparse.spout import Spout

class Words(Spout):

    def initialize(self, conf, ctx):
        self.words = itertools.cycle(['dog', 'cat',
                                      'zebra', 'elephant'])

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])

Emits one-word tuples from endless generator.

Word Count Bolt (Storm DSL)

{"word-count-bolt" (python-bolt-spec options
        {"word-spout" ["word"]}     ; input (grouping)
        "bolts.wordcount.WordCount" ; class (bolt)
        ["word" "count"]            ; stream (fields)
        :p 2                        ; parallelism
        )}

Word Count Bolt in Python

from collections import Counter

from streamparse.bolt import Bolt

class WordCount(Bolt):

    def initialize(self, conf, ctx):
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        self.log('%s: %d' % (word, self.counts[word]))

Keeps word counts in-memory (assumes grouping).

BatchingBolt for Performance

from streamparse.bolt import BatchingBolt

class WordCount(BatchingBolt):

    secs_between_batches = 5

    def group_key(self, tup):
        # collect batches of words
        word = tup.values[0]
        return word

    def process_batch(self, key, tups):
        # emit the count of words we had per 5s batch
        self.emit([key, len(tups)])

Implements 5-second micro-batches.

Bolts for Real-Time ETL


streamparse config.json

{
    "envs": {
        "0.8": {
            "user": "ubuntu",
            "nimbus": "",
            "workers": [""],
            "log_path": "/var/log/ubuntu/storm",
            "virtualenv_root": "/data/virtualenvs"
        },
        "vagrant": {
            "user": "ubuntu",
            "nimbus": "vagrant.local",
            "workers": ["vagrant.local"],
            "log_path": "/home/ubuntu/storm/logs",
            "virtualenv_root": "/home/ubuntu/virtualenvs"
        }
    }
}

sparse options

$ sparse help

        sparse quickstart <project_name>
        sparse run [-o <option>]... [-p <par>] [-t <time>] [-dv]
        sparse submit [-o <option>]... [-p <par>] [-e <env>] [-dvf]
        sparse list [-e <env>] [-v]
        sparse kill [-e <env>] [-v]
        sparse tail [-e <env>] [--pattern <regex>]
        sparse (-h | --help)
        sparse --version

pykafka preview

Apache Kafka

“Messaging rethought as a commit log.”

Distributed tail -f.

Perfect fit for Storm Spouts.

Able to keep up with Storm’s high-throughput processing.

Great for handling backpressure during traffic spikes.
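The "distributed tail -f" idea can be modeled in a few lines (a toy, single-process model; not the pykafka API):

```python
class CommitLog(object):
    """Toy model of one Kafka partition: an append-only message log."""
    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1  # the message's offset

class Consumer(object):
    """Reads at its own offset, like `tail -f` on the log."""
    def __init__(self, log, offset=0):
        self.log = log
        self.offset = offset

    def poll(self):
        new = self.log.messages[self.offset:]
        self.offset = len(self.log.messages)
        return new

log = CommitLog()
fast, slow = Consumer(log), Consumer(log)
for i in range(3):
    log.append("event-%d" % i)
print(fast.poll())  # ['event-0', 'event-1', 'event-2']
log.append("event-3")
print(fast.poll())  # ['event-3']
print(slow.poll())  # all four: the log buffered them during the "spike"
```

A slow consumer just lags; the log keeps the backlog, which is exactly the backpressure property Storm spouts want.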


We have released pykafka.

NOT to be confused with kafka-python.

Upgraded internal Kafka 0.7 driver to 0.8.2:

  • SimpleConsumer and BalancedConsumer
  • Consumer Groups with Zookeeper
  • Pure Python protocol implementation
  • C protocol implementation in the works (via librdkafka)


I’m sprinting on a Python Storm Topology DSL.

Hacking on Monday and Tuesday. Join me!

streamparse:

Parse.ly's hiring:

Find me on Twitter:

That’s it!


Storm and Spark Together


Overall Architecture


Multi-Lang Impl’s in Python

Plans to unify IPC implementations around pystorm.

Topology Wiring

def wire(spout, bolts=[]):
    """Wire the components together in a pipeline.
    Return the spout coroutine that kicks it off."""
    last = None
    for bolt in reversed(bolts):
        if last is None:
            step = bolt_coroutine(bolt)
        else:
            step = bolt_coroutine(bolt, target=last)
        last = step
    return spout_coroutine(spout, target=last)

Streams, Grouping, Parallelism

(still pseudocode)

class WordCount(Topology):
    spouts = [
        WordSpout(out=["word"]),
    ]
    bolts = [
        WordCountBolt(
            inputs=[WordSpout],
            out=["word", "count"],
            p=2,
        ),
    ]

Storm is “Javanonic”

An ironic term one of my engineers coined for a project that feels very Java-like, and not very “Pythonic”.

Storm Java Quirks

  • Topology Java builder API (eek).
  • Projects built with Maven tasks (yuck).
  • Deployment needs a JAR of your code (ugh).
  • No simple local dev workflow built-in (boo).
  • Storm uses Thrift interfaces (shrug).

Multi-Lang Protocol

The multi-lang protocol has the full core:

  • ack
  • fail
  • emit
  • anchor
  • log
  • heartbeat
  • tuple tree
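For illustration, the per-command messages look roughly like this (field names follow the multi-lang JSON protocol; treat the exact shapes as approximate):

```python
import json

# emit a tuple, anchored to the input tuple's id (this builds the tuple tree)
emit = {"command": "emit", "anchors": ["input-tuple-id"], "tuple": ["dog", 4]}

# ack or fail the input tuple once processing finishes
ack = {"command": "ack", "id": "input-tuple-id"}
fail = {"command": "fail", "id": "input-tuple-id"}

# log a message through the worker's logger
log = {"command": "log", "msg": "counted: dog"}

# sync/heartbeat traffic keeps the JVM from declaring the subprocess dead
sync = {"command": "sync"}

for msg in (emit, ack, fail, log, sync):
    print(json.dumps(msg))
```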

Kafka and Multi-consumer


Kafka Consumer Groups


streamparse projects


Table Of Contents

This Page