Wednesday, October 30, 2013

A Small Insight to Cassandra

Abstract

Cassandra is distributed storage system that has data being spread over many servers located in different data centers located geographically. The data is highly available with no single point of failure. In many ways Cassandra resembles a database but its does not support a full relational data model rather its follows a simple data models and thus allows having a dynamic control over our data layout.
It has been designed to run on cheap infrastructure having both write and read efficiency. It was designed to have multi datacenter support from the very beginning. It plays the best when it comes to cross data center replication. It would fit for an application which has some hundred ‘s millions of users geographically and the application’s data also needs to be geographically present in datacenters to minimize the latencies.

Interesting Design Beneath

Management of data

The design mechanism that is used by Cassandra to store data is very similar to log-structured merge trees. It’s not a B-Tree. Cassandra storage engine sequentially writes to disk in append mode storing the data contiguously. Cassandra works extremely well with SSD (Solid state Disks). It minimizes the wear and tear on the SSD by making minimal I/O operations.
Throughput and latency both are key performance factors when working with Cassandra. In Cassandra all the operation are performed in parallel so both latency and throughput are independent. Writes are very efficient in Cassandra. When random writes of small amount of data is done, Cassandra reads in the SSD sector. The Log-Structured design obviates the disk seeks. As when the database updates are received Cassandra does not overwrites the row in place modifying the data on the disk. It eliminates the disk data modification and erase block cycles to save time. It rewrites the entire sector back. The operational design nicely integrates with operating systems page cache because Cassandra does not modify the data so the dirty pages that would have been generated to be flushed out never got generated.

Writes Mechanism

The consistency of Cassandra is one of the important features and needs to be appreciated. This actually refers to how a row is up to date and synchronized across all of the replicas. Cassandra uses the hinted hand off mechanism. This allows Cassandra to offer full write availability and improves the response consistency after temporary outage.
Cassandra uses the architecture of SSTables (Sorted String Tables) and MemTable. When writes occur in Cassandra the data are stored in memory structure called the MemTable. Cassandra can manage the memory that needs to be allocated to the MemTable or we can configure it. It performs better writes than relation databases because in Cassandra the tables are not related so it does not have to perform additional work to maintain the integrity. In the process of writing to MemTable it also write the commit log on to the disk. Once the data in MemTable reaches to a certain extent its flushes the data to SSTables. The MemTable and SSTables are maintained per table. The SSTables are immutable so update on a row is stored over multiple SSTables.

Read Mechanism

The reads in Cassandra are parallel and with low latency. The Bloom Filter is a mechanism used, which tells Cassandra about the probability of the data requested based on the partition key in the SSTables before doing any disk I/O. When the read request for a row comes to a node. Cassandra uses the level compaction strategy, which prevents the data from being fragmented. Cassandra also uses the caching feature it provides partition key cache and well as the row cache which allows access of data much faster.



Features to admire in Cassandra


Order of magnitude more data per node in the cluster

With a typical java based system like Cassandra we can allocate 8GB of max memory. Having more than 8GB of memory can also the garbage collection to pause in the young generation. In Cassandra it has been very much thought through to minimize the fragmentation of data.
In version 1.1 of Cassandra the Bloom filter, Partition Key Cache, Partition summary and Compression offset all were stored in heap memory. Out of the four components the Partition Key Cache is has the fixed size where other three components grows with a direct proportion to data. So to maximize the memory through put in version 2.0 the Bloom filter, Partition Key Cache, Partition summary and Compression offset were moved off heap.

Smoother compaction throttling

The compaction throttling allows setting a target rate of how much IO you want the compaction system to use. Tuning the compaction throttle maximizes the reads and writes in Cassandra. This was tuned further more in version 1.2.5.

No manual token management

