Even the most powerful database is still prone to the problem every database system eventually faces: query bottlenecks.
Sometimes the systems that run our queries don't process them the way we would, so getting accurate answers back ends up costing extra processing power.
In this article, I'll present how Redshift query queues work. We'll also cover how concurrency works at a high level on Redshift.
This intel will come in handy as we go further down the rabbit hole and walk through the deployment of a good (though sometimes intimidating) friend: workload management, or WLM, on Redshift.
We'll finish with a quick recap of what we saw and some techniques for query optimization.
Query optimization is very handy when creating objects on Panoply's Cloud Data Platform: it speeds up your transformations and makes it possible to explore more data with less compute. In short, good old data frugality.
What is Redshift, and how does query processing work?
Redshift is Amazon's flavor of a massively parallel processing (or MPP) database system.
This kind of database is a top choice for data warehouses thanks to its ability to process massive amounts of data in parallel. As for the other two major cloud providers, there's BigQuery on Google Cloud Platform and Synapse Analytics on Azure.
And if you're struggling to manage your data lakehouse across multiple clouds, you can handle those tasks more easily with the help of Panoply's Redshift integration.
Now let's look more in-depth at the process of querying.
Query processing
Once a query is started (either from the console or through programmatic access; both API and CLI commands follow the same path), Redshift generates a query plan: the parser's translation of your query into the steps it will run to extract the data from the nodes.
With this execution blueprint in hand, we can start hunting for bottlenecks.
Here's an example of an EXPLAIN command in Redshift:
explain
select lastname, catname, venuename, venuecity, venuestate, eventname, month,
       sum(pricepaid) as buyercost, max(totalprice) as maxtotalprice
from category
join event on category.catid = event.catid
join venue on venue.venueid = event.venueid
join sales on sales.eventid = event.eventid
join listing on sales.listid = listing.listid
join date on sales.dateid = date.dateid
join users on users.userid = sales.buyerid
group by lastname, catname, venuename, venuecity, venuestate, eventname, month
having sum(pricepaid) > 9999
order by catname, buyercost desc;
From the EXPLAIN results above, I want to identify the tables: category, event, venue, sales, listing, date, and users. Each one of them is brought in with an INNER JOIN clause.
Redshift's power lies in heavy parallel processing, so the bigger those tables are, the better for you, computationally speaking.
But how can you know that? Going deeper, how can you identify the smaller tables you'll need for the metrics and measures that are so important to your KPIs?
Here's where the query plan comes in handy: below, you can see all the steps Redshift executes based on the SQL you wrote. In other words, here's where Redshift tells you whether what you wrote is what it understood.
Here is the result of the EXPLAIN command above:
QUERY PLAN
XN Merge (cost=1015345167117.54..1015345167544.46 rows=1000 width=103)
  Merge Key: category.catname, sum(sales.pricepaid)
  -> XN Network (cost=1015345167117.54..1015345167544.46 rows=170771 width=103)
     Send to leader
     -> XN Sort (cost=1015345167117.54..1015345167544.46 rows=170771 width=103)
        Sort Key: category.catname, sum(sales.pricepaid)
        -> XN HashAggregate (cost=15345150568.37..15345152276.08 rows=170771 width=103)
           Filter: (sum(pricepaid) > 9999.00)
           -> XN Hash Join DS_BCAST_INNER (cost=742.08..15345146299.10 rows=170771 width=103)
              Hash Cond: ("outer".catid = "inner".catid)
              -> XN Hash Join DS_BCAST_INNER (cost=741.94..15342942456.61 rows=170771 width=97)
                 Hash Cond: ("outer".dateid = "inner".dateid)
                 -> XN Hash Join DS_BCAST_INNER (cost=737.38..15269938609.81 rows=170766 width=90)
                    Hash Cond: ("outer".buyerid = "inner".userid)
                    -> XN Hash Join DS_BCAST_INNER (cost=112.50..3272334142.59 rows=170771 width=84)
                       Hash Cond: ("outer".venueid = "inner".venueid)
                       -> XN Hash Join DS_BCAST_INNER (cost=109.98..3167290276.71 rows=172456 width=47)
                          Hash Cond: ("outer".eventid = "inner".eventid)
                          -> XN Merge Join DS_DIST_NONE (cost=0.00..6286.47 rows=172456 width=30)
                             Merge Cond: ("outer".listid = "inner".listid)
                             -> XN Seq Scan on listing (cost=0.00..1924.97 rows=192497 width=14)
                             -> XN Seq Scan on sales (cost=0.00..1724.56 rows=172456 width=24)
                          -> XN Hash (cost=87.98..87.98 rows=8798 width=25)
                             -> XN Seq Scan on event (cost=0.00..87.98 rows=8798 width=25)
                       -> XN Hash (cost=2.02..2.02 rows=202 width=41)
                          -> XN Seq Scan on venue (cost=0.00..2.02 rows=202 width=41)
                    -> XN Hash (cost=499.90..499.90 rows=49990 width=14)
                       -> XN Seq Scan on users (cost=0.00..499.90 rows=49990 width=14)
                 -> XN Hash (cost=3.65..3.65 rows=365 width=11)
                    -> XN Seq Scan on date (cost=0.00..3.65 rows=365 width=11)
              -> XN Hash (cost=0.11..0.11 rows=11 width=10)
                 -> XN Seq Scan on category (cost=0.00..0.11 rows=11 width=10)
Now that we know how to create the query plan, we can go deeper into query optimization—which is nothing more than refactoring your queries to lower the processing costs described by the query plan steps.
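To make that concrete, here's a hypothetical sketch of such a refactor: a trimmed-down variant of the query above that keeps only the columns a narrower report needs and filters the scanned sales by date before joining. The caldate column comes from the TICKIT sample schema, and the date range is made up; re-running EXPLAIN on it lets you compare the estimated costs against the original plan:

explain
select lastname, catname, eventname, sum(pricepaid) as buyercost
from sales
join event on sales.eventid = event.eventid
join category on category.catid = event.catid
join users on users.userid = sales.buyerid
join date on sales.dateid = date.dateid
where date.caldate between '2008-01-01' and '2008-01-31'  -- made-up range: scan fewer rows before joining
group by lastname, catname, eventname
having sum(pricepaid) > 9999
order by buyercost desc;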
What is workload management, or WLM?
One of the fastest ways to manage your query workflows is workload management. With this feature on, you won't sacrifice the ability to answer quick questions just because long-running processes are hogging the cluster, since it gives you flexibility in how your workloads are managed.
Let's imagine the following scenario:
- Your lead data scientist is deploying some machine learning models to detect possible fraudulent activities.
- Those activities need to be cross-referenced with the geographical location of the last transactions.
- Then those chaotic, independent microservices start running on your Redshift cluster at the exact time your KPIs trigger new processes on the same cluster.
Fun times, right?
WLM comes to the rescue, as it creates what are called "query queues" at runtime. WLM routes queries into these queues based on a query group label that the user defines before running the query.
Each queue has a concurrency level, meaning the number of queries it can run at the same time.
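Here's a minimal sketch of that routing in practice (the label reporting_queue is made up and has to match a query group defined in your WLM configuration):

set query_group to 'reporting_queue';  -- route this session's next queries to the matching WLM queue
select count(*) from sales;            -- runs in the queue associated with 'reporting_queue'
reset query_group;                     -- fall back to the default routing rules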
Let's get used to WLM
WLM comes in two flavors: automatic and manual. With automatic WLM, Redshift manages queue concurrency and memory for you; with manual WLM, you define the queues, their concurrency levels, and their memory allocation yourself.
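To make the manual flavor concrete, here's a minimal sketch of what the wlm_json_configuration cluster parameter can look like (the queue label, concurrency values, and memory percentages are made up; you can set this parameter from the console or the AWS CLI, and switching to automatic WLM boils down to a single queue with "auto_wlm": true):

[
  {
    "query_group": ["reporting_queue"],
    "query_concurrency": 5,
    "memory_percent_to_use": 40
  },
  {
    "query_concurrency": 5,
    "memory_percent_to_use": 60
  }
]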
Below are the helpful system tables and views you should use as starting points when you need to tune, or simply audit, your WLM workloads (see the example query right after the list).
- STL_WLM_ERROR
- STL_WLM_QUERY
- STV_WLM_CLASSIFICATION_CONFIG
- STV_WLM_QUERY_QUEUE_STATE
- STV_WLM_QUERY_STATE
- STV_WLM_QUERY_TASK_STATE
- STV_WLM_SERVICE_CLASS_CONFIG
- STV_WLM_SERVICE_CLASS_STATE
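For instance, a quick look at how long queries waited and ran in each queue over the last day could look like the sketch below, built on STL_WLM_QUERY (the time window and the service_class filter are assumptions; adjust them to your cluster):

select service_class,
       count(*) as queries,
       round(avg(total_queue_time) / 1000000.0, 2) as avg_queue_secs,
       round(avg(total_exec_time) / 1000000.0, 2) as avg_exec_secs
from stl_wlm_query
where service_class > 5  -- user-defined queues typically sit above the system service classes
  and queue_start_time > dateadd(day, -1, getdate())
group by service_class
order by service_class;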
The do's and don'ts of concurrency scaling
Once you have WLM turned on, you can enable a handy feature for massive processing: concurrency scaling, which adds transient capacity on demand so you keep consistently high throughput even when multiple sessions send requests at once.
When the feature is active, concurrency scaling applies to both read and write operations, supporting SQL DML statements (INSERT, DELETE, UPDATE, and Redshift's COPY) as well as CREATE statements.
The more critical limitations of concurrency scaling are that it doesn't support ANALYZE for COPY commands and that it can't be used for COPY statements reading from Redshift Spectrum or from HDFS storage on your EMR clusters. Since COPY is the recommended way to import massive amounts of data into native Redshift tables, not being able to run it on the scaling clusters can limit how useful the feature is for your use case.
It's also worth confirming that the region where your Redshift cluster resides offers the concurrency scaling feature; for a quick reference, take a look here.
Change the "Concurrency Scaling Mode" option to auto on your WLM queue; by doing so, you'll enable the routing of eligible queries to the concurrency scaling clusters.
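If you manage WLM through the wlm_json_configuration parameter instead of the console, the equivalent (reusing the made-up queue from the sketch above) is adding a concurrency_scaling key to the queue definition:

[
  {
    "query_group": ["reporting_queue"],
    "query_concurrency": 5,
    "memory_percent_to_use": 40,
    "concurrency_scaling": "auto"
  },
  {
    "query_concurrency": 5,
    "memory_percent_to_use": 60
  }
]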
Getting statistics for your WLM
With your WLM enabled and your queues with the "Concurrency Scaling Mode" turned to auto, you can track what's going on in your cluster.
To do so, you can go to your query editor of choice or the Redshift console and execute the following query:
Query for monitoring concurrent queues
SELECT w.service_class AS queue,
       q.concurrency_scaling_status,
       COUNT(*) AS queries,
       SUM(q.aborted) AS aborted,
       SUM(ROUND(total_queue_time::NUMERIC / 1000000, 2)) AS queue_secs,
       SUM(ROUND(total_exec_time::NUMERIC / 1000000, 2)) AS exec_secs
FROM stl_query q
JOIN stl_wlm_query w USING (userid, query)
WHERE q.userid > 1
  AND q.starttime > '2019-01-04 16:38:00'
  AND q.endtime < '2019-01-04 17:40:00'
GROUP BY 1, 2
ORDER BY 1, 2;
The query results give you a full picture of what's happening on your cluster, with all the information you need from a cluster management perspective. It's also helpful when investigating which sessions are active on your cluster and which aren't.
In addition to the metrics collected by AWS CloudWatch and the audit trail from AWS CloudTrail, you'll have a fully compliant environment. All of it uses native AWS services, saving you some extra headaches.
This configuration will work as a consistency layer on top of the well-managed, mature data pipelines you can build with the help of Panoply, tailored to your analytical needs.
Conclusion
In this article, we went over how concurrency works on Redshift. We also looked at how to analyze and improve queries, making them faster while consuming less processing power (you can read that as "making your cloud bill smaller").
Just remember the control tables and views mentioned above, and you'll be all set. Mastering them will also help when you're checking for conflicts among the tables involved in a query.
It's also a good idea to run the ANALYZE and VACUUM commands periodically.
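As a minimal sketch (the sales table comes from the example earlier; run this for whichever tables change the most in your cluster), a periodic maintenance pass could look like this:

analyze sales;      -- refresh the table statistics the query planner relies on
vacuum full sales;  -- reclaim space from deleted rows and resort the table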
For further help transforming your data, you can reach out to us with any questions you have; you might even discover insights you weren't aware of by getting to know your data better.