Understanding the Internal Message Buffers of Storm
Jun 21st, 2013
When you are optimizing the performance of your Storm topologies it helps to understand how Storm’s internal message queues are configured and put to use. In this short article I will explain and illustrate how Storm version 0.8/0.9 implements the intra-worker communication that happens within a worker process and its associated executor threads.
Internal messaging within Storm worker processes
I will use the terms message and (Storm) tuple interchangeably in the following sections.
When I say “internal messaging” I mean the messaging that happens within a worker process in Storm, which is communication that is restricted to happen within the same Storm machine/node. For this communication Storm relies on various message queues backed by LMAX Disruptor, which is a high performance inter-thread messaging library.
Note that this communication within the threads of a worker process is different from Storm’s inter-worker communication, which normally happens across machines and thus over the network. For the latter Storm uses ZeroMQ by default (in Storm 0.9 there is experimental support for Netty as the network messaging backend). That is, ZeroMQ/Netty are used when a task in one worker process wants to send data to a task that runs in a worker process on a different machine in the Storm cluster.
So for your reference:
- Intra-worker communication in Storm (inter-thread on the same Storm node): LMAX Disruptor
- Inter-worker communication (node-to-node across the network): ZeroMQ or Netty
- Inter-topology communication: nothing built into Storm, you must take care of this yourself with e.g. a messaging system such as Kafka/RabbitMQ, a database, etc.
If you do not know what the differences are between Storm’s worker processes, executor threads and tasks please take a look at Understanding the Parallelism of a Storm Topology.
Illustration
Let us start with a picture before we discuss the nitty-gritty details in the next section.
For readability the illustration shows only one worker process (though normally a single Storm node runs multiple such processes) and only one executor thread within that worker process (of which, again, there are usually many per worker process).
Detailed description
Now that you have had a first glimpse of Storm’s intra-worker messaging setup we can discuss the details.
Worker processes
To manage its incoming and outgoing messages each worker process has a single receive thread that listens on the worker’s TCP port (as configured via supervisor.slots.ports). The parameter topology.receiver.buffer.size determines the batch size that the receive thread uses to place incoming messages into the incoming queues of the worker’s executor threads. Similarly, each worker has a single send thread that is responsible for reading messages from the worker’s transfer queue and sending them over the network to downstream consumers. The size of the transfer queue is configured via topology.transfer.buffer.size.
- The topology.receiver.buffer.size is the maximum number of messages that are batched together at once for appending to an executor’s incoming queue by the worker receive thread (which reads the messages from the network). Setting this parameter too high may cause a lot of problems (“heartbeat thread gets starved, throughput plummets”). The default value is 8 elements, and the value must be a power of 2 (this requirement comes indirectly from LMAX Disruptor).
Note: topology.receiver.buffer.size does not configure the size of a Disruptor queue. Rather, it sets the size of a simple ArrayList that is used to buffer incoming messages because in this specific case the data structure does not need to be shared with other threads, i.e. it is local to the worker’s receive thread. But because the content of this buffer is used to fill a Disruptor-backed queue (executor incoming queues) it must still be a power of 2. See launch-receive-thread! in backtype.storm.messaging.loader for details.
- Each element of the transfer queue configured with topology.transfer.buffer.size is actually a list of tuples. The various executor send threads will batch outgoing tuples off their outgoing queues onto the transfer queue. The default value is 1024 elements.
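The batching behaviour of the worker receive thread described above can be sketched in plain Java. This is a deliberately simplified model, not Storm’s actual code: the class ReceiveBatcher and its methods are hypothetical names, messages are plain strings, and there is only one executor incoming queue, whereas a real worker routes messages to the queues of several executors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the worker receive thread's batching.
class ReceiveBatcher {
    private final int batchSize;            // plays the role of topology.receiver.buffer.size
    private final List<String> buffer;      // thread-local buffer, a plain ArrayList
    private final List<List<String>> executorIncomingQueue = new ArrayList<>();

    ReceiveBatcher(int batchSize) {
        if (!isPowerOfTwo(batchSize)) {
            throw new IllegalArgumentException("batch size must be a power of 2: " + batchSize);
        }
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>(batchSize);
    }

    // A positive power of 2 has exactly one bit set.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Called once per message read from the network.
    void onMessage(String message) {
        buffer.add(message);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // Appends the buffered messages to the incoming queue as one batch.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        executorIncomingQueue.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<String>> queue() {
        return executorIncomingQueue;
    }
}
```

With a batch size of 8, feeding 20 messages and then calling flush() leaves three batches in the queue, of sizes 8, 8 and 4.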
Executors
Each worker process controls one or more executor threads. Each executor thread has its own incoming queue and outgoing queue. As described above, the worker process runs a dedicated worker receive thread that is responsible for moving incoming messages to the appropriate incoming queue of the worker’s various executor threads. Similarly, each executor has its dedicated send thread that moves an executor’s outgoing messages from its outgoing queue to the “parent” worker’s transfer queue. The sizes of the executors’ incoming and outgoing queues are configured via topology.executor.receive.buffer.size and topology.executor.send.buffer.size, respectively.
Each executor has a single thread that handles the user logic for the spout/bolt (i.e. your application code), and a single send thread which moves messages from the executor’s outgoing queue to the worker’s transfer queue.
- The topology.executor.receive.buffer.size is the size of the incoming queue for an executor. Each element of this queue is a list of tuples. Here, tuples are appended in batch. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
- The topology.executor.send.buffer.size is the size of the outgoing queue for an executor. Each element of this queue will contain a single tuple. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
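To make the different element types of the three queues concrete, here is a rough sketch of one executor and its parent worker’s transfer queue. Everything in it is an assumption made for illustration: the class and method names are hypothetical, ArrayBlockingQueue stands in for the Disruptor-backed queues, tuples are plain strings, the “user logic” merely upper-cases each tuple, and the two threads are simulated by calling their steps one after the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of one executor plus the worker's transfer queue.
class ExecutorQueuesSketch {
    // Incoming queue: each element is a batch (list) of tuples.
    final BlockingQueue<List<String>> incoming;
    // Outgoing queue: each element is a single tuple.
    final BlockingQueue<String> outgoing;
    // Worker transfer queue: each element is again a list of tuples.
    final BlockingQueue<List<String>> transfer;

    ExecutorQueuesSketch(int receiveSize, int sendSize, int transferSize) {
        incoming = new ArrayBlockingQueue<>(receiveSize);
        outgoing = new ArrayBlockingQueue<>(sendSize);
        transfer = new ArrayBlockingQueue<>(transferSize);
    }

    // One step of the user logic thread: take a batch, emit tuples one by one.
    void runUserLogicOnce() {
        List<String> batch = incoming.poll();
        if (batch == null) {
            return; // nothing to process
        }
        for (String tuple : batch) {
            outgoing.add(tuple.toUpperCase()); // placeholder for spout/bolt code
        }
    }

    // One step of the executor send thread: batch outgoing tuples onto the transfer queue.
    void runSendThreadOnce() {
        List<String> batch = new ArrayList<>();
        outgoing.drainTo(batch);
        if (!batch.isEmpty()) {
            transfer.add(batch);
        }
    }
}
```

Note that add() on a full ArrayBlockingQueue throws an exception instead of waiting, which keeps the sketch short but is not how a real queue under backpressure should be handled.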
Where to go from here
How to configure Storm’s internal message buffers
The various default values mentioned above are defined in conf/defaults.yaml. You can override these values globally in a Storm cluster’s conf/storm.yaml. You can also configure these parameters per individual Storm topology via backtype.storm.Config in Storm’s Java API.
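The three configuration levels can be pictured as maps that are merged in order. The sketch below uses plain Java maps instead of Storm classes so that it runs without Storm on the classpath; the class BufferConfigSketch and its methods are hypothetical, and the merge order (defaults first, then cluster-wide settings, then per-topology settings) reflects my understanding of how Storm resolves its configuration. The default values are the ones stated in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of how buffer settings are layered.
class BufferConfigSketch {
    // Defaults as described in this article (conf/defaults.yaml).
    static Map<String, Object> defaults() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.receiver.buffer.size", 8);
        conf.put("topology.transfer.buffer.size", 1024);
        conf.put("topology.executor.receive.buffer.size", 1024);
        conf.put("topology.executor.send.buffer.size", 1024);
        return conf;
    }

    // Later layers win: cluster-wide overrides first, per-topology overrides last.
    static Map<String, Object> resolve(Map<String, Object> clusterConf,
                                       Map<String, Object> topologyConf) {
        Map<String, Object> effective = defaults();
        effective.putAll(clusterConf);
        effective.putAll(topologyConf);
        return effective;
    }
}
```

In a real topology you would put the same keys into a backtype.storm.Config instance, which is itself a map of setting names to values, and pass it along when submitting the topology.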
How to configure Storm’s parallelism
The correct configuration of Storm’s message buffers is closely tied to the workload pattern of your topology as well as the configured parallelism of your topologies. See Understanding the Parallelism of a Storm Topology for more details about the latter.
Understand what’s going on in your Storm topology
The Storm UI is a good start to inspect key metrics of your running Storm topologies. For instance, it shows you the so-called “capacity” of a spout/bolt. The various metrics will help you decide whether your changes to the buffer-related configuration parameters described in this article had a positive or negative effect on the performance of your Storm topologies. See Running a Multi-Node Storm Cluster for details.
Apart from that you can also generate your own application metrics and track them with a tool like Graphite. See my articles Sending Metrics From Storm to Graphite and Installing and Running Graphite via RPM and Supervisord for details. It might also be worth checking out ooyala’s metrics_storm project on GitHub (I haven’t used it yet).
Advice on performance tuning
Watch Nathan Marz’s talk on Tuning and Productionization of Storm.
The TL;DR version is: Try the following settings as a first start and see whether they improve the performance of your Storm topology.
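As a hedged sketch of what such a starting point could look like: the four keys are the parameters discussed in this article, while the numbers are the values commonly quoted from that talk and should be treated as assumptions to benchmark against your own workload, not as guaranteed improvements. The class name is hypothetical, and a plain map is used so the snippet runs without Storm.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that collects a first-attempt tuning configuration.
class TuningStartingPoint {
    static Map<String, Object> startingPoint() {
        Map<String, Object> conf = new LinkedHashMap<>();
        // Assumed values: verify them against your own measurements.
        conf.put("topology.receiver.buffer.size", 8);
        conf.put("topology.transfer.buffer.size", 32);
        conf.put("topology.executor.receive.buffer.size", 16384);
        conf.put("topology.executor.send.buffer.size", 16384);
        return conf;
    }
}
```

When using Storm’s Java API you would set the same entries on a backtype.storm.Config object, for example via the constants Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE and Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE.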
Posted by Michael G. Noll Jun 21st, 2013
Filed under Programming, Storm
Original article: http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/