Compare commits

...

7 Commits

7 changed files with 878 additions and 125 deletions

1
.gitignore vendored
View File

@ -131,3 +131,4 @@ dmypy.json
.config.yaml .config.yaml
db db
noweb noweb
Session.vim

View File

@ -63,4 +63,7 @@ class Config():
def to_file(self, path: str) -> None: def to_file(self, path: str) -> None:
with open(path, 'w') as f: with open(path, 'w') as f:
yaml.dump(asdict(self), f) yaml.dump(asdict(self), f, sort_keys=False)
def as_dict(self) -> dict[str, Any]:
return asdict(self)

346
chatmastermind/message.py Normal file
View File

@ -0,0 +1,346 @@
"""
Module implementing message related functions and classes.
"""
import pathlib
import yaml
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final
from dataclasses import dataclass, asdict
from .tags import Tag, TagLine, TagError, match_tags
QuestionInst = TypeVar('QuestionInst', bound='Question')
AnswerInst = TypeVar('AnswerInst', bound='Answer')
MessageInst = TypeVar('MessageInst', bound='Message')
AILineInst = TypeVar('AILineInst', bound='AILine')
ModelLineInst = TypeVar('ModelLineInst', bound='ModelLine')
YamlDict = dict[str, Union[QuestionInst, AnswerInst, set[Tag]]]
class MessageError(Exception):
pass
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
"""
Changes the YAML dump style to multiline syntax for multiline strings.
"""
if len(data.splitlines()) > 1:
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
yaml.add_representer(str, str_presenter)
def source_code(text: str, include_delims: bool = False) -> list[str]:
"""
Extract all source code sections from the given text, i. e. all lines
surrounded by lines tarting with '```'. If 'include_delims' is True,
the surrounding lines are included, otherwise they are omitted. The
result list contains every source code section as a single string.
The order in the list represents the order of the sections in the text.
"""
code_sections: list[str] = []
code_lines: list[str] = []
in_code_block = False
for line in text.split('\n'):
if line.strip().startswith('```'):
if include_delims:
code_lines.append(line)
if in_code_block:
code_sections.append('\n'.join(code_lines) + '\n')
code_lines.clear()
in_code_block = not in_code_block
elif in_code_block:
code_lines.append(line)
return code_sections
class AILine(str):
"""
A line that represents the AI name in a '.txt' file..
"""
prefix: Final[str] = 'AI:'
def __new__(cls: Type[AILineInst], string: str) -> AILineInst:
if not string.startswith(cls.prefix):
raise TagError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
def ai(self) -> str:
return self[len(self.prefix):].strip()
@classmethod
def from_ai(cls: Type[AILineInst], ai: str) -> AILineInst:
return cls(' '.join([cls.prefix, ai]))
class ModelLine(str):
"""
A line that represents the model name in a '.txt' file..
"""
prefix: Final[str] = 'MODEL:'
def __new__(cls: Type[ModelLineInst], string: str) -> ModelLineInst:
if not string.startswith(cls.prefix):
raise TagError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
def model(self) -> str:
return self[len(self.prefix):].strip()
@classmethod
def from_model(cls: Type[ModelLineInst], model: str) -> ModelLineInst:
return cls(' '.join([cls.prefix, model]))
class Question(str):
"""
A single question with a defined header.
"""
txt_header: ClassVar[str] = '=== QUESTION ==='
yaml_key: ClassVar[str] = 'question'
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
"""
Make sure the question string does not contain the header.
"""
if cls.txt_header in string:
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
class Answer(str):
"""
A single answer with a defined header.
"""
txt_header: ClassVar[str] = '=== ANSWER ==='
yaml_key: ClassVar[str] = 'answer'
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
"""
Make sure the answer string does not contain the header.
"""
if cls.txt_header in string:
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
@dataclass
class Message():
"""
Single message. Consists of a question and optionally an answer, a set of tags
and a file path.
"""
question: Question
answer: Optional[Answer]
tags: Optional[set[Tag]]
ai: Optional[str]
model: Optional[str]
file_path: Optional[pathlib.Path]
# class variables
file_suffixes: ClassVar[list[str]] = ['.txt', '.yaml']
tags_yaml_key: ClassVar[str] = 'tags'
file_yaml_key: ClassVar[str] = 'file_path'
ai_yaml_key: ClassVar[str] = 'ai'
model_yaml_key: ClassVar[str] = 'model'
@classmethod
def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
"""
Create a Message from the given dict.
"""
return cls(question=data[Question.yaml_key],
answer=data.get(Answer.yaml_key, None),
tags=set(data.get(cls.tags_yaml_key, [])),
ai=data.get(cls.ai_yaml_key, None),
model=data.get(cls.model_yaml_key, None),
file_path=data.get(cls.file_yaml_key, None))
@classmethod
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
"""
Return only the tags from the given Message file.
"""
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
tags = TagLine(fd.readline()).tags()
else: # '.yaml'
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
tags = set(sorted(data[cls.tags_yaml_key]))
return tags
@classmethod
def from_file(cls: Type[MessageInst], file_path: pathlib.Path, # noqa: 11
tags_or: Optional[set[Tag]] = None,
tags_and: Optional[set[Tag]] = None,
tags_not: Optional[set[Tag]] = None) -> Optional[MessageInst]:
"""
Create a Message from the given file. Expects the following file structures:
For '.txt':
* TagLine [Optional]
* AI [Optional]
* Model [Optional]
* Question.txt_header
* Question
* Answer.txt_header [Optional]
# Answer [Optional]
For '.yaml':
* Question.yaml_key: single or multiline string
* Answer.yaml_key: single or multiline string [Optional]
* Message.tags_yaml_key: list of strings [Optional]
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
Returns 'None' if the message does not fulfill the tag requirements.
"""
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
tags: set[Tag] = set()
question: Question
answer: Optional[Answer] = None
ai: Optional[str] = None
model: Optional[str] = None
# TXT
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
# TagLine (Optional)
try:
pos = fd.tell()
tags = TagLine(fd.readline()).tags()
except TagError:
fd.seek(pos)
if tags_or or tags_and or tags_not:
# match with an empty set if the file has no tags
if not match_tags(tags, tags_or, tags_and, tags_not):
return None
# AILine (Optional)
try:
pos = fd.tell()
ai = AILine(fd.readline()).ai()
except TagError:
fd.seek(pos)
# ModelLine (Optional)
try:
pos = fd.tell()
model = ModelLine(fd.readline()).model()
except TagError:
fd.seek(pos)
# Question and Answer
text = fd.read().strip().split('\n')
question_idx = text.index(Question.txt_header) + 1
try:
answer_idx = text.index(Answer.txt_header)
question = Question.from_list(text[question_idx:answer_idx])
answer = Answer.from_list(text[answer_idx + 1:])
except ValueError:
question = Question.from_list(text[question_idx:])
return cls(question, answer, tags, ai, model, file_path)
# YAML
else:
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
if tags_or or tags_and or tags_not:
if Message.tags_yaml_key in data:
tags = set([Tag(tag) for tag in data[Message.tags_yaml_key]])
# match with an empty set if the file has no tags
if not match_tags(tags, tags_or, tags_and, tags_not):
return None
data[cls.file_yaml_key] = file_path
return cls.from_dict(data)
def to_file(self, file_path: Optional[pathlib.Path]) -> None: # noqa: 11
"""
Write a Message to the given file. Creates the following file structures:
For '.txt':
* TagLine
* AI [Optional]
* Model [Optional]
* Question.txt_header
* Question
* Answer.txt_header
* Answer
For '.yaml':
* Question.yaml_key: single or multiline string
* Answer.yaml_key: single or multiline string
* Message.tags_yaml_key: list of strings
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
"""
if file_path:
self.file_path = file_path
if not self.file_path:
raise MessageError("Got no valid path to write message")
if self.file_path.suffix not in self.file_suffixes:
raise MessageError(f"File type '{self.file_path.suffix}' is not supported")
# TXT
if self.file_path.suffix == '.txt':
with open(self.file_path, "w") as fd:
msg_tags = self.tags or set()
fd.write(f'{TagLine.from_set(msg_tags)}\n')
if self.ai:
fd.write(f'{AILine.from_ai(self.ai)}\n')
if self.model:
fd.write(f'{ModelLine.from_model(self.model)}\n')
fd.write(f'{Question.txt_header}\n{self.question}\n')
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
# YAML
elif self.file_path.suffix == '.yaml':
with open(self.file_path, "w") as fd:
data: YamlDict = {Question.yaml_key: str(self.question)}
if self.answer:
data[Answer.yaml_key] = str(self.answer)
if self.ai:
data[self.ai_yaml_key] = self.ai
if self.model:
data[self.model_yaml_key] = self.model
if self.tags:
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, fd, sort_keys=False)
def as_dict(self) -> dict[str, Any]:
return asdict(self)

