Apache Kafka Cloud Connector

Apache Kafka is an open-source stream processing platform. It is based on a massively scalable publish/subscribe message queue architected as a distributed transaction log.

Versioning

The tags below are for the Cloud Connectors repository. You should use the one corresponding to the Jakarta version you require.

0.8.0

Kafka Connector using JakartaEE 8 and the javax namespace

Kafka-1.0.0

Kafka Connector using JakartaEE 10 and the jakarta namespace

Usage

If you have selected to use Kafka Connector 1.0.0, replace all occurunces of 0.8.0 with 1.0.0 for the following example.

In order to connect to Apache Kafka, the kafka-rar-0.8.0.rar has to be deployed as shown in the Installing a connector section of the Cloud Connectors overview.

Once deployed, something like the following should be printed to the startup log of Payara Micro:

[fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1495395212347] [levelValue: 800] Kafka Resource Adapter Started..

In order to make use of this connector in an application, the following Maven dependency is needed:

<dependency>
  <groupId>fish.payara.cloud.connectors.kafka</groupId>
  <artifactId>kafka-jca-api</artifactId>
  <version>0.8.0</version>
  <type>jar</type>
  <scope>provided</scope>
</dependency>

Note that this dependency has scope provided since the types within this dependency are globally available to every application deployed to Payara Micro after the kafka-rar-0.8.0.rar was deployed.

Sending messages

Sending messages to Apache Kafka can be done via the JCA and a Kafka specific API. In order to start using this API to send messages, a resource has to be defined via the JCA API; a connection factory.

The connection factory has to be given a name, which can be any name that is valid for JNDI. The java:app namespace is typically recommended to be used. The type of the connection factory to be used for Kafka is fish.payara.cloud.connectors.kafka.KafkaConnectionFactory, and we have to specify the resource adapter name which is here kafka-rar-0.8.0.

The following gives an example:

@ConnectionFactoryDefinition (
    name = "java:app/kafka/factory",
    interfaceName = "fish.payara.cloud.connectors.kafka.KafkaConnectionFactory",
    resourceAdapter = "kafka-rar-0.8.0"
)

With the above shown definition in place the following code shows an example of sending a message:

@Singleton
@Startup
public class SendKafkaMessage {

 @Resource(lookup = "java:app/kafka/factory")
 private KafkaConnectionFactory factory;

 @PostConstruct
 public void init() {
    try (KafkaConnection connection = factory.createConnection()) {
        connection.send(new ProducerRecord("test","hello","world"));
    }
    catch (Exception ex) {
    }
 }
}

Receiving messages

Messages can be received from Apache Kafka by creating an MDB (Message Driven Bean) that implements the fish.payara.cloud.connectors.kafka.api.KafkaListener marker interface and has one or more methods annotated with @OnRecord and the method signature void method(ConsumerRecord record), or @OnRecords and the method signature void method(ConsumerRecords records)

The following gives an example:

@MessageDriven(activationConfig = {
 @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "testGroup"),
 @ActivationConfigProperty(propertyName = "topics", propertyValue = "test,test2"),
 @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
 @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000")
})
public class ReceiveKafkaMessage implements KafkaListener {

 @OnRecord(topics="test")
 public void receiveMessage(ConsumerRecord record) {
     // Handle record
 }
}

The full list of config properties is given below:

Config Property Name Type Default Notes

bootstrapServersConfig

String

localhost:9092

Kafka Servers to connect to

clientId

String

KafkaJCAClient

Client ID of the Producer

valueSerializer

String

org.apache.kafka.common .serialization.StringSerializer

Serializer class for value

keySerializer

String

org.apache.kafka.common .serialization.StringSerializer

Serializer class for key

bufferMemory

Long

33554432

The total bytes the producer can use to buffer messages

acks

String

1

The number of acks the producer requires

retries

Integer

0

The number of retries if there is a transient error

batchSize

Long

16384

The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition

lingerMS

Long

0

The producer groups together any records that arrive in between request transmissions into a single batched request.

maxBlockMS

Long

60000

How long can send block

maxRequestSize

Long

1048576

Maximum size of request (bytes)

receiveBufferBytes

Integer

32768

Receive Buffer (bytes)

requestTimeout

Integer

30000

Request Timeout (ms)

compression

String

"none"

Compression type of data sent

connectionsMaxIdle

Long

540000

Close Idle Kafka Connections

maxInflightConnections

Integer

5

Maximum unacknowledged requests to send before blocking

metadataMaxAge

Long

300000

Period of time before a refresh of Metadata (ms)

retryBackoff

Long

100

The amount of time to wait before attempting a retry (ms)

reconnectBackoff

Long

100

The amount of time to wait before attempting a reconnection (ms)