How to run a harvesting pipe#

What you need to know about harvesting:#

The harvester is a collection of services that together run a so-called pipe. You'll find all required services under the name "consus".

These are the services in order:

scheduler → importer → (transformer) → exporter → hub

The scheduler manages 'the pipe': it 'schedules' each pipe run and therefore needs to know how to communicate with the next instance.

In this case the next instance is an importer. The importer does what the name suggests: it imports data.

These are the importers provided at the moment:

  • RDF importer
  • CKAN importer
  • CKAN single importer
  • CKAN PROP importer
  • SPARQL importer
  • UDATA importer
  • SOCRATA importer
  • DRUPAL importer

The name tells you what each importer does: it imports a certain data format from a certain kind of endpoint.

Each of these importers runs behind its own endpoint and can be reached through it.

The scheduler receives a list containing all endpoints of the other required services (importer, exporter, transformer, ...).

Such an endpoint looks, for example, like this: http://piveau-importing-rdf:8080/pipe
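
How the scheduler maps a segment name to such an endpoint is configured via service discovery. A minimal entry for the RDF importer might look like this (a sketch; the full serviceDiscovery format is shown at the end of this page):

"serviceDiscovery": {
  "importing-rdf": {
    "http": {
      "method": "POST",
      "address": "http://piveau-importing-rdf:8080/pipe"
    }
  }
}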

But how does the scheduler know which importer to use? This information is given in the form of a JSON file, and this JSON file is the so-called 'pipe'. It looks like this:

{
  "header": {
    "id": "137020cb-6923-42ea-9910-9f8b39d6cab1",
    "name": "camcom",
    "title": "Harvesting - Camera di Commercio delle Marche",
    "context": "EDP2",
    "transport": "payload",
    "version": "2.0.0"
  },
  "body": {
    "segments": [
      {
        "header": {
          "name": "importing-rdf",
          "segmentNumber": 1,
          "title": "Importing RDF",
          "processed": false
        },
        "body": {
          "config": {
            "address": "http://www.mc.camcom.it/uploaded/linked-open-data/dcat-opendata-catalog.json",
            "catalogue": "camcom",
            "inputFormat": "application/ld+json",
            "alternativeLoad": false
          }
        }
      },
      {
        "header": {
          "name": "exporting-hub",
          "segmentNumber": 2,
          "title": "Exporting hub",
          "processed": false
        },
        "body": {
          "config": {
            "hub": {
              "serviceName": "piveau-hub"
            }
          }
        }
      }
    ]
  }
}

This JSON file is called pipe-camcom.json and it consists of a header and a body. The important bit in the header is the name, which is the name of the pipe. The complete schema of the pipe object is defined in the pipe model library.

Furthermore, the body consists of segments, which again consist of a header and a body. Pretty simple. These segments of the pipe tell each instance (service, importer, exporter, ...) what to do and where to go next.

In this case, the first segment is the RDF importer. So the scheduler checks in its list which endpoint belongs to the RDF importer and sends the pipe there. When the pipe arrives in the form of this JSON file, the importer checks where to import from.

In this case, it's here: http://www.mc.camcom.it/uploaded/linked-open-data/dcat-opendata-catalog.json

The importer must also know where to import into, and as you can see, it will import the data into the catalogue "camcom".

Next, the exporter is called. At the moment there is only one exporter and it exports into our piveau-hub, meaning you should also run a hub if you want to build and run a harvester on your local machine or anywhere else.

That's pretty much it.

The scheduler#

You need to get it from here.

The scheduler provides a REST API to communicate with. If you run the scheduler on localhost, it will look like this:

GET http://localhost:8080/triggers → Returns a list of pipeIds and scheduled triggers

GET http://localhost:8080/triggers/{pipeId} → Returns a list of triggers for a pipe with pipeId

PUT http://localhost:8080/triggers → Bulk update of all triggers

PUT http://localhost:8080/triggers/{pipeId} → Create or update a trigger for pipeId

DELETE http://localhost:8080/triggers/{pipeId} → Delete a trigger for pipeId

GET http://localhost:8080/triggers/{pipeId}/{triggerId}/{status} → Set status for a trigger
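
For example, to create an immediate trigger for the camcom pipe from above, the request could look like this (a sketch; the trigger body format is described below):

PUT http://localhost:8080/triggers/camcom
Content-Type: application/json

[
  {
    "id": "camcom",
    "status": "enabled"
  }
]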

So what are triggers? The scheduler uses the Quartz scheduler to manage and trigger pipes. (Pipes run in parallel.)

Triggers are JSON objects that contain information about when a pipe should be triggered. They must contain a pipeId/pipeName, so the scheduler knows which pipe to trigger, and a status, which can be "enabled" or "disabled". Optional information goes into the configs field.

Types of triggers:

  • Immediate trigger
  • Interval trigger
  • Cron trigger
  • Specific trigger

Trigger example for immediate trigger:

[
  {
    "status": "enabled",
    "id": "testPipe",
    "configs": {}
  }
]

As the name suggests, the immediate trigger triggers immediately.

The interval trigger triggers the pipe at certain intervals, and so on. You can find out more about the structure of a trigger in the scheduler API description. configs can contain segment-specific configurations. If you, for example, wanted to add an additional field to the first segment of the camcom pipe above, for the importing-rdf module, the trigger would look like this:

[
  {
    "status": "enabled",
    "id": "testPipe",
    "configs": {
      "importing-rdf": {
        "preProcessing": true
      }
    }
  }
]

Note

Values that are already present in the pipe will be overwritten. JSON arrays are treated like any other entry, i.e. replaced entirely.
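
To illustrate the merge: with the trigger above, the config of the importing-rdf segment of the camcom pipe would end up roughly like this (a sketch, not taken from an actual run):

{
  "address": "http://www.mc.camcom.it/uploaded/linked-open-data/dcat-opendata-catalog.json",
  "catalogue": "camcom",
  "inputFormat": "application/ld+json",
  "alternativeLoad": false,
  "preProcessing": true
}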

When a pipe is triggered, the scheduler sends it to its first segment, in our case the RDF importer.

But where does the scheduler get the pipes from? There are 3 possibilities:

  • A GitLab repository
  • The directory piveau-consus-scheduling/src/main/resources/pipes
  • A directory "pipes" in the directory where the scheduler was started

The Pipe#

The pipe is, as mentioned before, a JSON file that mainly consists of several header/body-style JSON objects.

The header of a pipe consists of its ID, name, title, context, transport and version.

The name is especially important because it is what the scheduler uses to trigger the pipe.

The body of a pipe consists of several segments (or just one). The segments are provided in the form of an array.

The segments in turn consist again of a header and body. The header has information about which service (importer, exporter, transformer) is to be used next.

The body then contains information about where to import from, where to export to and into which catalogue to export, depending on the segment type. (Importer segments contain information about which importer (RDF, CKAN, ...) to use, the URL from which to get the data and the catalogue into which the harvested data shall go; exporter segments contain information about where to export the data to - in our case that is only the hub at the moment.)
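
Stripped of concrete values, the nesting looks like this (a structural sketch of the camcom example above, not the complete schema):

{
  "header": {
    "id": "...",
    "name": "...",
    "title": "...",
    "context": "...",
    "transport": "...",
    "version": "..."
  },
  "body": {
    "segments": [
      {
        "header": {
          "name": "...",
          "segmentNumber": 1,
          "title": "...",
          "processed": false
        },
        "body": {
          "config": {}
        }
      }
    ]
  }
}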

Pipe schema can be found here.

Example pipe can be found here.

Trigger#

Triggers are always given in the form of an array [].

You can create more than one trigger at once (see the combined sketch after the individual trigger types below).

Types of triggers:

  • Immediate trigger
  • Interval trigger
  • Cron trigger
  • Specific trigger

Trigger example for immediate trigger:

[
  {
    "status": "enabled",
    "id": "test-pipe"
  }
]

As the name suggests, the immediate trigger triggers immediately.

Interval trigger:

{
    "id": "test-pipe",
    "status": "enabled",
    "interval": {
        "unit": "DAY",
        "value": 1
    },
    "next": "2020-05-02T17:00:00Z"
}

Cron trigger:

Consult this or this for more information about how to create cron expressions.

{
    "id" : "test-pipe",
    "status" : "enabled",
    "cron" : "0 0 12 * * ?",
    "next" : "2020-07-03T15:55:00Z"
}

Specific trigger:

{
    "id" : "test-pipe",
    "status" : "enabled",
    "specific" : ["2020-07-03T15:55:00Z"]
}
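
Since triggers are always submitted as an array, you can also combine several triggers in a single request, e.g. for the bulk update endpoint. A combined sketch of the examples above (with "another-pipe" as a hypothetical second pipe name) might look like this:

[
  {
    "id": "test-pipe",
    "status": "enabled"
  },
  {
    "id": "another-pipe",
    "status": "enabled",
    "cron": "0 0 12 * * ?",
    "next": "2020-07-03T15:55:00Z"
  }
]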

Pipe Repository#

If you looked closely at our PIVEAU_CLUSTER_CONFIG environment variable, you might have noticed that at the start there is mention of a repository and a URL.

Here's the json to refresh your memory:

"pipeRepositories": {
    "harvesters": {
      "uri": "https://gitlab.com/european-data-portal/harvester/harvesting-pipes.git",
      "username": "username",
      "token": "token",
      "branch": "develop"
    }
  }

That's relatively self-explanatory. When starting, the scheduler looks for the repository given in this JSON format, loads all pipes from the repository and stores them.

The import of those pipe repositories is implemented in piveau-pipe-launcher.

In the README.md there, you'll find the JSON schema for a pipe repository and a service repository. Check it out if you'd like to learn more!

"pipeRepositories": {
  "resources": {
    "paths": [ "pipes" ]
  },
  "system": {
    "uri": "",
    "branch": "master",
    "username": "",
    "token": ""
  }
}

"serviceDiscovery": {
  "test1-segment": {
    "http": {
      "method": "POST",
      "address": "http://example.com:8080/pipe"
    },
    "eventbus": {
      "address": "piveau.pipe.test.queue"
    }
  },
  "test2-segment": {
    ...
  }
}