Compare commits

...

13 Commits

10 changed files with 420 additions and 126 deletions

64
chatmastermind/ai.py Normal file
View File

@ -0,0 +1,64 @@
from dataclasses import dataclass
from abc import abstractmethod
from typing import Protocol, Optional, Union
from .configuration import AIConfig
from .message import Message
from .chat import Chat
class AIError(Exception):
pass
@dataclass
class Tokens:
prompt: int = 0
completion: int = 0
total: int = 0
@dataclass
class AIResponse:
"""
The response to an AI request. Consists of one or more messages
(each containing the question and a single answer) and the nr.
of used tokens.
"""
messages: list[Message]
tokens: Optional[Tokens] = None
class AI(Protocol):
"""
The base class for AI clients.
"""
name: str
config: AIConfig
@abstractmethod
def request(self,
question: Message,
context: Chat,
num_answers: int = 1) -> AIResponse:
"""
Make an AI request, asking the given question with the given
context (i. e. chat history). The nr. of requested answers
corresponds to the nr. of messages in the 'AIResponse'.
"""
raise NotImplementedError
@abstractmethod
def models(self) -> list[str]:
"""
Return all models supported by this AI.
"""
raise NotImplementedError
def tokens(self, data: Union[Message, Chat]) -> int:
"""
Computes the nr. of AI language tokens for the given message
or chat. Note that the computation may not be 100% accurate
and is not implemented for all AIs.
"""
raise NotImplementedError

View File

@ -0,0 +1,90 @@
"""
Implements the OpenAI client classes and functions.
"""
import openai
from typing import Optional
from ..tags import Tag
from ..message import Message, Answer
from ..chat import Chat
from ..ai import AI, AIResponse, Tokens
from ..config import OpenAIConfig
ChatType = list[dict[str, str]]
class OpenAI(AI):
"""
The OpenAI AI client.
"""
config: OpenAIConfig
def request(self,
question: Message,
chat: Chat,
num_answers: int = 1,
otags: Optional[set[Tag]] = None) -> AIResponse:
"""
Make an AI request, asking the given question with the given
chat history. The nr. of requested answers corresponds to the
nr. of messages in the 'AIResponse'.
"""
oai_chat = self.openai_chat(chat, self.config.system, question)
response = openai.ChatCompletion.create(
model=self.config.model,
messages=oai_chat,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
top_p=self.config.top_p,
n=num_answers,
frequency_penalty=self.config.frequency_penalty,
presence_penalty=self.config.presence_penalty)
answers: list[Message] = []
for choice in response['choices']: # type: ignore
answers.append(Message(question=question.question,
answer=Answer(choice['message']['content']),
tags=otags,
ai=self.name,
model=self.config.model))
return AIResponse(answers, Tokens(response['usage']['prompt'],
response['usage']['completion'],
response['usage']['total']))
def models(self) -> list[str]:
"""
Return all models supported by this AI.
"""
raise NotImplementedError
def print_models(self) -> None:
"""
Print all models supported by the current AI.
"""
not_ready = []
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
if engine['ready']:
print(engine['id'])
else:
not_ready.append(engine['id'])
if len(not_ready) > 0:
print('\nNot ready: ' + ', '.join(not_ready))
def openai_chat(self, chat: Chat, system: str,
question: Optional[Message] = None) -> ChatType:
"""
Create a chat history with system message in OpenAI format.
Optionally append a new question.
"""
oai_chat: ChatType = []
def append(role: str, content: str) -> None:
oai_chat.append({'role': role, 'content': content.replace("''", "'")})
append('system', system)
for message in chat.messages:
if message.answer:
append('user', message.question)
append('assistant', message.answer)
if question:
append('user', question.question)
return oai_chat

View File

@ -45,7 +45,7 @@ def read_dir(dir_path: pathlib.Path,
messages: list[Message] = []
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
for file_path in sorted(file_iter):
if file_path.is_file():
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
try:
message = Message.from_file(file_path, mfilter)
if message:
@ -82,6 +82,17 @@ def write_dir(dir_path: pathlib.Path,
message.to_file(file_path)
def clear_dir(dir_path: pathlib.Path,
glob: Optional[str] = None) -> None:
"""
Deletes all Message files in the given directory.
"""
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
for file_path in file_iter:
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
file_path.unlink(missing_ok=True)
@dataclass
class Chat:
"""
@ -127,7 +138,16 @@ class Chat:
tags: set[Tag] = set()
for m in self.messages:
tags |= m.filter_tags(prefix, contain)
return tags
return set(sorted(tags))
def tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
"""
Get the frequency of all tags of all messages, optionally filtered by prefix or substring.
"""
tags: list[Tag] = []
for m in self.messages:
tags += [tag for tag in m.filter_tags(prefix, contain)]
return {tag: tags.count(tag) for tag in sorted(tags)}
def tokens(self) -> int:
"""
@ -136,27 +156,24 @@ class Chat:
"""
return sum(m.tokens() for m in self.messages)
def print(self, dump: bool = False, source_code_only: bool = False,
with_tags: bool = False, with_file: bool = False,
def print(self, source_code_only: bool = False,
with_tags: bool = False, with_files: bool = False,
paged: bool = True) -> None:
if dump:
pp(self)
return
output: list[str] = []
for message in self.messages:
if source_code_only:
output.extend(source_code(message.question, include_delims=True))
continue
output.append('-' * terminal_width())
if with_tags:
output.append(message.tags_str())
if with_files:
output.append('FILE: ' + str(message.file_path))
output.append(Question.txt_header)
output.append(message.question)
if message.answer:
output.append(Answer.txt_header)
output.append(message.answer)
if with_tags:
output.append(message.tags_str())
if with_file:
output.append('FILE: ' + str(message.file_path))
if paged:
print_paged('\n'.join(output))
else:
@ -283,3 +300,12 @@ class ChatDB(Chat):
msgs if msgs else self.messages,
self.file_suffix,
self.get_next_fid)
def clear_cache(self) -> None:
"""
Deletes all Message files from the cache dir and removes those messages from
the internal list.
"""
clear_dir(self.cache_path, self.glob)
# only keep messages from DB dir (or those that have not yet been written)
self.messages = [m for m in self.messages if not m.file_path or m.file_path.parent.samefile(self.db_path)]

View File

@ -7,7 +7,15 @@ OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
@dataclass
class OpenAIConfig():
class AIConfig:
"""
The base class of all AI configurations.
"""
name: str
@dataclass
class OpenAIConfig(AIConfig):
"""
The OpenAI section of the configuration file.
"""
@ -25,6 +33,7 @@ class OpenAIConfig():
Create OpenAIConfig from a dict.
"""
return cls(
name='OpenAI',
api_key=str(source['api_key']),
model=str(source['model']),
max_tokens=int(source['max_tokens']),
@ -36,7 +45,7 @@ class OpenAIConfig():
@dataclass
class Config():
class Config:
"""
The configuration file structure.
"""
@ -47,7 +56,7 @@ class Config():
@classmethod
def from_dict(cls: Type[ConfigInst], source: dict[str, Any]) -> ConfigInst:
"""
Create OpenAIConfig from a dict.
Create Config from a dict.
"""
return cls(
system=str(source['system']),

View File

@ -6,11 +6,13 @@ import yaml
import sys
import argcomplete
import argparse
import pathlib
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
from pathlib import Path
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
from .storage import save_answers, create_chat_hist, read_file, dump_data
from .api_client import ai, openai_api_key, print_models
from .configuration import Config
from .chat import ChatDB
from .message import Message, MessageFilter
from itertools import zip_longest
from typing import Any
@ -18,9 +20,8 @@ default_config = '.config.yaml'
def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
with open(parsed_args.config, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return get_tags_unique(config, prefix)
config = Config.from_file(parsed_args.config)
return list(Message.tags_from_dir(Path(config.db), prefix=prefix))
def create_question_with_hist(args: argparse.Namespace,
@ -31,11 +32,11 @@ def create_question_with_hist(args: argparse.Namespace,
by the specified tags.
"""
tags = args.tags or []
extags = args.extags or []
etags = args.etags or []
otags = args.output_tags or []
if not args.only_source_code:
print_tag_args(tags, extags, otags)
if not args.source_code_only:
print_tag_args(tags, etags, otags)
question_parts = []
question_list = args.question if args.question is not None else []
@ -52,17 +53,24 @@ def create_question_with_hist(args: argparse.Namespace,
question_parts.append(f"```\n{r.read().strip()}\n```")
full_question = '\n\n'.join(question_parts)
chat = create_chat_hist(full_question, tags, extags, config,
args.match_all_tags, False, False)
chat = create_chat_hist(full_question, tags, etags, config,
match_all_tags=True if args.atags else False, # FIXME
with_tags=False,
with_file=False)
return chat, full_question, tags
def tag_cmd(args: argparse.Namespace, config: Config) -> None:
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'tag' command.
Handler for the 'tags' command.
"""
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
if args.list:
print_tags_frequency(get_tags(config, None))
tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
# TODO: add renaming
def config_cmd(args: argparse.Namespace, config: Config) -> None:
@ -89,7 +97,7 @@ def ask_cmd(args: argparse.Namespace, config: Config) -> None:
if args.model:
config.openai.model = args.model
chat, question, tags = create_question_with_hist(args, config)
print_chat_hist(chat, False, args.only_source_code)
print_chat_hist(chat, False, args.source_code_only)
otags = args.output_tags or []
answers, usage = ai(chat, config, args.number)
save_answers(question, answers, tags, otags, config)
@ -101,21 +109,25 @@ def hist_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'hist' command.
"""
tags = args.tags or []
extags = args.extags or []
chat = create_chat_hist(None, tags, extags, config,
args.match_all_tags,
mfilter = MessageFilter(tags_or=args.tags,
tags_and=args.atags,
tags_not=args.etags,
question_contains=args.question,
answer_contains=args.answer)
chat = ChatDB.from_dir(Path('.'),
Path(config.db),
mfilter=mfilter)
chat.print(args.source_code_only,
args.with_tags,
args.with_files)
print_chat_hist(chat, args.dump, args.only_source_code)
def print_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'print' command.
"""
fname = pathlib.Path(args.file)
fname = Path(args.file)
if fname.suffix == '.yaml':
with open(args.file, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
@ -124,7 +136,7 @@ def print_cmd(args: argparse.Namespace, config: Config) -> None:
else:
print(f"Unknown file type: {args.file}")
sys.exit(1)
if args.only_source_code:
if args.source_code_only:
display_source_code(data['answer'])
else:
print(dump_data(data).strip())
@ -144,18 +156,17 @@ def create_parser() -> argparse.ArgumentParser:
# a parent parser for all commands that support tag selection
tag_parser = argparse.ArgumentParser(add_help=False)
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+',
help='List of tag names', metavar='TAGS')
help='List of tag names (one must match)', metavar='TAGS')
tag_arg.completer = tags_completer # type: ignore
extag_arg = tag_parser.add_argument('-e', '--extags', nargs='+',
help='List of tag names to exclude', metavar='EXTAGS')
extag_arg.completer = tags_completer # type: ignore
atag_arg = tag_parser.add_argument('-a', '--atags', nargs='+',
help='List of tag names (all must match)', metavar='TAGS')
atag_arg.completer = tags_completer # type: ignore
etag_arg = tag_parser.add_argument('-e', '--etags', nargs='+',
help='List of tag names to exclude', metavar='ETAGS')
etag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
help='List of output tag names, default is input', metavar='OTAGS')
otag_arg.completer = tags_completer # type: ignore
tag_parser.add_argument('-a', '--match-all-tags',
help="All given tags must match when selecting chat history entries",
action='store_true')
# enable autocompletion for tags
# 'ask' command parser
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
@ -170,7 +181,7 @@ def create_parser() -> argparse.ArgumentParser:
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int,
default=1)
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
ask_cmd_parser.add_argument('-S', '--only-source-code', help='Add pure source code to the chat history',
ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
action='store_true')
# 'hist' command parser
@ -178,23 +189,25 @@ def create_parser() -> argparse.ArgumentParser:
help="Print chat history.",
aliases=['h'])
hist_cmd_parser.set_defaults(func=hist_cmd)
hist_cmd_parser.add_argument('-d', '--dump', help="Print chat history as Python structure",
action='store_true')
hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.",
action='store_true')
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
action='store_true')
hist_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring')
hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring')
# 'tag' command parser
tag_cmd_parser = cmdparser.add_parser('tag',
# 'tags' command parser
tags_cmd_parser = cmdparser.add_parser('tags',
help="Manage tags.",
aliases=['t'])
tag_cmd_parser.set_defaults(func=tag_cmd)
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
tags_cmd_parser.set_defaults(func=tags_cmd)
tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True)
tags_group.add_argument('-l', '--list', help="List all tags and their frequency",
action='store_true')
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
# 'config' command parser
config_cmd_parser = cmdparser.add_parser('config',
@ -214,7 +227,7 @@ def create_parser() -> argparse.ArgumentParser:
aliases=['p'])
print_cmd_parser.set_defaults(func=print_cmd)
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
print_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
argcomplete.autocomplete(parser)

View File

@ -5,7 +5,7 @@ import pathlib
import yaml
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
from dataclasses import dataclass, asdict, field
from .tags import Tag, TagLine, TagError, match_tags
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
QuestionInst = TypeVar('QuestionInst', bound='Question')
AnswerInst = TypeVar('AnswerInst', bound='Answer')
@ -96,7 +96,7 @@ class AILine(str):
def __new__(cls: Type[AILineInst], string: str) -> AILineInst:
if not string.startswith(cls.prefix):
raise TagError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
raise MessageError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
@ -116,7 +116,7 @@ class ModelLine(str):
def __new__(cls: Type[ModelLineInst], string: str) -> ModelLineInst:
if not string.startswith(cls.prefix):
raise TagError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
raise MessageError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
@ -128,6 +128,40 @@ class ModelLine(str):
return cls(' '.join([cls.prefix, model]))
class Answer(str):
"""
A single answer with a defined header.
"""
tokens: int = 0 # tokens used by this answer
txt_header: ClassVar[str] = '==== ANSWER ===='
yaml_key: ClassVar[str] = 'answer'
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
"""
Make sure the answer string does not contain the header as a whole line.
"""
if cls.txt_header in string.split('\n'):
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if cls.txt_header in strings:
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
class Question(str):
"""
A single question with a defined header.
@ -138,10 +172,14 @@ class Question(str):
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
"""
Make sure the question string does not contain the header.
Make sure the question string does not contain the header as a whole line
(also not that from 'Answer', so it's always clear where the answer starts).
"""
if cls.txt_header in string:
string_lines = string.split('\n')
if cls.txt_header in string_lines:
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
if Answer.txt_header in string_lines:
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
instance = super().__new__(cls, string)
return instance
@ -150,41 +188,7 @@ class Question(str):
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
class Answer(str):
"""
A single answer with a defined header.
"""
tokens: int = 0 # tokens used by this answer
txt_header: ClassVar[str] = '=== ANSWER ==='
yaml_key: ClassVar[str] = 'answer'
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
"""
Make sure the answer string does not contain the header.
"""
if cls.txt_header in string:
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
if cls.txt_header in strings:
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
@ -351,17 +355,20 @@ class Message():
try:
pos = fd.tell()
ai = AILine(fd.readline()).ai()
except TagError:
except MessageError:
fd.seek(pos)
# ModelLine (Optional)
try:
pos = fd.tell()
model = ModelLine(fd.readline()).model()
except TagError:
except MessageError:
fd.seek(pos)
# Question and Answer
text = fd.read().strip().split('\n')
try:
question_idx = text.index(Question.txt_header) + 1
except ValueError:
raise MessageError(f"Question header '{Question.txt_header}' not found in '{file_path}'")
try:
answer_idx = text.index(Answer.txt_header)
question = Question.from_list(text[question_idx:answer_idx])
@ -492,6 +499,14 @@ class Message():
return False
return True
def rename_tags(self, tags_rename: set[tuple[Tag, Tag]]) -> None:
"""
Renames the given tags. The first tuple element is the old name,
the second one is the new name.
"""
if self.tags:
self.tags = rename_tags(self.tags, tags_rename)
def msg_id(self) -> str:
"""
Returns an ID that is unique throughout all messages in the same (DB) directory.

View File

@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
print(message['content'])
else:
print(f"{message['role'].upper()}: {message['content']}")
def print_tags_frequency(tags: list[str]) -> None:
for tag in sorted(set(tags)):
print(f"- {tag}: {tags.count(tag)}")

View File

@ -14,7 +14,7 @@ class TestChat(CmmTestCase):
self.chat = Chat([])
self.message1 = Message(Question('Question 1'),
Answer('Answer 1'),
{Tag('atag1')},
{Tag('atag1'), Tag('btag2')},
file_path=pathlib.Path('0001.txt'))
self.message2 = Message(Question('Question 2'),
Answer('Answer 2'),
@ -57,6 +57,11 @@ class TestChat(CmmTestCase):
tags_cont = self.chat.tags(contain='2')
self.assertSetEqual(tags_cont, {Tag('btag2')})
def test_tags_frequency(self) -> None:
self.chat.add_msgs([self.message1, self.message2])
tags_freq = self.chat.tags_frequency()
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
@patch('sys.stdout', new_callable=StringIO)
def test_print(self, mock_stdout: StringIO) -> None:
self.chat.add_msgs([self.message1, self.message2])
@ -77,21 +82,21 @@ Answer 2
@patch('sys.stdout', new_callable=StringIO)
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
self.chat.add_msgs([self.message1, self.message2])
self.chat.print(paged=False, with_tags=True, with_file=True)
self.chat.print(paged=False, with_tags=True, with_files=True)
expected_output = f"""{'-'*terminal_width()}
{TagLine.prefix} atag1 btag2
FILE: 0001.txt
{Question.txt_header}
Question 1
{Answer.txt_header}
Answer 1
{TagLine.prefix} atag1
FILE: 0001.txt
{'-'*terminal_width()}
{TagLine.prefix} btag2
FILE: 0002.txt
{Question.txt_header}
Question 2
{Answer.txt_header}
Answer 2
{TagLine.prefix} btag2
FILE: 0002.txt
"""
self.assertEqual(mock_stdout.getvalue(), expected_output)
@ -295,3 +300,48 @@ class TestChatDB(CmmTestCase):
# check that they now have the DB path
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.db_path.name, '0007.txt'))
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.db_path.name, '0008.yaml'))
def test_chat_db_clear(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
# check that Message.file_path is correct
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
# write the messages to the cache directory
chat_db.write_cache()
# check if the written files are in the cache directory
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
self.assertEqual(len(cache_dir_files), 4)
# now rewrite them to the DB dir and check for modified paths
chat_db.write_db()
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
self.assertEqual(len(db_dir_files), 4)
self.assertIn(pathlib.Path(self.db_path.name, '0001.txt'), db_dir_files)
self.assertIn(pathlib.Path(self.db_path.name, '0002.yaml'), db_dir_files)
self.assertIn(pathlib.Path(self.db_path.name, '0003.txt'), db_dir_files)
self.assertIn(pathlib.Path(self.db_path.name, '0004.yaml'), db_dir_files)
# add a new message with empty file_path
message_empty = Message(question=Question("What the hell am I doing here?"),
answer=Answer("You don't belong here!"))
# and one for the cache dir
message_cache = Message(question=Question("What the hell am I doing here?"),
answer=Answer("You're a creep!"),
file_path=pathlib.Path(self.cache_path.name, '0005.txt'))
chat_db.add_msgs([message_empty, message_cache])
# clear the cache and check the cache dir
chat_db.clear_cache()
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
self.assertEqual(len(cache_dir_files), 0)
# make sure that the DB messages (and the new message) are still there
self.assertEqual(len(chat_db.messages), 5)
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
self.assertEqual(len(db_dir_files), 4)
# but not the message with the cache dir path
self.assertFalse(any(m.file_path == message_cache.file_path for m in chat_db.messages))

View File

@ -115,11 +115,12 @@ class TestHandleQuestion(CmmTestCase):
self.question = "test question"
self.args = argparse.Namespace(
tags=['tag1'],
extags=['extag1'],
atags=None,
etags=['etag1'],
output_tags=None,
question=[self.question],
source=None,
only_source_code=False,
source_code_only=False,
number=3,
max_tokens=None,
temperature=None,
@ -143,16 +144,18 @@ class TestHandleQuestion(CmmTestCase):
with patch("chatmastermind.storage.open", open_mock):
ask_cmd(self.args, self.config)
mock_print_tag_args.assert_called_once_with(self.args.tags,
self.args.extags,
self.args.etags,
[])
mock_create_chat_hist.assert_called_once_with(self.question,
self.args.tags,
self.args.extags,
self.args.etags,
self.config,
False, False, False)
match_all_tags=False,
with_tags=False,
with_file=False)
mock_print_chat_hist.assert_called_once_with('test_chat',
False,
self.args.only_source_code)
self.args.source_code_only)
mock_ai.assert_called_with("test_chat",
self.config,
self.args.number)
@ -227,7 +230,7 @@ class TestCreateParser(CmmTestCase):
mock_add_subparsers.assert_called_once_with(dest='command', title='commands', description='supported commands', required=True)
mock_cmdparser.add_parser.assert_any_call('ask', parents=ANY, help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('hist', parents=ANY, help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('tag', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('tags', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
self.assertTrue('.config.yaml' in parser.get_default('config'))

View File

@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
class QuestionTestCase(CmmTestCase):
def test_question_with_prefix(self) -> None:
def test_question_with_header(self) -> None:
with self.assertRaises(MessageError):
Question("=== QUESTION === What is your name?")
Question(f"{Question.txt_header}\nWhat is your name?")
def test_question_without_prefix(self) -> None:
def test_question_with_answer_header(self) -> None:
with self.assertRaises(MessageError):
Question(f"{Answer.txt_header}\nBob")
def test_question_with_legal_header(self) -> None:
"""
If the header is just a part of a line, it's fine.
"""
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
self.assertIsInstance(question, Question)
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
def test_question_without_header(self) -> None:
question = Question("What is your favorite color?")
self.assertIsInstance(question, Question)
self.assertEqual(question, "What is your favorite color?")
class AnswerTestCase(CmmTestCase):
def test_answer_with_prefix(self) -> None:
def test_answer_with_header(self) -> None:
with self.assertRaises(MessageError):
Answer("=== ANSWER === Yes")
Answer(f"{Answer.txt_header}\nno")
def test_answer_without_prefix(self) -> None:
def test_answer_with_legal_header(self) -> None:
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
self.assertIsInstance(answer, Answer)
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
def test_answer_without_header(self) -> None:
answer = Answer("No")
self.assertIsInstance(answer, Answer)
self.assertEqual(answer, "No")
@ -775,3 +792,15 @@ class MessageInTestCase(CmmTestCase):
def test_message_in(self) -> None:
self.assertTrue(message_in(self.message1, [self.message1]))
self.assertFalse(message_in(self.message1, [self.message2]))
class MessageRenameTagsTestCase(CmmTestCase):
def setUp(self) -> None:
self.message = Message(Question('This is a question.'),
tags={Tag('atag1'), Tag('btag2')},
file_path=pathlib.Path('/tmp/foo/bla'))
def test_rename_tags(self) -> None:
self.message.rename_tags({(Tag('atag1'), Tag('atag2')), (Tag('btag2'), Tag('btag3'))})
self.assertIsNotNone(self.message.tags)
self.assertSetEqual(self.message.tags, {Tag('atag2'), Tag('btag3')}) # type: ignore [arg-type]