General
Stratosphere is a data processing system and an alternative to Hadoop's MapReduce component. It comes with its own runtime, rather than building on top of MapReduce. As such, it can work completely independently of the Hadoop ecosystem. However, Stratosphere can also access Hadoop's distributed file system (HDFS) to read and write data, and Hadoop's next-generation resource manager (YARN) to provision cluster resources. Since most Stratosphere users store their data in HDFS, we already ship the libraries required to access it.
Stratosphere does not require a Hadoop installation. However, a very common setup is to use Stratosphere to analyze data stored in the Hadoop Distributed File System (HDFS). To make these setups work out of the box, we bundle the Hadoop client libraries with Stratosphere by default.
Additionally, we provide a special YARN-enabled download of Stratosphere for users with an existing Hadoop YARN cluster. Apache Hadoop YARN is Hadoop's cluster resource manager; it allows different execution engines to run side by side on the same cluster.
Usage
There are multiple ways to track the progress of a Stratosphere program:
- The swt-visualization tool reports the states of all subtasks. If profiling is enabled in conf/stratosphere-config.yml (see Configuration Reference), the load of all machines is displayed as well.
- If you run a program from the command line with the wait option (--wait or -w), task exceptions are printed to the standard error stream and shown on the console.
- The JobManager and TaskManager log files (log/stratosphere-<user>-jobmanager-<host>.log and log/stratosphere-<user>-taskmanager-<host>.log) contain further details on task execution and failures.
Errors
If you run Stratosphere in a massively parallel setting (100+ parallel threads), you need to adapt the number of network buffers via the config parameter taskmanager.network.numberOfBuffers.
As a rule of thumb, the number of buffers should be at least 4 * numberOfNodes * numberOfTasksPerNode^2.
See Configuration Reference for details.
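The rule of thumb above can be turned into a quick calculation. The following minimal Java sketch (the class and method names are hypothetical, not part of Stratosphere) evaluates 4 * numberOfNodes * numberOfTasksPerNode^2 for an assumed cluster of 16 nodes with 8 parallel tasks each, i.e. 128 parallel tasks, and prints the result as a key: value line for the configuration file.

```java
// Hypothetical helper: estimates a lower bound for
// taskmanager.network.numberOfBuffers using the rule of thumb
// 4 * numberOfNodes * numberOfTasksPerNode^2.
public class NetworkBufferEstimate {

    static long minNetworkBuffers(int numberOfNodes, int numberOfTasksPerNode) {
        long tasks = numberOfTasksPerNode;
        // Use long arithmetic so large clusters do not overflow an int.
        return 4L * numberOfNodes * tasks * tasks;
    }

    public static void main(String[] args) {
        int nodes = 16;        // assumed number of worker nodes
        int tasksPerNode = 8;  // assumed parallel tasks per node
        long buffers = minNetworkBuffers(nodes, tasksPerNode);
        // Prints: taskmanager.network.numberOfBuffers: 4096
        System.out.println("taskmanager.network.numberOfBuffers: " + buffers);
    }
}
```

Note that the estimate grows quadratically with the number of tasks per node, so doubling the tasks per node quadruples the number of buffers needed.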
Note: In version 0.4, delta iterations limit the solution set to records with fixed-length data types. We will lift this limitation in the next version.
The most common cause of exceptions like the one below is that Stratosphere is set up with the wrong HDFS version. Because different HDFS versions are often incompatible with each other, the connection between the filesystem master and the client breaks.
Call to <host:port> failed on local exception: java.io.EOFException
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
    at org.apache.hadoop.ipc.Client.call(Client.java:743)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
    at $Proxy0.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
    at eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:276)
Please refer to the download page and the build instructions for details on how to set up Stratosphere for different Hadoop and HDFS versions.
Keys must correctly implement the methods java.lang.Object#hashCode(), java.lang.Object#equals(Object o), and java.lang.Comparable#compareTo(...). The first two always have default implementations inherited from java.lang.Object, which are based on object identity and are therefore usually inadequate for keys. Consequently, all keys must override hashCode() and equals(Object o).
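A minimal sketch of a key class that meets these requirements, written in plain Java. It does not use Stratosphere's own key interfaces, and the class name and fields are hypothetical. hashCode() and equals(Object) are derived from the same fields, and compareTo(...) orders consistently with equals().

```java
import java.util.Objects;

// Hypothetical composite key: a customer id plus a day number.
public class CustomerDayKey implements Comparable<CustomerDayKey> {

    private long customerId;
    private int day;

    // Public nullary constructor, which data type classes also need.
    public CustomerDayKey() {}

    public CustomerDayKey(long customerId, int day) {
        this.customerId = customerId;
        this.day = day;
    }

    @Override
    public int hashCode() {
        // Equal keys must produce equal hash codes, so hash exactly
        // the fields that equals() compares.
        return Objects.hash(customerId, day);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CustomerDayKey)) {
            return false;
        }
        CustomerDayKey other = (CustomerDayKey) o;
        return customerId == other.customerId && day == other.day;
    }

    @Override
    public int compareTo(CustomerDayKey other) {
        // Order by customer first, then by day; returns 0 exactly
        // when equals() returns true.
        int c = Long.compare(customerId, other.customerId);
        return c != 0 ? c : Integer.compare(day, other.day);
    }
}
```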
All data type classes must be public and have a public nullary constructor (a constructor with no arguments). Furthermore, the classes must not be abstract or interfaces. If the classes are nested classes, they must be public and static.
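These rules can be checked mechanically with Java reflection. The helper below is a hypothetical sketch, not Stratosphere's own validation code: it tests whether a class is public, concrete, not an interface, static if nested, and has a public nullary constructor.

```java
import java.lang.reflect.Modifier;

// Hypothetical checker for the data type rules described above.
public class DataTypeRules {

    static boolean isValidDataType(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Must be public, and neither an interface nor abstract.
        if (!Modifier.isPublic(mods) || clazz.isInterface() || Modifier.isAbstract(mods)) {
            return false;
        }
        // A non-static nested class needs an enclosing instance,
        // so it cannot be created through a nullary constructor.
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            return false;
        }
        try {
            // getConstructor() only finds public constructors.
            clazz.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Valid: public static nested class with an implicit public nullary constructor.
    public static class GoodPoint {
        public int x;
        public int y;
    }

    // Invalid: non-static inner class.
    public class BadInner {
        public int value;
    }

    // Invalid: no nullary constructor.
    public static class NoDefaultCtor {
        public NoDefaultCtor(int v) {}
    }

    public static void main(String[] args) {
        System.out.println(isValidDataType(GoodPoint.class));     // true
        System.out.println(isValidDataType(BadInner.class));      // false
        System.out.println(isValidDataType(NoDefaultCtor.class)); // false
    }
}
```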
The swt-visualization tool is, as the name suggests, an SWT application. It requires the native library of the SWT (Standard Widget Toolkit) GUI library, and that library is specific to your platform. The one packaged with Stratosphere by default is for 64-bit Linux GTK systems. If you have a different operating system, architecture, or graphics library, you need a different SWT version.
To fix the problem, update the Maven dependency in stratosphere-addons/swt-visualization/pom.xml to refer to your platform-specific library. The relevant dependency entry is:
<dependency>
  <groupId>org.eclipse.swt.gtk.linux</groupId>
  <artifactId>x86_64</artifactId>
  <version>3.3.0-v3346</version>
</dependency>
You can find the list of available library versions under http://repo1.maven.org/maven2/org/eclipse/swt/.
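To pick the right entry, you first need your JVM's operating system and architecture. The small Java sketch below (class name hypothetical) only prints the standard os.name and os.arch system properties; it does not map them to Maven coordinates, which you still have to look up in the repository listing. The graphics library (for example GTK) cannot be detected this way.

```java
// Hypothetical helper: prints the platform properties that determine
// which platform-specific SWT artifact is needed.
public class PlatformInfo {

    static String describePlatform() {
        // Standard JVM system properties, available on every platform.
        return "os.name=" + System.getProperty("os.name")
                + ", os.arch=" + System.getProperty("os.arch");
    }

    public static void main(String[] args) {
        System.out.println(describePlatform());
    }
}
```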
Stopping the processes sometimes takes a few seconds, because the shutdown may do some cleanup work.
In some error cases, the JobManager or TaskManager cannot be stopped with the provided stop scripts (bin/stop-local.sh or bin/stop-cluster.sh).
You can kill their processes on Linux/Mac as follows:
1. Find all Java processes with the jps command (included with the JDK) or with ps -ef | grep java.
2. Run kill -9 <pid>, where <pid> is the process id of the affected JobManager or TaskManager process.
On Windows, the Task Manager shows a table of all processes and allows you to end a process by right-clicking its entry.
OutOfMemoryErrors usually occur when the functions in the program consume a lot of memory by collecting large numbers of objects, for example in lists or maps. Out-of-memory errors in Java are tricky: the error is not necessarily thrown by the component that allocated most of the memory, but by the component that tried to request the last bit of memory that could not be provided.
There are two ways to go about this:
1. See whether you can use less memory inside the functions. For example, use arrays of primitive types instead of object types.
2. Reduce the memory that Stratosphere reserves for its own processing. The TaskManager reserves a certain portion of the available memory for sorting, hashing, caching, network buffering, etc. That part of the memory is unavailable to the user-defined functions. By reserving it, the system can guarantee that it does not run out of memory on large inputs; instead, it plans with the available memory and spills operations to disk if necessary. By default, the system reserves around 70% of the memory. If you frequently run applications that need more memory in the user-defined functions, you can reduce that value using the configuration entries taskmanager.memory.fraction or taskmanager.memory.size. See the Configuration Reference for details. This leaves more JVM heap memory for the user-defined functions, but may cause data processing tasks to go to disk more often.
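As an illustration of the first option, the following minimal sketch (the class name is hypothetical) collects long values in a growable primitive long[] instead of a List<Long>. A List<Long> stores a reference to a separate boxed Long object, with its own object header, for every element, whereas the primitive array stores each value in 8 bytes.

```java
import java.util.Arrays;

// Hypothetical growable buffer of primitive longs, as a lower-overhead
// alternative to collecting boxed values in a List<Long>.
public class LongBuffer {

    private long[] values = new long[16];
    private int size;

    void add(long v) {
        if (size == values.length) {
            // Double the capacity when full (amortized constant-time appends).
            values = Arrays.copyOf(values, values.length * 2);
        }
        values[size++] = v;
    }

    long get(int index) {
        if (index < 0 || index >= size) {
            throw new IndexOutOfBoundsException("index " + index + ", size " + size);
        }
        return values[index];
    }

    int size() {
        return size;
    }

    public static void main(String[] args) {
        LongBuffer buf = new LongBuffer();
        for (long i = 0; i < 1000; i++) {
            buf.add(i * i);
        }
        // Prints: 1000 998001
        System.out.println(buf.size() + " " + buf.get(999));
    }
}
```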
Check the logging behavior of your jobs. Emitting log messages per record or tuple may be helpful for debugging jobs in small setups with tiny data sets, but it becomes very inefficient and consumes a lot of disk space when used with large input data.
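One way to keep logging cheap is to log a progress line every N records instead of one line per record. The sketch below uses java.util.logging as a stand-in for whatever logging framework your job uses; the class name, method name, and interval of 100,000 records are hypothetical choices.

```java
import java.util.logging.Logger;

// Hypothetical sketch: count records and log only every LOG_INTERVAL records.
public class SampledLogging {

    private static final Logger LOG = Logger.getLogger(SampledLogging.class.getName());
    static final long LOG_INTERVAL = 100_000L;

    private long recordCount;

    // Called once per record; returns true if a log line was emitted.
    boolean onRecord(String record) {
        recordCount++;
        if (recordCount % LOG_INTERVAL == 0) {
            LOG.info("processed " + recordCount + " records, last: " + record);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SampledLogging s = new SampledLogging();
        int logged = 0;
        for (int i = 0; i < 250_000; i++) {
            if (s.onRecord("record-" + i)) {
                logged++;
            }
        }
        // Two log lines (after 100,000 and 200,000 records) instead of 250,000.
        System.out.println(logged);
    }
}
```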
Features
Fault tolerance will be added to the open source project in upcoming versions.
Stratosphere's Accumulators work very similarly to Hadoop's counters, but are more powerful.
A distributed cache is not available right now. In many cases, operators like cross and the upcoming broadcast variables handle these situations more efficiently.