View File

@ -1,7 +1,7 @@
""" """
Module implementing tag related functions and classes. Module implementing tag related functions and classes.
""" """
from typing import Type, TypeVar, Optional from typing import Type, TypeVar, Optional, Final
TagInst = TypeVar('TagInst', bound='Tag') TagInst = TypeVar('TagInst', bound='Tag')
TagLineInst = TypeVar('TagLineInst', bound='TagLine') TagLineInst = TypeVar('TagLineInst', bound='TagLine')
@ -16,9 +16,9 @@ class Tag(str):
A single tag. A string that can contain anything but the default separator (' '). A single tag. A string that can contain anything but the default separator (' ').
""" """
# default separator # default separator
default_separator = ' ' default_separator: Final[str] = ' '
# alternative separators (e. g. for backwards compatibility) # alternative separators (e. g. for backwards compatibility)
alternative_separators = [','] alternative_separators: Final[list[str]] = [',']
def __new__(cls: Type[TagInst], string: str) -> TagInst: def __new__(cls: Type[TagInst], string: str) -> TagInst:
""" """
@ -93,19 +93,21 @@ def match_tags(tags: set[Tag], tags_or: Optional[set[Tag]], tags_and: Optional[s
class TagLine(str): class TagLine(str):
""" """
A line of tags. It starts with a prefix ('TAGS:'), followed by a list of tags, A line of tags in a '.txt' file. It starts with a prefix ('TAGS:'), followed by
separated by the defaut separator (' '). Any operations on a TagLine will sort a list of tags, separated by the defaut separator (' '). Any operations on a
the tags. TagLine will sort the tags.
""" """
# the prefix # the prefix
prefix = 'TAGS:' prefix: Final[str] = 'TAGS:'
def __new__(cls: Type[TagLineInst], string: str) -> TagLineInst: def __new__(cls: Type[TagLineInst], string: str) -> TagLineInst:
""" """
Make sure the tagline string starts with the prefix. Make sure the tagline string starts with the prefix. Also replace newlines
and multiple spaces with ' ', in order to support multiline TagLines.
""" """
if not string.startswith(cls.prefix): if not string.startswith(cls.prefix):
raise TagError(f"TagLine '{string}' is missing prefix '{cls.prefix}'") raise TagError(f"TagLine '{string}' is missing prefix '{cls.prefix}'")
string = ' '.join(string.split())
instance = super().__new__(cls, string) instance = super().__new__(cls, string)
return instance return instance
@ -114,7 +116,7 @@ class TagLine(str):
""" """
Create a new TagLine from a set of tags. Create a new TagLine from a set of tags.
""" """
return cls(' '.join([TagLine.prefix] + sorted([t for t in tags]))) return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
def tags(self) -> set[Tag]: def tags(self) -> set[Tag]:
""" """

View File

@ -7,7 +7,6 @@ from chatmastermind.main import create_parser, ask_cmd
from chatmastermind.api_client import ai from chatmastermind.api_client import ai
from chatmastermind.configuration import Config from chatmastermind.configuration import Config
from chatmastermind.storage import create_chat_hist, save_answers, dump_data from chatmastermind.storage import create_chat_hist, save_answers, dump_data
from chatmastermind.tags import Tag, TagLine, TagError
from unittest import mock from unittest import mock
from unittest.mock import patch, MagicMock, Mock, ANY from unittest.mock import patch, MagicMock, Mock, ANY
@ -232,116 +231,3 @@ class TestCreateParser(CmmTestCase):
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY) mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY) mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
self.assertTrue('.config.yaml' in parser.get_default('config')) self.assertTrue('.config.yaml' in parser.get_default('config'))
class TestTag(CmmTestCase):
def test_valid_tag(self) -> None:
tag = Tag('mytag')
self.assertEqual(tag, 'mytag')
def test_invalid_tag(self) -> None:
with self.assertRaises(TagError):
Tag('tag with space')
def test_default_separator(self) -> None:
self.assertEqual(Tag.default_separator, ' ')
def test_alternative_separators(self) -> None:
self.assertEqual(Tag.alternative_separators, [','])
class TestTagLine(CmmTestCase):
def test_valid_tagline(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_invalid_tagline(self) -> None:
with self.assertRaises(TagError):
TagLine('tag1 tag2')
def test_prefix(self) -> None:
self.assertEqual(TagLine.prefix, 'TAGS:')
def test_from_set(self) -> None:
tags = {Tag('tag1'), Tag('tag2')}
tagline = TagLine.from_set(tags)
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2')
tagline2 = TagLine('TAGS: tag2 tag3')
merged_tagline = tagline1.merge({tagline2})
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
def test_delete_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag2')
def test_add_tags(self) -> None:
tagline = TagLine('TAGS: tag1')
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
def test_rename_tags(self) -> None:
tagline = TagLine('TAGS: old1 old2')
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
self.assertEqual(new_tagline, 'TAGS: new1 new2')
def test_match_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
# Test case 1: Match any tag in 'tags_or'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and: set[Tag] = set()
tags_not: set[Tag] = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 2: Match all tags in 'tags_and'
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = {Tag('tag5')}
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 5: No matching tags in 'tags_or'
tags_or = {Tag('tag4'), Tag('tag5')}
tags_and = set()
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 6: Not all tags in 'tags_and' are present
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 7: Some tags in 'tags_not' are present
tags_or = {Tag('tag1')}
tags_and = set()
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
tags_not = set()
self.assertTrue(tagline.match_tags(None, None, tags_not))
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(None, None, tags_not))

