In this article, we describe the execution of a Meteor query. Meteor provides a standalone client that can be executed on a different computer than the actual Stratosphere cluster.
As a precondition, the Stratosphere Jobmanager and the Sopremo Server must run.
First of all, the Meteor client translates a query into a SopremoPlan. The plan is then sent to the Sopremo server and executed.
The Sopremo server must run on the same computer as the Stratosphere Jobmanager.
To start the Sopremo server, first adjust the server address in the sopremo-user.xml configuration file in the conf folder.
<property>
<key>sopremo.rpc.address</key>
<value>localhost</value>
</property>
Then launch the server.
$ ./bin/start-sopremo-server.sh
Currently, the server executes only one SopremoPlan at a time, but this is subject to change.
The client script itself may be executed on an arbitrary computer. To execute a Meteor script, please store it in a file; there is currently no support for an interactive shell.
usage: meteor-client.sh <scripts>
--configDir <config> Uses the given configuration
--port <port> Uses the specified port
--server <server> Uses the specified server
--updateTime <updateTime> Checks with the given update time in ms for the current status
--wait Waits until the script terminates on the server
The Meteor client requires at least one script file to execute. Additionally, it needs the server address, which can be specified in three ways.
Without additional parameters, the client exits immediately and prints whether the script was successfully enqueued.
To monitor the progress of the script, users can add the --wait option, which shows additional information about the job at the given update time (default 1000 ms).
Submitted script meteor_test.script.
Executing script meteor_test.script.............
Executed in 7734 ms
All packages that are used by Meteor scripts must be either in the classpath or in the current directory. To adjust the classpath, please edit meteor-client.sh or invoke it with additional -cp options.
For each package, the Meteor/Sopremo duo checks whether there is a current version of that package on the server and transfers it when needed. Thus, custom Sopremo operators may be tested quite easily with Meteor.
This article briefly outlines what a Meteor query looks like and guides users through managing complex data analysis tasks.
Meteor is a query language specifically designed for complex data analysis tasks. Thus, the inputs and outputs of a query are usually files that reside within HDFS. Meteor resembles Jaql but is simpler in its grammar and, as we show throughout this article, more powerful in overall usage.
The following Meteor query selects all teen-aged students and writes the result to HDFS.
$students = read from 'hdfs://server/path/students.json';
$teens = filter $students where (now()-$students.birthDay).year < 20;
write $teens to 'hdfs://server/path/result.json';
In the first line, the data set that is read from 'students.json' is associated with the variable '$students'. Variables always start with a '$' in Meteor. The input and output paths in Meteor can be specified either in an absolute or in a relative fashion (with respect to the location of the script).
The next line filters the data with a boolean expression that retains only students whose age is less than 20. The boolean expression is parsed into an expression tree that evaluates each incoming record to a boolean value.
Finally, the results are written back to HDFS. We assume that in most complex analysis workflows, the result set is too large to print to the user in any meaningful way. Consequently, there is currently no option to directly inspect the result set; users have to open the result files.
In addition to the use case described above, where the data is stored within HDFS, Meteor also supports other file systems. In the following example, the script stays the same; only the read and write paths are altered.
$students = read from 'file:///home/user/students.json';
$teens = filter $students where (now()-$students.birthDay).year < 20;
write $teens to 'file:///home/user/result.json';
In this case, the input and output files are stored in the user's home directory.
One of the main design goals of the higher level language layer of Stratosphere is the support for a large variety of applications. As we neither want to nor can develop and maintain all possible operators, we encourage power users to develop their own operators.
In the following, we add a duplicate removal step to the script.
using cleansing;
$students = read from 'hdfs://server/path/students.json';
$students = remove duplicates $students
where average(levenshtein(name), dateSim(birthDay)) > 0.95
retain maxDate(enrollmentDate);
$teens = filter $students where (now()-$students.birthDay).year < 20;
write $teens to 'hdfs://server/path/result.json';
The ''using'' command looks for a sopremo-cleansing.jar in the current classpath or directory and makes all contained operators available for later use.
With ''remove duplicates'', we use and configure an operator contained in the cleansing package. The operator has the two properties ''where'' and ''retain'', which specify the similarity condition and the conflict resolution function.
Aside from operators, each Sopremo package may also contain functions and constants. For example, the cleansing package contains the ''levenshtein'' function that is used in the query.
Meteor differs considerably from most scripting languages in that the syntax of operators is not defined statically. All operators follow the same syntax rules, an excerpt of which is shown below in EBNF form.
operator ::= name + inputs? properties? ';'
inputs ::= (alias 'in')? variable (',' inputs)?
properties ::= property properties?
property ::= property_name expression
variable ::= '$' name
When we apply these grammar rules to the ''remove duplicates'' operator, we can decompose it into the following fragments corresponding to the syntax rules.
remove duplicates $students
where average(levenshtein(name), dateSim(birthDay)) > 0.95
retain maxDate(enrollmentDate);
The configuration of operators resembles the message-passing concept of Smalltalk and similar programming languages. Operators are first-class citizens and validate their property values themselves.
There is no restriction on how many packages can be combined in one script. The following version of the running example incorporates information extraction operators.
using cleansing;
using ie;
$students = read from 'hdfs://server/path/students.json';
$students = remove duplicates $students
where average(levenshtein(name), dateSim(birthDay)) > 0.95
retain maxDate(enrollmentDate);
$teens = filter $students
where (now()-$students.birthDay).year < 20;
$articles = read from 'news.json';
$articles = annotate sentences $articles
using morphAdorner;
$articles = annotate entities in $articles
using type.person and regex 'names.txt';
$peopleInNews = pivot $articles around $person = $article.annotations[*].entity
into {
name: $person,
articles: $articles
};
$teensInNews = join $teen in $teens, $person in $peopleInNews
where $teen.name == $person.name
into {
student: $teen,
articles: $person.articles[*].url
};
write $teensInNews to 'hdfs://server/path/result.json';
The additional operators coexist with the data cleansing operators. In fact, both packages also coexist with the base operator package that is implicitly included in all Meteor scripts.
When only one operator or function from a package is used, it can be referenced directly via its namespace. ''cleansing:remove duplicates'' has the same effect in this context even when ''using cleansing'' is omitted. Namespacing also helps when packages have conflicting operator names.
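As a sketch of this shorthand, the duplicate removal step of the running example could be written with an explicit namespace instead of the ''using'' statement (assuming the package functions are resolved in the same way; the rest of the script stays unchanged):
$students = cleansing:remove duplicates $students
where average(levenshtein(name), dateSim(birthDay)) > 0.95
retain maxDate(enrollmentDate);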
Functions can be defined in three ways:
* Packages may include functions
* Sopremo functions may be defined directly in the script and basically act as macros
* User-defined functions written in Java and imported with ''javaudf''
We already saw an example of the first kind (''levenshtein'' from the cleansing package).
The next snippets show how the other two function types are defined.
square = fn(x) { x * x };
sum_udf = javaudf('packageName.JavaClass.sum');
The corresponding Java definition needs to process and return IJsonNodes. There may be an arbitrary number of parameters.
public static IntNode sum(final IntNode node1, final IntNode node2) {
return new IntNode(node1.getIntValue() + node2.getIntValue());
}
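Once defined, both functions can be called like any built-in function. The following sketch applies them inside a transform; the input fields a and b are just example attributes:
$input = read from 'input.json';
$result = transform $input
into {
squared: square($input.a),
total: sum_udf($input.a, $input.b)
};
write $result to 'output.json';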
Advanced implementations should make use of Aggregations and object reuse:
@Name(noun = "max")
public static final Aggregation MAX = new AssociativeAggregation<IJsonNode>(NullNode.getInstance()) {
@Override
public IJsonNode aggregate(final IJsonNode aggregator, final IJsonNode node) {
if (aggregator == NullNode.getInstance())
return node.clone();
else if (ComparativeExpression.BinaryOperator.LESS.evaluate(aggregator, node))
aggregator.copyValueFrom(node);
return aggregator;
}
};
@Name(verb = "trim")
public static final SopremoFunction TRIM = new SopremoFunction1<TextNode>() {
private final transient TextNode result = new TextNode();
@Override
protected IJsonNode call(final TextNode input) {
int start = 0, end = input.length() - 1;
while (start < end && input.charAt(start) == ' ')
start++;
while (end > start && input.charAt(end) == ' ')
end--;
this.result.setValue(input, start, end + 1);
return this.result;
}
};
To process data, Meteor uses a whole set of operators. The following section covers the predefined operators of the base package, which is included automatically in each script.
In this section, we describe the syntax and semantics of the base operators in Meteor by example.
The filter operator filters its input by retaining only those elements for which the given predicate evaluates to true. The operator's semantics are equivalent to SQL's WHERE clause.
filter <input> where <predicate>;
input: an iterable set of objects of type T (e.g. array of T)
output: the filtered set of objects of type T (-> count(output) <= count(input))
The filter operator automatically binds an iteration variable that can be used in the <predicate>. By default, this variable has the same name as the <input>, but it can be renamed. To do so, replace the <input> in the syntax above with:
<iteration variable> in <input>
predicate: an expression that evaluates to a boolean value; it decides which elements of the input are retained.
default iteration variable
$students = read from 'hdfs://server/path/students.json';
$teens = filter $students where (now()-$students.birthDay).year < 20;
write $teens to 'hdfs://server/path/result.json';
renaming iteration variable
$input = read from 'input.json';
$result = filter $emp in $input where $emp.mgr or $emp.income > 30000;
write $result to 'output.json';
The transform operator transforms each element of its input. It takes a set of elements of type T1 and transforms them, according to a given expression, into a set of elements of type T2. The SQL equivalent of this operator is the SELECT clause.
transform <input> into <transformation expression>;
input: a set of elements of type T1 (e.g. array of T1)
output: a set of transformed elements; each element of this set is of type T2 (-> count(output) == count(input))
The transform operator automatically binds an iteration variable that can be used inside the <transformation expression>. By default, this variable has the same name as the <input>; this means values of records can be accessed via $input.key and values of arrays via $input[index].
transformation expression: This expression defines the transformation. It describes how each element of the result is constructed from its corresponding input element.
$input = read from 'input.json';
$result = transform $input
into {
sum: $input.a + $input.b,
first: $input[0]
};
write $result to 'output.json';
The join operator joins two or more input sets into one result set. Although the operator only allows joins between two or more inputs, a self-join can simply be realized by specifying the same data source as both inputs. The join condition between two inputs can be any expression that evaluates to a boolean value; when joining more than two inputs, the condition is assumed to be a conjunction of such expressions. The operator supports multiple types of joins such as natural, left outer, right outer, and full outer joins. The semantics are equivalent to SQL's JOIN.
join <input 1>, <input 2> ...
preserve? <input name> ...
where <join conditions>
into <output expression>;
input: two or more sets of objects
output: a set containing the results of the join. The structure of the elements of this set is defined by the <output expression>.
join conditions: These expressions define whether a combination of input elements is part of the result or not. You can use all kinds of comparisons inside these expressions.
output expression: This expression defines the structure of the elements of the result set. To minimize copy-and-paste work when all attributes of an input should be kept exactly the same in the output, $input.* can be used instead of copying all input attributes explicitly to the output.
preserve: If preserve is used for an input, all elements of the specified input appear in the result set, whether or not they have matching join partners. By using the preserve option, you can achieve the same semantics as the different OUTER JOIN variants in SQL.
The join operator automatically binds the variable $<input i> to each element of <input i>. These variables can be used by both the <join conditions> and the <output expression> to access the elements of each input. To rename the default variable, replace the <input i> in the syntax above with <variable name> in <input i>.
default variable names without preserve option
$users = read from 'users.json';
$pages = read from 'pages.json';
$result = join $users, $pages
where $users.id == $pages.userid
into {
$users.name,
$pages.*
};
write $result to 'result.json';
this example renames all of the default variable names
$users = read from 'users.json';
$pages = read from 'pages.json';
$result = join $u in $users, $p in $pages
where $u.id == $p.userid
into {
$u.name,
$p.*
};
write $result to 'result.json';
the next example makes use of the preserve option
$users = read from 'users.json';
$pages = read from 'pages.json';
$result = join $u in $users, $p in $pages
preserve $u
where $u.id == $p.userid
into {
$u.name,
$p.*
};
write $result to 'result.json';
to show that not only equi-joins are possible, the following join condition could be used
...
where $u.id < $p.userid
...
The group operator groups the elements of one or more inputs on a grouping key into one output. The resulting output contains one item for each group. While transforming a whole group into its result item, aggregate functions such as count() and sum() can be applied. If the group operator is specified with a single input, its semantics are equivalent to SQL's GROUP BY clause.
single input
group <input> by <grouping expression> into <aggregation expression>;
input: a set of elements of type T (e.g. array of type T)
output: a set of the resulting groups; each group is represented by a single item of type T2. The structure of these items is defined by the <aggregation expression>.
grouping expression: This part is optional and defines how the grouping key is extracted from the elements of the input. The resulting key is used to group the elements together. If only one global aggregate should be produced as the result, the syntax can be simplified as in this example:
group <input> into <aggregation expression>;
aggregation expression: This expression is evaluated for each group and results in an item of type T2. This expression can apply functions like count() or sum() to aggregate the elements of a group.
The group operator automatically binds a variable. Both the <grouping expression> and the <aggregation expression> can use this variable (in the <grouping expression> it refers to the elements of the input, in the <aggregation expression> to a single group). By default, the variable has the same name as the input, but, as with any variable in Meteor operators, renaming is possible. To do so, replace the <input> in the syntax above with <variable name> in <input>.
multiple inputs
group <input 1> by <grouping expression 1>, <input 2> by <grouping expression 2> ...
into <aggregation expression>;
Single input -> single, global aggregate
$employees = read from 'employees.json';
$result = group $employees
into count($employees);
write $result to 'output.json';
Single input -> partition into several groups and apply a function per group
$employees = read from 'employees.json';
$result = group $employees
by $employees.dept
into {
$employees[0].dept,
total: sum($employees[*].income)
};
write $result to 'output.json';
Single input -> renaming default variable names, partition into several groups and apply a function per group
$employees = read from 'employees.json';
$result = group $e in $employees
by $e.dept
into {
$e[0].dept,
total: sum($e[*].income)
};
write $result to 'output.json';
Multiple inputs
$employees = read from 'employees.json';
$depts = read from 'departments.json';
$result = group $es in $employees by $es.dept, $ds in $depts by $ds.did
into {
dept: $ds.did,
deptName: $ds[0].name,
emps: $es[*].id,
numEmps: count($es)
};
write $result to 'output.json';
The intersect operator computes the intersection between two or more inputs.
intersect <input 1>, <input 2> ...;
input: two or more sets of elements of type T (e.g. array of type T)
output: a set of elements of type T that are contained in all input sets
$users1 = read from 'users1.json';
$users2 = read from 'users2.json';
$commonUsers = intersect $users1, $users2;
write $commonUsers to 'commonUsers.json';
The union operator computes the set-based union of two or more inputs.
union <input 1>, <input 2> ...;
input: two or more sets of elements of type T (e.g. array of type T)
output: a set of elements of type T that are contained in at least one of the inputs
$users1 = read from 'users1.json';
$users2 = read from 'users2.json';
$allUsers = union $users1, $users2;
write $allUsers to 'allUsers.json';
The subtract operator computes the difference between two or more inputs. The result contains all elements of the first input that are not part of the other inputs.
subtract <input 1>, <input 2> ...;
input: two or more sets of elements of type T (e.g. array of type T)
output: a set of elements of type T that are contained in the first input but in none of the other inputs
$oldUsers = read from 'oldUsers.json';
$currentUsers = read from 'currentUsers.json';
$newUsers = subtract $currentUsers, $oldUsers;
write $newUsers to 'newUsers.json';
The union all operator computes the bag-based union of two or more inputs.
union all <input 1>, <input 2> ...;
input: two or more sets of elements of type T (e.g. array of type T)
output: a bag of elements of type T that are contained in at least one of the inputs (duplicates are retained)
$users1 = read from 'users1.json';
$users2 = read from 'users2.json';
$allUsers = union all $users1, $users2;
write $allUsers to 'allUsers.json';
The cleansing package contains a set of operators that can be used to conveniently write cleansing tasks. Although all of these tasks could be solved with only the standard operators described above, using this package has some advantages.
The operators within the cleansing package cover all steps of the standard cleansing pipeline.
scrubbing --> entity extraction --> duplicate detection --> fusion
The scrubbing operator is used to clean the input data of errors. Such errors include missing values in required fields, wrongly formatted data, wrong data types, or disallowed values. For each field that should be scrubbed you can specify a set of rules; any field of the input without rules is copied unchanged to the output. These rules are then applied sequentially (in the order of specification) to the records. If all rules hold on a specific record, the record is treated as clean. Should one rule for a specific field not hold, a so-called fix is applied to the record (default: deletion of the whole record). After the first rule has failed, all subsequent rules for that field are not evaluated because it is assumed that the value of this field is now clean. In addition to rules and fixes, you can also specify certain functions within the sequence of rule evaluations. These functions are simply executed on the data. The following example shows the scrubbing operator:
using cleansing;
$scrubbed = scrub $companies with rules {
id: required,
founded: [required, hasPattern("\d{4}")],
name: required ?: default(""),
type: [required, notContainedIn([""])] ?: default("company"),
form: [required, illegalCharacters("?!,.") ?: removeIllegalCharacters],
category: [required, lower_case()]
};
As you can see, there are several ways of specifying rules and their corresponding fixes. To define a fix for one or more rules, you have to use "?:". It simply means that if the rule holds, nothing is done with that record; otherwise an attempt is made to fix it. Another feature of the scrubbing operator is the ability to specify a single fix for several rules (like line 5 in the previous example). Line 7 in the example shows the usage of a function inside the rule evaluation sequence: once it is ensured that the category is present, the value is converted to lower case. The following list shows all rules (and corresponding fixes) actually implemented:
In addition to these rules and fixes, there is a common fix that can be applied to any rule.
The following example is a little more complex than the previous one, but shows many of the described features of the scrubbing operator:
using cleansing;
$companies_scrubbed = scrub $companies_preparedForScrubbing with rules {
_id: [required, concat_strings("_L")],
id: required,
founded: [required, hasPattern("\d{4}")],
subsidiaryCompanies: [required?: default([]), deempty()],
parentCompany: required?: default(""),
name: [required, notContainedIn([""])],
industry: required?: default(""),
locations: required?: default([]),
companyType: required?: default(""),
companyName: [required, illegalCharacters("\'")?:removeIllegalCharacters],
companyForm: [required?:default(""), lower_case(),
illegalCharacters(".")?:removeIllegalCharacters],
companyCategory: required?:default("company")
};
The entity extraction operator is able to create user-defined data objects from existing data. It can merge data from multiple data sources and is able to create multiple output schemata at once.
The following example shows this operator:
using cleansing;
$person, $legal_entities = map entities of $employees, $companies
where ($employees.company[1:1] == $companies.id[1:1])
into [
entity $employees identified by $employees.id with {
name: $employees.name,
worksFor: $companies.id
},
entity $companies identified by $companies.id with {
name: $companies.name,
type: $companies.category
}
];
Each of the sub-operators produces the schema of one of the defined outputs. It is important to know that the value of the grouping key of the grouping operator is used as the value of the id field of the resulting record; this id field is created automatically. The ordering of the grouping operators corresponds to the order of the output variables.
A duplicate is a group of records that represent the same real-world entity; such duplicates may be desired in data integration use cases or undesired within a single data set.
The duplicate detection operator allows the user to specify a similarity measure and blocking functions to effectively and efficiently find such duplicate entries.
using cleansing;
$duplicates = detect duplicates $p in $persons
where levenshtein($p.firstName) >= 0.7
sort on $p.lastName
with window 20;
Here, the ''similarity measure'' and the ''threshold'' are defined in the where property. More complex rules, possibly with custom similarity functions, are defined as nested boolean expressions:
placeSim = fn(r1, r2) { ... };
...
where diff($p.age) < 2 and
(average(levenshtein($p.firstName), jaro($p.lastName)) >= 0.7 or
average(jaro($p.lastName), placeSim($p)) >= 0.8);
To increase efficiency, multiple smaller passes are usually desirable. Multiple passes can be expressed with:
sort on [$p.firstName, $p.lastName, concat($p.age, $p.lastName)]
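Combined with the earlier example, a multi-pass duplicate detection could look like the following sketch (one pass per sorting key; the similarity condition and window size are taken from above):
using cleansing;
$duplicates = detect duplicates $p in $persons
where levenshtein($p.firstName) >= 0.7
sort on [$p.firstName, $p.lastName, concat($p.age, $p.lastName)]
with window 20;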
The fusion operator is responsible for merging several records that have been marked as duplicates. To this end, the operator uses a set of resolutions that specify how the values of the fields of duplicates are merged. For each field that should be merged, the given resolutions are applied in sequential order, where each resolution merges the result of the previous one. Ideally, only one value remains after the last resolution has been applied; this value is then treated as the value of the merged duplicates. A warning is shown if more than one value remains, but the merged duplicate will nevertheless contain the result of the last resolution as its value (in this case: an array of the remaining values). If only one value remains while there are still resolutions to apply, the sequence is interrupted and the remaining value is used for the merged record. The following example shows the fusion operator in a simple scenario:
using cleansing;
$persons = fuse $persons_dup with resolutions {
name: [mostFrequent, defaultResolution("default")],
age: mergeDistinct
};
With an input of [{"name": "P1", "age": 25},{"name": "P1", "age": 24},{"name": "P2", "age": 25}], the previous example yields the following fused entity: {"name": "P1", "age": [25,24]}. Because not all data sources in a data cleansing environment enjoy the same level of trust, the fusion operator can take such relations into account. For this purpose, the operator is able to handle weights, where each data source has its own set of weights that define how trustworthy the whole source or specific data fields are. While processing the resolutions, these weights are then taken into account to determine the merge result. The following example shows such weights:
using cleansing;
$persons = fuse $persons_dup
with weights {
A: 0.6,
B : 0.7*{name: 0.8, age: 0.3}
}
with resolutions {
name: [mostFrequent, defaultResolution("default")],
age: mergeDistinct
};
As you can see, the data source B has nested weights assigned to it. In this example, the data source itself has a weight of 0.7, but when accessing the name field, the weights are multiplied, yielding a weight of 0.7 * 0.8 = 0.56 for that field. To allow the fusion operator to take these weights into account, each input record needs a field "_source" that points to the record's origin. Applying the previous example to an input of
[
{"name": "P1", "age": 25, "_source": "A"},
{"name": "P2", "age": 24, "_source": "B"}
]
would output the following merged record:
{"name": "P1", "age": [25,24]}
The name "P1" is choosen because both name values are equaly frequent in the input, but the name field of datasource A has a higher weight than the one from source B.
The following list contains all implemented resolutions: