# Implementing Kafka event streaming

## Prerequisites

Upvest provisions access for you. Before you connect, make sure you have received the following from your Upvest contact:

- **Bootstrap server.** The `hostname` and `port` your Kafka client connects to.
- **Username and password.** Your credentials for the cluster.
- **Simple Authentication and Security Layer (SASL) mechanism.** The authentication mechanism to configure on your client.


## Connection parameters

| **Parameter** | **Value** |
|  --- | --- |
| Bootstrap server | `<BOOTSTRAP_SERVER>` |
| Security protocol | `SASL_SSL` |
| SASL mechanism | Provided by Upvest |
| Username | `<USERNAME>` |
| Password | `<PASSWORD>` |
| Transport Layer Security (TLS) | Standard system Certificate Authority (CA) bundle, no custom certificate required |


The cluster presents a TLS certificate issued by a publicly trusted certificate authority, so you do not need any custom CA configuration. Use the SASL mechanism, username, and password exactly as provided by Upvest.

You can connect using any standard Kafka client.

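As an illustration, the parameters in the table above could be turned into client properties as follows. This is a minimal sketch that assumes a librdkafka-based client (for example `confluent-kafka`), which uses these property names. The `UPVEST_KAFKA_*` environment variable names and the `build_client_config` helper are hypothetical and not part of the Upvest API.

```python
import os


def build_client_config(env=os.environ):
    """Build librdkafka-style client properties from environment variables.

    The UPVEST_KAFKA_* names are hypothetical; use whatever names your
    secret store exposes.
    """
    required = (
        "UPVEST_KAFKA_BOOTSTRAP_SERVER",
        "UPVEST_KAFKA_SASL_MECHANISM",
        "UPVEST_KAFKA_USERNAME",
        "UPVEST_KAFKA_PASSWORD",
    )
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing settings: " + ", ".join(missing))

    return {
        "bootstrap.servers": env["UPVEST_KAFKA_BOOTSTRAP_SERVER"],
        "security.protocol": "SASL_SSL",
        # Use the mechanism exactly as provided; do not hard-code a guess.
        "sasl.mechanism": env["UPVEST_KAFKA_SASL_MECHANISM"],
        "sasl.username": env["UPVEST_KAFKA_USERNAME"],
        "sasl.password": env["UPVEST_KAFKA_PASSWORD"],
        # No CA-related property is set, because no custom certificate
        # is required.
    }
```

Reading the credentials from the environment keeps them out of source control, and failing early on a missing value avoids a harder-to-diagnose authentication error later.
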
## Topics

Your topics are exclusive to your organisation. Upvest provisions two types of topics:

| **Topic** | **Contents** |
|  --- | --- |
| `Notifications` | All event types you would otherwise receive over webhooks |
| `Prices` | Price update events |


Upvest gives you the exact topic names to use. Use each name as provided, without adding a prefix or suffix.

## Message format

Each message value is a single event, serialised as UTF-8 encoded JSON. The event has the same structure as the events delivered over webhooks, so you can reuse the same parsing logic.

Each event contains the following fields:

| **Field name** | **Type** | **Description** |
|  --- | --- | --- |
| `id` | String (uuid) | The unique ID of the event. |
| `created_at` | String | An [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339) timestamp indicating the time at which the event occurred. |
| `type` | String | The event type. See the [Event categories & types](/products/omnibus/getting_started/implementing_webhooks/webhooks_categories_types) section for the full list. |
| `object` | JSON object | The event-specific payload, identical to the `object` documented for the same webhook event. |


The `object` you receive on a topic contains exactly the same fields as the `object` in the webhook for that event type.

The only difference is the envelope. A webhook request bundles several events under a top-level `payload` array; Kafka delivers one event per message, so there is no `payload` wrapper to unpack.

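The envelope difference can be sketched as two small adapters that feed the same per-event logic. This is an illustrative sketch only: the function names are hypothetical, and the webhook body is assumed to contain nothing you need beyond the top-level `payload` array described above.

```python
import json


def events_from_webhook(body: bytes) -> list:
    """Return the events bundled under the webhook's top-level `payload` array."""
    return json.loads(body)["payload"]


def event_from_kafka(value: bytes) -> dict:
    """Return the single event carried by one Kafka message value."""
    return json.loads(value)
```
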
Each message also carries Kafka headers, so you can route it without parsing the body:

| **Header** | **Value** |
|  --- | --- |
| `event-id` | The event `id` |
| `event-type` | The event `type` |
| `content-type` | `application/json` |

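Routing on the headers above might look like the following sketch. It assumes your client exposes headers as a list of `(name, bytes)` pairs, as several Python Kafka clients do; check your own client, because the shape may differ. The `header_value` and `select_handler` helpers and the example handlers are hypothetical.

```python
def header_value(headers, name):
    """Return the first header called `name`, decoded as UTF-8, or None."""
    for key, value in headers or []:
        if key == name and value is not None:
            return value.decode("utf-8")
    return None


def select_handler(headers, handlers, fallback):
    """Choose a handler from the `event-type` header alone, without reading the body."""
    return handlers.get(header_value(headers, "event-type"), fallback)


# Example wiring: only USER.CREATED has a dedicated handler here;
# every other event type goes to the fallback.
handlers = {"USER.CREATED": lambda value: "user created"}
skip = lambda value: "skipped"
```

A consumer that only cares about a few event types can use this to skip JSON parsing for everything else.
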

Kafka preserves the order of messages within a partition. Order is not guaranteed across partitions, so design your consumer to handle each event independently where possible.

**Example message value**

The following is a single `USER.CREATED` event as delivered on a topic. For the exact fields of this event, see the [user event reference](/api/users/user_event).

```json
{
    "id": "fbecea50-2f35-4969-96af-342271da9eca",
    "created_at": "2021-07-21T14:10:00.00Z",
    "type": "USER.CREATED",
    "object": {
        "id": "83d83ec2-d2ca-49ff-bbea-b92b5c3be202",
        "type": "USER"
    }
}
```

Parse the message value as JSON before you access any field, and handle unknown fields gracefully, because new fields can be added in non-breaking releases.

If a message cannot be parsed as valid JSON, treat it as a processing error and route it to your dead-letter queue (DLQ) or alerting pipeline. Do not silently discard malformed messages.

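The parsing guidance above could be implemented along these lines. This is a hedged sketch, not a prescribed implementation: `parse_event`, `process`, and the `handle` and `dead_letter` callbacks are hypothetical names, and rejecting values that are not JSON objects is an extra assumption on top of the rule for invalid JSON.

```python
import json


def parse_event(value):
    """Return (event, error); exactly one of the two is None."""
    try:
        event = json.loads(value)
    except (ValueError, TypeError) as exc:
        # Covers malformed JSON, invalid UTF-8, and a missing (None) value.
        return None, f"invalid JSON: {exc}"
    if not isinstance(event, dict):
        # Assumption: an event is always a JSON object.
        return None, "message value is not a JSON object"
    return event, None


def process(value, handle, dead_letter):
    """Parse one message value, then hand it to `handle` or to `dead_letter`."""
    event, error = parse_event(value)
    if error is not None:
        # Never drop silently: keep the raw bytes and the reason.
        dead_letter(value, error)
        return False
    handle(event)
    return True


def summarise(event):
    """Read only the documented fields, so unknown fields are ignored."""
    return event.get("id"), event.get("type")
```

Reading fields by name, rather than validating against a closed schema, is what lets the consumer keep working when new fields appear.
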
## Consumer groups

Use a consumer group ID that is unique to your application. A clear convention is to name it after the service or use case that consumes the events, for example `notifications-consumer` or `reconciliation`. Run multiple instances with the same group ID to share the partition workload automatically.

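As a small illustration of the naming convention above, the group ID can be derived from the service name so that every instance of the same service joins the same group. `group.id` is the standard Kafka consumer property; the `consumer_config` helper is hypothetical.

```python
def consumer_config(client_config, service_name):
    """Return a copy of `client_config` with the consumer group ID set."""
    if not service_name or service_name != service_name.strip():
        raise ValueError("service_name must be a non-empty, trimmed string")
    config = dict(client_config)  # copy, so the shared settings stay untouched
    # Every instance started with the same service name shares the partitions.
    config["group.id"] = service_name
    return config
```
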
## Testing connectivity

Before you integrate, verify the connection with `kcat`, a command-line Kafka client. Install it on your platform, then run the commands below, filling in the SASL mechanism Upvest provided.

**List topics to confirm authentication**

```bash
kcat -b <BOOTSTRAP_SERVER> \
     -X security.protocol=SASL_SSL \
     -X sasl.mechanism=<SASL_MECHANISM> \
     -X sasl.username=<USERNAME> \
     -X sasl.password=<PASSWORD> \
     -L
```

A successful metadata response confirms that authentication works.

**Consume from the beginning of a topic**

```bash
kcat -b <BOOTSTRAP_SERVER> \
     -X security.protocol=SASL_SSL \
     -X sasl.mechanism=<SASL_MECHANISM> \
     -X sasl.username=<USERNAME> \
     -X sasl.password=<PASSWORD> \
     -C -t <TOPIC_NAME> -o beginning
```

Each line of output is one event message value, in the JSON format shown above. If no messages appear, the topic may simply be empty, which does not indicate a connectivity issue.

## Environments

| **Environment** | **Purpose** | **Bootstrap server** |
|  --- | --- | --- |
| **Sandbox** | Integration testing with no real data | Provided by Upvest |
| **Production** | Live data | Provided after sandbox sign-off |


Credentials are issued separately per environment. Sandbox credentials do not work against production, and production credentials do not work against sandbox.