Compare commits

..

7 Commits

2 changed files with 86 additions and 12 deletions

View File

@ -5,7 +5,7 @@ import pathlib
import yaml
from typing import Type, TypeVar, ClassVar, Optional, Any, Union
from dataclasses import dataclass, asdict
from .tags import Tag, TagLine
from .tags import Tag, TagLine, TagError, match_tags
QuestionInst = TypeVar('QuestionInst', bound='Question')
AnswerInst = TypeVar('AnswerInst', bound='Answer')
@ -164,7 +164,10 @@ class Message():
return tags
@classmethod
def from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
def from_file(cls: Type[MessageInst], file_path: pathlib.Path, # noqa: 11
tags_or: Optional[set[Tag]] = None,
tags_and: Optional[set[Tag]] = None,
tags_not: Optional[set[Tag]] = None) -> Optional[MessageInst]:
"""
Create a Message from the given file. Expects the following file structures:
For '.txt':
@ -176,18 +179,26 @@ class Message():
* Question.yaml_key: single or multiline string
* Answer.yaml_key: single or multiline string
* Message.tags_yaml_key: list of strings
Returns 'None' if the message does not fulfill the tag requirements.
"""
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
tags: set[Tag]
tags: set[Tag] = set()
question: Question
answer: Answer
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
tags = TagLine(fd.readline()).tags()
try:
tags = TagLine(fd.readline()).tags()
except TagError:
fd.seek(0, 0) # allow message files without tags
if tags_or or tags_and or tags_not:
# match with an empty set if the file has no tags
if not match_tags(tags, tags_or, tags_and, tags_not):
return None
text = fd.read().strip().split('\n')
question_idx = text.index(Question.txt_header) + 1
answer_idx = text.index(Answer.txt_header)
@ -197,6 +208,12 @@ class Message():
else: # '.yaml'
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
if tags_or or tags_and or tags_not:
if Message.tags_yaml_key in data:
tags = set([Tag(tag) for tag in data[Message.tags_yaml_key]])
# match with an empty set if the file has no tags
if not match_tags(tags, tags_or, tags_and, tags_not):
return None
data[cls.file_yaml_key] = file_path
return cls.from_dict(data)

View File

@ -180,19 +180,55 @@ class MessageFromFileTxtTestCase(CmmTestCase):
This is a question.
{Answer.txt_header}
This is an answer.
""")
self.file_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_no_tags = pathlib.Path(self.file_no_tags.name)
with open(self.file_path_no_tags, "w") as fd:
fd.write(f"""{Question.txt_header}
This is a question.
{Answer.txt_header}
This is an answer.
""")
def tearDown(self) -> None:
self.file.close()
self.file_no_tags.close()
self.file_path.unlink()
self.file_path_no_tags.unlink()
def test_from_file_txt(self) -> None:
message = Message.from_file(self.file_path)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
if message:
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_no_tags(self) -> None:
message = Message.from_file(self.file_path_no_tags)
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message:
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_no_tags)
def test_from_file_txt_tags_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag1')})
self.assertIsNotNone(message)
self.assertIsInstance(message, Message)
if message:
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_txt_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path, tags_or={Tag('tag3')})
self.assertIsNone(message)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.txt")
@ -215,6 +251,15 @@ class MessageFromFileYamlTestCase(CmmTestCase):
- tag1
- tag2
""")
self.file_no_tags = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path_no_tags = pathlib.Path(self.file_no_tags.name)
with open(self.file_path_no_tags, "w") as fd:
fd.write(f"""
{Question.yaml_key}: |-
This is a question.
{Answer.yaml_key}: |-
This is an answer.
""")
def tearDown(self) -> None:
self.file.close()
@ -223,10 +268,22 @@ class MessageFromFileYamlTestCase(CmmTestCase):
def test_from_file_yaml(self) -> None:
message = Message.from_file(self.file_path)
self.assertIsInstance(message, Message)
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
self.assertIsNotNone(message)
if message:
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), {Tag('tag1'), Tag('tag2')})
self.assertEqual(message.file_path, self.file_path)
def test_from_file_yaml_no_tags(self) -> None:
message = Message.from_file(self.file_path_no_tags)
self.assertIsInstance(message, Message)
self.assertIsNotNone(message)
if message:
self.assertEqual(message.question, 'This is a question.')
self.assertEqual(message.answer, 'This is an answer.')
self.assertSetEqual(cast(set[Tag], message.tags), set())
self.assertEqual(message.file_path, self.file_path_no_tags)
def test_from_file_not_exists(self) -> None:
file_not_exists = pathlib.Path("example.yaml")