Java Client
A thin library designed to publish messages to Hermes.
Features
- http client-agnostic API
- synchronous/asynchronous publishing
- configurable retries
- metrics
Overview
Core functionality is provided by the HermesClient class, which in turn uses HermesSender to do the heavy lifting.
At the moment there are four implementations of HermesSender:
- RestTemplateHermesSender - recommended for services built on Spring framework; uses AsyncRestTemplate for asynchronous transmission
- WebClientHermesSender - for services using Spring WebFlux; uses WebClient
- JerseyHermesSender - recommended for services using Jersey
- OkHttpHermesSender - supports both HTTP/1.1 and HTTP/2 protocols, uses OkHttp3 client
Creating
To start using HermesClient, add it as a dependency:
compile group: 'pl.allegro.tech.hermes', name: 'hermes-client', version: versions.hermes
The client should always be built using HermesClientBuilder, which allows setting:
HermesClient client = HermesClientBuilder.hermesClient(...)
    .withURI(...) // Hermes URI
    .withRetries(...) // how many times to retry in case of errors, default: 3
    .withRetrySleep(...) // initial and max delay between consecutive retries in milliseconds, default: 100ms (initial), 300ms (max)
    .withDefaultContentType(...) // which Content-Type to use when none is set, default: application/json
    .withDefaultHeaderValue(...) // default header added to each message
    .withMetrics(metricsRegistry) // see Metrics section below
    .build();
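The retry settings above can be pictured with a small JDK-only sketch. It assumes the delay doubles after each failed attempt and is capped at the maximum; the documentation only states the initial and max values, so the doubling rule, the RetryDelays class and its delays method are illustrative assumptions rather than hermes-client code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of hermes-client.
final class RetryDelays {
    private RetryDelays() {}

    // Returns the delay (in ms) before each retry, ASSUMING the delay
    // doubles after every attempt and never exceeds maxMs.
    static List<Long> delays(int retries, long initialMs, long maxMs) {
        List<Long> result = new ArrayList<>();
        long delay = initialMs;
        for (int i = 0; i < retries; i++) {
            result.add(Math.min(delay, maxMs));
            delay = Math.min(delay * 2, maxMs);
        }
        return result;
    }
}
```

With the defaults listed above (3 retries, 100 ms initial, 300 ms max), this sketch yields delays of 100, 200 and 300 ms. The actual schedule used by the client may differ.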
See the Sender implementations section for guides on how to construct HermesSender.
Once created, you can start publishing messages. The Hermes Client API is asynchronous by default, returning a CompletableFuture object that holds the promise of a result.
HermesClient exposes methods for easy publication of JSON and Avro messages.
The JSON sender sets the application/json content type.
hermesClient.publishJSON("com.group.json", "{hello: 1}");
The Avro sender sets the avro/binary content type. It also requires the Avro schema version of the message, which is passed on to Hermes in the Schema-Version header.
hermesClient.publishAvro("com.group.avro", 1, avroMessage.getBytes());
You can also use HermesMessage#Builder to create a HermesMessage object, e.g. to pass custom headers:
hermesClient.publish(
    HermesMessage.hermesMessage("com.group.topic", "{hello: 1}")
        .json()
        .withHeader("My-Header", "header value")
        .build()
);
Publishing returns a HermesResponse object:
CompletableFuture<HermesResponse> result = client.publish("group.topic", "{}");
HermesResponse response = result.join();
assert response.isSuccess();
assert response.getStatusCode() == 201;
assert response.getMessageId().equals("..."); // message UUID generated by Hermes
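Blocking with join() is convenient in examples, but the result can also be handled without blocking the calling thread. The sketch below uses only the JDK: a CompletableFuture<Integer> carrying a status code stands in for the future returned by the client, and the AsyncResultHandling class and its describe method are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only sketch; the Integer status stands in for a real response object.
final class AsyncResultHandling {
    private AsyncResultHandling() {}

    // Maps a completed or failed future to a short description
    // without blocking the caller.
    static CompletableFuture<String> describe(CompletableFuture<Integer> statusFuture) {
        return statusFuture.handle((status, error) -> {
            if (error != null) {
                return "failed: " + error.getMessage();
            }
            // 201 is the success status used in the example above.
            return status == 201 ? "published" : "rejected with status " + status;
        });
    }
}
```

The same handle(...) pattern should apply to a CompletableFuture<HermesResponse>, with the status read from the response object instead of an Integer.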
Closing
The client supports graceful shutdown, which causes it to stop accepting publish requests and wait for delivery of the messages currently being processed.
Two variants of shutting down the client are available:
- synchronous: the void close(long pollInterval, long timeout) method returns when all currently processed messages (including retries) are delivered or discarded
- asynchronous: CompletableFuture<Void> closeAsync(long pollInterval) returns immediately, and the returned CompletableFuture completes when all messages are delivered or discarded
The pollInterval parameter sets the interval, in milliseconds, at which the internal counter of asynchronously processed messages is polled.
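To make the polling idea concrete, here is a minimal JDK-only sketch of completing a future once an in-flight counter drops to zero. It is an illustration of the described behaviour under stated assumptions, not the library's implementation; the PollingShutdown class and its messageStarted and messageFinished methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of polling an in-flight message counter.
final class PollingShutdown {
    private final AtomicInteger inFlight = new AtomicInteger();

    void messageStarted() { inFlight.incrementAndGet(); }

    void messageFinished() { inFlight.decrementAndGet(); }

    // Returns immediately; the future completes once the counter reaches
    // zero, which is checked every pollIntervalMs milliseconds (must be > 0).
    CompletableFuture<Void> closeAsync(long pollIntervalMs) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (inFlight.get() == 0) {
                done.complete(null);
                scheduler.shutdown(); // stop polling once everything is delivered
            }
        }, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
        return done;
    }
}
```

A shorter interval notices completion sooner at the cost of more frequent wake-ups, which is the trade-off the pollInterval parameter exposes.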
After shutdown, calls to publish methods return an exceptionally completed future with HermesClientShutdownException:
client.close(50, 1000);
assert client.publish("group.topic", "{}").isCompletedExceptionally();
One can use the asynchronous method to wait for the client to terminate within a given timeout, e.g. in a shutdown hook:
client.closeAsync(50).get(1, TimeUnit.SECONDS);
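Note that get(timeout, unit) throws checked exceptions, so a shutdown hook has to handle them. A possible wrapper, using only the JDK, is sketched below; the ShutdownWait class and awaitClosed method are hypothetical names, and the future passed in stands for the one returned by closeAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for waiting on an asynchronous shutdown.
final class ShutdownWait {
    private ShutdownWait() {}

    // Returns true if the closing future completed normally within the
    // timeout, false on timeout, failure or interruption.
    static boolean awaitClosed(CompletableFuture<Void> closing, long timeout, TimeUnit unit) {
        try {
            closing.get(timeout, unit);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In a shutdown hook this could be used as Runtime.getRuntime().addShutdownHook(new Thread(() -> ShutdownWait.awaitClosed(client.closeAsync(50), 1, TimeUnit.SECONDS))), assuming client is the HermesClient built earlier.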
Metrics
Requirement: dependency io.dropwizard.metrics:metrics-core must be provided at runtime.
After providing a MetricRegistry, Hermes Client metrics will be reported to the hermes-client branch. They include measured latency, meters for each status code received, and a meter for failures.
MetricRegistry registry = myMetricRegistryFactory.createMetricRegistry();
HermesClient client = HermesClientBuilder.hermesClient(sender)
    .withURI(URI.create("http://localhost:8080"))
    .withMetrics(registry)
    .build();
Sender implementations
Spring - WebClient
Requirement: org.springframework:spring-webflux must be provided at runtime.
HermesClient client = HermesClientBuilder.hermesClient(new WebClientHermesSender(WebClient.create()))
    .withURI(URI.create("http://localhost:8080"))
    .build();
Spring - AsyncRestTemplate
Requirement: org.springframework:spring-web must be provided at runtime.
HermesClient client = HermesClientBuilder.hermesClient(new RestTemplateHermesSender(new AsyncRestTemplate()))
    .withURI(URI.create("http://localhost:8080"))
    .build();
Jersey Client
Requirement: org.glassfish.jersey.core:jersey-client must be provided at runtime.
HermesClient client = HermesClientBuilder.hermesClient(new JerseyHermesSender(ClientBuilder.newClient()))
    .withURI(URI.create("http://localhost:8080"))
    .build();
OkHttp Client
Requirement: com.squareup.okhttp3:okhttp must be provided at runtime.
HermesClient client = HermesClientBuilder.hermesClient(new OkHttpHermesSender(new OkHttpClient()))
    .withURI(URI.create("http://localhost:8080"))
    .build();
HTTP2 support
Requirements:
JVM configured with ALPN support:
java -Xbootclasspath/p:<path_to_alpn_boot_jar> ...
OkHttp Client configured with SSL support:
OkHttpClient okHttpClient = new OkHttpClient.Builder()
    .sslSocketFactory(sslContext.getSocketFactory())
    .build();
HermesClient client = HermesClientBuilder.hermesClient(new OkHttpHermesSender(okHttpClient))
    .withURI(URI.create("https://localhost:8443"))
    .build();
Custom HermesSender
Example with Unirest, a very simple HTTP client:
HermesClient client = HermesClientBuilder.hermesClient((uri, message) -> {
    CompletableFuture<HermesResponse> future = new CompletableFuture<>();
    Unirest.post(uri.toString()).body(message.getBody()).asStringAsync(new Callback<String>() {
        @Override
        public void completed(HttpResponse<String> response) {
            future.complete(() -> response.getStatus());
        }
        @Override
        public void failed(UnirestException exception) {
            future.completeExceptionally(exception);
        }
        @Override
        public void cancelled() {
            future.cancel(true);
        }
    });
    return future;
})
    .withURI(URI.create("http://localhost:8080"))
    .build();