When a node was added to the cluster, there is mechanism in the bootstrap to identify the token for the node to join the cluster by bisecting the range of the heavily loaded node in the cluster. This mechanism actually never worked as expected. The best way to tackle was to add a token manually for the Node to join the cluster. To solve the problem the token range bisection was removed and replaced with virtual nodes. The virtual node allows each of the nodes to have large no of small partition ranges distributed through out the cluster. Virtual node uses a mechanism for constant hashing to distribute the data, which limits the generation and assignment of token.

  • Balancing was taken care by evenly distributing the data when a new node joined the cluster.
  • When a node failed the data was distributed evenly between the nodes in the cluster.
  • Rebuilding the dead node was potentially faster because the data from the other node were sent to replacement node incrementally instead of waiting for validation.
  • Finally we are free from the calculation of token or assigning tokens to each node.
  • Concurrent streaming.

Consistency

Cassandra offers an eventual consistency but you can opt for a tunable consistency. Eventual consistency is always concurrent which means operations are always going on at the same time in the cluster. Cassandra is based on the CAP (Consistency, Atomicity and Partition tolerance) theorem and it cannot satisfy all the three at the same time so we can tune it according to our need.  It has got also serial consistency but using the serial consistency, we may need to compromise with performance.

Triggers

Having trigger gives a bonus point to use Cassandra for immediate consistency. The trigger can be implemented by implementing the Itrigger interface. This basically deals with RowMutation and ColumnFamily objects.

Data model

Cassandra has a rich data model. Its basically based on schema optional and column oriented data model. In Cassandra Keyspace is the container for you application data. Inside the keyspace we have one or more than one column family. Column families are a set of columns, which is identified by row key. Cassandra doesn’t enforce relationship between Column families the way relational database does. There are no foreign keys and joins in column family at query time is not supported which helps Cassandra performs more faster.

Working with Cassandra


Working with Cassandra has become much simpler and easier when compared to the Thrift based API’s that exposed the internal storage structure of Cassandra.

CQL

The CQL is a structured query language to query Cassandra. CQL is based on the data model of partitioned row store with tunable consistency. Tables can be created, altered and dropped at runtime without blocking updates and queries. It doesn’t support joins and sub queries.

Java Drivers

There are many java API’s available in the open. We have Pelops, Kundera, Hector, Thrift, Virgil, Datastax java driver and Astyanax to name a few. We are going to mostly focus on Astyanax. Well Astyanax evolved at Netflix as a solution to a problem, which they faced while interacting with Cassandra. Astyanax can be said to be refractored version of Hector. It has got few advantages
Ease of uses of the API
Composite column support
Connection pooling
Latency
Documented
Break Object into multiple chunks.
One row per chunk
Parallelize upload
Retry small chunk instead of entire object
Eliminate hot spot
Astyanax has been supporting the Cassandra 1.2 version and the official release for support of Cassandra 2.0 is yet to come.

References


4.   Tech talks from Jonathan Ellis, Christos Kalantzis, Patrick Mc Fadin.

Java: Creating a Binary Tree, Check for leaf levels,Tree Traversal , Height and Size of the Tree

==============================================

JAVA CODE FOR CREATING A BINARY TREE
==============================================

package com.sachi.test.tree;

import java.util.List;

