Apache Kafka Cloud Connector
Apache Kafka is an open-source stream processing platform. It is based on a massively scalable publish/subscribe message queue architected as a distributed transaction log.
To connect to Apache Kafka, the KafkaRAR-0.1.0-SNAPSHOT.rar resource adapter archive has to be deployed as shown in the Installing a connector section of the Cloud Connectors overview.
Once deployed, a line similar to the following should appear in the Payara Micro startup log:
[fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1495395212347] [levelValue: 800] Kafka Resource Adapter Started..
In order to make use of this connector in an application, the following Maven dependency is needed:
<dependency>
    <groupId>fish.payara.cloud.connectors.kafka</groupId>
    <artifactId>KafkaRARAPI</artifactId>
    <version>0.1.0-SNAPSHOT</version>
    <type>jar</type>
    <scope>provided</scope>
</dependency>
Note that this dependency has scope provided, since its types are globally available to every application deployed to Payara Micro once KafkaRAR-0.1.0-SNAPSHOT.rar has been deployed.
Sending messages
Messages are sent to Apache Kafka via JCA and a Kafka-specific API. Before this API can be used to send messages, a resource has to be defined via the JCA API: a connection factory.
The connection factory has to be given a name, which can be any name that is valid for JNDI. The java:app namespace is typically recommended. The type of the connection factory to be used for Kafka is fish.payara.cloud.connectors.kafka.KafkaConnectionFactory, and the resource adapter name has to be specified, which here is KafkaRAR-0.1.0-SNAPSHOT.
The following gives an example:
@ConnectionFactoryDefinition(
    name = "java:app/kafka/factory",
    interfaceName = "fish.payara.cloud.connectors.kafka.KafkaConnectionFactory",
    resourceAdapter = "KafkaRAR-0.1.0-SNAPSHOT"
)
With the above definition in place, the following code shows an example of sending a message:
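@Singleton
@Startup
public class SendKafkaMessage {

    @Resource(lookup = "java:app/kafka/factory")
    private KafkaConnectionFactory factory;

    @PostConstruct
    public void init() {
        // try-with-resources closes the connection when the block exits
        try (KafkaConnection connection = factory.createConnection()) {
            // Arguments: topic "test", key "hello", value "world"
            connection.send(new ProducerRecord<>("test", "hello", "world"));
        } catch (Exception ex) {
            // Report the failure instead of silently discarding it
            java.util.logging.Logger.getLogger(SendKafkaMessage.class.getName())
                    .log(java.util.logging.Level.SEVERE, "Failed to send Kafka message", ex);
        }
    }
}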
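The example above relies on try-with-resources to release the connection. The following is a minimal, self-contained sketch of that pattern only; FakeConnection is a hypothetical stand-in introduced for illustration and is not the connector's KafkaConnection. It shows that the resource is closed both on success and on failure, and that on failure it is closed before the catch block runs.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {

    // Hypothetical stand-in for a connection; records what happens to it.
    static class FakeConnection implements AutoCloseable {
        private final List<String> log;

        FakeConnection(List<String> log) {
            this.log = log;
        }

        void send(String message) {
            if (message == null) {
                throw new IllegalArgumentException("message must not be null");
            }
            log.add("send:" + message);
        }

        @Override
        public void close() {
            log.add("close");
        }
    }

    // Sends one message and returns the ordered list of events that occurred.
    static List<String> run(String message) {
        List<String> log = new ArrayList<>();
        try (FakeConnection connection = new FakeConnection(log)) {
            connection.send(message);
        } catch (IllegalArgumentException ex) {
            log.add("error:" + ex.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("world")); // [send:world, close]
        System.out.println(run(null));    // [close, error:message must not be null]
    }
}
```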
Receiving messages
Messages can be received from Apache Kafka by creating an MDB (Message Driven Bean) that implements the fish.payara.cloud.connectors.kafka.api.KafkaListener marker interface and has one or more methods annotated either with @OnRecord and the method signature void method(ConsumerRecord record), or with @OnRecords and the method signature void method(ConsumerRecords records).
The following gives an example:
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "testGroup"),
    @ActivationConfigProperty(propertyName = "topics", propertyValue = "test,test2"),
    @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
    @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000")
})
public class ReceiveKafkaMessage implements KafkaListener {

    @OnRecord(topics = "test")
    public void receiveMessage(ConsumerRecord record) {
        // Handle record
    }
}
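In the example above the bean subscribes to the topics test and test2, while the annotated method names only test. The sketch below illustrates one plausible reading of that combination, namely that a method limited to certain topics sees only records from those topics. This is an assumption about the semantics, not the connector's implementation; Rec, parseTopics and deliver are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TopicFilterSketch {

    // Hypothetical stand-in for a consumed record; not Kafka's ConsumerRecord.
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Splits a comma-separated property value such as "test,test2" into topic names.
    static Set<String> parseTopics(String propertyValue) {
        Set<String> topics = new LinkedHashSet<>();
        for (String part : propertyValue.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        return topics;
    }

    // Returns the values of the polled records whose topic is in handlerTopics.
    static List<String> deliver(List<Rec> polled, Set<String> handlerTopics) {
        List<String> delivered = new ArrayList<>();
        for (Rec rec : polled) {
            if (handlerTopics.contains(rec.topic)) {
                delivered.add(rec.value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Rec> polled = Arrays.asList(
                new Rec("test", "a"), new Rec("test2", "b"), new Rec("test", "c"));
        System.out.println(parseTopics("test,test2"));            // [test, test2]
        System.out.println(deliver(polled, parseTopics("test"))); // [a, c]
    }
}
```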
The full list of config properties is given below:
| Config Property Name | Type | Default | Notes |
|---|---|---|---|
| | String | localhost:9092 | Kafka servers to connect to |
| | String | KafkaJCAClient | Client ID of the producer |
| | String | | Serializer class for value |
| | String | | Serializer class for key |
| | Long | 33554432 | The total bytes the producer can use to buffer messages |
| | String | 1 | The number of acks the producer requires |
| | Integer | 0 | The number of retries if there is a transient error |
| | Long | 16384 | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition |
| | Long | 0 | The producer groups together any records that arrive in between request transmissions into a single batched request |
| | Long | 60000 | How long send can block (ms) |
| | Long | 1048576 | Maximum size of request (bytes) |
| | Integer | 32768 | Receive buffer (bytes) |
| | Integer | 30000 | Request timeout (ms) |
| | String | "none" | Compression type of data sent |
| | Long | 540000 | Close idle Kafka connections |
| | Integer | 5 | Maximum unacknowledged requests to send before blocking |
| | Long | 300000 | Period of time before a refresh of metadata (ms) |
| | Long | 100 | The amount of time to wait before attempting a retry (ms) |
| | Long | 100 | The amount of time to wait before attempting a reconnection (ms) |
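
The defaults in the table line up with standard Apache Kafka producer settings. As a rough sketch, the same values can be written as a java.util.Properties object using Kafka's own configuration keys. The pairing of each table row with a Kafka key is inferred from the row descriptions and is an assumption, and ProducerDefaultsSketch is a hypothetical name. The two serializer rows are left out because the table gives no default for them.

```java
import java.util.Properties;

public class ProducerDefaultsSketch {

    // Builds the table's default values under the standard Kafka producer keys.
    // Assumption: each key below corresponds to the table row with the same meaning.
    static Properties producerDefaults() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("client.id", "KafkaJCAClient");
        p.setProperty("buffer.memory", "33554432");
        p.setProperty("acks", "1");
        p.setProperty("retries", "0");
        p.setProperty("batch.size", "16384");
        p.setProperty("linger.ms", "0");
        p.setProperty("max.block.ms", "60000");
        p.setProperty("max.request.size", "1048576");
        p.setProperty("receive.buffer.bytes", "32768");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("compression.type", "none");
        p.setProperty("connections.max.idle.ms", "540000");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        p.setProperty("metadata.max.age.ms", "300000");
        p.setProperty("retry.backoff.ms", "100");
        p.setProperty("reconnect.backoff.ms", "100");
        return p;
    }

    public static void main(String[] args) {
        Properties defaults = producerDefaults();
        System.out.println(defaults.size());                    // 17
        System.out.println(defaults.getProperty("batch.size")); // 16384
    }
}
```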