In 2015, Salesforce Commerce Cloud (which was then called Demandware) was running a typical open source Grafana/Graphite/Carbon stack to store and visualize time series metrics of the Java application clusters powering our e-commerce business. Our JVM clusters at the time produced around 500k time series metrics per minute.
Even though our organization needed us to support more metrics for increased observability, we had already maxed out I/O bandwidth on the i2.8xlarge AWS node with substantial provisioned IOPS SSDs. This node type was running carbon-cache, the storage engine of the open source Graphite stack.
i2.8xlarge instances came with instance-local SSDs that were a lot faster compared to EBS volumes, but the instance was still struggling. As time series data became available to our engineering teams, they instrumented their code to get deeper insights across the platform. Our users built engaging dashboards and adoption was great, but this meant our metrics volume and usage grew as well. This maxed out read bandwidth, while the influx of data points from our application servers saturated available write bandwidth. As a consequence, the user experience was bad — queries took a long time (if they even completed), there were constant outages, and we did lots of fire-fighting.
Carbon-cache also supports aggregating metrics from a high resolution to a lower resolution to make storing historic data more efficient. However, the aggregation process has to read all data points in order to aggregate them, which is hard to do when read bandwidth is already fully consumed by online queries. Even worse, it was impossible to back up the volume, since that would effectively have brought it down. On top of that, the i2 instance-local SSDs are ephemeral, so if the instance was stopped, all data was lost. And we were expecting metrics to grow by more than 40% year over year.
In short: We had to act.
When we analyzed how carbon-cache storage works, it became evident why it couldn’t keep up with our demands. Carbon-cache stores time series data in so-called ‘whisper’ files. They are organized as files in the file system: the namespace of the metric maps to the directory structure, and each file holds the data points received for one metric. When a multitude of metrics is received, many files are created in a directory; in some cases, too many for the filesystem to even list them efficiently, let alone aggregate them.
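To make the mapping from namespace to directory structure concrete, here is a minimal sketch. The `whisper_path` helper is hypothetical, and the storage root is an assumption (a common default for Graphite installs); only the dotted-name-to-path idea and the `.wsp` file extension reflect how whisper lays out its files.

```python
from pathlib import PurePosixPath

# Assumed storage root; real deployments configure their own location.
DEFAULT_ROOT = "/opt/graphite/storage/whisper"


def whisper_path(metric: str, root: str = DEFAULT_ROOT) -> PurePosixPath:
    """Map a dotted metric name to the whisper file that would hold it."""
    parts = metric.split(".")
    if not all(parts):
        raise ValueError(f"empty path segment in metric name: {metric!r}")
    # Every segment but the last becomes a directory; the last becomes <name>.wsp.
    return PurePosixPath(root, *parts[:-1], parts[-1] + ".wsp")


print(whisper_path("datacenter0.cluster1.rack2.server3.cpu.percentage"))
```

Every distinct metric name therefore costs one file, and every server, rack, or cluster adds directories, which is how a large namespace turns into a very large directory tree.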
We also needed to replace carbon-relay, which suffered from a lack of efficient multithreading. It was already CPU-bound with inbound metrics in the range of 500k per minute, so with the anticipated growth in storage throughput, we needed to act on the relay as well.
In summary, these were problems we were attempting to solve:
- Addressing maxed out I/O bandwidth
- Increasing the stability of our solution
- Performing read query optimization
- Supporting backup for large data sets
- Replacing carbon-relay with a more scalable implementation
The whisper files being the culprit wasn’t really new. In order to scale carbon-cache itself, the query engine of the Graphite stack, graphite-web, has support for querying clustered carbon-cache instances. This effectively supports a sharding scheme where you can divert metric data points to different shards based on keys; graphite-web will simply query all carbon-cache instances in the cluster and assemble the data back into a coherent query result.
Still, that would only have allowed us to throw more shards at the problem while not solving the actual problem at hand — the whisper files being ineffective for our growing volume. Our idea was to build a storage backend for graphite-web that would run on cheap hardware, not requiring expensive local SSDs and huge instances to begin with.
Our goal was to build a drop-in replacement for carbon-cache, a storage engine implementing the Line and Pickle protocols that are used to store data points as well as the query protocol used by graphite-web.
Whisper files store each metric in a flat file that needs to be updated each time a new data point arrives. It is not predictable which data point will be received next, so when storing them you cannot assume or rely on any data locality: one data point might be for datacenter1.cpu and the next one for datacenter124.memory. Because the location on disk follows the organization of the whisper directories and files, consecutive writes land in very different places. That makes the workload seek-intensive on magnetic disks and thus only sustainable on SSDs, which at the time were a premium feature.
So, a major design goal was to replace the random I/O with sequential I/O by grouping writes together effectively, so that our carbon-cache replacement could potentially run on conventional cheap magnetic disks.
Let’s take a step back and look at the data we are trying to store and how we planned to access it. Carbon metrics time series data is effectively a byte string with a name (called the namespace), a value, and a Unix epoch timestamp, like this:
datacenter0.cluster1.rack2.server3.cpu.percentage 23 1632922174
The namespace defines where each metric is stored and how metrics are related and organized. The above string is also an example of the Carbon line protocol: each metric data point is sent to the carbon-cache storage node as one line like the one above.
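As an illustration, a line in this format can be parsed with a few lines of code. This is a minimal sketch, not CarbonJ's parser; the `DataPoint` type and `parse_line` function are names made up for the example.

```python
from typing import NamedTuple


class DataPoint(NamedTuple):
    name: str        # dotted namespace, e.g. "datacenter0.cluster1.cpu"
    value: float
    timestamp: int   # Unix epoch seconds


def parse_line(line: str) -> DataPoint:
    """Parse one '<name> <value> <timestamp>' line."""
    fields = line.split()
    if len(fields) != 3:
        raise ValueError(f"expected 3 fields, got {len(fields)}: {line!r}")
    name, value, timestamp = fields
    # Go through float() first so a timestamp like "1632922174.0" is accepted too.
    return DataPoint(name, float(value), int(float(timestamp)))


point = parse_line("datacenter0.cluster1.rack2.server3.cpu.percentage 23 1632922174")
print(point.name, point.value, point.timestamp)
```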
Metric data points are immutable; they cannot be updated. There might be a new datapoint for a different time instant and, in rare cases, there could be a duplicate data point received. (However, the outcome of that is undefined — one of the data points will ‘win.’)
Metrics data is often aggregated to lower the storage that is required by applying an aggregation function across a predefined interval. For this, all data points in that interval need to be read.
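The aggregation step can be sketched as bucketing data points by interval and applying a function to each bucket. This is an illustrative sketch only: the `downsample` helper is hypothetical, and using the mean is an assumption (sum, min, max, or last are equally plausible aggregation functions).

```python
from collections import defaultdict
from statistics import mean


def downsample(points, interval, aggregate=mean):
    """Aggregate (timestamp, value) pairs into buckets of `interval` seconds.

    Returns a list of (bucket_start, aggregated_value), sorted by time.
    """
    buckets = defaultdict(list)
    for timestamp, value in points:
        # Align each point to the start of the interval it falls into.
        buckets[timestamp - timestamp % interval].append(value)
    return [(start, aggregate(values)) for start, values in sorted(buckets.items())]


one_minute = [(0, 1.0), (60, 3.0), (300, 5.0)]
print(downsample(one_minute, 300))
```

Note that every data point in the interval is read once, which is exactly why aggregation competes with user queries for read bandwidth.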
Users can query for any data points known to identify current ongoing issues or to look at historic trends. However, the most recent data points (produced in the last hour) make up the vast majority of data points users are typically interested in.
Lookups of these data points are often done using wildcards, so, for a storage engine, it must be possible to efficiently search for data points using wildcards.
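A simple way to picture wildcard lookups is to match a pattern against metric names one dot-separated segment at a time. The sketch below uses Python's `fnmatch` for the per-segment match; the `matches` and `find` helpers are hypothetical, the linear scan stands in for whatever index a real storage engine would use, and Graphite's brace alternatives such as `{a,b}` are not handled.

```python
from fnmatch import fnmatchcase


def matches(pattern: str, name: str) -> bool:
    """True if every dot-separated segment of name matches the pattern segment."""
    pattern_parts = pattern.split(".")
    name_parts = name.split(".")
    # A wildcard never crosses a dot, so the segment counts must agree.
    return len(pattern_parts) == len(name_parts) and all(
        fnmatchcase(segment, glob) for glob, segment in zip(pattern_parts, name_parts)
    )


def find(pattern: str, names) -> list:
    """Linear scan over all known names; fine for a sketch, too slow at scale."""
    return sorted(name for name in names if matches(pattern, name))


known = ["datacenter1.cpu", "datacenter124.cpu", "datacenter124.memory"]
print(find("datacenter*.cpu", known))
```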
Given the high amount of random writes for immutable data, we decided to build a time series metrics store based on an LSM database. Log-Structured-Merge-Tree databases hold a comparatively large working set of data in memory that gets flushed to disk periodically. After a few minutes, all data held in the working set would be flushed to disk in a large write, effectively serializing the randomly ordered writes that came in. Our use case was also reading many very recent data points. Ideally, these would still be in the working set of the LSM database when queried.
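The core idea can be shown with a toy model: writes go into an in-memory table in whatever order they arrive, and a flush turns them into one sorted, sequential run. This is only a sketch of the LSM principle; the `TinyLsm` class is invented for illustration, the "runs" are in-memory lists standing in for files on disk, and real LSM engines add write-ahead logs, compaction, and much more.

```python
from bisect import bisect_left


class TinyLsm:
    def __init__(self, flush_threshold: int = 4):
        self.memtable = {}   # (name, timestamp) -> value, in arrival order
        self.runs = []       # flushed runs, oldest first: (sorted_keys, values)
        self.flush_threshold = flush_threshold

    def put(self, name: str, timestamp: int, value: float) -> None:
        self.memtable[(name, timestamp)] = value
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self) -> None:
        """Write the working set out as one sorted run (one large sequential write)."""
        if self.memtable:
            items = sorted(self.memtable.items())
            self.runs.append(([k for k, _ in items], [v for _, v in items]))
            self.memtable = {}

    def get(self, name: str, timestamp: int):
        key = (name, timestamp)
        if key in self.memtable:           # recent data is served from memory
            return self.memtable[key]
        for keys, values in reversed(self.runs):   # newest run first
            i = bisect_left(keys, key)     # runs are sorted, so binary search works
            if i < len(keys) and keys[i] == key:
                return values[i]
        return None


db = TinyLsm(flush_threshold=2)
db.put("datacenter124.memory", 60, 2.0)
db.put("datacenter1.cpu", 60, 1.0)   # second write triggers a flush
print(db.runs[0][0])                 # keys come out sorted, not in arrival order
```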
So what should we call this? Since naming is one of the two hard problems in Computer Science, we took the easy path. We would be rewriting Carbon and, since our area of expertise is in Java, we would use that language for the project.
So CarbonJ it is. Hardest problem solved.
Why a custom implementation? Why not use some off-the-shelf solution?
Other open source projects had tried to tackle this problem. For example, there is a plug-in for carbon-cache that could utilize Cassandra as a backend storage (Cassandra Storage plugin for carbon/graphite-web). Cassandra is a great database that could have solved the problem; it uses an LSM-based storage engine that will effectively serialize random writes. However, to utilize that, we would have needed to run a Cassandra cluster or at least a single node. Our team was small and didn’t have DBA expertise, so that prospect didn’t seem very compelling at the time. Another issue with Cassandra was that we didn’t know how costly it would have been to make the optimizations we needed within the limitations of Cassandra itself. Also, the plug-in project seemed abandoned and was written in Python. While Python has its advantages, it lacked the support for multithreading and high-scale tasks that we needed, so we decided to build on the team’s domain knowledge, which is large-scale Java applications.
Another option available at the time was go-carbon, which was an interesting project but not yet ready for prime time, having had only a couple of commits back then.
Let’s not forget that, while there is always a cost for implementing something yourself, there is also a cost for using an off-the-shelf solution. You need to understand what that solution does and how it does it so that you can gauge whether or not it would actually solve your problem. And even then you need to learn and run the new solution.
And neither of the off-the-shelf solutions we found would have solved our secondary problem of the maxed-out carbon-relays.
RocksDB and Sharding
Implementing our own LSM store that supported the Carbon protocols would have enabled us to fine-tune all aspects of the ingestion pipeline (and, to some degree, how our LSM module would store it).
But we didn’t want to implement our own LSM storage engine, just use one. Given that the team consisted mostly of Java folks, we decided to use RocksDB. RocksDB is an embeddable database that provides JNI bindings, so it can run in-process inside our JVM, allowing us to implement a very simple scheme for a sharded LSM database. There is no replication between RocksDB processes, so nodes share nothing. However, the carbon-relay functionality provides an excellent way of multiplexing the incoming metrics data points. This way, we could set up a pipeline feeding two replicas of a shard that effectively store the same data.
We used the metric names or namespaces as sharding keys such that it is very easy to separate data out by the top-level names they bear. Our e-commerce application runs in many different data centers, so often the DC would be the top-level namespace for a metric. One CarbonJ shard is able to hold the data and serve the data of one such data center cluster, thus providing a straightforward way for sharding.
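The routing decision this implies is small enough to sketch. The shard names and the `route` helper below are hypothetical; the only thing taken from the description above is that the top-level segment of the namespace (often the data center) serves as the sharding key.

```python
# Hypothetical mapping from top-level namespace to shard name.
SHARD_MAP = {
    "datacenter0": "carbonj-shard-0",
    "datacenter1": "carbonj-shard-1",
}


def shard_key(metric: str) -> str:
    """The top-level namespace is everything before the first dot."""
    return metric.split(".", 1)[0]


def route(metric: str, shard_map=SHARD_MAP) -> str:
    key = shard_key(metric)
    try:
        return shard_map[key]
    except KeyError:
        raise LookupError(f"no shard configured for namespace {key!r}") from None


print(route("datacenter0.cluster1.rack2.server3.cpu.percentage"))
```

Because the key is a plain prefix of the metric name, moving a namespace to another shard only requires changing the mapping, not rehashing every metric.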
The ability of graphite-web to assemble the data points back together from across different shards also allows us to move around data from shards that run out of disk or have become slow. It provides a simple way to scale out.
We didn’t stop at what carbon-cache and carbon-relay provided. We added many features that helped us do what the team does best: write code, not troubleshoot stuff at night (although we have gotten pretty good at that too).
For example, we wanted to bridge the gap in metrics ingestion when a node needed to be restarted. Since nodes share nothing with other nodes, all data needs to be routed to all nodes at all times in order to avoid gaps. However, when we restarted instances (when we deployed a new version of CarbonJ, for example), there would always be a gap. Using two replicas helped a bit: graphite-web will query all instances it knows and merge the result data. So if one node had data and the other one did not, the query would still show data from the node that had data. But ultimately this would mess up aggregation, at least a little bit, for that replica down the road.
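The effect of merging two replicas can be illustrated with a small sketch. This is not graphite-web's actual merge code; `merge_series` is a hypothetical helper that simply prefers the first non-missing value seen for each timestamp, with `None` standing for a gap.

```python
def merge_series(*replicas):
    """Merge (timestamp, value) series from several replicas.

    For each timestamp, keep the first value that is not None.
    """
    merged = {}
    for series in replicas:
        for timestamp, value in series:
            # Covers both "timestamp not seen yet" and "seen, but only as a gap".
            if merged.get(timestamp) is None:
                merged[timestamp] = value
    return sorted(merged.items())


replica_a = [(60, 1.0), (120, None), (180, 3.0)]   # restarted around t=120
replica_b = [(60, 1.0), (120, 2.0), (180, None)]   # restarted around t=180
print(merge_series(replica_a, replica_b))
```

The merged view is complete, but each replica still has its own hole on disk, which is why later aggregation on that replica is slightly off.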
The ingestion path was built for throughput using User Datagram Protocol (UDP), so there could be packet drops anywhere in the chain that would also lead to gaps. Our solution was to utilize Kinesis streams at the source environments that can be consumed by the CarbonJ storage nodes in our cluster, effectively cutting out relay nodes entirely. Since Kinesis data is durable and we can replay it forward to catch up, we were able to reduce the gap to effectively zero if CarbonJ was back online within a set interval from the initial gap (typically 24 hours). We also implemented many more features that help us operate CarbonJ at scale.
Stats, stats, stats!
As of fall 2021, Salesforce Commerce Cloud metrics are stored in a total of 20 shards with two replicas each. Each node has a 3TB gp2 EBS volume attached. These shards store two years’ worth of metrics data with a one-minute resolution for 24 hours, five minutes for seven days, and 30 minutes for up to two years.
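To get a feel for what this retention scheme means per metric, the tiers can be written down as data and the slot counts computed. This is a back-of-the-envelope sketch under the assumption that each tier covers its full window counted from now, as in whisper-style retention definitions; how CarbonJ lays this out internally may differ, and the helper names are made up.

```python
DAY = 24 * 3600

# (resolution in seconds, how far back this tier reaches in seconds)
RETENTION = [
    (60, 1 * DAY),          # one minute for 24 hours
    (300, 7 * DAY),         # five minutes for seven days
    (1800, 2 * 365 * DAY),  # 30 minutes for two years
]


def resolution_for_age(age_seconds: int):
    """Finest resolution still available for data of the given age, or None."""
    for step, max_age in RETENTION:
        if age_seconds <= max_age:
            return step
    return None


def slots_per_metric():
    """Number of data points each tier holds for a single metric."""
    return [max_age // step for step, max_age in RETENTION]


print(slots_per_metric())            # [1440, 2016, 35040]
print(resolution_for_age(3 * DAY))   # 300
```

Under these assumptions a single metric needs roughly 38,500 slots for two years of history, instead of more than a million at one-minute resolution throughout.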
Stats across all 20 shards
| Metric | Per Day | Per Minute |
|---|---|---|
| Data Points Received | 104B | 72M |
| Data Points Read | 225B | 156M |
| Queries Served | 14M | 50k |
| Data Points Saved | 133B* | 92M* |
| Query Time (p95) | 15ms | |
| Unique Active Metric Datapoints Stored | ~250M on 9/28/21 | |
*(includes aggregate writes, i.e. number of data points > data points received)
The instances run on r5.4xlarge nodes. These nodes provide high CPU and lots of memory (128GB). We give the JVM about 20GB of that memory, and the rest is used by Linux for caching disk pages, giving us a pretty large working set of data that is available with memory latency, i.e. without having to read from disk. This is important for our alerting use case, which requires lots of data points from the last ~15 minutes to be read. In fact, alerting makes up 95%+ of our queries. Before that, we ran on c5.4xlarge instances for years. We recently ran into query latency issues and decided to throw more hardware at the problem and scale vertically. While adding more memory does not help ingest more metrics per minute (since that process is I/O-bound), it does improve query performance for recently ingested metrics (which is, after all, our top use case).
Each node can ingest around 7 million data points per minute before we split the shard. The breaking point is somewhere near 8-9 million data points per minute with 3TB gp2 volumes and their throughput maximum of 250MiB/s. Since we sometimes hit the throughput maximum, we are currently testing gp3 volumes with double the throughput.
What did we achieve and how?
- Rewriting a piece of open source software can be done by a small team. We were able to develop an understanding of the open source project because the source code is available.
- Kubernetes is a highly mature infrastructure orchestration solution that enables small teams to own the whole DevOps flow end-to-end without too many late-night page-outs.
- Sharding done right allows for truly horizontal scale.
- There is a sweet spot for relying on managed services vs. building and running your own — and it very much depends on the team size.
If you are also looking to replace your carbon-cache based metrics stack with something else that is compatible, look no further. You can access this project on GitHub under the BSD 3-Clause license, and it includes all Java source code, Kubernetes YAML files, and Helm charts for deploying into your Kubernetes cluster.
CarbonJ is in active development, and one thing we would like to add is support for tags. That would allow you to store Prometheus-style metrics that use tags with CarbonJ.
Our team is currently working on a management tool for creating new shards, deploying configuration for CarbonJ in a substrate-agnostic way, as well as rebalancing shards automatically. It is called uConsole and will also be published as open source at a later point.
We are always looking for contributors so be sure to check out the project for CarbonJ!