{"id":2015,"date":"2016-01-11T23:39:04","date_gmt":"2016-01-11T23:39:04","guid":{"rendered":"http:\/\/www.doanduyhai.com\/blog\/?p=2015"},"modified":"2017-08-10T16:45:31","modified_gmt":"2017-08-10T16:45:31","slug":"cassandra-user-defined-aggregates-in-action-best-practices-and-caveats","status":"publish","type":"post","link":"https:\/\/www.doanduyhai.com\/blog\/?p=2015","title":{"rendered":"Cassandra User Defined Aggregates in action: best practices and caveats"},"content":{"rendered":"<p>In an <strong><a href=\"https:\/\/www.doanduyhai.com\/blog\/?p=1876\" title=\"Cassandra UDF &#038; UDA\" target=\"_blank\">earlier post<\/a><\/strong>, I presented the new UDF &#038; UDA features introduced by Cassandra 2.2. In this blog post, we&#8217;ll play with UDA, see how it can be leveraged for analytics use-cases, and cover the caveats to avoid.<\/p>\n<p> Recently, there was a <strong><a href=\"https:\/\/www.mail-archive.com\/user@cassandra.apache.org\/msg45202.html\" title=\"Cassandra 3.1 - Aggregation query failure\" target=\"_blank\">discussion<\/a><\/strong> on the Cassandra mailing list about a user hitting timeouts with UDA. After some exchanges with the devs, I decided to run the following test scenarios to confirm an intuition of mine.<\/p>\n<blockquote><p>For the remainder of this post Cassandra == Apache Cassandra\u2122<\/p><\/blockquote>\n<h1>Scope of the tests<\/h1>\n<p> We need to clearly define the scope of our tests. 
The idea is not to test every possible use-case but only some common use-cases, to confirm or refute some initial assumptions.<\/p>\n<p> The hardware setup:<\/p>\n<ul>\n<li><strong>Cassandra 3.1.1<\/strong>: 1 MacBookPro 15&#8243; 16Gb RAM SSD<\/li>\n<li>Test client: 1 MacBookPro 13&#8243; 16Gb RAM SSD, <strong>Java Driver 3.0.0-rc1<\/strong><\/li>\n<li>Both machines connected through a 1Gb network router<\/li>\n<\/ul>\n<p> The data set:<\/p>\n<ul>\n<li>Test table\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE TABLE IF NOT EXISTS test_uda.sensor(\r\n    sensor_id int,\r\n    time bigint,\r\n    value double,\r\n    PRIMARY KEY(sensor_id, time)\r\n);\r\n<\/pre>\n<p> The table structure is very simple, on purpose, so that we can equate 1 CQL row to 1 physical column on disk\n <\/li>\n<li>Insert 10 x 10<sup>6<\/sup> data points in a <strong>single partition<\/strong> (timeseries use-case)<\/li>\n<\/ul>\n<p> The test protocol:<\/p>\n<ol>\n<li>Insert 10 x 10<sup>6<\/sup> data points into the table. <strong>sensor_id<\/strong>=10 (fixed value) and <strong>time<\/strong> varies from 1 to 10 x 10<sup>6<\/sup>. The double <strong>value<\/strong> column is randomized between 0 and 100<\/li>\n<li><strong>Nodetool flush<\/strong> to force a flush to disk of all data in the memtable<\/li>\n<li>Start querying the data using the standard <strong>avg()<\/strong> aggregate on the partition, with different query parameters and time ranges. 
Consistency level is set to <strong>ONE<\/strong><\/li>\n<\/ol>\n<p> The current tests only focus on the <strong>single partition aggregation query<\/strong> scenario, but we&#8217;ll also discuss multi-partition queries.<\/p>\n<p> The source code of the test project can be found on GitHub <strong><a href=\"https:\/\/github.com\/doanduyhai\/cassandra-UDA\" title=\"Cassandra UDA Tests\" target=\"_blank\">here<\/a><\/strong>.<\/p>\n<h1>Test results<\/h1>\n<h3>A First raw results<\/h3>\n<p>First, I used the command line <strong>cqlsh<\/strong> tool <strong>on the Cassandra server itself<\/strong> (so local access, no network) to perform some initial checks; below are the raw results:<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\ncqlsh:test_uda&gt; select avg(value) from sensor WHERE sensor_id=10;\r\nOperationTimedOut: errors={}, last_host=192.168.1.17\r\n\r\ncqlsh:test_uda&gt; select avg(value) from sensor WHERE sensor_id=10 AND time&gt;=1 AND time&lt;=1000000;\r\n\r\n system.avg(value)\r\n-------------------\r\n          49.95645\r\n\r\n(1 rows)\r\n\r\ncqlsh:test_uda&gt; select avg(value) from sensor WHERE sensor_id=10 AND time&gt;=1 AND time&lt;=2000000;\r\n\r\n system.avg(value)\r\n-------------------\r\n          49.99266\r\n\r\n(1 rows)\r\n\r\ncqlsh:test_uda&gt; select avg(value) from sensor WHERE sensor_id=10 AND time&gt;=1 AND time&lt;=3000000;\r\nOperationTimedOut: errors={}, last_host=192.168.1.17\r\n<\/pre>\n<p> If we want to aggregate on the whole wide partition, the query times out. By reducing the time range to the interval <strong>[1, 10<sup>6<\/sup>]<\/strong>, it works. But then the query fails when we reach <strong>3 x 10<sup>6<\/sup><\/strong> columns.<\/p>\n<p>Why did the query fail? 
That&#8217;s an interesting question&#8230;<\/p>\n<h3>B Internal query handling<\/h3>\n<p> My first guess was that the server had to load the entire <strong>3 x 10<sup>6<\/sup><\/strong> columns into the JVM heap and it took time, so the query timed out (<em>read_request_timeout_in_ms = 5000<\/em> by default in <strong>cassandra.yaml<\/strong>).<\/p>\n<p> But <strong>Tyler Hobbs<\/strong> pointed out on the mailing list that Cassandra is clever enough to execute the read <strong>by page<\/strong> and does not load the entire <strong>3 x 10<sup>6<\/sup><\/strong> data set in memory at once. Furthermore, the <em>read_request_timeout_in_ms<\/em> parameter applies <strong>to each page<\/strong>, not to the whole aggregation request, so the query should have finished successfully but it didn&#8217;t.<\/p>\n<p> My intuition was that it failed because of client-side timeout settings. So I decided to extend the <em>ReadTimeoutMillis<\/em> property of the Java driver. The default value is <strong>12 000 (12 secs)<\/strong> and I changed it to <strong>200 000 (200 secs)<\/strong>, a ridiculously huge value, to exclude any possible client-side timeout issue.<\/p>\n<p> With this special timeout, the <strong>SELECT avg(value) FROM sensor WHERE sensor_id=10<\/strong> query returned a result after \u2248 13 secs (it would have timed out with the default setting).<\/p>\n<p> <strong>The conclusion: on the client side, timeout settings should be tuned specifically for aggregation queries.<\/strong><\/p>\n<p> The default 12 secs timeout value, although perfectly reasonable for real-time OLTP queries, is too restrictive for aggregation queries that require fetching a great amount of data server-side. 
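As a sketch, this is how the extended timeout can be configured with the 3.x Java driver; the contact point and class name are illustrative, and note that the timeout applies globally to the <strong>Cluster<\/strong>, not per query:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.SocketOptions;

public class AggregationClusterConfig {
    public static Cluster build() {
        // The default read timeout is 12 000 ms, too short for large
        // aggregation queries; raise it to 200 000 ms as in the test above.
        SocketOptions socketOptions = new SocketOptions()
                .setReadTimeoutMillis(200_000);
        return Cluster.builder()
                .addContactPoint("127.0.0.1") // illustrative contact point
                .withSocketOptions(socketOptions)
                .build();
    }
}
```

The obvious downside of this global setting is that every request issued through the cluster inherits the huge timeout, which is exactly why a per-query timeout would be preferable.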
13 seconds to aggregate <strong>10 x 10<sup>6<\/sup><\/strong> columns is not surprising and indeed quite expected (read throughput \u2248 800 000 columns\/sec).<\/p>\n<p> I&#8217;ve created the JIRA <strong><a href=\"https:\/\/datastax-oss.atlassian.net\/browse\/JAVA-1033\" title=\"Allow setting socket timeout on the query level for UDA queries\" target=\"_blank\">JAVA-1033<\/a><\/strong> on the Java driver side to allow <strong>setting the timeout per query<\/strong> and not globally.<\/p>\n<h3>C Parameters impacting aggregation performance<\/h3>\n<p> In addition to the timeout setting, the performance of an aggregation query also depends on:<\/p>\n<ul>\n<li><strong>Consistency Level<\/strong>: using consistency level > <strong>ONE<\/strong> requires the coordinator to ask for the data from different replicas, and this <strong>for each page<\/strong>. It means that <em>the global query performance is bound to the slowest replica read performance<\/em><\/li>\n<li><strong>Paging size<\/strong>: setting an appropriate paging size can shave a few precious seconds off the overall query duration. See chapter D below for more details<\/li>\n<li><strong>Read Repair<\/strong>: when using consistency level > <strong>ONE<\/strong>, if the data are not synced between the replicas, Cassandra will trigger read-repair <strong>for each page of data<\/strong>. This read-repair cost will add up to the global query time and you need to take this factor into account<\/li>\n<li><strong>Partition(s) count<\/strong>: single partition aggregations generally give the best performance, for obvious reasons, but it can still make sense to perform an aggregation on multiple partitions or on the whole table. After all, UDA has been designed with this scenario in mind. In this case, you should not be surprised that your aggregation takes a while to complete and you should extend the client timeout accordingly. 
<strong>Please note that read-repair does not trigger for multi-partition queries<\/strong><\/li>\n<li><strong>Amount of fetched data<\/strong>: this parameter is pretty obvious but is worth mentioning. <strong>Aggregations in Cassandra are not distributed<\/strong>: all data are fetched on the <strong>coordinator<\/strong> before applying any aggregation function. This is necessary because of the eventual consistency architecture and the last-write-wins reconciliation model. If you can compute your aggregation on chunks of data (in other words, if your aggregation algorithm is commutative and associative), you can perform the aggregation on pages of data and apply the algorithm on intermediate results<\/li>\n<\/ul>\n<h3>D Which page size to choose?<\/h3>\n<blockquote><p>SELECT avg(value) FROM test_uda.sensor WHERE sensor_id=10<\/p><\/blockquote>\n<p> I&#8217;ve run some test scenarios with different page sizes. When tracing is enabled, Cassandra will give you the time spent for each aggregation:<\/p>\n<table>\n<thead>\n<tr>\n<th>Page size<\/th>\n<th>Query time (\u00b5s)<\/th>\n<\/tr>\n<\/thead>\n<tbody>\n<tr>\n<td>100<\/td>\n<td>41 589 557 \u00b5s<\/td>\n<\/tr>\n<tr>\n<td>500<\/td>\n<td>17 890 618 \u00b5s<\/td>\n<\/tr>\n<tr>\n<td>1000<\/td>\n<td>15 024 828 \u00b5s<\/td>\n<\/tr>\n<tr>\n<td>10000<\/td>\n<td>13 476 753 \u00b5s<\/td>\n<\/tr>\n<tr>\n<td>50000<\/td>\n<td>11 200 027 \u00b5s<\/td>\n<\/tr>\n<tr>\n<td>80000<\/td>\n<td>11 008 073 \u00b5s<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<p> A graph illustrates it better:<\/p>\n<div id=\"attachment_2028\" style=\"width: 1665px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize.png\"><img aria-describedby=\"caption-attachment-2028\" loading=\"lazy\" src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize.png\" alt=\"Query Time vs Page Size\" width=\"1655\" height=\"1019\" class=\"size-full wp-image-2028\" 
srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize.png 1655w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize-300x185.png 300w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize-1024x630.png 1024w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsPageSize-1170x720.png 1170w\" sizes=\"(max-width: 1655px) 100vw, 1655px\" \/><\/a><p id=\"caption-attachment-2028\" class=\"wp-caption-text\">Query Time vs Page Size<\/p><\/div>\n<p> The query time decreases logarithmically with the page size. The optimal page size in our example is 50 000.<\/p>\n<p> I also graphed the impact of the number of retrieved columns on the query time, using a page size of 50 000:<\/p>\n<blockquote><p>SELECT avg(value) FROM test_uda.sensor WHERE sensor_id=10 AND time>=? AND time<=?<\/p><\/blockquote>\n<div id=\"attachment_2031\" style=\"width: 1710px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount.png\"><img aria-describedby=\"caption-attachment-2031\" loading=\"lazy\" src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount.png\" alt=\"Query Time vs Columns Count\" width=\"1700\" height=\"1063\" class=\"size-full wp-image-2031\" srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount.png 1700w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount-300x188.png 300w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount-1024x640.png 1024w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/QueryTimevsColumnsCount-1170x732.png 1170w\" sizes=\"(max-width: 1700px) 100vw, 1700px\" \/><\/a><p id=\"caption-attachment-2031\" class=\"wp-caption-text\">Query Time vs Columns Count<\/p><\/div>\n<p> As 
expected, the query time grows linearly with the number of fetched columns.<\/p>\n<p> One interesting observation: the query <strong>SELECT avg(value) FROM test_uda.sensor;<\/strong>, whose result is strictly identical to <strong>SELECT avg(value) FROM test_uda.sensor WHERE sensor_id=10;<\/strong> because the table has only 1 partition, is consistently about 1 second slower. This extra cost may be related to the range scan over all partitions.<\/p>\n<h3>E Some warnings<\/h3>\n<blockquote><p> The above tests are very simple and are not meant to cover all possible use-cases. They have been designed to confirm my initial intuition about the need to extend the timeout value on the driver side.\n<\/p><\/blockquote>\n<p> The aggregation function in use, <strong>avg()<\/strong>, is very simple. In this blog post I did not discuss the impact of the aggregation function itself on the performance and stability of the cluster.<\/p>\n<p> Indeed, imagine you design your own aggregate function in which you accumulate lots of data into the state object of the aggregate. This will have a significant, if not terrible, impact on the coordinator node. <strong>Keeping a great amount of data in the Java heap may lead to early promotion of those objects into the old generation and may trigger the dreadful stop-the-world full GC cycles&#8230;<\/strong><\/p>\n<h1>Apache Spark or UDA?<\/h1>\n<h3>Choice matrix<\/h3>\n<p> Many users hesitate between <strong>Cassandra 3.0 UDA<\/strong> and <strong>Apache Spark<\/strong> for their analytics and aggregation use-cases. 
Each method has its advantages and drawbacks, summarized in the table below:<\/p>\n<table>\n<thead>\n<tr>\n<th>Used consistency level<\/th>\n<th>Single\/Multi partitions operation<\/th>\n<th>Recommended approach<\/th>\n<\/tr>\n<\/thead>\n<tbody>\n<tr>\n<td><strong>ONE<\/strong><\/td>\n<td>Single partition<\/td>\n<td>C* UDA with a token-aware driver, because the aggregation is performed directly in the datastore and is node-local<\/td>\n<\/tr>\n<tr>\n<td><strong>ONE<\/strong><\/td>\n<td>Multiple partitions<\/td>\n<td>Spark, because the read operation is done in parallel on all nodes. Spark will apply the aggregation in memory. Also, this method does not put heavy load on the coordinator<\/td>\n<\/tr>\n<tr>\n<td><strong>&gt; ONE<\/strong><\/td>\n<td>Single partition<\/td>\n<td>C* UDA. Data locality is no longer guaranteed by the Spark connector because of the consistency level, and you&#8217;d pay the extra cost of fetching all C* data into Spark memory for applying the aggregation instead of applying it directly in the datastore layer<\/td>\n<\/tr>\n<tr>\n<td><strong>&gt; ONE<\/strong><\/td>\n<td>Multiple partitions<\/td>\n<td>This is the worst scenario. In this case, my intuition is that Spark will be a better choice than C* UDA. See the explanation below<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<h3>Why does Apache Spark seem better for full-table aggregation?<\/h3>\n<p>Let&#8217;s take the worst-case scenario. Imagine you need to compute the average value on all your sensors, using consistency level <strong>QUORUM<\/strong> (for <strong>RF=3<\/strong>). <\/p>\n<blockquote><p>SELECT avg(value) FROM sensor;<\/p><\/blockquote>\n<p>It means that for each CQL row, Cassandra will need <strong>2 copies of data out of 3 replicas<\/strong>. More precisely, it will need <strong>1 copy of the data + 1 digest of the data<\/strong>. <\/p>\n<p> Since the Spark\/Cassandra connector creates Spark partitions that map to Cassandra token ranges for data locality, the data fetching is done on every node. 
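The reason per-node aggregation is possible at all is that an average decomposes into a (sum, count) partial state whose merge is commutative and associative, the property mentioned in chapter C. A minimal, self-contained sketch (the values are made up):

```java
// Sketch: an average decomposes into a (sum, count) partial state.
// Merging partials is commutative and associative, so each node can
// aggregate its own token range and the results can be combined in any order.
public class PartialAvg {
    final double sum;
    final long count;

    PartialAvg(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Combine two partial states; order of merging does not matter
    PartialAvg merge(PartialAvg other) {
        return new PartialAvg(sum + other.sum, count + other.count);
    }

    double finish() {
        return sum / count;
    }

    public static void main(String[] args) {
        // Three illustrative token ranges, each aggregated locally
        PartialAvg rangeA = new PartialAvg(100.0, 2); // values 40 and 60
        PartialAvg rangeB = new PartialAvg(150.0, 3); // three values of 50
        PartialAvg rangeC = new PartialAvg(50.0, 1);  // a single value of 50
        double avg = rangeA.merge(rangeB).merge(rangeC).finish();
        System.out.println(avg); // prints 50.0
    }
}
```

Conceptually, this merge of partial states is what the Spark reduce step performs on the results coming back from each token range, whereas with UDA a single coordinator has to build the whole aggregation state by itself.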
<strong>Each node acts as the coordinator for its primary token range and only needs to send 1 digest request to one of its replicas<\/strong>. Of course, this process will be done page by page, so for a given token range there will be <em>page_count<\/em> digest requests. For the sake of simplicity, we suppose that we&#8217;re using <strong>fixed token ranges<\/strong> and not virtual nodes. Below is how data are fetched using the connector:<\/p>\n<div id=\"attachment_2046\" style=\"width: 955px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithSpark.png\"><img aria-describedby=\"caption-attachment-2046\" loading=\"lazy\" src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithSpark.png\" alt=\"Aggregation With Spark\" width=\"945\" height=\"435\" class=\"size-full wp-image-2046\" srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithSpark.png 945w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithSpark-300x138.png 300w\" sizes=\"(max-width: 945px) 100vw, 945px\" \/><\/a><p id=\"caption-attachment-2046\" class=\"wp-caption-text\">Aggregation With Spark<\/p><\/div>\n<p>Please note that the digest requests and local data reads are done <strong>in parallel<\/strong> on every Cassandra node.<\/p>\n<p> If we were to rely on Cassandra UDA for this scenario, there would be a single coordinator, which would:<\/p>\n<ul>\n<li><strong>For each page of data<\/strong>:\n<ol>\n<li>request the real data from the fastest replica (relying on the dynamic snitch)<\/li>\n<li>request a digest from another replica<\/li>\n<\/ol>\n<\/li>\n<\/ul>\n<div id=\"attachment_2047\" style=\"width: 955px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithUDA.png\"><img aria-describedby=\"caption-attachment-2047\" loading=\"lazy\" 
src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithUDA.png\" alt=\"Aggregation With C* UDA\" width=\"945\" height=\"435\" class=\"size-full wp-image-2047\" srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithUDA.png 945w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2016\/01\/AggregationWithUDA-300x138.png 300w\" sizes=\"(max-width: 945px) 100vw, 945px\" \/><\/a><p id=\"caption-attachment-2047\" class=\"wp-caption-text\">Aggregation With C* UDA<\/p><\/div>\n<p> The drawbacks are pretty clear:<\/p>\n<ul>\n<li><strong>Lower parallelism<\/strong>, because there is only 1 <strong>coordinator<\/strong> to process all token ranges<\/li>\n<li><strong>Raw data<\/strong> and digests need to be moved over the network<\/li>\n<li><strong>High pressure<\/strong> on the <strong>coordinator<\/strong><\/li>\n<\/ul>\n<p> In a nutshell, for <strong>multi-partition aggregations<\/strong>, Spark seems to offer a better alternative <strong>if you have enough memory to process data on each node<\/strong>. If the local data set on each node largely exceeds the available system memory, all bets are off.<\/p>\n<blockquote><p><strong>WARNING<\/strong>: all the above reasoning is only an intuition based on the actual architecture of Cassandra UDA and the Spark\/Cassandra connector implementation. Proper benchmarks with real, massive data are required to validate this intuition. <strong>Do not take this reasoning for granted<\/strong>\n<\/p><\/blockquote>\n<p><strong>In the near future, when the Spark\/Cassandra connector is able to push down UDF and UDA into Cassandra, we&#8217;ll get the best of both worlds<\/strong>. Right now, just use C* UDA carefully.<\/p>\n<p>And that&#8217;s all for today. I hope you enjoyed these little benchmarks of mine. 
Remarks and comments are welcomed.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In an earlier post, I presented the new UDF &#038; UDA features introduced by Cassandra 2.2. In this blog post, we&#8217;ll play with UDA and see how it can be leveraged for analytics use-cases and all the caveats to avoid&#8230;.<br \/><a class=\"read-more-button\" href=\"https:\/\/www.doanduyhai.com\/blog\/?p=2015\">Read more<\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[57,10],"tags":[],"_links":{"self":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/2015"}],"collection":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=2015"}],"version-history":[{"count":36,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/2015\/revisions"}],"predecessor-version":[{"id":13202,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/2015\/revisions\/13202"}],"wp:attachment":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=2015"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=2015"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=2015"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}