Compare commits

...

8 Commits

8 changed files with 1274 additions and 125 deletions

1
.gitignore vendored
View File

@ -131,3 +131,4 @@ dmypy.json
.config.yaml
db
noweb
Session.vim

143
chatmastermind/chat.py Normal file
View File

@ -0,0 +1,143 @@
"""
Module implementing various chat classes and functions for managing a chat history.
"""
import shutil
from pprint import PrettyPrinter
import pathlib
from dataclasses import dataclass, field
from typing import TypeVar, Type, Optional, ClassVar, Any
from .message import Message, MessageFilter, MessageError
ChatInst = TypeVar('ChatInst', bound='Chat')
ChatDirInst = TypeVar('ChatDirInst', bound='ChatDir')
class ChatError(Exception):
pass
def terminal_width() -> int:
return shutil.get_terminal_size().columns
def pp(*args: Any, **kwargs: Any) -> None:
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
@dataclass
class Chat:
"""
A class containing a complete chat history.
"""
messages: list[Message]
def filter(self, mfilter: MessageFilter) -> None:
"""
Use 'Message.match(mfilter) to remove all messages that
don't fulfill the filter requirements.
"""
self.messages = [m for m in self.messages if m.match(mfilter)]
def print(self, dump: bool = False) -> None:
if dump:
pp(self)
return
# for message in self.messages:
# text_too_long = len(message['content']) > terminal_width() - len(message['role']) - 2
# if source_code:
# display_source_code(message['content'])
# continue
# if message['role'] == 'user':
# print('-' * terminal_width())
# if text_too_long:
# print(f"{message['role'].upper()}:")
# print(message['content'])
# else:
# print(f"{message['role'].upper()}: {message['content']}")
@dataclass
class ChatDir(Chat):
"""
A Chat class that is bound to a given directory. Supports reading
and writing messages from / to that directory.
"""
default_file_suffix: ClassVar[str] = '.txt'
directory: pathlib.Path
# a MessageFilter that all messages must match (if given)
mfilter: Optional[MessageFilter] = None
file_suffix: str = default_file_suffix
# set containing all file names of the current messages
message_files: set[str] = field(default_factory=set)
@classmethod
def from_dir(cls: Type[ChatDirInst],
path: pathlib.Path,
glob: Optional[str] = None,
mfilter: Optional[MessageFilter] = None) -> ChatDirInst:
"""
Create a ChatDir instance from the given directory. If 'glob' is specified,
files will be filtered using 'path.glob()', otherwise it uses 'path.iterdir()'.
Messages are created using 'Message.from_file()' and the optional MessageFilter.
"""
messages: list[Message] = []
message_files: set[str] = set()
file_iter = path.glob(glob) if glob else path.iterdir()
for file_path in sorted(file_iter):
if file_path.is_file():
try:
message = Message.from_file(file_path, mfilter)
if message:
messages.append(message)
message_files.add(file_path.name)
except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}")
return cls(messages, path, mfilter, cls.default_file_suffix, message_files)
@classmethod
def from_messages(cls: Type[ChatDirInst],
path: pathlib.Path,
messages: list[Message],
mfilter: Optional[MessageFilter]) -> ChatDirInst:
"""
Create a ChatDir instance from the given message list.
Note that the next call to 'dump()' will write all files
in order to synchronize the messages. 'update()' is not
supported until after the first 'dump()'.
"""
return cls(messages, path, mfilter)
def get_next_fid(self) -> int:
next_fname = self.directory / '.next'
try:
with open(next_fname, 'r') as f:
return int(f.read()) + 1
except Exception:
return 1
def set_next_fid(self, fid: int) -> None:
next_fname = self.directory / '.next'
with open(next_fname, 'w') as f:
f.write(f'{fid}')
def dump(self, force_all: bool = False) -> None:
"""
Writes all messages to the bound directory. If a message has no file_path,
it will create a new one. By default, only messages that have not been
written (or read) before will be dumped. Use 'force_all' to force writing
all message files.
"""
# FIXME: write to 'db' subfolder or given folder
for message in self.messages:
# skip messages that we have already written (or read)
if message.file_path and message.file_path in self.message_files and not force_all:
continue
file_path = message.file_path
if not file_path:
fid = self.get_next_fid()
file_path = self.directory / f"{fid:04d}{self.file_suffix}"
self.set_next_fid(fid)
message.to_file(file_path)

View File

@ -63,4 +63,7 @@ class Config():
def to_file(self, path: str) -> None:
with open(path, 'w') as f:
yaml.dump(asdict(self), f)
yaml.dump(asdict(self), f, sort_keys=False)
def as_dict(self) -> dict[str, Any]:
return asdict(self)

413
chatmastermind/message.py Normal file
View File

@ -0,0 +1,413 @@
"""
Module implementing message related functions and classes.
"""
import pathlib
import yaml
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal
from dataclasses import dataclass, asdict
from .tags import Tag, TagLine, TagError, match_tags
QuestionInst = TypeVar('QuestionInst', bound='Question')
AnswerInst = TypeVar('AnswerInst', bound='Answer')
MessageInst = TypeVar('MessageInst', bound='Message')
AILineInst = TypeVar('AILineInst', bound='AILine')
ModelLineInst = TypeVar('ModelLineInst', bound='ModelLine')
YamlDict = dict[str, Union[QuestionInst, AnswerInst, set[Tag]]]
class MessageError(Exception):
pass
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
"""
Changes the YAML dump style to multiline syntax for multiline strings.
"""
if len(data.splitlines()) > 1:
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
yaml.add_representer(str, str_presenter)
def source_code(text: str, include_delims: bool = False) -> list[str]:
"""
Extract all source code sections from the given text, i. e. all lines
surrounded by lines tarting with '```'. If 'include_delims' is True,
the surrounding lines are included, otherwise they are omitted. The
result list contains every source code section as a single string.
The order in the list represents the order of the sections in the text.
"""
code_sections: list[str] = []
code_lines: list[str] = []
in_code_block = False
for line in text.split('\n'):
if line.strip().startswith('```'):
if include_delims:
code_lines.append(line)
if in_code_block:
code_sections.append('\n'.join(code_lines) + '\n')
code_lines.clear()
in_code_block = not in_code_block
elif in_code_block:
code_lines.append(line)
return code_sections
@dataclass(kw_only=True)
class MessageFilter:
"""
Various filters for a Message.
"""
tags_or: Optional[set[Tag]] = None
tags_and: Optional[set[Tag]] = None
tags_not: Optional[set[Tag]] = None
ai: Optional[str] = None
model: Optional[str] = None
question_contains: Optional[str] = None
answer_contains: Optional[str] = None
answer_state: Optional[Literal['available', 'missing']] = None
ai_state: Optional[Literal['available', 'missing']] = None
model_state: Optional[Literal['available', 'missing']] = None
class AILine(str):
"""
A line that represents the AI name in a '.txt' file..
"""
prefix: Final[str] = 'AI:'
def __new__(cls: Type[AILineInst], string: str) -> AILineInst:
if not string.startswith(cls.prefix):
raise TagError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
def ai(self) -> str:
return self[len(self.prefix):].strip()
@classmethod
def from_ai(cls: Type[AILineInst], ai: str) -> AILineInst:
return cls(' '.join([cls.prefix, ai]))
class ModelLine(str):
"""
A line that represents the model name in a '.txt' file..
"""
prefix: Final[str] = 'MODEL:'
def __new__(cls: Type[ModelLineInst], string: str) -> ModelLineInst:
if not string.startswith(cls.prefix):
raise TagError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
instance = super().__new__(cls, string)
return instance
def model(self) -> str:
return self[len(self.prefix):].strip()
@classmethod
def from_model(cls: Type[ModelLineInst], model: str) -> ModelLineInst:
return cls(' '.join([cls.prefix, model]))
class Question(str):
"""
A single question with a defined header.
"""
txt_header: ClassVar[str] = '=== QUESTION ==='
yaml_key: ClassVar[str] = 'question'
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
"""
Make sure the question string does not contain the header.
"""
if cls.txt_header in string:
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
class Answer(str):
"""
A single answer with a defined header.
"""
txt_header: ClassVar[str] = '=== ANSWER ==='
yaml_key: ClassVar[str] = 'answer'
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
"""
Make sure the answer string does not contain the header.
"""
if cls.txt_header in string:
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
instance = super().__new__(cls, string)
return instance
@classmethod
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
"""
Build Question from a list of strings. Make sure strings do not contain the header.
"""
if any(cls.txt_header in string for string in strings):
raise MessageError(f"Question contains the header '{cls.txt_header}'")
instance = super().__new__(cls, '\n'.join(strings).strip())
return instance
def source_code(self, include_delims: bool = False) -> list[str]:
"""
Extract and return all source code sections.
"""
return source_code(self, include_delims)
@dataclass
class Message():
"""
Single message. Consists of a question and optionally an answer, a set of tags
and a file path.
"""
question: Question
answer: Optional[Answer] = None # FIXME: support multiple answers
tags: Optional[set[Tag]] = None
ai: Optional[str] = None
model: Optional[str] = None
file_path: Optional[pathlib.Path] = None
# class variables
file_suffixes: ClassVar[list[str]] = ['.txt', '.yaml']
tags_yaml_key: ClassVar[str] = 'tags'
file_yaml_key: ClassVar[str] = 'file_path'
ai_yaml_key: ClassVar[str] = 'ai'
model_yaml_key: ClassVar[str] = 'model'
@classmethod
def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
"""
Create a Message from the given dict.
"""
return cls(question=data[Question.yaml_key],
answer=data.get(Answer.yaml_key, None),
tags=set(data.get(cls.tags_yaml_key, [])),
ai=data.get(cls.ai_yaml_key, None),
model=data.get(cls.model_yaml_key, None),
file_path=data.get(cls.file_yaml_key, None))
@classmethod
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
"""
Return only the tags from the given Message file.
"""
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
tags = TagLine(fd.readline()).tags()
else: # '.yaml'
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
tags = set(sorted(data[cls.tags_yaml_key]))
return tags
@classmethod
def from_file(cls: Type[MessageInst], file_path: pathlib.Path,
mfilter: Optional[MessageFilter] = None) -> Optional[MessageInst]:
"""
Create a Message from the given file. Returns 'None' if the message does
not fulfill the filter requirements. For TXT files, the tags are matched
before building the whole message. The other filters are applied afterwards.
"""
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
if file_path.suffix == '.txt':
message = cls.__from_file_txt(file_path,
mfilter.tags_or if mfilter else None,
mfilter.tags_and if mfilter else None,
mfilter.tags_not if mfilter else None)
else:
message = cls.__from_file_yaml(file_path)
if message and (not mfilter or (mfilter and message.match(mfilter))):
return message
else:
return None
@classmethod
def __from_file_txt(cls: Type[MessageInst], file_path: pathlib.Path, # noqa: 11
tags_or: Optional[set[Tag]] = None,
tags_and: Optional[set[Tag]] = None,
tags_not: Optional[set[Tag]] = None) -> Optional[MessageInst]:
"""
Create a Message from the given TXT file. Expects the following file structures:
For '.txt':
* TagLine [Optional]
* AI [Optional]
* Model [Optional]
* Question.txt_header
* Question
* Answer.txt_header [Optional]
* Answer [Optional]
Returns 'None' if the message does not fulfill the tag requirements.
"""
tags: set[Tag] = set()
question: Question
answer: Optional[Answer] = None
ai: Optional[str] = None
model: Optional[str] = None
with open(file_path, "r") as fd:
# TagLine (Optional)
try:
pos = fd.tell()
tags = TagLine(fd.readline()).tags()
except TagError:
fd.seek(pos)
if tags_or or tags_and or tags_not:
# match with an empty set if the file has no tags
if not match_tags(tags, tags_or, tags_and, tags_not):
return None
# AILine (Optional)
try:
pos = fd.tell()
ai = AILine(fd.readline()).ai()
except TagError:
fd.seek(pos)
# ModelLine (Optional)
try:
pos = fd.tell()
model = ModelLine(fd.readline()).model()
except TagError:
fd.seek(pos)
# Question and Answer
text = fd.read().strip().split('\n')
question_idx = text.index(Question.txt_header) + 1
try:
answer_idx = text.index(Answer.txt_header)
question = Question.from_list(text[question_idx:answer_idx])
answer = Answer.from_list(text[answer_idx + 1:])
except ValueError:
question = Question.from_list(text[question_idx:])
return cls(question, answer, tags, ai, model, file_path)
@classmethod
def __from_file_yaml(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
"""
Create a Message from the given YAML file. Expects the following file structures:
* Question.yaml_key: single or multiline string
* Answer.yaml_key: single or multiline string [Optional]
* Message.tags_yaml_key: list of strings [Optional]
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
"""
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
data[cls.file_yaml_key] = file_path
return cls.from_dict(data)
def to_file(self, file_path: Optional[pathlib.Path]=None) -> None: # noqa: 11
"""
Write a Message to the given file. Type is determined based on the suffix.
Currently supported suffixes: ['.txt', '.yaml']
"""
if file_path:
self.file_path = file_path
if not self.file_path:
raise MessageError("Got no valid path to write message")
if self.file_path.suffix not in self.file_suffixes:
raise MessageError(f"File type '{self.file_path.suffix}' is not supported")
# TXT
if self.file_path.suffix == '.txt':
return self.__to_file_txt(self.file_path)
elif self.file_path.suffix == '.yaml':
return self.__to_file_yaml(self.file_path)
def __to_file_txt(self, file_path: pathlib.Path) -> None:
"""
Write a Message to the given file in TXT format.
Creates the following file structures:
* TagLine
* AI [Optional]
* Model [Optional]
* Question.txt_header
* Question
* Answer.txt_header
* Answer
"""
with open(file_path, "w") as fd:
if self.tags:
fd.write(f'{TagLine.from_set(self.tags)}\n')
if self.ai:
fd.write(f'{AILine.from_ai(self.ai)}\n')
if self.model:
fd.write(f'{ModelLine.from_model(self.model)}\n')
fd.write(f'{Question.txt_header}\n{self.question}\n')
if self.answer:
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
"""
Write a Message to the given file in YAML format.
Creates the following file structures:
* Question.yaml_key: single or multiline string
* Answer.yaml_key: single or multiline string
* Message.tags_yaml_key: list of strings
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
"""
with open(file_path, "w") as fd:
data: YamlDict = {Question.yaml_key: str(self.question)}
if self.answer:
data[Answer.yaml_key] = str(self.answer)
if self.ai:
data[self.ai_yaml_key] = self.ai
if self.model:
data[self.model_yaml_key] = self.model
if self.tags:
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, fd, sort_keys=False)
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
"""
Matches the current Message to the given filter atttributes.
Return True if all attributes match, else False.
"""
mytags = self.tags or set()
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
or (mfilter.question_contains and mfilter.question_contains not in self.question) # noqa: W503
or (mfilter.answer_contains and (not self.answer or mfilter.answer_contains not in self.answer)) # noqa: W503
or (mfilter.answer_state == 'available' and not self.answer) # noqa: W503
or (mfilter.ai_state == 'available' and not self.ai) # noqa: W503
or (mfilter.model_state == 'available' and not self.model) # noqa: W503
or (mfilter.answer_state == 'missing' and self.answer) # noqa: W503
or (mfilter.ai_state == 'missing' and self.ai) # noqa: W503
or (mfilter.model_state == 'missing' and self.model)): # noqa: W503
return False
return True
def as_dict(self) -> dict[str, Any]:
return asdict(self)

