# KafkaChatMessageHistory

> **Class** in `langchain_community`

📖 [View in docs](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory)

Chat message history stored in Kafka.

## Signature

```python
KafkaChatMessageHistory(
    self,
    session_id: str,
    bootstrap_servers: str,
    ttl_ms: int = DEFAULT_TTL_MS,
    replication_factor: int = DEFAULT_REPLICATION_FACTOR,
    partition: int = DEFAULT_PARTITION,
)
```

## Description

**Setup:**

Install the `confluent-kafka` package (the `confluent-kafka-python` client):

```bash
pip install confluent-kafka
```

**Instantiate:**

```python
from langchain_community.chat_message_histories import KafkaChatMessageHistory

history = KafkaChatMessageHistory(
    session_id="your_session_id",
    bootstrap_servers="host:port",
)
```

**Add and retrieve messages:**

```python
# Add messages
history.add_messages([message1, message2, message3, ...])

# Retrieve messages
message_batch_0 = history.messages

# Retrieve messages that arrived after message_batch_0
message_batch_1 = history.messages

# Reset to the beginning and retrieve messages
messages_from_beginning = history.messages_from_beginning()
```

Retrieving messages is stateful. Internally, the class uses a Kafka consumer to read,
and the consumed offset is maintained persistently.

To retrieve messages, use one of the following:

- `messages`: continues consuming chat messages from the last consumed one.
- `messages_from_beginning`: resets the consumer to the beginning of the chat history and returns messages. Optional parameters:
    1. `max_message_count`: maximum number of messages to return.
    2. `max_time_sec`: maximum time in seconds to wait for messages.
- `messages_from_latest`: resets the consumer to the end of the chat history and tries to consume messages. Takes the same optional parameters.
- `messages_from_last_consumed`: continues from the last consumed message, like `messages`. Takes the same optional parameters.
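The offset behavior of these methods can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not the library's implementation: `InMemoryHistoryModel` is a hypothetical class, it uses plain strings instead of message objects, and it ignores Kafka partitions, timeouts, and offset persistence.

```python
from typing import List, Optional


class InMemoryHistoryModel:
    """Toy stand-in for the retrieval semantics (hypothetical, no Kafka involved)."""

    def __init__(self) -> None:
        self._log: List[str] = []  # append-only log, playing the role of the topic
        self._offset = 0           # index of the next unread message

    def add_messages(self, messages: List[str]) -> None:
        self._log.extend(messages)

    def _consume(self, max_message_count: Optional[int]) -> List[str]:
        # Read from the current offset, optionally capped, then advance the offset.
        end = len(self._log)
        if max_message_count is not None:
            end = min(end, self._offset + max_message_count)
        batch = self._log[self._offset:end]
        self._offset = end
        return batch

    @property
    def messages(self) -> List[str]:
        return self._consume(None)

    def messages_from_beginning(self, max_message_count: Optional[int] = None) -> List[str]:
        self._offset = 0
        return self._consume(max_message_count)

    def messages_from_latest(self, max_message_count: Optional[int] = None) -> List[str]:
        self._offset = len(self._log)
        return self._consume(max_message_count)

    def messages_from_last_consumed(self, max_message_count: Optional[int] = None) -> List[str]:
        return self._consume(max_message_count)


model = InMemoryHistoryModel()
model.add_messages(["m1", "m2", "m3"])
first = model.messages                                       # reads everything so far
second = model.messages                                      # nothing new since the last read
model.add_messages(["m4"])
third = model.messages_from_last_consumed()                  # only the new message
replay = model.messages_from_beginning(max_message_count=2)  # rewinds, capped at two
rest = model.messages_from_last_consumed()                   # continues after the replay
latest = model.messages_from_latest()                        # jumps to the end
```

In this model, a second read returns nothing until new messages are added, and a capped replay leaves the remainder for the next call.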

`max_message_count` and `max_time_sec` bound each retrieval so that it does not
block indefinitely. As a result, a single call may not return all messages.
Increase `max_message_count` and `max_time_sec` to retrieve the full history.
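One way to collect a long history without a single very large bound is to replay from the beginning and keep reading in bounded batches. The helper below is a sketch under assumptions: `read_full_history`, `batch_size`, and `wait_sec` are hypothetical names, and it assumes the retrieval methods accept `max_message_count` and `max_time_sec` as keyword arguments and return a list. An empty batch is treated as the end of the history, although with a real broker it may only mean that the wait time elapsed.

```python
from typing import Any, List


def read_full_history(history: Any, batch_size: int = 100, wait_sec: float = 5.0) -> List[Any]:
    """Replay from the beginning, then read bounded batches until one comes back empty."""
    collected = list(
        history.messages_from_beginning(
            max_message_count=batch_size, max_time_sec=wait_sec
        )
    )
    while True:
        batch = history.messages_from_last_consumed(
            max_message_count=batch_size, max_time_sec=wait_sec
        )
        if not batch:
            # Assumption: no messages within the bounds means we have caught up.
            break
        collected.extend(batch)
    return collected
```

With a `KafkaChatMessageHistory` instance named `history`, the call would be `read_full_history(history)`. Each underlying call stays bounded, so the helper blocks for at most roughly `wait_sec` per batch.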

## Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `session_id` | `str` | Yes | The ID of a single chat session. It is used as the Kafka topic name. |
| `bootstrap_servers` | `str` | Yes | Comma-separated host/port pairs used to establish the connection to the Kafka cluster. See [`bootstrap.servers`](https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers). |
| `ttl_ms` | `int` | No | Time-to-live in milliseconds for automatic expiration of entries; `-1` means no expiration. It translates to the Kafka topic setting [`retention.ms`](https://kafka.apache.org/documentation.html#topicconfigs_retention.ms). Default: `DEFAULT_TTL_MS` (7 days). |
| `replication_factor` | `int` | No | The replication factor for the topic. Default: `DEFAULT_REPLICATION_FACTOR` (1). |
| `partition` | `int` | No | The number of partitions for the topic. Default: `DEFAULT_PARTITION` (3). |
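Because `bootstrap_servers` is a single comma-separated string and `ttl_ms` is expressed in milliseconds, it can help to derive both from friendlier values. The snippet below is a minimal sketch: `build_history_kwargs`, the broker addresses, and the session ID are hypothetical, and the constructor call is left commented out because it needs a reachable Kafka cluster.

```python
from datetime import timedelta
from typing import Any, Dict, List


def build_history_kwargs(
    session_id: str, brokers: List[str], retention: timedelta
) -> Dict[str, Any]:
    """Assemble constructor arguments from a broker list and a retention period."""
    return {
        "session_id": session_id,
        # Join "host:port" entries into the comma-separated form.
        "bootstrap_servers": ",".join(brokers),
        # Convert the retention period to whole milliseconds.
        "ttl_ms": int(retention.total_seconds() * 1000),
    }


kwargs = build_history_kwargs(
    "support-chat-42",                 # hypothetical session ID (becomes the topic name)
    ["broker1:9092", "broker2:9092"],  # hypothetical broker addresses
    timedelta(days=1),
)
# history = KafkaChatMessageHistory(**kwargs)  # requires a reachable Kafka cluster
```

Here `kwargs["ttl_ms"]` is `86400000`, one day in milliseconds; the documented 7-day default corresponds to `604800000`.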

## Extends

- `BaseChatMessageHistory`

## Constructors

```python
__init__(
    self,
    session_id: str,
    bootstrap_servers: str,
    ttl_ms: int = DEFAULT_TTL_MS,
    replication_factor: int = DEFAULT_REPLICATION_FACTOR,
    partition: int = DEFAULT_PARTITION,
)
```

| Name | Type |
|------|------|
| `session_id` | `str` |
| `bootstrap_servers` | `str` |
| `ttl_ms` | `int` |
| `replication_factor` | `int` |
| `partition` | `int` |


## Properties

- `session_id`
- `bootstrap_servers`
- `admin_client`
- `num_partitions`
- `producer`
- `messages`

## Methods

- [`add_messages()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/add_messages)
- [`messages_from_beginning()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/messages_from_beginning)
- [`messages_from_latest()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/messages_from_latest)
- [`messages_from_last_consumed()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/messages_from_last_consumed)
- [`clear()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/clear)
- [`close()`](https://reference.langchain.com/python/langchain-community/chat_message_histories/kafka/KafkaChatMessageHistory/close)

---

[View source on GitHub](https://github.com/langchain-ai/langchain-community/blob/a6a6079511ac8a5c1293337f88096b8641562e77/libs/community/langchain_community/chat_message_histories/kafka.py#L86)