Compare commits
4 Commits
7f75afcbc5
...
c143c001f9
| Author | SHA1 | Date | |
|---|---|---|---|
| c143c001f9 | |||
| 59b851650a | |||
| 6f71a2ff69 | |||
| eca44b14cb |
@ -52,9 +52,9 @@ def question_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'question' command.
|
||||
"""
|
||||
mfilter = MessageFilter(tags_or=args.or_tags,
|
||||
tags_and=args.and_tags,
|
||||
tags_not=args.exclude_tags)
|
||||
mfilter = MessageFilter(tags_or=args.or_tags if args.or_tags is not None else set(),
|
||||
tags_and=args.and_tags if args.and_tags is not None else set(),
|
||||
tags_not=args.exclude_tags if args.exclude_tags is not None else set())
|
||||
chat = ChatDB.from_dir(cache_path=Path('.'),
|
||||
db_path=Path(config.db),
|
||||
mfilter=mfilter)
|
||||
|
||||
@ -17,6 +17,18 @@ class ConfigError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
|
||||
"""
|
||||
Changes the YAML dump style to multiline syntax for multiline strings.
|
||||
"""
|
||||
if len(data.splitlines()) > 1:
|
||||
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
|
||||
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
|
||||
|
||||
|
||||
yaml.add_representer(str, str_presenter)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AIConfig:
|
||||
"""
|
||||
@ -48,13 +60,13 @@ class OpenAIConfig(AIConfig):
|
||||
# a default configuration
|
||||
ID: str = 'default'
|
||||
api_key: str = '0123456789'
|
||||
system: str = 'You are an assistant'
|
||||
model: str = 'gpt-3.5-turbo-16k'
|
||||
temperature: float = 1.0
|
||||
max_tokens: int = 4000
|
||||
top_p: float = 1.0
|
||||
frequency_penalty: float = 0.0
|
||||
presence_penalty: float = 0.0
|
||||
system: str = 'You are an assistant'
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls: Type[OpenAIConfigInst], source: dict[str, Any]) -> OpenAIConfigInst:
|
||||
@ -62,14 +74,14 @@ class OpenAIConfig(AIConfig):
|
||||
Create OpenAIConfig from a dict.
|
||||
"""
|
||||
res = cls(
|
||||
system=str(source['system']),
|
||||
api_key=str(source['api_key']),
|
||||
model=str(source['model']),
|
||||
max_tokens=int(source['max_tokens']),
|
||||
temperature=float(source['temperature']),
|
||||
top_p=float(source['top_p']),
|
||||
frequency_penalty=float(source['frequency_penalty']),
|
||||
presence_penalty=float(source['presence_penalty'])
|
||||
presence_penalty=float(source['presence_penalty']),
|
||||
system=str(source['system'])
|
||||
)
|
||||
# overwrite default ID if provided
|
||||
if 'ID' in source:
|
||||
@ -148,6 +160,8 @@ class Config:
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
res = asdict(self)
|
||||
# add the AI name manually (as first element)
|
||||
# (not done by 'asdict' because it's a class variable)
|
||||
for ID, conf in res['ais'].items():
|
||||
conf.update({'name': self.ais[ID].name})
|
||||
res['ais'][ID] = {**{'name': self.ais[ID].name}, **conf}
|
||||
return res
|
||||
|
||||
@ -3,6 +3,8 @@ Module implementing message related functions and classes.
|
||||
"""
|
||||
import pathlib
|
||||
import yaml
|
||||
import tempfile
|
||||
import shutil
|
||||
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
|
||||
@ -312,7 +314,7 @@ class Message():
|
||||
mfilter.tags_not if mfilter else None)
|
||||
else:
|
||||
message = cls.__from_file_yaml(file_path)
|
||||
if message and (not mfilter or (mfilter and message.match(mfilter))):
|
||||
if message and (mfilter is None or message.match(mfilter)):
|
||||
return message
|
||||
else:
|
||||
return None
|
||||
@ -445,16 +447,18 @@ class Message():
|
||||
* Answer.txt_header
|
||||
* Answer
|
||||
"""
|
||||
with open(file_path, "w") as fd:
|
||||
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
|
||||
temp_file_path = pathlib.Path(temp_fd.name)
|
||||
if self.tags:
|
||||
fd.write(f'{TagLine.from_set(self.tags)}\n')
|
||||
temp_fd.write(f'{TagLine.from_set(self.tags)}\n')
|
||||
if self.ai:
|
||||
fd.write(f'{AILine.from_ai(self.ai)}\n')
|
||||
temp_fd.write(f'{AILine.from_ai(self.ai)}\n')
|
||||
if self.model:
|
||||
fd.write(f'{ModelLine.from_model(self.model)}\n')
|
||||
fd.write(f'{Question.txt_header}\n{self.question}\n')
|
||||
temp_fd.write(f'{ModelLine.from_model(self.model)}\n')
|
||||
temp_fd.write(f'{Question.txt_header}\n{self.question}\n')
|
||||
if self.answer:
|
||||
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
|
||||
temp_fd.write(f'{Answer.txt_header}\n{self.answer}\n')
|
||||
shutil.move(temp_file_path, file_path)
|
||||
|
||||
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
|
||||
"""
|
||||
@ -466,7 +470,8 @@ class Message():
|
||||
* Message.ai_yaml_key: str [Optional]
|
||||
* Message.model_yaml_key: str [Optional]
|
||||
"""
|
||||
with open(file_path, "w") as fd:
|
||||
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
|
||||
temp_file_path = pathlib.Path(temp_fd.name)
|
||||
data: YamlDict = {Question.yaml_key: str(self.question)}
|
||||
if self.answer:
|
||||
data[Answer.yaml_key] = str(self.answer)
|
||||
@ -476,7 +481,8 @@ class Message():
|
||||
data[self.model_yaml_key] = self.model
|
||||
if self.tags:
|
||||
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
||||
yaml.dump(data, fd, sort_keys=False)
|
||||
yaml.dump(data, temp_fd, sort_keys=False)
|
||||
shutil.move(temp_file_path, file_path)
|
||||
|
||||
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||
"""
|
||||
@ -508,7 +514,7 @@ class Message():
|
||||
Return True if all attributes match, else False.
|
||||
"""
|
||||
mytags = self.tags or set()
|
||||
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
|
||||
if (((mfilter.tags_or is not None or mfilter.tags_and is not None or mfilter.tags_not is not None)
|
||||
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
|
||||
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
|
||||
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
|
||||
|
||||
@ -202,7 +202,25 @@ class TestChatDB(unittest.TestCase):
|
||||
self.assertEqual(chat_db.messages[1].file_path,
|
||||
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
|
||||
def test_chat_db_filter(self) -> None:
|
||||
def test_chat_db_from_dir_filter_tags(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
mfilter=MessageFilter(tags_or={Tag('tag1')}))
|
||||
self.assertEqual(len(chat_db.messages), 1)
|
||||
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||
self.assertEqual(chat_db.messages[0].file_path,
|
||||
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
|
||||
def test_chat_db_from_dir_filter_tags_empty(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
mfilter=MessageFilter(tags_or=set(),
|
||||
tags_and=set(),
|
||||
tags_not=set()))
|
||||
self.assertEqual(len(chat_db.messages), 0)
|
||||
|
||||
def test_chat_db_from_dir_filter_answer(self) -> None:
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
mfilter=MessageFilter(answer_contains='Answer 2'))
|
||||
@ -213,7 +231,7 @@ class TestChatDB(unittest.TestCase):
|
||||
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
|
||||
|
||||
def test_chat_db_from_messges(self) -> None:
|
||||
def test_chat_db_from_messages(self) -> None:
|
||||
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name),
|
||||
messages=[self.message1, self.message2,
|
||||
|
||||
@ -300,6 +300,12 @@ This is a question.
|
||||
MessageFilter(tags_or={Tag('tag1')}))
|
||||
self.assertIsNone(message)
|
||||
|
||||
def test_from_file_txt_empty_tags_dont_match(self) -> None:
|
||||
message = Message.from_file(self.file_path_min,
|
||||
MessageFilter(tags_or=set(),
|
||||
tags_and=set()))
|
||||
self.assertIsNone(message)
|
||||
|
||||
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
|
||||
message = Message.from_file(self.file_path_min,
|
||||
MessageFilter(tags_not={Tag('tag1')}))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user