View File

@ -1,7 +1,7 @@
"""
Module implementing tag related functions and classes.
"""
from typing import Type, TypeVar, Optional
from typing import Type, TypeVar, Optional, Final
TagInst = TypeVar('TagInst', bound='Tag')
TagLineInst = TypeVar('TagLineInst', bound='TagLine')
@ -16,9 +16,9 @@ class Tag(str):
A single tag. A string that can contain anything but the default separator (' ').
"""
# default separator
default_separator = ' '
default_separator: Final[str] = ' '
# alternative separators (e. g. for backwards compatibility)
alternative_separators = [',']
alternative_separators: Final[list[str]] = [',']
def __new__(cls: Type[TagInst], string: str) -> TagInst:
"""
@ -93,19 +93,21 @@ def match_tags(tags: set[Tag], tags_or: Optional[set[Tag]], tags_and: Optional[s
class TagLine(str):
"""
A line of tags. It starts with a prefix ('TAGS:'), followed by a list of tags,
separated by the defaut separator (' '). Any operations on a TagLine will sort
the tags.
A line of tags in a '.txt' file. It starts with a prefix ('TAGS:'), followed by
a list of tags, separated by the defaut separator (' '). Any operations on a
TagLine will sort the tags.
"""
# the prefix
prefix = 'TAGS:'
prefix: Final[str] = 'TAGS:'
def __new__(cls: Type[TagLineInst], string: str) -> TagLineInst:
"""
Make sure the tagline string starts with the prefix.
Make sure the tagline string starts with the prefix. Also replace newlines
and multiple spaces with ' ', in order to support multiline TagLines.
"""
if not string.startswith(cls.prefix):
raise TagError(f"TagLine '{string}' is missing prefix '{cls.prefix}'")
string = ' '.join(string.split())
instance = super().__new__(cls, string)
return instance
@ -114,7 +116,7 @@ class TagLine(str):
"""
Create a new TagLine from a set of tags.
"""
return cls(' '.join([TagLine.prefix] + sorted([t for t in tags])))
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
def tags(self) -> set[Tag]:
"""

View File

@ -7,7 +7,6 @@ from chatmastermind.main import create_parser, ask_cmd
from chatmastermind.api_client import ai
from chatmastermind.configuration import Config
from chatmastermind.storage import create_chat_hist, save_answers, dump_data
from chatmastermind.tags import Tag, TagLine, TagError
from unittest import mock
from unittest.mock import patch, MagicMock, Mock, ANY
@ -232,116 +231,3 @@ class TestCreateParser(CmmTestCase):
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
self.assertTrue('.config.yaml' in parser.get_default('config'))
class TestTag(CmmTestCase):
def test_valid_tag(self) -> None:
tag = Tag('mytag')
self.assertEqual(tag, 'mytag')
def test_invalid_tag(self) -> None:
with self.assertRaises(TagError):
Tag('tag with space')
def test_default_separator(self) -> None:
self.assertEqual(Tag.default_separator, ' ')
def test_alternative_separators(self) -> None:
self.assertEqual(Tag.alternative_separators, [','])
class TestTagLine(CmmTestCase):
def test_valid_tagline(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_invalid_tagline(self) -> None:
with self.assertRaises(TagError):
TagLine('tag1 tag2')
def test_prefix(self) -> None:
self.assertEqual(TagLine.prefix, 'TAGS:')
def test_from_set(self) -> None:
tags = {Tag('tag1'), Tag('tag2')}
tagline = TagLine.from_set(tags)
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2')
tagline2 = TagLine('TAGS: tag2 tag3')
merged_tagline = tagline1.merge({tagline2})
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
def test_delete_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag2')
def test_add_tags(self) -> None:
tagline = TagLine('TAGS: tag1')
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
def test_rename_tags(self) -> None:
tagline = TagLine('TAGS: old1 old2')
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
self.assertEqual(new_tagline, 'TAGS: new1 new2')
def test_match_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
# Test case 1: Match any tag in 'tags_or'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and: set[Tag] = set()
tags_not: set[Tag] = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 2: Match all tags in 'tags_and'
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = {Tag('tag5')}
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 5: No matching tags in 'tags_or'
tags_or = {Tag('tag4'), Tag('tag5')}
tags_and = set()
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 6: Not all tags in 'tags_and' are present
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 7: Some tags in 'tags_not' are present
tags_or = {Tag('tag1')}
tags_and = set()
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
tags_not = set()
self.assertTrue(tagline.match_tags(None, None, tags_not))
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(None, None, tags_not))

577
tests/test_message.py Normal file
View File

@ -0,0 +1,577 @@
import pathlib
import tempfile
from typing import cast
from .test_main import CmmTestCase
from chatmastermind.message import source_code, Message, MessageError, Question, Answer, AILine, ModelLine, MessageFilter
from chatmastermind.tags import Tag, TagLine
class SourceCodeTestCase(CmmTestCase):
def test_source_code_with_include_delims(self) -> None:
text = """
Some text before the code block
```python
print("Hello, World!")
```
Some text after the code block
```python
x = 10
y = 20
print(x + y)
```
"""
expected_result = [
" ```python\n print(\"Hello, World!\")\n ```\n",
" ```python\n x = 10\n y = 20\n print(x + y)\n ```\n"
]
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
def test_source_code_without_include_delims(self) -> None:
text = """
Some text before the code block
```python
print("Hello, World!")
```
Some text after the code block
```python
x = 10
y = 20
print(x + y)
```
"""
expected_result = [
" print(\"Hello, World!\")\n",
" x = 10\n y = 20\n print(x + y)\n"
]
result = source_code(text, include_delims=False)
self.assertEqual(result, expected_result)
def test_source_code_with_single_code_block(self) -> None:
text = "```python\nprint(\"Hello, World!\")\n```"
expected_result = ["```python\nprint(\"Hello, World!\")\n```\n"]
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
def test_source_code_with_no_code_blocks(self) -> None:
text = "Some text without any code blocks"
expected_result: list[str] = []
result = source_code(text, include_delims=True)
self.assertEqual(result, expected_result)
class QuestionTestCase(CmmTestCase):
def test_question_with_prefix(self) -> None:
with self.assertRaises(MessageError):
Question("=== QUESTION === What is your name?")
def test_question_without_prefix(self) -> None:
question = Question("What is your favorite color?")
self.assertIsInstance(question, Question)
self.assertEqual(question, "What is your favorite color?")
class AnswerTestCase(CmmTestCase):
def test_answer_with_prefix(self) -> None:
with self.assertRaises(MessageError):
Answer("=== ANSWER === Yes")
def test_answer_without_prefix(self) -> None:
answer = Answer("No")
self.assertIsInstance(answer, Answer)
self.assertEqual(answer, "No")
class MessageToFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
self.message_complete = Message(Question('This is a question.'),
Answer('This is an answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
self.message_min = Message(Question('This is a question.'),
file_path=self.file_path)
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
def test_to_file_txt_complete(self) -> None:
self.message_complete.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{TagLine.prefix} tag1 tag2
{AILine.prefix} ChatGPT
{ModelLine.prefix} gpt-3.5-turbo
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
"""
self.assertEqual(content, expected_content)
def test_to_file_txt_min(self) -> None:
self.message_min.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{Question.txt_header}
This is a question.
"""
self.assertEqual(content, expected_content)
def test_to_file_unsupported_file_type(self) -> None:
unsupported_file_path = pathlib.Path("example.doc")
with self.assertRaises(MessageError) as cm:
self.message_complete.to_file(unsupported_file_path)
self.assertEqual(str(cm.exception), "File type '.doc' is not supported")
def test_to_file_no_file_path(self) -> None:
"""
Provoke an exception using an empty path.
"""
with self.assertRaises(MessageError) as cm:
# clear the internal file_path
self.message_complete.file_path = None
self.message_complete.to_file(None)
self.assertEqual(str(cm.exception), "Got no valid path to write message")
# reset the internal file_path
self.message_complete.file_path = self.file_path
class MessageToFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
self.message_complete = Message(Question('This is a question.'),
Answer('This is an answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
self.message_multiline = Message(Question('This is a\nmultiline question.'),
Answer('This is a\nmultiline answer.'),
{Tag('tag1'), Tag('tag2')},
ai='ChatGPT',
model='gpt-3.5-turbo',
file_path=self.file_path)
self.message_min = Message(Question('This is a question.'),
file_path=self.file_path)
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
def test_to_file_yaml_complete(self) -> None:
self.message_complete.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{Question.yaml_key}: This is a question.
{Answer.yaml_key}: This is an answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
self.assertEqual(content, expected_content)
def test_to_file_yaml_multiline(self) -> None:
self.message_multiline.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"""{Question.yaml_key}: |-
This is a
multiline question.
{Answer.yaml_key}: |-
This is a
multiline answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
"""
self.assertEqual(content, expected_content)
def test_to_file_yaml_min(self) -> None:
self.message_min.to_file(self.file_path)
with open(self.file_path, "r") as fd:
content = fd.read()
expected_content = f"{Question.yaml_key}: This is a question.\n"
self.assertEqual(content, expected_content)
class MessageFromFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
with open(self.file_path, "w") as fd:
fd.write(f"""{TagLine.prefix} tag1 tag2
{AILine.prefix} ChatGPT
{ModelLine.prefix} gpt-3.5-turbo
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_min = pathlib.Path(self.file_min.name)
with open(self.file_path_min, "w") as fd:
fd.write(f"""{Question.txt_header}
This is a question.
""")
def tearDown(self) -> None:
self.file.close()
self.file_min.close()
self.file_path.unlink()
self.file_path_min.unlink()
def test_from_file_txt_complete(self) -> None:
"""
Read a complete message (with all optional values).
"""
message = Message.from_file(self.file_path)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.ai, 'ChatGPT')
self.assertEqual(message.model, 'gpt-3.5-turbo')
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_min(self) -> None:
"""
Read a message with only required values.
"""
message = Message.from_file(self.file_path_min)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.file_path, self.file_path_min)
self.assertIsNone(message.answer)
def test_from_file_txt_tags_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(tags_or={Tag('tag3')}))
self.assertIsNone(message)
def test_from_file_txt_no_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNone(message)
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_not={Tag('tag1')}))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.txt")
with self.assertRaises(MessageError) as cm:
Message.from_file(file_not_exists)
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
def test_from_file_txt_question_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(question_contains='question'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_answer_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_contains='answer'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_answer_available(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_state='available'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_answer_missing(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_state='missing'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_question_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(question_contains='answer'))
self.assertIsNone(message)
def test_from_file_txt_answer_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_contains='question'))
self.assertIsNone(message)
def test_from_file_txt_answer_not_exists(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_contains='answer'))
self.assertIsNone(message)
def test_from_file_txt_answer_not_available(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_state='available'))
self.assertIsNone(message)
def test_from_file_txt_answer_not_missing(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_state='missing'))
self.assertIsNone(message)
def test_from_file_txt_ai_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(ai='ChatGPT'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_ai_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(ai='Foo'))
self.assertIsNone(message)
def test_from_file_txt_model_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(model='gpt-3.5-turbo'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_txt_model_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(model='Bar'))
self.assertIsNone(message)
class MessageFromFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
with open(self.file_path, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
{Answer.yaml_key}: |-
This is an answer.
{Message.ai_yaml_key}: ChatGPT
{Message.model_yaml_key}: gpt-3.5-turbo
{Message.tags_yaml_key}:
- tag1
- tag2
""")
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path_min = pathlib.Path(self.file_min.name)
with open(self.file_path_min, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
""")
def tearDown(self) -> None:
self.file.close()
self.file_path.unlink()
self.file_min.close()
self.file_path_min.unlink()
def test_from_file_yaml_complete(self) -> None:
"""
Read a complete message (with all optional values).
"""
message = Message.from_file(self.file_path)
self.assertIsInstance(message, Message)
self.assertIsNotNone(message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.ai, 'ChatGPT')
self.assertEqual(message.model, 'gpt-3.5-turbo')
self.assertEqual(message.file_path, self.file_path)
def test_from_file_yaml_min(self) -> None:
"""
Read a message with only the required values.
"""
message = Message.from_file(self.file_path_min)
self.assertIsInstance(message, Message)
self.assertIsNotNone(message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
self.assertIsNone(message.answer)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.yaml")
with self.assertRaises(MessageError) as cm:
Message.from_file(file_not_exists)
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
def test_from_file_yaml_tags_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_yaml_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(tags_or={Tag('tag3')}))
self.assertIsNone(message)
def test_from_file_yaml_no_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNone(message)
def test_from_file_yaml_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_not={Tag('tag1')}))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message: # mypy bug
self.assertEqual(message.question, 'This is a question.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_min)
def test_from_file_yaml_question_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(question_contains='question'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_answer_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_contains='answer'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_answer_available(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_state='available'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_answer_missing(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_state='missing'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_question_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(question_contains='answer'))
self.assertIsNone(message)
def test_from_file_yaml_answer_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_contains='question'))
self.assertIsNone(message)
def test_from_file_yaml_answer_not_exists(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_contains='answer'))
self.assertIsNone(message)
def test_from_file_yaml_answer_not_available(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(answer_state='available'))
self.assertIsNone(message)
def test_from_file_yaml_answer_not_missing(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(answer_state='missing'))
self.assertIsNone(message)
def test_from_file_yaml_ai_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(ai='ChatGPT'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_ai_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(ai='Foo'))
self.assertIsNone(message)
def test_from_file_yaml_model_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(model='gpt-3.5-turbo'))
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
def test_from_file_yaml_model_doesnt_match(self) -> None:
message = Message.from_file(self.file_path,
MessageFilter(model='Bar'))
self.assertIsNone(message)
class TagsFromFileTestCase(CmmTestCase):
def setUp(self) -> None:
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_txt = pathlib.Path(self.file_txt.name)
with open(self.file_path_txt, "w") as fd:
fd.write(f"""{TagLine.prefix} tag1 tag2
{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
self.file_yaml = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path_yaml = pathlib.Path(self.file_yaml.name)
with open(self.file_path_yaml, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
{Answer.yaml_key}: |-
This is an answer.
{Message.tags_yaml_key}:
- tag1
- tag2
""")
def tearDown(self) -> None:
self.file_txt.close()
self.file_path_txt.unlink()
self.file_yaml.close()
self.file_path_yaml.unlink()
def test_tags_from_file_txt(self) -> None:
tags = Message.tags_from_file(self.file_path_txt)
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_tags_from_file_yaml(self) -> None:
tags = Message.tags_from_file(self.file_path_yaml)
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})

