
Fast Spark aggregation with Elassandra

Apache Spark is often used to aggregate data from various datastores such as Apache Cassandra, Elasticsearch, or HDFS files. Columnar stores like Elasticsearch, Apache Solr, or the Parquet file format are well known to be very efficient at filtering and aggregating data, but unfortunately Spark does not yet push down aggregation queries (see SPARK-12686) and recomputes aggregations itself, even though these datasources have very efficient implementations. Even the Elasticsearch Spark connector does not yet support aggregation queries, as explained in #276.

One of the main benefits of Elassandra is that you can run Elasticsearch queries directly through the Cassandra Query Language. This drastically simplifies your data stack by eliminating several pain points (a minimal CQL sketch follows the list below):
* You don't need any load balancer in front of Elasticsearch; the Cassandra drivers handle failover and load balancing across your favorite Cassandra virtual datacenter.
* You don't need to develop two DAOs, one for Elasticsearch and one for another database.
* With integrated security, authenticated CQL sessions can query both Cassandra tables and Elasticsearch indices.
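
As a minimal illustration of this (not part of the benchmark below), here is a sketch that assumes the DataStax Java driver 3.x, a contact point at 127.0.0.1, and the sales.histo table used later in this post; it runs an Elasticsearch term query through a plain CQL SELECT using Elassandra's es_query and es_options columns.

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

// Hypothetical contact point; the driver handles failover and load balancing
// across the Cassandra virtual datacenter.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("sales")

// An Elasticsearch term query expressed directly in CQL through the
// es_query / es_options columns (same syntax as in the Spark examples below).
val rs = session.execute(
  """SELECT * FROM histo
    |WHERE es_query = '{"query":{"term":{"cod_ray":"8"}}}'
    |AND es_options = 'indices=sales_*'""".stripMargin)

rs.asScala.take(10).foreach(println)
cluster.close()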

But the main benefit for Spark is that Elassandra supports Elasticsearch aggregation queries through CQL, which drastically reduces resource usage and computation time for aggregations. The following small benchmark illustrates these benefits.

Our dataset is 500M rows stored in Elassandra on a 4-node cluster with a Cassandra replication factor of 2. The Apache Spark cluster is co-located on these 4 Azure VMs (DS14: 16 cores, 112 GB RAM, 224 GB SSD). In this small benchmark, we simply count the number of rows having cod_ray = 8. Here is the results summary.

[Figure: benchmark results]

Spark + Cassandra connector

Without any Cassandra secondary index, Spark fetches all rows to filter and aggregate them in memory. Such a full table scan involves a lot of expensive I/O. Counting the total number of rows, counting the number of rows where cod_ray=8, or aggregating on cod_ray consistently takes about 170 to 200 seconds.

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

// Full table read through the Spark Cassandra connector
val cf = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "histo", "keyspace" -> "sales"))
  .load()

scala> spark.time(cf.count())
Time taken: 178126 ms                                                           
res4: Long = 502696713

scala> spark.time(cf.filter("cod_ray=8").count())
Time taken: 194826 ms                                                           
res2: Long = 3888988

scala> spark.time(cf.groupBy("cod_ray").count().collect.foreach(println))
[31,3927400]                                                                    
[85,3652920]
[137,3410560]
...
[138,3647080]
[104,3515680]
[134,3617880]
[36,3346320]
[89,3541960]
Time taken: 221165 ms

Spark + Elasticsearch-hadoop connector

With the elasticsearch-hadoop connector, filtering is pushed down to Elasticsearch, a significant improvement: filtering the data takes about 70 seconds. But because Spark still loads a large amount of data, aggregation remains quite expensive.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.elasticsearch.spark.sql._

// DataFrame read through the elasticsearch-hadoop connector
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("sales_*/histo")

scala> spark.time(df.filter("cod_ray=8").count())
Time taken: 74860 ms                                                            
res0: Long = 3888988
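
As a side note, the filter pushdown used above is a feature of the connector itself; a hedged sketch of the same read with explicit options (option names as documented for elasticsearch-hadoop, node addresses purely hypothetical) could look like this:

// Same read with explicit connector options; the node addresses are placeholders.
val dfOpts = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "10.0.0.4,10.0.0.5")  // hypothetical Elassandra node addresses
  .option("pushdown", "true")               // let the connector push filters down to Elasticsearch (default)
  .load("sales_*/histo")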

Spark + Elasticsearch query through CQL

With support for Elasticsearch queries through the CQL driver, Elassandra delegates both filtering and aggregation to Elasticsearch, providing a significant improvement in load and response time. Behind the scenes, the Spark Cassandra connector adds a token range filter, and the Spark workers run many small local aggregation requests against Elassandra. Finally, a last reduceByKey operation sums the aggregation results of each Spark partition to produce the final result in less than 10 seconds.

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

// The es_query contains both a term filter (cod_ray=8) and a terms aggregation;
// each Spark worker runs it against its local token range, and reduceByKey
// merges the per-partition aggregation results.
val df = sc.cassandraTable("sales", "histo").select("cod_ray_long","count").where("es_query='{\"query\":{\"term\":{\"cod_ray\":\"8\"}}, \"aggregations\":{\"cod_ray\":{\"terms\":{\"field\":\"cod_ray\",\"size\":200}}}}' AND es_options='indices=sales_*'")
val x = df.map( r => (r.getLong("cod_ray_long"),r.getLong("count"))).reduceByKey( _ + _)

scala> spark.time(x.collect.foreach(println))
(8,3888988)                                                                     
Time taken: 8306 ms

// Same aggregation without the term filter: a terms aggregation over all cod_ray values.
val df = sc.cassandraTable("sales", "histo").select("cod_ray_long","count").where("es_query='{\"aggregations\":{\"cod_ray\":{\"terms\":{\"field\":\"cod_ray\",\"size\":200}}}}' AND es_options='indices=sales_*'")
val x = df.map( r => (r.getLong("cod_ray_long"),r.getLong("count"))).reduceByKey( _ + _)
scala> spark.time(x.collect.foreach(println))
(0,3693800)                                                                     
(1,3150680)
(2,3789689)
(3,3820717)
(4,3495318)
...
(137,3410560)
(138,3647080)
(139,3326962)
(140,3334640)
Time taken: 3342 ms

Conclusion

This mini-benchmark illustrates how pushing down queries to Elasticsearch can significantly improve response time and load, as shown in this graph:

  1. Spark + Cassandra
  2. Spark + Elasticsearch-hadoop connector
  3. Spark + Elasticsearch query through CQL

[Figure: benchmark Grafana dashboard]

Moreover, if you need to aggregate time series, for example as input to a Spark machine learning computation, you can also delegate a date_histogram aggregation to Elasticsearch as shown below. Of course, since Elassandra does not yet provide paging for aggregation results, the results of such an aggregation must fit into memory. In the following, we aggregate sales per week.

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

// Weekly date_histogram aggregation on dat_vte, delegated to Elasticsearch;
// buckets come back as (epoch-millis week start, count) pairs.
val df = sc.cassandraTable("sales", "histo").select("dat_vte","count").where("es_query='{\"aggs\":{\"sales_per_month\":{\"date_histogram\":{\"field\":\"dat_vte\",\"interval\":\"week\"}}}}' AND es_options='indices=sales_*'")
val x = df.map( r => (r.getLong("dat_vte"),r.getLong("count"))).reduceByKey( _ + _)

scala> spark.time(x.collect.foreach(println))
(1388966400000,1205092)                                                         
(1442188800000,1205057)
(1310947200000,1205148)
(1364169600000,1205120) 
...
(1307318400000,1205155)
(1360540800000,1205120)
(1282521600000,1205169)
Time taken: 14456 ms
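
As a rough sketch of the "input to a Spark machine learning computation" idea mentioned above (the toDF conversion and column names are assumptions, not part of the original benchmark), the weekly buckets can be turned into a time-ordered DataFrame:

import spark.implicits._
import org.apache.spark.sql.functions.col

// x is the RDD of (epoch-millis week start, sales count) pairs computed above
val weekly = x.toDF("week_start_ms", "sales")
  .withColumn("week_start", (col("week_start_ms") / 1000).cast("timestamp"))
  .orderBy("week_start")
weekly.show(5, false)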