Compare commits
10 Commits
213f174ffe
...
0ec606cdaa
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ec606cdaa | |||
| 2b2faa70ea | |||
| 7cbd373742 | |||
| 45393e3a15 | |||
| 8e0a158ac9 | |||
| 6736d1ce4f | |||
| 43fdb59dbf | |||
| 7f612bfc17 | |||
| 93290da5b5 | |||
| 9f4897a5b8 |
64
chatmastermind/ai.py
Normal file
64
chatmastermind/ai.py
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from abc import abstractmethod
|
||||||
|
from typing import Protocol, Optional, Union
|
||||||
|
from .configuration import AIConfig
|
||||||
|
from .message import Message
|
||||||
|
from .chat import Chat
|
||||||
|
|
||||||
|
|
||||||
|
class AIError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Tokens:
|
||||||
|
prompt: int = 0
|
||||||
|
completion: int = 0
|
||||||
|
total: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AIResponse:
|
||||||
|
"""
|
||||||
|
The response to an AI request. Consists of one or more messages
|
||||||
|
(each containing the question and a single answer) and the nr.
|
||||||
|
of used tokens.
|
||||||
|
"""
|
||||||
|
messages: list[Message]
|
||||||
|
tokens: Optional[Tokens] = None
|
||||||
|
|
||||||
|
|
||||||
|
class AI(Protocol):
|
||||||
|
"""
|
||||||
|
The base class for AI clients.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
config: AIConfig
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def request(self,
|
||||||
|
question: Message,
|
||||||
|
context: Chat,
|
||||||
|
num_answers: int = 1) -> AIResponse:
|
||||||
|
"""
|
||||||
|
Make an AI request, asking the given question with the given
|
||||||
|
context (i. e. chat history). The nr. of requested answers
|
||||||
|
corresponds to the nr. of messages in the 'AIResponse'.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def models(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Return all models supported by this AI.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def tokens(self, data: Union[Message, Chat]) -> int:
|
||||||
|
"""
|
||||||
|
Computes the nr. of AI language tokens for the given message
|
||||||
|
or chat. Note that the computation may not be 100% accurate
|
||||||
|
and is not implemented for all AIs.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
43
chatmastermind/ais/openai.py
Normal file
43
chatmastermind/ais/openai.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
"""
|
||||||
|
Implements the OpenAI client classes and functions.
|
||||||
|
"""
|
||||||
|
import openai
|
||||||
|
from ..message import Message
|
||||||
|
from ..chat import Chat
|
||||||
|
from ..ai import AI, AIResponse
|
||||||
|
|
||||||
|
|
||||||
|
class OpenAI(AI):
|
||||||
|
"""
|
||||||
|
The OpenAI AI client.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def request(self,
|
||||||
|
question: Message,
|
||||||
|
context: Chat,
|
||||||
|
num_answers: int = 1) -> AIResponse:
|
||||||
|
"""
|
||||||
|
Make an AI request, asking the given question with the given
|
||||||
|
context (i. e. chat history). The nr. of requested answers
|
||||||
|
corresponds to the nr. of messages in the 'AIResponse'.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def models(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Return all models supported by this AI.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def print_models(self) -> None:
|
||||||
|
"""
|
||||||
|
Print all models supported by the current AI.
|
||||||
|
"""
|
||||||
|
not_ready = []
|
||||||
|
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
|
||||||
|
if engine['ready']:
|
||||||
|
print(engine['id'])
|
||||||
|
else:
|
||||||
|
not_ready.append(engine['id'])
|
||||||
|
if len(not_ready) > 0:
|
||||||
|
print('\nNot ready: ' + ', '.join(not_ready))
|
||||||
294
chatmastermind/chat.py
Normal file
294
chatmastermind/chat.py
Normal file
@ -0,0 +1,294 @@
|
|||||||
|
"""
|
||||||
|
Module implementing various chat classes and functions for managing a chat history.
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
import pathlib
|
||||||
|
from pprint import PrettyPrinter
|
||||||
|
from pydoc import pager
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import TypeVar, Type, Optional, ClassVar, Any, Callable
|
||||||
|
from .message import Question, Answer, Message, MessageFilter, MessageError, source_code, message_in
|
||||||
|
from .tags import Tag
|
||||||
|
|
||||||
|
ChatInst = TypeVar('ChatInst', bound='Chat')
|
||||||
|
ChatDBInst = TypeVar('ChatDBInst', bound='ChatDB')
|
||||||
|
|
||||||
|
|
||||||
|
class ChatError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def terminal_width() -> int:
|
||||||
|
return shutil.get_terminal_size().columns
|
||||||
|
|
||||||
|
|
||||||
|
def pp(*args: Any, **kwargs: Any) -> None:
|
||||||
|
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def print_paged(text: str) -> None:
|
||||||
|
pager(text)
|
||||||
|
|
||||||
|
|
||||||
|
def read_dir(dir_path: pathlib.Path,
|
||||||
|
glob: Optional[str] = None,
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> list[Message]:
|
||||||
|
"""
|
||||||
|
Reads the messages from the given folder.
|
||||||
|
Parameters:
|
||||||
|
* 'dir_path': source directory
|
||||||
|
* 'glob': if specified, files will be filtered using 'path.glob()',
|
||||||
|
otherwise it uses 'path.iterdir()'.
|
||||||
|
* 'mfilter': use with 'Message.from_file()' to filter messages
|
||||||
|
when reading them.
|
||||||
|
"""
|
||||||
|
messages: list[Message] = []
|
||||||
|
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
|
||||||
|
for file_path in sorted(file_iter):
|
||||||
|
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
|
||||||
|
try:
|
||||||
|
message = Message.from_file(file_path, mfilter)
|
||||||
|
if message:
|
||||||
|
messages.append(message)
|
||||||
|
except MessageError as e:
|
||||||
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
|
return messages
|
||||||
|
|
||||||
|
|
||||||
|
def write_dir(dir_path: pathlib.Path,
|
||||||
|
messages: list[Message],
|
||||||
|
file_suffix: str,
|
||||||
|
next_fid: Callable[[], int]) -> None:
|
||||||
|
"""
|
||||||
|
Write all messages to the given directory. If a message has no file_path,
|
||||||
|
a new one will be created. If message.file_path exists, it will be modified
|
||||||
|
to point to the given directory.
|
||||||
|
Parameters:
|
||||||
|
* 'dir_path': destination directory
|
||||||
|
* 'messages': list of messages to write
|
||||||
|
* 'file_suffix': suffix for the message files ['.txt'|'.yaml']
|
||||||
|
* 'next_fid': callable that returns the next file ID
|
||||||
|
"""
|
||||||
|
for message in messages:
|
||||||
|
file_path = message.file_path
|
||||||
|
# message has no file_path: create one
|
||||||
|
if not file_path:
|
||||||
|
fid = next_fid()
|
||||||
|
fname = f"{fid:04d}{file_suffix}"
|
||||||
|
file_path = dir_path / fname
|
||||||
|
# file_path does not point to given directory: modify it
|
||||||
|
elif not file_path.parent.samefile(dir_path):
|
||||||
|
file_path = dir_path / file_path.name
|
||||||
|
message.to_file(file_path)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Chat:
|
||||||
|
"""
|
||||||
|
A class containing a complete chat history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
messages: list[Message]
|
||||||
|
|
||||||
|
def filter(self, mfilter: MessageFilter) -> None:
|
||||||
|
"""
|
||||||
|
Use 'Message.match(mfilter) to remove all messages that
|
||||||
|
don't fulfill the filter requirements.
|
||||||
|
"""
|
||||||
|
self.messages = [m for m in self.messages if m.match(mfilter)]
|
||||||
|
|
||||||
|
def sort(self, reverse: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Sort the messages according to 'Message.msg_id()'.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# the message may not have an ID if it doesn't have a file_path
|
||||||
|
self.messages.sort(key=lambda m: m.msg_id(), reverse=reverse)
|
||||||
|
except MessageError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""
|
||||||
|
Delete all messages.
|
||||||
|
"""
|
||||||
|
self.messages = []
|
||||||
|
|
||||||
|
def add_msgs(self, msgs: list[Message]) -> None:
|
||||||
|
"""
|
||||||
|
Add new messages and sort them if possible.
|
||||||
|
"""
|
||||||
|
self.messages += msgs
|
||||||
|
self.sort()
|
||||||
|
|
||||||
|
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||||
|
"""
|
||||||
|
Get the tags of all messages, optionally filtered by prefix or substring.
|
||||||
|
"""
|
||||||
|
tags: set[Tag] = set()
|
||||||
|
for m in self.messages:
|
||||||
|
tags |= m.filter_tags(prefix, contain)
|
||||||
|
return set(sorted(tags))
|
||||||
|
|
||||||
|
def tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
|
||||||
|
"""
|
||||||
|
Get the frequency of all tags of all messages, optionally filtered by prefix or substring.
|
||||||
|
"""
|
||||||
|
tags: list[Tag] = []
|
||||||
|
for m in self.messages:
|
||||||
|
tags += [tag for tag in m.filter_tags(prefix, contain)]
|
||||||
|
return {tag: tags.count(tag) for tag in sorted(tags)}
|
||||||
|
|
||||||
|
def tokens(self) -> int:
|
||||||
|
"""
|
||||||
|
Returns the nr. of AI language tokens used by all messages in this chat.
|
||||||
|
If unknown, 0 is returned.
|
||||||
|
"""
|
||||||
|
return sum(m.tokens() for m in self.messages)
|
||||||
|
|
||||||
|
def print(self, dump: bool = False, source_code_only: bool = False,
|
||||||
|
with_tags: bool = False, with_file: bool = False,
|
||||||
|
paged: bool = True) -> None:
|
||||||
|
if dump:
|
||||||
|
pp(self)
|
||||||
|
return
|
||||||
|
output: list[str] = []
|
||||||
|
for message in self.messages:
|
||||||
|
if source_code_only:
|
||||||
|
output.extend(source_code(message.question, include_delims=True))
|
||||||
|
continue
|
||||||
|
output.append('-' * terminal_width())
|
||||||
|
output.append(Question.txt_header)
|
||||||
|
output.append(message.question)
|
||||||
|
if message.answer:
|
||||||
|
output.append(Answer.txt_header)
|
||||||
|
output.append(message.answer)
|
||||||
|
if with_tags:
|
||||||
|
output.append(message.tags_str())
|
||||||
|
if with_file:
|
||||||
|
output.append('FILE: ' + str(message.file_path))
|
||||||
|
if paged:
|
||||||
|
print_paged('\n'.join(output))
|
||||||
|
else:
|
||||||
|
print(*output, sep='\n')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ChatDB(Chat):
|
||||||
|
"""
|
||||||
|
A 'Chat' class that is bound to a given directory structure. Supports reading
|
||||||
|
and writing messages from / to that structure. Such a structure consists of
|
||||||
|
two directories: a 'cache directory', where all messages are temporarily
|
||||||
|
stored, and a 'DB' directory, where selected messages can be stored
|
||||||
|
persistently.
|
||||||
|
"""
|
||||||
|
|
||||||
|
default_file_suffix: ClassVar[str] = '.txt'
|
||||||
|
|
||||||
|
cache_path: pathlib.Path
|
||||||
|
db_path: pathlib.Path
|
||||||
|
# a MessageFilter that all messages must match (if given)
|
||||||
|
mfilter: Optional[MessageFilter] = None
|
||||||
|
file_suffix: str = default_file_suffix
|
||||||
|
# the glob pattern for all messages
|
||||||
|
glob: Optional[str] = None
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
# contains the latest message ID
|
||||||
|
self.next_fname = self.db_path / '.next'
|
||||||
|
# make all paths absolute
|
||||||
|
self.cache_path = self.cache_path.absolute()
|
||||||
|
self.db_path = self.db_path.absolute()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dir(cls: Type[ChatDBInst],
|
||||||
|
cache_path: pathlib.Path,
|
||||||
|
db_path: pathlib.Path,
|
||||||
|
glob: Optional[str] = None,
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
||||||
|
"""
|
||||||
|
Create a 'ChatDB' instance from the given directory structure.
|
||||||
|
Reads all messages from 'db_path' into the local message list.
|
||||||
|
Parameters:
|
||||||
|
* 'cache_path': path to the directory for temporary messages
|
||||||
|
* 'db_path': path to the directory for persistent messages
|
||||||
|
* 'glob': if specified, files will be filtered using 'path.glob()',
|
||||||
|
otherwise it uses 'path.iterdir()'.
|
||||||
|
* 'mfilter': use with 'Message.from_file()' to filter messages
|
||||||
|
when reading them.
|
||||||
|
"""
|
||||||
|
messages = read_dir(db_path, glob, mfilter)
|
||||||
|
return cls(messages, cache_path, db_path, mfilter,
|
||||||
|
cls.default_file_suffix, glob)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_messages(cls: Type[ChatDBInst],
|
||||||
|
cache_path: pathlib.Path,
|
||||||
|
db_path: pathlib.Path,
|
||||||
|
messages: list[Message],
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
||||||
|
"""
|
||||||
|
Create a ChatDB instance from the given message list.
|
||||||
|
"""
|
||||||
|
return cls(messages, cache_path, db_path, mfilter)
|
||||||
|
|
||||||
|
def get_next_fid(self) -> int:
|
||||||
|
try:
|
||||||
|
with open(self.next_fname, 'r') as f:
|
||||||
|
next_fid = int(f.read()) + 1
|
||||||
|
self.set_next_fid(next_fid)
|
||||||
|
return next_fid
|
||||||
|
except Exception:
|
||||||
|
self.set_next_fid(1)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
def set_next_fid(self, fid: int) -> None:
|
||||||
|
with open(self.next_fname, 'w') as f:
|
||||||
|
f.write(f'{fid}')
|
||||||
|
|
||||||
|
def read_db(self) -> None:
|
||||||
|
"""
|
||||||
|
Reads new messages from the DB directory. New ones are added to the internal list,
|
||||||
|
existing ones are replaced. A message is determined as 'existing' if a message with
|
||||||
|
the same base filename (i. e. 'file_path.name') is already in the list.
|
||||||
|
"""
|
||||||
|
new_messages = read_dir(self.db_path, self.glob, self.mfilter)
|
||||||
|
# remove all messages from self.messages that are in the new list
|
||||||
|
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
|
||||||
|
# copy the messages from the temporary list to self.messages and sort them
|
||||||
|
self.messages += new_messages
|
||||||
|
self.sort()
|
||||||
|
|
||||||
|
def read_cache(self) -> None:
|
||||||
|
"""
|
||||||
|
Reads new messages from the cache directory. New ones are added to the internal list,
|
||||||
|
existing ones are replaced. A message is determined as 'existing' if a message with
|
||||||
|
the same base filename (i. e. 'file_path.name') is already in the list.
|
||||||
|
"""
|
||||||
|
new_messages = read_dir(self.cache_path, self.glob, self.mfilter)
|
||||||
|
# remove all messages from self.messages that are in the new list
|
||||||
|
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
|
||||||
|
# copy the messages from the temporary list to self.messages and sort them
|
||||||
|
self.messages += new_messages
|
||||||
|
self.sort()
|
||||||
|
|
||||||
|
def write_db(self, msgs: Optional[list[Message]] = None) -> None:
|
||||||
|
"""
|
||||||
|
Write messages to the DB directory. If a message has no file_path, a new one
|
||||||
|
will be created. If message.file_path exists, it will be modified to point
|
||||||
|
to the DB directory.
|
||||||
|
"""
|
||||||
|
write_dir(self.db_path,
|
||||||
|
msgs if msgs else self.messages,
|
||||||
|
self.file_suffix,
|
||||||
|
self.get_next_fid)
|
||||||
|
|
||||||
|
def write_cache(self, msgs: Optional[list[Message]] = None) -> None:
|
||||||
|
"""
|
||||||
|
Write messages to the cache directory. If a message has no file_path, a new one
|
||||||
|
will be created. If message.file_path exists, it will be modified to point to
|
||||||
|
the cache directory.
|
||||||
|
"""
|
||||||
|
write_dir(self.cache_path,
|
||||||
|
msgs if msgs else self.messages,
|
||||||
|
self.file_suffix,
|
||||||
|
self.get_next_fid)
|
||||||
@ -7,7 +7,15 @@ OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class OpenAIConfig():
|
class AIConfig:
|
||||||
|
"""
|
||||||
|
The base class of all AI configurations.
|
||||||
|
"""
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class OpenAIConfig(AIConfig):
|
||||||
"""
|
"""
|
||||||
The OpenAI section of the configuration file.
|
The OpenAI section of the configuration file.
|
||||||
"""
|
"""
|
||||||
@ -25,6 +33,7 @@ class OpenAIConfig():
|
|||||||
Create OpenAIConfig from a dict.
|
Create OpenAIConfig from a dict.
|
||||||
"""
|
"""
|
||||||
return cls(
|
return cls(
|
||||||
|
name='OpenAI',
|
||||||
api_key=str(source['api_key']),
|
api_key=str(source['api_key']),
|
||||||
model=str(source['model']),
|
model=str(source['model']),
|
||||||
max_tokens=int(source['max_tokens']),
|
max_tokens=int(source['max_tokens']),
|
||||||
@ -36,7 +45,7 @@ class OpenAIConfig():
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Config():
|
class Config:
|
||||||
"""
|
"""
|
||||||
The configuration file structure.
|
The configuration file structure.
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -7,10 +7,11 @@ import sys
|
|||||||
import argcomplete
|
import argcomplete
|
||||||
import argparse
|
import argparse
|
||||||
import pathlib
|
import pathlib
|
||||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType
|
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
|
||||||
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
|
from .storage import save_answers, create_chat_hist, get_tags_unique, read_file, dump_data
|
||||||
from .api_client import ai, openai_api_key, print_models
|
from .api_client import ai, openai_api_key, print_models
|
||||||
from .configuration import Config
|
from .configuration import Config
|
||||||
|
from .chat import ChatDB
|
||||||
from itertools import zip_longest
|
from itertools import zip_longest
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@ -61,8 +62,12 @@ def tag_cmd(args: argparse.Namespace, config: Config) -> None:
|
|||||||
"""
|
"""
|
||||||
Handler for the 'tag' command.
|
Handler for the 'tag' command.
|
||||||
"""
|
"""
|
||||||
|
chat = ChatDB.from_dir(cache_path=pathlib.Path('.'),
|
||||||
|
db_path=pathlib.Path(config.db))
|
||||||
if args.list:
|
if args.list:
|
||||||
print_tags_frequency(get_tags(config, None))
|
tags_freq = chat.tags_frequency(args.prefix, args.contain)
|
||||||
|
for tag, freq in tags_freq.items():
|
||||||
|
print(f"- {tag}: {freq}")
|
||||||
|
|
||||||
|
|
||||||
def config_cmd(args: argparse.Namespace, config: Config) -> None:
|
def config_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||||
@ -195,6 +200,8 @@ def create_parser() -> argparse.ArgumentParser:
|
|||||||
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
|
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||||
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||||
action='store_true')
|
action='store_true')
|
||||||
|
tag_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
|
||||||
|
tag_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
|
||||||
|
|
||||||
# 'config' command parser
|
# 'config' command parser
|
||||||
config_cmd_parser = cmdparser.add_parser('config',
|
config_cmd_parser = cmdparser.add_parser('config',
|
||||||
|
|||||||
@ -128,28 +128,29 @@ class ModelLine(str):
|
|||||||
return cls(' '.join([cls.prefix, model]))
|
return cls(' '.join([cls.prefix, model]))
|
||||||
|
|
||||||
|
|
||||||
class Question(str):
|
class Answer(str):
|
||||||
"""
|
"""
|
||||||
A single question with a defined header.
|
A single answer with a defined header.
|
||||||
"""
|
"""
|
||||||
txt_header: ClassVar[str] = '=== QUESTION ==='
|
tokens: int = 0 # tokens used by this answer
|
||||||
yaml_key: ClassVar[str] = 'question'
|
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||||
|
yaml_key: ClassVar[str] = 'answer'
|
||||||
|
|
||||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||||
"""
|
"""
|
||||||
Make sure the question string does not contain the header.
|
Make sure the answer string does not contain the header as a whole line.
|
||||||
"""
|
"""
|
||||||
if cls.txt_header in string:
|
if cls.txt_header in string.split('\n'):
|
||||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||||
instance = super().__new__(cls, string)
|
instance = super().__new__(cls, string)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||||
"""
|
"""
|
||||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||||
"""
|
"""
|
||||||
if any(cls.txt_header in string for string in strings):
|
if cls.txt_header in strings:
|
||||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||||
return instance
|
return instance
|
||||||
@ -161,28 +162,33 @@ class Question(str):
|
|||||||
return source_code(self, include_delims)
|
return source_code(self, include_delims)
|
||||||
|
|
||||||
|
|
||||||
class Answer(str):
|
class Question(str):
|
||||||
"""
|
"""
|
||||||
A single answer with a defined header.
|
A single question with a defined header.
|
||||||
"""
|
"""
|
||||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
tokens: int = 0 # tokens used by this question
|
||||||
yaml_key: ClassVar[str] = 'answer'
|
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||||
|
yaml_key: ClassVar[str] = 'question'
|
||||||
|
|
||||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||||
"""
|
"""
|
||||||
Make sure the answer string does not contain the header.
|
Make sure the question string does not contain the header as a whole line
|
||||||
|
(also not that from 'Answer', so it's always clear where the answer starts).
|
||||||
"""
|
"""
|
||||||
if cls.txt_header in string:
|
string_lines = string.split('\n')
|
||||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
if cls.txt_header in string_lines:
|
||||||
|
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||||
|
if Answer.txt_header in string_lines:
|
||||||
|
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
|
||||||
instance = super().__new__(cls, string)
|
instance = super().__new__(cls, string)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||||
"""
|
"""
|
||||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||||
"""
|
"""
|
||||||
if any(cls.txt_header in string for string in strings):
|
if cls.txt_header in strings:
|
||||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||||
return instance
|
return instance
|
||||||
@ -502,3 +508,13 @@ class Message():
|
|||||||
|
|
||||||
def as_dict(self) -> dict[str, Any]:
|
def as_dict(self) -> dict[str, Any]:
|
||||||
return asdict(self)
|
return asdict(self)
|
||||||
|
|
||||||
|
def tokens(self) -> int:
|
||||||
|
"""
|
||||||
|
Returns the nr. of AI language tokens used by this message.
|
||||||
|
If unknown, 0 is returned.
|
||||||
|
"""
|
||||||
|
if self.answer:
|
||||||
|
return self.question.tokens + self.answer.tokens
|
||||||
|
else:
|
||||||
|
return self.question.tokens
|
||||||
|
|||||||
@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
|
|||||||
print(message['content'])
|
print(message['content'])
|
||||||
else:
|
else:
|
||||||
print(f"{message['role'].upper()}: {message['content']}")
|
print(f"{message['role'].upper()}: {message['content']}")
|
||||||
|
|
||||||
|
|
||||||
def print_tags_frequency(tags: list[str]) -> None:
|
|
||||||
for tag in sorted(set(tags)):
|
|
||||||
print(f"- {tag}: {tags.count(tag)}")
|
|
||||||
|
|||||||
302
tests/test_chat.py
Normal file
302
tests/test_chat.py
Normal file
@ -0,0 +1,302 @@
|
|||||||
|
import pathlib
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
from io import StringIO
|
||||||
|
from unittest.mock import patch
|
||||||
|
from chatmastermind.tags import TagLine
|
||||||
|
from chatmastermind.message import Message, Question, Answer, Tag, MessageFilter
|
||||||
|
from chatmastermind.chat import Chat, ChatDB, terminal_width
|
||||||
|
from .test_main import CmmTestCase
|
||||||
|
|
||||||
|
|
||||||
|
class TestChat(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.chat = Chat([])
|
||||||
|
self.message1 = Message(Question('Question 1'),
|
||||||
|
Answer('Answer 1'),
|
||||||
|
{Tag('atag1'), Tag('btag2')},
|
||||||
|
file_path=pathlib.Path('0001.txt'))
|
||||||
|
self.message2 = Message(Question('Question 2'),
|
||||||
|
Answer('Answer 2'),
|
||||||
|
{Tag('btag2')},
|
||||||
|
file_path=pathlib.Path('0002.txt'))
|
||||||
|
|
||||||
|
def test_filter(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
self.chat.filter(MessageFilter(answer_contains='Answer 1'))
|
||||||
|
|
||||||
|
self.assertEqual(len(self.chat.messages), 1)
|
||||||
|
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||||
|
|
||||||
|
def test_sort(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message2, self.message1])
|
||||||
|
self.chat.sort()
|
||||||
|
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||||
|
self.assertEqual(self.chat.messages[1].question, 'Question 2')
|
||||||
|
self.chat.sort(reverse=True)
|
||||||
|
self.assertEqual(self.chat.messages[0].question, 'Question 2')
|
||||||
|
self.assertEqual(self.chat.messages[1].question, 'Question 1')
|
||||||
|
|
||||||
|
def test_clear(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message1])
|
||||||
|
self.chat.clear()
|
||||||
|
self.assertEqual(len(self.chat.messages), 0)
|
||||||
|
|
||||||
|
def test_add_msgs(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
self.assertEqual(len(self.chat.messages), 2)
|
||||||
|
self.assertEqual(self.chat.messages[0].question, 'Question 1')
|
||||||
|
self.assertEqual(self.chat.messages[1].question, 'Question 2')
|
||||||
|
|
||||||
|
def test_tags(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
tags_all = self.chat.tags()
|
||||||
|
self.assertSetEqual(tags_all, {Tag('atag1'), Tag('btag2')})
|
||||||
|
tags_pref = self.chat.tags(prefix='a')
|
||||||
|
self.assertSetEqual(tags_pref, {Tag('atag1')})
|
||||||
|
tags_cont = self.chat.tags(contain='2')
|
||||||
|
self.assertSetEqual(tags_cont, {Tag('btag2')})
|
||||||
|
|
||||||
|
def test_tags_frequency(self) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
tags_freq = self.chat.tags_frequency()
|
||||||
|
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
|
||||||
|
|
||||||
|
@patch('sys.stdout', new_callable=StringIO)
|
||||||
|
def test_print(self, mock_stdout: StringIO) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
self.chat.print(paged=False)
|
||||||
|
expected_output = f"""{'-'*terminal_width()}
|
||||||
|
{Question.txt_header}
|
||||||
|
Question 1
|
||||||
|
{Answer.txt_header}
|
||||||
|
Answer 1
|
||||||
|
{'-'*terminal_width()}
|
||||||
|
{Question.txt_header}
|
||||||
|
Question 2
|
||||||
|
{Answer.txt_header}
|
||||||
|
Answer 2
|
||||||
|
"""
|
||||||
|
self.assertEqual(mock_stdout.getvalue(), expected_output)
|
||||||
|
|
||||||
|
@patch('sys.stdout', new_callable=StringIO)
|
||||||
|
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
|
||||||
|
self.chat.add_msgs([self.message1, self.message2])
|
||||||
|
self.chat.print(paged=False, with_tags=True, with_file=True)
|
||||||
|
expected_output = f"""{'-'*terminal_width()}
|
||||||
|
{Question.txt_header}
|
||||||
|
Question 1
|
||||||
|
{Answer.txt_header}
|
||||||
|
Answer 1
|
||||||
|
{TagLine.prefix} atag1 btag2
|
||||||
|
FILE: 0001.txt
|
||||||
|
{'-'*terminal_width()}
|
||||||
|
{Question.txt_header}
|
||||||
|
Question 2
|
||||||
|
{Answer.txt_header}
|
||||||
|
Answer 2
|
||||||
|
{TagLine.prefix} btag2
|
||||||
|
FILE: 0002.txt
|
||||||
|
"""
|
||||||
|
self.assertEqual(mock_stdout.getvalue(), expected_output)
|
||||||
|
|
||||||
|
|
||||||
|
class TestChatDB(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.db_path = tempfile.TemporaryDirectory()
|
||||||
|
self.cache_path = tempfile.TemporaryDirectory()
|
||||||
|
|
||||||
|
self.message1 = Message(Question('Question 1'),
|
||||||
|
Answer('Answer 1'),
|
||||||
|
{Tag('tag1')},
|
||||||
|
file_path=pathlib.Path('0001.txt'))
|
||||||
|
self.message2 = Message(Question('Question 2'),
|
||||||
|
Answer('Answer 2'),
|
||||||
|
{Tag('tag2')},
|
||||||
|
file_path=pathlib.Path('0002.yaml'))
|
||||||
|
self.message3 = Message(Question('Question 3'),
|
||||||
|
Answer('Answer 3'),
|
||||||
|
{Tag('tag3')},
|
||||||
|
file_path=pathlib.Path('0003.txt'))
|
||||||
|
self.message4 = Message(Question('Question 4'),
|
||||||
|
Answer('Answer 4'),
|
||||||
|
{Tag('tag4')},
|
||||||
|
file_path=pathlib.Path('0004.yaml'))
|
||||||
|
|
||||||
|
self.message1.to_file(pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
self.message2.to_file(pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
|
self.message3.to_file(pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
self.message4.to_file(pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.db_path.cleanup()
|
||||||
|
self.cache_path.cleanup()
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_chat_db_from_dir(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(len(chat_db.messages), 4)
|
||||||
|
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||||
|
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||||
|
# check that the files are sorted
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[1].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
|
self.assertEqual(chat_db.messages[2].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[3].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||||
|
|
||||||
|
def test_chat_db_from_dir_glob(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name),
|
||||||
|
glob='*.txt')
|
||||||
|
self.assertEqual(len(chat_db.messages), 2)
|
||||||
|
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||||
|
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[1].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
|
||||||
|
def test_chat_db_filter(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name),
|
||||||
|
mfilter=MessageFilter(answer_contains='Answer 2'))
|
||||||
|
self.assertEqual(len(chat_db.messages), 1)
|
||||||
|
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||||
|
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
|
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
|
||||||
|
|
||||||
|
def test_chat_db_from_messges(self) -> None:
|
||||||
|
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name),
|
||||||
|
messages=[self.message1, self.message2,
|
||||||
|
self.message3, self.message4])
|
||||||
|
self.assertEqual(len(chat_db.messages), 4)
|
||||||
|
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||||
|
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||||
|
|
||||||
|
def test_chat_db_fids(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(chat_db.get_next_fid(), 1)
|
||||||
|
self.assertEqual(chat_db.get_next_fid(), 2)
|
||||||
|
self.assertEqual(chat_db.get_next_fid(), 3)
|
||||||
|
with open(chat_db.next_fname, 'r') as f:
|
||||||
|
self.assertEqual(f.read(), '3')
|
||||||
|
|
||||||
|
def test_chat_db_write(self) -> None:
|
||||||
|
# create a new ChatDB instance
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name))
|
||||||
|
# check that Message.file_path is correct
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
|
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||||
|
|
||||||
|
# write the messages to the cache directory
|
||||||
|
chat_db.write_cache()
|
||||||
|
# check if the written files are in the cache directory
|
||||||
|
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
|
||||||
|
self.assertEqual(len(cache_dir_files), 4)
|
||||||
|
self.assertIn(pathlib.Path(self.cache_path.name, '0001.txt'), cache_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.cache_path.name, '0002.yaml'), cache_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.cache_path.name, '0003.txt'), cache_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.cache_path.name, '0004.yaml'), cache_dir_files)
|
||||||
|
# check that Message.file_path has been correctly updated
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.cache_path.name, '0001.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.cache_path.name, '0002.yaml'))
|
||||||
|
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.cache_path.name, '0003.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.cache_path.name, '0004.yaml'))
|
||||||
|
|
||||||
|
# check the timestamp of the files in the DB directory
|
||||||
|
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||||
|
self.assertEqual(len(db_dir_files), 4)
|
||||||
|
old_timestamps = {file: file.stat().st_mtime for file in db_dir_files}
|
||||||
|
# overwrite the messages in the db directory
|
||||||
|
time.sleep(0.05)
|
||||||
|
chat_db.write_db()
|
||||||
|
# check if the written files are in the DB directory
|
||||||
|
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||||
|
self.assertEqual(len(db_dir_files), 4)
|
||||||
|
self.assertIn(pathlib.Path(self.db_path.name, '0001.txt'), db_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.db_path.name, '0002.yaml'), db_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.db_path.name, '0003.txt'), db_dir_files)
|
||||||
|
self.assertIn(pathlib.Path(self.db_path.name, '0004.yaml'), db_dir_files)
|
||||||
|
# check if all files in the DB dir have actually been overwritten
|
||||||
|
for file in db_dir_files:
|
||||||
|
self.assertGreater(file.stat().st_mtime, old_timestamps[file])
|
||||||
|
# check that Message.file_path has been correctly updated (again)
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
|
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||||
|
|
||||||
|
def test_chat_db_read(self) -> None:
|
||||||
|
# create a new ChatDB instance
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(len(chat_db.messages), 4)
|
||||||
|
|
||||||
|
# create 2 new files in the DB directory
|
||||||
|
new_message1 = Message(Question('Question 5'),
|
||||||
|
Answer('Answer 5'),
|
||||||
|
{Tag('tag5')})
|
||||||
|
new_message2 = Message(Question('Question 6'),
|
||||||
|
Answer('Answer 6'),
|
||||||
|
{Tag('tag6')})
|
||||||
|
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
|
||||||
|
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||||
|
# read and check them
|
||||||
|
chat_db.read_db()
|
||||||
|
self.assertEqual(len(chat_db.messages), 6)
|
||||||
|
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||||
|
|
||||||
|
# create 2 new files in the cache directory
|
||||||
|
new_message3 = Message(Question('Question 7'),
|
||||||
|
Answer('Answer 5'),
|
||||||
|
{Tag('tag7')})
|
||||||
|
new_message4 = Message(Question('Question 8'),
|
||||||
|
Answer('Answer 6'),
|
||||||
|
{Tag('tag8')})
|
||||||
|
new_message3.to_file(pathlib.Path(self.cache_path.name, '0007.txt'))
|
||||||
|
new_message4.to_file(pathlib.Path(self.cache_path.name, '0008.yaml'))
|
||||||
|
# read and check them
|
||||||
|
chat_db.read_cache()
|
||||||
|
self.assertEqual(len(chat_db.messages), 8)
|
||||||
|
# check that the new message have the cache dir path
|
||||||
|
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.cache_path.name, '0007.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.cache_path.name, '0008.yaml'))
|
||||||
|
# an the old ones keep their path (since they have not been replaced)
|
||||||
|
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||||
|
|
||||||
|
# now overwrite two messages in the DB directory
|
||||||
|
new_message1.question = Question('New Question 1')
|
||||||
|
new_message2.question = Question('New Question 2')
|
||||||
|
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
|
||||||
|
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||||
|
# read from the DB dir and check if the modified messages have been updated
|
||||||
|
chat_db.read_db()
|
||||||
|
self.assertEqual(len(chat_db.messages), 8)
|
||||||
|
self.assertEqual(chat_db.messages[4].question, 'New Question 1')
|
||||||
|
self.assertEqual(chat_db.messages[5].question, 'New Question 2')
|
||||||
|
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
|
||||||
|
|
||||||
|
# now write the messages from the cache to the DB directory
|
||||||
|
new_message3.to_file(pathlib.Path(self.db_path.name, '0007.txt'))
|
||||||
|
new_message4.to_file(pathlib.Path(self.db_path.name, '0008.yaml'))
|
||||||
|
# read and check them
|
||||||
|
chat_db.read_db()
|
||||||
|
self.assertEqual(len(chat_db.messages), 8)
|
||||||
|
# check that they now have the DB path
|
||||||
|
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.db_path.name, '0007.txt'))
|
||||||
|
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.db_path.name, '0008.yaml'))
|
||||||
@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class QuestionTestCase(CmmTestCase):
|
class QuestionTestCase(CmmTestCase):
|
||||||
def test_question_with_prefix(self) -> None:
|
def test_question_with_header(self) -> None:
|
||||||
with self.assertRaises(MessageError):
|
with self.assertRaises(MessageError):
|
||||||
Question("=== QUESTION === What is your name?")
|
Question(f"{Question.txt_header}\nWhat is your name?")
|
||||||
|
|
||||||
def test_question_without_prefix(self) -> None:
|
def test_question_with_answer_header(self) -> None:
|
||||||
|
with self.assertRaises(MessageError):
|
||||||
|
Question(f"{Answer.txt_header}\nBob")
|
||||||
|
|
||||||
|
def test_question_with_legal_header(self) -> None:
|
||||||
|
"""
|
||||||
|
If the header is just a part of a line, it's fine.
|
||||||
|
"""
|
||||||
|
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||||
|
self.assertIsInstance(question, Question)
|
||||||
|
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||||
|
|
||||||
|
def test_question_without_header(self) -> None:
|
||||||
question = Question("What is your favorite color?")
|
question = Question("What is your favorite color?")
|
||||||
self.assertIsInstance(question, Question)
|
self.assertIsInstance(question, Question)
|
||||||
self.assertEqual(question, "What is your favorite color?")
|
self.assertEqual(question, "What is your favorite color?")
|
||||||
|
|
||||||
|
|
||||||
class AnswerTestCase(CmmTestCase):
|
class AnswerTestCase(CmmTestCase):
|
||||||
def test_answer_with_prefix(self) -> None:
|
def test_answer_with_header(self) -> None:
|
||||||
with self.assertRaises(MessageError):
|
with self.assertRaises(MessageError):
|
||||||
Answer("=== ANSWER === Yes")
|
Answer(f"{Answer.txt_header}\nno")
|
||||||
|
|
||||||
def test_answer_without_prefix(self) -> None:
|
def test_answer_with_legal_header(self) -> None:
|
||||||
|
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||||
|
self.assertIsInstance(answer, Answer)
|
||||||
|
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||||
|
|
||||||
|
def test_answer_without_header(self) -> None:
|
||||||
answer = Answer("No")
|
answer = Answer("No")
|
||||||
self.assertIsInstance(answer, Answer)
|
self.assertIsInstance(answer, Answer)
|
||||||
self.assertEqual(answer, "No")
|
self.assertEqual(answer, "No")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user