Apache Spark is often used to aggregate data from various datastores such as Apache Cassandra, Elasticsearch, or HDFS files. Columnar datastores like Elasticsearch, Apache Solr, or the Parquet file format are well known to be very efficient at filtering and aggregating data, but unfortunately Spark does not yet push aggregation queries down to the datasource (see SPARK-12686) and recomputes aggregations even where these datasources have very efficient implementations. Even the Elasticsearch Spark connector does not yet support aggregation queries, as explained in #276.
One of the main benefits of Elassandra is that you can run Elasticsearch queries directly through the Cassandra Query Language. This drastically simplifies your data stack by eliminating several pain points.
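As a quick illustration, here is what such a query looks like from cqlsh, reusing the sales.histo table of the benchmark below (a minimal sketch of the es_query/es_options syntax; depending on your Elassandra version, ALLOW FILTERING may also be required):

SELECT * FROM sales.histo
WHERE es_query='{"query":{"term":{"cod_ray":"8"}}}'
AND es_options='indices=sales_*';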
For Spark applications, the main benefit is that Elassandra supports Elasticsearch aggregation queries through CQL, which drastically reduces the resource load and computation time of aggregations. The following small benchmark illustrates these benefits.
Our dataset is 500M rows stored in Elassandra on a 4-node cluster with a Cassandra replication factor of 2. The Apache Spark cluster is co-located on these 4 Azure VMs (DS14: 16 cores, 112 GB RAM, 224 GB SSD). In this small benchmark, we simply count the number of rows having cod_ray = 8. Here is a summary of the results.
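Scenario                                                        Time
Spark + Cassandra connector, full scan count()                  178 s
Spark + Cassandra connector, filter on cod_ray=8 + count()      195 s
Spark + Cassandra connector, groupBy("cod_ray").count()         221 s
Spark + Elasticsearch connector, pushed-down filter + count()    75 s
Elassandra CQL, pushed-down filter + terms aggregation          8.3 s
Elassandra CQL, pushed-down terms aggregation (no filter)       3.3 s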
Without any Cassandra secondary index, Spark fetches all rows and filters and aggregates them in memory. Such a full table scan involves a lot of expensive I/O: counting the total number of rows, counting the rows where cod_ray = 8, or aggregating on cod_ray always takes about 170 to 200 seconds.
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
// Load the sales.histo table as a DataFrame through the Spark Cassandra connector
val cf = spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "histo", "keyspace" -> "sales")).load()
scala> spark.time(cf.count())
Time taken: 178126 ms
res4: Long = 502696713
scala> spark.time(cf.filter("cod_ray=8").count())
Time taken: 194826 ms
res2: Long = 3888988
scala> spark.time(cf.groupBy("cod_ray").count().collect.foreach(println))
[31,3927400]
[85,3652920]
[137,3410560]
...
[138,3647080]
[104,3515680]
[134,3617880]
[36,3346320]
[89,3541960]
Time taken: 221165 ms
With the elasticsearch-spark connector, filtering is pushed down to Elasticsearch, a significant improvement: the data is filtered and counted in about 70 seconds. But because Spark still loads a large amount of data, aggregation remains quite expensive.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.elasticsearch.spark.sql._
// Load the sales_*/histo Elasticsearch indices through the elasticsearch-spark connector
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("sales_*/histo")
scala> spark.time(df.filter("cod_ray=8").count())
Time taken: 74860 ms
res0: Long = 3888988
With support for Elasticsearch queries through the CQL driver, Elassandra delegates both filtering and aggregation to Elasticsearch, providing a significant improvement in load and response time. Behind the scenes, the Spark Cassandra connector adds a token range filter, and the Spark workers run many small local aggregation requests against Elassandra. Finally, a last reduceByKey operation sums the aggregation results of each Spark partition to produce the final result in less than 10 seconds.
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
// The es_query embeds both the term filter and the terms aggregation executed by Elasticsearch
val df = sc.cassandraTable("sales", "histo").select("cod_ray_long","count").where("es_query='{\"query\":{\"term\":{\"cod_ray\":\"8\"}}, \"aggregations\":{\"cod_ray\":{\"terms\":{\"field\":\"cod_ray\",\"size\":200}}}}' AND es_options='indices=sales_*'")
// Sum the partial aggregation results computed on each token range
val x = df.map( r => (r.getLong("cod_ray_long"),r.getLong("count"))).reduceByKey( _ + _)
scala> spark.time(x.collect.foreach(println))
(8,3888988)
Time taken: 8306 ms
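For the record, each Spark task runs a small CQL query restricted to its own token range, conceptually similar to the following (an illustrative sketch, not the exact query generated by the connector; <pk> stands for the table's partition key):

SELECT cod_ray_long, count FROM sales.histo
WHERE token(<pk>) > ? AND token(<pk>) <= ?
AND es_query='{"query":{"term":{"cod_ray":"8"}}, "aggregations":{"cod_ray":{"terms":{"field":"cod_ray","size":200}}}}'
AND es_options='indices=sales_*';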
The same approach works without the term filter, delegating a terms aggregation over all cod_ray values to Elasticsearch:
val df = sc.cassandraTable("sales", "histo").select("cod_ray_long","count").where("es_query='{\"aggregations\":{\"cod_ray\":{\"terms\":{\"field\":\"cod_ray\",\"size\":200}}}}' AND es_options='indices=sales_*'")
val x = df.map( r => (r.getLong("cod_ray_long"),r.getLong("count"))).reduceByKey( _ + _)
scala> spark.time(x.collect.foreach(println))
(0,3693800)
(1,3150680)
(2,3789689)
(3,3820717)
(4,3495318)
...
(137,3410560)
(138,3647080)
(139,3326962)
(140,3334640)
Time taken: 3342 ms
This mini-benchmark illustrates how pushing down filters and aggregations to Elasticsearch provides a significant improvement in response time and load, as shown in this graph:
Moreover, if you need to aggregate time series, for example as the input of a Spark machine learning computation, you can also delegate a date_histogram aggregation to Elasticsearch, as shown below. Of course, as Elassandra does not yet provide paging for aggregation results, the result of such an aggregation must fit in memory. In the following, we aggregate sales per week.
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
// Delegate a weekly date_histogram aggregation on dat_vte to Elasticsearch
val df = sc.cassandraTable("sales", "histo").select("dat_vte","count").where("es_query='{\"aggs\":{\"sales_per_month\":{\"date_histogram\":{\"field\":\"dat_vte\",\"interval\":\"week\"}}}}' AND es_options='indices=sales_*'")
// Sum the per-token-range bucket counts, keyed by bucket timestamp (epoch milliseconds)
val x = df.map( r => (r.getLong("dat_vte"),r.getLong("count"))).reduceByKey( _ + _)
scala> spark.time(x.collect.foreach(println))
(1388966400000,1205092)
(1442188800000,1205057)
(1310947200000,1205148)
(1364169600000,1205120)
...
(1307318400000,1205155)
(1360540800000,1205120)
(1282521600000,1205169)
Time taken: 14456 ms
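From there, feeding the weekly buckets into a Spark machine learning job is straightforward. As a minimal sketch (the column names week_start_ms and sales are arbitrary, assuming the default spark session of the spark-shell):

import spark.implicits._
// Turn the (epoch-millis, count) pairs into a sorted DataFrame, ready for Spark ML
val ts = x.toDF("week_start_ms", "sales").orderBy("week_start_ms")
ts.show(5)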