public class Node {

private Node leftNode;
private Node rightNode;
private String nodeData;
public Node(String nodeData){
this.nodeData=nodeData;
this.leftNode=null;
this.rightNode=null;
}
public Node(String nodeData, Node leftNode, Node rightNode){
this.nodeData=nodeData;
this.leftNode=leftNode;
this.rightNode=rightNode;
}
public boolean isInnerNode(){
return leftNode!=null || rightNode!=null;
}
public boolean isLeaf(){
return leftNode==null && rightNode==null;
}
public boolean hasLeft() {
return leftNode != null;
}

public boolean hasRight() {
return rightNode != null;
}
public Node getLeftNode(){
return leftNode;
}
public Node getRightNode(){
return rightNode;
}
public String getNodeData(){
return nodeData;
}
public void setNodeData(String data){
nodeData=data;
}
/**
* RETURNS THE SIZE OF THE TREE
*/
public int getSize(){
int num=1;
if (hasLeft()){
num+=leftNode.getSize();
}
if (hasRight()){
num+=rightNode.getSize();
}
return num;
}
/**
* RETURNS THE HEIGHT OF THE TREE>
*/
public int treeHeight(){
if (isLeaf()){return 0;}
else
{
int h=0;
if (hasLeft()){
h=Math.max(h,leftNode.treeHeight());
}
if(hasRight()){
h=Math.max(h,rightNode.treeHeight());
}
return h+1;
}
}
/**
* RETURNS THE HEIGHT OF THE LEFT SUBTREE>
*/
public int leftTreeHeight(){
if (leftNode==null){return 0;}
else
{
int h=0;
if (hasLeft()){
h=Math.max(h,leftNode.leftTreeHeight());
}
return h+1;
}
}
/**
* RETURNS THE HEIGHT OF THE RIGHT SUBTREE>
*/
public int rightTreeHeight(){
if (rightNode==null){return 0;}
else
{
int h=0;
if(hasRight()){
h=Math.max(h,rightNode.rightTreeHeight());
}
return h+1;
}
}
public String toString() {
return toStringHelper("");
}
private String toStringHelper(String indent) {
String ret = "";
if (hasRight()) 
ret += rightNode.toStringHelper(indent+"  ");
ret += indent + nodeData + "\n";
if (hasLeft()) 
ret += leftNode.toStringHelper(indent+"  ");
return ret;
}
/**
* CHECK THE LEAVES LEVEL ARE SAME OR NOT
* @return true/false
*/
public boolean checktheLevelofLeavesSame(){
System.out.println("Height of right sub tree : "+ rightTreeHeight());
System.out.println("Height of left sub tree : "+ leftTreeHeight());
if (rightTreeHeight()==leftTreeHeight()){
return true;
}else{
return false;
}
}
/**
* @param dataList
* PREORDER TRAVERSAL OF THE TREE
*/
public void preorderTraversal(List<String> dataList){
dataList.add(nodeData);
if(this.hasLeft()){
leftNode.preorderTraversal(dataList);
}
if(this.hasRight()){
rightNode.preorderTraversal(dataList);
}
}
/**
* @param dataList
* POSTORDER TRAVERSAL OF THE TREE
*/
public void postorderTraversal(List<String> dataList){
if(this.hasLeft()){
leftNode.postorderTraversal(dataList);
}
if(this.hasRight()){
rightNode.postorderTraversal(dataList);
}
dataList.add(nodeData);
}
/**
* @param dataList
* INORDER TRAVERSAL OF THE TREE
*/
public void inorderTraversal(List<String> dataList){
if(this.hasLeft()){
leftNode.inorderTraversal(dataList);
}
dataList.add(nodeData);
if(this.hasRight()){
rightNode.inorderTraversal(dataList);
}
}
public static void main(String [] args) {
Node leftChild=new Node("domestic",new Node("dog"), new Node("cat"));
Node rightChild=new Node("wild",new Node("lion"), new Node("tiger"));
Node tree=new Node("animals",leftChild, rightChild);
System.out.println("Tree is : "+ tree );
System.out.println("Size of tree = " + tree.getSize());
System.out.println("Height of tree = " + tree.treeHeight());
System.out.println("checking if leaves are at same level = " + tree.checktheLevelofLeavesSame());
}
}

Monday, October 14, 2013

Understanding Strong Reference,Weak reference, Soft reference and Phantom reference.

There are basically four different degrees of reference 
  • Strong Reference 
  • Weak Reference 
  • Soft reference
  • Phantom Reference
Lets discuss about the four in detail.

Strong Reference

Strong Reference are the one which is you see everyday in your code , an example below.
StringBuilder sb= new StringBuilder();
The above creates a new StringBuilder and stores the strong reference in variable "sb".The most significant part is what makes them strong, that can be determined how they interact with garbage collector. If the object is reachable via a chain of strong references then those objects are not eligible for garbage collection. This are objects that you are currently working on and this is what is needed. So strong references are the references of the objects which are forced to remain in memory.

Weak Reference

