Foreword

In the relational world, CREATE TABLE AS SELECT, aka CTAS, is one of the most important and handy features available. CTAS is the simplest and fastest way to create a copy of a table, and every major relational database supports it. Its origins trace back to INSERT SELECT in SQL92 (p. 388).

A common example is:

INSERT INTO phone_book2
SELECT *
FROM   phone_book
WHERE  name IN ('John Doe', 'Peter Doe');

A prerequisite to running this command is that the table phone_book2 has to be created beforehand. CTAS was introduced later, making it possible to execute a statement and dump its results into a new table in a single command:

CREATE TABLE phone_book2
AS
SELECT *
FROM   phone_book
WHERE  name IN ('John Doe', 'Peter Doe');

Although it has been discussed in CASSANDRA-8234, Apache Cassandra does not yet support this operation. DSE supports CTAS through SparkSQL; otherwise, it has to be done in multiple steps. In this post, we will take a look at several approaches to this operation.

Max it Out!

The easiest way to accomplish this efficiently is to use the Spark features available in DSE Max. If you have DSE Spark Analytics nodes, you can do it in style - and show off some glitter on your resume while you are at it. There are a few ways to accomplish this.

Using dse spark-sql

If the destination table is already created, use:

INSERT INTO <destination table> SELECT * FROM <source table>;

This will populate the pre-created destination table with the data from the source table. Or, if you want the full CTAS experience, just execute:

CREATE TABLE <DESTINATION TABLE> AS SELECT * FROM <source table>;

Feels good, right? There is one gotcha: this won't create an underlying Cassandra table - it will only live in the Hive metastore world.  ¯\_(ツ)_/¯ A real life example would be:

If the destination table is already created:

spark-sql> INSERT INTO ctas_test.employee2 SELECT * FROM ctas_test.employee;
Time taken: 3.714 seconds                                                       
spark-sql> select count(*) from ctas_test.employee2;
100
Time taken: 0.826 seconds, Fetched 1 row(s)
spark-sql>

In this case, the CQL table ctas_test.employee2 will be updated. Alternatively, if you are happy living in the Hive metastore world, native CTAS is an option:

spark-sql> CREATE TABLE ctas_test.employee3 AS SELECT * FROM ctas_test.employee;
Time taken: 4.756 seconds                                                       
spark-sql> select count(*) from ctas_test.employee3;
100
Time taken: 0.903 seconds, Fetched 1 row(s)
spark-sql> 

There is flexibility here:

  • The SELECT clause allows only certain columns to be included
  • Taking advantage of SQL, the source data can be filtered or joined with other tables on its way to the destination (see the sketch below)
  • With native CTAS, the target table DOES NOT live in Cassandra but rather in Optimized Row Columnar (ORC) Hive files - keep that in mind
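
As a minimal sketch of the first two points - the table name employee_slim and the age filter are purely illustrative, and the standard -e flag of the spark-sql CLI is assumed here:

# Illustrative only: native CTAS keeping a subset of columns and filtering rows
dse spark-sql -e "CREATE TABLE ctas_test.employee_slim AS
                  SELECT id, last_name, age
                  FROM   ctas_test.employee
                  WHERE  age > 30;"

As with employee3 above, employee_slim would live only in the Hive metastore, not as a CQL table.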

Using dse spark

The Spark shell gives us the same functionality as spark-sql, but you have to do the work yourself. Using the HiveContext, we can work with DataFrames, where data can be "massaged" before being inserted into another table.

Steps are:

  • Invoke the Spark shell
  • Load your source table into a DataFrame
  • Create the destination table, inheriting the DataFrame schema
  • Massage the data as you see fit - not illustrated here
  • Save the DataFrame to the destination table

A real life example on DSE 5.0 would look like this:

$ dse spark

...
Initializing SparkContext with MASTER: spark://172.31.31.124:7077
Created spark context..
Spark context available as sc.
Hive context available as sqlContext. Will be initialized on first use.
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.cassandra._
scala> val df = sqlContext
               .read
               .format("org.apache.spark.sql.cassandra")
               .options(Map("table"->"employee","keyspace"->"ctas_test" ))
               .load()
scala> df
      .createCassandraTable( "ctas_test", "employee4" )
scala> df
      .write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "employee4", "keyspace" -> "ctas_test"))
      .save()
scala> val df2 = sqlContext
                .read
                .format("org.apache.spark.sql.cassandra")
                .options(Map( "table" -> "employee2", "keyspace" -> "ctas_test" ))
                .load()
scala> df2.count()
res1: Long = 100
scala> exit

$ cqlsh -k ctas_test
Connected to DSE_5.0.9 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.13.1735 | DSE 5.0.9 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh:ctas_test> select count(*) from employee4 ;

count
-------
  100

(1 rows)

cqlsh:ctas_test>

Cheap boss got you down?

If "budget restrictions" keep you from being able to use DSE Max, here are some less than elegant, but doable, workarounds.

First, the most simplistic approach

If you want to straight-copy a table's contents to another table in the same cluster, the recipe is simple (a command-level sketch follows the list):

  • Create the destination table
  • Take a snapshot of the source table
  • Restore into the new table
  • Run nodetool refresh
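
A minimal sketch of that sequence - the destination table name employee_copy, the snapshot tag, and the data directory paths are illustrative and depend on your installation:

# 1. Create the destination table with the same DDL as the source (illustrative name)
cqlsh -e "CREATE TABLE ctas_test.employee_copy (id int PRIMARY KEY, first_name text, middle_name text, last_name text, age int);"

# 2. Snapshot the source table on each node
nodetool snapshot -t ctas_copy -cf employee ctas_test

# 3. Copy the snapshot SSTables into the destination table's data directory
#    (table directories carry a UUID suffix; adjust the paths for your install)
cp /var/lib/cassandra/data/ctas_test/employee-*/snapshots/ctas_copy/* \
   /var/lib/cassandra/data/ctas_test/employee_copy-*/

# 4. Tell Cassandra to pick up the copied SSTables
nodetool refresh ctas_test employee_copy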

It's a no-brainer. But if you are changing primary keys, or the source and destination DDL differ enough that the SSTables won't load into the new table...

FEAR  NOT!

This is the next best thing to try; it should work reasonably well for small/medium tables:

cqlsh  -e "COPY keyspace.src_table    (col1, col2, ...,ColN ) TO STDOUT WITH HEADER=false;" \
|cqlsh -e "COPY keyspace.target_table (col1, col2, ...,ColN ) FROM STDIN;"

The first cqlsh gathers the data from all nodes into a single pipe that feeds the second cqlsh, which populates the target table. Because of the bottleneck nature of this operation, it might not scale well for larger tables, but it is the easiest option if you do not have shell access to try snapshot/restore. The target table must be created with column data types compatible with the source table, and the primary key can be rearranged on the target - a careful selection here is time well spent.

We can take advantage of all of the COPY command's performance enhancements, and intermediate steps can be added when data needs to be "massaged" before streaming to the target (a sketch of such a step appears after the real world example below). There are cases when only a subset of the source table is required. If so, the COPY TO can be replaced with a SELECT before feeding the target:

cqlsh  -e "SELECT col1, col2, ...,ColN FROM keyspace.src_table WHERE ;" \
|awk -F\| '( !/^\-+/ && !/^\([0-9]+ rows)/ && !/^$/ ){print $0}' \
|sed -e 's/^[ ]*//g' -e 's/[ ]*|[ ]*/|/g' \
|cqlsh -e "COPY keyspace.target_table (col1, col2, ...,ColN )FROM STDIN WITH DELIMITER = '|' AND HEADER = true;"

Auxiliary awk, sed, and other bash niceties come into play nicely, and the source and target tables do not even need to be in the same cluster. The possibilities are endless - literally! A real world example would be:

$ cqlsh
Connected to DSE_5.0.9 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.13.1735 | DSE 5.0.9 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ctas_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> use ctas_test ;


cqlsh:ctas_test> CREATE table employee (
id int PRIMARY KEY,
first_name text,
middle_name text,
last_name text, age int);


cqlsh:ctas_test> CREATE TABLE employee2 (
id int,
first_name text,
middle_name text,
last_name text,
age int,
PRIMARY KEY ((id, last_name)));


cqlsh:ctas_test> select COUNT(*) FROM  employee;
count
-------
  100
(1 rows)

cqlsh:ctas_test> select COUNT(*) FROM  employee2;
count
-------
   0
(1 rows)
cqlsh:ctas_test> exit

$ cqlsh -k ctas_test -e "COPY employee (id, last_name, first_name, middle_name, age) TO   STDOUT WITH HEADER=false;" \
|cqlsh -k ctas_test -e "COPY employee2 (id, last_name, first_name, middle_name, age) FROM STDIN  WITH HEADER=false;"

Using 15 child processes

Starting copy of ctas_test.employee2 with columns [id, last_name, first_name, middle_name, age].
[Use . on a line by itself to end input]
Processed: 100 rows; Rate:     130 rows/s; Avg. rate:     206 rows/s
100 rows imported from 1 files in 0.486 seconds (0 skipped).

$ cqlsh -k ctas_test
Connected to DSE_5.0.9 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.13.1735 | DSE 5.0.9 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh:ctas_test> select COUNT(*) FROM  employee2;
count
-------
  100
(1 rows)
cqlsh:ctas_test> exit
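
If the data needs a bit of "massaging" on the way, an extra stage can be spliced into the pipe. A minimal, purely illustrative sketch that upper-cases last_name in transit - the extra awk stage is an assumption added here, not part of the example above:

# Illustrative "massage" step: upper-case last_name while it streams between the two tables
cqlsh  -k ctas_test -e "SELECT id, last_name, first_name, middle_name, age FROM employee;" \
|awk -F\| '( !/^\-+/ && !/^\([0-9]+ rows\)/ && !/^$/ ){print $0}' \
|sed -e 's/^[ ]*//g' -e 's/[ ]*|[ ]*/|/g' \
|awk -F\| 'BEGIN{OFS="|"} NR==1{print; next} {$2=toupper($2); print}' \
|cqlsh -k ctas_test -e "COPY employee2 (id, last_name, first_name, middle_name, age) FROM STDIN WITH DELIMITER = '|' AND HEADER = true;"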

For larger tables, we might need to parallelize - the larger the source table, the longer the simplistic approach will take. And since users always want to go as fast as the hardware will let them... who are we to say no? What if we could unload/load only the data living on the local node? Well, you got it!

The logic to accomplish this would be:

  • Get to know the local token ranges
  • Select the desired columns from the source table with CL=LOCAL_ONE
  • Massage the data before feeding into COPY FROM

A few things to consider:

  • Still need to create the target table beforehand
  • If the primary keys of the destination table differ from the source, the data might not stay local
  • If one or more SELECTs time out, consider increasing TIMEOUT - should that fail, you will need to break the token ranges into smaller sub-ranges by adjusting STP in getDataByTokenRange
  • Needs to be executed on every node of the local DC - really (see the loop sketch after the execution example below)
  • Be sure to repair the source table beforehand
  • No need for stinking Spark - for those with Spark-phobia conditions, go figure!  

There is a lot of flexibility here - if you know Cassandra and bash, that is. 

Coded in BASH, the whole operation would look like this - it's NOT THAT INTIMIDATING:

#!/bin/bash
#
# Script to COPY from a SOURCE table's select columns over a TARGET table
# Assumes the following:
#
# * The SOURCE table is very large - otherwise just try:
#   cqlsh  -e "COPY keyspace.src_table (col1, col2, ...,ColN ) TO   STDOUT WITH HEADER=false" \
#   |cqlsh -e "COPY keyspace.tgt_table (col1, col2, ...,ColN ) FROM STDIN  WITH HEADER=false"

# * SOURCE AND TARGET TABLES are in the SAME KEYSPACE
# * TARGET columns are named the SAME as SOURCE
#
# The script sweeps thru the local tokens to copy only the local data over to the new table
# Therefore, this script needs to run on every node on the datacenter
#
# Set these variables before executing
#
USR=cassandra
PWD=password
KSP=my_keyspace
SRC=src_table
COL="col1, col2, col3"
PKY="col1"
TGT=tgt_table
TIMEOUT=300
CQLSH="cqlsh -u ${USR} -p ${PWD} -k ${KSP} --request-timeout=${TIMEOUT}"

#
# Get local token ranges - assumes VNODES
#
function getTokens(){
  for i in $($CQLSH -e "select tokens from system.local;" \
             |awk -F, '/{/{print $0}' | tr -d '{' | tr -d '}' | tr -d ','); do
      echo ${i//\'/}
  done | sort -n
}
#
# Adjust STP parameter if SELECT by token ranges timeout
# Practical STP values range: 10 - 256
#
function getDataByTokenRange(){
  i=0
  STP=1
  tokens=(-9223372036854775807 $(getTokens))
  while [ ${i} -lt ${#tokens[@]} ]; do
        [ ${i} -eq 0 ] && echo "SELECT ${COL} FROM ${SRC} WHERE token(${PKY}) <= ${tokens[i]} ALLOW FILTERING;"
        if [ "${tokens[i+1]}" ]; then
           if [ "${STP}" -gt 1 ]; then
              j=0
              range=$(echo "(${tokens[i+1]} - ${tokens[i]})" | bc -l)
              step=$(echo "scale=0; ${range} /${STP}" | bc -l)
              if (( ${range} % ${STP} == 0 )); then
                 subTokens=($(seq ${tokens[i]} ${step} ${tokens[i+1]}))
              else
                 subTokens=($(seq ${tokens[i]} ${step} ${tokens[i+1]}) ${tokens[i+1]})
              fi
              while [ ${j} -lt ${#subTokens[@]} ] && [ "${subTokens[j+1]}" ]; do
                    echo "SELECT ${COL} FROM ${SRC} WHERE token(${PKY}) > ${subTokens[j]} AND token(${PKY}) <= ${subTokens[j+1]} ALLOW FILTERING;"
                    ((j++))
              done
           else
              echo "SELECT ${COL} FROM ${SRC} WHERE token(${PKY}) >  ${tokens[i]} AND token(${PKY}) <= ${tokens[i+1]};"
           fi
        fi
        [ ! "${tokens[i+1]}" ] && echo "SELECT ${COL} FROM ${SRC} WHERE token(${PKY}) >  ${tokens[i]} ALLOW FILTERING;"
        ((i++))
  done
}


#
# Execute CQL statements with CL=LOCAL_ONE
#
function cqlExec(){
 while IFS='' read -r cql || [[ -n "$cql" ]]; do
   $CQLSH -e "CONSISTENCY LOCAL_ONE; $cql"                                          \
   |awk -F\| '( !/LOCAL_ONE/ && !/^\-+/ && !/^\([0-9]+ rows\)/ && !/^$/ ){print $0}' \
   |sed -e 's/^[ ]*//g' -e 's/[ ]*|[ ]*/|/g'                                        \
   |$CQLSH -e "COPY ${TGT} (${COL}) FROM STDIN WITH DELIMITER = '|' and HEADER=true;"
   [ "$?" -gt 0 ] && echo "ERROR: Failed to import data from statement: ${cql}"
 done < "$1"
}

#
# Where everything is lined up
#
main(){
  echo "Begin processing ..."
  getDataByTokenRange > getDataByTokenRange.ddl
  cqlExec getDataByTokenRange.ddl
  echo "End procesing"
}

main

A real world example of this script would be:

  • First, we put the contents of the bash script into a file named copySrc2Tgt.sh
  • Grant execution permissions to the script: chmod +x copySrc2Tgt.sh
  • Customize the script parameters:
    • USR=cassandra
    • PWD=password
    • KSP=ctas_test
    • SRC=employee
    • COL="id, last_name, first_name, middle_name, age"
    • PKY="id"
    • TGT=employee2
    • TIMEOUT=300
  • Invoke the script

Execution of the script would look like this:

$ ./copySrc2Tgt.sh

Begin processing …
...
Using 15 child processes
Starting copy of ctas_test.employee2 with columns [id, last_name, first_name, middle_name, age].
[Use . on a line by itself to end input]
Processed: 19 rows; Rate:      36 rows/s; Avg. rate:      52 rows/s
19 rows imported from 1 files in 0.364 seconds (0 skipped).
…
Using 15 child processes
Starting copy of ctas_test.employee2 with columns [id, last_name, first_name, middle_name, age].
[Use . on a line by itself to end input]
Processed: 1 rows; Rate:       2 rows/s; Avg. rate:       3 rows/s
1 rows imported from 1 files in 0.364 seconds (0 skipped).
End processing


$ cqlsh -k ctas_test
Connected to DSE_5.0.9 at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.13.1735 | DSE 5.0.9 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh:ctas_test> select COUNT(*) FROM  employee2;

count
-------
  100
(1 rows)

Warnings :
Aggregation query used without partition key
cqlsh:ctas_test> exit
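
Remember that the script only moves the data owned by the local node, so it has to be run on every node of the data center. A minimal sketch of driving that from one box, assuming password-less ssh and that copySrc2Tgt.sh has already been copied to the same location on each host - the host list and path are illustrative:

# Illustrative host list for the local DC; adjust the path to wherever the script lives
for host in 10.0.0.1 10.0.0.2 10.0.0.3; do
    ssh "${host}" "cd /home/cassandra && ./copySrc2Tgt.sh" &
done
wait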

Again, there is a LOT of granularity and flexibility here. This could even be used to export data between clusters - cqlsh does not care, as long as you code it properly. (wink)
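
For example, a minimal sketch of the simple COPY pipe pointed at a different cluster - the contact point 10.20.30.40 is illustrative, and the same idea applies to the script above by pointing its target CQLSH variable at the other cluster:

# Rows flow out of the local cluster and into another one; the trailing host is the
# target cluster's contact point (illustrative IP), where the keyspace and table must already exist
cqlsh  -k ctas_test -e "COPY employee  (id, last_name, first_name, middle_name, age) TO   STDOUT WITH HEADER=false;" \
|cqlsh -k ctas_test -e "COPY employee2 (id, last_name, first_name, middle_name, age) FROM STDIN  WITH HEADER=false;" 10.20.30.40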

Conclusion

There are several ways to accomplish CTAS on DataStax Enterprise:

  • DSE SparkSQL, copying into a Cassandra table or creating a table in the Hive metastore
  • DSE Spark
  • Cassandra snapshot/restore when the source and destination DDL match
  • CQLSH COPY when the DDL does not match
  • A CQLSH SELECT feeding CQLSH COPY when a subset is needed or the data needs to be manipulated

We hope you find this helpful. Go and try them all and choose the one that works best for you!   (smile)