391
tests/test_message.py Normal file
View File

@ -0,0 +1,391 @@
import pathlib
import tempfile
from typing import cast
from .test_main import CmmTestCase
from chatmastermind.message import source_code, Message, MessageError, Question, Answer, AILine, ModelLine
from chatmastermind.tags import Tag, TagLine
class SourceCodeTestCase(CmmTestCase):
def test_source_code_with_include_delims(self) -> None:
text = """
Some text before the code block
```python
print("Hello, World!")
```
Some text after the code block
```python
x = 10
y = 20
print(x + y)
```
"""
expected_result = [
" ```python\n print(\"Hello, World!\")\n ```\n",
" ```python\n x = 10\n y = 20\n print(x + y)\n ```\n"
]
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
def test_source_code_without_include_delims(self) -> None:
text = """
Some text before the code block
```python
print("Hello, World!")
```
Some text after the code block
```python
x = 10
y = 20
print(x + y)
```
"""
expected_result = [
" print(\"Hello, World!\")\n",
" x = 10\n y = 20\n print(x + y)\n"
]
result = source_code(text, include_delims=False)
self.assertEqual(result, expected_result)
def test_source_code_with_single_code_block(self) -> None:
text = "```python\nprint(\"Hello, World!\")\n```"
expected_result = ["```python\nprint(\"Hello, World!\")\n```\n"]
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
def test_source_code_with_no_code_blocks(self) -> None:
text = "Some text without any code blocks"
expected_result: list[str] = []
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
class QuestionTestCase(CmmTestCase):
def test_question_with_prefix(self) -> None:
with self.assertRaises(MessageError):
Question("=== QUESTION === What is your name?")
def test_question_without_prefix(self) -> None:
question = Question("What is your favorite color?")
self.assertIsInstance(question, Question)
self.assertEqual(question, "What is your favorite color?")
class AnswerTestCase(CmmTestCase):
def test_answer_with_prefix(self) -> None:
with self.assertRaises(MessageError):
Answer("=== ANSWER === Yes")
def test_answer_without_prefix(self) -> None:
answer = Answer("No")
self.assertIsInstance(answer, Answer)
self.assertEqual(answer, "No")
class MessageToFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
self.message = Message(Question('This is a question.'),
Answer('This is an answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
def test_to_file_txt(self) -> None:
self.message.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{TagLine.prefix} tag1 tag2
{AILine.prefix} ChatGPT
{ModelLine.prefix} gpt-3.5-turbo
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
"""
self.assertEqual(content, expected_content)
def test_to_file_unsupported_file_type(self) -> None:
unsupported_file_path = pathlib.Path("example.doc")
with self.assertRaises(MessageError) as cm:
self.message.to_file(unsupported_file_path)
self.assertEqual(str(cm.exception), "File type '.doc' is not supported")
def test_to_file_no_file_path(self) -> None:
"""
Provoke an exception using an empty path.
"""
with self.assertRaises(MessageError) as cm:
# clear the internal file_path
self.message.file_path = None
self.message.to_file(None)
self.assertEqual(str(cm.exception), "Got no valid path to write message")
# reset the internal file_path
self.message.file_path = self.file_path
class MessageToFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
self.message = Message(Question('This is a question.'),
Answer('This is an answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
self.message_multiline = Message(Question('This is a\nmultiline question.'),
Answer('This is a\nmultiline answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
def test_to_file_yaml(self) -> None:
self.message.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{Question.yaml_key}: This is a question.
{Answer.yaml_key}: This is an answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
self.assertEqual(content, expected_content)
def test_to_file_yaml_multiline(self) -> None:
self.message_multiline.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{Question.yaml_key}: |-
This is a
multiline question.
{Answer.yaml_key}: |-
This is a
multiline answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
self.assertEqual(content, expected_content)
class MessageFromFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
with open(self.file_path, "w") as fd:
fd.write(f"""{TagLine.prefix} tag1 tag2
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_min = pathlib.Path(self.file_min.name)
with open(self.file_path_min, "w") as fd:
fd.write(f"""{Question.txt_header}
This is a question.
""")
def tearDown(self) -> None:
self.file.close()
self.file_min.close()
self.file_path.unlink()
self.file_path_min.unlink()
def test_from_file_txt_complete(self) -> None:
"""
Read a complete message (with all optional values).
"""
message = Message.from_file(self.file_path)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_min(self) -> None:
"""
Read a message with only required values.
"""
message = Message.from_file(self.file_path_min)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.file_path, self.file_path_min)
self.assertIsNone(message.answer)
def test_from_file_txt_tags_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag1')})
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag3')})
self.assertIsNone(message)
def test_from_file_txt_no_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min, tags_or={Tag('tag1')})
self.assertIsNone(message)
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min, tags_not={Tag('tag1')})
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.txt")
with self.assertRaises(MessageError) as cm:
Message.from_file(file_not_exists)
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
class MessageFromFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
with open(self.file_path, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
{Answer.yaml_key}: |-
This is an answer.
{Message.tags_yaml_key}:
- tag1
- tag2
""")
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path_min = pathlib.Path(self.file_min.name)
with open(self.file_path_min, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
""")
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
self.file_min.close()
self.file_path_min.unlink()
def test_from_file_yaml_complete(self) -> None:
"""
Read a complete message (with all optional values).
"""
message = Message.from_file(self.file_path)
self.assertIsInstance(message, Message)
self.assertIsNotNone(message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_yaml_min(self) -> None:
"""
Read a message with only the required values.
"""
message = Message.from_file(self.file_path_min)
self.assertIsInstance(message, Message)
self.assertIsNotNone(message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
self.assertIsNone(message.answer)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.yaml")
with self.assertRaises(MessageError) as cm:
Message.from_file(file_not_exists)
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
def test_from_file_yaml_tags_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag1')})
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_yaml_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag3')})
self.assertIsNone(message)
def test_from_file_yaml_no_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min, tags_or={Tag('tag1')})
self.assertIsNone(message)
def test_from_file_yaml_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min, tags_not={Tag('tag1')})
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
class TagsFromFileTestCase(CmmTestCase):
def setUp(self) -> None:
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_txt = pathlib.Path(self.file_txt.name)
with open(self.file_path_txt, "w") as fd:
fd.write(f"""{TagLine.prefix} tag1 tag2
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
self.file_yaml = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path_yaml = pathlib.Path(self.file_yaml.name)
with open(self.file_path_yaml, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
{Answer.yaml_key}: |-
This is an answer.
{Message.tags_yaml_key}:
- tag1
- tag2
""")
def test_tags_from_file_txt(self) -> None:
tags = Message.tags_from_file(self.file_path_txt)
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_tags_from_file_yaml(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml)
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})