124
tests/test_tags.py Normal file
View File

@ -0,0 +1,124 @@
from .test_main import CmmTestCase
from chatmastermind.tags import Tag, TagLine, TagError
class TestTag(CmmTestCase):
def test_valid_tag(self) -> None:
tag = Tag('mytag')
self.assertEqual(tag, 'mytag')
def test_invalid_tag(self) -> None:
with self.assertRaises(TagError):
Tag('tag with space')
def test_default_separator(self) -> None:
self.assertEqual(Tag.default_separator, ' ')
def test_alternative_separators(self) -> None:
self.assertEqual(Tag.alternative_separators, [','])
class TestTagLine(CmmTestCase):
def test_valid_tagline(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_valid_tagline_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_invalid_tagline(self) -> None:
with self.assertRaises(TagError):
TagLine('tag1 tag2')
def test_prefix(self) -> None:
self.assertEqual(TagLine.prefix, 'TAGS:')
def test_from_set(self) -> None:
tags = {Tag('tag1'), Tag('tag2')}
tagline = TagLine.from_set(tags)
self.assertEqual(tagline, 'TAGS: tag1 tag2')
def test_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_tags_with_newline(self) -> None:
tagline = TagLine('TAGS: tag1\n tag2')
tags = tagline.tags()
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
def test_merge(self) -> None:
tagline1 = TagLine('TAGS: tag1 tag2')
tagline2 = TagLine('TAGS: tag2 tag3')
merged_tagline = tagline1.merge({tagline2})
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
def test_delete_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag2')
def test_add_tags(self) -> None:
tagline = TagLine('TAGS: tag1')
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
def test_rename_tags(self) -> None:
tagline = TagLine('TAGS: old1 old2')
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
self.assertEqual(new_tagline, 'TAGS: new1 new2')
def test_match_tags(self) -> None:
tagline = TagLine('TAGS: tag1 tag2 tag3')
# Test case 1: Match any tag in 'tags_or'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and: set[Tag] = set()
tags_not: set[Tag] = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 2: Match all tags in 'tags_and'
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = set()
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
tags_or = {Tag('tag1'), Tag('tag4')}
tags_and = {Tag('tag1'), Tag('tag2')}
tags_not = {Tag('tag5')}
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 5: No matching tags in 'tags_or'
tags_or = {Tag('tag4'), Tag('tag5')}
tags_and = set()
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 6: Not all tags in 'tags_and' are present
tags_or = set()
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
tags_not = set()
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 7: Some tags in 'tags_not' are present
tags_or = {Tag('tag1')}
tags_and = set()
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
tags_not = set()
self.assertTrue(tagline.match_tags(None, None, tags_not))
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
tags_not = {Tag('tag2')}
self.assertFalse(tagline.match_tags(None, None, tags_not))