{"id":1876,"date":"2015-12-13T22:36:58","date_gmt":"2015-12-13T22:36:58","guid":{"rendered":"http:\/\/www.doanduyhai.com\/blog\/?p=1876"},"modified":"2017-08-10T16:45:48","modified_gmt":"2017-08-10T16:45:48","slug":"cassandra-udfuda-technical-deep-dive","status":"publish","type":"post","link":"https:\/\/www.doanduyhai.com\/blog\/?p=1876","title":{"rendered":"Cassandra UDF\/UDA Technical Deep Dive"},"content":{"rendered":"<p>In this blog post, we&#8217;ll review the new <strong>User-Defined Function<\/strong> (UDF) and <strong>User-Defined Aggregate<\/strong> (UDA) features and look into their technical implementation.<\/p>\n<blockquote><p>For the remainder of this post, Cassandra == Apache Cassandra\u2122<\/p><\/blockquote>\n<hr\/>\n<p> The <strong>UDF\/UDA<\/strong> feature premiered at <strong>Cassandra Summit Europe 2014<\/strong> in London. Robert Stupp presented the new features; his slides are available <a href=\"http:\/\/www.slideshare.net\/RobertStupp\/user-definedfunctionscassandrasummiteu2014\" title=\"User Defined Functions\" target=\"_blank\">here<\/a>.<\/p>\n<p> The feature was released with <strong>Cassandra 2.2<\/strong> and improved in <strong>Cassandra 3.0<\/strong>.<\/p>\n<h1>I User Defined Function (UDF) <\/h1>\n<h3>A. Rationale<\/h3>\n<p>The idea behind <strong>UDF<\/strong> and <strong>UDA<\/strong> is to push computation to the server side. On a small cluster of fewer than 10~20 machines, retrieving all the data and aggregating it client-side or pushing the computation server-side may not make a huge difference in terms of performance. But on a big cluster with 100+ nodes, the gain is real.<\/p>\n<p>Pushing computation server-side helps:<\/p>\n<ol>\n<li>saving network bandwidth<\/li>\n<li>simplifying client code<\/li>\n<li>accelerating analytics use cases (<strong>pre-aggregation for Spark<\/strong>)<\/li>\n<\/ol>\n<h3>B. 
UDF Syntax <\/h3>\n<p>The standard syntax to create a <strong>UDF<\/strong> is:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\n    CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS]\r\n    [keyspace.]functionName (param1 type1, param2 type2, \u2026)\r\n    CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT\r\n    RETURNS returnType\r\n    LANGUAGE language\r\n    AS '\r\n           \/\/ source code here\r\n    ';\r\n<\/pre>\n<ul>\n<li>You can use <strong>CREATE OR REPLACE FUNCTION<\/strong> or <strong>CREATE FUNCTION IF NOT EXISTS<\/strong>, but not <strong>CREATE OR REPLACE FUNCTION IF NOT EXISTS<\/strong>. The <strong>OR REPLACE<\/strong> and <strong>IF NOT EXISTS<\/strong> clauses are mutually exclusive.<\/li>\n<li>The scope of a <strong>UDF<\/strong> is <strong>keyspace-wide<\/strong>, so you can add a <em>keyspace prefix<\/em> to a <strong>UDF<\/strong> name.<\/li>\n<li>Each <strong>UDF<\/strong> accepts a list of arguments of the form <em>param<sub>1<\/sub> type<sub>1<\/sub>, param<sub>2<\/sub> type<sub>2<\/sub>, &#8230;<\/em>, which is very similar to a Java method signature.<\/li>\n<li>There are two ways to handle null input. If you use the <strong>CALLED ON NULL INPUT<\/strong> clause, the <strong>UDF<\/strong> is always called, whatever the input values. <strong>Consequently, you need to perform null checks in the code of your UDF<\/strong>.<\/li>\n<li>If <strong>RETURNS NULL ON NULL INPUT<\/strong> is used, Cassandra skips the <strong>UDF<\/strong> execution and returns <em>null<\/em> <strong>if any of the input arguments is null<\/strong>.<\/li>\n<li>You need to specify the return type of your <strong>UDF<\/strong>. 
Please note that the return type and the input parameter types must be <strong>valid CQL types (primitive, collection, tuple or UDT)<\/strong>.<\/li>\n<li>The possible values for the language of your <strong>UDF<\/strong> are:\n<ul>\n<li><strong>Java<\/strong><\/li>\n<li><strong>JavaScript<\/strong> (using the Nashorn engine integrated in Java 8)<\/li>\n<li><strong>Groovy<\/strong><\/li>\n<li><strong>Scala<\/strong><\/li>\n<li><strong>JRuby<\/strong><\/li>\n<li><strong>Jython<\/strong><\/li>\n<li><strong><del>Clojure<\/del><\/strong> (because of a bug in JSR-223)<\/li>\n<\/ul>\n<\/li>\n<li>Inside the source code block, single quotes must be escaped by doubling them (''). Alternatively, the code block can be delimited with double dollars ($$) instead of single quotes:<br \/>\n  <code>AS $$return Math.max(currentVal, nextVal);$$;<\/code>\n <\/li>\n<\/ul>\n<h3>C. Demo example<\/h3>\n<p> Let&#8217;s create a <strong>UDF<\/strong> <em>maxOf(currentValue int, testValue int)<\/em> that returns the max of two integers.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\n    CREATE FUNCTION maxof(currentvalue int, testvalue int)\r\n    RETURNS NULL ON NULL INPUT\r\n    RETURNS int\r\n    LANGUAGE java\r\n    AS 'return Math.max(currentvalue,testvalue);';\r\n<\/pre>\n<p> Let&#8217;s create a test table and insert some sample data:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE TABLE test (\r\n    id int,\r\n    val1 int,\r\n    val2 int,\r\n    PRIMARY KEY(id)\r\n);\r\n\r\nINSERT INTO test(id, val1, val2) VALUES(1, 100, 200);\r\nINSERT INTO test(id, val1) VALUES(2, 100);\r\nINSERT INTO test(id, val2) VALUES(3, 200);\r\n<\/pre>\n<p>We use our <em>maxOf<\/em> <strong>UDF<\/strong> to obtain the max value of <em>val1<\/em> and <em>val2<\/em>:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nSELECT id,val1,val2,maxOf(val1,val2) FROM test WHERE id IN(1,2,3);\r\n\r\n id | val1 | val2 | udf.maxof(val1, 
val2)\r\n----+------+------+-----------------------\r\n  1 |  100 |  200 |                   200\r\n  2 |  100 | null |                  null\r\n  3 | null |  200 |                  null\r\n<\/pre>\n<p> As you can see, whenever an input argument is <strong>null<\/strong>, the <strong>UDF<\/strong> simply returns <strong>null<\/strong>, honoring the <strong>RETURNS NULL ON NULL INPUT<\/strong> contract.<\/p>\n<p> Now let&#8217;s change the <strong>RETURNS NULL ON NULL INPUT<\/strong> clause to <strong>CALLED ON NULL INPUT<\/strong>:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE FUNCTION maxOf(current int, testvalue int) \r\nCALLED ON NULL INPUT \r\nRETURNS int \r\nLANGUAGE java \r\nAS $$return Math.max(current,testvalue);$$;\r\n\r\n&gt;InvalidRequest: code=2200 [Invalid query] \r\nmessage=&quot;Function udf.maxof : \r\n(int, int) -&gt; int can only be replaced with CALLED ON NULL INPUT&quot;\r\n<\/pre>\n<p>Interestingly, once you have chosen <strong>RETURNS NULL ON NULL INPUT<\/strong>, it can no longer be modified. We need to drop and recreate the function:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nDROP FUNCTION maxOf;\r\nCREATE OR REPLACE FUNCTION maxOf(current int, testvalue int) \r\nCALLED ON NULL INPUT \r\nRETURNS int \r\nLANGUAGE java \r\nAS $$return Math.max(current,testvalue);$$;\r\n<\/pre>\n<p>Then let&#8217;s use it again:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nSELECT id,val1,val2,maxOf(val1,val2) FROM test WHERE id IN(1,2,3);\r\n\r\n\r\n&gt; FunctionFailure: code=1400 [User Defined Function failure] \r\nmessage=&quot;execution of 'udf.maxof[int, int]' \r\nfailed: java.lang.NullPointerException&quot;\r\n<\/pre>\n<p>The error is expected because our UDF does not perform any null check. When <em>val1<\/em> or <em>val2<\/em> is null, the null argument cannot be unboxed into the <em>int<\/em> expected by <em>Math.max()<\/em>, which causes the <em>NullPointerException<\/em>.<\/p>\n<h1>II User Defined Aggregate (UDA)<\/h1>\n<p> Now let&#8217;s have a closer look at <strong>UDAs<\/strong>. The standard creation syntax is:<\/p>\n<h3>A. 
UDA Syntax<\/h3>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE [OR REPLACE] AGGREGATE [IF NOT EXISTS]\r\naggregateName(type1, type2, \u2026)\r\nSFUNC accumulatorFunction\r\nSTYPE stateType\r\n[FINALFUNC finalFunction]\r\n[INITCOND initCond];\r\n<\/pre>\n<ul>\n<li>This time, <strong>it is not allowed to prefix the UDA name with a keyspace<\/strong>. This is <strong>intentional<\/strong> because you are not allowed to use a <strong>UDF<\/strong> across keyspace boundaries. <strong>In other words, you need to be logged into a keyspace to create a UDA<\/strong>.<\/li>\n<li>The <strong>UDA<\/strong> signature is unusual: it only accepts a <strong>list of types<\/strong>, without parameter names.<\/li>\n<li>The <strong>SFUNC<\/strong> clause should point to a <strong>UDF<\/strong> that plays the role of the <em>accumulator function<\/em>.<\/li>\n<li>The <strong>STYPE<\/strong> clause indicates the type of the state passed to the <strong>SFUNC<\/strong> for accumulation. It is also the <strong>return type<\/strong> of the <strong>UDA<\/strong> if no <strong>FINALFUNC<\/strong> is specified.<\/li>\n<li>You can optionally provide a <strong>FINALFUNC<\/strong> to apply final processing to the <strong>state<\/strong> before returning the result.<\/li>\n<li><strong>INITCOND<\/strong> is the initial value of the <strong>state<\/strong>. It is optional and must be given using CQL literal syntax.<\/li>\n<li>As for <strong>UDFs<\/strong>, all types used in a <strong>UDA<\/strong> definition must be <strong>valid CQL types<\/strong>.<\/li>\n<\/ul>\n<p><strong>UDA<\/strong> declaration is very different from <strong>UDF<\/strong> declaration and needs some clarification.<\/p>\n<p>First, the signature only specifies the types of the input argument(s), without parameter names. 
This is because there is an <strong>implicit constraint between the UDA signature and its SFUNC signature<\/strong>:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE FUNCTION accumulatorFunction(state &lt;state_type&gt;, param1 type1, param2 type2, ...)\r\n...\r\nRETURNS &lt;state_type&gt;\r\n...;\r\n\r\nCREATE AGGREGATE uda(type1, type2, ...)\r\n...\r\nSFUNC accumulatorFunction\r\nSTYPE &lt;state_type&gt;\r\n...;\r\n<\/pre>\n<p>The <em>accumulatorFunction<\/em> input types must be the same as those of the <strong>UDA<\/strong>, <strong>prepended by the state type (STYPE)<\/strong>. The <em>accumulatorFunction<\/em> return type must be <strong>STYPE<\/strong>.<\/p>\n<p>The optional <strong>FINALFUNC<\/strong>, <em>if provided<\/em>, must have <strong>STYPE as its only input type<\/strong> and can return <strong>any type<\/strong>.<\/p>\n<p>Consequently, the return type of a <strong>UDA<\/strong> is either the <strong>STYPE<\/strong>, if no <strong>FINALFUNC<\/strong> is specified, or the return type of the <strong>FINALFUNC<\/strong>.<\/p>\n<p>Complicated? Let&#8217;s clarify with an example:<\/p>\n<h3>B. 
UDA Example<\/h3>\n<p>We have a counter table that keeps track of the number of items sold for each category, each day and each shop:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE TABLE sales_items(shop_id text, day int, category text, count counter, PRIMARY KEY((shop_id),day,category));\r\n\r\nUPDATE sales_items SET count=count+1300 WHERE shop_id='BestDeals' AND day=20151221 AND category='Books';\r\nUPDATE sales_items SET count=count+5000 WHERE shop_id='BestDeals' AND day=20151221 AND category='Movies';\r\nUPDATE sales_items SET count=count+3493 WHERE shop_id='BestDeals' AND day=20151221 AND category='Games';\r\n\r\nUPDATE sales_items SET count=count+1500 WHERE shop_id='BestDeals' AND day=20151222 AND category='Books';\r\nUPDATE sales_items SET count=count+7000 WHERE shop_id='BestDeals' AND day=20151222 AND category='Movies';\r\nUPDATE sales_items SET count=count+9000 WHERE shop_id='BestDeals' AND day=20151222 AND category='Games';\r\n\r\nUPDATE sales_items SET count=count+1200 WHERE shop_id='BestDeals' AND day=20151223 AND category='Books';\r\nUPDATE sales_items SET count=count+11000 WHERE shop_id='BestDeals' AND day=20151223 AND category='Movies';\r\nUPDATE sales_items SET count=count+13000 WHERE shop_id='BestDeals' AND day=20151223 AND category='Games';\r\n\r\nUPDATE sales_items SET count=count+800 WHERE shop_id='BestDeals' AND day=20151224 AND category='Books';\r\nUPDATE sales_items SET count=count+3000 WHERE shop_id='BestDeals' AND day=20151224 AND category='Movies';\r\nUPDATE sales_items SET count=count+1000 WHERE shop_id='BestDeals' AND day=20151224 AND category='Games';\r\n\r\nSELECT * FROM sales_items;\r\n\r\n shop_id   | day      | category | count\r\n-----------+----------+----------+-------\r\n BestDeals | 20151221 |    Books |  1300\r\n BestDeals | 20151221 |    Games |  3493\r\n BestDeals | 20151221 |   Movies |  5000\r\n BestDeals | 20151222 |    Books |  1500\r\n BestDeals | 20151222 |    Games |  9000\r\n BestDeals | 
20151222 |   Movies |  7000\r\n BestDeals | 20151223 |    Books |  1200\r\n BestDeals | 20151223 |    Games | 13000\r\n BestDeals | 20151223 |   Movies | 11000\r\n BestDeals | 20151224 |    Books |   800\r\n BestDeals | 20151224 |    Games |  1000\r\n BestDeals | 20151224 |   Movies |  3000\r\n<\/pre>\n<p>Now we want an <em>aggregate view of the sales count over a period of time<\/em> for each item category. So let&#8217;s create a <strong>UDA<\/strong> for this!<\/p>\n<p>First, we need to create the <em>accumulatorFunction<\/em>:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE FUNCTION cumulateCounter(state map&lt;text,bigint&gt;, category text, count counter)\r\nRETURNS NULL ON NULL INPUT\r\nRETURNS map&lt;text,bigint&gt;\r\nLANGUAGE java\r\nAS '\r\nif(state.containsKey(category)) {\r\n  state.put(category, state.get(category) + count); \r\n} else {\r\n  state.put(category, count);\r\n}\r\nreturn state;\r\n'; \r\n\r\n&gt; InvalidRequest: code=2200 [Invalid query] message=&quot;Could not compile function 'udf.cumulatecounter' \r\nfrom Java source: org.apache.cassandra.exceptions.InvalidRequestException: \r\nJava source compilation failed:\r\nLine 3: The operator + is undefined for the argument type(s) Object, long\r\nLine 5: Type safety: The method put(Object, Object) belongs to the raw type Map. \r\nReferences to generic type Map&lt;K,V&gt; should be parameterized\r\n<\/pre>\n<p>Surprisingly, it fails. The reason is that the map state used in our source code is <strong>not parameterized<\/strong>. It is a <strong>raw Map<\/strong>, so the Java compiler cannot infer that <em>state.get(category)<\/em> returns a <strong>Long<\/strong> rather than a plain <strong>Object<\/strong>.<\/p>\n<p>This is disappointing since we do provide the generic types for the map (<strong>map&lt;text,bigint&gt;<\/strong>) in the <strong>UDF<\/strong> definition. 
The JIRA <a href=\"https:\/\/issues.apache.org\/jira\/browse\/CASSANDRA-10819\" title=\"Generic Java UDF types\" target=\"_blank\"><strong>CASSANDRA-10819<\/strong><\/a> has been filed to fix this.<\/p>\n<p>A simple workaround in our case is to add an explicit cast to Long: <em>(Long)state.get(category) + count<\/em>. With this cast, the <em>cumulateCounter<\/em> UDF compiles.<\/p>\n<p>Now let&#8217;s create our <strong>UDA<\/strong> to group all items by category and count them:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE AGGREGATE groupCountByCategory(text, counter)\r\nSFUNC cumulateCounter\r\nSTYPE map&lt;text,bigint&gt;\r\nINITCOND {};\r\n<\/pre>\n<p>The input types are <em>(text, counter)<\/em>: <strong>text<\/strong> for the <em>category<\/em> column and <strong>counter<\/strong> for the <em>count<\/em> column. However, the <strong>STYPE<\/strong> is <strong>map&lt;text,bigint&gt;<\/strong> and not <strong>map&lt;text,counter&gt;<\/strong> because counter values are converted directly to <strong>bigint<\/strong> (Long).<\/p>\n<p>The <strong>INITCOND<\/strong> is the initial value of the state, whose type is the <strong>STYPE<\/strong>. Here it should be an <em>empty map<\/em>, written <strong>{}<\/strong> in CQL syntax.<\/p>\n<p>Once done, let&#8217;s calculate the number of items sold for each category between the 21<sup>st<\/sup> and the 24<sup>th<\/sup> of December:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nSELECT groupCountByCategory(category,count) \r\nFROM sales_items \r\nWHERE shop_id='BestDeals' \r\nAND day&gt;=20151221 AND day&lt;=20151224;\r\n\r\n groupcountbycategory(category, count)\r\n--------------------------------------------------\r\n {'Books': 4800, 'Games': 26493, 'Movies': 26000}\r\n<\/pre>\n<p>A quick check against the original dataset confirms that our <strong>UDA<\/strong> works as expected. For <em>Books<\/em>, for example, 1300 + 1500 + 1200 + 800 = 4800.<\/p>\n<p>So far, so good. 
Now that we have a <strong>UDA<\/strong> that can simulate SQL&#8217;s <strong>GROUP BY<\/strong>, let&#8217;s push further. Is it possible to filter on the grouped values? In other words, can we have an equivalent of SQL&#8217;s <strong>HAVING<\/strong> clause?<\/p>\n<p>For this, let&#8217;s try adding a <strong>FINALFUNC<\/strong> to our <strong>UDA<\/strong> to filter the final state!<\/p>\n<p>First, we need to define the <strong>FINALFUNC<\/strong>, which extracts the grouped value for the &#8216;Books&#8217; category only.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE FUNCTION bookCountOnly(state map&lt;text,bigint&gt;)\r\nRETURNS NULL ON NULL INPUT\r\nRETURNS bigint \r\nLANGUAGE java \r\nAS '\r\nif(state.containsKey(&quot;Books&quot;)) {\r\n return (Long)state.get(&quot;Books&quot;);\r\n} else {\r\n return 0L;\r\n}';\r\n<\/pre>\n<p>Then we update our <strong>UDA<\/strong>:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE AGGREGATE groupCountByCategory(text, counter)\r\nSFUNC cumulateCounter\r\nSTYPE map&lt;text,bigint&gt;\r\nFINALFUNC bookCountOnly\r\nINITCOND {};\r\n\r\n&gt; InvalidRequest: code=2200 [Invalid query] message=&quot;Cannot replace aggregate groupcountbycategory, \r\nthe new return type bigint is not compatible with the return \r\ntype map&lt;text, bigint&gt; of existing function&quot;\r\n<\/pre>\n<p>Again, we need to drop and recreate our <strong>UDA<\/strong> because the return type has changed.<\/p>\n<p>If we apply the <strong>UDA<\/strong> again to our example dataset:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nSELECT groupCountByCategory(category,count) \r\nFROM sales_items \r\nWHERE shop_id='BestDeals' \r\nAND day&gt;=20151221 AND day&lt;=20151224;\r\n\r\ngroupcountbycategory(category, count)\r\n--------------------------------------------------\r\n4800\r\n<\/pre>\n<p>Et voil\u00e0!<\/p>\n<p>The attentive reader will 
complain that this is not very flexible. <strong>What if we now want the count for the &#8216;Games&#8217; category instead of &#8216;Books&#8217;? Why not add an extra argument to our UDF that acts as a category filter?<\/strong> Something like:<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nCREATE OR REPLACE FUNCTION \r\ncumulateCounterWithFilter(state map&lt;text,bigint&gt;, category text, count counter, filter text)\r\nRETURNS NULL ON NULL INPUT\r\nRETURNS map&lt;text,bigint&gt;\r\nLANGUAGE java\r\nAS '\r\nif(category.equals(filter)) {\r\n  if(state.containsKey(category)) {\r\n    state.put(category, (Long)state.get(category) + count); \r\n  } else {\r\n    state.put(category, count);\r\n  }\r\n}\r\nreturn state;\r\n'; \r\n\r\nCREATE OR REPLACE AGGREGATE groupCountHaving(text, counter, text)\r\nSFUNC cumulateCounterWithFilter\r\nSTYPE map&lt;text,bigint&gt;\r\nINITCOND {};\r\n\r\nSELECT groupCountHaving(category,count, 'Books') \r\nFROM sales_items \r\nWHERE shop_id='BestDeals' \r\nAND day&gt;=20151221 AND day&lt;=20151224; \r\n\r\n&gt; [Syntax error in CQL query] message=&quot;line 1:40 no viable alternative at \r\ninput 'Books' (SELECT groupCountHaving(category,count, ['Book]...)&quot;&gt;\r\n<\/pre>\n<p>Arggg!!! Gro&szlig; problem! <strong>Currently, it is not possible to pass a literal value as a UDF\/UDA parameter!<\/strong> So disappointing! I filed <a href=\"https:\/\/issues.apache.org\/jira\/browse\/CASSANDRA-10783\" title=\"Allow literal value as parameter of UDF &#038; UDA\" target=\"_blank\"><strong>CASSANDRA-10783<\/strong><\/a> to allow literal values. It turns out that the issue is related to <strong>method overloading<\/strong>.<\/p>\n<p> Indeed, when you apply a <strong>UDF\/UDA<\/strong> to a CQL column, the type of each input argument is known unambiguously because the column type is defined in the schema. 
But suppose we have defined several <em>maxOf<\/em> functions with different input types:<\/p>\n<ul>\n<li>maxOf(currentvalue int, nextvalue int)<\/li>\n<li>maxOf(currentvalue bigint, nextvalue bigint)<\/li>\n<\/ul>\n<p>In that case, calling <em>SELECT maxOf(val1, 100) &#8230;<\/em> is ambiguous because <strong>Cassandra<\/strong> cannot decide which type to coerce the literal <strong>100<\/strong> to. <strong>int<\/strong> and <strong>bigint<\/strong> are both sensible choices.<\/p>\n<h3>C. UDA Execution<\/h3>\n<p>Below are the steps executed by a <strong>UDA<\/strong>:<\/p>\n<ol>\n<li>Initialize the state with <strong>INITCOND<\/strong> if provided, or with null otherwise<\/li>\n<li>For each row:\n<ul>\n<li>call <strong>SFUNC<\/strong> with the <em>previous state<\/em> + the <em>params<\/em> as input arguments<\/li>\n<li>update the state with the value returned by <strong>SFUNC<\/strong><\/li>\n<\/ul>\n<\/li>\n<li>If no <strong>FINALFUNC<\/strong> is defined, return the last state<\/li>\n<li>Otherwise, apply <strong>FINALFUNC<\/strong> to the last state and return its output<\/li>\n<\/ol>\n<blockquote><p>As we can see, <strong>UDA<\/strong> execution carries a state around. This makes it quite hard, if not impossible (at least with the current implementation), to distribute its computation across many nodes.<\/p><\/blockquote>\n<h1>III Native UDF and UDA<\/h1>\n<p><strong>Cassandra 2.2<\/strong> and <strong>3.0<\/strong> ship with some system functions and aggregates:<\/p>\n<h5>System functions<\/h5>\n<ul>\n<li>blobAsXXX() where XXX is a valid CQL type<\/li>\n<li>XXXAsBlob() where XXX is a valid CQL type<\/li>\n<li><del>dateOf()<\/del>, now(), minTimeuuid(), maxTimeuuid(), <del>unixTimestampOf()<\/del><\/li>\n<li>toDate(timeuuid), toTimestamp(timeuuid), toUnixTimestamp(timeuuid)<\/li>\n<li>toDate(timestamp), toUnixTimestamp(timestamp)<\/li>\n<li>toTimestamp(date), toUnixTimestamp(date)<\/li>\n<li>token()<\/li>\n<li>count(1)\/count(*)<\/li>\n<li>writetime()<\/li>\n<li>ttl()<\/li>\n<li>toJson(), 
fromJson()<\/li>\n<li>uuid()<\/li>\n<\/ul>\n<h5>System aggregates<\/h5>\n<ul>\n<li>count()<\/li>\n<li>avg(), min(), max(), sum()<\/li>\n<\/ul>\n<blockquote><p> Please note that the <em>avg()<\/em>, <em>min()<\/em>, <em>max()<\/em> and <em>sum()<\/em> functions are defined for <strong>Byte<\/strong>, <strong>Decimal<\/strong>, <strong>Double<\/strong>, <strong>Float<\/strong>, <strong>Int32<\/strong>, <strong>Long<\/strong>, <strong>Short<\/strong> and <strong>VarInt<\/strong> input types using <strong>method overloading<\/strong>.<\/p><\/blockquote>\n<h1>IV Technical implementation<\/h1>\n<p>In this section, we&#8217;ll see how <strong>UDFs<\/strong> and <strong>UDAs<\/strong> are implemented internally in <strong>Cassandra<\/strong>.<\/p>\n<h3>A. UDF\/UDA creation<\/h3>\n<p>Below are the different steps of <strong>UDF\/UDA<\/strong> creation.<\/p>\n<p>For <strong>Cassandra 2.2<\/strong>:<\/p>\n<ol>\n<li>The <strong>UDF\/UDA<\/strong> definition is sent to the <strong>coordinator<\/strong><\/li>\n<li>The coordinator checks that the user has the CREATE\/ALTER permission on functions<\/li>\n<li>The coordinator ensures that <strong>UDF\/UDA<\/strong> support is enabled in the <strong>cassandra.yaml<\/strong> file (<em>enable_user_defined_functions: true<\/em>)<\/li>\n<li>If the <strong>UDF<\/strong> language is Java, the coordinator compiles it using the <strong>Eclipse ECJ compiler<\/strong> (not javac, because of licensing issues)<\/li>\n<li>If the <strong>UDF<\/strong> language is NOT Java, the appropriate <strong>javax.script.ScriptEngine<\/strong> is called to compile the source code<\/li>\n<li>The coordinator loads the <strong>UDF<\/strong> in a <strong>child classloader<\/strong> of the current classloader<\/li>\n<li>The coordinator announces a <strong>schema change<\/strong> (<em>UDFs and UDAs are considered schema metadata<\/em>) to all nodes<\/li>\n<\/ol>\n<p>For <strong>Cassandra 3.x<\/strong>, there are some additional steps to check the source code and prevent harmful code from being executed 
server-side:<\/p>\n<ol>\n<li>The <strong>UDF\/UDA<\/strong> definition is sent to the <strong>coordinator<\/strong><\/li>\n<li>The coordinator checks that the user has the CREATE\/ALTER permission on functions<\/li>\n<li>The coordinator ensures that <strong>UDF\/UDA<\/strong> support is enabled in the <strong>cassandra.yaml<\/strong> file (<em>enable_user_defined_functions: true<\/em>)<\/li>\n<li>If the <strong>UDF<\/strong> language is Java, the coordinator compiles it using the <strong>Eclipse ECJ compiler<\/strong> (not javac, because of licensing issues)<\/li>\n<li>If the <strong>UDF<\/strong> language is NOT Java, the appropriate <strong>javax.script.ScriptEngine<\/strong> is called to compile the source code<\/li>\n<li>The coordinator verifies the compiled bytecode to ensure that:\n<ul>\n<li>there are <strong>no static or instance initializer blocks<\/strong> (code blocks outside of a method body). The idea is to forbid any code execution outside of the <strong>UDF<\/strong> call<\/li>\n<li>there is <strong>no constructor\/method declaration<\/strong>. This discourages complex code, e.g. heavy computation, in <strong>UDFs<\/strong><\/li>\n<li>there is <strong>no inner class declaration<\/strong>, with the same goal as above<\/li>\n<li>there is <strong>no synchronized block<\/strong>. 
Do you want to break the server?<\/li>\n<li>some <strong>forbidden classes<\/strong> (like <em>java.net.NetworkInterface<\/em>) are not accessed<\/li>\n<li>some <strong>forbidden methods<\/strong> (like <em>Class.forName()<\/em>) are not called<\/li>\n<\/ul>\n<\/li>\n<li>The coordinator loads the <strong>UDF<\/strong> in a <strong>separate classloader<\/strong>, distinct from the current classloader<\/li>\n<li>The loaded <strong>UDF<\/strong> class is checked against a <strong>whitelist<\/strong> to ensure that it only imports allowed classes<\/li>\n<li>The loaded <strong>UDF<\/strong> class is checked against a <strong>blacklist<\/strong> to ensure that it does not import forbidden classes (e.g. <em>java.lang.Thread<\/em>)<\/li>\n<li>The coordinator announces a <strong>schema change<\/strong> (<em>UDFs and UDAs are considered schema metadata<\/em>) to all nodes<\/li>\n<\/ol>\n<blockquote><p> Please note that all the above compilation and verification steps are repeated on every node when it receives the schema change. Normally, if the <strong>UDF<\/strong> compiled successfully and passed verification on the coordinator, it should also compile and pass verification on the other nodes.<\/p><\/blockquote>\n<h3>B. UDF\/UDA execution<\/h3>\n<p>For <strong>Cassandra 2.2<\/strong>, the <strong>UDF<\/strong> is executed by the caller thread (normally the thread in charge of executing the CQL statement). 
There is no other control over <strong>UDF\/UDA<\/strong> execution, so you had better pay attention to the source code you execute server-side.<\/p>\n<p>For <strong>Cassandra 3.0<\/strong>, if the <em>enable_user_defined_functions_threads<\/em> flag is turned <strong>ON<\/strong> (<em>it is by default<\/em>), there are some extra checks and <strong>sandboxing<\/strong> to prevent <em>evil code execution<\/em> and stop people from shooting themselves in the foot:<\/p>\n<ol>\n<li><strong>UDF\/UDA<\/strong> execution is done in a dedicated <strong>thread pool<\/strong><\/li>\n<li>If <em>user_defined_function_warn_timeout<\/em> is reached, a <strong>WARNING<\/strong> message is emitted in the log and the execution is given the remaining <em>user_defined_function_fail_timeout - user_defined_function_warn_timeout<\/em> to complete<\/li>\n<li>If <em>user_defined_function_fail_timeout<\/em> (defaults to <strong>1500 ms<\/strong>) is reached, the execution is aborted<\/li>\n<\/ol>\n<h3>C. Some gotchas<\/h3>\n<p>Below is a network diagram of a <strong>UDF<\/strong> execution:<\/p>\n<div id=\"attachment_1920\" style=\"width: 904px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDFExecution.png\"><img aria-describedby=\"caption-attachment-1920\" loading=\"lazy\" src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDFExecution.png\" alt=\"UDF Execution\" width=\"894\" height=\"498\" class=\"size-full wp-image-1920\" srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDFExecution.png 894w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDFExecution-300x167.png 300w\" sizes=\"(max-width: 894px) 100vw, 894px\" \/><\/a><p id=\"caption-attachment-1920\" class=\"wp-caption-text\">UDF Execution<\/p><\/div>\n<p> The attentive reader may wonder: <strong>why doesn&#8217;t Cassandra execute the UDF on the replica nodes rather than on the coordinator node
?<\/strong><\/p>\n<p> The answer is simple. With an <em>eventual consistency<\/em> model, <strong>Cassandra<\/strong> first needs to fetch the copies of the data from the replicas (as many as the consistency level requires) to the coordinator. It then reconciles them (using <strong>Last Write Wins<\/strong>) and finally applies the <strong>UDF<\/strong>. <strong>Cassandra<\/strong> cannot apply the <strong>UDF<\/strong> locally on each replica because the data values may differ between replicas.<\/p>\n<blockquote><p>The only time the <strong>UDF<\/strong> is applied locally on a replica node is when using consistency level <strong>ONE<\/strong>\/<strong>LOCAL_ONE<\/strong> together with a <strong>token-aware<\/strong> load balancing policy on the client side, so that the coordinator is itself a replica.<\/p><\/blockquote>\n<p>The same remarks apply to <strong>UDA<\/strong> execution:<\/p>\n<div id=\"attachment_1922\" style=\"width: 904px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDAExecution.png\"><img aria-describedby=\"caption-attachment-1922\" loading=\"lazy\" src=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDAExecution.png\" alt=\"UDA Execution\" width=\"894\" height=\"498\" class=\"size-full wp-image-1922\" srcset=\"https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDAExecution.png 894w, https:\/\/www.doanduyhai.com\/blog\/wp-content\/uploads\/2015\/12\/UDAExecution-300x167.png 300w\" sizes=\"(max-width: 894px) 100vw, 894px\" \/><\/a><p id=\"caption-attachment-1922\" class=\"wp-caption-text\">UDA Execution<\/p><\/div>\n<blockquote><p> Consequently, you should avoid applying a <strong>UDA<\/strong> to a huge number of CQL rows. They will be held in the coordinator&#8217;s JVM heap for reconciliation before the <strong>UDA<\/strong> is executed.<\/p><\/blockquote>\n<p> The only cases where this recommendation can be relaxed are:<\/p>\n<ul>\n<li>when you use 
<strong>ONE<\/strong>\/<strong>LOCAL_ONE<\/strong> consistency so that data is fetched locally<\/li>\n<li>when you use a <strong>UDA<\/strong> in <strong>Spark<\/strong>, because the Spark\/Cassandra connector restricts each query to the node&#8217;s <em>local token ranges<\/em> and uses <strong>LOCAL_ONE<\/strong> consistency by default<\/li>\n<\/ul>\n<h1>V The future of UDF\/UDA<\/h1>\n<p>Although <strong>UDFs\/UDAs<\/strong> are quite limited right now (no literal input arguments, for example), they are only the first building block for more promising features in <strong>Cassandra<\/strong>:<\/p>\n<ul>\n<li>Functional index (<a href=\"https:\/\/issues.apache.org\/jira\/browse\/CASSANDRA-7458\" title=\"Functional Index\" target=\"_blank\"><strong>CASSANDRA-7458<\/strong><\/a>)<\/li>\n<li>Partial index (<a href=\"https:\/\/issues.apache.org\/jira\/browse\/CASSANDRA-7391\" title=\"Partial index\" target=\"_blank\"><strong>CASSANDRA-7391<\/strong><\/a>)<\/li>\n<li><strong>WHERE<\/strong> clause filtering with UDFs<\/li>\n<li><strong>UDF\/UDA<\/strong> used for building <strong>Materialized Views<\/strong><\/li>\n<li>&#8230;<\/li>\n<\/ul>\n<p>The future looks bright, folks!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this blog post, we&#8217;ll review the new User-Defined Function (UDF) and User-Defined Aggregate (UDA) features and look into their technical implementation. 
For the remainder of this post, Cassandra == Apache Cassandra\u2122 The UDF\/UDA feature premiered at&#8230;<br \/><a class=\"read-more-button\" href=\"https:\/\/www.doanduyhai.com\/blog\/?p=1876\">Read more<\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[57,10],"tags":[],"_links":{"self":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/1876"}],"collection":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1876"}],"version-history":[{"count":51,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/1876\/revisions"}],"predecessor-version":[{"id":13204,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=\/wp\/v2\/posts\/1876\/revisions\/13204"}],"wp:attachment":[{"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1876"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1876"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.doanduyhai.com\/blog\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1876"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}