124
tests/test_tags.py Normal file
View File

@ -0,0 +1,124 @@
from .test_main import CmmTestCase
from chatmastermind.tags import Tag, TagLine, TagError
class TestTag(CmmTestCase):
def test_valid_tag(self) -> None:
tag = Tag('mytag')
self.assertEqual(tag, 'mytag')
def test_invalid_tag(self) -> None:
with self.assertRaises(TagError):
Tag('tag with space')
def test_default_separator(self) -> None:
self.assertEqual(Tag.default_separator, ' ')
def test_alternative_separators(self) -> None:
self.assertEqual(Tag.alternative_separators, [','])
class TestTagLine(CmmTestCase):
def test_valid_tagline(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_valid_tagline_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_invalid_tagline(self) -> None:
with self.assertRaises(TagError):
TagLine('tag1 tag2')
def test_prefix(self) -> None:
self.assertEqual(TagLine.prefix, 'TAGS:')
def test_from_set(self) -> None:
tags = {Tag('tag1'), Tag('tag2')}
tagline = TagLine.from_set(tags)
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_tags_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2')
tagline2 = TagLine('TAGS: tag2 tag3')
merged_tagline = tagline1.merge({tagline2})
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
def test_delete_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag2')
def test_add_tags(self) -> None:
tagline = TagLine('TAGS: tag1')
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
def test_rename_tags(self) -> None:
tagline = TagLine('TAGS: old1 old2')
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
self.assertEqual(new_tagline, 'TAGS: new1 new2')
def test_match_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
# Test case 1: Match any tag in 'tags_or'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and: set[Tag] = set()
tags_not: set[Tag] = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 2: Match all tags in 'tags_and'
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = {Tag('tag5')}
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 5: No matching tags in 'tags_or'
tags_or = {Tag('tag4'), Tag('tag5')}
tags_and = set()
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 6: Not all tags in 'tags_and' are present
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 7: Some tags in 'tags_not' are present
tags_or = {Tag('tag1')}
tags_and = set()
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
tags_not = set()
self.assertTrue(tagline.match_tags(None, None, tags_not))
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(None, None, tags_not))