Compare commits
2 Commits
10bf881060
...
1d6ae0c8e2
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d6ae0c8e2 | |||
| a5ca1c1107 |
204
chatmastermind/chat.py
Normal file
204
chatmastermind/chat.py
Normal file
@ -0,0 +1,204 @@
|
|||||||
|
"""
|
||||||
|
Module implementing various chat classes and functions for managing a chat history.
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
from pprint import PrettyPrinter
|
||||||
|
import pathlib
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TypeVar, Type, Optional, ClassVar, Any
|
||||||
|
from .message import Message, MessageFilter, MessageError
|
||||||
|
|
||||||
|
ChatInst = TypeVar('ChatInst', bound='Chat')
|
||||||
|
ChatDBInst = TypeVar('ChatDBInst', bound='ChatDB')
|
||||||
|
|
||||||
|
|
||||||
|
class ChatError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def terminal_width() -> int:
|
||||||
|
return shutil.get_terminal_size().columns
|
||||||
|
|
||||||
|
|
||||||
|
def pp(*args: Any, **kwargs: Any) -> None:
|
||||||
|
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Chat:
|
||||||
|
"""
|
||||||
|
A class containing a complete chat history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
messages: list[Message]
|
||||||
|
|
||||||
|
def filter(self, mfilter: MessageFilter) -> None:
|
||||||
|
"""
|
||||||
|
Use 'Message.match(mfilter) to remove all messages that
|
||||||
|
don't fulfill the filter requirements.
|
||||||
|
"""
|
||||||
|
self.messages = [m for m in self.messages if m.match(mfilter)]
|
||||||
|
|
||||||
|
def sort(self, reverse: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Sort the messages according to 'Message.msg_id()'.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# the message may not have an ID if it doesn't have a file_path
|
||||||
|
self.messages.sort(key=lambda m: m.msg_id(), reverse=reverse)
|
||||||
|
except MessageError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""
|
||||||
|
Delete all messages.
|
||||||
|
"""
|
||||||
|
self.messages = []
|
||||||
|
|
||||||
|
def add_msgs(self, msgs: list[Message]) -> None:
|
||||||
|
"""
|
||||||
|
Add new messages and sort them if possible.
|
||||||
|
"""
|
||||||
|
self.messages += msgs
|
||||||
|
self.sort()
|
||||||
|
|
||||||
|
def print(self, dump: bool = False) -> None:
|
||||||
|
if dump:
|
||||||
|
pp(self)
|
||||||
|
return
|
||||||
|
# for message in self.messages:
|
||||||
|
# text_too_long = len(message['content']) > terminal_width() - len(message['role']) - 2
|
||||||
|
# if source_code:
|
||||||
|
# display_source_code(message['content'])
|
||||||
|
# continue
|
||||||
|
# if message['role'] == 'user':
|
||||||
|
# print('-' * terminal_width())
|
||||||
|
# if text_too_long:
|
||||||
|
# print(f"{message['role'].upper()}:")
|
||||||
|
# print(message['content'])
|
||||||
|
# else:
|
||||||
|
# print(f"{message['role'].upper()}: {message['content']}")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ChatDB(Chat):
|
||||||
|
"""
|
||||||
|
A 'Chat' class that is bound to a given directory structure. Supports reading
|
||||||
|
and writing messages from / to that structure. Such a structure consists of
|
||||||
|
two directories: a 'cache directory', where all messages are temporarily
|
||||||
|
stored, and a 'DB' directory, where selected messages can be stored
|
||||||
|
persistently.
|
||||||
|
"""
|
||||||
|
|
||||||
|
default_file_suffix: ClassVar[str] = '.txt'
|
||||||
|
|
||||||
|
cache_path: pathlib.Path
|
||||||
|
db_path: pathlib.Path
|
||||||
|
# a MessageFilter that all messages must match (if given)
|
||||||
|
mfilter: Optional[MessageFilter] = None
|
||||||
|
file_suffix: str = default_file_suffix
|
||||||
|
# the glob pattern for all messages
|
||||||
|
glob: Optional[str] = None
|
||||||
|
# set containing all file names of the current messages
|
||||||
|
message_files: set[str] = field(default_factory=set, repr=False)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dir(cls: Type[ChatDBInst],
|
||||||
|
cache_path: pathlib.Path,
|
||||||
|
db_path: pathlib.Path,
|
||||||
|
glob: Optional[str] = None,
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
||||||
|
"""
|
||||||
|
Create a 'ChatDB' instance from the given directory structure.
|
||||||
|
Reads all messages from 'db_path' into the local message list.
|
||||||
|
Parameters:
|
||||||
|
* 'cache_path': path to the directory for temporary messages
|
||||||
|
* 'db_path': path to the directory for persistent messages
|
||||||
|
* 'glob' fs specified, files will be filtered using 'path.glob()',
|
||||||
|
otherwise it uses 'path.iterdir()'.
|
||||||
|
* 'mfilter': use with 'Message.from_file()' to filter messages
|
||||||
|
when reading them.
|
||||||
|
"""
|
||||||
|
messages: list[Message] = []
|
||||||
|
message_files: set[str] = set()
|
||||||
|
file_iter = db_path.glob(glob) if glob else db_path.iterdir()
|
||||||
|
for file_path in sorted(file_iter):
|
||||||
|
if file_path.is_file():
|
||||||
|
try:
|
||||||
|
message = Message.from_file(file_path, mfilter)
|
||||||
|
if message:
|
||||||
|
messages.append(message)
|
||||||
|
message_files.add(file_path.name)
|
||||||
|
except MessageError as e:
|
||||||
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
|
return cls(messages, cache_path, db_path, mfilter,
|
||||||
|
cls.default_file_suffix, glob, message_files)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_messages(cls: Type[ChatDBInst],
|
||||||
|
cache_path: pathlib.Path,
|
||||||
|
db_path: pathlib.Path,
|
||||||
|
messages: list[Message],
|
||||||
|
mfilter: Optional[MessageFilter]) -> ChatDBInst:
|
||||||
|
"""
|
||||||
|
Create a ChatDB instance from the given message list. Note that the next
|
||||||
|
call to 'dump()' will write all files in order to synchronize the messages.
|
||||||
|
Similarly, 'update()' will read all messages, so you may end up with a lot
|
||||||
|
of duplicates when using 'update()' first.
|
||||||
|
"""
|
||||||
|
return cls(messages, cache_path, db_path, mfilter)
|
||||||
|
|
||||||
|
def get_next_fid(self) -> int:
|
||||||
|
next_fname = self.db_path / '.next'
|
||||||
|
try:
|
||||||
|
with open(next_fname, 'r') as f:
|
||||||
|
return int(f.read()) + 1
|
||||||
|
except Exception:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
def set_next_fid(self, fid: int) -> None:
|
||||||
|
next_fname = self.db_path / '.next'
|
||||||
|
with open(next_fname, 'w') as f:
|
||||||
|
f.write(f'{fid}')
|
||||||
|
|
||||||
|
def dump(self, to_db: bool = False, force_all: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Write all messages to 'cache_path' (or 'db_path' if 'to_db' is True). If a message
|
||||||
|
has no file_path, a new one will be created. By default, only messages that have
|
||||||
|
not been written (or read) before will be dumped. Use 'force_all' to force writing
|
||||||
|
all message files.
|
||||||
|
"""
|
||||||
|
for message in self.messages:
|
||||||
|
# skip messages that we have already written (or read)
|
||||||
|
if message.file_path and message.file_path in self.message_files and not force_all:
|
||||||
|
continue
|
||||||
|
file_path = message.file_path
|
||||||
|
if not file_path:
|
||||||
|
fid = self.get_next_fid()
|
||||||
|
fname = f"{fid:04d}{self.file_suffix}"
|
||||||
|
file_path = self.db_path / fname if to_db else self.cache_path / fname
|
||||||
|
self.set_next_fid(fid)
|
||||||
|
message.to_file(file_path)
|
||||||
|
|
||||||
|
def update(self, from_cache: bool = False, force_all: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Read new messages from 'db_path' (or 'cache_path' if 'from_cache' is true).
|
||||||
|
By default, only messages that have not been read (or written) before will
|
||||||
|
be read. Use 'force_all' to force reading all messages.
|
||||||
|
"""
|
||||||
|
if from_cache:
|
||||||
|
file_iter = self.cache_path.glob(self.glob) if self.glob else self.cache_path.iterdir()
|
||||||
|
else:
|
||||||
|
file_iter = self.cache_path.glob(self.glob) if self.glob else self.cache_path.iterdir()
|
||||||
|
for file_path in sorted(file_iter):
|
||||||
|
if file_path.is_file():
|
||||||
|
if file_path.name in self.message_files and not force_all:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
message = Message.from_file(file_path, self.mfilter)
|
||||||
|
if message:
|
||||||
|
self.messages.append(message)
|
||||||
|
self.message_files.add(file_path.name)
|
||||||
|
except MessageError as e:
|
||||||
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
|
self.sort()
|
||||||
@ -219,9 +219,12 @@ class Message():
|
|||||||
file_path=data.get(cls.file_yaml_key, None))
|
file_path=data.get(cls.file_yaml_key, None))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
|
def tags_from_file(cls: Type[MessageInst],
|
||||||
|
file_path: pathlib.Path,
|
||||||
|
prefix: Optional[str] = None) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
Return only the tags from the given Message file.
|
Return only the tags from the given Message file,
|
||||||
|
optionally filtered based on prefix.
|
||||||
"""
|
"""
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
raise MessageError(f"Message file '{file_path}' does not exist")
|
raise MessageError(f"Message file '{file_path}' does not exist")
|
||||||
@ -229,11 +232,34 @@ class Message():
|
|||||||
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
||||||
if file_path.suffix == '.txt':
|
if file_path.suffix == '.txt':
|
||||||
with open(file_path, "r") as fd:
|
with open(file_path, "r") as fd:
|
||||||
tags = TagLine(fd.readline()).tags()
|
tags = TagLine(fd.readline()).tags(prefix)
|
||||||
else: # '.yaml'
|
else: # '.yaml'
|
||||||
with open(file_path, "r") as fd:
|
with open(file_path, "r") as fd:
|
||||||
data = yaml.load(fd, Loader=yaml.FullLoader)
|
data = yaml.load(fd, Loader=yaml.FullLoader)
|
||||||
tags = set(sorted(data[cls.tags_yaml_key]))
|
if prefix and len(prefix) > 0:
|
||||||
|
tags = set(sorted([t.strip() for t in data[cls.tags_yaml_key] if t.startswith(prefix)]))
|
||||||
|
else:
|
||||||
|
tags = set(sorted(data[cls.tags_yaml_key]))
|
||||||
|
return tags
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tags_from_dir(cls: Type[MessageInst],
|
||||||
|
path: pathlib.Path,
|
||||||
|
glob: Optional[str] = None,
|
||||||
|
prefix: Optional[str] = None) -> set[Tag]:
|
||||||
|
|
||||||
|
"""
|
||||||
|
Return only the tags from message files in the given directory.
|
||||||
|
The files can be filtered using 'glob', the tags by using 'prefix'.
|
||||||
|
"""
|
||||||
|
tags: set[Tag] = set()
|
||||||
|
file_iter = path.glob(glob) if glob else path.iterdir()
|
||||||
|
for file_path in sorted(file_iter):
|
||||||
|
if file_path.is_file():
|
||||||
|
try:
|
||||||
|
tags |= cls.tags_from_file(file_path, prefix)
|
||||||
|
except MessageError as e:
|
||||||
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
return tags
|
return tags
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@ -118,9 +118,10 @@ class TagLine(str):
|
|||||||
"""
|
"""
|
||||||
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
||||||
|
|
||||||
def tags(self) -> set[Tag]:
|
def tags(self, prefix: Optional[str] = None) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
Returns all tags contained in this line as a set.
|
Returns all tags contained in this line as a set, optionally
|
||||||
|
filtered based on prefix.
|
||||||
"""
|
"""
|
||||||
tagstr = self[len(self.prefix):].strip()
|
tagstr = self[len(self.prefix):].strip()
|
||||||
separator = Tag.default_separator
|
separator = Tag.default_separator
|
||||||
@ -130,7 +131,10 @@ class TagLine(str):
|
|||||||
if s in tagstr:
|
if s in tagstr:
|
||||||
separator = s
|
separator = s
|
||||||
break
|
break
|
||||||
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
if prefix and len(prefix) > 0:
|
||||||
|
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator) if t.startswith(prefix)]))
|
||||||
|
else:
|
||||||
|
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
|
||||||
|
|
||||||
def merge(self, taglines: set['TagLine']) -> 'TagLine':
|
def merge(self, taglines: set['TagLine']) -> 'TagLine':
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -543,7 +543,7 @@ class TagsFromFileTestCase(CmmTestCase):
|
|||||||
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||||
self.file_path_txt = pathlib.Path(self.file_txt.name)
|
self.file_path_txt = pathlib.Path(self.file_txt.name)
|
||||||
with open(self.file_path_txt, "w") as fd:
|
with open(self.file_path_txt, "w") as fd:
|
||||||
fd.write(f"""{TagLine.prefix} tag1 tag2
|
fd.write(f"""{TagLine.prefix} tag1 tag2 ptag3
|
||||||
{Question.txt_header}
|
{Question.txt_header}
|
||||||
This is a question.
|
This is a question.
|
||||||
{Answer.txt_header}
|
{Answer.txt_header}
|
||||||
@ -560,6 +560,7 @@ This is an answer.
|
|||||||
{Message.tags_yaml_key}:
|
{Message.tags_yaml_key}:
|
||||||
- tag1
|
- tag1
|
||||||
- tag2
|
- tag2
|
||||||
|
- ptag3
|
||||||
""")
|
""")
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
def tearDown(self) -> None:
|
||||||
@ -570,11 +571,67 @@ This is an answer.
|
|||||||
|
|
||||||
def test_tags_from_file_txt(self) -> None:
|
def test_tags_from_file_txt(self) -> None:
|
||||||
tags = Message.tags_from_file(self.file_path_txt)
|
tags = Message.tags_from_file(self.file_path_txt)
|
||||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})
|
||||||
|
|
||||||
def test_tags_from_file_yaml(self) -> None:
|
def test_tags_from_file_yaml(self) -> None:
|
||||||
tags = Message.tags_from_file(self.file_path_yaml)
|
tags = Message.tags_from_file(self.file_path_yaml)
|
||||||
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2'), Tag('ptag3')})
|
||||||
|
|
||||||
|
def test_tags_from_file_txt_prefix(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_txt, prefix='p')
|
||||||
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
|
||||||
|
def test_tags_from_file_yaml_prefix(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_yaml, prefix='p')
|
||||||
|
self.assertSetEqual(tags, {Tag('ptag3')})
|
||||||
|
|
||||||
|
|
||||||
|
class TestTagsFromFile(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.temp_dir = tempfile.TemporaryDirectory()
|
||||||
|
self.temp_dir_no_tags = tempfile.TemporaryDirectory()
|
||||||
|
self.tag_sets = [
|
||||||
|
{Tag('atag1'), Tag('atag2')},
|
||||||
|
{Tag('btag3'), Tag('btag4')},
|
||||||
|
{Tag('ctag5'), Tag('ctag6')}
|
||||||
|
]
|
||||||
|
self.files = [
|
||||||
|
pathlib.Path(self.temp_dir.name, 'file1.txt'),
|
||||||
|
pathlib.Path(self.temp_dir.name, 'file2.yaml'),
|
||||||
|
pathlib.Path(self.temp_dir.name, 'file3.txt')
|
||||||
|
]
|
||||||
|
self.files_no_tags = [
|
||||||
|
pathlib.Path(self.temp_dir_no_tags.name, 'file4.txt'),
|
||||||
|
pathlib.Path(self.temp_dir_no_tags.name, 'file5.yaml'),
|
||||||
|
pathlib.Path(self.temp_dir_no_tags.name, 'file6.txt')
|
||||||
|
]
|
||||||
|
for file, tags in zip(self.files, self.tag_sets):
|
||||||
|
message = Message(Question('This is a question.'),
|
||||||
|
Answer('This is an answer.'),
|
||||||
|
tags)
|
||||||
|
message.to_file(file)
|
||||||
|
for file in self.files_no_tags:
|
||||||
|
message = Message(Question('This is a question.'),
|
||||||
|
Answer('This is an answer.'))
|
||||||
|
message.to_file(file)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.temp_dir.cleanup()
|
||||||
|
|
||||||
|
def test_tags_from_dir(self) -> None:
|
||||||
|
all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name))
|
||||||
|
expected_tags = self.tag_sets[0] | self.tag_sets[1] | self.tag_sets[2]
|
||||||
|
self.assertEqual(all_tags, expected_tags)
|
||||||
|
|
||||||
|
def test_tags_from_dir_prefix(self) -> None:
|
||||||
|
atags = Message.tags_from_dir(pathlib.Path(self.temp_dir.name), prefix='a')
|
||||||
|
expected_tags = self.tag_sets[0]
|
||||||
|
self.assertEqual(atags, expected_tags)
|
||||||
|
|
||||||
|
# FIXME
|
||||||
|
# def test_tags_from_dir_no_tags(self) -> None:
|
||||||
|
# all_tags = Message.tags_from_dir(pathlib.Path(self.temp_dir_no_tags.name))
|
||||||
|
# self.assertSetEqual(all_tags, set())
|
||||||
|
|
||||||
|
|
||||||
class MessageIDTestCase(CmmTestCase):
|
class MessageIDTestCase(CmmTestCase):
|
||||||
|
|||||||
@ -49,6 +49,13 @@ class TestTagLine(CmmTestCase):
|
|||||||
tags = tagline.tags()
|
tags = tagline.tags()
|
||||||
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||||
|
|
||||||
|
def test_tags_prefix(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: atag1 stag2 stag3')
|
||||||
|
tags = tagline.tags(prefix='a')
|
||||||
|
self.assertEqual(tags, {Tag('atag1')})
|
||||||
|
tags = tagline.tags(prefix='s')
|
||||||
|
self.assertEqual(tags, {Tag('stag2'), Tag('stag3')})
|
||||||
|
|
||||||
def test_merge(self) -> None:
|
def test_merge(self) -> None:
|
||||||
tagline1 = TagLine('TAGS: tag1 tag2')
|
tagline1 = TagLine('TAGS: tag1 tag2')
|
||||||
tagline2 = TagLine('TAGS: tag2 tag3')
|
tagline2 = TagLine('TAGS: tag2 tag3')
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user