Compare commits
10 Commits
213f174ffe
...
0ec606cdaa
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ec606cdaa | |||
| 2b2faa70ea | |||
| 7cbd373742 | |||
| 45393e3a15 | |||
| 8e0a158ac9 | |||
| 6736d1ce4f | |||
| 43fdb59dbf | |||
| 7f612bfc17 | |||
| 93290da5b5 | |||
| 9f4897a5b8 |
64
chatmastermind/ai.py
Normal file
64
chatmastermind/ai.py
Normal file
@ -0,0 +1,64 @@
|
||||
from dataclasses import dataclass
|
||||
from abc import abstractmethod
|
||||
from typing import Protocol, Optional, Union
|
||||
from .configuration import AIConfig
|
||||
from .message import Message
|
||||
from .chat import Chat
|
||||
|
||||
|
||||
class AIError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tokens:
|
||||
prompt: int = 0
|
||||
completion: int = 0
|
||||
total: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AIResponse:
|
||||
"""
|
||||
The response to an AI request. Consists of one or more messages
|
||||
(each containing the question and a single answer) and the nr.
|
||||
of used tokens.
|
||||
"""
|
||||
messages: list[Message]
|
||||
tokens: Optional[Tokens] = None
|
||||
|
||||
|
||||
class AI(Protocol):
|
||||
"""
|
||||
The base class for AI clients.
|
||||
"""
|
||||
|
||||
name: str
|
||||
config: AIConfig
|
||||
|
||||
@abstractmethod
|
||||
def request(self,
|
||||
question: Message,
|
||||
context: Chat,
|
||||
num_answers: int = 1) -> AIResponse:
|
||||
"""
|
||||
Make an AI request, asking the given question with the given
|
||||
context (i. e. chat history). The nr. of requested answers
|
||||
corresponds to the nr. of messages in the 'AIResponse'.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def models(self) -> list[str]:
|
||||
"""
|
||||
Return all models supported by this AI.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def tokens(self, data: Union[Message, Chat]) -> int:
|
||||
"""
|
||||
Computes the nr. of AI language tokens for the given message
|
||||
or chat. Note that the computation may not be 100% accurate
|
||||
and is not implemented for all AIs.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
43
chatmastermind/ais/openai.py
Normal file
43
chatmastermind/ais/openai.py
Normal file
@ -0,0 +1,43 @@
|
||||
"""
|
||||
Implements the OpenAI client classes and functions.
|
||||
"""
|
||||
import openai
|
||||
from ..message import Message
|
||||
from ..chat import Chat
|
||||
from ..ai import AI, AIResponse
|
||||
|
||||
|
||||
class OpenAI(AI):
|
||||
"""
|
||||
The OpenAI AI client.
|
||||
"""
|
||||
|
||||
def request(self,
|
||||
question: Message,
|
||||
context: Chat,
|
||||
num_answers: int = 1) -> AIResponse:
|
||||
"""
|
||||
Make an AI request, asking the given question with the given
|
||||
context (i. e. chat history). The nr. of requested answers
|
||||
corresponds to the nr. of messages in the 'AIResponse'.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def models(self) -> list[str]:
|
||||
"""
|
||||
Return all models supported by this AI.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def print_models(self) -> None:
|
||||
"""
|
||||
Print all models supported by the current AI.
|
||||
"""
|
||||
not_ready = []
|
||||
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
|
||||
if engine['ready']:
|
||||
print(engine['id'])
|
||||
else:
|
||||
not_ready.append(engine['id'])
|
||||
if len(not_ready) > 0:
|
||||
print('\nNot ready: ' + ', '.join(not_ready))
|
||||
294
chatmastermind/chat.py
Normal file
294
chatmastermind/chat.py
Normal file
@ -0,0 +1,294 @@
|
||||
"""
|
||||
Module implementing various chat classes and functions for managing a chat history.
|
||||
"""
|
||||
import shutil
|
||||
import pathlib
|
||||
from pprint import PrettyPrinter
|
||||
from pydoc import pager
|
||||
from dataclasses import dataclass
|
||||
from typing import TypeVar, Type, Optional, ClassVar, Any, Callable
|
||||
from .message import Question, Answer, Message, MessageFilter, MessageError, source_code, message_in
|
||||
from .tags import Tag
|
||||
|
||||
ChatInst = TypeVar('ChatInst', bound='Chat')
|
||||
ChatDBInst = TypeVar('ChatDBInst', bound='ChatDB')
|
||||
|
||||
|
||||
class ChatError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def terminal_width() -> int:
|
||||
return shutil.get_terminal_size().columns
|
||||
|
||||
|
||||
def pp(*args: Any, **kwargs: Any) -> None:
|
||||
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
|
||||
|
||||
|
||||
def print_paged(text: str) -> None:
|
||||
pager(text)
|
||||
|
||||
|
||||
def read_dir(dir_path: pathlib.Path,
|
||||
glob: Optional[str] = None,
|
||||
mfilter: Optional[MessageFilter] = None) -> list[Message]:
|
||||
"""
|
||||
Reads the messages from the given folder.
|
||||
Parameters:
|
||||
* 'dir_path': source directory
|
||||
* 'glob': if specified, files will be filtered using 'path.glob()',
|
||||
otherwise it uses 'path.iterdir()'.
|
||||
* 'mfilter': use with 'Message.from_file()' to filter messages
|
||||
when reading them.
|
||||
"""
|
||||
messages: list[Message] = []
|
||||
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
|
||||
for file_path in sorted(file_iter):
|
||||
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
|
||||
try:
|
||||
message = Message.from_file(file_path, mfilter)
|
||||
if message:
|
||||
messages.append(message)
|
||||
except MessageError as e:
|
||||
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||
return messages
|
||||
|
||||
|
||||
def write_dir(dir_path: pathlib.Path,
|
||||
messages: list[Message],
|
||||
file_suffix: str,
|
||||
next_fid: Callable[[], int]) -> None:
|
||||
"""
|
||||
Write all messages to the given directory. If a message has no file_path,
|
||||
a new one will be created. If message.file_path exists, it will be modified
|
||||
to point to the given directory.
|
||||
Parameters:
|
||||
* 'dir_path': destination directory
|
||||
* 'messages': list of messages to write
|
||||
* 'file_suffix': suffix for the message files ['.txt'|'.yaml']
|
||||
* 'next_fid': callable that returns the next file ID
|
||||
"""
|
||||
for message in messages:
|
||||
file_path = message.file_path
|
||||
# message has no file_path: create one
|
||||
if not file_path:
|
||||
fid = next_fid()
|
||||
fname = f"{fid:04d}{file_suffix}"
|
||||
file_path = dir_path / fname
|
||||
# file_path does not point to given directory: modify it
|
||||
elif not file_path.parent.samefile(dir_path):
|
||||
file_path = dir_path / file_path.name
|
||||
message.to_file(file_path)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Chat:
|
||||
"""
|
||||
A class containing a complete chat history.
|
||||
"""
|
||||
|
||||
messages: list[Message]
|
||||
|
||||
def filter(self, mfilter: MessageFilter) -> None:
|
||||
"""
|
||||
Use 'Message.match(mfilter) to remove all messages that
|
||||
don't fulfill the filter requirements.
|
||||
"""
|
||||
self.messages = [m for m in self.messages if m.match(mfilter)]
|
||||
|
||||
def sort(self, reverse: bool = False) -> None:
|
||||
"""
|
||||
Sort the messages according to 'Message.msg_id()'.
|
||||
"""
|
||||
try:
|
||||
# the message may not have an ID if it doesn't have a file_path
|
||||
self.messages.sort(key=lambda m: m.msg_id(), reverse=reverse)
|
||||
except MessageError:
|
||||
pass
|
||||
|
||||
def clear(self) -> None:
|
||||
"""
|
||||
Delete all messages.
|
||||
"""
|
||||
self.messages = []
|
||||
|
||||
def add_msgs(self, msgs: list[Message]) -> None:
|
||||
"""
|
||||
Add new messages and sort them if possible.
|
||||
"""
|
||||
self.messages += msgs
|
||||
self.sort()
|
||||
|
||||
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||
"""
|
||||
Get the tags of all messages, optionally filtered by prefix or substring.
|
||||
"""
|
||||
tags: set[Tag] = set()
|
||||
for m in self.messages:
|
||||
tags |= m.filter_tags(prefix, contain)
|
||||
return set(sorted(tags))
|
||||
|
||||
def tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
|
||||
"""
|
||||
Get the frequency of all tags of all messages, optionally filtered by prefix or substring.
|
||||
"""
|
||||
tags: list[Tag] = []
|
||||
for m in self.messages:
|
||||
tags += [tag for tag in m.filter_tags(prefix, contain)]
|
||||
return {tag: tags.count(tag) for tag in sorted(tags)}
|
||||
|
||||
def tokens(self) -> int:
|
||||
"""
|
||||
Returns the nr. of AI language tokens used by all messages in this chat.
|
||||
If unknown, 0 is returned.
|
||||
"""
|
||||
return sum(m.tokens() for m in self.messages)
|
||||
|
||||
def print(self, dump: bool = False, source_code_only: bool = False,
|
||||
with_tags: bool = False, with_file: bool = False,
|
||||
paged: bool = True) -> None:
|
||||
if dump:
|
||||
pp(self)
|
||||
return
|
||||
output: list[str] = []
|
||||
for message in self.messages:
|
||||
if source_code_only:
|
||||
output.extend(source_code(message.question, include_delims=True))
|
||||
continue
|
||||
output.append('-' * terminal_width())
|
||||
output.append(Question.txt_header)
|
||||
output.append(message.question)
|
||||
if message.answer:
|
||||
output.append(Answer.txt_header)
|
||||
output.append(message.answer)
|
||||
if with_tags:
|
||||
output.append(message.tags_str())
|
||||
if with_file:
|
||||
output.append('FILE: ' + str(message.file_path))
|
||||
if paged:
|
||||
print_paged('\n'.join(output))
|
||||
else:
|
||||
print(*output, sep='\n')
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChatDB(Chat):
|
||||
"""
|
||||
A 'Chat' class that is bound to a given directory structure. Supports reading
|
||||
and writing messages from / to that structure. Such a structure consists of
|
||||
two directories: a 'cache directory', where all messages are temporarily
|
||||
stored, and a 'DB' directory, where selected messages can be stored
|
||||
persistently.
|
||||
"""
|
||||
|
||||
default_file_suffix: ClassVar[str] = '.txt'
|
||||
|
||||
cache_path: pathlib.Path
|
||||
db_path: pathlib.Path
|
||||
# a MessageFilter that all messages must match (if given)
|
||||
mfilter: Optional[MessageFilter] = None
|
||||
file_suffix: str = default_file_suffix
|
||||
# the glob pattern for all messages
|
||||
glob: Optional[str] = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# contains the latest message ID
|
||||
self.next_fname = self.db_path / '.next'
|
||||
# make all paths absolute
|
||||
self.cache_path = self.cache_path.absolute()
|
||||
self.db_path = self.db_path.absolute()
|
||||
|
||||
@classmethod
|
||||
def from_dir(cls: Type[ChatDBInst],
|
||||
cache_path: pathlib.Path,
|
||||
db_path: pathlib.Path,
|
||||
glob: Optional[str] = None,
|
||||
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
||||
"""
|
||||
Create a 'ChatDB' instance from the given directory structure.
|
||||
Reads all messages from 'db_path' into the local message list.
|
||||
Parameters:
|
||||
* 'cache_path': path to the directory for temporary messages
|
||||
* 'db_path': path to the directory for persistent messages
|
||||
* 'glob': if specified, files will be filtered using 'path.glob()',
|
||||
otherwise it uses 'path.iterdir()'.
|
||||
* 'mfilter': use with 'Message.from_file()' to filter messages
|
||||
when reading them.
|
||||
"""
|
||||
messages = read_dir(db_path, glob, mfilter)
|
||||
return cls(messages, cache_path, db_path, mfilter,
|
||||
cls.default_file_suffix, glob)
|
||||
|
||||
@classmethod
|
||||
def from_messages(cls: Type[ChatDBInst],
|
||||
cache_path: pathlib.Path,
|
||||
db_path: pathlib.Path,
|
||||
messages: list[Message],
|
||||
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
||||
"""
|
||||
Create a ChatDB instance from the given message list.
|
||||
"""
|
||||
return cls(messages, cache_path, db_path, mfilter)
|
||||
|
||||
def get_next_fid(self) -> int:
|
||||
try:
|
||||
with open(self.next_fname, 'r') as f:
|
||||
next_fid = int(f.read()) + 1
|
||||
self.set_next_fid(next_fid)
|
||||
return next_fid
|
||||
except Exception:
|
||||
self.set_next_fid(1)
|
||||
return 1
|
||||
|
||||
def set_next_fid(self, fid: int) -> None:
|
||||
with open(self.next_fname, 'w') as f:
|
||||
f.write(f'{fid}')
|
||||
|
||||
def read_db(self) -> None:
|
||||
"""
|
||||
Reads new messages from the DB directory. New ones are added to the internal list,
|
||||
existing ones are replaced. A message is determined as 'existing' if a message with
|
||||
the same base filename (i. e. 'file_path.name') is already in the list.
|
||||
"""
|
||||
new_messages = read_dir(self.db_path, self.glob, self.mfilter)
|
||||
# remove all messages from self.messages that are in the new list
|
||||
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
|
||||
# copy the messages from the temporary list to self.messages and sort them
|
||||
self.messages += new_messages
|
||||
self.sort()
|
||||
|
||||
def read_cache(self) -> None:
|
||||
"""
|
||||
Reads new messages from the cache directory. New ones are added to the internal list,
|
||||
existing ones are replaced. A message is determined as 'existing' if a message with
|
||||
the same base filename (i. e. 'file_path.name') is already in the list.
|
||||
"""
|
||||
new_messages = read_dir(self.cache_path, self.glob, self.mfilter)
|
||||
# remove all messages from self.messages that are in the new list
|
||||
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
|
||||
# copy the messages from the temporary list to self.messages and sort them
|
||||
self.messages += new_messages
|
||||
self.sort()
|
||||
|
||||
def write_db(self, msgs: Optional[list[Message]] = None) -> None:
|
||||
"""
|
||||
Write messages to the DB directory. If a message has no file_path, a new one
|
||||
will be created. If message.file_path exists, it will be modified to point
|
||||
to the DB directory.
|
||||
"""
|
||||
write_dir(self.db_path,
|
||||
msgs if msgs else self.messages,
|
||||
self.file_suffix,
|
||||
self.get_next_fid)
|
||||
|
||||
def write_cache(self, msgs: Optional[list[Message]] = None) -> None:
|
||||
"""
|
||||
Write messages to the cache directory. If a message has no file_path, a new one
|
||||
will be created. If message.file_path exists, it will be modified to point to
|
||||
the cache directory.
|
||||
"""
|
||||
write_dir(self.cache_path,
|
||||
msgs if msgs else self.messages,
|
||||
self.file_suffix,
|
||||
self.get_next_fid)
|
||||
@ -7,7 +7,15 @@ OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAIConfig():
|
||||
class AIConfig:
|
||||
"""
|
||||
The base class of all AI configurations.
|
||||
"""
|
||||
name: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAIConfig(AIConfig):
|
||||
"""
|
||||
The OpenAI section of the configuration file.
|
||||
"""
|
||||
@ -25,6 +33,7 @@ class OpenAIConfig():
|
||||
Create OpenAIConfig from a dict.
|
||||
"""
|
||||
return cls(
|
||||
name='OpenAI',
|
||||
api_key=str(source['api_key']),
|
||||
model=str(source['model']),
|
||||
max_tokens=int(source['max_tokens']),
|
||||
@ -36,7 +45,7 @@ class OpenAIConfig():
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config():
|
||||
class Config:
|
||||
"""
|
||||
The configuration file structure.
|
||||
"""
|
||||
|
||||
@ -7,10 +7,11 @@ import sys
|
||||
import argcomplete
|
||||
import argparse
|
||||
import pathlib
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType
|
||||
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
|
||||
from .storage import save_answers, create_chat_hist, get_tags_unique, read_file, dump_data
|
||||
from .api_client import ai, openai_api_key, print_models
|
||||
from .configuration import Config
|
||||
from .chat import ChatDB
|
||||
from itertools import zip_longest
|
||||
from typing import Any
|
||||
|
||||
@ -61,8 +62,12 @@ def tag_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'tag' command.
|
||||
"""
|
||||
chat = ChatDB.from_dir(cache_path=pathlib.Path('.'),
|
||||
db_path=pathlib.Path(config.db))
|
||||
if args.list:
|
||||
print_tags_frequency(get_tags(config, None))
|
||||
tags_freq = chat.tags_frequency(args.prefix, args.contain)
|
||||
for tag, freq in tags_freq.items():
|
||||
print(f"- {tag}: {freq}")
|
||||
|
||||
|
||||
def config_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
@ -195,6 +200,8 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||
action='store_true')
|
||||
tag_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
|
||||
tag_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
|
||||
|
||||
# 'config' command parser
|
||||
config_cmd_parser = cmdparser.add_parser('config',
|
||||
|
||||
@ -128,28 +128,29 @@ class ModelLine(str):
|
||||
return cls(' '.join([cls.prefix, model]))
|
||||
|
||||
|
||||
class Question(str):
|
||||
class Answer(str):
|
||||
"""
|
||||
A single question with a defined header.
|
||||
A single answer with a defined header.
|
||||
"""
|
||||
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||
yaml_key: ClassVar[str] = 'question'
|
||||
tokens: int = 0 # tokens used by this answer
|
||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
|
||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
"""
|
||||
Make sure the question string does not contain the header.
|
||||
Make sure the answer string does not contain the header as a whole line.
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||
if cls.txt_header in string.split('\n'):
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
@ -161,28 +162,33 @@ class Question(str):
|
||||
return source_code(self, include_delims)
|
||||
|
||||
|
||||
class Answer(str):
|
||||
class Question(str):
|
||||
"""
|
||||
A single answer with a defined header.
|
||||
A single question with a defined header.
|
||||
"""
|
||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
tokens: int = 0 # tokens used by this question
|
||||
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||
yaml_key: ClassVar[str] = 'question'
|
||||
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||
"""
|
||||
Make sure the answer string does not contain the header.
|
||||
Make sure the question string does not contain the header as a whole line
|
||||
(also not that from 'Answer', so it's always clear where the answer starts).
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
string_lines = string.split('\n')
|
||||
if cls.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||
if Answer.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
@ -502,3 +508,13 @@ class Message():
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
def tokens(self) -> int:
|
||||
"""
|
||||
Returns the nr. of AI language tokens used by this message.
|
||||
If unknown, 0 is returned.
|
||||
"""
|
||||
if self.answer:
|
||||
return self.question.tokens + self.answer.tokens
|
||||
else:
|
||||
return self.question.tokens
|
||||
|
||||
@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
|
||||
print(message['content'])
|
||||
else:
|
||||
print(f"{message['role'].upper()}: {message['content']}")
|
||||
|
||||
|
||||
def print_tags_frequency(tags: list[str]) -> None:
|
||||
for tag in sorted(set(tags)):
|
||||
print(f"- {tag}: {tags.count(tag)}")
|
||||
|
||||
302
tests/test_chat.py
Normal file
302
tests/test_chat.py
Normal file
@ -0,0 +1,302 @@
|
||||
import pathlib
|
||||
import tempfile
|
||||
import time
|
||||
from io import StringIO
|
||||
from unittest.mock import patch
|
||||
from chatmastermind.tags import TagLine
|
||||
from chatmastermind.message import Message, Question, Answer, Tag, MessageFilter
|
||||
from chatmastermind.chat import Chat, ChatDB, terminal_width
|
||||
from .test_main import CmmTestCase
|
||||
|
||||
|
||||
class TestChat(CmmTestCase):
|
||||
def setUp(self) -> None:
|
||||
self.chat = Chat([])
|
||||
self.message1 = Message(Question('Question 1'),
|
||||
Answer('Answer 1'),
|
||||
{Tag('atag1'), Tag('btag2')},
|
||||
file_path=pathlib.Path('0001.txt'))
|
||||
self.message2 = Message(Question('Question 2'),
|
||||
Answer('Answer 2'),
|
||||
{Tag('btag2')},
|
||||
file_path=pathlib.Path('0002.txt'))
|
||||
|
||||
def test_filter(self) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
self.chat.filter(MessageFilter(answer_contains='Answer 1'))
|
||||
|
||||
self.assertEqual(len(self.chat.messages), 1)
|
||||
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||
|
||||
def test_sort(self) -> None:
|
||||
self.chat.add_msgs([self.message2, self.message1])
|
||||
self.chat.sort()
|
||||
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||
self.assertEqual(self.chat.messages[1].question, 'Question 2')
|
||||
self.chat.sort(reverse=True)
|
||||
self.assertEqual(self.chat.messages[0].question, 'Question 2')
|
||||
self.assertEqual(self.chat.messages[1].question, 'Question 1')
|
||||
|
||||
def test_clear(self) -> None:
|
||||
self.chat.add_msgs([self.message1])
|
||||
self.chat.clear()
|
||||
self.assertEqual(len(self.chat.messages), 0)
|
||||
|
||||
def test_add_msgs(self) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
self.assertEqual(len(self.chat.messages), 2)
|
||||
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||
self.assertEqual(self.chat.messages[1].question, 'Question 2')
|
||||
|
||||
def test_tags(self) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
tags_all = self.chat.tags()
|
||||
self.assertSetEqual(tags_all, {Tag('atag1'), Tag('btag2')})
|
||||
tags_pref = self.chat.tags(prefix='a')
|
||||
self.assertSetEqual(tags_pref, {Tag('atag1')})
|
||||
tags_cont = self.chat.tags(contain='2')
|
||||
self.assertSetEqual(tags_cont, {Tag('btag2')})
|
||||
|
||||
def test_tags_frequency(self) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
tags_freq = self.chat.tags_frequency()
|
||||
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
|
||||
|
||||
@patch('sys.stdout', new_callable=StringIO)
|
||||
def test_print(self, mock_stdout: StringIO) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
self.chat.print(paged=False)
|
||||
expected_output = f"""{'-'*terminal_width()}
|
||||
{Question.txt_header}
|
||||
Question 1
|
||||
{Answer.txt_header}
|
||||
Answer 1
|
||||
{'-'*terminal_width()}
|
||||
{Question.txt_header}
|
||||
Question 2
|
||||
{Answer.txt_header}
|
||||
Answer 2
|
||||
"""
|
||||
self.assertEqual(mock_stdout.getvalue(), expected_output)
|
||||
|
||||
@patch('sys.stdout', new_callable=StringIO)
|
||||
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
self.chat.print(paged=False, with_tags=True, with_file=True)
|
||||
expected_output = f"""{'-'*terminal_width()}
|
||||
{Question.txt_header}
|
||||
Question 1
|
||||
{Answer.txt_header}
|
||||
Answer 1
|
||||
{TagLine.prefix} atag1 btag2
|
||||
FILE: 0001.txt
|
||||
{'-'*terminal_width()}
|
||||
{Question.txt_header}
|
||||
Question 2
|
||||
{Answer.txt_header}
|
||||
Answer 2
|
||||
{TagLine.prefix} btag2
|
||||
FILE: 0002.txt
|
||||
"""
|
||||
self.assertEqual(mock_stdout.getvalue(), expected_output)
|
||||
|
||||
|
||||
class TestChatDB(CmmTestCase):
|
||||
def setUp(self) -> None:
|
||||
self.db_path = tempfile.TemporaryDirectory()
|
||||
self.cache_path = tempfile.TemporaryDirectory()
|
||||
|
||||
self.message1 = Message(Question('Question 1'),
|
||||
Answer('Answer 1'),
|
||||
{Tag('tag1')},
|
||||
file_path=pathlib.Path('0001.txt'))
|
||||
self.message2 = Message(Question('Question 2'),
|
||||
Answer('Answer 2'),
|
||||
{Tag('tag2')},
|
||||
file_path=pathlib.Path('0002.yaml'))
|
||||
self.message3 = Message(Question('Question 3'),
|
||||
Answer('Answer 3'),
|
||||
{Tag('tag3')},
|
||||
file_path=pathlib.Path('0003.txt'))
|
||||
self.message4 = Message(Question('Question 4'),
|
||||
Answer('Answer 4'),
|
||||
{Tag('tag4')},
|
||||
file_path=pathlib.Path('0004.yaml'))
|
||||
|
||||
self.message1.to_file(pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.message2.to_file(pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.message3.to_file(pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
self.message4.to_file(pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.db_path.cleanup()
|
||||
self.cache_path.cleanup()
|
||||
pass
|
||||
|
||||
def test_chat_db_from_dir(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(len(chat_db.messages), 4)
|
||||
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||
# check that the files are sorted
|
||||
self.assertEqual(chat_db.messages[0].file_path,
|
||||
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path,
|
||||
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[2].file_path,
|
||||
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
self.assertEqual(chat_db.messages[3].file_path,
|
||||
pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||
|
||||
def test_chat_db_from_dir_glob(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
glob='*.txt')
|
||||
self.assertEqual(len(chat_db.messages), 2)
|
||||
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(chat_db.messages[0].file_path,
|
||||
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path,
|
||||
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
|
||||
def test_chat_db_filter(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
mfilter=MessageFilter(answer_contains='Answer 2'))
|
||||
self.assertEqual(len(chat_db.messages), 1)
|
||||
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(chat_db.messages[0].file_path,
|
||||
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
|
||||
|
||||
def test_chat_db_from_messges(self) -> None:
|
||||
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
messages=[self.message1, self.message2,
|
||||
self.message3, self.message4])
|
||||
self.assertEqual(len(chat_db.messages), 4)
|
||||
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||
|
||||
def test_chat_db_fids(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(chat_db.get_next_fid(), 1)
|
||||
self.assertEqual(chat_db.get_next_fid(), 2)
|
||||
self.assertEqual(chat_db.get_next_fid(), 3)
|
||||
with open(chat_db.next_fname, 'r') as f:
|
||||
self.assertEqual(f.read(), '3')
|
||||
|
||||
def test_chat_db_write(self) -> None:
|
||||
# create a new ChatDB instance
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name))
|
||||
# check that Message.file_path is correct
|
||||
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||
|
||||
# write the messages to the cache directory
|
||||
chat_db.write_cache()
|
||||
# check if the written files are in the cache directory
|
||||
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
|
||||
self.assertEqual(len(cache_dir_files), 4)
|
||||
self.assertIn(pathlib.Path(self.cache_path.name, '0001.txt'), cache_dir_files)
|
||||
self.assertIn(pathlib.Path(self.cache_path.name, '0002.yaml'), cache_dir_files)
|
||||
self.assertIn(pathlib.Path(self.cache_path.name, '0003.txt'), cache_dir_files)
|
||||
self.assertIn(pathlib.Path(self.cache_path.name, '0004.yaml'), cache_dir_files)
|
||||
# check that Message.file_path has been correctly updated
|
||||
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.cache_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.cache_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.cache_path.name, '0003.txt'))
|
||||
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.cache_path.name, '0004.yaml'))
|
||||
|
||||
# check the timestamp of the files in the DB directory
|
||||
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||
self.assertEqual(len(db_dir_files), 4)
|
||||
old_timestamps = {file: file.stat().st_mtime for file in db_dir_files}
|
||||
# overwrite the messages in the db directory
|
||||
time.sleep(0.05)
|
||||
chat_db.write_db()
|
||||
# check if the written files are in the DB directory
|
||||
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||
self.assertEqual(len(db_dir_files), 4)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0001.txt'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0002.yaml'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0003.txt'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0004.yaml'), db_dir_files)
|
||||
# check if all files in the DB dir have actually been overwritten
|
||||
for file in db_dir_files:
|
||||
self.assertGreater(file.stat().st_mtime, old_timestamps[file])
|
||||
# check that Message.file_path has been correctly updated (again)
|
||||
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||
|
||||
def test_chat_db_read(self) -> None:
|
||||
# create a new ChatDB instance
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(len(chat_db.messages), 4)
|
||||
|
||||
# create 2 new files in the DB directory
|
||||
new_message1 = Message(Question('Question 5'),
|
||||
Answer('Answer 5'),
|
||||
{Tag('tag5')})
|
||||
new_message2 = Message(Question('Question 6'),
|
||||
Answer('Answer 6'),
|
||||
{Tag('tag6')})
|
||||
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
|
||||
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||
# read and check them
|
||||
chat_db.read_db()
|
||||
self.assertEqual(len(chat_db.messages), 6)
|
||||
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||
|
||||
# create 2 new files in the cache directory
|
||||
new_message3 = Message(Question('Question 7'),
|
||||
Answer('Answer 5'),
|
||||
{Tag('tag7')})
|
||||
new_message4 = Message(Question('Question 8'),
|
||||
Answer('Answer 6'),
|
||||
{Tag('tag8')})
|
||||
new_message3.to_file(pathlib.Path(self.cache_path.name, '0007.txt'))
|
||||
new_message4.to_file(pathlib.Path(self.cache_path.name, '0008.yaml'))
|
||||
# read and check them
|
||||
chat_db.read_cache()
|
||||
self.assertEqual(len(chat_db.messages), 8)
|
||||
# check that the new message have the cache dir path
|
||||
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.cache_path.name, '0007.txt'))
|
||||
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.cache_path.name, '0008.yaml'))
|
||||
# an the old ones keep their path (since they have not been replaced)
|
||||
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||
|
||||
# now overwrite two messages in the DB directory
|
||||
new_message1.question = Question('New Question 1')
|
||||
new_message2.question = Question('New Question 2')
|
||||
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
|
||||
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||
# read from the DB dir and check if the modified messages have been updated
|
||||
chat_db.read_db()
|
||||
self.assertEqual(len(chat_db.messages), 8)
|
||||
self.assertEqual(chat_db.messages[4].question, 'New Question 1')
|
||||
self.assertEqual(chat_db.messages[5].question, 'New Question 2')
|
||||
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||
|
||||
# now write the messages from the cache to the DB directory
|
||||
new_message3.to_file(pathlib.Path(self.db_path.name, '0007.txt'))
|
||||
new_message4.to_file(pathlib.Path(self.db_path.name, '0008.yaml'))
|
||||
# read and check them
|
||||
chat_db.read_db()
|
||||
self.assertEqual(len(chat_db.messages), 8)
|
||||
# check that they now have the DB path
|
||||
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.db_path.name, '0007.txt'))
|
||||
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.db_path.name, '0008.yaml'))
|
||||
@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
|
||||
|
||||
|
||||
class QuestionTestCase(CmmTestCase):
|
||||
def test_question_with_prefix(self) -> None:
|
||||
def test_question_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question("=== QUESTION === What is your name?")
|
||||
Question(f"{Question.txt_header}\nWhat is your name?")
|
||||
|
||||
def test_question_without_prefix(self) -> None:
|
||||
def test_question_with_answer_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question(f"{Answer.txt_header}\nBob")
|
||||
|
||||
def test_question_with_legal_header(self) -> None:
|
||||
"""
|
||||
If the header is just a part of a line, it's fine.
|
||||
"""
|
||||
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
|
||||
def test_question_without_header(self) -> None:
|
||||
question = Question("What is your favorite color?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, "What is your favorite color?")
|
||||
|
||||
|
||||
class AnswerTestCase(CmmTestCase):
|
||||
def test_answer_with_prefix(self) -> None:
|
||||
def test_answer_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Answer("=== ANSWER === Yes")
|
||||
Answer(f"{Answer.txt_header}\nno")
|
||||
|
||||
def test_answer_without_prefix(self) -> None:
|
||||
def test_answer_with_legal_header(self) -> None:
|
||||
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
|
||||
def test_answer_without_header(self) -> None:
|
||||
answer = Answer("No")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, "No")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user