streamparse

Defeat the Python GIL with Apache Storm.

Andrew Montalenti, CTO

About Me

  • CTO/co-founder of Parse.ly
  • Hacking in Python for over a decade
  • Fully distributed team

@amontalenti on Twitter:

http://twitter.com/amontalenti

Python GIL

Python’s GIL does not allow true multi-thread parallelism:

_images/python_gil_new.png

And on multi-core, it even leads to lock contention:

_images/python_gil.png

@dabeaz discussed this in a Friday talk on concurrency.

Queues and workers

_images/queues_and_workers.png

Standard way to solve GIL woes.

Queues: ZeroMQ => Redis => RabbitMQ

Workers: Cron Jobs => RQ => Celery

Parse.ly Architecture, 2012

_images/tech_stack.png

It started to get messy

_images/monitors.jpg

As Hettinger Says...

“There must be a better way...”

What is this Storm thing?

We read:

“Storm is a distributed real-time computation system.”

Dramatically simplifies your workers and queues.

“Great,” we thought. “But, what about Python support?”

That’s what streamparse is about.

Our Storm Use Case

What is Parse.ly?

Web content analytics for digital storytellers.

Some of our customers:

_images/parsely_customers.png

Elegant data dashboards

Informing thousands of editors and writers every day:

_images/glimpse.png

Powerful data APIs

Powering billions of site visits every month:

_images/newyorker_related.png

Too many datas!

_images/sparklines_multiple.png

“Python Can’t Do This”

“Free lunch is over.”

“It can’t scale.”

“It’s a toy language.”

“Shoulda used Scala.”

Python Can’t Scale?

Eat that, haters!

_images/cpu_cores.png

Thanks to Storm

_images/storm_applied.png

streamparse is Pythonic Storm

_images/streamparse_logo.png

streamparse lets you parse real-time streams of data.

It smoothly integrates Python code with Apache Storm.

Easy quickstart, good CLI/tooling, production tested.

Good for: Analytics, Logs, Sensors, Low-Latency Stuff.

Agenda

  • Storm topology concepts
  • Storm internals
  • How does Python work with Storm?
  • streamparse overview
  • pykafka preview

Slides on Twitter; follow @amontalenti.

Storm Topology Concepts

Storm Abstractions

Storm provides abstractions for data processing:

  • Tuple
  • Spout
  • Bolt
  • Topology

Wired Topology

_images/topology.png

WARNING

Using Python pseudocode and coroutines!

Tuple

A single data record that flows through your cluster.

# tuple spec: ["word"]
word = ("dog",)
# tuple spec: ["word", "count"]
word_count = ("dog", 4)

Spout

A component that emits raw data into cluster.

class Spout(object):
    def next_tuple(self):
        """Called repeatedly to emit tuples."""

import time

@coroutine
def spout_coroutine(spout, target=None):
    """Get a tuple from the spout and send it to target."""
    while True:
        yield  # each next() pumps one tuple through
        tup = spout.next_tuple()
        if tup is None:
            time.sleep(10)  # nothing to emit yet; back off
            continue
        if target is not None:
            target.send(tup)
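
Here, @coroutine is assumed to be the standard priming decorator (as in @dabeaz's coroutine talks):

import functools

def coroutine(func):
    """Prime a generator so it can immediately accept .send()."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # advance to the first yield
        return gen
    return start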

Bolt

A component that implements one processing stage.

class Bolt(object):
    def process(self, tup):
        """Called repeatedly to process tuples."""

@coroutine
def bolt_coroutine(bolt, target=None):
    """Get a tuple from input, process it in the bolt,
       then send the result to the next target, if any."""
    while True:
        tup = (yield)
        if tup is None:
            continue
        to_emit = bolt.process(tup)
        if target is not None:
            target.send(to_emit)

Topology

Directed Acyclic Graph (DAG) describing it all.

# lay out topology
spout = WordSpout()
bolts = [WordCountBolt(), DebugPrintBolt()]

# wire topology
topology = wire(spout=spout, bolts=bolts)

# start pumping tuples through the topology
next(topology)

Storm Internals

Tuple Tree

_images/wordcount.png

Streams, Grouping and Parallelism

              word-spout        word-count-bolt
input         None              word-spout
output        word-count-bolt   None
tuple         ("dog",)          ("dog", 4)
stream        ["word"]          ["word", "count"]
grouping      ["word"]          ":shuffle"
parallelism   2                 8

Nimbus and Storm UI

_images/storm_ui.png

Workers and Zookeeper

_images/storm_cluster.png

Empty Slots

_images/storm_slots_empty.png

Filled Slots and Rebalancing

_images/storm_slots_filled.png

BTW, Buy This Book!

Source of these diagrams.

Storm Applied, from Manning Publications.

Reviewed in Storm, The Big Reference.

_images/storm_applied.png

Network Transfer

_images/storm_transfer.png

So, Storm is Sorta Amazing!

Storm...

  • will guarantee processing via tuple trees
  • does tunable parallelism per component
  • implements a high availability model
  • allocates Python process slots on physical nodes
  • helps us rebalance computation across cluster
  • handles network messaging automatically

And, it beats the GIL!

Let’s Do This!

_images/cpu_cores.png

Getting Python on Storm

Multi-Lang Protocol (1)

Storm supports Python through the multi-lang protocol.

  • JSON protocol
  • Works via shell-based components
  • Communicate over STDIN and STDOUT

Clean, UNIX-y.

Can use CPython, PyPy; no need for Jython or Py4J.

Kinda quirky, but also relatively simple to implement.

Multi-Lang Protocol (2)

Each component of a “Python” Storm topology is either:

  • ShellSpout
  • ShellBolt

Java implementations speak to Python via lightweight JSON.

There’s one sub-process per Storm task.

If p = 8, then 8 Python processes are spawned.

Multi-Lang Protocol (3)

INIT: JVM    => Python   (JSON)
XFER: JVM    => JVM      (Kryo)
DATA: JVM    => Python   (JSON)
EMIT: Python => JVM      (JSON)
XFER: JVM    => JVM      (Kryo)
 ACK: Python => JVM      (JSON)
BEAT: JVM    => Python   (JSON)
SYNC: Python => JVM      (JSON)
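
Concretely, each multi-lang message is a JSON blob followed by a line containing only end. A minimal sketch of the Python side (the helper names here are ours, not Storm's):

import json
import sys

def send_message(msg):
    """Write one multi-lang message to the JVM over STDOUT."""
    sys.stdout.write(json.dumps(msg) + "\nend\n")
    sys.stdout.flush()

def read_message():
    """Read one multi-lang message from the JVM over STDIN."""
    lines = []
    for line in sys.stdin:
        line = line.strip()
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))

# e.g. emit a tuple downstream, then ack an input tuple
send_message({"command": "emit", "tuple": ["dog", 4]})
send_message({"command": "ack", "id": "some-tuple-id"})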

storm.py issues

Storm bundles “storm.py” (a multi-lang implementation).

But, it’s not Pythonic.

We’ll fix that, we thought!

Storm as Infrastructure

Thought: Storm should be like Cassandra/Elasticsearch.

“Written in Java, but Pythonic nonetheless.”

Need: Python as a first-class citizen.

Must also fix “Javanonic” bits (e.g. packaging).

streamparse overview

Enter streamparse

Initial release Apr 2014; one year of active development.

600+ stars on GitHub; was a trending repo in May 2014.

90+ mailing list members and 5 new committers.

3 Parse.ly engineers maintaining it.

Funding from DARPA. (Yes, really!)

streamparse CLI

sparse is the CLI front-end to streamparse, a framework for creating, running, debugging, and submitting Storm topologies from Python projects.

After installing lein (the only dependency), you can run:

pip install streamparse

This installs the command-line tool, sparse. Use:

sparse quickstart
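
For example, to scaffold a project named wordcount (the name is arbitrary):

$ sparse quickstart wordcount
$ cd wordcount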

Running and debugging

You can then run the local Storm topology using:

$ sparse run
Running wordcount topology...
Options: {:spec "topologies/wordcount.clj", ...}
#<StormTopology StormTopology(spouts:{word-spout=...
storm.daemon.nimbus - Starting Nimbus with conf {...
storm.daemon.supervisor - Starting supervisor with id 4960ac74...
storm.daemon.nimbus - Received topology submission with conf {...
... lots of output as topology runs...

See a live demo on YouTube.

Submitting to remote cluster

Single command:

$ sparse submit

Does all the following magic:

  • Makes virtualenvs across cluster
  • Builds a JAR out of your source code
  • Opens reverse tunnel to Nimbus
  • Constructs an in-memory Topology spec
  • Uploads JAR to Nimbus

streamparse supplants storm.py

_images/streamparse_comp.png

Let’s Make a Topology!

Word Stream Spout (Storm DSL)

{"word-spout" (python-spout-spec options
      "spouts.words.WordSpout" ; class (spout)
      ["word"]                 ; stream (fields)
    )
}

Word Stream Spout in Python

import itertools

from streamparse.spout import Spout

class Words(Spout):

    def initialize(self, conf, ctx):
        self.words = itertools.cycle(['dog', 'cat',
                                      'zebra', 'elephant'])

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])

Emits one-word tuples from an endless generator.

Word Count Bolt (Storm DSL)

{"word-count-bolt" (python-bolt-spec options
        {"word-spout" ["word"]}     ; input (grouping)
        "bolts.wordcount.WordCount" ; class (bolt)
        ["word" "count"]            ; stream (fields)
        :p 2                        ; parallelism
    )
}

Word Count Bolt in Python

from collections import Counter

from streamparse.bolt import Bolt

class WordCount(Bolt):

    def initialize(self, conf, ctx):
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        self.log('%s: %d' % (word, self.counts[word]))

Keeps word counts in memory (assumes fields grouping on word).

BatchingBolt for Performance

from streamparse.bolt import BatchingBolt

class WordCount(BatchingBolt):

    secs_between_batches = 5

    def group_key(self, tup):
        # collect batches of words
        word = tup.values[0]
        return word

    def process_batch(self, key, tups):
        # emit the count of words we had per 5s batch
        self.emit([key, len(tups)])

Implements 5-second micro-batches.

Bolts for Real-Time ETL

_images/storm_data.png

streamparse config.json

{
    "envs": {
        "0.8": {
            "user": "ubuntu",
            "nimbus": "storm-head.ec2-ubuntu.com",
            "workers": ["storm1.ec2-ubuntu.com",
                        "storm2.ec2-ubuntu.com"],
            "log_path": "/var/log/ubuntu/storm",
            "virtualenv_root": "/data/virtualenvs"
        },
        "vagrant": {
            "user": "ubuntu",
            "nimbus": "vagrant.local",
            "workers": ["vagrant.local"],
            "log_path": "/home/ubuntu/storm/logs",
            "virtualenv_root": "/home/ubuntu/virtualenvs"
        }
    }
}
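
Then target an environment by name with the -e flag, e.g.:

$ sparse submit -e vagrant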

sparse options

$ sparse help

Usage:
        sparse quickstart <project_name>
        sparse run [-o <option>]... [-p <par>] [-t <time>] [-dv]
        sparse submit [-o <option>]... [-p <par>] [-e <env>] [-dvf]
        sparse list [-e <env>] [-v]
        sparse kill [-e <env>] [-v]
        sparse tail [-e <env>] [--pattern <regex>]
        sparse (-h | --help)
        sparse --version

pykafka preview

Apache Kafka

“Messaging rethought as a commit log.”

Distributed tail -f.

Perfect fit for Storm Spouts.

Able to keep up with Storm’s high-throughput processing.

Great for handling backpressure during traffic spikes.

pykafka

We have released pykafka.

NOT to be confused with kafka-python.

Upgraded internal Kafka 0.7 driver to 0.8.2:

  • SimpleConsumer and BalancedConsumer
  • Consumer Groups with Zookeeper
  • Pure Python protocol implementation
  • C protocol implementation in the works (via librdkafka)

https://github.com/Parsely/pykafka
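
A minimal consumer sketch, assuming a local broker and ZooKeeper (hosts and topic name are placeholders; depending on version, topic names may need to be bytes):

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics["pageviews"]

# SimpleConsumer: one consumer reading a topic directly
consumer = topic.get_simple_consumer()

# BalancedConsumer: coordinates a consumer group via ZooKeeper
# consumer = topic.get_balanced_consumer(
#     consumer_group="storm-spout",
#     zookeeper_connect="127.0.0.1:2181")

for message in consumer:
    if message is not None:
        print(message.offset, message.value)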

Questions?

I’m sprinting on a Python Storm Topology DSL.

Hacking on Monday and Tuesday. Join me!

streamparse: http://github.com/Parsely/streamparse

Parse.ly’s hiring: http://parse.ly/jobs

Find me on Twitter: http://twitter.com/amontalenti

That’s it!

Appendix

Storm and Spark Together

_images/streamparse_reference.png

Overall Architecture

_images/big_diagram.png

Multi-Lang Impl’s in Python

Plans to unify IPC implementations around pystorm.

Topology Wiring

def wire(spout, bolts=()):
    """Wire the components together in a pipeline.
    Return the spout coroutine that kicks it off."""
    last = None
    for bolt in reversed(bolts):
        # wrap each bolt, targeting the stage after it
        last = bolt_coroutine(bolt, target=last)
    return spout_coroutine(spout, target=last)

Streams, Grouping, Parallelism

(still pseudocode)

class WordCount(Topology):
    spouts = [
        WordSpout(
            name="word-spout",
            out=["word"],
            p=2)
    ]
    bolts = [
        WordCountBolt(
            name="word-count-bolt",
            source=WordSpout,  # 'from' is reserved in Python
            group_on="word",
            out=["word", "count"],
            p=8)
    ]

Storm is “Javanonic”

An ironic term one of my engineers coined for a project that feels very Java-like, and not very “Pythonic”.

Storm Java Quirks

  • Topology Java builder API (eek).
  • Projects built with Maven tasks (yuck).
  • Deployment needs a JAR of your code (ugh).
  • No simple local dev workflow built-in (boo).
  • Storm uses Thrift interfaces (shrug).

Multi-Lang Protocol

The multi-lang protocol covers the full core feature set:

  • ack
  • fail
  • emit
  • anchor
  • log
  • heartbeat
  • tuple tree
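
Illustrative message bodies for a few of these (the id and msg values are made up):

{"command": "emit", "anchors": ["id1"], "tuple": ["dog", 4]}
{"command": "ack",  "id": "id1"}
{"command": "fail", "id": "id1"}
{"command": "log",  "msg": "counted 4 dogs"}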

Kafka and Multi-consumer

_images/multiconsumer.png

Kafka Consumer Groups

_images/consumer_groups.png

streamparse projects

_images/streamparse_project.png
