TripleWave

An open-source tool to bring RDF streams to the Web

Installation

Requirements

  • NodeJS > 6.0.0
  • Java 8 (needed for Fuseki; you can ignore this if you plan to use only the transform mode)

To install TripleWave, clone the repository:

git clone https://github.com/streamreasoning/TripleWave.git

Then run

npm install

to install the Node dependencies.

Possible bugs

  • While installing the dependency node-icu-charset-detector you may see ../node-icu-charset-detector.cpp:5:10: fatal error: ‘unicode/ucsdet.h’ file not found (tested on OS X 10.10), see here
  • While installing the dependency node-iconv you may see ../node_modules/nan/nan.h:601:20: error: no type named ‘GCPrologueCallback’ in ‘v8::Isolate’ (tested on OS X 10.10); please ignore it.

How to Run

To start TripleWave, run the start.sh file if you are using Mac/Linux, or start.bat if you are using Windows.

By default, TripleWave starts on ports 8114 (stream description) and 8124 (stream distribution), and converts the English Wikipedia changes stream. To customize the behavior of TripleWave, see the Configuration section.

Configuration file

The configuration file, located in the config folder, allows you to fully customize the TripleWave installation.
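
For reference, a minimal sketch of a transform-mode configuration could look as follows; the parameter names (mode, stream_name, stream_mapping) are those described later in this document, while the values (the connector and mapping file names) are illustrative:

mode=transform
stream_name=WikiStream
stream_mapping=wikipedia.r2rml.ttl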

Running the examples

TripleWave comes with three scenarios configured and ready to run: the Web stream transformation case, the replay of the Linked Sensor Data dataset, and the endless stream of the same dataset.

To run the different cases, set the mode parameter to transform for the first case, replay for the second, and endless for the last.

In all cases the sGraph of the corresponding stream will be available at

http://hostname:port/path/sgraph

where hostname, port and path are those set in the configuration file.
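
For example, with the default stream-description port and a hypothetical path wikipedia, the sGraph would be reachable at something like:

http://localhost:8114/wikipedia/sgraph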

You can have a look at the streams of the transformation and endless scenarios by visiting the following links:

Run TripleWave to convert a Web stream

TripleWave allows you to generate an RDF stream from an existing stream on the Web. As an example, consider the change stream of Wikipedia. This stream reports all the changes that occur on the Wikipedia website: not only events related to the creation or modification of pages (e.g., articles and books), but also events related to users (new registrations and blocked users) and discussions among them.

For example, the following JSON excerpt (collected with the API provided here) shows a fragment of the stream of changes of Wikipedia. In particular, it shows that the user Jmorrison230582 modified an article of the English Wikipedia about Naruto: Ultimate Ninja. Furthermore, the delta attribute tells us that the user deleted some words, and the url attribute refers to the Wikipedia page that describes the event.

{ 
  "page": "Naruto: Ultimate Ninja",
  "pageUrl": "http://en.wikipedia.org/wiki/Naruto:_Ultimate_Ninja",
  "url": "https://en.wikipedia.org/w/index.php?diff=669355471&oldid=669215360",
  "delta": -7, "comment": "/ Characters /",
  "wikipediaUrl": "http://en.wikipedia.org", 
  "channel": "#en.wikipedia", 
  "wikipediaShort": "en",
  "user": "Jmorrison230582", 
  "userUrl": "http://en.wikipedia.org/wiki/User/Jmorrison230582",
  "unpatrolled": false, 
  "newPage": false, 
  "robot": false,
  "namespace": "article" 
}

To transform a Web stream, you need two components:

  • A connector to the Web stream
  • An R2RML transformation

Web Stream Connector

A Web Stream connector is a JavaScript file that transforms data retrieved from some Web API into a NodeJS stream.

Basically, what you need to do is implement a Transform stream (a Readable stream is fine too).

Let’s have a look at the Wikipedia example:

var stream = require('stream');
var util = require('util');
var wikichanges = require("wikichanges");

var Transform = stream.Transform || require('readable-stream').Transform;

function WikiStream(options) {
  // allow use without new
  if (!(this instanceof WikiStream)) {
    return new WikiStream(options);
  }

  this.close = false;
  this.w = new wikichanges.WikiChanges({
    ircNickname: "jsonLDBot",
    wikipedias: ["#en.wikipedia"]
  });
  var _this = this; // keep a reference to the stream for the listener callback

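  // forward each Wikipedia change into the stream until closeStream() is called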
  this.w.listen(function(c) {
     if (!_this.close) {
       _this.push(JSON.stringify(c));
     } else {
        _this.push(null);
     }
  });

  // init Transform
  Transform.call(this, options);
}

util.inherits(WikiStream, Transform);

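// data is pushed by the IRC listener above, so _read does not need to do anything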
WikiStream.prototype._read = function(enc, cb) {};

WikiStream.prototype.closeStream = function() {
  this.close = true;
};
exports = module.exports = WikiStream;

The lines var stream = require('stream'); and var util = require('util'); import the stream module and the util module; the latter is needed to implement the inheritance.

Then, var Transform = stream.Transform || require('readable-stream').Transform; imports the actual Transform stream class.

All the logic is then implemented inside the WikiStream function.

Whenever you want to put some data in the stream, you need to call the this.push(/* some data */) function (remember that you can only push strings into the stream).

In this particular example the code works like this:

var wikichanges = require("wikichanges"); imports the library used to connect to the stream of changes of Wikipedia.

The code

this.w = new wikichanges.WikiChanges({
    ircNickname: "jsonLDBot",
    wikipedias: ["#en.wikipedia"]
});

opens the stream.

Then with the lines

this.w.listen(function(c) {
  if (!_this.close) {
    _this.push(JSON.stringify(c));
  } else {
    _this.push(null);
  }
});

we create a handler that puts the data into our stream whenever it is available from Wikipedia.

To use a custom stream, put your file in the stream/input_stream folder, and then set the stream_name parameter in the configuration file to the name of your .js file.

Furthermore, you can use the SampleStream.js file as a stub to create your own connector; a minimal sketch of such a connector follows.
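
For instance, the following hypothetical TickStream.js follows the same Transform-stream structure as the Wikipedia example, but pushes a locally generated JSON event every second (the file name and the event fields are illustrative):

var stream = require('stream');
var util = require('util');

var Transform = stream.Transform || require('readable-stream').Transform;

function TickStream(options) {
  // allow use without new
  if (!(this instanceof TickStream)) {
    return new TickStream(options);
  }

  var _this = this;
  // push one JSON event per second (only strings can be pushed into the stream)
  this.timer = setInterval(function() {
    _this.push(JSON.stringify({ time: Date.now(), value: Math.random() }));
  }, 1000);

  // init Transform
  Transform.call(this, options);
}

util.inherits(TickStream, Transform);

// data is pushed by the timer, so _read is a no-op
TickStream.prototype._read = function(enc, cb) {};

TickStream.prototype.closeStream = function() {
  clearInterval(this.timer);
  this.push(null);
};

exports = module.exports = TickStream;

Saving this file as stream/input_stream/TickStream.js and setting stream_name accordingly should be enough to plug it into TripleWave.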

R2RML Transformation

To adapt and transform Web streams to RDF streams we use a generic transformation process that is specified as R2RML mappings. The example below specifies how a Wikipedia stream update can be mapped to a graph of an RDF stream. This mapping first defines a triple indicating that the generated subject is of type schema:UpdateAction. The predicateObjectMap clauses add two more triples, one specifying the object of the update (e.g. the modified wiki page) and one specifying the author of the update.

:wikiUpdateMap a rr:TriplesMap; rr:logicalTable :wikistream;
  rr:subjectMap [ rr:template "http://131.175.141.249/TripleWave/{time}";
                  rr:class schema:UpdateAction; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:object;
                          rr:objectMap [ rr:column "pageUrl" ] ];
  rr:predicateObjectMap [ rr:predicate schema:agent;
                          rr:objectMap [ rr:column "userUrl" ] ] .

Additional mappings can be specified, as in the example below, for providing more information about the user (e.g. user name):

:wikiUserMap a rr:TriplesMap; rr:logicalTable :wikistream;
  rr:subjectMap [ rr:column "userUrl";
                  rr:class schema:Person; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:name;
                          rr:objectMap [ rr:column "user" ] ] .

A snippet of the resulting RDF Stream graph, serialized in JSON-LD, is shown below.

{"http://www.w3.org/ns/prov#generatedAtTime": "2015-06-30T16:44:59.587Z",
  "@id": "http://131.175.141.249/TripleWave/1435682699587",
  "@graph": [ 
    { "@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582",
      "@type": "https://schema.org/Person",
      "name": "Jmorrison230582" },
    { "@id": "http://131.175.141.249/TripleWave/1435682699587",
      "@type": "https://schema.org/UpdateAction",
      "object": {"@id": "http://en.wikipedia.org/wiki/Naruto_Ultimate_Ninja"},
      "agent":  {"@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582"}
    }
  ],
 "@context": "https://schema.org/"  
}

To use your own transformation, put the R2RML file in the transformation folder and set the stream_mapping parameter to the name of the transformation file.

Run TripleWave to stream your own RDF data

TripleWave can convert an existing dataset (containing some temporal information) into an RDF stream and stream it out. In the following, we explain how to configure TripleWave to work in this setting.

Set the execution mode and the input file

To stream your own RDF file, first set one of the two execution modes in the mode parameter of the configuration file:

  • replay
  • endless

Moreover, you should set the location of the RDF file to be converted. This is done by filling in the rdf_file field in the config file, e.g.,

rdf_file=../rdf/data.ttl

Create the stream item structure

The first conversion step consists of specifying how to create the RDF stream items, i.e., a set of pairs (g,t), where g denotes an RDF graph and t a timestamp.

Since the imported file is an RDF graph, i.e., a set of triples, it is necessary to specify the criteria to (1) group the data into RDF graphs and (2) associate a time instant with each of them. This is done through the following parametric SPARQL query:

PREFIX sr: <http://streamreasoning.org/>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX afn: <http://jena.apache.org/ARQ/function#>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ; sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    [rdf_stream_item_pattern]
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}

The above query depends on the input data; this dependency is captured by the [rdf_stream_item_pattern] placeholder. Its value has to be set through the rdf_stream_item_pattern parameter in the config file, with the following constraints:

  • it is a Basic Graph Pattern;
  • it uses two special variables, ?key and ?ts, to set respectively the resource used to partition the data and the related timestamp;
  • there is a 1:1 relation between ?key and ?ts, i.e., for each value of ?key there is exactly one ?ts value (and vice versa).

TripleWave assumes that the three constraints hold; otherwise it may not behave properly. With reference to the supplied example file data.ttl, the rdf_stream_item_pattern parameter can be set as (on one line):

rdf_stream_item_pattern = 
  ?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time . 
  ?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts

Consequently, the following query is executed over the input data:

PREFIX sr: <http://streamreasoning.org/>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX afn: <http://jena.apache.org/ARQ/function#>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ; sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    ?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time .
    ?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}

Fill the stream items

The previous step creates the stream item structure, i.e., the element names and the related time instants. To complete the conversion, it is necessary to fill the stream elements with their content. This operation is done with a set of SPARQL queries of the form:

PREFIX sr: <http://streamreasoning.org/>
WITH [g]
INSERT {
  [stream_item_content_pattern]
}
WHERE {
  GRAPH sr:input {
    [stream_item_content_pattern]
  }
}

[g] denotes a stream element identifier, while [stream_item_content_pattern] indicates the Basic Graph Pattern that extracts the window content.

[g] is automatically set by TripleWave, while [stream_item_content_pattern] is loaded from the config file: you should set its value through the stream_item_content_pattern parameter. As before, the special variable ?key has to be used to compose the basic graph pattern.

Continuing the example, the stream_item_content_pattern parameter can be set as:

stream_item_content_pattern =
  ?key ?p ?o .

Consequently, the following example query is executed over the input data:

PREFIX sr: <http://streamreasoning.org/>
WITH <http://example.org/data.ttl#item05>
INSERT {
  <http://example.org/data.ttl#key5> ?p ?o .
}
WHERE {
  GRAPH sr:input {
    <http://example.org/data.ttl#key5> ?p ?o .
  }
}

It is worth noting that TripleWave automatically replaces the ?key variable with the value related to the graph to be created.

Consuming the TripleWave data

RSP service interfaces

The user can consume a TripleWave stream by exploiting the RSP Services interfaces. The RSP Services offer simple HTTP calls to interact with an RSP engine: registering streams, registering queries and consuming results. The user can interact with the RSP engine (here we exemplify the operation flow using the C-SPARQL engine) and consume the stream as follows (a sketch of the two registration calls is given after the list):

  • Identify the stream by its IRI (which is the URL of the sGraph).
  • Register the new stream in the C-SPARQL engine using an HTTP PUT call (<serveraddress>/streams) to the RSP Services interfaces with the parameter streamIRI in the body (it represents the unique ID of the stream in the engine).
  • RSP Services looks at the sGraph URL, parses it and gets the information regarding the TBox and the WebSocket.
  • The TBox (if available) is associated with the stream.
  • A WebSocket connection is established and the data flows into C-SPARQL.
  • Register a new query for the registered stream using an HTTP PUT call (<serveraddress>/streams/<queryName>) with the query in the body as a raw string.
  • The TBox (if available) is loaded into the reasoner associated with the query.
  • The query is performed on the flowing data.
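
As a minimal sketch of the two registration calls, using only Node's built-in http module and assuming a hypothetical RSP Services endpoint at localhost:8175 (the body encoding and the query text are also assumptions and should be checked against the RSP Services documentation):

var http = require('http');

// sends an HTTP PUT with a plain-text body to the (hypothetical) RSP Services endpoint
function put(path, body, callback) {
  var req = http.request({
    host: 'localhost',
    port: 8175, // hypothetical RSP Services address
    path: path,
    method: 'PUT',
    headers: { 'Content-Length': Buffer.byteLength(body) }
  }, function(res) {
    res.resume();
    res.on('end', function() { callback(null, res.statusCode); });
  });
  req.on('error', callback);
  req.end(body);
}

// 1. register the stream, identified by the URL of its sGraph
var sgraph = 'http://hostname:port/path/sgraph';
put('/streams', 'streamIRI=' + encodeURIComponent(sgraph), function(err, status) {
  if (err) return console.error(err);
  console.log('stream registered:', status);

  // 2. register a continuous query as a raw string in the body (illustrative query text)
  put('/streams/myQuery', 'REGISTER QUERY myQuery AS SELECT ...', function(err2, status2) {
    if (err2) return console.error(err2);
    console.log('query registered:', status2);
  });
});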

A compressed file containing running examples that exploit online RDF streams created with TripleWave and the C-SPARQL Engine (via RSP Services) is available here.

The source code of the RSP Services can be found on GitHub.

The source code of the running examples client can be found on GitHub.

About

Publications

  • Andrea Mauri, Jean-Paul Calbimonte, Daniele Dell’Aglio, Marco Balduini, Marco Brambilla, Emanuele Della Valle, Karl Aberer: TripleWave: Spreading RDF Streams on the Web. Resource Paper at International Semantic Web Conference 2016.
  • Andrea Mauri, Jean-Paul Calbimonte, Daniele Dell’Aglio, Marco Balduini, Emanuele Della Valle, Karl Aberer: Where Are the RDF Streams?: On Deploying RDF Streams on the Web of Data with TripleWave. Poster at International Semantic Web Conference 2015.

Licence

TripleWave is released under the Apache Licence 2.0.

News

Paper at ISWC!

We got accepted at the ISWC 2016 resources track:

  • Andrea Mauri, Jean-Paul Calbimonte, Daniele Dell’Aglio, Marco Balduini, Marco Brambilla, Emanuele Della Valle, Karl Aberer: TripleWave: Spreading RDF Streams on the Web. Resource Paper at International Semantic Web Conference 2016 (to appear).