Compare commits
4 Commits
0d6ed2ba63
...
33a321b9ff
| Author | SHA1 | Date | |
|---|---|---|---|
| 33a321b9ff | |||
| dd37d3fb12 | |||
| 1c35e85178 | |||
| 2aa397d609 |
@ -2,10 +2,9 @@
|
|||||||
Module implementing message related functions and classes.
|
Module implementing message related functions and classes.
|
||||||
"""
|
"""
|
||||||
import pathlib
|
import pathlib
|
||||||
import yaml
|
from typing import Type, TypeVar, Optional, Any, Final
|
||||||
from typing import Type, TypeVar, ClassVar, Optional, Any
|
from dataclasses import dataclass, asdict, field
|
||||||
from dataclasses import dataclass, asdict
|
from .tags import Tag
|
||||||
from .tags import Tag, TagLine
|
|
||||||
|
|
||||||
QuestionInst = TypeVar('QuestionInst', bound='Question')
|
QuestionInst = TypeVar('QuestionInst', bound='Question')
|
||||||
AnswerInst = TypeVar('AnswerInst', bound='Answer')
|
AnswerInst = TypeVar('AnswerInst', bound='Answer')
|
||||||
@ -46,7 +45,7 @@ class Question(str):
|
|||||||
"""
|
"""
|
||||||
A single question with a defined header.
|
A single question with a defined header.
|
||||||
"""
|
"""
|
||||||
header: ClassVar[str] = '=== QUESTION ==='
|
header: Final[str] = '=== QUESTION ==='
|
||||||
|
|
||||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||||
"""
|
"""
|
||||||
@ -57,16 +56,6 @@ class Question(str):
|
|||||||
instance = super().__new__(cls, string)
|
instance = super().__new__(cls, string)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
|
|
||||||
"""
|
|
||||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
|
||||||
"""
|
|
||||||
if any(cls.header in string for string in strings):
|
|
||||||
raise MessageError(f"Question contains the header '{cls.header}'")
|
|
||||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
|
||||||
return instance
|
|
||||||
|
|
||||||
def source_code(self, include_delims: bool = False) -> list[str]:
|
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Extract and return all source code sections.
|
Extract and return all source code sections.
|
||||||
@ -78,7 +67,7 @@ class Answer(str):
|
|||||||
"""
|
"""
|
||||||
A single answer with a defined header.
|
A single answer with a defined header.
|
||||||
"""
|
"""
|
||||||
header: ClassVar[str] = '=== ANSWER ==='
|
header: Final[str] = '=== ANSWER ==='
|
||||||
|
|
||||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||||
"""
|
"""
|
||||||
@ -89,16 +78,6 @@ class Answer(str):
|
|||||||
instance = super().__new__(cls, string)
|
instance = super().__new__(cls, string)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
|
||||||
"""
|
|
||||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
|
||||||
"""
|
|
||||||
if any(cls.header in string for string in strings):
|
|
||||||
raise MessageError(f"Question contains the header '{cls.header}'")
|
|
||||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
|
||||||
return instance
|
|
||||||
|
|
||||||
def source_code(self, include_delims: bool = False) -> list[str]:
|
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Extract and return all source code sections.
|
Extract and return all source code sections.
|
||||||
@ -116,57 +95,24 @@ class Message():
|
|||||||
answer: Optional[Answer]
|
answer: Optional[Answer]
|
||||||
tags: Optional[set[Tag]]
|
tags: Optional[set[Tag]]
|
||||||
file_path: Optional[pathlib.Path]
|
file_path: Optional[pathlib.Path]
|
||||||
file_suffixes: ClassVar[list[str]] = ['.txt', '.yaml']
|
file_suffixes: Final[list[str]] = field(default_factory=lambda: ['.txt', '.yaml'])
|
||||||
|
|
||||||
@classmethod
|
# @classmethod
|
||||||
def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
|
# def from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
|
||||||
"""
|
# """
|
||||||
Create a Message fromt he given dict.
|
# Create a Message from the given file. Expects the following file structure:
|
||||||
"""
|
# * TagLine (from 'self.tags')
|
||||||
return cls(question=data['question'],
|
# * Question.Header
|
||||||
answer=data.get('answer', None),
|
# * Question
|
||||||
tags=set(data.get('tags', [])),
|
# * Answer.Header
|
||||||
file_path=data.get('file_path', None))
|
# """
|
||||||
|
# if file_path:
|
||||||
@classmethod
|
# self.file_path = file_path
|
||||||
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
|
# if not self.file_path:
|
||||||
"""
|
# raise MessageError("Got no valid path to read message")
|
||||||
Return only the tags from the given Message file.
|
# if self.file_path.suffix not in self.file_suffixes:
|
||||||
"""
|
# raise MessageError(f"File type '{self.file_path.suffix}' is not supported")
|
||||||
return set() # FIXME
|
# pass
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
|
|
||||||
"""
|
|
||||||
Create a Message from the given file. Expects the following file structure:
|
|
||||||
* TagLine (from 'self.tags')
|
|
||||||
* Question.Header
|
|
||||||
* Question
|
|
||||||
* Answer.Header
|
|
||||||
"""
|
|
||||||
if not file_path.exists():
|
|
||||||
raise MessageError(f"Message file '{file_path}' does not exist")
|
|
||||||
if file_path.suffix not in cls.file_suffixes:
|
|
||||||
raise MessageError(f"File type '{file_path.suffix}' is not supported")
|
|
||||||
|
|
||||||
tags: set[Tag]
|
|
||||||
question: Question
|
|
||||||
answer: Answer
|
|
||||||
if file_path.suffix == '.txt':
|
|
||||||
with open(file_path, "r") as fd:
|
|
||||||
tags = TagLine(fd.readline()).tags()
|
|
||||||
text = fd.read().strip().split('\n')
|
|
||||||
question_idx = text.index(Question.header) + 1
|
|
||||||
answer_idx = text.index(Answer.header)
|
|
||||||
question = Question.from_list(text[question_idx:answer_idx])
|
|
||||||
answer = Answer.from_list(text[answer_idx + 1:])
|
|
||||||
return cls(question, answer, tags, file_path)
|
|
||||||
else: # '.yaml'
|
|
||||||
with open(file_path, "r") as fd:
|
|
||||||
# FIXME: use the actual YAML format
|
|
||||||
data = yaml.load(fd, Loader=yaml.FullLoader)
|
|
||||||
data['file_path'] = file_path
|
|
||||||
return cls.from_dict(data)
|
|
||||||
|
|
||||||
def to_file(self, file_path: Optional[pathlib.Path]) -> None:
|
def to_file(self, file_path: Optional[pathlib.Path]) -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user