Compare commits
4 Commits
7f75afcbc5
...
c143c001f9
| Author | SHA1 | Date | |
|---|---|---|---|
| c143c001f9 | |||
| 59b851650a | |||
| 6f71a2ff69 | |||
| eca44b14cb |
@ -52,9 +52,9 @@ def question_cmd(args: argparse.Namespace, config: Config) -> None:
|
|||||||
"""
|
"""
|
||||||
Handler for the 'question' command.
|
Handler for the 'question' command.
|
||||||
"""
|
"""
|
||||||
mfilter = MessageFilter(tags_or=args.or_tags,
|
mfilter = MessageFilter(tags_or=args.or_tags if args.or_tags is not None else set(),
|
||||||
tags_and=args.and_tags,
|
tags_and=args.and_tags if args.and_tags is not None else set(),
|
||||||
tags_not=args.exclude_tags)
|
tags_not=args.exclude_tags if args.exclude_tags is not None else set())
|
||||||
chat = ChatDB.from_dir(cache_path=Path('.'),
|
chat = ChatDB.from_dir(cache_path=Path('.'),
|
||||||
db_path=Path(config.db),
|
db_path=Path(config.db),
|
||||||
mfilter=mfilter)
|
mfilter=mfilter)
|
||||||
|
|||||||
@ -17,6 +17,18 @@ class ConfigError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
|
||||||
|
"""
|
||||||
|
Changes the YAML dump style to multiline syntax for multiline strings.
|
||||||
|
"""
|
||||||
|
if len(data.splitlines()) > 1:
|
||||||
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
|
||||||
|
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
|
||||||
|
|
||||||
|
|
||||||
|
yaml.add_representer(str, str_presenter)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class AIConfig:
|
class AIConfig:
|
||||||
"""
|
"""
|
||||||
@ -48,13 +60,13 @@ class OpenAIConfig(AIConfig):
|
|||||||
# a default configuration
|
# a default configuration
|
||||||
ID: str = 'default'
|
ID: str = 'default'
|
||||||
api_key: str = '0123456789'
|
api_key: str = '0123456789'
|
||||||
system: str = 'You are an assistant'
|
|
||||||
model: str = 'gpt-3.5-turbo-16k'
|
model: str = 'gpt-3.5-turbo-16k'
|
||||||
temperature: float = 1.0
|
temperature: float = 1.0
|
||||||
max_tokens: int = 4000
|
max_tokens: int = 4000
|
||||||
top_p: float = 1.0
|
top_p: float = 1.0
|
||||||
frequency_penalty: float = 0.0
|
frequency_penalty: float = 0.0
|
||||||
presence_penalty: float = 0.0
|
presence_penalty: float = 0.0
|
||||||
|
system: str = 'You are an assistant'
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls: Type[OpenAIConfigInst], source: dict[str, Any]) -> OpenAIConfigInst:
|
def from_dict(cls: Type[OpenAIConfigInst], source: dict[str, Any]) -> OpenAIConfigInst:
|
||||||
@ -62,14 +74,14 @@ class OpenAIConfig(AIConfig):
|
|||||||
Create OpenAIConfig from a dict.
|
Create OpenAIConfig from a dict.
|
||||||
"""
|
"""
|
||||||
res = cls(
|
res = cls(
|
||||||
system=str(source['system']),
|
|
||||||
api_key=str(source['api_key']),
|
api_key=str(source['api_key']),
|
||||||
model=str(source['model']),
|
model=str(source['model']),
|
||||||
max_tokens=int(source['max_tokens']),
|
max_tokens=int(source['max_tokens']),
|
||||||
temperature=float(source['temperature']),
|
temperature=float(source['temperature']),
|
||||||
top_p=float(source['top_p']),
|
top_p=float(source['top_p']),
|
||||||
frequency_penalty=float(source['frequency_penalty']),
|
frequency_penalty=float(source['frequency_penalty']),
|
||||||
presence_penalty=float(source['presence_penalty'])
|
presence_penalty=float(source['presence_penalty']),
|
||||||
|
system=str(source['system'])
|
||||||
)
|
)
|
||||||
# overwrite default ID if provided
|
# overwrite default ID if provided
|
||||||
if 'ID' in source:
|
if 'ID' in source:
|
||||||
@ -148,6 +160,8 @@ class Config:
|
|||||||
|
|
||||||
def as_dict(self) -> dict[str, Any]:
|
def as_dict(self) -> dict[str, Any]:
|
||||||
res = asdict(self)
|
res = asdict(self)
|
||||||
|
# add the AI name manually (as first element)
|
||||||
|
# (not done by 'asdict' because it's a class variable)
|
||||||
for ID, conf in res['ais'].items():
|
for ID, conf in res['ais'].items():
|
||||||
conf.update({'name': self.ais[ID].name})
|
res['ais'][ID] = {**{'name': self.ais[ID].name}, **conf}
|
||||||
return res
|
return res
|
||||||
|
|||||||
@ -3,6 +3,8 @@ Module implementing message related functions and classes.
|
|||||||
"""
|
"""
|
||||||
import pathlib
|
import pathlib
|
||||||
import yaml
|
import yaml
|
||||||
|
import tempfile
|
||||||
|
import shutil
|
||||||
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
|
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
|
||||||
from dataclasses import dataclass, asdict, field
|
from dataclasses import dataclass, asdict, field
|
||||||
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
|
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
|
||||||
@ -312,7 +314,7 @@ class Message():
|
|||||||
mfilter.tags_not if mfilter else None)
|
mfilter.tags_not if mfilter else None)
|
||||||
else:
|
else:
|
||||||
message = cls.__from_file_yaml(file_path)
|
message = cls.__from_file_yaml(file_path)
|
||||||
if message and (not mfilter or (mfilter and message.match(mfilter))):
|
if message and (mfilter is None or message.match(mfilter)):
|
||||||
return message
|
return message
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
@ -445,16 +447,18 @@ class Message():
|
|||||||
* Answer.txt_header
|
* Answer.txt_header
|
||||||
* Answer
|
* Answer
|
||||||
"""
|
"""
|
||||||
with open(file_path, "w") as fd:
|
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
|
||||||
|
temp_file_path = pathlib.Path(temp_fd.name)
|
||||||
if self.tags:
|
if self.tags:
|
||||||
fd.write(f'{TagLine.from_set(self.tags)}\n')
|
temp_fd.write(f'{TagLine.from_set(self.tags)}\n')
|
||||||
if self.ai:
|
if self.ai:
|
||||||
fd.write(f'{AILine.from_ai(self.ai)}\n')
|
temp_fd.write(f'{AILine.from_ai(self.ai)}\n')
|
||||||
if self.model:
|
if self.model:
|
||||||
fd.write(f'{ModelLine.from_model(self.model)}\n')
|
temp_fd.write(f'{ModelLine.from_model(self.model)}\n')
|
||||||
fd.write(f'{Question.txt_header}\n{self.question}\n')
|
temp_fd.write(f'{Question.txt_header}\n{self.question}\n')
|
||||||
if self.answer:
|
if self.answer:
|
||||||
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
|
temp_fd.write(f'{Answer.txt_header}\n{self.answer}\n')
|
||||||
|
shutil.move(temp_file_path, file_path)
|
||||||
|
|
||||||
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
|
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
|
||||||
"""
|
"""
|
||||||
@ -466,7 +470,8 @@ class Message():
|
|||||||
* Message.ai_yaml_key: str [Optional]
|
* Message.ai_yaml_key: str [Optional]
|
||||||
* Message.model_yaml_key: str [Optional]
|
* Message.model_yaml_key: str [Optional]
|
||||||
"""
|
"""
|
||||||
with open(file_path, "w") as fd:
|
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
|
||||||
|
temp_file_path = pathlib.Path(temp_fd.name)
|
||||||
data: YamlDict = {Question.yaml_key: str(self.question)}
|
data: YamlDict = {Question.yaml_key: str(self.question)}
|
||||||
if self.answer:
|
if self.answer:
|
||||||
data[Answer.yaml_key] = str(self.answer)
|
data[Answer.yaml_key] = str(self.answer)
|
||||||
@ -476,7 +481,8 @@ class Message():
|
|||||||
data[self.model_yaml_key] = self.model
|
data[self.model_yaml_key] = self.model
|
||||||
if self.tags:
|
if self.tags:
|
||||||
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
|
||||||
yaml.dump(data, fd, sort_keys=False)
|
yaml.dump(data, temp_fd, sort_keys=False)
|
||||||
|
shutil.move(temp_file_path, file_path)
|
||||||
|
|
||||||
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
|
||||||
"""
|
"""
|
||||||
@ -508,7 +514,7 @@ class Message():
|
|||||||
Return True if all attributes match, else False.
|
Return True if all attributes match, else False.
|
||||||
"""
|
"""
|
||||||
mytags = self.tags or set()
|
mytags = self.tags or set()
|
||||||
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
|
if (((mfilter.tags_or is not None or mfilter.tags_and is not None or mfilter.tags_not is not None)
|
||||||
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
|
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
|
||||||
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
|
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
|
||||||
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
|
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
|
||||||
|
|||||||
@ -202,7 +202,25 @@ class TestChatDB(unittest.TestCase):
|
|||||||
self.assertEqual(chat_db.messages[1].file_path,
|
self.assertEqual(chat_db.messages[1].file_path,
|
||||||
pathlib.Path(self.db_path.name, '0003.txt'))
|
pathlib.Path(self.db_path.name, '0003.txt'))
|
||||||
|
|
||||||
def test_chat_db_filter(self) -> None:
|
def test_chat_db_from_dir_filter_tags(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name),
|
||||||
|
mfilter=MessageFilter(tags_or={Tag('tag1')}))
|
||||||
|
self.assertEqual(len(chat_db.messages), 1)
|
||||||
|
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
|
||||||
|
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
|
||||||
|
self.assertEqual(chat_db.messages[0].file_path,
|
||||||
|
pathlib.Path(self.db_path.name, '0001.txt'))
|
||||||
|
|
||||||
|
def test_chat_db_from_dir_filter_tags_empty(self) -> None:
|
||||||
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
|
pathlib.Path(self.db_path.name),
|
||||||
|
mfilter=MessageFilter(tags_or=set(),
|
||||||
|
tags_and=set(),
|
||||||
|
tags_not=set()))
|
||||||
|
self.assertEqual(len(chat_db.messages), 0)
|
||||||
|
|
||||||
|
def test_chat_db_from_dir_filter_answer(self) -> None:
|
||||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||||
pathlib.Path(self.db_path.name),
|
pathlib.Path(self.db_path.name),
|
||||||
mfilter=MessageFilter(answer_contains='Answer 2'))
|
mfilter=MessageFilter(answer_contains='Answer 2'))
|
||||||
@ -213,7 +231,7 @@ class TestChatDB(unittest.TestCase):
|
|||||||
pathlib.Path(self.db_path.name, '0002.yaml'))
|
pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||||
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
|
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
|
||||||
|
|
||||||
def test_chat_db_from_messges(self) -> None:
|
def test_chat_db_from_messages(self) -> None:
|
||||||
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
|
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
|
||||||
pathlib.Path(self.db_path.name),
|
pathlib.Path(self.db_path.name),
|
||||||
messages=[self.message1, self.message2,
|
messages=[self.message1, self.message2,
|
||||||
|
|||||||
@ -300,6 +300,12 @@ This is a question.
|
|||||||
MessageFilter(tags_or={Tag('tag1')}))
|
MessageFilter(tags_or={Tag('tag1')}))
|
||||||
self.assertIsNone(message)
|
self.assertIsNone(message)
|
||||||
|
|
||||||
|
def test_from_file_txt_empty_tags_dont_match(self) -> None:
|
||||||
|
message = Message.from_file(self.file_path_min,
|
||||||
|
MessageFilter(tags_or=set(),
|
||||||
|
tags_and=set()))
|
||||||
|
self.assertIsNone(message)
|
||||||
|
|
||||||
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
|
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
|
||||||
message = Message.from_file(self.file_path_min,
|
message = Message.from_file(self.file_path_min,
|
||||||
MessageFilter(tags_not={Tag('tag1')}))
|
MessageFilter(tags_not={Tag('tag1')}))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user