Gremlin recipes: 7 – Variables handling

This blog post is the 7th in the Gremlin Recipes series. It is recommended to read the previous blog posts first:

  1. Gremlin as a Stream
  2. SQL to Gremlin
  3. Recommendation Engine traversal
  4. Recursive traversals
  5. Path object
  6. Projection and selection

I KillrVideo dataset

To illustrate this series of recipes, you first need to create the schema for KillrVideo and import the data. See here for more details.

The graph schema of this dataset is: [figure: KillrVideo graph schema]

II Variable handling operators

In Gremlin we distinguish 2 types of operators:

  1. for setting variables
    • as side effect
    • as label in path history (path object)
  2. for accessing variables

Below is the list of these operators:

| Operator | Side effect / label | Setting / reading variable | Data type |
|---|---|---|---|
| store("variable") | side effect | setting | BulkSet implements Set<X> |
| aggregate("variable") | side effect | setting | BulkSet implements Set<X> |
| group("variable").by(grouping_axis).by(projection_axis) | side effect | setting | Map<X,Y> |
| groupCount("variable").by(grouping_axis) | side effect | setting | Map<X,Long> |
| as("label") | label | setting | Traversal |
| subgraph("variable") | side effect | setting | TinkerGraph or Traversal depending on how you access it |
| select("variable_or_label") | side effect or label | reading | depends on the variable |
| cap("variable") | side effect | reading | depends on the variable |

A. store(variable)

First let’s see how many users have rated Blade Runner 9/10 or higher:

gremlin>g.V().
  has("movie", "title", "Blade Runner").
  inE("rated").has("rating", gte(9)).
  count()
==>45

Ok, now let’s save all those 45 ratings using the store() operator:

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  store("rated_edges").                  // Store rated edges inside a Collection
  outV().                                // Iterator<User>
  select("rated_edges").                 // Iterator<BulkSet<Rated_Edge>>   
  next().                                // BulkSet<Rated_Edge>
  size()                          
==>5

Strangely, we only have 5 edges stored in the “rated_edges” collection. Why? The reason is that store() is a lazy operator: it appends each traversed edge to the “rated_edges” collection only as that edge flows through the step. By the time the first traverser reaches the next().size() step, only 5 edges have been accumulated into the “rated_edges” collection.

To force Gremlin to completely fill up the “rated_edges” collection so that next().size() returns the expected count of 45, we can use the barrier() step. In this case Gremlin will perform a breadth-first evaluation of all rated edges before moving to the next step.

The concepts of depth-first and breadth-first evaluation will be discussed in a future blog post.

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  store("rated_edges").                  // Store rated edges inside a Collection
  outV().                                // Iterator<User>
  barrier().                             // Force eager evaluation of all previous steps 
  select("rated_edges").                 // Iterator<BulkSet<Rated_Edge>>     
  next().                                // BulkSet<Rated_Edge>
  size()                          
==>45
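
To build intuition for the lazy versus eager behaviour, here is a minimal sketch in plain Python (not Gremlin, and not the TinkerPop API). Generators stand in for traversal steps, and the helper names store, barrier and select are only modelled on the Gremlin steps of the same name. Note that a purely lazy Python pipeline sees 1 element at the first next(), whereas the Gremlin console above showed 5, presumably because of internal batching; only the principle is illustrated here.

```python
def store(stream, side_effects, key):
    """Lazy: append each item to the side effect only as it flows past."""
    bucket = side_effects.setdefault(key, [])
    for item in stream:
        bucket.append(item)
        yield item


def barrier(stream):
    """Eager: drain the whole upstream before emitting anything downstream."""
    yield from list(stream)


def select(stream, side_effects, key):
    """Emit the current state of the side effect once per incoming item."""
    for _ in stream:
        yield side_effects[key]


# 45 hypothetical rated edges, standing in for the Blade Runner ratings.
edges = [f"rated_edge_{i}" for i in range(45)]

# store(...).select(...).next().size()
lazy_side_effects = {}
lazy = select(store(iter(edges), lazy_side_effects, "rated_edges"),
              lazy_side_effects, "rated_edges")
lazy_size = len(next(lazy))

# store(...).barrier().select(...).next().size()
eager_side_effects = {}
eager = select(barrier(store(iter(edges), eager_side_effects, "rated_edges")),
               eager_side_effects, "rated_edges")
eager_size = len(next(eager))

print(lazy_size, eager_size)  # 1 45
```

Without the barrier, the first result is read while the collection is still being filled; with it, every edge has already passed through store() before select() runs.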

Alternatively, instead of select("rated_edges") we could use cap("rated_edges"). cap() forces an eager evaluation of all previous steps, much like barrier(). Indeed, you can roughly see cap(xxx) as a shorthand for barrier().select(xxx), with the difference that cap() emits the side effect only once whereas select() emits it once per incoming traverser:

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  store("rated_edges").                  // Store rated edges inside a Collection
  outV().                                // Iterator<User>
  cap("rated_edges").                    // Iterator<BulkSet<Rated_Edge>>     
  next().                                // BulkSet<Rated_Edge>
  size()                          
==>45

B. aggregate(variable)

aggregate() works like store() but in an eager fashion. Consequently, with aggregate() you need neither barrier() nor cap():

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  aggregate("rated_edges").              // Store rated edges inside a Collection
  outV().                                // Iterator<User>
  select("rated_edges").                 // Iterator<BulkSet<Rated_Edge>>     
  next().                                // BulkSet<Rated_Edge>
  size()                          
==>45

aggregate(x).select(x) == store(x).cap(x) == store(x).barrier().select(x)
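
The equivalence above can be checked on a toy model. The following is a sketch in plain Python, not the TinkerPop implementation: the functions are hypothetical stand-ins named after the Gremlin steps, and the stream is just the numbers 0 to 44.

```python
def store(stream, side_effects, key):
    """Lazy side effect: fill the collection item by item."""
    bucket = side_effects.setdefault(key, [])
    for item in stream:
        bucket.append(item)
        yield item


def aggregate(stream, side_effects, key):
    """Eager side effect: drain upstream first, then re-emit every item."""
    items = list(stream)
    side_effects.setdefault(key, []).extend(items)
    yield from items


def barrier(stream):
    """Drain upstream before emitting anything."""
    yield from list(stream)


def select(stream, side_effects, key):
    """Emit the side effect once per incoming item."""
    for _ in stream:
        yield side_effects[key]


def cap(stream, side_effects, key):
    """Drain upstream, then emit the side effect exactly once."""
    for _ in stream:
        pass
    yield side_effects.get(key, [])


def run(build):
    """Run a pipeline over 45 items and return the size seen by each result."""
    side_effects = {}
    return [len(result) for result in build(iter(range(45)), side_effects)]


aggregate_select = run(lambda s, se: select(aggregate(s, se, "x"), se, "x"))
store_cap = run(lambda s, se: cap(store(s, se, "x"), se, "x"))
store_barrier_select = run(lambda s, se: select(barrier(store(s, se, "x")), se, "x"))

print(aggregate_select[0], store_cap[0], store_barrier_select[0])  # 45 45 45
print(len(aggregate_select), len(store_cap))                       # 45 1
```

In this model the three forms agree on the first result, which is what next() returns. They differ in how many results they emit: the cap() variant emits the collection once, the select() variants once per item.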

C. group(variable).by(grouping_axis).by(projection_axis)

The group(variable) step allows you to store the partial grouping result (which is a Map<X,Y> structure) as a side effect of your traversal.

Let’s say we want to save all fans of Blade Runner and Inception and group them by their age and project on their userId:

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Blade Runner
    by("age").                               // By age
    by("userId").                            // Project on userId
  V().                                       // Restart a new traversal   
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Inception 
    by("age").                               // By age 
    by("userId").                            // Project on userId
  select("scifi_fans")                       // Recall the Map<Int==age, Collection<String==userId>>
Request timed out while processing - increase the timeout with the :remote command
Type ':help' or ':h' for help.
Display stack trace? [yN]

Strangely, the query timed out. Is it related to some lazy evaluation rule? Not at all. The explanation is given in the 1st comment on TINKERPOP-1741:

You can’t select('a'). You have to cap('a'). This is because GroupStep requires a “on completion” computation. Why is it like this? When should reduction happen? select() just grabs the side-effect and if it hasn’t been reduced (because it might be reduced later), then that’s that. Why not have it reduced at group('a') – nope, you can’t cause you typically use side-effects repeatedly (e.g. in a repeat()). If you wanted it reduced after group('a'), you would use group().store('a'). Thus, the only step that we have the forces reduction on a side-effect is cap()

Long story short, when you use group(variable) the map structure is not finalized, and therefore not available for reading, until you call cap(variable). The map is left open because more data may be accumulated into it later in the traversal, and that is exactly what happens in the traversal above.

First we accumulate fans of Blade Runner into the “scifi_fans” map, then we re-use it later to store Inception fans.
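
The idea of an open map that several steps keep writing into, and that is read only once at the end, can be sketched in plain Python. This is only a model: group_into and cap are hypothetical helpers, and the (age, userId) pairs are sample values whose split between the two movies is assumed for illustration.

```python
def group_into(side_effects, key, users):
    """Model of group(key).by("age").by("userId"): add to the map, never close it."""
    groups = side_effects.setdefault(key, {})
    for age, user_id in users:
        groups.setdefault(age, []).append(user_id)


def cap(side_effects, key):
    """Model of cap(key): read the finished side effect once, at the end."""
    return side_effects[key]


side_effects = {}
blade_runner_fans = [(64, "u408"), (34, "u262")]   # assumed sample data
inception_fans = [(64, "u411"), (18, "u978")]      # assumed sample data

group_into(side_effects, "scifi_fans", blade_runner_fans)  # first group("scifi_fans")
group_into(side_effects, "scifi_fans", inception_fans)     # second group("scifi_fans")

scifi_fans = cap(side_effects, "scifi_fans")
print(scifi_fans)  # {64: ['u408', 'u411'], 34: ['u262'], 18: ['u978']}
```

Reading the map between the two group_into calls would give only the Blade Runner half, which is why the read is deferred to the end.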

Please notice the use of V(). in the middle of our traversal. Once we have grouped all fans of Blade Runner, we restart a new traversal by jumping back to the original V(). iterator on vertices.

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Blade Runner
    by("age").                               // By age
    by("userId").                            // Project on userId
  V().                                       // Restart a new traversal   
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Inception 
    by("age").                               // By age 
    by("userId").                            // Project on userId
  cap("scifi_fans")                       // Recall the Map<Int==age, Collection<String==userId>>
==>{64=[u408, u476, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u411, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452, u452], 18=[u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978, u978], 
...

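The result is ugly and there are many duplicate userIds for each age (the mid-traversal V() is evaluated once for each incoming traverser, so every Inception fan is grouped once per Blade Runner fan). We need a de-duplication: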

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Blade Runner
    by("age").                               // By age
    by(values("userId").dedup()).            // Project on userId
  V().                                       // Restart a new traversal   
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Inception 
    by("age").                               // By age 
    by(values("userId").dedup()).            // Project on userId
  cap("scifi_fans")                       // Recall the Map<Int==age, Collection<String==userId>>
==>{64=u452, 18=u978, 19=u1031, 20=u637, 21=u1070, 22=u356, 23=u390, 24=u355, 25=u255, 26=u628, 27=u685, 28=u979, 29=u420, 32=u473, 33=u548, 34=u262, 35=u785, 36=u744, 37=u398, 39=u365, 40=u693, 41=u513, 42=u347, 43=u287, 44=u442, 45=u565, 46=u335, 47=u591, 48=u622, 49=u455, 50=u555, 51=u507, 52=u416, 53=u328, 54=u268, 55=u332, 56=u530, 57=u522, 58=u551, 59=u349, 60=u630, 61=u617}

The result is shorter but also wrong: there is only 1 user for each age group. Why? This is again a classic caveat. For every inner traversal, Gremlin by default calls next() to get its result, so only the first element is kept. Since values("userId").dedup() is a traversal, we need to use fold() to collect all the userIds into a collection:

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Blade Runner
    by("age").                               // By age
    by(values("userId").dedup().fold()).     // Project on userId
  V().                                       // Restart a new traversal   
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  group("scifi_fans").                       // Group fans of Inception 
    by("age").                               // By age 
    by(values("userId").dedup().fold()).     // Project on userId
  cap("scifi_fans")                          // Recall the Map<Int==age, Collection<String==userId>>
==>{64=[u452, u408, u411, u476], 18=[u978], 19=[u1031, u292], 20=[u637, u483], 21=[u1070, u689, u239, u305], 22=[u356, u501, u456], 23=[u390, u678, u841], 24=[u355, u692, u718], 25=[u255, u724, u963], 26=[u628, u848, u598], 27=[u685, u339, u240, u445], 28=[u979, u497], 29=[u420, u861, u474, u1000], 32=[u473], 33=[u548, u547], 34=[u262], 35=[u785, u603, u223, u511], 36=[u744, u267], 37=[u398, u386, u434, u646], 39=[u365, u896], 40=[u693, u764], 41=[u513, u498], 42=[u347, u536], 43=[u287, u389, u251, u327], 44=[u442], 45=[u565], 46=[u335], 47=[u591, u466], 48=[u622], 49=[u455], 50=[u555, u252], 51=[u507], 52=[u416, u263], 53=[u328], 54=[u268, u524], 55=[u332], 56=[u530, u477, u396], 57=[u522, u210], 58=[u551], 59=[u349, u202], 60=[u630, u218, u491, u378], 61=[u617]}

So far so good.
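
The next() caveat can be reproduced with a small model in plain Python. This is a sketch under assumptions, not TinkerPop code: group, dedup and fold are hypothetical helpers, and the input pairs are made-up sample data. The inner "traversal" is an iterator, and group keeps only next() of it.

```python
def dedup(values):
    """Yield each value once, in first-seen order."""
    seen = set()
    for value in values:
        if value not in seen:
            seen.add(value)
            yield value


def fold(values):
    """Collect the whole stream into a single list element."""
    yield list(values)


def group(users, value_traversal):
    """Group (age, userId) pairs by age, then keep next() of the inner traversal."""
    buckets = {}
    for age, user_id in users:
        buckets.setdefault(age, []).append(user_id)
    return {age: next(value_traversal(ids)) for age, ids in buckets.items()}


users = [(64, "u408"), (64, "u476"), (64, "u408"), (18, "u978")]  # sample data

without_fold = group(users, lambda ids: dedup(ids))
with_fold = group(users, lambda ids: fold(dedup(ids)))

print(without_fold)  # {64: 'u408', 18: 'u978'}
print(with_fold)     # {64: ['u408', 'u476'], 18: ['u978']}
```

With fold(), the single element returned by next() is the whole de-duplicated list, so nothing is lost.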

D. groupCount(variable).by(grouping_axis)

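groupCount() is just a special case of group(), so it works exactly the same way. Note that the counts below are not distinct user counts: as with the duplicate userIds seen with group(), the mid-traversal V() is evaluated once per incoming traverser, so each Inception fan is counted once per Blade Runner fan (64=92 is consistent with 2 Blade Runner fans plus 2 Inception fans counted 45 times each):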

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  groupCount("scifi_fans").                  // Group fans of Blade Runner
    by("age").                               // By age
  V().                                       // Restart a new traversal   
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  groupCount("scifi_fans").                  // Group fans of Inception 
    by("age").                               // By age 
  cap("scifi_fans")                          // Recall the Map<Int==age, Long==user count>
==>{64=92, 18=45, 19=46, 20=90, 21=92, 22=92, 23=91, 24=47, 25=135, 26=47, 27=92, 28=90, 29=48, 32=45, 33=46, 34=1, 35=92, 36=91, 37=92, 39=47, 40=46, 41=46, 42=2, 43=92, 44=45, 45=1, 46=1, 47=46, 48=1, 49=45, 50=90, 51=45, 52=90, 53=1, 54=47, 55=45, 56=91, 57=46, 58=1, 59=46, 60=92, 61=45}
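
To see where counts such as 64=92 and 18=45 can come from, here is a small arithmetic model in plain Python. It is a sketch under assumptions: 45 Blade Runner traversers reach the mid-traversal V(), 2 of them are aged 64, and the Inception fans include 2 users aged 64 and 1 aged 18. All other ages are made up.

```python
from collections import Counter

# 45 Blade Runner fans: two aged 64, the other ages (30) are placeholders.
blade_runner_fan_ages = [64, 64] + [30] * 43
# Assumed Inception fans: two aged 64 and one aged 18.
inception_fan_ages = [64, 64, 18]

counts = Counter()

# First groupCount("scifi_fans"): each Blade Runner fan is counted once.
counts.update(blade_runner_fan_ages)

# The mid-traversal V() restarts for every incoming traverser, so the second
# groupCount("scifi_fans") sees every Inception fan once per Blade Runner fan.
for _ in blade_runner_fan_ages:
    counts.update(inception_fan_ages)

print(counts[64], counts[18])  # 92 45
```

Under these assumptions the model gives 2 + 2 × 45 = 92 for age 64 and 1 × 45 = 45 for age 18, matching the console output above.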

E. as(label)

The case of as(label) is interesting. It allows you to label any step in your traversal to refer to it later. Used in combination with select() you can jump back and forth inside a traversal.

Taking the previous examples of Blade Runner fans:

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  as("rated_edges").                     // Label this step "rated_edges"
  outV().                                // Iterator<User>
  select("rated_edges").                 // should be Iterator<Rated_Edge>
  next()                                 // Rated_Edge
==>e[{~label=rated, ~out_vertex={~label=user, community_id=227508608, member_id=635}, ~in_vertex={~label=movie, community_id=285248128, member_id=2}, ~local_id=c2ef7ea4-7066-11e7-897a-edc6e0130fc0}][{~label=user, community_id=227508608, member_id=635}-rated->{~label=movie, community_id=285248128, member_id=2}]

It is also possible to assign multiple labels and recall them together. In this traversal we display the fans of Blade Runner as well as their rating:

gremlin>g.V().
  has("movie", "title", "Blade Runner"). // Iterator<Movie>
  inE("rated").has("rating", gte(9)).    // Iterator<Rated_Edge>
  as("rated_edges").                     // Label this step "rated_edges"
  outV().                                // Iterator<User>
  as("fans").                            // Label this step "fans"
  select("fans","rated_edges").          // Select "fans" and "rated_edges"
    by("userId").                        //   project "fans" on userId
    by("rating")                         //   project "rated_edges" on rating
==>{fans=u622, rated_edges=9}
==>{fans=u628, rated_edges=9}
==>{fans=u223, rated_edges=9}
==>{fans=u434, rated_edges=9}
==>{fans=u1031, rated_edges=9}
==>{fans=u718, rated_edges=9}
==>{fans=u524, rated_edges=9}
==>{fans=u365, rated_edges=9}
==>{fans=u689, rated_edges=9}
==>{fans=u390, rated_edges=9}
...
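
One way to picture as() and select() is a traverser that carries a dictionary of labels along with its current position. The sketch below is plain Python with a hypothetical Traverser class, not the TinkerPop one, and the edge and user are sample values.

```python
from dataclasses import dataclass, field


@dataclass
class Traverser:
    current: object                              # element the traverser sits on
    labels: dict = field(default_factory=dict)   # label -> element seen at that step

    def as_(self, label):
        """Model of as(label): remember the current element under a label."""
        return Traverser(self.current, {**self.labels, label: self.current})

    def step(self, new_current):
        """Move to another element while keeping the labels collected so far."""
        return Traverser(new_current, dict(self.labels))

    def select(self, *names):
        """Model of select(a, b): look the labels up in the history."""
        return {name: self.labels[name] for name in names}


user = {"userId": "u622"}
rated_edge = {"rating": 9, "out_vertex": user}

t = Traverser(rated_edge).as_("rated_edges")        # inE("rated")...as("rated_edges")
t = t.step(rated_edge["out_vertex"]).as_("fans")    # outV().as("fans")

selected = t.select("fans", "rated_edges")
row = {
    "fans": selected["fans"]["userId"],              # by("userId")
    "rated_edges": selected["rated_edges"]["rating"],  # by("rating")
}
print(row)  # {'fans': 'u622', 'rated_edges': 9}
```

Each by() modulator is applied to the selected labels in order, which is why the first by() projects "fans" and the second projects "rated_edges".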

Easy. Now what if the 2 labeled steps are independent, i.e. there is a mid-traversal V() between them?

gremlin>g.V().
  has("movie", "title", "Blade Runner").     // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  as("fans_blade_runner").                   // Label this step "fans_blade_runner"
  V().                                       // new traversal 
  has("movie", "title", "Inception").        // Iterator<Movie>
  inE("rated").has("rating", gte(9)).outV(). // Iterator<User>
  as("fans_inception").                       // Label this step "fans_inception"
  select("fans_blade_runner","fans_inception"). // Select "fans_blade_runner" and "fans_inception"
    by("userId").                               //   project "fans_blade_runner" on userId
    by("userId").                               //   project "fans_inception" on userId
  limit(10)
==>{fans_blade_runner=u622, fans_inception=u744}
==>{fans_blade_runner=u622, fans_inception=u978}
==>{fans_blade_runner=u622, fans_inception=u963}
==>{fans_blade_runner=u622, fans_inception=u598}
==>{fans_blade_runner=u622, fans_inception=u252}
==>{fans_blade_runner=u622, fans_inception=u513}
==>{fans_blade_runner=u622, fans_inception=u555}
==>{fans_blade_runner=u622, fans_inception=u764}
==>{fans_blade_runner=u622, fans_inception=u378}
==>{fans_blade_runner=u622, fans_inception=u332}

As we can see, Gremlin performs a kind of cartesian product between the 2 sets of users. This is explained by the fact that the mid-traversal V() step is evaluated for each incoming traverser, which is equivalent to having 2 separate traversals combined together, thus the cartesian product effect.
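
The cartesian product effect can be modelled with nested loops in plain Python. This is a sketch, not Gremlin: the userIds are taken from the outputs above, but the size of each list is arbitrary.

```python
from itertools import islice

blade_runner_fans = ["u622", "u628", "u223"]
inception_fans = ["u744", "u978", "u963", "u598"]

# For every Blade Runner fan, the mid-traversal V() walks all Inception fans again.
pairs = (
    {"fans_blade_runner": b, "fans_inception": i}
    for b in blade_runner_fans
    for i in inception_fans
)

first_rows = list(islice(pairs, 4))   # model of limit(4)
for row in first_rows:
    print(row)
```

The first rows all share the same fans_blade_runner value because the inner loop is exhausted before the outer one advances, which is the pattern visible in the console output.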

F. subgraph(variable)

subgraph() allows you to save a partial edge-induced subgraph. Consequently the subgraph() step can only be placed after an edge step, not after a vertex step:

gremlin>g.V().
  has("movie", "title", "Blade Runner").
  inE("rated").has("rating", gte(9)).outV().
  subgraph("fans_blade_runner")
com.datastax.bdp.graph.impl.element.vertex.DsegCachedVertexImpl cannot be cast to org.apache.tinkerpop.gremlin.structure.Edge
Type ':help' or ':h' for help.
Display stack trace? [yN]

To fix this:

gremlin>g.V().
  has("movie", "title", "Blade Runner").  // Iterator<Movie>
  inE("rated").has("rating", gte(9)).     // Iterator<Rated_Edge>
  subgraph("fans_blade_runner").          // Save the subgraph connected by those edges
  outV().                                 // Iterator<User>
  cap("fans_blade_runner").               // Iterator<TinkerGraph>
  next().                                 // TinkerGraph
  traversal().                            // GraphTraversalSource
  V().                                    // Iterator<Vertex>   
  count()                                 // count the number of vertices
==>46

Please note the interesting sequence here:

  • Similar to group(), subgraph() needs a termination step, and in this case cap("fans_blade_runner") is mandatory
  • cap("fans_blade_runner") yields an Iterator<TinkerGraph>
  • next() yields a TinkerGraph
  • traversal() yields a GraphTraversalSource on that TinkerGraph

We found 46 vertices, which correspond to 1 Blade Runner Movie vertex + 45 fans User vertices.
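
The vertex count follows directly from what "edge-induced" means: the subgraph contains the selected edges plus the vertices at both ends of each edge. A minimal sketch in plain Python, with made-up user names and assuming the 45 ratings come from 45 distinct users:

```python
# 45 "rated" edges, each going from a distinct user to the Blade Runner movie vertex.
edges = [(f"user_{i}", "Blade Runner") for i in range(45)]

# Edge-induced subgraph: its vertices are exactly the endpoints of the kept edges.
vertices = {vertex for edge in edges for vertex in edge}

print(len(edges), len(vertices))  # 45 46
```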

There is not much use for subgraph() as an in-stream variable since it cannot be re-used later in the traversal itself.

G. select(variable_or_label)

There is not much to say about the select() step. It can be used to pick side effect variables as well as labels saved in the path history. The only thing you should pay attention to is that any reducing step (sum(), mean(), …) destroys the path history, so you lose all previously labeled steps.
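
Why a reducing step loses the labels can be pictured as follows. This is a plain-Python model with hypothetical helpers (mean, map_step) and made-up ratings: a reducer merges many traversers into one brand new traverser, while the same computation wrapped in map() runs locally and the outer traverser keeps its history.

```python
def mean(traversers):
    """Reducing step: consume all traversers, emit ONE new traverser with no labels."""
    values = [t["current"] for t in traversers]
    return {"current": sum(values) / len(values), "labels": {}}


def map_step(traverser, inner):
    """map(inner): compute a value locally and keep the outer traverser's labels."""
    return {"current": inner(traverser["current"]), "labels": dict(traverser["labels"])}


movie = {"title": "Blade Runner", "ratings": [8, 9, 7]}        # made-up ratings
start = {"current": movie, "labels": {"blade_runner": movie}}  # ...as("blade_runner")

# inE("rated").values("rating").mean(): one traverser per rating, then reduced.
rating_traversers = [
    {"current": rating, "labels": dict(start["labels"])} for rating in movie["ratings"]
]
reduced = mean(rating_traversers)

# map(inE("rated").values("rating").mean()): same value, labels preserved.
wrapped = map_step(start, lambda m: sum(m["ratings"]) / len(m["ratings"]))

print(reduced["current"], list(reduced["labels"]))   # 8.0 []
print(wrapped["current"], list(wrapped["labels"]))   # 8.0 ['blade_runner']
```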

H. cap(variable)

Not much to say, everything has been said about cap() above.

III In-stream variable

In a normal Gremlin pipeline, it is not unusual to create intermediate variables and re-use them later in subsequent traversals. Let’s take the recommendation engine traversal we wrote in one of the previous posts.

gremlin>def avg_rating = g.V().
  has("movie", "title", "Blade Runner").  // Iterator<Movie>
  inE("rated").values("rating").          // Iterator<Int>
  mean().next()                           // Double
==>8.20353982300885
gremlin>def genres = g.V().
  has("movie", "title", "Blade Runner").  // Iterator<Movie>
  out("belongsTo").                       // Iterator<Genre>
  values("name").                         // Iterator<String>
  fold().next()                           // Collection<String>  
==>Sci-Fi
==>Action
gremlin>g.V().
  has("movie", "title", "Blade Runner").as("blade_runner").
  inE("rated").filter(values("rating").is(gte(avg_rating))).outV().
  outE("rated").filter(values("rating").is(gte(avg_rating))).inV().
  where(neq("blade_runner")).
  filter(inE("rated").values("rating").mean().is(gte(avg_rating))).
  filter(out("belongsTo").has("name", within(genres))).
  dedup().
  project("movie", "average_rating", "genres").
    by("title").
    by(inE("rated").values("rating").mean()).
    by(out("belongsTo").values("name").fold())
==>{movie=Pulp Fiction, average_rating=8.581005586592179, genres=[Thriller, Action]}
==>{movie=Seven Samurai, average_rating=8.470588235294118, genres=[Action, Adventure, Drama]}
==>{movie=A Clockwork Orange, average_rating=8.215686274509803, genres=[Sci-Fi, Drama]}

Fine, but what if we want to avoid defining intermediate variables like avg_rating and genres? Can we define those variables inside the recommendation traversal itself to make it a one-liner? The answer is YES! That’s what I call in-stream variables, i.e. variables created during previous traversal steps to be re-used later.

1st solution using aggregate()

gremlin>g.V().
  has("movie", "title", "Blade Runner").as("blade_runner").  // Label blade_runner
  sideEffect(inE("rated").values("rating").mean().aggregate("avg_rating")). // Save the avg_rating in a sideEffect() step
  sideEffect(out("belongsTo").values("name").fold().aggregate("genres")).   // Save the genres in a sideEffect() step
  inE("rated").filter(values("rating").where(gte("avg_rating")).by(unfold())).outV(). // Recall "avg_rating" using where()
  outE("rated").filter(values("rating").where(gte("avg_rating")).by(unfold())).inV(). // Recall "avg_rating" using where()
  where(neq("blade_runner")).
  filter(inE("rated").values("rating").mean().where(gte("avg_rating")).by(unfold())). // Recall "avg_rating" using where()
  filter(out("belongsTo").has("name", where(within("genres")).by(unfold()))).         // Recall "genres" using where() 
  dedup().
  project("movie", "average_rating", "genres"). 
    by("title").                                
    by(inE("rated").values("rating").mean()).   
    by(out("belongsTo").values("name").fold())

A little bit of explanation is required.

sideEffect(inE("rated").values("rating").mean().aggregate("avg_rating")) : in this step we save the average rating of Blade Runner using aggregate("avg_rating"). It means that the type of “avg_rating” is now a BulkSet<Double> with a single element. The reducing barrier step mean() is performed inside the sideEffect() step, leaving the outside path history untouched and preserving our “blade_runner” label.

Similarly all the genres of Blade Runner are saved in “genres” using sideEffect(out("belongsTo").values("name").fold().aggregate("genres")). The type of “genres” is a BulkSet<String> with 2 elements: “Sci-Fi” & “Action”

Now let’s analyse inE("rated").filter(values("rating").where(gte("avg_rating")).by(unfold())).outV(): we’re using where(gte("avg_rating")) to perform variable resolution for “avg_rating”. But what is by(unfold()) for? It is a projection modulator for the where() clause. The data type of “avg_rating” is BulkSet<Double>, so we cannot directly compare a Double coming from values("rating") with it. by(unfold()) projects the BulkSet<Double> using unfold(), and we obtain a Double as a result.

The same remarks apply to filter(out("belongsTo").has("name", where(within("genres")).by(unfold())))
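
The need for by(unfold()) has a loose analogy in plain Python: a number cannot be compared with a collection that wraps a number, so the collection has to be unwrapped first. In this sketch a one-element list stands in for the BulkSet<Double>; the values are made up, and the analogy says nothing about how TinkerPop implements the comparison.

```python
avg_rating_side_effect = [8.2]          # stand-in for a BulkSet with a single Double
genres_side_effect = ["Sci-Fi", "Action"]

rating = 9

# Comparing the rating with the wrapping collection is not meaningful.
try:
    rating >= avg_rating_side_effect
    directly_comparable = True
except TypeError:
    directly_comparable = False

# Model of by(unfold()): unwrap the collection, then compare element to element.
unfolded_avg = next(iter(avg_rating_side_effect))
passes_rating_filter = rating >= unfolded_avg

# Model of within("genres"): membership test against the unfolded elements.
passes_genre_filter = "Sci-Fi" in genres_side_effect

print(directly_comparable, passes_rating_filter, passes_genre_filter)  # False True True
```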

A 2nd solution using labeled step as():

gremlin>g.V().
  has("movie", "title", "Blade Runner").as("blade_runner").   // Label "blade_runner"
  map(inE("rated").values("rating").mean()).as("avg_rating"). // Label "avg_rating" step
  select("blade_runner").                                     // Jump back    
  map(out("belongsTo").values("name").fold()).as("genres").   // Label "genres" step
  select("blade_runner").                                     // Jump back    
  inE("rated").filter(values("rating").where(gte("avg_rating"))).outV(). 
  outE("rated").filter(values("rating").where(gte("avg_rating"))).inV().
  where(neq("blade_runner")).
  filter(map(inE("rated").values("rating").mean()).where(gte("avg_rating"))).
  filter(out("belongsTo").has("name", where(within("genres")))).
  dedup().
  project("movie", "average_rating", "genres"). 
    by("title").                                
    by(inE("rated").values("rating").mean()).   
    by(out("belongsTo").values("name").fold())

Let’s dissect this giant traversal.

map(inE("rated").values("rating").mean()).as("avg_rating"): classic trick, wrap the reducing barrier mean() inside a map() to avoid losing the path history and assign the result a label. “avg_rating” is now an Iterator<Double> with a single element.

Same strategy for map(out("belongsTo").values("name").fold()).as("genres"): the data type of “genres” is Iterator<String> with 2 elements.

Please notice the repeated usage of select("blade_runner") to force Gremlin to backtrack to the original movie vertex to restart a new traversal from there.

inE("rated").filter(values("rating").where(gte("avg_rating"))).outV(): unlike previously when we were using aggregate() we don’t need any by() modulator here since the average rating is not nested inside any collection. where(gte("avg_rating")) will just pop the average rating value out from the Iterator<Double>, calling implicitly next() on it.

The rest of the traversal is quite clear.

And that’s all folks! Do not miss the other Gremlin recipes in this series.

If you have any questions about Gremlin, find me on datastaxacademy.slack.com, channel dse-graph. My id is @doanduyhai
