added new module 'message.py'

This commit is contained in:
juk0de 2023-08-18 16:07:50 +02:00
parent 604e5ccf73
commit 99885ba1e1

233
chatmastermind/message.py Normal file
View File

@@ -0,0 +1,233 @@
"""
Module implementing message related functions and classes.
"""
import pathlib
import yaml
from typing import Type, TypeVar, ClassVar, Optional, Any, Union
from dataclasses import dataclass, asdict
from .tags import Tag, TagLine
# Type variables bound to the classes defined further down in this module.
# They are used to annotate the `cls` parameter and the return value of the
# alternate constructors (`__new__`, `from_list`, `from_dict`, `from_file`).
QuestionInst = TypeVar('QuestionInst', bound='Question')
AnswerInst = TypeVar('AnswerInst', bound='Answer')
MessageInst = TypeVar('MessageInst', bound='Message')
# Annotation used for the dict that `Message.to_file` hands to `yaml.dump`.
# NOTE(review): `to_file` fills that dict with plain `str` values and a sorted
# list of `str` for the tags, which this alias does not describe -- confirm
# whether the alias should be adjusted.
YamlDict = dict[str, Union[QuestionInst, AnswerInst, set[Tag]]]
class MessageError(Exception):
    """
    Error raised by this module for invalid message content or for
    message files that cannot be read or written.
    """
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
    """
    YAML representer for strings: strings spanning more than one line are
    emitted in literal block style ('|'), all others in the default style.
    """
    str_tag = 'tag:yaml.org,2002:str'
    line_count = len(data.splitlines())
    if line_count <= 1:
        # Single-line (or empty) string: leave the style up to the dumper.
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style='|')
# Register `str_presenter` as the PyYAML representer for `str` values, so that
# multiline strings are dumped in block style (e. g. by `Message.to_file`).
yaml.add_representer(str, str_presenter)
def source_code(text: str, include_delims: bool = False) -> list[str]:
    """
    Collect the fenced source code sections of the given text, i. e. the
    lines enclosed by a pair of lines starting with '```' (leading
    whitespace is ignored when looking for the fence).

    Every section is returned as one newline-terminated string, in the
    order in which the sections appear in the text. The fence lines
    themselves are only part of a section if 'include_delims' is True.
    A section that is opened but never closed is not returned.
    """
    sections: list[str] = []
    current: list[str] = []
    inside = False
    for raw_line in text.split('\n'):
        if not raw_line.strip().startswith('```'):
            # Ordinary line: only relevant while a section is open.
            if inside:
                current.append(raw_line)
            continue
        # Fence line: it either opens or closes a section.
        if include_delims:
            current.append(raw_line)
        if inside:
            sections.append('\n'.join(current) + '\n')
            current = []
        inside = not inside
    return sections
class Question(str):
    """
    Text of a single question. Behaves like a plain string, but can never
    contain the header line that marks the question section.
    """
    header: ClassVar[str] = '=== QUESTION ==='

    def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
        """
        Create the question from 'string'. Raises a MessageError if the
        string contains the header.
        """
        if cls.header not in string:
            return super().__new__(cls, string)
        raise MessageError(f"Question '{string}' contains the header '{cls.header}'")

    @classmethod
    def from_list(cls: Type[QuestionInst], strings: list[str]) -> QuestionInst:
        """
        Create the question from a list of lines. The lines are joined with
        newlines and surrounding whitespace is stripped. Raises a MessageError
        if any of the lines contains the header.
        """
        for part in strings:
            if cls.header in part:
                raise MessageError(f"Question contains the header '{cls.header}'")
        joined = '\n'.join(strings)
        return super().__new__(cls, joined.strip())

    def source_code(self, include_delims: bool = False) -> list[str]:
        """
        Return the source code sections found in the question text
        (delegates to the module level function of the same name).
        """
        return source_code(self, include_delims=include_delims)
class Answer(str):
    """
    A single answer with a defined header.
    """
    header: ClassVar[str] = '=== ANSWER ==='

    def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
        """
        Create the answer from 'string'. Make sure the answer string does
        not contain the header, otherwise a MessageError is raised.
        """
        if cls.header in string:
            raise MessageError(f"Answer '{string}' contains the header '{cls.header}'")
        instance = super().__new__(cls, string)
        return instance

    @classmethod
    def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
        """
        Build Answer from a list of strings. The strings are joined with
        newlines and surrounding whitespace is stripped. Raises a MessageError
        if any of the strings contains the header.
        """
        if any(cls.header in string for string in strings):
            # The message used to say "Question ..." (copied from the
            # Question class), which pointed at the wrong part of a message.
            raise MessageError(f"Answer contains the header '{cls.header}'")
        # The header check has been done above, so the string is created
        # directly without going through __new__ of this class again.
        instance = super().__new__(cls, '\n'.join(strings).strip())
        return instance

    def source_code(self, include_delims: bool = False) -> list[str]:
        """
        Extract and return all source code sections of the answer
        (delegates to the module level function of the same name).
        """
        return source_code(self, include_delims)
