scala - Set Cassandra Clustering Order on TableDef with Datastax's Spark Cassandra Connector -
every time try create new table in cassandra new tabledef
end clustering order of ascending , i'm trying descending.
i'm using cassandra 2.1.10, spark 1.5.1, , datastax spark cassandra connector 1.5.0-m2.
i'm creating new tabledef
val table = tabledef("so", "example", seq(columndef("parkey", partitionkeycolumn, texttype)), seq(columndef("ts", clusteringcolumn(0), timestamptype)), seq(columndef("name", regularcolumn, texttype))) rdd.saveascassandratableex(table, somecolumns("key", "time", "name"))
what i'm expecting see in cassandra is
create table so.example ( parkey text, ts timestamp, name text, primary key ((parkey), ts) ) clustering order (ts desc);
what end
create table so.example ( parkey text, ts timestamp, name text, primary key ((parkey), ts) ) clustering order (ts asc);
how can force set clustering order descending?
i not able find direct way of doing this. additionally there lot of other options may want specify. ended extending columndef
, tabledef
, overriding cql
method in tabledef
. example of solution came below. if has better way or becomes natively supported i'd happy change answer.
// scala enum object clusteringorder { abstract sealed class order(val ordinal: int) extends ordered[order] serializable { def compare(that: order) = that.ordinal compare this.ordinal def toint: int = this.ordinal } case object ascending extends order(0) case object descending extends order(1) def fromint(i: int): order = values.find(_.ordinal == i).get val values = set(ascending, descending) } // extend columndef case class add enum support class columndefex(columnname: string, columnrole: columnrole, columntype: columntype[_], indexed: boolean = false, val clusteringorder: clusteringorder.order = clusteringorder.ascending) extends columndef(columnname, columnrole, columntype, indexed) // mimic columndef object object columndefex { def apply(columnname: string, columnrole: columnrole, columntype: columntype[_], indexed: boolean, clusteringorder: clusteringorder.order): columndef = { new columndefex(columnname, columnrole, columntype, indexed, clusteringorder) } def apply(columnname: string, columnrole: columnrole, columntype: columntype[_], clusteringorder: clusteringorder.order = clusteringorder.ascending): columndef = { new columndefex(columnname, columnrole, columntype, false, clusteringorder) } // copied columndef object def apply(column: columnmetadata, columnrole: columnrole): columndef = { val columntype = columntype.fromdrivertype(column.gettype) new columndefex(column.getname, columnrole, columntype, column.getindex != null) } } // extend tabledef case class override cql method class tabledefex(keyspacename: string, tablename: string, partitionkey: seq[columndef], clusteringcolumns: seq[columndef], regularcolumns: seq[columndef], options: string) extends tabledef(keyspacename, tablename, partitionkey, clusteringcolumns, regularcolumns) { override def cql = { val stmt = super.cql val ordered = if (clusteringcolumns.size > 0) s"$stmt\r\nwith clustering order (${clusteringcolumnorder(clusteringcolumns)})" else stmt appendoptions(ordered, options) } private[this] def clusteringcolumnorder(clusteringcolumns: seq[columndef]): string = clusteringcolumns.map { col => col match { case c: columndefex => if (c.clusteringorder == clusteringorder.descending) s"${c.columnname} desc" else s"${c.columnname} asc" case c: columndef => s"${c.columnname} asc" } }.tolist.mkstring(", ") private[this] def appendoptions(stmt: string, opts: string) = if (stmt.contains("with") && opts.startswith("with")) s"$stmt\r\nand ${opts.substring(4)}" else if (!stmt.contains("with") && opts.startswith("and")) s"with ${opts.substring(3)}" else s"$stmt\r\n$opts" } // mimic tabledef object return new tabledefex object tabledefex { def apply(keyspacename: string, tablename: string, partitionkey: seq[columndef], clusteringcolumns: seq[columndef], regularcolumns: seq[columndef], options: string = "") = new tabledefex(keyspacename, tablename, partitionkey, clusteringcolumns, regularcolumns, options) def fromtype[t: columnmapper](keyspacename: string, tablename: string): tabledef = implicitly[columnmapper[t]].newtable(keyspacename, tablename) }
this allowed me create new tables in manner:
val table = tabledefex("so", "example", seq(columndef("parkey", partitionkeycolumn, texttype)), seq(columndefex("ts", clusteringcolumn(0), timestamptype, clusteringorder.descending)), seq(columndef("name", regularcolumn, texttype))) rdd.saveascassandratableex(table, somecolumns("key", "time", "name"))
Comments
Post a Comment