The Weak reference are reference of the objects which are almost useless and are eligible for clean up. So the reference which are not strong enough to force an object to be in memory. This allows you to leverage the garbage collector ability to determine the reachability. We can use the "WeakHashMap" , the WeakHashMap is just like the HashMap but the keys in the WeakHashMap uses weak reference. If the Key is no longer used then the entry is removed automatically.

Soft Reference

The Soft Reference are just the same as Weak Reference but they are not eager to release the object as the Weak Reference does. The Objects which with soft references will stick around in the memory for a while. They remain in memory as long as memory is available for expansion.

Phantom Reference

Phantom reference completely different from weak and soft references. The hold on the object is so tenuous that the object cannot be retrieved. The get method on the reference always returns null.The Phantom reference are enqueued only when the object is removed from memory. The advantages of using phantom reference , one it allows you to determine when the object was exactly removed from the memory. second to avoid the fundamental problem with finalization. Finalize() methods can "resurrect" objects by creating new strong references to them. So what, you say? Well, the problem is that an object which overrides finalize() must now be determined to be garbage in at least two separate garbage collection cycles in order to be collected. When the first cycle determines that it is garbage, it becomes eligible for finalization. Because of the (slim, but unfortunately real) possibility that the object was "resurrected" during finalization, the garbage collector has to run again before the object can actually be removed. And because finalization might not have happened in a timely fashion, an arbitrary number of garbage collection cycles might have happened while the object was waiting for finalization. This can mean serious delays in actually cleaning up garbage objects, and is why you can get OutOfMemoryErrors even when most of the heap is garbage.

Intern() in String.

What is intern()

There are two different ways to compare objects in Java. You can use the == operator, or you can use the equals() method. The == operator compares whether two references point to the same object, whereas the equals() method compares whether two objects contain the same data.

One of the java refresher and suggestion to  use equals(), not ==, to compare two strings. If you compare, say, new String("myfirstjavaprogram") == new String("myfirstjavaprogram"), you will in fact receive false, because they are two different string instances. If you use equals() instead, you will receive true, just as you'd expect. Unfortunately, the equals() method can be fairly slow, as it involves a character-by-character comparison of the strings.

Since the == method compares identity, all it has to do is compare two pointers to see if they are the same, and obviously it will be much faster than equals(). So if you're going to be comparing the same strings repeatedly, you can get a significant performance advantage by reducing it to an identity comparison rather than an equality comparison. The basic algorithm is:
  • Create a hash set of Strings
  • Check to see if the String you're dealing with is already in the set
  • If so, return the one from the set
  • Otherwise, add this string to the set and return it

After following this algorithm, you are guaranteed that if two strings contain the same characters, they are also the same instance. This means that you can safely compare strings using == rather than equals(), gaining a significant performance advantage with repeated comparisons.
Fortunately, Java already includes an implementation of the algorithm above. It's the intern() method on java.lang.Stringnew String("Hello").intern() == new String("Hello").intern() returns true, whereas without the intern() calls it returns false.
All constant strings that appear in a class are automatically interned. This includes both your own constants (like the above "version" string) as well as other strings that are part of the class file format -- class names, method and field signatures, and so forth. It even extends to constant string expressions: "Hel" + "lo" is processed by javac exactly the same as "Hello", and "Hel" + "lo" == "Hello" will return true.
So the result of calling intern() on a constant string like "version" is by definition going to be the exact same string you passed in. "version" == "version".intern(), always. You only need to intern strings when they are not constants, and you want to be able to quickly compare them to other interned strings.
There can also be a memory advantage to interning strings -- you only keep one copy of the string's characters in memory, no matter how many times you refer to it. That's the main reason why class file constant strings are interned: think about how many classes refer to (say) java.lang.Object. The name of the class java.lang.Object has to appear in every single one of those classes, but thanks to the magic of intern(), it only appears in memory once.
The bottom line? intern() is a useful method and can make life easier -- but make sure that you're using it responsibly

Friday, October 11, 2013

External Sorting and Use.

External Sorting 