@dataclass
class Message():
    """
    Single message. Consists of a question and optionally an answer, a set of tags
    and a file path.
    """
    question: Question
    # The remaining fields are optional, so they default to None.
    answer: Optional[Answer] = None
    tags: Optional[set[Tag]] = None
    file_path: Optional[pathlib.Path] = None
    # File types (by suffix) that can be read and written.
    file_suffixes: ClassVar[list[str]] = ['.txt', '.yaml']

    @classmethod
    def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
        """
        Create a Message from the given dict. Uses the keys 'question'
        (mandatory), 'answer', 'tags' and 'file_path' (all optional).

        Raises a KeyError if 'question' is missing and a MessageError if the
        question or the answer contains its header.
        """
        answer = data.get('answer', None)
        # Wrap the raw values so that the fields really hold Question / Answer
        # instances (as annotated), e. g. for messages loaded from YAML.
        return cls(question=Question(data['question']),
                   answer=None if answer is None else Answer(answer),
                   # 'tags' may be present but None (e. g. in the result of as_dict())
                   tags=set(data.get('tags') or []),
                   file_path=data.get('file_path', None))

    @classmethod
    def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
        """
        Return only the tags from the given Message file.

        Raises a MessageError if the file does not exist or if its suffix
        is not supported.
        """
        if not file_path.exists():
            raise MessageError(f"Message file '{file_path}' does not exist")
        if file_path.suffix not in cls.file_suffixes:
            raise MessageError(f"File type '{file_path.suffix}' is not supported")
        if file_path.suffix == '.txt':
            # The tags are stored in the first line of the file
            with open(file_path, "r") as fd:
                return TagLine(fd.readline()).tags()
        # '.yaml'
        with open(file_path, "r") as fd:
            # NOTE(review): FullLoader is not restricted to plain data types;
            # consider yaml.safe_load if message files can come from untrusted sources.
            data = yaml.load(fd, Loader=yaml.FullLoader)
        if not isinstance(data, dict):
            # Empty file or no mapping at top level: there are no tags to return
            return set()
        # NOTE(review): the tags are returned as loaded from YAML (same as
        # in from_dict) -- confirm whether they need to be converted to Tag.
        return set(data.get('tags') or [])

    @classmethod
    def from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> MessageInst:
        """
        Create a Message from the given file. Expects the following file structures:
        For '.txt':
          * TagLine
          * Question.Header
          * Question
          * Answer.Header
          * Answer
        For '.yaml':
          * question: single or multiline string
          * answer: single or multiline string
          * tags: list of strings

        Raises a MessageError if the file does not exist, if its suffix is not
        supported or if the question / answer contains its header. A '.txt' file
        that lacks one of the header lines results in a ValueError.
        """
        if not file_path.exists():
            raise MessageError(f"Message file '{file_path}' does not exist")
        if file_path.suffix not in cls.file_suffixes:
            raise MessageError(f"File type '{file_path.suffix}' is not supported")
        if file_path.suffix == '.txt':
            with open(file_path, "r") as fd:
                # First line contains the tags, the rest is question and answer
                tags = TagLine(fd.readline()).tags()
                text = fd.read().strip().split('\n')
            question_idx = text.index(Question.header) + 1
            answer_idx = text.index(Answer.header)
            question = Question.from_list(text[question_idx:answer_idx])
            answer = Answer.from_list(text[answer_idx + 1:])
            return cls(question, answer, tags, file_path)
        # '.yaml'
        with open(file_path, "r") as fd:
            # NOTE(review): FullLoader is not restricted to plain data types;
            # consider yaml.safe_load if message files can come from untrusted sources.
            data = yaml.load(fd, Loader=yaml.FullLoader)
        data['file_path'] = file_path
        return cls.from_dict(data)

    def to_file(self, file_path: Optional[pathlib.Path] = None) -> None:
        """
        Write Message to the given file. If no file path is given, the one stored
        in the message is used, otherwise the given one is stored in the message.
        Creates the following file structures:
        For '.txt':
          * TagLine
          * Question.Header
          * Question
          * Answer.Header
          * Answer
        For '.yaml':
          * question: single or multiline string
          * answer: single or multiline string
          * tags: list of strings

        Raises a MessageError if there is no file path at all or if the file
        suffix is not supported.
        """
        if file_path:
            self.file_path = file_path
        if not self.file_path:
            raise MessageError("Got no valid path to write message")
        if self.file_path.suffix not in self.file_suffixes:
            raise MessageError(f"File type '{self.file_path.suffix}' is not supported")
        if self.file_path.suffix == '.txt':
            msg_tags = self.tags or set()
            # Build the tag line before opening the file, so that a failure
            # does not leave a truncated file behind.
            tag_line = TagLine.from_set(msg_tags)
            # A missing answer is written as an empty answer section
            # (and not as the literal text 'None').
            answer_text = '' if self.answer is None else self.answer
            with open(self.file_path, "w") as fd:
                fd.write(f'{tag_line}\n')
                fd.write(f'{Question.header}\n{self.question}\n')
                fd.write(f'{Answer.header}\n{answer_text}\n')
        elif self.file_path.suffix == '.yaml':
            # Only plain strings are dumped; empty answer / tags are omitted.
            data: YamlDict = {'question': str(self.question)}
            if self.answer:
                data['answer'] = str(self.answer)
            if self.tags:
                data['tags'] = sorted([str(tag) for tag in self.tags])
            with open(self.file_path, "w") as fd:
                yaml.dump(data, fd)

    def as_dict(self) -> dict[str, Any]:
        """
        Return the fields of the message as a dict (see dataclasses.asdict).
        """
        return asdict(self)