KafkaChatMessageHistory(
self,
session_id: str,
bootstrap_servers: str,
ttl_ms: int = DEFAULT_TTL_MS,
replication_factor: int = DEFAULT_REPLICATION_FACTOR,
partition: int = DEFAULT_PARTITION,
)

| Name | Type | Description |
|---|---|---|
| session_id* | str | The ID of a single chat session. It is used as the Kafka topic name. |
| bootstrap_servers* | str | Comma-separated host/port pairs used to connect to the Kafka cluster. See https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers |
| ttl_ms | int | Default: DEFAULT_TTL_MS. Time-to-live (milliseconds) for automatic expiration of entries. Default 7 days. -1 for no expiration. It translates to https://kafka.apache.org/documentation.html#topicconfigs_retention.ms |
| replication_factor | int | Default: DEFAULT_REPLICATION_FACTOR |
| partition | int | Default: DEFAULT_PARTITION |

Chat message history stored in Kafka.
Setup:
Install confluent-kafka-python.

.. code-block:: bash

    pip install confluent_kafka

Instantiate:

.. code-block:: python

    from langchain_community.chat_message_histories import KafkaChatMessageHistory

    history = KafkaChatMessageHistory(
        session_id="your_session_id",
        bootstrap_servers="host:port",
    )

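Add and retrieve messages:

.. code-block:: python

    history.add_messages([message1, message2, message3, ...])
    # Each access continues from the last consumed message.
    message_batch_0 = history.messages
    message_batch_1 = history.messages
    # Reset the consumer to the beginning of the chat history and read again.
    messages_from_beginning = history.messages_from_beginning()
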
Retrieving messages is stateful. Internally, messages are read with a Kafka consumer, and the consumed offset is persisted between reads.
To retrieve messages, use one of the following methods:

- messages:
  continue consuming chat messages from the last consumed one.
- messages_from_beginning:
  reset the consumer to the beginning of the chat history and return messages.
  Optional parameters:

  1. max_message_count: maximum number of messages to return.
  2. max_time_sec: maximum time in seconds to wait for messages.

- messages_from_latest:
  reset the consumer to the end of the chat history and try consuming messages.
  Optional parameters are the same as above.
- messages_from_last_consumed:
  continue from the last consumed message, similar to messages.
  Optional parameters are the same as above.

max_message_count and max_time_sec are used to avoid blocking indefinitely
when retrieving messages. As a result, a retrieval method may not
return all messages. Increase max_message_count and max_time_sec to retrieve
the full message history.

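The stateful behaviour of the retrieval methods can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not the library's implementation: ToyHistory is a hypothetical class, a plain list stands in for the Kafka topic, an integer stands in for the consumed offset, and max_time_sec is omitted because nothing here blocks.

```python
from typing import List, Optional


class ToyHistory:
    """Hypothetical in-memory stand-in: a list plays the topic, an int the offset."""

    def __init__(self) -> None:
        self._log: List[str] = []  # plays the role of the Kafka topic
        self._offset = 0  # position of the next message this reader will consume

    def add_messages(self, messages: List[str]) -> None:
        self._log.extend(messages)

    def _consume(self, max_message_count: Optional[int]) -> List[str]:
        # Read from the current offset, capped by max_message_count if given.
        end = len(self._log)
        if max_message_count is not None:
            end = min(end, self._offset + max_message_count)
        batch = self._log[self._offset:end]
        self._offset = end  # remember where this read stopped
        return batch

    def messages_from_last_consumed(
        self, max_message_count: Optional[int] = None
    ) -> List[str]:
        return self._consume(max_message_count)

    def messages_from_beginning(
        self, max_message_count: Optional[int] = None
    ) -> List[str]:
        self._offset = 0  # rewind to the start of the history
        return self._consume(max_message_count)

    def messages_from_latest(
        self, max_message_count: Optional[int] = None
    ) -> List[str]:
        self._offset = len(self._log)  # skip everything already written
        return self._consume(max_message_count)


history = ToyHistory()
history.add_messages(["m1", "m2", "m3"])
first = history.messages_from_last_consumed(max_message_count=2)
second = history.messages_from_last_consumed()
third = history.messages_from_last_consumed()
replay = history.messages_from_beginning()
latest = history.messages_from_latest()
```

In this model, the capped first read returns only part of the history, the next read picks up where it stopped, and a further read returns nothing until new messages arrive or the offset is reset.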
replication_factor: the replication factor for the topic. Default 1.
partition: the number of partitions for the topic. Default 3.
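Because ttl_ms is expressed in milliseconds, it can help to derive it from a readable duration. The sketch below is an assumption-laden illustration: ttl_ms_from is a hypothetical helper that is not part of the library, and the resulting dictionary only mirrors the documented constructor parameters without connecting to Kafka.

```python
from datetime import timedelta
from typing import Optional

NO_EXPIRATION = -1  # documented sentinel: -1 means entries never expire


def ttl_ms_from(duration: Optional[timedelta]) -> int:
    """Hypothetical helper: convert a duration to milliseconds for ttl_ms."""
    if duration is None:
        return NO_EXPIRATION
    return int(duration.total_seconds() * 1000)


# Keyword arguments matching the documented parameters and stated defaults.
history_kwargs = {
    "session_id": "your_session_id",
    "bootstrap_servers": "host:port",
    "ttl_ms": ttl_ms_from(timedelta(days=7)),  # the documented 7-day default
    "replication_factor": 1,
    "partition": 3,
}
```

The dictionary could then be passed as KafkaChatMessageHistory(**history_kwargs) once a reachable Kafka cluster is configured.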