From the Wiki, External sorting is a term for a class of sorting algorithms that can handle massive amounts of data. External sorting is required when the data being sorted do not fit into the main memory of a computing device (usually RAM) and instead they must reside in the slower external memory (usually a hard drive). External sorting typically uses a hybrid sort-merge strategy. In the sorting phase, chunks of data small enough to fit in main memory are read, sorted, and written out to a temporary file. In the merge phase, the sorted subfiles are combined into a single larger file. External sorting is a form of distribution sort,


Problem


We are given a very large text file of size say 8 GB and we need to sort the file and store it in an output location. But the memory (RAM) is limited, say 1 GB.



Solution

We will only read the input file line by line and store it into the TreeSet. This step is similar to above, but we would not store the entire file in the TreeSet. If the file is 10 GB and our RAM capacity is 1 GB, we will store 512MB data in the TreeSet. Once it becomes 512 MB we flush it into disk, on a temporary file (call it temp1.txt). Repeat this procedure of writing into temporary files, till you read 10 GB completely. So we will have 20 sorted temporary files. Next step is to merge these 20 files into a single sorted file and delete the temporary files. We call it a K-Way merge algorithm.

Next utilize the Memory-Mapped-Files (MMAP). For this we need to consider how Java does file IO. Data in the file system (secondary storage or hard-disk) has to be brought to memory (RAM). To do this, we can either read character by character, or read a chunk of data at a time. The time taken by file IO is huge, compared to the CPU processing time. So its better to reduce the frequency of file reads, hence we use BufferedReader.
But if we use MMAP, then a file or portion of a file can be mapped directly to memory. To achieve this, it uses SWAP space (or virtual memory). Every system has this virtual memory and is different from the RAM. In Java, objects are created on the Heap or RAM or memory as we call it. But if we bring data to swap space then the data can be read directly without an operating system call (which is magnitude of times slower). You cannot Memory Map a full 10 GB file, so we can MMAP a portion of the file. The other advantage of MMAP is, multiple processes can share the same file (thread-safe). In Java, we have the classes for this in NIO package. FileChannel is the class to hold the data.

With this approach, We can memory map the 400 MB file for 8KB size. Read this 8KB data and store it in a TreeSet. Repeat 5000 times, and store the 40 MB data in a temporary file (we will have 10 temporary sorted files). Then apply k-way merge.




The Java Program


package com.sachi.datastruct;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sort a 400MB file using FileChannel
 * 
 *
 */
public class ExternalFileSort {

  private static final int DEFAULT_BUFFER_SIZE = 1024 * 8;

/**
* @param args
*/
public static void main(String[] args) throws Exception {

long t1 = new Date().getTime();
int i = 0;
FileChannel source = new FileInputStream(new File("path to file")).getChannel();
ByteBuffer buf = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);

int j = 0;
FileChannel destination = new FileOutputStream(new File("path to temp file" +i+".txt")).getChannel();
while((source.read(buf)) != -1) {
buf.flip();
destination.write(buf);
if( j == 5000 ) {
i++;
destination.close();
destination = new FileOutputStream(new File("path to temp file" +i+".txt")).getChannel();
j=0;
} else {
j++;
}
buf.clear();
}

source.close();
Map<String, Integer> map = new TreeMap<String, Integer>();

BufferedReader[] brArr = new BufferedReader[i+1];
for(j=0; j<=i; j++) {
brArr[j] = new BufferedReader(new FileReader(new File("path to temp file" +j+".txt")));
map.put(brArr[j].readLine(), j);
}


BufferedWriter bw = new BufferedWriter(new FileWriter(new File("path to output file")));

String s = null;
String endofline = "\n";
while(!map.isEmpty()) {
s = map.keySet().iterator().next();
i = map.get(s);
map.remove(s);
bw.write(s);
bw.write(endofline);
s = brArr[i].readLine();
if(s != null ) {
map.put(s, i);
}
}
bw.close();

for(j=0; j<brArr.length; j++) {
brArr[j].close();
new File("path to temp file" +j+".txt").delete();
}

long t2 = new Date().getTime();
System.out.println("Time taken = " +(t2-t1)/1000 + " sec");

}

}