Real-time Streams & Logs
Andrew Montalenti, CTO
Keith Bourgoin, Backend Lead
Our presentations and code:
This presentation's slides:
This presentation's notes:
Web content analytics for digital storytellers.
Note
Gives web content teams a clear understanding about what readers want and how to deliver it to them in the most effective way.
Answers questions for journalists and editors, like:
For product teams, our API enables dynamic content recommendations which can be implemented in minutes.
Average post has <48-hour shelf life.
Note
Top publishers write 1000's of posts per day.
Note
Queues: RabbitMQ => Redis => ZeroMQ
Workers: Cron Jobs => Celery
Note
Traditional queues (e.g. RabbitMQ / Redis):
(Hint: Kafka solves these problems.)
Note
Note
To add more features, we had to add more workers and queues!
Got harder and harder to develop on "the entire stack".
More code devoted to ops, rather than business logic.
Storm is a distributed real-time computation system.
Hadoop provides a set of general primitives for doing batch processing.
Storm provides a set of general primitives for doing real-time computation.
Perfect as a replacement for ad-hoc workers-and-queues systems.
Streaming Data Set, typically from Kafka.
ZeroMQ used for inter-process communication.
Bolts & Spouts; Storm's Topology is a DAG.
Nimbus & Workers manage execution.
Tuneable parallelism + built-in fault tolerance.
Note
Hadoop Parallel:
Durable Data Set, typically from S3.
HDFS used for inter-process communication.
Mappers & Reducers; Pig's JobFlow is a DAG.
JobTracker & TaskTracker manage execution.
Tuneable parallelism + built-in fault tolerance.
Note
| Concept | Description |
|---|---|
| Stream | Unbounded sequence of data tuples with named fields |
| Spout | A source of a Stream of tuples; typically reading from Kafka |
| Bolt | A computation step that consumes Streams and emits new Streams |
| Grouping | Way of partitioning data fed to a Bolt; for example: by field, shuffle |
| Topology | Directed Acyclic Graph (DAG) describing Spouts, Bolts, & Groupings |
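As a toy illustration of groupings (not Storm's actual code): a fields grouping hashes the named field so tuples with the same value always reach the same bolt task, while a shuffle grouping spreads tuples round-robin. `NUM_TASKS` is a hypothetical bolt parallelism:

```python
import itertools
import zlib

NUM_TASKS = 2  # hypothetical parallelism of the downstream bolt
_rr = itertools.cycle(range(NUM_TASKS))

def fields_grouping(word):
    # Hash the grouping field: the same word always maps to the same task,
    # so per-word state (like a counter) stays consistent.
    return zlib.crc32(word.encode()) % NUM_TASKS

def shuffle_grouping(_word):
    # Round-robin: even load spread, but no per-key affinity.
    return next(_rr)
```

This is why the word-count bolt below uses a fields grouping on `word`: every occurrence of `"dog"` lands on the same counter.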
Tuple tree, anchoring, and retries.
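A toy model of that idea (purely illustrative, not Storm's API): each bolt tuple is anchored to its spout tuple's tree; the spout tuple is acked only when the whole tree is acked, and a single failure triggers a replay of the root:

```python
class ToySpoutTracker:
    """Toy tuple tree: a spout tuple is fully acked only when every
    tuple anchored to it is acked; one failure replays the root."""

    def __init__(self):
        self.pending = {}    # root id -> outstanding anchored tuple ids
        self.replayed = []   # root ids the spout would re-emit

    def emit_root(self, root_id):
        self.pending[root_id] = {root_id}

    def anchor(self, root_id, child_id):
        # A bolt emits child_id anchored to root_id's tree.
        self.pending[root_id].add(child_id)

    def ack(self, root_id, tup_id):
        tree = self.pending.get(root_id)
        if tree is None:
            return
        tree.discard(tup_id)
        if not tree:               # whole tree acked -> done
            del self.pending[root_id]

    def fail(self, root_id):
        if self.pending.pop(root_id, None) is not None:
            self.replayed.append(root_id)  # spout replays the tuple
```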
;; spout configuration
{"word-spout" (shell-spout-spec
               ;; Python Spout implementation:
               ;; - fetches words (e.g. from Kafka)
               ["python" "words.py"]
               ;; - emits (word,) tuples
               ["word"]
               )
}
import itertools

from streamparse import storm


class WordSpout(storm.Spout):

    def initialize(self, conf, ctx):
        self.words = itertools.cycle(['dog', 'cat',
                                      'zebra', 'elephant'])

    def next_tuple(self):
        word = next(self.words)
        storm.emit([word])

WordSpout().run()
;; bolt configuration
{"count-bolt" (shell-bolt-spec
               ;; Bolt input: Spout and field grouping on word
               {"word-spout" ["word"]}
               ;; Python Bolt implementation:
               ;; - maintains a Counter of word
               ;; - increments as new words arrive
               ["python" "wordcount.py"]
               ;; Emits latest word count for most recent word
               ["word" "count"]
               ;; parallelism = 2
               :p 2
               )
}
from collections import Counter

from streamparse import storm


class WordCounter(storm.Bolt):

    def initialize(self, conf, ctx):
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        storm.emit([word, self.counts[word]])
        storm.log('%s: %d' % (word, self.counts[word]))

WordCounter().run()
sparse provides the command-line front-end to streamparse, a framework for creating Python projects that run, debug, and submit Storm topologies for data processing. (Still in development.)
After installing lein (the only dependency), you can run:
pip install streamparse
This will offer a command-line tool, sparse. Use:
sparse quickstart
You can then run the local Storm topology using:
$ sparse run
Running wordcount topology...
Options: {:spec "topologies/wordcount.clj", ...}
#<StormTopology StormTopology(spouts:{word-spout=...
storm.daemon.nimbus - Starting Nimbus with conf {...
storm.daemon.supervisor - Starting supervisor with id 4960ac74...
storm.daemon.nimbus - Received topology submission with conf {...
... lots of output as topology runs...
Interested? Lightning talk!
A "log" could be any stream of structured data:
A series of timestamped facts about a given system.
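For instance, a single pageview event can be recorded as one timestamped fact; the field names here are invented for illustration:

```python
import json
import time

# A hypothetical pageview "fact"; a stream of such records is a log.
event = {
    "ts": int(time.time()),            # when it happened
    "action": "pageview",              # what happened
    "url": "http://example.com/post",  # to what
    "visitor_id": "abc123",            # by whom
}
line = json.dumps(event)  # one record, appended to the log
```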
Note
Note
Log-centric messaging system developed at LinkedIn.
Designed for throughput; efficient resource use.
- Persists to disk; in-memory for recent data
- Little to no overhead for new consumers
- Scales to tens of thousands of messages per second
As of 0.8, full replication of topic data.
Note
| Concept | Description |
|---|---|
| Cluster | An arrangement of Brokers & Zookeeper nodes |
| Broker | An individual node in the Cluster |
| Topic | A group of related messages (a stream) |
| Partition | Part of a topic; the unit of parallelism and replication |
| Producer | Publishes messages to stream |
| Consumer Group | Group of related processes reading a topic |
| Offset | Point in a topic that the consumer has read to |
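As a toy sketch of how these concepts fit together, the partitions of a topic can be spread across the members of a consumer group (round-robin here; Kafka's actual rebalancing is more involved):

```python
def assign_partitions(partitions, consumers):
    """Round-robin a topic's partitions across a consumer group, so each
    partition is read by exactly one consumer in the group."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment
```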
Note
Replication isn't perfect. Network partitions can cause problems.
No out-of-order acknowledgement:
- "Offset" is a marker of where consumer is in log; nothing more.
- On a restart, you know where to start reading, but not whether individual messages before the stored offset were fully processed.
- In practice, not as much of a problem as it sounds.
Note
Topics are logs, not queues.
Consumers read into offsets of the log.
Logs are maintained for a configurable period of time.
Messages can be "replayed".
Consumers can share identical logs easily.
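The points above can be sketched with a toy in-memory "topic" (purely illustrative, not Kafka's implementation): the log is append-only, each consumer tracks only its own offset, and replay is just rewinding that offset:

```python
class ToyLog:
    """Toy topic: an append-only list; each consumer keeps its own offset."""

    def __init__(self):
        self.messages = []
        self.offsets = {}            # consumer name -> next index to read

    def produce(self, msg):
        self.messages.append(msg)    # the log only ever grows

    def consume(self, name, n=1):
        start = self.offsets.get(name, 0)
        batch = self.messages[start:start + n]
        self.offsets[name] = start + len(batch)
        return batch

    def replay(self, name, offset=0):
        self.offsets[name] = offset  # rewind; messages are still there
```

A fast and a slow consumer can share the identical log without interfering, which is exactly what a queue that deletes on delivery cannot offer.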
Note
Even if Kafka's availability and scalability story isn't interesting to you, the multi-consumer story should be.
Note
Traditional queues (e.g. RabbitMQ / Redis):
Kafka solves all of these problems.
Note
Out-of-order acks are actually expensive:
- random disk seeks/writes aren't cheap!
More consumers = duplicated messages.
Good fit for at-least-once processing.
No need for out-of-order acks.
Community work is ongoing for at-most-once processing.
Able to keep up with Storm's high-throughput processing.
Great for handling backpressure during traffic spikes.
Note
import time

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient('localhost:9092')
consumer = SimpleConsumer(kafka, 'test_consumer', 'raw_data')

count = 0
start = time.time()
for msg in consumer:
    count += 1
    if count % 1000 == 0:
        dur = time.time() - start
        print('Reading at {:.2f} messages/sec'.format(1000 / dur))
        start = time.time()
import time

from kazoo.client import KazooClient
from samsa.cluster import Cluster

zk = KazooClient()
zk.start()
cluster = Cluster(zk)
queue = cluster.topics['raw_data'].subscribe('test_consumer')

count = 0
start = time.time()
for msg in queue:
    count += 1
    if count % 1000 == 0:
        dur = time.time() - start
        print('Reading at {:.2f} messages/sec'.format(1000 / dur))
        queue.commit_offsets()  # commit to zk every 1k msgs
        start = time.time()
| Company | Logs | Workers |
|---|---|---|
| LinkedIn | Kafka* | Samza |
| Twitter | Kafka | Storm* |
|  | Kafka | Storm |
| Spotify | Kafka | Storm |
| Wikipedia | Kafka | Storm |
| Outbrain | Kafka | Storm |
| LivePerson | Kafka | Storm |
| Netflix | Kafka | ??? |
Go forth and stream!
Parse.ly:
Andrew & Keith: