Stratosphere accepted as Apache Incubator Project

17 Apr 2014

We are happy to announce that Stratosphere has been accepted as a project for the Apache Incubator. The proposal was accepted by the Incubator PMC members earlier this week. The Apache Incubator is the first step in the process of donating a project to the Apache Software Foundation. While under incubation, the project will move to the Apache infrastructure and adopt the community-driven development principles of the Apache Foundation. Projects can graduate from incubation to become top-level projects if they show activity, a healthy community dynamic, and releases.

We are glad to have Alan Gates as champion on board, as well as a set of great mentors, including Sean Owen, Ted Dunning, Owen O'Malley, Henry Saputra, and Ashutosh Chauhan. We are confident that we will make this a great open source effort.


Stratosphere got accepted for Google Summer of Code 2014

24 Feb 2014

Students: Apply now for exciting summer projects in the Big Data / Analytics field

We are pleased to announce that Stratosphere got accepted to Google Summer of Code 2014 as a mentoring organization. This means that we will host a number of students who will work on projects within Stratosphere over the summer. Read more in the GSoC manual for students and the official FAQ. Students can improve their coding skills, learn to work with open-source projects, improve their CV, and get a nice paycheck from Google.

If you are an interested student, check out our idea list in the wiki. It contains different projects with varying ranges of difficulty and requirement profiles. Students can also suggest their own projects.

We encourage students to sign up on our developer mailing list to discuss their ideas. Applying students can use our wiki (create a new page) to draft a project proposal. We are happy to have a look at it.


Use Stratosphere with Amazon Elastic MapReduce

18 Feb 2014

Get started with Stratosphere within 10 minutes using Amazon Elastic MapReduce.

This step-by-step tutorial will guide you through the setup of Stratosphere using Amazon Elastic MapReduce.

Background

Amazon Elastic MapReduce (Amazon EMR) is part of Amazon Web Services. EMR allows you to create Hadoop clusters that analyze data stored in Amazon S3 (AWS' cloud storage). Stratosphere runs on top of Hadoop using the recently released cluster resource manager YARN. YARN allows you to run many different data analysis tools in your cluster side by side. Tools that run on YARN include, for example, Apache Giraph, Spark, and HBase. Stratosphere also runs on YARN, and that's the approach for this tutorial.

Step 1: Log in to AWS and prepare secure access

You need to have SSH keys to access the Hadoop master node. If you do not have keys for your computer, generate them:

  • Select EC2 and click on "Key Pairs" in the "NETWORK & SECURITY" section.
  • Click on "Create Key Pair" and give it a name.
  • After pressing "Yes", your browser will download a .pem file.
  • Change the permissions of the .pem file:
  • chmod og-rwx ~/work-laptop.pem 
    

Step 2: Create your Hadoop cluster in the cloud

  • Select Elastic MapReduce from the AWS console
  • Click the blue "Create cluster" button.
  • Choose a Cluster name
  • You can leave the other settings unchanged (termination protection, logging, debugging)
  • For the Hadoop distribution, it is very important to choose one with YARN support. We use 3.0.3 (Hadoop 2.2.0) (the minor version might change over time)
  • Remove all applications to be installed (unless you want to use them)
  • Choose the instance types you want to start. Stratosphere runs fine with m1.large instances. Core and Task instances both run Stratosphere, but only core instances contain HDFS data nodes.
  • Choose the EC2 key pair you've created in the previous step!
  • That's it! You can now press the "Create cluster" button at the end of the form to boot your cluster!

Step 3: Launch Stratosphere

You might need to wait a few minutes until Amazon has started your cluster. (You can monitor the progress of the instances in EC2.) Use the refresh button in the top right corner.

The master is up once the field Master public DNS contains a value (first line). Connect to it using SSH:

ssh hadoop@<your master public DNS> -i <path to your .pem>
# for my example, it looks like this:
ssh hadoop@ec2-54-213-61-105.us-west-2.compute.amazonaws.com -i ~/Downloads/work-laptop.pem
(Windows users have to follow these instructions to SSH into the machine running the master.)

Once connected to the master, download and start Stratosphere for YARN:
  • Download and extract Stratosphere-YARN
  • wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/stratosphere-dist-0.5-SNAPSHOT-yarn.tar.gz
    # extract it
    tar xvzf stratosphere-dist-0.5-SNAPSHOT-yarn.tar.gz
    
  • Start Stratosphere in the cluster using Hadoop YARN
  • cd stratosphere-yarn-0.5-SNAPSHOT/
    ./bin/yarn-session.sh -n 4 -jm 1024 -tm 3000
    
    The arguments have the following meaning
    • -n number of TaskManagers (=workers). This number must not exceed the number of task instances
    • -jm memory (heapspace) for the JobManager
    • -tm memory for the TaskManagers
Once the output has changed from
JobManager is now running on N/A:6123
to
JobManager is now running on ip-172-31-13-68.us-west-2.compute.internal:6123
Stratosphere has started the JobManager. It will take a few seconds until the TaskManagers (workers) have connected to the JobManager. To see how many TaskManagers connected, you have to access the JobManager's web interface. Follow the steps below to do that ...

Step 4: Launch a Stratosphere Job

This step shows how to submit and monitor a Stratosphere Job in the Amazon Cloud.
  • Open an additional terminal and connect again to the master of your cluster.
  • We recommend creating a SOCKS proxy with your SSH connection; it allows you to easily connect into the cluster. (If you already have a VPN set up with EC2, you can probably use that as well.)
    ssh -D localhost:2001 hadoop@<your master dns name> -i <your pem file>
    
    Notice the -D localhost:2001 argument: It opens a SOCKS proxy on your computer allowing any application to use it to communicate through the proxy via an SSH tunnel to the master node. This allows you to access all services in your EMR cluster, such as the HDFS NameNode or the YARN web interface.
  • Configure a browser to use the SOCKS proxy. Open a browser with SOCKS proxy support (such as Firefox). Ideally, do not use your primary browser for this, since ALL traffic will be routed through Amazon.
    • To configure the SOCKS proxy with Firefox, click on "Edit", "Preferences", choose the "Advanced" tab and press the "Settings ..." button.
    • Enter the details of the SOCKS proxy localhost:2001. Choose SOCKS v4.
    • Close the settings; your browser is now talking to the master node of your cluster

Since you're connected to the master now, you can open several web interfaces:
YARN Resource Manager: http://<masterIPAddress>:9026/
HDFS NameNode: http://<masterIPAddress>:9101/

You can find the masterIPAddress by entering ifconfig into the terminal:

[hadoop@ip-172-31-38-95 ~]$ ifconfig
eth0      Link encap:Ethernet  HWaddr 02:CF:8E:CB:28:B2  
          inet addr:172.31.38.95  Bcast:172.31.47.255  Mask:255.255.240.0
          inet6 addr: fe80::cf:8eff:fecb:28b2/64 Scope:Link
          RX bytes:166314967 (158.6 MiB)  TX bytes:89319246 (85.1 MiB)

Optional: If you want to use the hostnames within your Firefox (that also makes the NameNode links work), you have to enable DNS resolution over the SOCKS proxy. Open the Firefox config about:config and set network.proxy.socks_remote_dns to true.

The YARN ResourceManager also allows you to connect to Stratosphere's JobManager web interface. Click the ApplicationMaster link in the "Tracking UI" column.

To run the Wordcount example, you have to upload some sample data.

# download a text
wget http://www.gnu.org/licenses/gpl.txt
# upload it to HDFS:
hadoop fs -copyFromLocal gpl.txt /input

To run a Job, enter the following command into the master's command line:

# optional: go to the extracted directory
cd stratosphere-yarn-0.5-SNAPSHOT/
# run the wordcount example
./bin/stratosphere run -w -j examples/stratosphere-java-examples-0.5-SNAPSHOT-WordCount.jar  -a 16 hdfs:///input hdfs:///output

Make sure that all TaskManagers have connected to the JobManager.

Let's go through the command in detail:

  • ./bin/stratosphere is the standard launcher for Stratosphere jobs from the command line
  • The -w flag stands for "wait". It is very useful for tracking the progress of the job.
  • -j examples/stratosphere-java-examples-0.5-SNAPSHOT-WordCount.jar: the -j option sets the jar file containing the job. If you have your own application, place your jar file here.
  • -a 16 hdfs:///input hdfs:///output: the -a option specifies the job-specific arguments. In this case, the WordCount example expects the following: <numSubTasks> <input> <output>.

You can monitor the progress of your job in the JobManager's web interface. Once the job has finished (which should take less than 10 seconds), you can analyze it there. Inspect the result in HDFS using:

hadoop fs -tail /output

If you want to shut down the whole cluster in the cloud, use Amazon's web interface and click on "Terminate cluster". If you just want to stop the YARN session, press CTRL+C in the terminal. The Stratosphere instances will then be killed by YARN.



Written by Robert Metzger (@rmetzger_).


Accessing Data Stored in MongoDB with Stratosphere

28 Jan 2014

We recently merged a pull request that allows you to use any existing Hadoop InputFormat with Stratosphere. So you can now (in the 0.5-SNAPSHOT and later versions) define a Hadoop-based data source:

HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));

In the following article, we describe how to access data stored in MongoDB with Stratosphere. This allows users to join data from multiple sources (e.g., MongoDB and HDFS) or perform machine learning with the documents stored in MongoDB.

The approach here is to use the MongoInputFormat that was developed for Apache Hadoop but now also runs with Stratosphere.

JobConf conf = new JobConf();
conf.set("mongo.input.uri","mongodb://localhost:27017/enron_mail.messages");
HadoopDataSource src = new HadoopDataSource(new MongoInputFormat(), conf, "Read from Mongodb", new WritableWrapperConverter());

Example Program

The example program reads data from the Enron dataset, which contains about 500k internal e-mails. The data is stored in MongoDB, and the Stratosphere program counts the number of e-mails per day.

The complete code of this sample program is available on GitHub.

Prepare MongoDB and the Data

  • Install MongoDB
  • Download the Enron dataset from their website.
  • Unpack and load it

    bunzip2 enron_mongo.tar.bz2
    tar xvf enron_mongo.tar
    mongorestore dump/enron_mail/messages.bson
    

We used Robomongo to visually examine the dataset stored in MongoDB.

Build MongoInputFormat

MongoDB offers an InputFormat for Hadoop on its GitHub page. The code is not available in any Maven repository, so we have to build the jar file ourselves.

  • Check out the repository
git clone https://github.com/mongodb/mongo-hadoop.git
cd mongo-hadoop
  • Set the appropriate Hadoop version in build.sbt; we used 1.1.
hadoopRelease in ThisBuild := "1.1"
  • Build the input format
./sbt package

The jar-file is now located in core/target.

The Stratosphere Program

Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, out of Eclipse. To do that, check out the code ...

git clone https://github.com/stratosphere/stratosphere-mongodb-example.git

... and import it as a Maven project into Eclipse. You have to manually add the previously built mongo-hadoop jar file as a dependency. You can now press the "Run" button and see how Stratosphere executes the little program. It ran for about 8 seconds on the 1.5 GB dataset.

The result (located in /tmp/enronCountByDay) now looks like this.

11,Fri Sep 26 10:00:00 CEST 1997
154,Tue Jun 29 10:56:00 CEST 1999
292,Tue Aug 10 12:11:00 CEST 1999
185,Thu Aug 12 18:35:00 CEST 1999
26,Fri Mar 19 12:33:00 CET 1999

There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON documents. Since Stratosphere's standard types do not support JSON documents, I used the WritableWrapper here. This wrapper allows you to use any Hadoop data type with Stratosphere.

The following code example shows how the JSON-documents are accessed in Stratosphere.

public void map(Record record, Collector<Record> out) throws Exception {
    Writable valWr = record.getField(1, WritableWrapper.class).value();
    BSONWritable value = (BSONWritable) valWr;
    Object headers = value.getDoc().get("headers");
    BasicDBObject headerOb = (BasicDBObject) headers;
    String date = (String) headerOb.get("Date");
    // further date processing
}

Please use the comments if you have questions or if you want to showcase your own MongoDB-Stratosphere integration.

Written by Robert Metzger (@rmetzger_).


Optimizer Plan Visualization Tool

26 Jan 2014

Stratosphere's hybrid approach combines MapReduce and MPP database techniques. One central part of this approach is to have a separation between the programming (API) and the way programs are executed (execution plans). The compiler/optimizer decides the details concerning caching or when to partition/broadcast with a holistic view of the program. The same program may actually be executed differently in different scenarios (input data of different sizes, different number of machines).

If you want to know how exactly the system executes your program, you can find it out in two ways:

  1. The browser-based webclient UI, which takes programs packaged into JARs and draws the execution plan as a visual data flow (check out the documentation for details).

  2. For programs using the Local- or Remote Executor, you can get the optimizer plan using the method LocalExecutor.optimizerPlanAsJSON(plan). The resulting JSON string describes the execution strategies chosen by the optimizer. Naturally, you do not want to parse that yourself, especially for longer programs.

    The builds 0.5-SNAPSHOT and later come with a tool that visualizes the JSON string. It is a standalone version of the webclient's visualization, packaged as an HTML document, tools/planVisualizer.html.

    If you open it in a browser (for example chromium-browser tools/planVisualizer.html) it shows a text area where you can paste the JSON string and it renders that string as a dataflow plan (assuming it was a valid JSON string and plan). The pictures below show how that looks for the included sample program that uses delta iterations to compute the connected components of a graph.
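As a small sketch of how this fits together: the program below builds a plan, asks the optimizer for its JSON representation via the LocalExecutor.optimizerPlanAsJSON(plan) method mentioned above, and prints it so it can be pasted into tools/planVisualizer.html. The package names and the WordCount example program are assumptions based on the 0.5-SNAPSHOT builds, not verified against a release.

```java
// Hedged sketch: dump a program's optimizer plan as JSON for the visualizer.
// Package names and the WordCount program/getPlan signature are assumptions.
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.client.LocalExecutor;

public class DumpOptimizerPlan {
    public static void main(String[] args) throws Exception {
        // Build the plan exactly as it would be submitted for execution
        Plan plan = new WordCount().getPlan("4", "file:///tmp/in", "file:///tmp/out");
        // Ask the optimizer for its chosen execution strategies as a JSON string
        String json = LocalExecutor.optimizerPlanAsJSON(plan);
        // Paste this output into tools/planVisualizer.html
        System.out.println(json);
    }
}
```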


Stratosphere 0.4 Released

13 Jan 2014

We are pleased to announce that version 0.4 of the Stratosphere system has been released.

Our team has been working hard during the last few months to create an improved and stable Stratosphere version. The new version comes with many new features, usability and performance improvements in all levels, including a new Scala API for the concise specification of programs, a Pregel-like API, support for Yarn clusters, and major performance improvements. The system now features first-class support for iterative programs and thus covers traditional analytical use cases as well as data mining and graph processing use cases with great performance.

In the course of the transition from v0.2 to v0.4 of the system, we have changed pre-existing APIs based on valuable user feedback. This means that, in the interest of easier programming, we have broken backwards compatibility and existing jobs must be adapted, as described in the migration guide.

This article will guide you through the feature list of the new release.

Scala Programming Interface

The new Stratosphere version comes with a new programming API in Scala that supports very fluent and efficient programs that can be expressed with very few lines of code. The API uses Scala's native type system (no special boxed data types) and supports grouping and joining on types beyond key/value pairs. We use code analysis and code generation to transform Scala's data model to the Stratosphere runtime. Stratosphere Scala programs are optimized before execution by Stratosphere's optimizer just like Stratosphere Java programs.

Learn more about the Scala API in the Scala Programming Guide.

Iterations

Stratosphere v0.4 introduces deep support for iterative algorithms, required by a large class of advanced analysis algorithms. In contrast to most other systems, "looping over the data" is done inside the system's runtime, rather than in the client. Individual iterations (supersteps) can be as fast as sub-second times. Loop-invariant data is automatically cached in memory.

We support a special form of iterations called “delta iterations” that selectively modify only some elements of the intermediate solution in each iteration. These are applicable to a variety of applications, e.g., use cases of Apache Giraph. We have observed speedups of 70x when using delta iterations instead of regular iterations.

Read more about the new iteration feature in the documentation

Hadoop YARN Support

YARN (Yet Another Resource Negotiator) is the major new feature of the recently announced Hadoop 2.2. It allows different runtimes to share an existing cluster, so you can run MapReduce alongside Storm and others. With the 0.4 release, Stratosphere supports YARN. Follow our guide on how to start a Stratosphere YARN session.

Improved Scripting Language Meteor

The high-level language Meteor now natively serializes JSON trees for greater performance and offers additional operators and file formats. We empowered users to write more concise scripts by adding second-order functions, multi-output operators, and other syntactic sugar. For developers of Meteor packages, the API is much more comprehensive and allows them to define custom data types that can be easily embedded in JSON trees through ad-hoc byte code generation.

Spargel: Pregel Inspired Graph Processing

Spargel is a vertex-centric API similar to the interface proposed in Google's Pregel paper and implemented in Apache Giraph. Spargel is implemented in 500 lines of code (including comments) on top of Stratosphere's delta iterations feature. This confirms the flexibility of Stratosphere's architecture.
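To give a flavor of the vertex-centric model, a vertex update step for connected components might look roughly like the sketch below. The class and method names (VertexUpdateFunction, updateVertex, setNewVertexValue, MessageIterator) are assumptions modeled on the Pregel paper and Giraph, not verified against the Spargel release.

```java
// Illustrative sketch only: a vertex-centric update step in the Pregel style.
// All class/method names here are assumed, not taken from the released API.
public class MinIdUpdater extends VertexUpdateFunction<Long, Long, Long> {
    @Override
    public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> messages) {
        // Connected components: adopt the smallest component id seen so far
        long min = vertexValue;
        for (long msg : messages) {
            min = Math.min(min, msg);
        }
        if (min < vertexValue) {
            // Only changed vertices remain in the delta iteration's workset
            setNewVertexValue(min);
        }
    }
}
```

Because unchanged vertices drop out of the workset, this maps directly onto the delta iterations feature described above.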

Web Frontend

Using the new web frontend, you can monitor the progress of Stratosphere jobs. For finished jobs, the frontend shows a breakdown of the execution times for each operator. The webclient also visualizes the execution strategies chosen by the optimizer.

Accumulators

Stratosphere's accumulators allow program developers to compute simple statistics, such as counts, sums, min/max values, or histograms, as a side effect of the processing functions. An example application would be to count the total number of records/tuples processed by a function. Stratosphere will not launch additional tasks (reducers), but will compute the number "on the fly" as a side product of the function's application to the data. The concept is similar to Hadoop's counters, but supports more types of aggregation.
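As a hedged sketch of the pattern (the IntCounter class and the getRuntimeContext().addAccumulator registration are assumed names for the 0.4 API), counting processed records could look like this:

```java
// Sketch of the accumulator idea: register the accumulator once in open(),
// then count as a side effect of the user function. IntCounter, RuntimeContext
// and addAccumulator are assumed names, not verified against the 0.4 release.
public class CountingMapper extends MapFunction {
    private IntCounter numRecords = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // Register under a job-wide name; partial counts from all parallel
        // instances are merged by the system, similar to Hadoop counters.
        getRuntimeContext().addAccumulator("num-records", this.numRecords);
    }

    @Override
    public void map(Record record, Collector<Record> out) {
        this.numRecords.add(1); // computed "on the fly", no extra reduce task
        out.collect(record);
    }
}
```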

Refactored APIs

Based on valuable user feedback, we refactored the Java programming interface to make it more intuitive and easier to use. The basic concepts are still the same, however the naming of most interfaces changed and the structure of the code was adapted. When updating to the 0.4 release you will need to adapt your jobs and dependencies. A previous blog post has a guide to the necessary changes to adapt programs to Stratosphere 0.4.

Local Debugging

You can now test and debug Stratosphere jobs locally. The LocalExecutor allows you to execute Stratosphere jobs from within an IDE. The same code that runs on clusters also runs multi-threaded in a single JVM. This mode supports the full debugging capabilities known from regular applications (placing breakpoints and stepping through the program's functions). An advanced mode supports simulating fully distributed operation locally.
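A minimal sketch of local execution (LocalExecutor.execute is the assumed entry point; the WordCount program and its getPlan arguments are illustrative):

```java
// Run a job inside the current JVM so breakpoints placed in user functions
// are hit by the IDE's debugger. Package names and the WordCount example
// are assumptions based on the 0.5-SNAPSHOT builds.
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.client.LocalExecutor;

public class DebugLocally {
    public static void main(String[] args) throws Exception {
        Plan plan = new WordCount().getPlan("1", "file:///tmp/in", "file:///tmp/out");
        // Multi-threaded, single JVM, same code as on a cluster
        LocalExecutor.execute(plan);
    }
}
```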

Miscellaneous

  • The configuration of Stratosphere has been changed to YAML
  • HBase support
  • JDBC Input format
  • Improved Windows Compatibility: Batch-files to start Stratosphere on Windows and all unit tests passing on Windows.
  • Stratosphere is available in Maven Central and Sonatype Snapshot Repository
  • Improved build system that supports different Hadoop versions using Maven profiles
  • Maven Archetypes for Stratosphere Jobs.
  • Stability and Usability improvements with many bug fixes.

Download and get started with Stratosphere v0.4

There are several options for getting started with Stratosphere.

Tell us what you think!

Are you using, or planning to use Stratosphere? Sign up in our mailing list and drop us a line.

Have you found a bug? Post an issue on GitHub.

Follow us on Twitter and GitHub to stay in touch with the latest news!


Stratosphere Version 0.4 Migration Guide

12 Jan 2014

This guide is intended to help users of previous Stratosphere versions to migrate their programs to the new API of v0.4.

Version 0.4-rc1, 0.4 and all newer versions have the new API. If you want to have the most recent version before the code change, please set the version to 0.4-alpha.3-SNAPSHOT. (Note that the 0.4-alpha versions are only available in the snapshot repository).

Maven Dependencies

Since we also reorganized the Maven project structure, existing programs need to update the Maven dependencies to stratosphere-java (and stratosphere-clients, for examples and executors).

The typical set of Maven dependencies for Stratosphere Java programs is:

       <groupId>eu.stratosphere</groupId>
-      <artifactId>pact-common</artifactId>
-      <version>0.4-SNAPSHOT</version>
+      <artifactId>stratosphere-java</artifactId>
+      <version>0.4</version>

-      <artifactId>pact-clients</artifactId>
-      <version>0.4-SNAPSHOT</version>
+      <artifactId>stratosphere-clients</artifactId>
+      <version>0.4</version>

Renamed classes

We renamed many of the most commonly used classes to make their names more intuitive:

Old Name (before 0.4)          New Name (0.4 and after)
----------------------------   --------------------------
Contract                       Operator
MatchContract                  JoinOperator
[Map, Reduce, ...]Stub         [Map, Reduce, ...]Function
MatchStub                      JoinFunction
Pact[Integer, Double, ...]     IntValue, DoubleValue, ...
PactRecord                     Record
PlanAssembler                  Program
PlanAssemblerDescription       ProgramDescription
RecordOutputFormat             CsvOutputFormat

Package names have been adapted as well. For a complete overview of the renamings, have a look at issue #257 on GitHub.

We suggest that Eclipse users adjust their programs as follows: delete all old Stratosphere imports, then rename the classes (PactRecord to Record, and so on). Finally, use the “Organize Imports” function (CTRL+SHIFT+O) to choose the right imports. The names should be unique, so always pick the classes in the eu.stratosphere package.

Please contact us in the comments below, on the mailing list or on GitHub if you have any issues migrating to the latest Stratosphere release.


Stratosphere wins award at Humboldt Innovation Competition "Big Data: Research meets Startups"

13 Dec 2013

Stratosphere won second place in the competition organized by Humboldt Innovation on "Big Data: Research meets Startups," where several research projects were evaluated by a panel of experts from the Berlin startup ecosystem. The award includes a monetary prize of 10,000 euros.

We are extremely excited about this award, as it further showcases the relevance of the Stratosphere platform and Big Data technology in general for the technology startup world.


Paper "“All Roads Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing" accepted at CIKM 2013

21 Oct 2013

Our paper "“All Roads Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing," authored by Sebastian Schelter, Kostas Tzoumas, Stephan Ewen, and Volker Markl, has been accepted at the ACM International Conference on Information and Knowledge Management (CIKM 2013) in San Francisco.

Abstract

Executing data-parallel iterative algorithms on large datasets is crucial for many advanced analytical applications in the fields of data mining and machine learning. Current systems for executing iterative tasks in large clusters typically achieve fault tolerance through rollback recovery. The principle behind this pessimistic approach is to periodically checkpoint the algorithm state. Upon failure, the system restores a consistent state from a previously written checkpoint and resumes execution from that point.

We propose an optimistic recovery mechanism using algorithmic compensations. Our method leverages the robust, self-correcting nature of a large class of fixpoint algorithms used in data mining and machine learning, which converge to the correct solution from various intermediate consistent states. In the case of a failure, we apply a user-defined compensate function that algorithmically creates such a consistent state, instead of rolling back to a previous checkpointed state. Our optimistic recovery does not checkpoint any state and hence achieves optimal failure-free performance with respect to the overhead necessary for guaranteeing fault tolerance. We illustrate the applicability of this approach for three wide classes of problems. Furthermore, we show how to implement the proposed optimistic recovery mechanism in a data flow system. Similar to the Combine operator in MapReduce, our proposed functionality is optional and can be applied to increase performance without changing the semantics of programs. In an experimental evaluation on large datasets, we show that our proposed approach provides optimal failure-free performance. In the absence of failures, our optimistic scheme is able to outperform a pessimistic approach by a factor of two to five. In the presence of failures, our approach provides fast recovery and outperforms pessimistic approaches in the majority of cases.
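To make the idea of a compensate function concrete, here is a small illustrative example (not from the paper): for a PageRank-style fixpoint iteration, ranks lost in a failure can be reset to the uniform distribution. The result is a consistent, if less converged, algorithm state from which the iteration still reaches the correct fixpoint, so no checkpoint is needed.

```java
import java.util.Arrays;

public class Compensate {
    // Illustrative compensation: replace lost ranks with the uniform value 1/n.
    // The repaired vector is a valid algorithm state, not a restored checkpoint.
    static double[] compensate(double[] ranks, boolean[] lost) {
        int n = ranks.length;
        double[] repaired = ranks.clone();
        for (int i = 0; i < n; i++) {
            if (lost[i]) {
                repaired[i] = 1.0 / n;
            }
        }
        return repaired;
    }

    public static void main(String[] args) {
        double[] ranks = {0.5, 0.3, 0.2};
        boolean[] lost = {false, true, false};
        // The lost rank at index 1 is reset to 1/3
        System.out.println(Arrays.toString(compensate(ranks, lost)));
    }
}
```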

Download the paper [PDF]
