Compare commits
8 Commits
6b3855ef08
...
b88a6b9a92
| Author | SHA1 | Date | |
|---|---|---|---|
| b88a6b9a92 | |||
| 6eca2003a1 | |||
| 922e66e4ee | |||
| 68a2e99c3c | |||
| 2c589281b9 | |||
| 029e54ac14 | |||
| fa5d88f7a1 | |||
| 4b54e96ab5 |
3
.gitignore
vendored
3
.gitignore
vendored
@ -130,4 +130,5 @@ dmypy.json
|
|||||||
|
|
||||||
.config.yaml
|
.config.yaml
|
||||||
db
|
db
|
||||||
noweb
|
noweb
|
||||||
|
Session.vim
|
||||||
|
|||||||
143
chatmastermind/chat.py
Normal file
143
chatmastermind/chat.py
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
"""
|
||||||
|
Module implementing various chat classes and functions for managing a chat history.
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
from pprint import PrettyPrinter
|
||||||
|
import pathlib
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TypeVar, Type, Optional, ClassVar, Any
|
||||||
|
from .message import Message, MessageFilter, MessageError
|
||||||
|
|
||||||
|
ChatInst = TypeVar('ChatInst', bound='Chat')
|
||||||
|
ChatDirInst = TypeVar('ChatDirInst', bound='ChatDir')
|
||||||
|
|
||||||
|
|
||||||
|
class ChatError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def terminal_width() -> int:
|
||||||
|
return shutil.get_terminal_size().columns
|
||||||
|
|
||||||
|
|
||||||
|
def pp(*args: Any, **kwargs: Any) -> None:
|
||||||
|
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Chat:
|
||||||
|
"""
|
||||||
|
A class containing a complete chat history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
messages: list[Message]
|
||||||
|
|
||||||
|
def filter(self, mfilter: MessageFilter) -> None:
|
||||||
|
"""
|
||||||
|
Use 'Message.match(mfilter) to remove all messages that
|
||||||
|
don't fulfill the filter requirements.
|
||||||
|
"""
|
||||||
|
self.messages = [m for m in self.messages if m.match(mfilter)]
|
||||||
|
|
||||||
|
def print(self, dump: bool = False) -> None:
|
||||||
|
if dump:
|
||||||
|
pp(self)
|
||||||
|
return
|
||||||
|
# for message in self.messages:
|
||||||
|
# text_too_long = len(message['content']) > terminal_width() - len(message['role']) - 2
|
||||||
|
# if source_code:
|
||||||
|
# display_source_code(message['content'])
|
||||||
|
# continue
|
||||||
|
# if message['role'] == 'user':
|
||||||
|
# print('-' * terminal_width())
|
||||||
|
# if text_too_long:
|
||||||
|
# print(f"{message['role'].upper()}:")
|
||||||
|
# print(message['content'])
|
||||||
|
# else:
|
||||||
|
# print(f"{message['role'].upper()}: {message['content']}")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ChatDir(Chat):
|
||||||
|
"""
|
||||||
|
A Chat class that is bound to a given directory. Supports reading
|
||||||
|
and writing messages from / to that directory.
|
||||||
|
"""
|
||||||
|
|
||||||
|
default_file_suffix: ClassVar[str] = '.txt'
|
||||||
|
|
||||||
|
directory: pathlib.Path
|
||||||
|
# a MessageFilter that all messages must match (if given)
|
||||||
|
mfilter: Optional[MessageFilter] = None
|
||||||
|
file_suffix: str = default_file_suffix
|
||||||
|
# set containing all file names of the current messages
|
||||||
|
message_files: set[str] = field(default_factory=set)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dir(cls: Type[ChatDirInst],
|
||||||
|
path: pathlib.Path,
|
||||||
|
glob: Optional[str] = None,
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> ChatDirInst:
|
||||||
|
"""
|
||||||
|
Create a ChatDir instance from the given directory. If 'glob' is specified,
|
||||||
|
files will be filtered using 'path.glob()', otherwise it uses 'path.iterdir()'.
|
||||||
|
Messages are created using 'Message.from_file()' and the optional MessageFilter.
|
||||||
|
"""
|
||||||
|
messages: list[Message] = []
|
||||||
|
message_files: set[str] = set()
|
||||||
|
file_iter = path.glob(glob) if glob else path.iterdir()
|
||||||
|
for file_path in sorted(file_iter):
|
||||||
|
if file_path.is_file():
|
||||||
|
try:
|
||||||
|
message = Message.from_file(file_path, mfilter)
|
||||||
|
if message:
|
||||||
|
messages.append(message)
|
||||||
|
message_files.add(file_path.name)
|
||||||
|
except MessageError as e:
|
||||||
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
||||||
|
return cls(messages, path, mfilter, cls.default_file_suffix, message_files)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_messages(cls: Type[ChatDirInst],
|
||||||
|
path: pathlib.Path,
|
||||||
|
messages: list[Message],
|
||||||
|
mfilter: Optional[MessageFilter]) -> ChatDirInst:
|
||||||
|
"""
|
||||||
|
Create a ChatDir instance from the given message list.
|
||||||
|
Note that the next call to 'dump()' will write all files
|
||||||
|
in order to synchronize the messages. 'update()' is not
|
||||||
|
supported until after the first 'dump()'.
|
||||||
|
"""
|
||||||
|
return cls(messages, path, mfilter)
|
||||||
|
|
||||||
|
def get_next_fid(self) -> int:
|
||||||
|
next_fname = self.directory / '.next'
|
||||||
|
try:
|
||||||
|
with open(next_fname, 'r') as f:
|
||||||
|
return int(f.read()) + 1
|
||||||
|
except Exception:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
def set_next_fid(self, fid: int) -> None:
|
||||||
|
next_fname = self.directory / '.next'
|
||||||
|
with open(next_fname, 'w') as f:
|
||||||
|
f.write(f'{fid}')
|
||||||
|
|
||||||
|
def dump(self, force_all: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Writes all messages to the bound directory. If a message has no file_path,
|
||||||
|
it will create a new one. By default, only messages that have not been
|
||||||
|
written (or read) before will be dumped. Use 'force_all' to force writing
|
||||||
|
all message files.
|
||||||
|
"""
|
||||||
|
# FIXME: write to 'db' subfolder or given folder
|
||||||
|
for message in self.messages:
|
||||||
|
# skip messages that we have already written (or read)
|
||||||
|
if message.file_path and message.file_path in self.message_files and not force_all:
|
||||||
|
continue
|
||||||
|
file_path = message.file_path
|
||||||
|
if not file_path:
|
||||||
|
fid = self.get_next_fid()
|
||||||
|
file_path = self.directory / f"{fid:04d}{self.file_suffix}"
|
||||||
|
self.set_next_fid(fid)
|
||||||
|
message.to_file(file_path)
|
||||||
@ -63,4 +63,7 @@ class Config():
|
|||||||
|
|
||||||
def to_file(self, path: str) -> None:
|
def to_file(self, path: str) -> None:
|
||||||
with open(path, 'w') as f:
|
with open(path, 'w') as f:
|
||||||
yaml.dump(asdict(self), f)
|
yaml.dump(asdict(self), f, sort_keys=False)
|
||||||
|
|
||||||
|
def as_dict(self) -> dict[str, Any]:
|
||||||
|
return asdict(self)
|
||||||
|
|||||||
413
chatmastermind/message.py
Normal file
413
chatmastermind/message.py
Normal file
@ -0,0 +1,413 @@
|
|||||||
|
"""
|
||||||
|
Module implementing message related functions and classes.
|
||||||
|
"""
|
||||||
|
import pathlib
|
||||||
|
import yaml
|
||||||
|
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from .tags import Tag, TagLine, TagError, match_tags
|
||||||
|
|
||||||
|
QuestionInst = TypeVar('QuestionInst', bound='Question')
|
||||||
|
AnswerInst = TypeVar('AnswerInst', bound='Answer')
|
||||||
|
MessageInst = TypeVar('MessageInst', bound='Message')
|
||||||
|
AILineInst = TypeVar('AILineInst', bound='AILine')
|
||||||
|
ModelLineInst = TypeVar('ModelLineInst', bound='ModelLine')
|
||||||
|
YamlDict = dict[str, Union[QuestionInst, AnswerInst, set[Tag]]]
|
||||||
|
|
||||||
|
|
||||||
|
class MessageError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
|
||||||
|
"""
|
||||||
|
Changes the YAML dump style to multiline syntax for multiline strings.
|
||||||
|
"""
|
||||||
|
if len(data.splitlines()) > 1:
|
||||||
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
|
||||||
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
|
||||||
|
|
||||||
|
|
||||||
|
yaml.add_representer(str, str_presenter)
|
||||||
|
|
||||||
|
|
||||||
|
def source_code(text: str, include_delims: bool = False) -> list[str]:
|
||||||
|
"""
|
||||||
|
Extract all source code sections from the given text, i. e. all lines
|
||||||
|
surrounded by lines tarting with '```'. If 'include_delims' is True,
|
||||||
|
the surrounding lines are included, otherwise they are omitted. The
|
||||||
|
result list contains every source code section as a single string.
|
||||||
|
The order in the list represents the order of the sections in the text.
|
||||||
|
"""
|
||||||
|
code_sections: list[str] = []
|
||||||
|
code_lines: list[str] = []
|
||||||
|
in_code_block = False
|
||||||
|
|
||||||
|
for line in text.split('\n'):
|
||||||
|
if line.strip().startswith('```'):
|
||||||
|
if include_delims:
|
||||||
|
code_lines.append(line)
|
||||||
|
if in_code_block:
|
||||||
|
code_sections.append('\n'.join(code_lines) + '\n')
|
||||||
|
code_lines.clear()
|
||||||
|
in_code_block = not in_code_block
|
||||||
|
elif in_code_block:
|
||||||
|
code_lines.append(line)
|
||||||
|
|
||||||
|
return code_sections
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(kw_only=True)
|
||||||
|
class MessageFilter:
|
||||||
|
"""
|
||||||
|
Various filters for a Message.
|
||||||
|
"""
|
||||||
|
tags_or: Optional[set[Tag]] = None
|
||||||
|
tags_and: Optional[set[Tag]] = None
|
||||||
|
tags_not: Optional[set[Tag]] = None
|
||||||
|
ai: Optional[str] = None
|
||||||
|
model: Optional[str] = None
|
||||||
|
question_contains: Optional[str] = None
|
||||||
|
answer_contains: Optional[str] = None
|
||||||
|
answer_state: Optional[Literal['available', 'missing']] = None
|
||||||
|
ai_state: Optional[Literal['available', 'missing']] = None
|
||||||
|
model_state: Optional[Literal['available', 'missing']] = None
|
||||||
|
|
||||||
|
|
||||||
|
class AILine(str):
|
||||||
|
"""
|
||||||
|
A line that represents the AI name in a '.txt' file..
|
||||||
|
"""
|
||||||
|
prefix: Final[str] = 'AI:'
|
||||||
|
|
||||||
|
def __new__(cls: Type[AILineInst], string: str) -> AILineInst:
|
||||||
|
if not string.startswith(cls.prefix):
|
||||||
|
raise TagError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
|
||||||
|
instance = super().__new__(cls, string)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def ai(self) -> str:
|
||||||
|
return self[len(self.prefix):].strip()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_ai(cls: Type[AILineInst], ai: str) -> AILineInst:
|
||||||
|
return cls(' '.join([cls.prefix, ai]))
|
||||||
|
|
||||||
|
|
||||||
|
class ModelLine(str):
|
||||||
|
"""
|
||||||
|
A line that represents the model name in a '.txt' file..
|
||||||
|
"""
|
||||||
|
prefix: Final[str] = 'MODEL:'
|
||||||
|
|
||||||
|
def __new__(cls: Type[ModelLineInst], string: str) -> ModelLineInst:
|
||||||
|
if not string.startswith(cls.prefix):
|
||||||
|
raise TagError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
|
||||||
|
instance = super().__new__(cls, string)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def model(self) -> str:
|
||||||
|
return self[len(self.prefix):].strip()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_model(cls: Type[ModelLineInst], model: str) -> ModelLineInst:
|
||||||
|
return cls(' '.join([cls.prefix, model]))
|
||||||
|
|
||||||
|
|
||||||
|
class Question(str):
|
||||||
|
"""
|
||||||
|
A single question with a defined header.
|
||||||
|
"""
|
||||||
|
txt_header: ClassVar[str] = '=== QUESTION ==='
|
||||||
|
yaml_key: ClassVar[str] = 'question'
|
||||||
|
|
||||||
|
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||||
|
"""
|
||||||
|
Make sure the question string does not contain the header.
|
||||||
|
"""
|
||||||
|
if cls.txt_header in string:
|
||||||
|
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||||
|
instance = super().__new__(cls, string)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
||||||
|
"""
|
||||||
|
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||||
|
"""
|
||||||
|
if any(cls.txt_header in string for string in strings):
|
||||||
|
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||||
|
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||||
|
"""
|
||||||
|
Extract and return all source code sections.
|
||||||
|
"""
|
||||||
|
return source_code(self, include_delims)
|
||||||
|
|
||||||
|
|
||||||
|
class Answer(str):
|
||||||
|
"""
|
||||||
|
A single answer with a defined header.
|
||||||
|
"""
|
||||||
|
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||||
|
yaml_key: ClassVar[str] = 'answer'
|
||||||
|
|
||||||
|
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||||
|
"""
|
||||||
|
Make sure the answer string does not contain the header.
|
||||||
|
"""
|
||||||
|
if cls.txt_header in string:
|
||||||
|
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||||
|
instance = super().__new__(cls, string)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||||
|
"""
|
||||||
|
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||||
|
"""
|
||||||
|
if any(cls.txt_header in string for string in strings):
|
||||||
|
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||||
|
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||||
|
"""
|
||||||
|
Extract and return all source code sections.
|
||||||
|
"""
|
||||||
|
return source_code(self, include_delims)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Message():
|
||||||
|
"""
|
||||||
|
Single message. Consists of a question and optionally an answer, a set of tags
|
||||||
|
and a file path.
|
||||||
|
"""
|
||||||
|
question: Question
|
||||||
|
answer: Optional[Answer] = None # FIXME: support multiple answers
|
||||||
|
tags: Optional[set[Tag]] = None
|
||||||
|
ai: Optional[str] = None
|
||||||
|
model: Optional[str] = None
|
||||||
|
file_path: Optional[pathlib.Path] = None
|
||||||
|
# class variables
|
||||||
|
file_suffixes: ClassVar[list[str]] = ['.txt', '.yaml']
|
||||||
|
tags_yaml_key: ClassVar[str] = 'tags'
|
||||||
|
file_yaml_key: ClassVar[str] = 'file_path'
|
||||||
|
ai_yaml_key: ClassVar[str] = 'ai'
|
||||||
|
model_yaml_key: ClassVar[str] = 'model'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
|
||||||
|
"""
|
||||||
|
Create a Message from the given dict.
|
||||||
|
"""
|
||||||
|
return cls(question=data[Question.yaml_key],
|
||||||
|
answer=data.get(Answer.yaml_key, None),
|
||||||
|
tags=set(data.get(cls.tags_yaml_key, [])),
|
||||||
|
ai=data.get(cls.ai_yaml_key, None),
|
||||||
|
model=data.get(cls.model_yaml_key, None),
|
||||||
|
file_path=data.get(cls.file_yaml_key, None))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
|
||||||
|
"""
|
||||||
|
Return only the tags from the given Message file.
|
||||||
|
"""
|
||||||
|
if not file_path.exists():
|
||||||
|
raise MessageError(f"Message file '{file_path}' does not exist")
|
||||||
|
if file_path.suffix not in cls.file_suffixes:
|
||||||
|
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
||||||
|
if file_path.suffix == '.txt':
|
||||||
|
with open(file_path, "r") as fd:
|
||||||
|
tags = TagLine(fd.readline()).tags()
|
||||||
|
else: # '.yaml'
|
||||||
|
with open(file_path, "r") as fd:
|
||||||
|
data = yaml.load(fd, Loader=yaml.FullLoader)
|
||||||
|
tags = set(sorted(data[cls.tags_yaml_key]))
|
||||||
|
return tags
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_file(cls: Type[MessageInst], file_path: pathlib.Path,
|
||||||
|
mfilter: Optional[MessageFilter] = None) -> Optional[MessageInst]:
|
||||||
|
"""
|
||||||
|
Create a Message from the given file. Returns 'None' if the message does
|
||||||
|
not fulfill the filter requirements. For TXT files, the tags are matched
|
||||||
|
before building the whole message. The other filters are applied afterwards.
|
||||||
|
"""
|
||||||
|
if not file_path.exists():
|
||||||
|
raise MessageError(f"Message file '{file_path}' does not exist")
|
||||||
|
if file_path.suffix not in cls.file_suffixes:
|
||||||
|
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
||||||
|
|
||||||
|
if file_path.suffix == '.txt':
|
||||||
|
message = cls.__from_file_txt(file_path,
|
||||||
|
mfilter.tags_or if mfilter else None,
|
||||||
|
mfilter.tags_and if mfilter else None,
|
||||||
|
mfilter.tags_not if mfilter else None)
|
||||||
|
else:
|
||||||
|
message = cls.__from_file_yaml(file_path)
|
||||||
|
if message and (not mfilter or (mfilter and message.match(mfilter))):
|
||||||
|
return message
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __from_file_txt(cls: Type[MessageInst], file_path: pathlib.Path, # noqa: 11
|
||||||
|
tags_or: Optional[set[Tag]] = None,
|
||||||
|
tags_and: Optional[set[Tag]] = None,
|
||||||
|
tags_not: Optional[set[Tag]] = None) -> Optional[MessageInst]:
|
||||||
|
"""
|
||||||
|
Create a Message from the given TXT file. Expects the following file structures:
|
||||||
|
For '.txt':
|
||||||
|
* TagLine [Optional]
|
||||||
|
* AI [Optional]
|
||||||
|
* Model [Optional]
|
||||||
|
* Question.txt_header
|
||||||
|
* Question
|
||||||
|
* Answer.txt_header [Optional]
|
||||||
|
* Answer [Optional]
|
||||||
|
|
||||||
|
Returns 'None' if the message does not fulfill the tag requirements.
|
||||||
|
"""
|
||||||
|
tags: set[Tag] = set()
|
||||||
|
question: Question
|
||||||
|
answer: Optional[Answer] = None
|
||||||
|
ai: Optional[str] = None
|
||||||
|
model: Optional[str] = None
|
||||||
|
with open(file_path, "r") as fd:
|
||||||
|
# TagLine (Optional)
|
||||||
|
try:
|
||||||
|
pos = fd.tell()
|
||||||
|
tags = TagLine(fd.readline()).tags()
|
||||||
|
except TagError:
|
||||||
|
fd.seek(pos)
|
||||||
|
if tags_or or tags_and or tags_not:
|
||||||
|
# match with an empty set if the file has no tags
|
||||||
|
if not match_tags(tags, tags_or, tags_and, tags_not):
|
||||||
|
return None
|
||||||
|
# AILine (Optional)
|
||||||
|
try:
|
||||||
|
pos = fd.tell()
|
||||||
|
ai = AILine(fd.readline()).ai()
|
||||||
|
except TagError:
|
||||||
|
fd.seek(pos)
|
||||||
|
# ModelLine (Optional)
|
||||||
|
try:
|
||||||
|
pos = fd.tell()
|
||||||
|
model = ModelLine(fd.readline()).model()
|
||||||
|
except TagError:
|
||||||
|
fd.seek(pos)
|
||||||
|
# Question and Answer
|
||||||
|
text = fd.read().strip().split('\n')
|
||||||
|
question_idx = text.index(Question.txt_header) + 1
|
||||||
|
try:
|
||||||
|
answer_idx = text.index(Answer.txt_header)
|
||||||
|
question = Question.from_list(text[question_idx:answer_idx])
|
||||||
|
answer = Answer.from_list(text[answer_idx + 1:])
|
||||||
|
except ValueError:
|
||||||
|
question = Question.from_list(text[question_idx:])
|
||||||
|
return cls(question, answer, tags, ai, model, file_path)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __from_file_yaml(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
|
||||||
|
"""
|
||||||
|
Create a Message from the given YAML file. Expects the following file structures:
|
||||||
|
* Question.yaml_key: single or multiline string
|
||||||
|
* Answer.yaml_key: single or multiline string [Optional]
|
||||||
|
* Message.tags_yaml_key: list of strings [Optional]
|
||||||
|
* Message.ai_yaml_key: str [Optional]
|
||||||
|
* Message.model_yaml_key: str [Optional]
|
||||||
|
"""
|
||||||
|
with open(file_path, "r") as fd:
|
||||||
|
data = yaml.load(fd, Loader=yaml.FullLoader)
|
||||||
|
data[cls.file_yaml_key] = file_path
|
||||||
|
return cls.from_dict(data)
|
||||||
|
|
||||||
|
def to_file(self, file_path: Optional[pathlib.Path]=None) -> None: # noqa: 11
|
||||||
|
"""
|
||||||
|
Write a Message to the given file. Type is determined based on the suffix.
|
||||||
|
Currently supported suffixes: ['.txt', '.yaml']
|
||||||
|
"""
|
||||||
|
if file_path:
|
||||||
|
self.file_path = file_path
|
||||||
|
if not self.file_path:
|
||||||
|
raise MessageError("Got no valid path to write message")
|
||||||
|
if self.file_path.suffix not in self.file_suffixes:
|
||||||
|
raise MessageError(f"File type '{self.file_path.suffix}' is not supported")
|
||||||
|
# TXT
|
||||||
|
if self.file_path.suffix == '.txt':
|
||||||
|
return self.__to_file_txt(self.file_path)
|
||||||
|
elif self.file_path.suffix == '.yaml':
|
||||||
|
return self.__to_file_yaml(self.file_path)
|
||||||
|
|
||||||
|
def __to_file_txt(self, file_path: pathlib.Path) -> None:
|
||||||
|
"""
|
||||||
|
Write a Message to the given file in TXT format.
|
||||||
|
Creates the following file structures:
|
||||||
|
* TagLine
|
||||||
|
* AI [Optional]
|
||||||
|
* Model [Optional]
|
||||||
|
* Question.txt_header
|
||||||
|
* Question
|
||||||
|
* Answer.txt_header
|
||||||
|
* Answer
|
||||||
|
"""
|
||||||
|
with open(file_path, "w") as fd:
|
||||||
|
if self.tags:
|
||||||
|
fd.write(f'{TagLine.from_set(self.tags)}\n')
|
||||||
|
if self.ai:
|
||||||
|
fd.write(f'{AILine.from_ai(self.ai)}\n')
|
||||||
|
if self.model:
|
||||||
|
fd.write(f'{ModelLine.from_model(self.model)}\n')
|
||||||
|
fd.write(f'{Question.txt_header}\n{self.question}\n')
|
||||||
|
if self.answer:
|
||||||
|
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
|
||||||
|
|
||||||
|
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
|
||||||
|
"""
|
||||||
|
Write a Message to the given file in YAML format.
|
||||||
|
Creates the following file structures:
|
||||||
|
* Question.yaml_key: single or multiline string
|
||||||
|
* Answer.yaml_key: single or multiline string
|
||||||
|
* Message.tags_yaml_key: list of strings
|
||||||
|
* Message.ai_yaml_key: str [Optional]
|
||||||
|
* Message.model_yaml_key: str [Optional]
|
||||||
|
"""
|
||||||
|
with open(file_path, "w") as fd:
|
||||||
|
data: YamlDict = {Question.yaml_key: str(self.question)}
|
||||||
|
if self.answer:
|
||||||
|
data[Answer.yaml_key] = str(self.answer)
|
||||||
|
if self.ai:
|
||||||
|
data[self.ai_yaml_key] = self.ai
|
||||||
|
if self.model:
|
||||||
|
data[self.model_yaml_key] = self.model
|
||||||
|
if self.tags:
|
||||||
|
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
||||||
|
yaml.dump(data, fd, sort_keys=False)
|
||||||
|
|
||||||
|
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
|
||||||
|
"""
|
||||||
|
Matches the current Message to the given filter atttributes.
|
||||||
|
Return True if all attributes match, else False.
|
||||||
|
"""
|
||||||
|
mytags = self.tags or set()
|
||||||
|
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
|
||||||
|
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
|
||||||
|
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
|
||||||
|
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
|
||||||
|
or (mfilter.question_contains and mfilter.question_contains not in self.question) # noqa: W503
|
||||||
|
or (mfilter.answer_contains and (not self.answer or mfilter.answer_contains not in self.answer)) # noqa: W503
|
||||||
|
or (mfilter.answer_state == 'available' and not self.answer) # noqa: W503
|
||||||
|
or (mfilter.ai_state == 'available' and not self.ai) # noqa: W503
|
||||||
|
or (mfilter.model_state == 'available' and not self.model) # noqa: W503
|
||||||
|
or (mfilter.answer_state == 'missing' and self.answer) # noqa: W503
|
||||||
|
or (mfilter.ai_state == 'missing' and self.ai) # noqa: W503
|
||||||
|
or (mfilter.model_state == 'missing' and self.model)): # noqa: W503
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def as_dict(self) -> dict[str, Any]:
|
||||||
|
return asdict(self)
|
||||||
@ -1,7 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
Module implementing tag related functions and classes.
|
Module implementing tag related functions and classes.
|
||||||
"""
|
"""
|
||||||
from typing import Type, TypeVar, Optional
|
from typing import Type, TypeVar, Optional, Final
|
||||||
|
|
||||||
TagInst = TypeVar('TagInst', bound='Tag')
|
TagInst = TypeVar('TagInst', bound='Tag')
|
||||||
TagLineInst = TypeVar('TagLineInst', bound='TagLine')
|
TagLineInst = TypeVar('TagLineInst', bound='TagLine')
|
||||||
@ -16,9 +16,9 @@ class Tag(str):
|
|||||||
A single tag. A string that can contain anything but the default separator (' ').
|
A single tag. A string that can contain anything but the default separator (' ').
|
||||||
"""
|
"""
|
||||||
# default separator
|
# default separator
|
||||||
default_separator = ' '
|
default_separator: Final[str] = ' '
|
||||||
# alternative separators (e. g. for backwards compatibility)
|
# alternative separators (e. g. for backwards compatibility)
|
||||||
alternative_separators = [',']
|
alternative_separators: Final[list[str]] = [',']
|
||||||
|
|
||||||
def __new__(cls: Type[TagInst], string: str) -> TagInst:
|
def __new__(cls: Type[TagInst], string: str) -> TagInst:
|
||||||
"""
|
"""
|
||||||
@ -93,19 +93,21 @@ def match_tags(tags: set[Tag], tags_or: Optional[set[Tag]], tags_and: Optional[s
|
|||||||
|
|
||||||
class TagLine(str):
|
class TagLine(str):
|
||||||
"""
|
"""
|
||||||
A line of tags. It starts with a prefix ('TAGS:'), followed by a list of tags,
|
A line of tags in a '.txt' file. It starts with a prefix ('TAGS:'), followed by
|
||||||
separated by the defaut separator (' '). Any operations on a TagLine will sort
|
a list of tags, separated by the defaut separator (' '). Any operations on a
|
||||||
the tags.
|
TagLine will sort the tags.
|
||||||
"""
|
"""
|
||||||
# the prefix
|
# the prefix
|
||||||
prefix = 'TAGS:'
|
prefix: Final[str] = 'TAGS:'
|
||||||
|
|
||||||
def __new__(cls: Type[TagLineInst], string: str) -> TagLineInst:
|
def __new__(cls: Type[TagLineInst], string: str) -> TagLineInst:
|
||||||
"""
|
"""
|
||||||
Make sure the tagline string starts with the prefix.
|
Make sure the tagline string starts with the prefix. Also replace newlines
|
||||||
|
and multiple spaces with ' ', in order to support multiline TagLines.
|
||||||
"""
|
"""
|
||||||
if not string.startswith(cls.prefix):
|
if not string.startswith(cls.prefix):
|
||||||
raise TagError(f"TagLine '{string}' is missing prefix '{cls.prefix}'")
|
raise TagError(f"TagLine '{string}' is missing prefix '{cls.prefix}'")
|
||||||
|
string = ' '.join(string.split())
|
||||||
instance = super().__new__(cls, string)
|
instance = super().__new__(cls, string)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@ -114,7 +116,7 @@ class TagLine(str):
|
|||||||
"""
|
"""
|
||||||
Create a new TagLine from a set of tags.
|
Create a new TagLine from a set of tags.
|
||||||
"""
|
"""
|
||||||
return cls(' '.join([TagLine.prefix] + sorted([t for t in tags])))
|
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
|
||||||
|
|
||||||
def tags(self) -> set[Tag]:
|
def tags(self) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -7,7 +7,6 @@ from chatmastermind.main import create_parser, ask_cmd
|
|||||||
from chatmastermind.api_client import ai
|
from chatmastermind.api_client import ai
|
||||||
from chatmastermind.configuration import Config
|
from chatmastermind.configuration import Config
|
||||||
from chatmastermind.storage import create_chat_hist, save_answers, dump_data
|
from chatmastermind.storage import create_chat_hist, save_answers, dump_data
|
||||||
from chatmastermind.tags import Tag, TagLine, TagError
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
from unittest.mock import patch, MagicMock, Mock, ANY
|
from unittest.mock import patch, MagicMock, Mock, ANY
|
||||||
|
|
||||||
@ -232,116 +231,3 @@ class TestCreateParser(CmmTestCase):
|
|||||||
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
|
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
|
||||||
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
|
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
|
||||||
self.assertTrue('.config.yaml' in parser.get_default('config'))
|
self.assertTrue('.config.yaml' in parser.get_default('config'))
|
||||||
|
|
||||||
|
|
||||||
class TestTag(CmmTestCase):
|
|
||||||
def test_valid_tag(self) -> None:
|
|
||||||
tag = Tag('mytag')
|
|
||||||
self.assertEqual(tag, 'mytag')
|
|
||||||
|
|
||||||
def test_invalid_tag(self) -> None:
|
|
||||||
with self.assertRaises(TagError):
|
|
||||||
Tag('tag with space')
|
|
||||||
|
|
||||||
def test_default_separator(self) -> None:
|
|
||||||
self.assertEqual(Tag.default_separator, ' ')
|
|
||||||
|
|
||||||
def test_alternative_separators(self) -> None:
|
|
||||||
self.assertEqual(Tag.alternative_separators, [','])
|
|
||||||
|
|
||||||
|
|
||||||
class TestTagLine(CmmTestCase):
|
|
||||||
def test_valid_tagline(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: tag1 tag2')
|
|
||||||
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
|
||||||
|
|
||||||
def test_invalid_tagline(self) -> None:
|
|
||||||
with self.assertRaises(TagError):
|
|
||||||
TagLine('tag1 tag2')
|
|
||||||
|
|
||||||
def test_prefix(self) -> None:
|
|
||||||
self.assertEqual(TagLine.prefix, 'TAGS:')
|
|
||||||
|
|
||||||
def test_from_set(self) -> None:
|
|
||||||
tags = {Tag('tag1'), Tag('tag2')}
|
|
||||||
tagline = TagLine.from_set(tags)
|
|
||||||
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
|
||||||
|
|
||||||
def test_tags(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: tag1 tag2')
|
|
||||||
tags = tagline.tags()
|
|
||||||
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
|
||||||
|
|
||||||
def test_merge(self) -> None:
|
|
||||||
tagline1 = TagLine('TAGS: tag1 tag2')
|
|
||||||
tagline2 = TagLine('TAGS: tag2 tag3')
|
|
||||||
merged_tagline = tagline1.merge({tagline2})
|
|
||||||
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
|
|
||||||
|
|
||||||
def test_delete_tags(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: tag1 tag2 tag3')
|
|
||||||
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
|
|
||||||
self.assertEqual(new_tagline, 'TAGS: tag2')
|
|
||||||
|
|
||||||
def test_add_tags(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: tag1')
|
|
||||||
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
|
|
||||||
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
|
|
||||||
|
|
||||||
def test_rename_tags(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: old1 old2')
|
|
||||||
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
|
|
||||||
self.assertEqual(new_tagline, 'TAGS: new1 new2')
|
|
||||||
|
|
||||||
def test_match_tags(self) -> None:
|
|
||||||
tagline = TagLine('TAGS: tag1 tag2 tag3')
|
|
||||||
|
|
||||||
# Test case 1: Match any tag in 'tags_or'
|
|
||||||
tags_or = {Tag('tag1'), Tag('tag4')}
|
|
||||||
tags_and: set[Tag] = set()
|
|
||||||
tags_not: set[Tag] = set()
|
|
||||||
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 2: Match all tags in 'tags_and'
|
|
||||||
tags_or = set()
|
|
||||||
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
|
|
||||||
tags_not = set()
|
|
||||||
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
|
|
||||||
tags_or = {Tag('tag1'), Tag('tag4')}
|
|
||||||
tags_and = {Tag('tag1'), Tag('tag2')}
|
|
||||||
tags_not = set()
|
|
||||||
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
|
|
||||||
tags_or = {Tag('tag1'), Tag('tag4')}
|
|
||||||
tags_and = {Tag('tag1'), Tag('tag2')}
|
|
||||||
tags_not = {Tag('tag5')}
|
|
||||||
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 5: No matching tags in 'tags_or'
|
|
||||||
tags_or = {Tag('tag4'), Tag('tag5')}
|
|
||||||
tags_and = set()
|
|
||||||
tags_not = set()
|
|
||||||
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 6: Not all tags in 'tags_and' are present
|
|
||||||
tags_or = set()
|
|
||||||
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
|
|
||||||
tags_not = set()
|
|
||||||
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 7: Some tags in 'tags_not' are present
|
|
||||||
tags_or = {Tag('tag1')}
|
|
||||||
tags_and = set()
|
|
||||||
tags_not = {Tag('tag2')}
|
|
||||||
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
|
||||||
|
|
||||||
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
|
|
||||||
tags_not = set()
|
|
||||||
self.assertTrue(tagline.match_tags(None, None, tags_not))
|
|
||||||
|
|
||||||
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
|
|
||||||
tags_not = {Tag('tag2')}
|
|
||||||
self.assertFalse(tagline.match_tags(None, None, tags_not))
|
|
||||||
|
|||||||
577
tests/test_message.py
Normal file
577
tests/test_message.py
Normal file
@ -0,0 +1,577 @@
|
|||||||
|
import pathlib
|
||||||
|
import tempfile
|
||||||
|
from typing import cast
|
||||||
|
from .test_main import CmmTestCase
|
||||||
|
from chatmastermind.message import source_code, Message, MessageError, Question, Answer, AILine, ModelLine, MessageFilter
|
||||||
|
from chatmastermind.tags import Tag, TagLine
|
||||||
|
|
||||||
|
|
||||||
|
class SourceCodeTestCase(CmmTestCase):
|
||||||
|
def test_source_code_with_include_delims(self) -> None:
|
||||||
|
text = """
|
||||||
|
Some text before the code block
|
||||||
|
```python
|
||||||
|
print("Hello, World!")
|
||||||
|
```
|
||||||
|
Some text after the code block
|
||||||
|
```python
|
||||||
|
x = 10
|
||||||
|
y = 20
|
||||||
|
print(x + y)
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
expected_result = [
|
||||||
|
" ```python\n print(\"Hello, World!\")\n ```\n",
|
||||||
|
" ```python\n x = 10\n y = 20\n print(x + y)\n ```\n"
|
||||||
|
]
|
||||||
|
result = source_code(text, include_delims=True)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
def test_source_code_without_include_delims(self) -> None:
|
||||||
|
text = """
|
||||||
|
Some text before the code block
|
||||||
|
```python
|
||||||
|
print("Hello, World!")
|
||||||
|
```
|
||||||
|
Some text after the code block
|
||||||
|
```python
|
||||||
|
x = 10
|
||||||
|
y = 20
|
||||||
|
print(x + y)
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
expected_result = [
|
||||||
|
" print(\"Hello, World!\")\n",
|
||||||
|
" x = 10\n y = 20\n print(x + y)\n"
|
||||||
|
]
|
||||||
|
result = source_code(text, include_delims=False)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
def test_source_code_with_single_code_block(self) -> None:
|
||||||
|
text = "```python\nprint(\"Hello, World!\")\n```"
|
||||||
|
expected_result = ["```python\nprint(\"Hello, World!\")\n```\n"]
|
||||||
|
result = source_code(text, include_delims=True)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
def test_source_code_with_no_code_blocks(self) -> None:
|
||||||
|
text = "Some text without any code blocks"
|
||||||
|
expected_result: list[str] = []
|
||||||
|
result = source_code(text, include_delims=True)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
|
||||||
|
class QuestionTestCase(CmmTestCase):
|
||||||
|
def test_question_with_prefix(self) -> None:
|
||||||
|
with self.assertRaises(MessageError):
|
||||||
|
Question("=== QUESTION === What is your name?")
|
||||||
|
|
||||||
|
def test_question_without_prefix(self) -> None:
|
||||||
|
question = Question("What is your favorite color?")
|
||||||
|
self.assertIsInstance(question, Question)
|
||||||
|
self.assertEqual(question, "What is your favorite color?")
|
||||||
|
|
||||||
|
|
||||||
|
class AnswerTestCase(CmmTestCase):
|
||||||
|
def test_answer_with_prefix(self) -> None:
|
||||||
|
with self.assertRaises(MessageError):
|
||||||
|
Answer("=== ANSWER === Yes")
|
||||||
|
|
||||||
|
def test_answer_without_prefix(self) -> None:
|
||||||
|
answer = Answer("No")
|
||||||
|
self.assertIsInstance(answer, Answer)
|
||||||
|
self.assertEqual(answer, "No")
|
||||||
|
|
||||||
|
|
||||||
|
class MessageToFileTxtTestCase(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||||
|
self.file_path = pathlib.Path(self.file.name)
|
||||||
|
self.message_complete = Message(Question('This is a question.'),
|
||||||
|
Answer('This is an answer.'),
|
||||||
|
{Tag('tag1'), Tag('tag2')},
|
||||||
|
ai='ChatGPT',
|
||||||
|
model='gpt-3.5-turbo',
|
||||||
|
file_path=self.file_path)
|
||||||
|
self.message_min = Message(Question('This is a question.'),
|
||||||
|
file_path=self.file_path)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.file.close()
|
||||||
|
self.file_path.unlink()
|
||||||
|
|
||||||
|
def test_to_file_txt_complete(self) -> None:
|
||||||
|
self.message_complete.to_file(self.file_path)
|
||||||
|
|
||||||
|
with open(self.file_path, "r") as fd:
|
||||||
|
content = fd.read()
|
||||||
|
expected_content = f"""{TagLine.prefix} tag1 tag2
|
||||||
|
{AILine.prefix} ChatGPT
|
||||||
|
{ModelLine.prefix} gpt-3.5-turbo
|
||||||
|
{Question.txt_header}
|
||||||
|
This is a question.
|
||||||
|
{Answer.txt_header}
|
||||||
|
This is an answer.
|
||||||
|
"""
|
||||||
|
self.assertEqual(content, expected_content)
|
||||||
|
|
||||||
|
def test_to_file_txt_min(self) -> None:
|
||||||
|
self.message_min.to_file(self.file_path)
|
||||||
|
|
||||||
|
with open(self.file_path, "r") as fd:
|
||||||
|
content = fd.read()
|
||||||
|
expected_content = f"""{Question.txt_header}
|
||||||
|
This is a question.
|
||||||
|
"""
|
||||||
|
self.assertEqual(content, expected_content)
|
||||||
|
|
||||||
|
def test_to_file_unsupported_file_type(self) -> None:
|
||||||
|
unsupported_file_path = pathlib.Path("example.doc")
|
||||||
|
with self.assertRaises(MessageError) as cm:
|
||||||
|
self.message_complete.to_file(unsupported_file_path)
|
||||||
|
self.assertEqual(str(cm.exception), "File type '.doc' is not supported")
|
||||||
|
|
||||||
|
def test_to_file_no_file_path(self) -> None:
|
||||||
|
"""
|
||||||
|
Provoke an exception using an empty path.
|
||||||
|
"""
|
||||||
|
with self.assertRaises(MessageError) as cm:
|
||||||
|
# clear the internal file_path
|
||||||
|
self.message_complete.file_path = None
|
||||||
|
self.message_complete.to_file(None)
|
||||||
|
self.assertEqual(str(cm.exception), "Got no valid path to write message")
|
||||||
|
# reset the internal file_path
|
||||||
|
self.message_complete.file_path = self.file_path
|
||||||
|
|
||||||
|
|
||||||
|
class MessageToFileYamlTestCase(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||||
|
self.file_path = pathlib.Path(self.file.name)
|
||||||
|
self.message_complete = Message(Question('This is a question.'),
|
||||||
|
Answer('This is an answer.'),
|
||||||
|
{Tag('tag1'), Tag('tag2')},
|
||||||
|
ai='ChatGPT',
|
||||||
|
model='gpt-3.5-turbo',
|
||||||
|
file_path=self.file_path)
|
||||||
|
self.message_multiline = Message(Question('This is a\nmultiline question.'),
|
||||||
|
Answer('This is a\nmultiline answer.'),
|
||||||
|
{Tag('tag1'), Tag('tag2')},
|
||||||
|
ai='ChatGPT',
|
||||||
|
model='gpt-3.5-turbo',
|
||||||
|
file_path=self.file_path)
|
||||||
|
self.message_min = Message(Question('This is a question.'),
|
||||||
|
file_path=self.file_path)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.file.close()
|
||||||
|
self.file_path.unlink()
|
||||||
|
|
||||||
|
def test_to_file_yaml_complete(self) -> None:
|
||||||
|
self.message_complete.to_file(self.file_path)
|
||||||
|
|
||||||
|
with open(self.file_path, "r") as fd:
|
||||||
|
content = fd.read()
|
||||||
|
expected_content = f"""{Question.yaml_key}: This is a question.
|
||||||
|
{Answer.yaml_key}: This is an answer.
|
||||||
|
{Message.ai_yaml_key}: ChatGPT
|
||||||
|
{Message.model_yaml_key}: gpt-3.5-turbo
|
||||||
|
{Message.tags_yaml_key}:
|
||||||
|
- tag1
|
||||||
|
- tag2
|
||||||
|
"""
|
||||||
|
self.assertEqual(content, expected_content)
|
||||||
|
|
||||||
|
def test_to_file_yaml_multiline(self) -> None:
|
||||||
|
self.message_multiline.to_file(self.file_path)
|
||||||
|
|
||||||
|
with open(self.file_path, "r") as fd:
|
||||||
|
content = fd.read()
|
||||||
|
expected_content = f"""{Question.yaml_key}: |-
|
||||||
|
This is a
|
||||||
|
multiline question.
|
||||||
|
{Answer.yaml_key}: |-
|
||||||
|
This is a
|
||||||
|
multiline answer.
|
||||||
|
{Message.ai_yaml_key}: ChatGPT
|
||||||
|
{Message.model_yaml_key}: gpt-3.5-turbo
|
||||||
|
{Message.tags_yaml_key}:
|
||||||
|
- tag1
|
||||||
|
- tag2
|
||||||
|
"""
|
||||||
|
self.assertEqual(content, expected_content)
|
||||||
|
|
||||||
|
def test_to_file_yaml_min(self) -> None:
|
||||||
|
self.message_min.to_file(self.file_path)
|
||||||
|
|
||||||
|
with open(self.file_path, "r") as fd:
|
||||||
|
content = fd.read()
|
||||||
|
expected_content = f"{Question.yaml_key}: This is a question.\n"
|
||||||
|
self.assertEqual(content, expected_content)
|
||||||
|
|
||||||
|
|
||||||
|
class MessageFromFileTxtTestCase(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||||
|
self.file_path = pathlib.Path(self.file.name)
|
||||||
|
with open(self.file_path, "w") as fd:
|
||||||
|
fd.write(f"""{TagLine.prefix} tag1 tag2
|
||||||
|
{AILine.prefix} ChatGPT
|
||||||
|
{ModelLine.prefix} gpt-3.5-turbo
|
||||||
|
{Question.txt_header}
|
||||||
|
This is a question.
|
||||||
|
{Answer.txt_header}
|
||||||
|
This is an answer.
|
||||||
|
""")
|
||||||
|
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||||
|
self.file_path_min = pathlib.Path(self.file_min.name)
|
||||||
|
with open(self.file_path_min, "w") as fd:
|
||||||
|
fd.write(f"""{Question.txt_header}
|
||||||
|
This is a question.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.file.close()
|
||||||
|
self.file_min.close()
|
||||||
|
self.file_path.unlink()
|
||||||
|
self.file_path_min.unlink()
|
||||||
|
|
||||||
|
def test_from_file_txt_complete(self) -> None:
|
||||||
|
"""
|
||||||
|
Read a complete message (with all optional values).
|
||||||
|
"""
|
||||||
|
message = Message.from_file(self.file_path)
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertEqual(message.answer, 'This is an answer.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
|
||||||
|
self.assertEqual(message.ai, 'ChatGPT')
|
||||||
|
self.assertEqual(message.model, 'gpt-3.5-turbo')
|
||||||
|
self.assertEqual(message.file_path, self.file_path)
|
||||||
|
|
||||||
|
def test_from_file_txt_min(self) -> None:
|
||||||
|
"""
|
||||||
|
Read a message with only required values.
|
||||||
|
"""
|
||||||
|
message = Message.from_file(self.file_path_min)
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertEqual(message.file_path, self.file_path_min)
|
||||||
|
self.assertIsNone(message.answer)
|
||||||
|
|
||||||
|
def test_from_file_txt_tags_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(tags_or={Tag('tag1')}))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertEqual(message.answer, 'This is an answer.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
|
||||||
|
self.assertEqual(message.file_path, self.file_path)
|
||||||
|
|
||||||
|
def test_from_file_txt_tags_dont_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(tags_or={Tag('tag3')}))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_no_tags_dont_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(tags_or={Tag('tag1')}))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(tags_not={Tag('tag1')}))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), set())
|
||||||
|
self.assertEqual(message.file_path, self.file_path_min)
|
||||||
|
|
||||||
|
def test_from_file_not_exists(self) -> None:
|
||||||
|
file_not_exists = pathlib.Path("example.txt")
|
||||||
|
with self.assertRaises(MessageError) as cm:
|
||||||
|
Message.from_file(file_not_exists)
|
||||||
|
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
|
||||||
|
|
||||||
|
def test_from_file_txt_question_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(question_contains='question'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_contains='answer'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_available(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_state='available'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_missing(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_state='missing'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_question_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(question_contains='answer'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_contains='question'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_not_exists(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_contains='answer'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_not_available(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_state='available'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_answer_not_missing(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_state='missing'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_ai_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(ai='ChatGPT'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_ai_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(ai='Foo'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_model_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(model='gpt-3.5-turbo'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_txt_model_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(model='Bar'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
|
||||||
|
class MessageFromFileYamlTestCase(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||||
|
self.file_path = pathlib.Path(self.file.name)
|
||||||
|
with open(self.file_path, "w") as fd:
|
||||||
|
fd.write(f"""
|
||||||
|
{Question.yaml_key}: |-
|
||||||
|
This is a question.
|
||||||
|
{Answer.yaml_key}: |-
|
||||||
|
This is an answer.
|
||||||
|
{Message.ai_yaml_key}: ChatGPT
|
||||||
|
{Message.model_yaml_key}: gpt-3.5-turbo
|
||||||
|
{Message.tags_yaml_key}:
|
||||||
|
- tag1
|
||||||
|
- tag2
|
||||||
|
""")
|
||||||
|
self.file_min = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||||
|
self.file_path_min = pathlib.Path(self.file_min.name)
|
||||||
|
with open(self.file_path_min, "w") as fd:
|
||||||
|
fd.write(f"""
|
||||||
|
{Question.yaml_key}: |-
|
||||||
|
This is a question.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.file.close()
|
||||||
|
self.file_path.unlink()
|
||||||
|
self.file_min.close()
|
||||||
|
self.file_path_min.unlink()
|
||||||
|
|
||||||
|
def test_from_file_yaml_complete(self) -> None:
|
||||||
|
"""
|
||||||
|
Read a complete message (with all optional values).
|
||||||
|
"""
|
||||||
|
message = Message.from_file(self.file_path)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertEqual(message.answer, 'This is an answer.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
|
||||||
|
self.assertEqual(message.ai, 'ChatGPT')
|
||||||
|
self.assertEqual(message.model, 'gpt-3.5-turbo')
|
||||||
|
self.assertEqual(message.file_path, self.file_path)
|
||||||
|
|
||||||
|
def test_from_file_yaml_min(self) -> None:
|
||||||
|
"""
|
||||||
|
Read a message with only the required values.
|
||||||
|
"""
|
||||||
|
message = Message.from_file(self.file_path_min)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), set())
|
||||||
|
self.assertEqual(message.file_path, self.file_path_min)
|
||||||
|
self.assertIsNone(message.answer)
|
||||||
|
|
||||||
|
def test_from_file_not_exists(self) -> None:
|
||||||
|
file_not_exists = pathlib.Path("example.yaml")
|
||||||
|
with self.assertRaises(MessageError) as cm:
|
||||||
|
Message.from_file(file_not_exists)
|
||||||
|
self.assertEqual(str(cm.exception), f"Message file '{file_not_exists}' does not exist")
|
||||||
|
|
||||||
|
def test_from_file_yaml_tags_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(tags_or={Tag('tag1')}))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertEqual(message.answer, 'This is an answer.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
|
||||||
|
self.assertEqual(message.file_path, self.file_path)
|
||||||
|
|
||||||
|
def test_from_file_yaml_tags_dont_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(tags_or={Tag('tag3')}))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_no_tags_dont_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(tags_or={Tag('tag1')}))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_no_tags_match_tags_not(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(tags_not={Tag('tag1')}))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
if message: # mypy bug
|
||||||
|
self.assertEqual(message.question, 'This is a question.')
|
||||||
|
self.assertSetEqual(cast(set[Tag], message.tags), set())
|
||||||
|
self.assertEqual(message.file_path, self.file_path_min)
|
||||||
|
|
||||||
|
def test_from_file_yaml_question_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(question_contains='question'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_contains='answer'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_available(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_state='available'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_missing(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_state='missing'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_question_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(question_contains='answer'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_contains='question'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_not_exists(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_contains='answer'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_not_available(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(answer_state='available'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_answer_not_missing(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(answer_state='missing'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_ai_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(ai='ChatGPT'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_ai_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(ai='Foo'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_model_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(model='gpt-3.5-turbo'))
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
self.assertIsInstance(message, Message)
|
||||||
|
|
||||||
|
def test_from_file_yaml_model_doesnt_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path,
|
||||||
|
MessageFilter(model='Bar'))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
|
||||||
|
class TagsFromFileTestCase(CmmTestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
|
||||||
|
self.file_path_txt = pathlib.Path(self.file_txt.name)
|
||||||
|
with open(self.file_path_txt, "w") as fd:
|
||||||
|
fd.write(f"""{TagLine.prefix} tag1 tag2
|
||||||
|
{Question.txt_header}
|
||||||
|
This is a question.
|
||||||
|
{Answer.txt_header}
|
||||||
|
This is an answer.
|
||||||
|
""")
|
||||||
|
self.file_yaml = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
|
||||||
|
self.file_path_yaml = pathlib.Path(self.file_yaml.name)
|
||||||
|
with open(self.file_path_yaml, "w") as fd:
|
||||||
|
fd.write(f"""
|
||||||
|
{Question.yaml_key}: |-
|
||||||
|
This is a question.
|
||||||
|
{Answer.yaml_key}: |-
|
||||||
|
This is an answer.
|
||||||
|
{Message.tags_yaml_key}:
|
||||||
|
- tag1
|
||||||
|
- tag2
|
||||||
|
""")
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.file_txt.close()
|
||||||
|
self.file_path_txt.unlink()
|
||||||
|
self.file_yaml.close()
|
||||||
|
self.file_path_yaml.unlink()
|
||||||
|
|
||||||
|
def test_tags_from_file_txt(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_txt)
|
||||||
|
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||||
|
|
||||||
|
def test_tags_from_file_yaml(self) -> None:
|
||||||
|
tags = Message.tags_from_file(self.file_path_yaml)
|
||||||
|
self.assertSetEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||||
124
tests/test_tags.py
Normal file
124
tests/test_tags.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
from .test_main import CmmTestCase
|
||||||
|
from chatmastermind.tags import Tag, TagLine, TagError
|
||||||
|
|
||||||
|
|
||||||
|
class TestTag(CmmTestCase):
|
||||||
|
def test_valid_tag(self) -> None:
|
||||||
|
tag = Tag('mytag')
|
||||||
|
self.assertEqual(tag, 'mytag')
|
||||||
|
|
||||||
|
def test_invalid_tag(self) -> None:
|
||||||
|
with self.assertRaises(TagError):
|
||||||
|
Tag('tag with space')
|
||||||
|
|
||||||
|
def test_default_separator(self) -> None:
|
||||||
|
self.assertEqual(Tag.default_separator, ' ')
|
||||||
|
|
||||||
|
def test_alternative_separators(self) -> None:
|
||||||
|
self.assertEqual(Tag.alternative_separators, [','])
|
||||||
|
|
||||||
|
|
||||||
|
class TestTagLine(CmmTestCase):
|
||||||
|
def test_valid_tagline(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1 tag2')
|
||||||
|
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
||||||
|
|
||||||
|
def test_valid_tagline_with_newline(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1\n tag2')
|
||||||
|
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
||||||
|
|
||||||
|
def test_invalid_tagline(self) -> None:
|
||||||
|
with self.assertRaises(TagError):
|
||||||
|
TagLine('tag1 tag2')
|
||||||
|
|
||||||
|
def test_prefix(self) -> None:
|
||||||
|
self.assertEqual(TagLine.prefix, 'TAGS:')
|
||||||
|
|
||||||
|
def test_from_set(self) -> None:
|
||||||
|
tags = {Tag('tag1'), Tag('tag2')}
|
||||||
|
tagline = TagLine.from_set(tags)
|
||||||
|
self.assertEqual(tagline, 'TAGS: tag1 tag2')
|
||||||
|
|
||||||
|
def test_tags(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1 tag2')
|
||||||
|
tags = tagline.tags()
|
||||||
|
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||||
|
|
||||||
|
def test_tags_with_newline(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1\n tag2')
|
||||||
|
tags = tagline.tags()
|
||||||
|
self.assertEqual(tags, {Tag('tag1'), Tag('tag2')})
|
||||||
|
|
||||||
|
def test_merge(self) -> None:
|
||||||
|
tagline1 = TagLine('TAGS: tag1 tag2')
|
||||||
|
tagline2 = TagLine('TAGS: tag2 tag3')
|
||||||
|
merged_tagline = tagline1.merge({tagline2})
|
||||||
|
self.assertEqual(merged_tagline, 'TAGS: tag1 tag2 tag3')
|
||||||
|
|
||||||
|
def test_delete_tags(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1 tag2 tag3')
|
||||||
|
new_tagline = tagline.delete_tags({Tag('tag1'), Tag('tag3')})
|
||||||
|
self.assertEqual(new_tagline, 'TAGS: tag2')
|
||||||
|
|
||||||
|
def test_add_tags(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1')
|
||||||
|
new_tagline = tagline.add_tags({Tag('tag2'), Tag('tag3')})
|
||||||
|
self.assertEqual(new_tagline, 'TAGS: tag1 tag2 tag3')
|
||||||
|
|
||||||
|
def test_rename_tags(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: old1 old2')
|
||||||
|
new_tagline = tagline.rename_tags({(Tag('old1'), Tag('new1')), (Tag('old2'), Tag('new2'))})
|
||||||
|
self.assertEqual(new_tagline, 'TAGS: new1 new2')
|
||||||
|
|
||||||
|
def test_match_tags(self) -> None:
|
||||||
|
tagline = TagLine('TAGS: tag1 tag2 tag3')
|
||||||
|
|
||||||
|
# Test case 1: Match any tag in 'tags_or'
|
||||||
|
tags_or = {Tag('tag1'), Tag('tag4')}
|
||||||
|
tags_and: set[Tag] = set()
|
||||||
|
tags_not: set[Tag] = set()
|
||||||
|
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 2: Match all tags in 'tags_and'
|
||||||
|
tags_or = set()
|
||||||
|
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3')}
|
||||||
|
tags_not = set()
|
||||||
|
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 3: Match any tag in 'tags_or' and match all tags in 'tags_and'
|
||||||
|
tags_or = {Tag('tag1'), Tag('tag4')}
|
||||||
|
tags_and = {Tag('tag1'), Tag('tag2')}
|
||||||
|
tags_not = set()
|
||||||
|
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 4: Match any tag in 'tags_or', match all tags in 'tags_and', and exclude tags in 'tags_not'
|
||||||
|
tags_or = {Tag('tag1'), Tag('tag4')}
|
||||||
|
tags_and = {Tag('tag1'), Tag('tag2')}
|
||||||
|
tags_not = {Tag('tag5')}
|
||||||
|
self.assertTrue(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 5: No matching tags in 'tags_or'
|
||||||
|
tags_or = {Tag('tag4'), Tag('tag5')}
|
||||||
|
tags_and = set()
|
||||||
|
tags_not = set()
|
||||||
|
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 6: Not all tags in 'tags_and' are present
|
||||||
|
tags_or = set()
|
||||||
|
tags_and = {Tag('tag1'), Tag('tag2'), Tag('tag3'), Tag('tag4')}
|
||||||
|
tags_not = set()
|
||||||
|
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 7: Some tags in 'tags_not' are present
|
||||||
|
tags_or = {Tag('tag1')}
|
||||||
|
tags_and = set()
|
||||||
|
tags_not = {Tag('tag2')}
|
||||||
|
self.assertFalse(tagline.match_tags(tags_or, tags_and, tags_not))
|
||||||
|
|
||||||
|
# Test case 8: 'tags_or' and 'tags_and' are None, match all tags
|
||||||
|
tags_not = set()
|
||||||
|
self.assertTrue(tagline.match_tags(None, None, tags_not))
|
||||||
|
|
||||||
|
# Test case 9: 'tags_or' and 'tags_and' are None, match all tags except excluded tags
|
||||||
|
tags_not = {Tag('tag2')}
|
||||||
|
self.assertFalse(tagline.match_tags(None, None, tags_not))
|
||||||
Loading…
x
Reference in New Issue
Block a user