Friday, November 29, 2013

Oozie: The Workflow engine

Oozie is a server-based workflow engine specialized in running workflow jobs with actions that execute Hadoop Map/Reduce and Pig jobs.

Oozie has three levels of meaning:

A server-based workflow engine, a server-based coordinator engine and a server-based bundle engine. Oozie is a Java web application that runs in a Java servlet container.

Oozie can store and run different types of Hadoop jobs (MapReduce, Hive, Pig, and so on), can run workflow jobs based on time and data triggers, and can also manage batch coordinator applications.

Oozie has been designed to scale, and it can manage the timely execution of thousands of workflows in a Hadoop cluster, each composed of possibly dozens of constituent jobs. An Oozie workflow is a collection of actions (i.e. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control-dependency DAG (Directed Acyclic Graph), specifying a sequence of action execution. This graph is specified in hPDL (an XML Process Definition Language).


hPDL is a fairly compact language, using a limited amount of flow-control and action nodes. Control nodes define the flow of execution and include the beginning and end of a workflow (start, end and fail nodes) as well as mechanisms to control the workflow execution path (decision, fork and join nodes). Action nodes are the mechanism by which a workflow triggers the execution of a computation/processing task. Oozie provides support for the following types of actions: Hadoop map-reduce, Hadoop file system, Pig, Java and Oozie sub-workflow (the SSH action is removed as of Oozie schema 0.2).

All computation/processing tasks triggered by an action node are remote to Oozie: they are executed by the Hadoop Map/Reduce framework. This approach allows Oozie to leverage existing Hadoop machinery for load balancing, failover, etc. The majority of these tasks are executed asynchronously (the exception is the file system action, which is handled synchronously). This means that for most types of computation/processing tasks triggered by a workflow action, the workflow job has to wait until the task completes before transitioning to the following node in the workflow. Oozie can detect completion of computation/processing tasks by two different means: callbacks and polling. When a computation/processing task is started by Oozie, Oozie provides a unique callback URL to the task, and the task should invoke the given URL to notify Oozie of its completion. In cases where the task fails to invoke the callback URL for any reason (e.g. a transient network failure), or where the type of task cannot invoke the callback URL upon completion, Oozie has a mechanism to poll computation/processing tasks for completion.

Oozie workflows can be parameterized (using variables like ${inputDir} within the workflow definition). When submitting a workflow job, values for the parameters must be provided. If properly parameterized (i.e. using different output directories), several identical workflow jobs can run concurrently, as in the sketch below.
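
As an illustration, below is a minimal sketch of submitting a parameterized workflow job through the Oozie Java client API. The server URL, HDFS application path and parameter values are assumptions for this example, not part of the official tutorial.

import java.util.Properties;
import org.apache.oozie.client.OozieClient;

public class SubmitWorkflowJob {
    public static void main(String[] args) throws Exception {
        // connect to the Oozie server (URL assumed; adjust to your installation)
        OozieClient client = new OozieClient("http://localhost:11000/oozie");

        // create a workflow job configuration and supply the parameter values
        Properties conf = client.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://localhost:9000/user/oozie/app"); // hypothetical path
        conf.setProperty("inputDir", "/data/input");   // hypothetical value for ${inputDir}
        conf.setProperty("outputDir", "/data/output"); // hypothetical value for ${outputDir}

        // submit and start the workflow job; Oozie returns a job id
        String jobId = client.run(conf);
        System.out.println("Workflow job submitted: " + jobId);
    }
}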

Some workflows are invoked on demand, but most of the time they must be run based on regular time intervals, data availability, or external events. The Oozie Coordinator system allows the user to define workflow execution schedules based on these parameters. The Oozie coordinator allows workflow execution triggers to be modeled as predicates, which can reference data, time and/or external events. The workflow job is started once the predicate is satisfied.

It is also often necessary to connect workflow jobs that run regularly but at different time intervals. The outputs of multiple subsequent runs of one workflow become the input to the next workflow. Chaining these workflows together is referred to as a data application pipeline. The Oozie coordinator supports the creation of such data application pipelines.



Installing Oozie

Step-1: Prerequisites

You can follow the instructions provided on the official Oozie website to match Oozie to the right versions of the Hadoop stack software. In this tutorial we use Oozie version 3.0.2, which is available on GitHub. Its system requirements are as follows:

-Unix (tested on Linux and Mac OS X). We used Ubuntu Lucid Server.
-Java 1.6+
-Hadoop
-Apache Hadoop (tested with 0.20.2)
-Yahoo! Hadoop (tested with 0.20.104.2)
-ExtJS library (optional, to enable Oozie webconsole)
-ExtJS 2.2


Step-2: Server Installation

-Download or build an Oozie binary distribution https://github.com/yahoo/oozie/downloads
-Download a Hadoop binary distribution  http://www.us.apache.org/dist/hadoop/common/hadoop-0.20.2/
-Download ExtJS library (it must be version 2.2)  http://extjs.com/deploy/ext-2.2.zip
-Expand the two packages, the oozie and hadoop distribution tar.gz files, as the oozie Unix user, as recommended by the official document for server installation. Commands as shown below:

->oozie@dm4:~$ tar zxvf oozie-3.0.2-distro.tar.gz -C {oozie home}
->oozie@dm4:~$ tar zxvf hadoop-0.20.2.tar.gz -C {hadoop home}

-Build oozie.war. Oozie runs on Hadoop, but its distribution bundle ships without the Hadoop JAR files and without the ExtJS library (because they are under different licenses). We have to run the Oozie setup shell script to package the required Hadoop JAR files and the optional ExtJS library so as to enable the Oozie web console. Oozie server scripts run only under the Unix user that owns the Oozie installation directory; if necessary, use sudo -u OOZIE_USER when invoking the scripts. Commands as shown below:
->$ bin/oozie-setup.sh -hadoop 0.20.2 ${HADOOP_HOME} -extjs /tmp/ext-2.2.zip

-Start up Oozie and edit the Oozie configuration. To start Oozie as a daemon process, run:
->$ bin/oozie-start.sh

-Using the Oozie command-line tool, check the status of Oozie:
->$ bin/oozie admin -oozie http://localhost:11000/oozie -status

Using a browser, go to the Oozie web console; the Oozie status should be NORMAL. If the status is HTTP 404 Not Found, you can edit the configuration file to fix it. Open conf/oozie-default.xml with vim and copy the property “oozie.services” into oozie-site.xml. In oozie-site.xml’s “oozie.services” property, one of the service names is “KerberosHadoopAccessorService”. Remove only “Kerberos”, which will make it “HadoopAccessorService”, and then restart Oozie.

S.O.L.I.D: Class Design Principles in Java

Understanding S.O.L.I.D Class Design Principles 

Classes are the building blocks of your Java application. If these blocks are not strong, your building (i.e. your application) is going to face tough times in the future. This essentially means that not-so-well-written classes can lead to very difficult situations when the application scope grows or the application faces certain design issues either in production or in maintenance.

On the other hand, a set of well-designed and well-written classes can speed up the coding process by leaps and bounds, while reducing the number of bugs in comparison.

In this post, I will list the 5 most recommended design principles you should keep in mind while writing your classes. These design principles are called SOLID, in short. They also form the best practices to be followed when designing your application classes.


Single Responsibility Principle

The Single Responsibility Principle (SRP) states that there should never be more than one reason for a class to change. This means that you should design your classes so that each has a single purpose. This does not mean that each class should have only one method but that all of the members in the class are related to the class's primary function. Where a class has multiple responsibilities, these should be separated into new classes.

When a class has multiple responsibilities, the likelihood that it will need to be changed increases. Each time a class is modified the risk of introducing bugs grows. By concentrating on a single responsibility, this risk is limited.
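
As a minimal sketch of the idea (the classes here are invented for illustration), consider a Report class that both builds content and writes files; it has two reasons to change, so the responsibilities are split into two classes:

// Before: two reasons to change (report content and persistence)
class Report {
    String buildContent() { return "report body"; }
    void saveToFile(String path) { /* file I/O details */ }
}

// After: each class has a single responsibility
class ReportBuilder {
    String buildContent() { return "report body"; }
}

class ReportWriter {
    void saveToFile(String content, String path) { /* file I/O details */ }
}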


Open / Closed Principle

The Open / Closed Principle (OCP) specifies that software entities (classes, modules, functions, etc.) should be open for extension but closed for modification. The "closed" part of the rule states that once a module has been developed and tested, the code should only be adjusted to correct bugs. The "open" part says that you should be able to extend existing code in order to introduce new functionality.
As with the SRP, this principle reduces the risk of new errors being introduced by limiting changes to existing code.
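
A small sketch of the principle (the shape classes are invented for illustration): AreaCalculator stays closed for modification, while new shapes extend the system without touching tested code.

interface Shape {
    double area();
}

class Circle implements Shape {
    private final double radius;
    Circle(double radius) { this.radius = radius; }
    public double area() { return Math.PI * radius * radius; }
}

// adding a new shape extends the system without modifying AreaCalculator
class Square implements Shape {
    private final double side;
    Square(double side) { this.side = side; }
    public double area() { return side * side; }
}

class AreaCalculator {
    double totalArea(java.util.List<Shape> shapes) {
        double total = 0;
        for (Shape s : shapes) total += s.area(); // no instanceof checks to maintain
        return total;
    }
}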


Liskov Substitution Principle (LSP)

The Liskov Substitution Principle (LSP) states that "functions that use pointers or references to base classes must be able to use objects of derived classes without knowing it". In a language such as Java, this equates to "code that uses a base class must be able to substitute a subclass without knowing it". The principle is named after Barbara Liskov.
If you create a class with a dependency of a given type, you should be able to provide an object of that type or any of its subclasses without introducing unexpected results and without the dependent class knowing the actual type of the provided dependency. If the type of the dependency must be checked so that behaviour can be modified according to type, or if subtypes generate unexpected results or side effects, the code may become more complex, rigid and fragile.
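
The classic rectangle/square sketch illustrates a violation (the classes are invented for illustration): Square is substitutable syntactically, but it breaks an expectation that callers of Rectangle rely on.

class Rectangle {
    protected int width, height;
    void setWidth(int w)  { width = w; }
    void setHeight(int h) { height = h; }
    int area() { return width * height; }
}

// overriding the setters changes behaviour that Rectangle's callers depend on
class Square extends Rectangle {
    @Override void setWidth(int w)  { width = w; height = w; }
    @Override void setHeight(int h) { width = h; height = h; }
}

class Client {
    static int stretch(Rectangle r) {
        r.setWidth(5);
        r.setHeight(2);
        return r.area(); // 10 for a Rectangle, but 4 for a Square: an LSP violation
    }
}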


Interface Segregation Principle (ISP)

The Interface Segregation Principle (ISP) specifies that clients should not be forced to depend upon interfaces that they do not use. This rule means that when one class depends upon another, the number of members in the interface that is visible to the dependent class should be minimized.

Often when you create a class with a large number of methods and properties, the class is used by other types that only require access to one or two members. The classes are more tightly coupled as the number of members they are aware of grows. When you follow the ISP, large classes implement multiple smaller interfaces that group functions according to their usage. The dependents are linked to these for looser coupling, increasing robustness, flexibility and the possibility of reuse.
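
A short sketch (the interfaces are invented for illustration): instead of one fat Worker interface with work() and eat(), segregated interfaces let RobotWorker avoid a meaningless eat() stub.

// segregated interfaces group members by usage
interface Workable { void work(); }
interface Feedable { void eat(); }

class HumanWorker implements Workable, Feedable {
    public void work() { /* ... */ }
    public void eat()  { /* ... */ }
}

// a robot never eats, so it depends only on the interface it actually uses
class RobotWorker implements Workable {
    public void work() { /* ... */ }
}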


Dependency Inversion Principle (DIP)

The Dependency Inversion Principle (DIP) is the last of the five rules. The DIP makes two statements. The first is that high level modules should not depend upon low level modules. Both should depend upon abstractions. The second part of the rule is that abstractions should not depend upon details. Details should depend upon abstractions.

The DIP primarily relates to the concept of layering within applications, where lower level modules deal with very detailed functions and higher level modules use lower level classes to achieve larger tasks. The principle specifies that where dependencies exist between classes, they should be defined using abstractions, such as interfaces, rather than by referencing classes directly. This reduces fragility caused by changes in low level modules introducing bugs in the higher layers. The DIP is often met with the use of dependency injection.
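
A minimal sketch of the DIP using constructor-based dependency injection (the class names are invented for illustration):

// abstraction that both layers depend on
interface MessageSender {
    void send(String message);
}

// low-level detail depends on the abstraction
class EmailSender implements MessageSender {
    public void send(String message) { /* SMTP details */ }
}

// high-level module references the abstraction, never the concrete class;
// the dependency is injected from outside
class NotificationService {
    private final MessageSender sender;
    NotificationService(MessageSender sender) { this.sender = sender; }
    void notifyUser(String message) { sender.send(message); }
}

Swapping EmailSender for some other MessageSender implementation requires no change to NotificationService.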

Thread & Runnable In Java


Thread 

A thread in Java is an independent path of execution, used to run two or more tasks in parallel. When two threads run in parallel, that is called multi-threading in Java. Java has been multi-threaded from the start, and its excellent support for threads at the language level (e.g. the java.lang.Thread class and the synchronized, volatile and final keywords) makes writing concurrent programs easier in Java than in many other programming languages, e.g. C++. Being multi-threaded is also one reason for Java's popularity. On the other hand, dividing a task between two threads brings a lot of programming challenges and issues related to synchronization, deadlock, thread-safety and race conditions. In short, the answer to "What is a thread in Java?" can be given as: "Thread is a class in Java, but also a way to execute something in parallel and independently." A thread in Java requires a task which is executed by that thread independently, and that task can be either Runnable or Callable.

Runnable

Runnable represents a task in Java which is executed by a thread. java.lang.Runnable is an interface and defines only one method, called run(). When a thread is started in Java by using the Thread.start() method, it calls the run() method of the Runnable task which was passed to the Thread during creation. Code written inside the run() method is executed by this newly created thread. Since start() internally calls run(), Java programmers often wonder why not call the run() method directly; this is also asked as "what is the difference between start() and run() in Java". Well, when you call the Runnable interface's run() method directly, no new thread is created, and the task defined inside run() is executed by the calling thread. There is another interface, added in Java 1.5, called Callable, which can also be used in place of the Runnable interface. Callable provides additional functionality over Runnable in terms of returning the result of a computation. Since the return type of run() is void, it cannot return anything, which is sometimes necessary. The Callable interface, on the other hand, defines a call() method that returns a value; when a Callable is submitted to an executor, the pending result is exposed as a Future, which can be used to retrieve the result of the computation from the thread.

Example 

package sachi.test.threads;

public class Program{
  public static void main (String[] args) {
    Runner r = new Runner();
    Thread t1 = new Thread(r, "Thread A");
    Thread t2 = new Thread(r, "Thread B");
    Thread s1 = new Strider("Thread C");
    Thread s2 = new Strider("Thread D");
    t1.start();
    t2.start();
    s1.start();
    s2.start();
  }
}
class Runner implements Runnable {
  // this counter is shared by every thread running the same Runner instance
  private int counter;
  public void run() {
    try {
      for (int i = 0; i != 2; i++) {
        System.out.println(Thread.currentThread().getName() + ": " 
              + counter++);
        Thread.sleep(1000);
      }
    }
    catch(InterruptedException e) {
      e.printStackTrace();
    }
  }
}

class Strider extends Thread {
  // each Strider thread is its own object, so each has its own counter
  private int counter;
  Strider(String name)    {
    super(name);
  }
  public void run()   {
    try {
      for (int i = 0; i != 2; i++) {
        System.out.println(Thread.currentThread().getName() + ": " 
            + counter++);
        Thread.sleep(1000);
      }
    }
    catch(InterruptedException e)     {
      e.printStackTrace();
    }
  }
}

Output (one possible interleaving; thread scheduling is non-deterministic):

Thread B: 1
Thread D: 0
Thread C: 0
Thread A: 0
Thread D: 1
Thread A: 3
Thread C: 1
Thread B: 2


Differences between Thread & Runnable.

1. Implementing Runnable is the preferred way. Here, you’re not really specializing or modifying the thread’s behavior; you’re just giving the thread something to run. That means composition is the better way to go.

2. Java only supports single inheritance, so you can only extend one class.

3. Implementing an interface gives a cleaner separation between your code and the implementation of threads.

4. Implementing Runnable makes your class more flexible. If you extend Thread, then the action you’re doing is always going to be in a thread. However, if you implement Runnable, it doesn’t have to be. You can run it in a thread, pass it to some kind of executor service, or simply pass it around as a task within a single-threaded application.

5. By extending Thread, each of your threads has a unique object associated with it, whereas with Runnable, many threads can share the same instance. A sketch of points 4 and 5 follows.
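
To illustrate points 4 and 5, here is a small sketch that reuses the Runner task from the example above (assuming it is on the classpath in the same package) and hands it to an executor service instead of a raw Thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
    public static void main(String[] args) {
        // one Runnable instance shared by several worker threads (point 5)
        Runnable task = new Runner();

        // the same task runs in a thread pool rather than a raw Thread (point 4)
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(task);
        pool.submit(task);
        pool.shutdown(); // stop accepting tasks; let the submitted ones finish
    }
}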

Understanding Comparable and Comparator

In object-oriented programming we sometimes need to compare instances of the same class. Once instances are comparable, they can be sorted. Here we are going to focus on designing a class and making its instances comparable by using the "java.lang.Comparable" and "java.util.Comparator" interfaces.

The major difference between Comparable and Comparator is that the former is used to define the natural ordering of an object (e.g. lexicographic order for java.lang.String), while the latter is used to define any alternative ordering for an object. The main usage of the java.lang.Comparable and java.util.Comparator interfaces is for sorting lists of objects in Java. For example, to sort a list of Employees by their id, we can use the Comparable interface, and to provide additional sorting capability we can define multiple comparators, e.g. an AgeComparator to compare the ages of employees, a SalaryComparator to compare the salaries of employees, etc. This brings up another important difference between the Comparator and Comparable interfaces in Java: you can have only one ordering via Comparable, i.e. the natural ordering, while you can define multiple Comparators.

Comparable vs Comparator

1. The Comparator interface is in the java.util package, which implies it's a utility, while the Comparable interface is kept in the java.lang package, which means it's essential for Java objects.

2. Based on syntax, one difference between Comparable and Comparator in Java is that the former gives us compareTo(Object toCompare), which accepts an object (and uses generics from Java 1.5 onwards), while Comparator defines the compare(Object obj1, Object obj2) method for comparing two objects.

3. Continuing from the previous difference, Comparable is used to compare the current object, represented by the this keyword, with another object, while a Comparator compares two arbitrary objects passed to its compare() method.

4. One of the key differences between the Comparator and Comparable interfaces in Java is that you can have only one compareTo() implementation for an object, while you can define multiple Comparators for comparing objects on different parameters. For example, for an Employee object, you can use the compareTo() method to compare Employees on id, known as the natural ordering, while using multiple compare() methods to order employees on age, salary, name or city. It's also a best practice to declare Comparators as nested static classes in Java, because they are closely associated with the objects they compare.

5. Many Java classes which make use of Comparator and Comparable default to Comparable and provide an overloaded method to work with an arbitrary Comparator instance. For example, the Collections.sort() method, which is used to sort a Collection in Java, has two overloads: one which sorts objects based on natural order, i.e. by using java.lang.Comparable, and another which accepts an implementation of the java.util.Comparator interface.

6. One more key thing, which is not a difference but worth remembering, is that both the compareTo() and compare() methods in Java should be consistent with the equals() implementation, i.e. if two objects are equal according to the equals() method, then compareTo() and compare() must return zero. Failing to adhere to this guideline, your objects may break the invariants of Java collection classes which rely on compare() or compareTo(), e.g. TreeSet and TreeMap.

Example

package sachi.test.compare;

/*
 * Order class is a domain object which implements the
 * Comparable interface to provide sorting in natural order.
 * Order also provides a couple of custom Comparators to
 * sort the object based upon amount and customer.
 */
import java.util.Comparator;

class Order implements Comparable<Order> {

    private int orderId;
    private int amount;
    private String customer;

    /*
     * Comparator implementation to Sort Order object based on Amount
     */
    public static class OrderByAmount implements Comparator<Order> {

        @Override
        public int compare(Order o1, Order o2) {
            return o1.amount > o2.amount ? 1 : (o1.amount < o2.amount ? -1 : 0);
        }
    }

    /*
     * Another implementation of the Comparator interface, to sort a list of
     * Order objects based upon customer name.
     */
    public static class OrderByCustomer implements Comparator<Order> {

        @Override
        public int compare(Order o1, Order o2) {
            return o1.customer.compareTo(o2.customer);
        }
    }

    public Order(int orderId, int amount, String customer) {
        this.orderId = orderId;
        this.amount = amount;
        this.customer = customer;
    }

  
    public int getAmount() {return amount; }
    public void setAmount(int amount) {this.amount = amount;}

    public String getCustomer() {return customer;}
    public void setCustomer(String customer) {this.customer = customer;}

    public int getOrderId() {return orderId;}
    public void setOrderId(int orderId) {this.orderId = orderId;}

    /*
     * Sorting on orderId is natural sorting for Order.
     */
    @Override
    public int compareTo(Order o) {
        return this.orderId > o.orderId ? 1 : (this.orderId < o.orderId ? -1 : 0);
    }
  
    /*
     * implementing toString method to print orderId of Order
     */
    @Override
    public String toString(){
        return String.valueOf(orderId);
    }

}



package sachi.test.compare;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Java program to test object sorting in Java. This program
 * tests the Comparable and Comparator implementations provided by the
 * Order class by sorting a list of Order objects in ascending and
 * descending order, both in natural order using Comparable and in
 * custom orders using Comparator.
 */
public class ObjectSortingExample {
    public static void main(String args[]) {
     
        //Creating Order object to demonstrate Sorting of Object in Java
        Order ord1 = new Order(101,2000, "Sony");
        Order ord2 = new Order(102,4000, "Hitachi");
        Order ord3 = new Order(103,6000, "Philips");
      
        //putting Objects into Collection to sort
        List<Order> orders = new ArrayList<Order>();
        orders.add(ord3);
        orders.add(ord1);
        orders.add(ord2);
      
        //printing unsorted collection
        System.out.println("Unsorted Collection : " + orders);
      
        //Sorting Order Object on natural order - ascending
        Collections.sort(orders);
      
        //printing sorted collection
        System.out.println("List of Order object sorted in natural order : " + orders);
      
        // Sorting object in descending order in Java
        Collections.sort(orders, Collections.reverseOrder());
        System.out.println("List of object sorted in descending order : " + orders);
              
        //Sorting object using Comparator in Java
        Collections.sort(orders, new Order.OrderByAmount());
        System.out.println("List of Order object sorted using Comparator - amount : " + orders);
      
        // Comparator sorting Example - Sorting based on customer
        Collections.sort(orders, new Order.OrderByCustomer());
        System.out.println("Collection of Orders sorted using Comparator - by customer : " + orders);
    }

}

Output:

Unsorted Collection : [103, 101, 102]
List of Order object sorted in natural order : [101, 102, 103]
List of object sorted in descending order : [103, 102, 101]
List of Order object sorted using Comparator - amount : [101, 102, 103]
Collection of Orders sorted using Comparator - by customer : [102, 103, 101]




Thursday, November 28, 2013

Memcached: Alleviate database load and scale up


A “cache” duplicates some or all of the data stored on one or more databases for fast and scalable access by applications. Leveraging a caching system is valuable when the original data is expensive to fetch from the source database due to resource constraints. Once the source data is populated in the cache, future requests for the same data make use of the cached copy rather than fetching the data from the original source. Introducing a “caching layer” typically makes data access times faster and improves the ability of the underlying database to scale in order to accommodate consistently higher loads or spikes in data requests. Often, the cache is “distributed”, so that its contents can be spread out over multiple systems to make more efficient use of available memory, network and other computing resources.


Memcached is a powerful open source distributed memory caching system: a high-performance, distributed memory object caching system. It is generic in nature but intended primarily for speeding up applications by alleviating database load.

Many of the largest and most heavily trafficked web properties on the Internet like Facebook, Fotolog, YouTube, Mixi.jp, Yahoo, and Wikipedia deploy Memcached and MySQL to satisfy the demands of millions of users and billions of page views every month. By integrating a caching tier into their web scale architectures, these organizations have improved their application performance while minimizing database load.


How Memcached works?

At its heart, memcached relies on hashing: a hash procedure turns data (a key) into a small integer that serves as an index into an array. The net result is that it speeds up table lookups and data comparison tasks. Memcached leverages a two-stage hash that acts like a giant hash table looking up key = value pairs. Memcached can be thought of as having two core components, a server and a client. In the course of a memcached lookup, the client hashes the key against a list of servers. When the server is identified, the client sends its request to that server, which performs a hash key lookup for the actual data. Because the client performs one stage of the hashing, memcached naturally lends itself to the easy addition of dozens of extra nodes.

Basic Example:

A basic memcached lookup can be illustrated in the example below with clients C1, C2, C3 and servers S1, S2 and S3.

‘Set’ the key and value:
-Client C1 wants to set the key “key1” with value “value1”
-Client C1 takes the list of available memcached servers (S1, S2, S3) and hashes the key against them
-Server S2 is selected
-Client C1 directly connects to Server S2, and sets key “key1” with the value “value1”

‘Get’ the key and value:
-Client C3 wants to get key “key1”
-Client C3 is able to use the same hashing process to determine that key “key1” is on Server S2
-Client C3 directly requests key “key1” from Server S2 and gets back “value1”
-Subsequent requests from Clients C1, C2 or C3 for key “key1” will access the data directly from Server S2. The same flow is sketched in code below.
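
The same set/get flow looks roughly like this with the spymemcached Java client (mentioned later in this post); the server address and key are assumptions for this sketch:

import java.net.InetSocketAddress;
import net.spy.memcached.MemcachedClient;

public class MemcachedExample {
    public static void main(String[] args) throws Exception {
        // the client hashes each key against the server list to pick a server
        MemcachedClient client = new MemcachedClient(
                new InetSocketAddress("localhost", 11211)); // hypothetical server

        // 'set' the key and value (the second argument is the expiry in seconds)
        client.set("key1", 3600, "value1");

        // 'get' the key; the same hashing routes the request to the same server
        Object value = client.get("key1");
        System.out.println("key1 = " + value);

        client.shutdown();
    }
}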

Components of Memcached

There are two core components to memcached, the server and the client. In this section we cover some of the core capabilities of the memcached server, including how it deals with memory allocation and the caching of data.

Memcached server

The memcached server is implemented as a non-blocking, event-based server with an emphasis on scalability and low resource consumption.

Memory Allocation

By default, the memcached server allocates memory by leveraging what is internally referred to as a “slab allocator”. The reason this internal slab allocator is used over malloc/free (the standard C/C++ library calls for performing dynamic memory allocation) is to avoid fragmentation and to avoid the operating system having to spend cycles searching for contiguous blocks of memory; these tasks, overall, tend to consume more resources than the memcached process itself. With the slab allocator, memory is allocated in chunks and, in turn, is constantly being reused. Because memory is allocated into different-sized slabs, there is the potential to waste memory if the data being cached does not fit perfectly into the slab. There are also some practical limits to be aware of concerning key and data sizes: keys are restricted to 250 characters, and cached data cannot exceed the largest slab size currently supported, 1 megabyte.

The hashing algorithm that memcached makes use of to determine which server a key is cached on does not take into account the size of all the memory available across all the servers participating in a memcached cluster. For those concerned about this issue, a potential workaround is to run multiple memcached instances on servers with more memory with each instance using the same size cache as all the other servers.


Caching Structure

When memcached’s distributed hash table becomes full, subsequent inserts force older cached data to be cycled out in least recently used (LRU) order, with associated expiration timeouts. How long data is “valid” within the cache is set via configuration options; this “validity” time may be short, long or permanent. As mentioned, when the memcached server exhausts its memory allocation, the expired slabs are cycled out first, with the next oldest, unused slabs queued up for expiration. Memcached makes use of lazy expiration: it does not spend additional CPU cycles expiring items. When data is requested via a ‘get’ request, memcached checks the expiration time to confirm the data is still valid before returning it to the client. When new data is added to the cache via a ‘set’ and memcached is unable to allocate an additional slab, expired data is cycled out prior to any data that qualifies under the LRU criteria.
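
The lazy-expiration idea can be sketched in a few lines of Java. This is a conceptual illustration only; memcached itself is written in C and uses slab storage, not a plain map.

import java.util.HashMap;
import java.util.Map;

// conceptual sketch of lazy expiration: no background thread evicts entries;
// validity is only checked when a 'get' arrives, as memcached does
class LazyCache {
    private static class Entry {
        final Object value;
        final long expiresAtMillis;
        Entry(Object value, long ttlMillis) {
            this.value = value;
            this.expiresAtMillis = System.currentTimeMillis() + ttlMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<String, Entry>();

    void set(String key, Object value, long ttlMillis) {
        store.put(key, new Entry(value, ttlMillis));
    }

    Object get(String key) {
        Entry e = store.get(key);
        if (e == null) return null;
        if (System.currentTimeMillis() > e.expiresAtMillis) {
            store.remove(key); // the expired entry is discarded only now, on access
            return null;
        }
        return e.value;
    }
}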


Data Redundancy and Fail Over

By design, there are no data redundancy features built into memcached. Memcached is designed to be a scalable, high-performance caching layer; including data redundancy functionality would only add complexity and overhead to the system. In the event that one of the memcached servers does suffer a loss of data, under normal circumstances the application should still be able to retrieve its data from the original source database. A prudent caching design ensures that your application can continue to function without the availability of one or more memcached nodes. One precaution to take in order not to suddenly overwhelm the database(s) in the event of memcached failures is to add additional memcached nodes, minimizing the impact of any individual node failure. Another option is to leverage a “hot backup”, in other words a server that can take over the IP address of the failed memcached server. Also, by design, memcached does not have any built-in failover capabilities. However, there are some strategies one can employ to help minimize the impact of failed memcached nodes. The first technique is simply having an (over)abundance of nodes. Because memcached is designed to scale straight out of the box, this is a key characteristic to exploit: having plenty of memcached nodes minimizes the overall impact an outage of one or more nodes will have on the system as a whole.


One can also remove failed nodes from the server list against which the memcached clients hash. A practical consideration here is that when clients add or remove servers from the server list, they will invalidate the entire cache: the majority of keys will, in turn, hash to different servers. Essentially, this forces all the data to be re-keyed into memcached from the source database(s). As mentioned previously, leveraging a “hot backup” server which can take over the IP address of the failed memcached server can go a long way to minimize the impact of having to reload an invalidated cache.

Loading and Unloading Memcached

In general, “warming up” memcached from a database dump is not the best course of action. Several issues arise with this method. First, changes to data that have occurred between the dump and the load will not be accounted for. Second, there must be some strategy for dealing with data that may have expired prior to the dump being loaded. In situations where there are large amounts of fairly static or permanent data to be cached, using a data dump/load can be useful for warming up the cache quickly.


Memcached Threads

With the release of version 1.2, memcached can utilize multiple CPUs and share the cache between the CPUs engaged in processing. At its core is a simple locking mechanism that is used when particular values or data need to be updated. Memcached threads present considerable value to those making use of ‘multi-gets’, which in turn become more efficient; threading also makes memcached easier to manage overall, as it avoids having to run multiple nodes on the same server to achieve the desired level of performance.

Memcached Client

It is important to note that there are different memcached client implementations. Not only does the manner in which they store data into memcached vary between client libraries, but so does how they implement the hashing algorithm. Because of this, the implications of mixing and matching client libraries and versions should be carefully considered. However, unlike the variances found between client versions, memcached servers always store data and apply hashing algorithms consistently. Spymemcached, used in the sketch above, is a popular Java client for memcached.


1. Working with Memcached & MySQL.


In this architecture we combine several memcached servers and a stand-alone MySQL server. This architecture allows a read-intensive application to scale. The memcached clients are illustrated separately; however, they will typically be co-located with the application server.


2. Memcached & MySQL with Replication.


In this architecture we combine several memcached servers with a master MySQL server and multiple slave servers. This architecture allows for scaling a read intensive application. The memcached clients illustrated are co-located with the application servers. Read requests can be satisfied from memcached servers or from MySQL slave servers. Writes are directed to the MySQL master server.

3. Memcached Sharding & MySQL with Replication.


With sharding (application partitioning) we partition data across multiple physical servers to gain read and write scalability. In this example we have created two shards partitioned by customer number. Read requests can be satisfied from memcached servers or from MySQL slave servers. Writes are directed to the appropriate MySQL master server.

Conclusion

With memcached, organizations can:

-Implement a scalable, high-performance data caching solution for their online applications
-Reduce database Total Cost of Ownership (TCO) by eliminating licensing costs for proprietary data caching software
-Reduce system TCO by making better use of resources like idle or spare RAM on existing systems
-Incrementally add/remove data caching capacity, on demand, to quickly meet changing requirements.

Wednesday, November 27, 2013

Storm: parallelizing and computing data real time

What is Storm 

Storm is a big data framework, similar to Hadoop, that is used to handle enormous amounts of real-time data. In essence, it is an open source real-time computation system that can be used with any programming language. It is written in Clojure and supports Java by default. It can be used to perform activities such as:

i. Analyzing real-time data.
ii. Online machine learning.
iii. Continuous computation tasks.
iv. Distributed remote procedure calls.
v. ETL.

Storm does real-time data interpretation and analysis. It is quite similar to Hadoop, but Hadoop performs offline data analytics and has its own programming paradigm and filesystem, which Storm does not. Storm is all about obtaining chunks of data from sources known as spouts and passing the data through various components called bolts. The mechanism Storm uses to process data is extremely fast, making it well suited to real-time analytics. In some use cases, Hadoop and Storm can be beautifully combined to design a system that does both real-time analytics and long-term pattern recognition.


Storm Topologies

Storm programs are structured as topologies. A topology is basically a computation graph: each node in the topology has its own processing logic, and the links between nodes indicate how data should be passed between them. A Storm topology can be run as below:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

The class "backtype.storm.MyTopology" defines the topology and submits it to Nimbus. Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures. The storm jar command takes care of connecting to Nimbus and uploading the JAR.


Storm Clusters

A Storm cluster is superficially similar to a Hadoop cluster: whereas on Hadoop you run "jobs", on Storm you run "topologies". "Jobs" and "topologies" themselves are very different; one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in Zookeeper or on local disk. This design leads to Storm clusters being incredibly stable.

Below is a representation of a Storm cluster.

Streams

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can do anything: run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases, and more.

Networks of spouts and bolts are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.
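
Below is a sketch of how a topology with explicit parallelism is wired together using the backtype.storm API of this era; SentenceSpout and SplitBolt are invented for this example.

import java.util.Map;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ParallelTopology {
    // a spout is a source of streams; this one endlessly emits one sentence
    public static class SentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
        public void nextTuple() {
            collector.emit(new Values("the quick brown fox"));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    // a bolt consumes the spout's stream and emits one tuple per word
    public static class SplitBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                collector.emit(new Values(word));
            }
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // parallelism hints: 2 spout threads, 4 bolt threads across the cluster
        builder.setSpout("sentences", new SentenceSpout(), 2);
        builder.setBolt("split", new SplitBolt(), 4).shuffleGrouping("sentences");

        Config conf = new Config();
        conf.setNumWorkers(2); // worker processes allocated to this topology

        // hand the topology to Nimbus, which distributes it across the cluster
        StormSubmitter.submitTopology("parallel-topology", conf, builder.createTopology());
    }
}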

A topology runs forever, or until you kill it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.