Apache Kafka Cloud Connector
Apache Kafka is an open-source stream processing platform. It is based on a massively scalable publish/subscribe message queue architected as a distributed transaction log.
In order to connect to Apache Kafka, the KafkaRAR-0.1.0-SNAPSHOT.rar has to be deployed as shown in the Installing a connector section of the Cloud Connectors overview.
Once deployed, a message similar to the following should appear in the startup log of Payara Micro Community:
[fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1495395212347] [levelValue: 800] Kafka Resource Adapter Started..
In order to make use of this connector in an application, the following Maven dependency is needed:
<dependency>
<groupId>fish.payara.cloud.connectors.kafka</groupId>
<artifactId>KafkaRARAPI</artifactId>
<version>0.1.0-SNAPSHOT</version>
<type>jar</type>
<scope>provided</scope>
</dependency>
Note that this dependency has scope provided, since the types within this dependency are globally available to every application deployed to Payara Micro Community once KafkaRAR-0.1.0-SNAPSHOT.rar has been deployed.
Sending messages
Messages are sent to Apache Kafka through JCA and a Kafka-specific API. To start using this API, a resource must first be defined via the JCA API: a connection factory.
The connection factory has to be given a name, which can be any name that is valid for JNDI; the java:app namespace is typically recommended. The type of the connection factory to be used for Kafka is fish.payara.cloud.connectors.kafka.KafkaConnectionFactory, and the resource adapter name must also be specified, which here is KafkaRAR-0.1.0-SNAPSHOT.
The following gives an example:
@ConnectionFactoryDefinition (
name = "java:app/kafka/factory",
interfaceName = "fish.payara.cloud.connectors.kafka.KafkaConnectionFactory",
resourceAdapter = "KafkaRAR-0.1.0-SNAPSHOT"
)
With the definition above in place, the following code shows an example of sending a message:
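@Singleton
@Startup
public class SendKafkaMessage {

    // Injects the connection factory registered under the JNDI name defined above
    @Resource(lookup = "java:app/kafka/factory")
    private KafkaConnectionFactory factory;

    @PostConstruct
    public void init() {
        // try-with-resources closes the connection once the record has been handed over
        try (KafkaConnection connection = factory.createConnection()) {
            // Arguments: topic "test", key "hello", value "world"
            connection.send(new ProducerRecord<>("test", "hello", "world"));
        } catch (Exception ex) {
            // Log the failure instead of silently swallowing it
            java.util.logging.Logger.getLogger(SendKafkaMessage.class.getName())
                    .log(java.util.logging.Level.SEVERE, "Failed to send Kafka message", ex);
        }
    }
}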
Receiving messages
Messages can be received from Apache Kafka by creating an MDB (Message Driven Bean) that implements the fish.payara.cloud.connectors.kafka.api.KafkaListener marker interface and has one or more methods annotated with @OnRecord and the method signature void method(ConsumerRecord record), or annotated with @OnRecords and the method signature void method(ConsumerRecords records).
The following gives an example:
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "testGroup"),
    @ActivationConfigProperty(propertyName = "topics", propertyValue = "test,test2"),
    @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
    @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000")
})
public class ReceiveKafkaMessage implements KafkaListener {

    @OnRecord(topics = "test")
    public void receiveMessage(ConsumerRecord record) {
        // Handle record
    }
}
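To make the relationship between the topics activation property and the topics attribute of @OnRecord more concrete, here is a plain-Java simulation with no Kafka or Jakarta EE dependencies. It assumes that the MDB subscribes to every topic in the comma-separated topics value and that a record reaches only those methods whose @OnRecord topic matches. That routing rule is an assumption made for illustration, and TopicRoutingSketch, parseTopics and shouldDeliver are hypothetical names, not part of the connector API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only: this class is not part of the Kafka connector.
class TopicRoutingSketch {

    // Splits an activation-config style value such as "test,test2" into topic names.
    static List<String> parseTopics(String topicsProperty) {
        return Arrays.stream(topicsProperty.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
    }

    // Assumed rule: a record is delivered to a handler method only if the MDB
    // subscribed to the record's topic AND the handler declares that topic.
    static boolean shouldDeliver(String recordTopic,
                                 List<String> subscribedTopics,
                                 List<String> handlerTopics) {
        return subscribedTopics.contains(recordTopic)
                && handlerTopics.contains(recordTopic);
    }

    public static void main(String[] args) {
        List<String> subscribed = parseTopics("test,test2"); // mirrors the "topics" property
        List<String> handlerTopics = List.of("test");        // mirrors @OnRecord(topics="test")
        for (String topic : subscribed) {
            System.out.println(topic + " -> " + shouldDeliver(topic, subscribed, handlerTopics));
        }
    }
}
```

Under this assumed rule, the MDB in the example would subscribe to both test and test2, but receiveMessage would only be invoked for records from test; a second annotated method would be needed to handle test2.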
The full list of config properties is given below:
Config Property Name | Type | Default | Notes |
---|---|---|---|
bootstrapServersConfig | String | localhost:9092 | Kafka Servers to connect to |
clientId | String | KafkaJCAClient | Client ID of the Producer |
valueSerializer | String | | Serializer class for value |
keySerializer | String | | Serializer class for key |
bufferMemory | Long | 33554432 | The total bytes the producer can use to buffer messages |
acks | String | 1 | The number of acks the producer requires |
retries | Integer | 0 | The number of retries if there is a transient error |
batchSize | Long | 16384 | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition |
lingerMS | Long | 0 | The producer groups together any records that arrive in between request transmissions into a single batched request. |
maxBlockMS | Long | 60000 | How long can send block |
maxRequestSize | Long | 1048576 | Maximum size of request (bytes) |
receiveBufferBytes | Integer | 32768 | Receive Buffer (bytes) |
requestTimeout | Integer | 30000 | Request Timeout (ms) |
compression | String | "none" | Compression type of data sent |
connectionsMaxIdle | Long | 540000 | Close Idle Kafka Connections |
maxInflightConnections | Integer | 5 | Maximum unacknowledged requests to send before blocking |
metadataMaxAge | Long | 300000 | Period of time before a refresh of Metadata (ms) |
retryBackoff | Long | 100 | The amount of time to wait before attempting a retry (ms) |
reconnectBackoff | Long | 100 | The amount of time to wait before attempting a reconnection (ms) |
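
The property names in the table closely resemble the standard Kafka producer settings. As a rough orientation, the sketch below pairs a few connector property names with the Kafka client configuration keys they appear to correspond to, using the defaults listed in the table. The pairing is an assumption based on naming and is not confirmed by the connector's source; KafkaConfigSketch and its members are hypothetical names, not connector API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration: not part of the Kafka connector.
class KafkaConfigSketch {

    // Connector property name -> ASSUMED Kafka client configuration key.
    static final Map<String, String> ASSUMED_KEYS = new LinkedHashMap<>();
    // Connector property name -> default value as listed in the table above.
    static final Map<String, String> DEFAULTS = new LinkedHashMap<>();

    static {
        register("bootstrapServersConfig", "bootstrap.servers", "localhost:9092");
        register("clientId", "client.id", "KafkaJCAClient");
        register("acks", "acks", "1");
        register("retries", "retries", "0");
        register("batchSize", "batch.size", "16384");
        register("lingerMS", "linger.ms", "0");
        register("maxBlockMS", "max.block.ms", "60000");
        register("compression", "compression.type", "none");
    }

    private static void register(String connectorName, String kafkaKey, String defaultValue) {
        ASSUMED_KEYS.put(connectorName, kafkaKey);
        DEFAULTS.put(connectorName, defaultValue);
    }

    // Builds Kafka-style Properties from connector-style overrides,
    // falling back to the table defaults for anything not overridden.
    static Properties toKafkaProperties(Map<String, String> overrides) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : ASSUMED_KEYS.entrySet()) {
            String connectorName = entry.getKey();
            String value = overrides.getOrDefault(connectorName, DEFAULTS.get(connectorName));
            props.setProperty(entry.getValue(), value);
        }
        return props;
    }

    public static void main(String[] args) {
        // Override only the broker address; every other value uses its default.
        Properties props = toKafkaProperties(Map.of("bootstrapServersConfig", "broker1:9092"));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Only a subset of the table is covered here to keep the sketch short; the remaining properties would follow the same pattern if the naming assumption holds.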