Real-time streaming data processing is a strategic imperative that directly impacts enterprise competitiveness. Organizations face mounting pressure to process vast data streams instantaneously, from detecting fraudulent transactions and delivering personalized customer experiences to optimizing complex supply chains and responding to market dynamics milliseconds ahead of competitors.
Apache Spark Structured Streaming addresses these critical business challenges through its stateful processing capabilities, enabling applications to maintain and update intermediate results across multiple data streams or time windows. RocksDB was introduced in Apache Spark 3.2, offering a more efficient alternative to the default HDFS-backed in-memory store. RocksDB excels in stateful streaming scenarios that require handling large quantities of state data. It delivers strong performance benefits, particularly by reducing Java virtual machine (JVM) memory pressure and garbage collection (GC) overhead.
This post explores RocksDB's key features and demonstrates its implementation using Spark on Amazon EMR and AWS Glue, providing you with the knowledge you need to scale your real-time data processing capabilities.
RocksDB state store overview
Spark Structured Streaming processes fall into two categories:
- Stateful: Requires tracking intermediate results across micro-batches (for example, when running aggregations and deduplication).
- Stateless: Processes each batch independently.
A state store is required by stateful applications that track intermediate query results. This is essential for computations that depend on continuous events and change results based on each batch of input, or on aggregate data over time, including late-arriving data. By default, Spark offers a state store that keeps state in JVM memory, which is performant and sufficient for most general streaming cases. However, if you have a large number of stateful operations in a streaming application (such as streaming aggregation, streaming dropDuplicates, and stream-stream joins), the default in-memory state store might face out-of-memory (OOM) issues because of a large JVM memory footprint or frequent GC pauses, resulting in degraded performance.
Advantages of RocksDB over the in-memory state store
RocksDB addresses the challenges of an in-memory state store through off-heap memory management and efficient checkpointing.
- Off-heap memory management: RocksDB stores state data in OS-managed off-heap memory, reducing GC pressure. While off-heap memory still consumes machine memory, it doesn't occupy space in the JVM. Instead, its core memory structures, such as the block cache and MemTables, allocate directly from the operating system, bypassing the JVM heap. This approach makes RocksDB a strong choice for memory-intensive applications.
- Efficient checkpointing: RocksDB automatically saves state changes to checkpoint locations, such as Amazon Simple Storage Service (Amazon S3) paths or local directories, helping to ensure full fault tolerance. When interacting with S3, RocksDB is designed to improve checkpointing efficiency: it uses incremental updates and compaction to reduce the amount of data transferred to S3 during checkpoints, and it persists fewer large state files compared to the many small files of the default state store, reducing S3 API calls and latency.
Implementation considerations
RocksDB operates as a native C++ library embedded within the Spark executor, using off-heap memory. While it doesn't fall under JVM GC control, it still affects overall executor memory usage from the YARN or OS perspective. RocksDB's off-heap memory usage might exceed YARN container limits without triggering container termination, potentially leading to OOM issues. Consider the following approaches to manage Spark's memory:
Adjust the Spark executor memory size
Increase spark.executor.memoryOverhead or spark.executor.memoryOverheadFactor to leave more room for off-heap usage. The following example sets half (4 GB) of spark.executor.memory (8 GB) as the memory overhead size.
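A spark-defaults style snippet for this scenario (the 8 GB and 4 GB values come from the example above and are illustrative):

```
spark.executor.memory=8g
spark.executor.memoryOverhead=4g
```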
For Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), enabling YARN memory control with strict container memory enforcement through the polling strategy preempts containers to avoid node-wide OOM failures:
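A sketch of the corresponding yarn-site settings, assuming the polling-based physical and virtual memory checks are the enforcement you want; verify the property names against your Hadoop and Amazon EMR versions:

```
# yarn-site (assumed settings; verify for your Hadoop/EMR version)
yarn.nodemanager.pmem-check-enabled=true
yarn.nodemanager.vmem-check-enabled=true
```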
Off-heap memory control
Use RocksDB-specific settings to configure memory usage. More details can be found in the Best practices and considerations section.
Get started with RocksDB on Amazon EMR and AWS Glue
To activate the RocksDB state store in Spark, configure your application with the following setting:
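```
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
```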
In the following sections, we explore creating a sample Spark Structured Streaming job with RocksDB enabled, running on Amazon EMR and AWS Glue respectively.
RocksDB on Amazon EMR
Amazon EMR versions 6.6.0 and later support RocksDB, including Amazon EMR on EC2, Amazon EMR Serverless, and Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). In this example, we use Amazon EMR on EC2.
Use the following steps to run a sample streaming job with RocksDB enabled.
- Upload the following sample script to s3://<bucket_name>/script/sample_script.py.
- On the AWS Management Console for Amazon EMR, choose Create cluster.
- For Name and applications – required, select the latest Amazon EMR release.
- For Steps, choose Add. For Type, select Spark application.
- For Name, enter GettingStartedWithRocksDB, and enter s3://<bucket_name>/script/sample_script.py as the Application location. Choose Save step.
- For other settings, choose the appropriate settings based on your use case.
- Choose Create cluster to start the streaming application via the Amazon EMR step.
RocksDB on AWS Glue
AWS Glue 4.0 and later versions support RocksDB. Use the following steps to run the sample job with RocksDB enabled on AWS Glue.
- On the AWS Glue console, in the navigation pane, choose ETL jobs.
- Choose Script editor, and then choose Create script.
- For the job name, enter GettingStartedWithRocksDB.
- Copy the script from the previous example and paste it on the Script tab.
- On the Job details tab, for Type, select Spark Streaming.
- Choose Save, and then choose Run to start the streaming job on AWS Glue.
Walkthrough details
Let's dive deep into the script to understand how to run a simple stateful Spark application with RocksDB, using the following example PySpark code.
- First, set up RocksDB as your state store by configuring the provider class.
- To simulate streaming data, create a data stream using the rate source type. It generates one record per second, containing five random fruit names from a predefined list.
- Create a word counting operation on the incoming stream. This is a stateful operation because it maintains running counts between processing intervals; that is, previous counts must be stored to calculate the new totals.
- Finally, output the word count totals to the console. A sketch of the complete script follows this list.
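The full script isn't reproduced here, so the following is a minimal PySpark sketch of the behavior described above. The fruit list, the shuffle-and-slice approach used to pick five random names per row, and names such as fruit_stream are illustrative assumptions rather than the original code:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Configure RocksDB as the state store provider.
spark = (
    SparkSession.builder
    .appName("GettingStartedWithRocksDB")
    .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    )
    .getOrCreate()
)

# Simulate input: the rate source emits one row per second.
raw_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Turn each row into five random fruit names drawn from a predefined list
# (fruit names and the shuffle/slice trick are illustrative assumptions).
fruits = ["apple", "banana", "cherry", "grape", "kiwi",
          "lemon", "mango", "orange", "peach", "pear"]
fruit_array = F.array(*[F.lit(f) for f in fruits])
fruit_stream = raw_stream.select(
    F.explode(F.slice(F.shuffle(fruit_array), 1, 5)).alias("fruit")
)

# Stateful word count: running totals are kept in the state store
# between micro-batches.
fruit_counts = fruit_stream.groupBy("fruit").count()

# Emit the running totals to the console after every micro-batch.
query = (
    fruit_counts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
query.awaitTermination()
```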
Input data
In the same sample code, test data (raw_stream) is generated at a rate of one row per second, as shown in the following example:
Output result
The streaming job produces the following results in the output logs. It demonstrates how Spark Structured Streaming maintains and updates the state across multiple micro-batches:
- Batch 0: Starts with an empty state
- Batch 1: Processes several input records, resulting in initial counts for each of the 10 fruits (for example, banana appears 8 times)
- Batch 2: Running totals are updated with new occurrences from the next set of records (for example, banana increases from 8 to 15, indicating 7 new occurrences).
State store logs
RocksDB generates detailed logs during the job run, like the following:
In Amazon EMR on EC2, these logs are available on the node where the YARN ApplicationMaster container is running. They can be found at /var/log/hadoop-yarn/containers/<application_id>/<container_id>/.
For AWS Glue, you can find the RocksDB metrics in Amazon CloudWatch, under the log group /aws-glue/jobs/error.
RocksDB metrics
The metrics from the preceding logs provide insight into RocksDB's status. The following are some example metrics you might find useful when investigating streaming job issues:
- rocksdbCommitCheckpointLatency: Time spent writing checkpoints to local storage
- rocksdbCommitCompactLatency: Duration of compaction operations during checkpoint commits
- rocksdbSstFileSize: Current size of SST files in RocksDB
Deep dive into RocksDB key concepts
To better understand the state metrics shown in the logs, we deep dive into RocksDB's key concepts: the MemTable, sorted string table (SST) files, and checkpoints. Additionally, we provide some tips on best practices and fine-tuning.
High-level architecture
RocksDB is a local, non-distributed, persistent key-value store embedded in Spark executors. It enables scalable state management for streaming workloads, backed by Spark's checkpointing for fault tolerance. As shown in the preceding figure, RocksDB stores data in memory and also on disk. RocksDB's ability to spill data over to disk is what allows Spark Structured Streaming to handle state data that exceeds the available memory.
Memory:
- Write buffers (MemTables): Designated memory that buffers writes before they are flushed to disk
- Block cache (read buffer): Reduces query time by caching results read from disk
Disk:
- SST files: Sorted string tables, stored in the SST file format for fast access
MemTable: Stored off-heap
The MemTable, shown in the preceding figure, is an in-memory store where data is first written off-heap before being flushed to disk as an SST file. RocksDB caches the latest two batches of data (hot data) in MemTables to reduce streaming process latency. By default, RocksDB has only two MemTables: one active and the other read-only. If you have sufficient memory, the configuration spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber can be increased to allow more than two MemTables. Among these MemTables, there is always one active table; the rest are read-only MemTables used as write buffers.
SST files: Stored on the Spark executor's local disk
SST files are block-based tables stored on the Spark executor's local disk. When the in-memory state data can no longer fit into a MemTable (as bounded by the Spark configuration writeBufferSizeMB), the active table is marked as immutable, switching it to a read-only MemTable that is asynchronously flushed to local disk in the SST file format. While flushing, the immutable MemTable can still be read, so the latest state data remains available with minimal read latency.
Reading from RocksDB follows the sequence demonstrated by the preceding diagram:
- Read from the active MemTable.
- If the key is not found, iterate through the read-only MemTables, from newest to oldest.
- If the key is still not found, read from the block cache (read buffer).
- On a cache miss, load the index (one index per SST file) from disk into the block cache. Look up the key in the index; on a hit, load the data block into the block cache and return the result.
SST files are stored in the executor's local directories under the path defined by spark.local.dir (default: /tmp) or yarn.nodemanager.local-dirs:
- Amazon EMR on EC2 – ${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/<application_id>/<uuid>/
- Amazon EMR Serverless, Amazon EMR on EKS, AWS Glue – ${spark.local.dir}/<uuid>/
Additionally, by using application logs, you can monitor the MemTable flush and SST file upload status under the following file paths:
- Amazon EMR on EC2 – /var/log/hadoop-yarn/containers/<application_id>/<container_id>/stderr
- Amazon EMR on EKS – /var/log/spark/user/<pod_name>/stderr
The following is an example command to check the SST file status in an executor log from Amazon EMR on EKS:
cat /var/log/spark/user/<pod_name>/stderr
or
kubectl logs <pod_name>
The following screenshot is an example of the output of either command.
You can use the following examples to check whether the MemTable records were deleted and flushed out to SST files:
cat /var/log/spark/user/<pod_name>/stderr
or
kubectl logs <pod_name>
The following screenshot is an example of the output of either command.
Checkpoints: Stored on the executor's local disk or in an S3 bucket
To handle fault tolerance and fail over from the last committed point, RocksDB supports checkpoints. The checkpoint files, including snapshot and delta or changelog data files, are usually stored on the executor's disk or in an S3 bucket.
Starting with Amazon EMR 7.0 and AWS Glue 5.0, the RocksDB state store provides a new feature called changelog checkpointing to improve checkpoint performance. When changelog checkpointing is enabled (it is disabled by default) using the setting spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled, RocksDB writes smaller changelogs to the storage location (the local disk by default) instead of frequently persisting large snapshot data. Note that snapshots are still created, just less frequently, as shown in the following screenshot.
Here's an example of a checkpoint location path when overridden to an S3 bucket: s3://<bucket_name>/<checkpoint_dir>/
Best practices and considerations
This section outlines key strategies for fine-tuning RocksDB performance and avoiding common pitfalls.
1. Memory management for RocksDB
To prevent OOM errors on Spark executors, you can configure RocksDB's memory usage at either the node level or the instance level:
- Node level (recommended): Enforce a global off-heap memory limit per executor. In this context, each executor is treated as a RocksDB node. If an executor processes N partitions of a stateful operator, it will have N RocksDB instances on that single executor.
- Instance level: Fine-tune individual RocksDB instances.
Node-level memory control per executor
Starting with Amazon EMR 7.0 and AWS Glue 5.0 (Spark 3.5), a critical Spark configuration, boundedMemoryUsage, was introduced (through SPARK-43311) to enforce a global memory cap at the single-executor level, shared by multiple RocksDB instances. This prevents RocksDB from consuming unbounded off-heap memory, which can lead to OOM errors or executor termination by resource managers such as YARN or Kubernetes.
The following example shows the node-level configuration:
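A sketch using the bounded-memory settings documented for Spark 3.5's RocksDB state store; the cap and ratios below are illustrative and should be sized together with spark.executor.memoryOverhead:

```
# Share one off-heap memory cap across all RocksDB instances on an executor
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage=true
# Total memory (MB) shared by those instances; illustrative value
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB=1024
# Fraction of the cap reserved for write buffers (MemTables)
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio=0.5
# Fraction reserved for high-priority blocks such as indexes and filters
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio=0.1
```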
Single RocksDB instance-level control
For granular memory management, you can configure individual RocksDB instances using the following settings:
- writeBufferSizeMB (default: 64, suggested: 64–128): Controls the maximum size of a single MemTable in RocksDB, affecting memory usage and write throughput. This setting is available in Spark 3.5 ([SPARK-42819]) and later. It determines the size of the memory buffer before state data is flushed to disk. Larger buffer sizes can improve write performance by reducing SST flush frequency, but they increase the executor's memory usage. Adjusting this parameter is key to balancing memory usage and write throughput.
- maxWriteBufferNumber (default: 2, suggested: 3–4): Sets the total number of active and immutable MemTables.
For read-heavy workloads, prioritize the following block cache tuning over write buffers to reduce disk I/O. You can configure the SST block size and caching as follows (a combined example follows the list):
- blockSizeKB (default: 4, suggested: 64–128): When an active MemTable is full, it becomes a read-only MemTable, and new writes continue to accumulate in a new table. The read-only MemTable is flushed into SST files on disk. The data in SST files is chunked into roughly fixed-size blocks (the default is 4 KB). Each block, in turn, holds multiple data entries. When writing data to SST files, RocksDB can compress or encode the data within a block, which often results in a smaller data size compared with its raw format. For workloads with a small state size (such as less than 10 GB), the default block size is usually sufficient. For a large state (such as more than 50 GB), increasing the block size can improve compression efficiency and sequential read performance, at the cost of higher CPU overhead.
- blockCacheSizeMB (default: 8, suggested: 64–512; large state: more than 1024): When retrieving data from SST files, RocksDB provides a cache layer (the block cache) to improve read performance. It first locates the data block where the target record might reside, then caches the block in memory, and finally searches for that record within the cached block. To avoid frequent reads of the same block, the block cache keeps loaded blocks in memory.
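Pulling the four settings together, here is an illustrative instance-level tuning sketch; the values are examples within the suggested ranges, not tuned recommendations:

```
# Instance-level RocksDB tuning (illustrative values)
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB=128
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber=4
spark.sql.streaming.stateStore.rocksdb.blockSizeKB=64
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=256
```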
2. Clean up state data at checkpoint
To help ensure that your state file sizes and storage costs stay under control when checkpoint performance becomes a concern, use the following Spark configurations to adjust cleanup frequency, retention limits, and checkpoint file types (a combined example follows the list):
- maintenanceInterval (default: 60 seconds): Retaining state for a longer period can reduce maintenance cost and background I/O. However, longer intervals increase file listing time, because state stores often scan every retained file.
- minBatchesToRetain (default: 100, suggested: 10–50): Limits the number of state versions retained at checkpoint locations. Lowering this number results in fewer files being persisted and reduces storage usage.
- changelogCheckpointing (default: false, suggested: true): Traditionally, RocksDB snapshots and uploads incremental SST files at each checkpoint. To avoid this cost, changelog checkpointing, introduced in Amazon EMR 7.0+ and AWS Glue 5.0, writes only the state changes since the last checkpoint.
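An illustrative sketch of the three settings with their full configuration keys; the interval and retention values are examples only:

```
# Run state store maintenance less often (default: 60s); illustrative value
spark.sql.streaming.stateStore.maintenanceInterval=120s
# Retain fewer state versions at the checkpoint location; illustrative value
spark.sql.streaming.minBatchesToRetain=20
# Write changelogs instead of frequent full snapshots (Amazon EMR 7.0+, AWS Glue 5.0)
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled=true
```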
To track an SST file's retention status, you can search for RocksDBFileManager entries in the executor logs. Consider the following logs from Amazon EMR on EKS as an example. The output (shown in the screenshot) indicates that four SST files under version 102 were uploaded to an S3 checkpoint location, and that an old changelog state file with version 97 was cleaned up.
3. Optimize local disk usage
RocksDB consumes local disk space when producing SST files on each Spark executor. While disk usage doesn't scale linearly, RocksDB can accumulate storage over time, depending on the state data size. When running streaming jobs, if the available local disk space becomes insufficient, No space left on device errors can occur.
To optimize disk usage by RocksDB, adjust the following Spark configurations:
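As one hedged sketch (the exact settings may differ from the original list), the following documented options influence how much SST data accumulates on local disk; the values are illustrative:

```
# Compact RocksDB data at each commit to curb SST growth (adds commit latency)
spark.sql.streaming.stateStore.rocksdb.compactOnCommit=true
# Keep fewer state versions so old files are cleaned up sooner
spark.sql.streaming.minBatchesToRetain=20
```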
Infrastructure adjustments can further mitigate the disk issue:
For Amazon EMR:
- Increase local storage, for example by attaching larger EBS volumes to core and task nodes or by choosing instance types with more instance storage.
For AWS Glue:
- Use AWS Glue G.2X or larger worker types to avoid the limited disk capacity of G.1X workers.
- Schedule regular maintenance windows at optimal times to free up disk space, based on workload needs.
Conclusion
In this post, we explored RocksDB as the new state store implementation in Apache Spark Structured Streaming, available on Amazon EMR and AWS Glue. RocksDB offers advantages over the default HDFS-backed in-memory state store, particularly for applications dealing with large-scale stateful operations. RocksDB helps prevent the JVM memory pressure and garbage collection issues common with the default state store.
The implementation is straightforward, requiring minimal configuration changes, though you should pay careful attention to memory and disk space management for optimal performance. While RocksDB isn't guaranteed to reduce job latency, it provides a robust solution for handling large-scale stateful operations in Spark Structured Streaming applications.
We encourage you to evaluate RocksDB for your use cases, particularly if you're experiencing memory pressure issues with the default state store or need to handle large amounts of state data in your streaming applications.
About the authors
Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice to support their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering, and DataOps.
Dai Ozaki is a Cloud Support Engineer on the AWS Big Data Support team. He is passionate about helping customers build data lakes using ETL workloads. In his spare time, he enjoys playing table tennis.
Noritaka Sekiyama is a Principal Big Data Architect with Amazon Web Services (AWS) Analytics services. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.
Amir Shenavandeh is a Sr. Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.
Xi Yang is a Senior Hadoop System Engineer and Amazon EMR subject matter expert at Amazon Web Services. He is passionate about helping customers resolve challenging issues in the Big Data area.