small data — small problems, BIG Data — BIG Problems. Part 1.

Boris Serebrinskiy
Oct 22, 2023

ChatGPT. LLM. OpenAI. There, the obligatory Gen AI buzzwords have been included (otherwise would anyone even look at my blog?), and we can get on with a real data engineering challenge.

Anyhow, I got a call one day: some team had been trying to run an update on a table in their massively parallel processing (MPP) cloud data warehouse, but the process kept getting terminated no matter how much compute they applied to it. They had hit limits on execution time and compute size, and were now reaching out to me, one of the SMEs, for help. Oh yeah, since the platform is pay-per-use, each of their attempts cost our cloud budget another C-note.

As always, I won’t mention vendor names or specific technologies we use, and the actual names of the tables and columns have been modified. The problem is quite generic, although not many people will encounter such sizable datasets. The solution, though, will be useful for anyone with “big data”. The cloud technology we use allows us to scale compute independently and run parallel compute clusters against the same data.

So, I started troubleshooting. The table in question had only a handful of columns and stored information related to usage of a globally distributed object system. It is essentially a log table (let’s call it “BadLog”) with a timestamp, ObjectID, ObjectName, and a few more string and numeric columns. The table is used to identify the last time an Object was accessed. ObjectName had been loaded incorrectly and needed to be updated. The correct data is in another table (let’s call it “CorrectObjectName”) and can be applied to the BadLog table via a join on ObjectID. This table has only 2 columns: ObjectID and ObjectName. In a classic star schema, BadLog would be a fact table and CorrectObjectName would be a dimension. Doesn’t seem too bad… Basically, it looked like this pseudo code:

UPDATE BadLog SET ObjectName = CorrectObjectName.ObjectName

FROM BadLog INNER JOIN CorrectObjectName ON BadLog.ObjectID = CorrectObjectName.ObjectID
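To make the pseudo code concrete, here is a minimal runnable sketch using Python's built-in sqlite3 module on a toy dataset. SQLite is only a stand-in for the unnamed warehouse, the AccessTime column name and all sample rows are made up for illustration, and the update is written as a correlated subquery because UPDATE ... FROM syntax varies between engines.

```python
import sqlite3

# In-memory SQLite database standing in for the (unnamed) cloud warehouse.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE BadLog (AccessTime TEXT, ObjectID INTEGER, ObjectName TEXT);
    CREATE TABLE CorrectObjectName (ObjectID INTEGER PRIMARY KEY, ObjectName TEXT);
    INSERT INTO BadLog VALUES
        ('2023-01-01', 1, 'wrong-a'),
        ('2023-01-02', 2, 'wrong-b'),
        ('2023-01-03', 1, 'wrong-a'),
        ('2023-01-04', 3, 'no-fix-available');
    INSERT INTO CorrectObjectName VALUES (1, 'alpha'), (2, 'beta');
""")

# Inner-join semantics: only log rows whose ObjectID exists in
# CorrectObjectName are touched; ObjectID 3 keeps its old name.
conn.execute("""
    UPDATE BadLog
    SET ObjectName = (SELECT c.ObjectName
                      FROM CorrectObjectName AS c
                      WHERE c.ObjectID = BadLog.ObjectID)
    WHERE ObjectID IN (SELECT ObjectID FROM CorrectObjectName)
""")
conn.commit()

rows = conn.execute(
    "SELECT ObjectID, ObjectName FROM BadLog ORDER BY AccessTime"
).fetchall()
print(rows)
```

On four rows this is instant. The trouble described in the rest of the post only appears when the same statement has to touch more than a trillion rows.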

But a few questions immediately come to mind:

  1. Why is ObjectName denormalized as a column of the BadLog table and not kept separately? Simple answer: for performance reasons. It is quite common in analytical reporting to denormalize columns to avoid joining tables. It is a trade-off: spend more money on storage, but spend less money on compute/joins. Plus strings compress really well, so all in all not a bad choice.
  2. Why was their update query timing out? — This is our policy, we do not allow queries to run more than X minutes, typically 30–60 minutes max. This is to avoid runaway cloud costs.
  3. Why not use a larger cloud compute, don’t clouds offer unlimited compute? — Another policy, we don’t allow exceeding preset compute limits to prevent runaways.

In all my years in the data engineering business, rarely have I seen such a bad data quality situation requiring an update of a… BadLog had over a TRILLION records (>1,300,000,000,000 rows actually!), while CorrectObjectName was “just” 3 billion rows (I write “billion” with a small “b” to contrast with the Trillion). The update query was a real beast: it would scan the trillion row fact table, join to the billion row dimension, and was never going to finish. I actually estimated that on a 64 node compute cluster it would run for 2 weeks, give or take a week :) In real dollars, at approximately 250 dollars an hour for that compute cluster, 24 hours a day, 14 days of running, that is 84,000 USD! Even if I was wrong in my estimate, it is still many thousands of dollars per day. For comparison, on a 32 node cluster, the query progressed less than 0.5% in the first hour. And there were no guarantees that a larger cluster would speed things up linearly, meaning a 64 node cluster may not run 2x faster than a 32 node one; it might do just 30–40% better, at twice the cost.
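The back-of-the-envelope numbers are easy to check in a few lines of Python. This is only a sketch of the arithmetic: the function names are made up, and the only assumption beyond the figures quoted above is that doubling the node count doubles the hourly rate ("at twice the cost").

```python
def run_cost(hourly_rate_usd, hours):
    """Total spend for a cluster billed by the hour."""
    return hourly_rate_usd * hours


# 64-node cluster at roughly 250 USD/hour, running around the clock for 14 days.
two_weeks_hours = 24 * 14
estimate = run_cost(250, two_weeks_hours)
print(estimate)  # 84000

# On the 32-node cluster the query progressed less than 0.5% in the first hour,
# so at that pace the full run needs MORE than this many hours.
min_hours_32 = 100 / 0.5
print(min_hours_32, "hours, i.e. more than", min_hours_32 / 24, "days")


def relative_cost_after_doubling(speedup):
    """Total cost ratio when the hourly rate doubles and the runtime shrinks by `speedup`."""
    return 2 / speedup


# A 30-40% speedup at twice the hourly rate makes the whole job MORE expensive.
for speedup in (1.3, 1.4, 2.0):
    print(speedup, round(relative_cost_after_doubling(speedup), 2))
```

Only a perfectly linear 2x speedup keeps the total bill flat. Anything less means paying more for the same update.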

My instinct for solving such problems is to look for a way to split the process into either multiple steps running sequentially (e.g. breaking multiple joins into a sequence of one join at a time and storing intermediate results in a temporary table) or multiple very similar parts (or chunks) running in parallel. Or a combination of the two methods. Since there weren’t many steps to run sequentially here (the query literally had only one join), parallelization based on chunking was the path forward.

People reading this might ask: you already have a massively parallel data platform, why not go with the largest compute possible and let the MPP engine do its job? Well, this is where Big Data becomes a Big Problem. The issues are the following:

  • Any long running process is likelier to fail than a short running process, e.g. due to a restart of some nodes or even of the client machine that kicked off the query. Such a failure will roll back all updates. Money spent on running cloud compute WILL NOT be refunded. You want to be able to demonstrate incremental progress for completed parts of work and, in case of a job failure, restart from that failure point. One large query cannot do that. And while Spark is capable of restarting individual tasks (this was not a Spark job by the way), even Spark cannot recover from a failure of a cluster or multiple nodes in a cluster (depending on the implementation).
  • You don’t know exactly how much of the update process has completed. This is not like copying files, where each file completion is easy to see; you may be sitting there for hours or days or weeks waiting for an “all done” sign… And then your computer reboots or the cloud machines crash (see the first bullet above).
  • You may be creating a massive block on your target table, literally making the whole system unusable. This can be true even on the most modern analytical systems. This is not a matter of exhausting available compute in the system, it is a logical blocking that many data warehouse systems apply when a table is being updated.
  • Joins use memory and when they run out of memory they spill to disk. Any such spilling will drag query performance down by orders of magnitude. Larger memory machines can eliminate some of the spilling but they cost more money and may not be as efficient as smaller machines.
  • And lastly — gasp! — a smaller query performs better than a larger one.

Let me explain the last one as it seems like I stated something ridiculously obvious. But this is Big Data, and things aren’t as simple as they seem. What follows isn’t exact math but an illustration. Suppose we have 2 tables, A and B, each with a billion rows, joined together on a unique key. As each row is joined, the intermediate results go first into memory, and then, if no memory is left, they are spilled to disk. Since by definition, a single query can only use one compute cluster, we either need to get a very, very, very large cluster, or… the query will likely start spilling and slowing down. Contrast this with a smaller query, e.g. joining two 10 million row tables — a lot less or no spilling and thus much better performance. Spilling is a major problem in Big Data processing, it occurs on joins, sorts, group bys, etc.

But what do you do if you actually need to process a billion row table? You partition your queries! In our example, we would have to run one hundred queries with ten million row tables instead of a single billion row join — and let’s see how this addresses the issues I listed above:

  • Each of the smaller queries will run much faster and deterministically complete. If it fails you just rerun it, without waiting for hours or days for the monster billion row one to finish.
  • You know exactly how far you’ve progressed, as the completion of each smaller query is clearly marked.
  • The blocking is minimized.
  • The spilling is minimized or eliminated, so the queries perform much better and smaller compute can be utilized.
  • And lastly — in cloud computing you gain another massive advantage — you can use independent compute clusters for smaller queries running in parallel. In fact, you can run all 100 queries in parallel if you want to use 100 parallel compute clusters. So, rephrasing the last issue I listed above — a set of smaller queries in aggregate processing the same data space will run faster and more reliably than a single large query.

But breaking large queries into smaller ones introduces quite a bit of complexity and is definitely not recommended unless you have to handle… you guessed it: Big Data. You need to be very careful, write potentially multi-threaded or even multi-process code, handle failures, retries, logging, etc. Some might say: maybe we should have just gotten a much bigger compute cluster. Trust me, that is not going to scale as well as your own partitioning.
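To give a feel for that extra plumbing, here is a minimal sketch of a multi-threaded chunk runner with retries and logging, built only on the Python standard library. Everything in it is hypothetical: `fake_update` stands in for whatever call would submit one chunk's query, and the worker and attempt counts are arbitrary.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("chunk_runner")


def run_with_retries(task, chunk_id, max_attempts=3):
    """Call task(chunk_id) until it succeeds; return the attempt number that worked."""
    for attempt in range(1, max_attempts + 1):
        try:
            task(chunk_id)
            log.info("chunk %d done on attempt %d", chunk_id, attempt)
            return attempt
        except Exception as exc:
            log.warning("chunk %d failed on attempt %d: %s", chunk_id, attempt, exc)
    raise RuntimeError(f"chunk {chunk_id} failed after {max_attempts} attempts")


def run_all(task, chunk_ids, max_workers=4):
    """Run chunks in parallel threads; return ({chunk: attempts}, [failed chunks])."""
    succeeded, failed = {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_with_retries, task, c): c for c in chunk_ids}
        for future in as_completed(futures):
            chunk_id = futures[future]
            try:
                succeeded[chunk_id] = future.result()
            except RuntimeError:
                failed.append(chunk_id)
    return succeeded, sorted(failed)


# Stand-in for the real per-chunk query: chunk 2 fails once, chunk 5 always fails.
_failed_once = set()


def fake_update(chunk_id):
    if chunk_id == 5:
        raise ValueError("simulated permanent failure")
    if chunk_id == 2 and chunk_id not in _failed_once:
        _failed_once.add(chunk_id)
        raise ValueError("simulated transient failure")


succeeded, failed = run_all(fake_update, range(6))
print(succeeded, failed)
```

A transient failure costs one retry of one small chunk, and a permanent failure is reported by chunk ID without affecting the others. A single monolithic update offers neither of these.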

With all that settled, the team started looking for ways to partition the updates. This ended up being quite an interesting project, to be continued in Part 2 of the blog.

Thanks for reading, talk to you soon in Part 2.


Boris Serebrinskiy

I love sharing my skills and experiences. Happy to get on a good hike with my 2 sons, too bad they hate hiking. Wife loves dancing, and I'm not. Happy Family!