Apache Kafka is an open-source stream processing platform. It is based on a massively scalable publish/subscribe message queue architected as a distributed transaction log.
To connect to Apache Kafka, kafka-rar-0.8.0.rar has to be deployed as shown in the Installing a connector section of the Cloud Connectors overview.
Once deployed, something like the following should be printed to the startup log of Payara Micro:
[fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1495395212347] [levelValue: 800] Kafka Resource Adapter Started..
In order to make use of this connector in an application, the following Maven dependency is needed:
<dependency>
  <groupId>fish.payara.cloud.connectors.kafka</groupId>
  <artifactId>kafka-jca-api</artifactId>
  <version>0.8.0</version>
  <type>jar</type>
  <scope>provided</scope>
</dependency>
Note that this dependency has the scope provided, since its types are globally available to every application deployed to Payara Micro once kafka-rar-0.8.0.rar has been deployed.
Sending messages
Messages are sent to Apache Kafka through JCA and a Kafka-specific API. To start using this API, a resource has to be defined via the JCA API: a connection factory.
The connection factory has to be given a name, which can be any valid JNDI name; the java:app namespace is typically recommended. The type of the connection factory for Kafka is fish.payara.cloud.connectors.kafka.KafkaConnectionFactory, and the resource adapter name has to be specified, which here is kafka-rar-0.8.0.
The following gives an example:
@ConnectionFactoryDefinition (
    name = "java:app/kafka/factory",
    interfaceName = "fish.payara.cloud.connectors.kafka.KafkaConnectionFactory",
    resourceAdapter = "kafka-rar-0.8.0"
)
With the above definition in place, the following code shows an example of sending a message:
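@Singleton
@Startup
public class SendKafkaMessage {
    @Resource(lookup = "java:app/kafka/factory")
    private KafkaConnectionFactory factory;
    @PostConstruct
    public void init() {
        // try-with-resources closes the connection when the block exits
        try (KafkaConnection connection = factory.createConnection()) {
            // Send a record with key "hello" and value "world" to the topic "test"
            connection.send(new ProducerRecord<>("test", "hello", "world"));
        } catch (Exception ex) {
            // Log or handle the failure here; an empty catch block hides send errors
        }
    }
}
Receiving messages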
Messages can be received from Apache Kafka by creating an MDB (Message Driven Bean) that implements the fish.payara.cloud.connectors.kafka.api.KafkaListener marker interface. The MDB has one or more methods annotated either with @OnRecord, using the method signature void method(ConsumerRecord record), or with @OnRecords, using the method signature void method(ConsumerRecords records).
The following gives an example:
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "testGroup"),
    @ActivationConfigProperty(propertyName = "topics", propertyValue = "test,test2"),
    @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
    @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000")
})
public class ReceiveKafkaMessage implements KafkaListener {
    @OnRecord(topics = "test")
    public void receiveMessage(ConsumerRecord record) {
        // Handle record
    }
}
The full list of config properties is given below:
| Config Property Name | Type | Default | Notes |
|---|---|---|---|
| | String | localhost:9092 | Kafka servers to connect to |
| | String | KafkaJCAClient | Client ID of the producer |
| | String | | Serializer class for value |
| | String | | Serializer class for key |
| | Long | 33554432 | The total bytes the producer can use to buffer messages |
| | String | 1 | The number of acks the producer requires |
| | Integer | 0 | The number of retries if there is a transient error |
| | Long | 16384 | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition |
| | Long | 0 | The producer groups together any records that arrive in between request transmissions into a single batched request |
| | Long | 60000 | Maximum time a send can block (ms) |
| | Long | 1048576 | Maximum size of a request (bytes) |
| | Integer | 32768 | Receive buffer (bytes) |
| | Integer | 30000 | Request timeout (ms) |
| | String | "none" | Compression type of data sent |
| | Long | 540000 | Idle time after which Kafka connections are closed (ms) |
| | Integer | 5 | Maximum unacknowledged requests to send before blocking |
| | Long | 300000 | Period of time before a refresh of metadata (ms) |
| | Long | 100 | The amount of time to wait before attempting a retry (ms) |
| | Long | 100 | The amount of time to wait before attempting a reconnection (ms) |
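
The Receiving messages section mentions the @OnRecords variant but only demonstrates @OnRecord. The following is a minimal sketch of a batch listener, not a definitive implementation. It makes several assumptions that the text above does not confirm: that OnRecords lives in the same package as KafkaListener, that it accepts a topics attribute like @OnRecord does, and that the javax.ejb annotations are the ones in use (newer Jakarta EE runtimes use jakarta.ejb instead). The class name ReceiveKafkaBatch is hypothetical. The code needs the kafka-jca-api dependency and the Kafka client classes on the compile classpath.

```java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import fish.payara.cloud.connectors.kafka.api.KafkaListener;
// Assumption: OnRecords sits next to KafkaListener in the api package.
import fish.payara.cloud.connectors.kafka.api.OnRecords;

// Same activation properties as the @OnRecord example above.
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "testGroup"),
    @ActivationConfigProperty(propertyName = "topics", propertyValue = "test,test2"),
    @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000")
})
public class ReceiveKafkaBatch implements KafkaListener {

    // Assumption: @OnRecords takes a topics attribute, mirroring @OnRecord.
    @OnRecords(topics = "test")
    public void receiveBatch(ConsumerRecords<?, ?> records) {
        // ConsumerRecords is iterable, so each record in the batch can be visited in turn.
        for (ConsumerRecord<?, ?> record : records) {
            // Handle record, for example by reading record.topic(), record.key(), record.value()
        }
    }
}
```

Receiving a whole batch in one call can be convenient when records are processed together, for instance when writing them to a database in a single transaction; for record-at-a-time handling the @OnRecord form shown earlier is simpler.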