Skip to content

Pipe Connector#

Getting Started#

"Hello World!"#

fun main() = createPipeConnector().onSuccess { 
    it.handler { pipeContext -> 
        pipeContext.setResult("Hello World!").forward()
    }
}
public static void main(String[] args) {
    PipeConnector.create().onSuccess(connector ->
        connector.handler(pipeContext ->
            pipeContext.setResult("Hello World!").forward()));    
}

The No-Op Example#

This example just passes the payload to the next segment, without doing anything with it.

fun main() = createPipeConnector().onSuccess { it.handler { pipeContext -> pipeContext.pass() } }
public static void main(String[] args) {
    PipeConnector.create().onSuccess(connector -> connector.handler(pipeContext -> pipeContext.pass()));    
}

Vert.x Verticle:#

class MyPipeVerticle : CoroutineVerticle() {

    override suspend fun start() {
        vertx.eventBus().consumer(ADDRESS, this::handlePipe)
        awaitResult<PipeConnector> { createPipeConnector(vertx, it) }.pulishTo(ADDRESS)
    }

    private fun handlePipe(message: Message<PipeContext>): Unit = with(message.body()) {
        // The this pointer is now the PipeContext object

    }

    companion object {
        const val ADDRESS: String = "io.piveau.pipe.myservice.queue"
    }

}
public class MyPipeVerticle : AbstractVerticle() {

    public static final String ADDRESS = "io.piveau.pipe.myservice.queue";

    @Override
    public void start(Promise<Void> startPromise) {
        vertx.eventBus().consumer(ADDRESS, this::handlePipe);

        PipeConnector.create(vertx, ar -> {
            if (ar.succeded()) {
                ar.result().publishTo(ADDRESS);
            } else {
                starPromise.fail(ar.cause());
            }
        });
    }

    private void handlePipe(Message<PipeContext> message) {

    }

}

Tip

You can also separate the connector creation from the pipe receiving verticle. E.g. the createPipeConnector() method can be called in a main verticle that on success starts your pipe handling verticle and if that is succesful installs the publishTo() address. So you have better control of configuring your pipe handling verticle worker pool and instances.

Endpoint#

Configuration#

Pipe Context#

Within your handler, the PipeContext object is the man interface to the pipe related functionality.

Access the actual data:

val strData = pipeContext.stringData
val binData = pipeContext.binaryData
String strData = pipeContext.getStringData();
binary[] binData = pipeContext.getBinaryData();

Access the configuration for your segment:

val config = pipeContext.config
JsonObject config = pipeContext.getConfig();

Set and forward any processed data to the next segment:

pipeContext.setResult("Any processed data").forward()
pipeContext.setResult("Any processed data").forward();