Java Client

A thin library designed to publish messages to Hermes.

Features

  • HTTP client-agnostic API
  • synchronous/asynchronous publishing
  • configurable retries
  • metrics

Overview

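Core functionality is provided by the HermesClient class, which in turn uses a HermesSender to do the heavy lifting of sending HTTP requests. At the moment there are four implementations of HermesSender:

  • WebClientHermesSender (Spring WebClient)
  • RestTemplateHermesSender (Spring AsyncRestTemplate)
  • JerseyHermesSender (Jersey Client)
  • OkHttpHermesSender (OkHttp Client)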

Creating

To start using HermesClient, add it as a dependency:

compile group: 'pl.allegro.tech.hermes', name: 'hermes-client', version: versions.hermes

The client should always be built using HermesClientBuilder, which lets you set the following options:

HermesClient client = HermesClientBuilder.hermesClient(...)
    .withURI(...) // Hermes URI
    .withRetries(...) // how many times to retry in case of errors, default: 3
    .withRetrySleep(...) // initial and max delay between consecutive retries in milliseconds, default: 100ms (initial), 300ms (max)
    .withDefaultContentType(...) // what Content-Type to use when none is set, default: application/json
    .withDefaultHeaderValue(...) // default header added to each message
    .withMetrics(metricsRegistry) // see Metrics section below
    .build();
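To make the retry options more concrete, here is a minimal sketch of how an initial and a maximum delay could bound the pause before each retry. The text above does not say how the delay grows between the two values, so the doubling used here is purely an assumption, and RetryDelaySketch is a hypothetical helper that is not part of hermes-client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of hermes-client.
final class RetryDelaySketch {
    private RetryDelaySketch() {}

    // Delay before retry number `attempt` (1-based): starts at initialMillis,
    // doubles on each further attempt (an assumed policy), never exceeds maxMillis.
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = initialMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Delays for all retries, in order.
    static List<Long> schedule(int retries, long initialMillis, long maxMillis) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= retries; attempt++) {
            delays.add(delayMillis(attempt, initialMillis, maxMillis));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults quoted above: 3 retries, 100 ms initial delay, 300 ms max delay.
        System.out.println(schedule(3, 100, 300)); // prints [100, 200, 300]
    }
}
```

With the documented defaults, this assumed policy would spend at most 600 ms sleeping across all three retries.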

See Sender implementations sections for guides on how to construct HermesSender.

Once the client is created, you can start publishing messages. The Hermes Client API is asynchronous by default: publish methods return a CompletableFuture that completes with the result of the publication.

HermesClient exposes methods for easy publication of JSON and Avro messages.

publishJSON sets the application/json content type.

hermesClient.publishJSON("com.group.json", "{\"hello\": 1}");

publishAvro sets the avro/binary content type. It also requires the Avro schema version of the message, which is passed on to Hermes in the Schema-Version header.

hermesClient.publishAvro("com.group.avro", 1, avroMessage.getBytes());

You can also use HermesMessage#Builder to create a HermesMessage object, e.g. to pass custom headers:

hermesClient.publish(
    HermesMessage.hermesMessage("com.group.topic", "{\"hello\": 1}")
        .json()
        .withHeader("My-Header", "header value")
        .build()
);

The future returned by a publish call completes with a HermesResponse object:

CompletableFuture<HermesResponse> result = client.publish("group.topic", "{}");

HermesResponse response = result.join();

assert response.isSuccess();
assert response.getStatusCode() == 201;
assert response.getMessageId().equals("..."); // message UUID generated by Hermes
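Calling join() blocks the calling thread until the response arrives. As a rough sketch of the non-blocking alternative, the result can be processed with a callback attached to the future. The Response interface below is a simplified stand-in for HermesResponse that models only the status code, and AsyncResultSketch and describe are hypothetical names used for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustration only: Response is a stand-in, not the hermes-client type.
final class AsyncResultSketch {
    private AsyncResultSketch() {}

    interface Response {
        int getStatusCode();
    }

    // Turns a pending publish result into a log line without blocking the caller.
    static CompletableFuture<String> describe(CompletableFuture<Response> result) {
        return result.handle((response, error) -> {
            if (error != null) {
                // Dependent stages wrap failures in CompletionException; unwrap for a readable message.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause()
                        : error;
                return "failed: " + cause.getMessage();
            }
            int status = response.getStatusCode();
            // 201 is the status code used in the example above.
            return status == 201 ? "published" : "unexpected status " + status;
        });
    }
}
```

Using handle keeps the success and failure paths in one place, so a failed publication never surfaces as an unhandled exception on whichever thread completes the future.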

Closing

The client supports graceful shutdown, which causes it to stop accepting publish requests and wait for delivery of the messages that are currently being processed.

Two variants of shutting down the client are available:

  • synchronous: void close(long pollInterval, long timeout) returns when all currently processed messages (including retries) are delivered or discarded
  • asynchronous: CompletableFuture<Void> closeAsync(long pollInterval) returns immediately, and the returned CompletableFuture completes when all messages are delivered or discarded

The pollInterval parameter sets the interval, in milliseconds, at which the internal counter of asynchronously processed messages is polled.

After the client has been closed, calls to publish methods return a future completed exceptionally with HermesClientShutdownException:

client.close(50, 1000);

assert client.publish("group.topic", "{}").isCompletedExceptionally();
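The mechanism described in this section (a counter of in-flight messages polled at a fixed interval, plus rejection of new publishes after shutdown) can be sketched with the JDK alone. This is not the library's implementation: GracefulCloseSketch is a hypothetical class, the delivery of a message is represented by a future supplied by the caller, and IllegalStateException stands in for HermesClientShutdownException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only; not the hermes-client implementation.
final class GracefulCloseSketch {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicBoolean closed = new AtomicBoolean();

    // `delivery` stands in for the asynchronous send of one message.
    CompletableFuture<String> publish(CompletableFuture<String> delivery) {
        // Count first, then check the flag, so a concurrent close cannot miss this message.
        inFlight.incrementAndGet();
        if (closed.get()) {
            inFlight.decrementAndGet();
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException("client is shut down"));
            return rejected;
        }
        return delivery.whenComplete((result, error) -> inFlight.decrementAndGet());
    }

    // Returns immediately; the future completes once no messages are in flight.
    CompletableFuture<Void> closeAsync(long pollIntervalMillis) {
        closed.set(true);
        CompletableFuture<Void> done = new CompletableFuture<>();
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleAtFixedRate(() -> {
            if (inFlight.get() == 0) {
                done.complete(null);
                poller.shutdown(); // stop polling once everything is delivered or discarded
            }
        }, 0, pollIntervalMillis, TimeUnit.MILLISECONDS);
        return done;
    }
}
```

A shorter poll interval makes shutdown complete sooner after the last delivery, at the cost of more frequent wake-ups of the polling thread.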

One can use the asynchronous method to wait for the client to terminate within a given timeout, e.g. in a shutdown hook:

client.closeAsync(50).get(1, TimeUnit.SECONDS);
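Note that get with a timeout throws checked exceptions, including TimeoutException when the client has not terminated in time, so a shutdown hook has to handle them. A minimal sketch of such a wrapper follows; ShutdownWaitSketch and awaitClose are hypothetical names, and the method accepts any CompletableFuture<Void>, such as the one returned by closeAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of hermes-client.
final class ShutdownWaitSketch {
    private ShutdownWaitSketch() {}

    // Returns true if `closing` completed normally within the timeout, false otherwise.
    static boolean awaitClose(CompletableFuture<Void> closing, long timeout, TimeUnit unit) {
        try {
            closing.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // messages were still in flight when the timeout elapsed
        } catch (ExecutionException e) {
            return false; // the close operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In a shutdown hook this could be used as Runtime.getRuntime().addShutdownHook(new Thread(() -> ShutdownWaitSketch.awaitClose(client.closeAsync(50), 1, TimeUnit.SECONDS))).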

Metrics

Requirement: dependency io.dropwizard.metrics:metrics-core must be provided at runtime.

Once a MetricRegistry is provided, Hermes Client metrics are reported under the hermes-client branch. They include measured latency, a meter for each status code received, and a meter for failures.

MetricRegistry registry = myMetricRegistryFactory.createMetricRegistry();

HermesClient client = HermesClientBuilder.hermesClient(sender)
    .withURI(URI.create("http://localhost:8080"))
    .withMetrics(registry)
    .build();

Sender implementations

Spring - WebClient

Requirement: org.springframework:spring-webflux must be provided at runtime.

HermesClient client = HermesClientBuilder.hermesClient(new WebClientHermesSender(WebClient.create()))
    .withURI(URI.create("http://localhost:8080"))
    .build();

Spring - AsyncRestTemplate

Requirement: org.springframework:spring-web must be provided at runtime.

HermesClient client = HermesClientBuilder.hermesClient(new RestTemplateHermesSender(new AsyncRestTemplate()))
    .withURI(URI.create("http://localhost:8080"))
    .build();

Jersey Client

Requirement: org.glassfish.jersey.core:jersey-client must be provided at runtime.

HermesClient client = HermesClientBuilder.hermesClient(new JerseyHermesSender(ClientBuilder.newClient()))
    .withURI(URI.create("http://localhost:8080"))
    .build();

OkHttp Client

Requirement: com.squareup.okhttp3:okhttp must be provided at runtime.

HermesClient client = HermesClientBuilder.hermesClient(new OkHttpHermesSender(new OkHttpClient()))
    .withURI(URI.create("http://localhost:8080"))
    .build();

HTTP2 support

Requirements:

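JVM configured with ALPN support (the boot classpath option below applies to Java 8; Java 9 and later include ALPN support and no longer accept -Xbootclasspath/p):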

java -Xbootclasspath/p:<path_to_alpn_boot_jar> ...

OkHttp Client configured with SSL support:

OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .sslSocketFactory(sslContext.getSocketFactory())
        .build();

HermesClient client = HermesClientBuilder.hermesClient(new OkHttpHermesSender(okHttpClient))
    .withURI(URI.create("https://localhost:8443"))
    .build();

Custom HermesSender

Example using Unirest, a very simple HTTP client:

HermesClient client = HermesClientBuilder.hermesClient((uri, message) -> {
    CompletableFuture<HermesResponse> future = new CompletableFuture<>();

    Unirest.post(uri.toString()).body(message.getBody()).asStringAsync(new Callback<String>() {
        @Override
        public void completed(HttpResponse<String> response) {
            future.complete(() -> response.getStatus());
        }

        @Override
        public void failed(UnirestException exception) {
            future.completeExceptionally(exception);
        }

        @Override
        public void cancelled() {
            future.cancel(true);
        }
    });

    return future;
})
.withURI(URI.create("http://localhost:8080"))
.build();
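When the underlying HTTP client already returns a CompletableFuture, the manual callback bridging shown above is unnecessary. The sketch below illustrates this with the JDK's java.net.http.HttpClient (Java 11 or later). Message and Response are simplified stand-ins for HermesMessage and HermesResponse, and JdkHttpSenderSketch is a hypothetical name; adapting it to the real HermesSender interface is left as an assumption about types this page does not fully describe.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

// Sketch only: Message and Response are simplified stand-ins, not the hermes-client types.
final class JdkHttpSenderSketch {

    static final class Message {
        final String contentType;
        final byte[] body;

        Message(String contentType, byte[] body) {
            this.contentType = contentType;
            this.body = body;
        }
    }

    interface Response {
        int getStatusCode();
    }

    private final HttpClient http = HttpClient.newHttpClient();

    // Builds the POST request; kept separate so it can be checked without a network call.
    static HttpRequest toRequest(URI uri, Message message) {
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", message.contentType)
                .POST(HttpRequest.BodyPublishers.ofByteArray(message.body))
                .build();
    }

    // sendAsync already returns a CompletableFuture; failures propagate through it unchanged.
    CompletableFuture<Response> send(URI uri, Message message) {
        return http.sendAsync(toRequest(uri, message), HttpResponse.BodyHandlers.discarding())
                .thenApply(httpResponse -> {
                    int status = httpResponse.statusCode();
                    return (Response) () -> status;
                });
    }
}
```

The response body is discarded here because only the status code is modeled; a fuller sender would also need to expose whatever response details the client relies on.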