Compare commits
26 commits
a9c089f900
...
f611fa5f2a
| Author | SHA1 | Date | |
|---|---|---|---|
|
f611fa5f2a |
|||
|
ee43046e59 |
|||
|
1b8220e6e1 |
|||
|
812fee592e |
|||
|
54a27a2f75 |
|||
|
e156ea4108 |
|||
|
3d93be39d6 |
|||
|
a591fe20e8 |
|||
|
c6d361f649 |
|||
|
4fb1fff521 |
|||
|
af9a6d82f4 |
|||
|
fc81531f3c |
|||
|
67595b3220 |
|||
|
5908a70691 |
|||
|
ff00c70ce9 |
|||
|
654d996a53 |
|||
|
02ca346eae |
|||
|
701d79583d |
|||
|
eeb1573f99 |
|||
|
4eb438bab3 |
|||
| d328e640e0 | |||
|
be20284f78 |
|||
|
d37bffdc51 |
|||
|
2398431a7b |
|||
|
c6ac0d6043 |
|||
|
e1c9ee0c9c |
30 changed files with 1734 additions and 963 deletions
2
.flake8
2
.flake8
|
|
@ -1,2 +0,0 @@
|
|||
[flake8]
|
||||
max-line-length = 89
|
||||
30
.forgejo/workflows/ci.yaml
Normal file
30
.forgejo/workflows/ci.yaml
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
---
|
||||
name: checks
|
||||
on: # yamllint disable-line rule:truthy
|
||||
- 'push'
|
||||
|
||||
jobs:
|
||||
pre-commit:
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: https://code.forgejo.org/actions/checkout@v4
|
||||
- uses: https://code.forgejo.org/actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
- uses: opentofu/setup-opentofu@v1
|
||||
with:
|
||||
tofu_version: 1.7.0
|
||||
- uses: pre-commit/action@v3.0.1
|
||||
|
||||
tests:
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: https://code.forgejo.org/actions/checkout@v4
|
||||
- uses: https://code.forgejo.org/actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install dependencies
|
||||
run: pdm install -G testing
|
||||
|
||||
- run: poetry run tests
|
||||
|
|
@ -1,11 +1,8 @@
|
|||
default_language_version:
|
||||
python: python3.10
|
||||
|
||||
files: ^halig|tests$
|
||||
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v4.4.0
|
||||
rev: v4.6.0
|
||||
hooks:
|
||||
- id: trailing-whitespace
|
||||
args: [ --markdown-linebreak-ext=md ]
|
||||
|
|
@ -22,20 +19,14 @@ repos:
|
|||
args: [ --fix=lf ]
|
||||
|
||||
- repo: https://github.com/charliermarsh/ruff-pre-commit
|
||||
rev: v0.0.260
|
||||
rev: v0.5.6
|
||||
hooks:
|
||||
- id: ruff
|
||||
args:
|
||||
- --fix
|
||||
- --exit-non-zero-on-fix
|
||||
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 23.3.0
|
||||
hooks:
|
||||
- id: black
|
||||
pass_filenames: false
|
||||
args:
|
||||
- "halig"
|
||||
- id: ruff-format
|
||||
|
||||
|
||||
- repo: local
|
||||
hooks:
|
||||
|
|
|
|||
23
README.md
23
README.md
|
|
@ -11,13 +11,21 @@
|
|||
it encrypts the new contents into an [age](https://github.com/FiloSottile/age) file that
|
||||
you can store, _relatively_ safe, anywhere.
|
||||
|
||||
|
||||
## Features
|
||||
|
||||
- Simple notebooks management with paths autocompletion
|
||||
- Passphrase-less, fully-encrypted notes, compatible with existing SSH keys
|
||||
- No external `age` binary needed
|
||||
- Almost all `age` advantages, like having multiple keys for encryption and decryption
|
||||
- Remote (HTTP) public keys import: e.g: github.com/\<username\>.keys
|
||||
|
||||
## Install
|
||||
|
||||
```shell
|
||||
pipx install halig # or pip
|
||||
```
|
||||
|
||||
|
||||
## Setup TLDR
|
||||
|
||||
```shell
|
||||
|
|
@ -26,9 +34,13 @@ ssh-keygen -t ed25519
|
|||
mkdir -p "${XDG_CONFIG_HOME:-$HOME/.config}/halig"
|
||||
cat << EOF > "${XDG_CONFIG_HOME:-$HOME/.config}/halig/halig.yml"
|
||||
---
|
||||
notebooks_root_path: /home/$(id -un)/Documents/Notebooks
|
||||
identity_path: /home/$(id -un)/.ssh/id_ed25519
|
||||
recipient_path: /home/$(id -un)/.ssh/id_ed25519.pub
|
||||
notebooks_root_path: ~/Documents/Notebooks
|
||||
identity_paths:
|
||||
- ~/.ssh/id_ed25519
|
||||
recipient_paths:
|
||||
- ~/.ssh/id_ed25519.pub
|
||||
- https://github.com/<username>.keys
|
||||
- https://gitlab.com/<username>.keys
|
||||
EOF
|
||||
```
|
||||
|
||||
|
|
@ -38,4 +50,5 @@ EOF
|
|||
halig edit some_notebook # edit today's note relative to <notebooks_root_path>/some_notebook
|
||||
halig edit some_notebook/foo # edit <notebooks_root_path>/some_notebook/foo.age
|
||||
halig notebooks # list current notebooks
|
||||
```
|
||||
```
|
||||
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
__version__ = "0.1.8"
|
||||
__version__ = "0.5.0"
|
||||
|
|
|
|||
|
|
@ -5,10 +5,12 @@ from halig.settings import Settings
|
|||
|
||||
class ICommand(ABC):
|
||||
@abstractmethod
|
||||
def run(self):
|
||||
... # pragma: no cover
|
||||
def run(self): ... # pragma: no cover
|
||||
|
||||
|
||||
class BaseCommand(ICommand):
|
||||
def __init__(self, settings: Settings, *args, **kwargs):
|
||||
self.settings = settings
|
||||
|
||||
def traverse(self):
|
||||
return self.settings.notebooks_root_path.glob("./**/*.age")
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ class EditCommand(BaseCommand):
|
|||
temp_path = Path(tf.name)
|
||||
|
||||
editor = os.environ.get("EDITOR", "vim")
|
||||
subprocess.call([editor, temp_path])
|
||||
subprocess.call([editor, temp_path]) # noqa: S603
|
||||
|
||||
with temp_path.open("rb") as f:
|
||||
new_contents = f.read()
|
||||
|
|
|
|||
41
halig/commands/import_unencrypted.py
Normal file
41
halig/commands/import_unencrypted.py
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
from halig.commands.base import BaseCommand
|
||||
from halig.encryption import Encryptor
|
||||
from halig.settings import Settings
|
||||
|
||||
|
||||
class ImportCommand(BaseCommand):
|
||||
def __init__(self, settings: Settings, unlink: bool = False):
|
||||
super().__init__(settings)
|
||||
self.encryptor = Encryptor(self.settings)
|
||||
self.unlink = unlink
|
||||
|
||||
def get_importables(self) -> Generator[Path, None, None]:
|
||||
"""Get all markdown files under self.settings.notebooks_root_path
|
||||
|
||||
Yields:
|
||||
a list of Path objects matching any markdown files under
|
||||
the notebooks root path
|
||||
"""
|
||||
return self.settings.notebooks_root_path.glob("**/*.md")
|
||||
|
||||
def run(self):
|
||||
"""For every importable file, encrypt its contents inside
|
||||
`filename.age`. If `self.unlink` is `True`, unlink the original
|
||||
file
|
||||
"""
|
||||
|
||||
for file_path in self.get_importables():
|
||||
with file_path.open("rb") as f:
|
||||
contents = f.read()
|
||||
|
||||
encrypted_contents = self.encryptor.encrypt(contents)
|
||||
encrypted_file_path = file_path.with_suffix("").with_suffix(".age")
|
||||
encrypted_file_path.touch()
|
||||
with encrypted_file_path.open("wb") as f:
|
||||
f.write(encrypted_contents)
|
||||
|
||||
if self.unlink:
|
||||
file_path.unlink()
|
||||
|
|
@ -8,8 +8,15 @@ from halig.commands.base import BaseCommand
|
|||
|
||||
|
||||
class NotebooksCommand(BaseCommand):
|
||||
def __init__(self, max_depth: int | float, *args, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
max_depth: int | float,
|
||||
include_notes: bool = False,
|
||||
*args,
|
||||
**kwargs,
|
||||
):
|
||||
self.max_depth = max_depth
|
||||
self.include_notes = include_notes
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def build_tree(self, root_path: Path):
|
||||
|
|
@ -21,9 +28,10 @@ class NotebooksCommand(BaseCommand):
|
|||
break
|
||||
for item in sorted(current_folder_path.iterdir()):
|
||||
if item.is_dir():
|
||||
item_tree_node = current_tree_node.add(item.name)
|
||||
q.append((item, item_tree_node, depth + 1))
|
||||
else:
|
||||
if item.name != ".git":
|
||||
item_tree_node = current_tree_node.add(item.name)
|
||||
q.append((item, item_tree_node, depth + 1))
|
||||
elif self.include_notes and item.name.endswith(".age"):
|
||||
current_tree_node.add(item.name)
|
||||
return tree
|
||||
|
||||
|
|
|
|||
26
halig/commands/reencrypt.py
Normal file
26
halig/commands/reencrypt.py
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
import pyrage
|
||||
from rich import print
|
||||
|
||||
from halig.commands.base import BaseCommand
|
||||
from halig.encryption import Encryptor
|
||||
from halig.settings import Settings
|
||||
|
||||
|
||||
class ReencryptCommand(BaseCommand):
|
||||
def __init__(self, settings: Settings, *args, **kwargs):
|
||||
super().__init__(settings, *args, **kwargs)
|
||||
self.encryptor = Encryptor(settings)
|
||||
|
||||
def run(self):
|
||||
for note_path in self.traverse():
|
||||
try:
|
||||
with note_path.open("rb") as fr:
|
||||
orig_data = self.encryptor.decrypt(fr.read())
|
||||
new_data = self.encryptor.encrypt(orig_data)
|
||||
with note_path.open("wb") as fw:
|
||||
fw.write(new_data)
|
||||
except pyrage.DecryptError: # type: ignore[reportGeneralTypeIssues] # noqa: PERF203
|
||||
print(
|
||||
f"[yellow] Could not reencrypt {note_path} because no matching keys"
|
||||
f" were found, skipping ...",
|
||||
)
|
||||
111
halig/commands/search.py
Normal file
111
halig/commands/search.py
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
import hashlib
|
||||
import re
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
from rich.console import Console
|
||||
|
||||
from halig.commands.base import BaseCommand
|
||||
from halig.encryption import Encryptor
|
||||
from halig.settings import Settings
|
||||
|
||||
|
||||
class SearchCommand(BaseCommand):
|
||||
def __init__(self, term: str, index: bool, settings: Settings, *args, **kwargs):
|
||||
super().__init__(settings, *args, **kwargs)
|
||||
self.encryptor = Encryptor(settings)
|
||||
self.term = term
|
||||
self.index = index
|
||||
self.db_path = self.settings.cache_path / "halig.db"
|
||||
self.db_conn = sqlite3.connect(self.db_path)
|
||||
|
||||
def _create_schema(self):
|
||||
with self.db_conn:
|
||||
self.db_conn.execute(
|
||||
"""CREATE VIRTUAL TABLE IF NOT EXISTS notes
|
||||
USING fts5(name, last_timestamp, hash, filepath, body);""",
|
||||
)
|
||||
|
||||
def _search_note_in_db_by_path(self, path: Path) -> tuple[str | None, str | None]:
|
||||
with self.db_conn:
|
||||
cursor = self.db_conn.execute(
|
||||
"SELECT hash, last_timestamp FROM notes where filepath = ?",
|
||||
(str(path),),
|
||||
)
|
||||
results = cursor.fetchall()
|
||||
if not results:
|
||||
return None, None
|
||||
return results[0] # type: ignore[no-any-return]
|
||||
|
||||
def _index_note(
|
||||
self,
|
||||
updated_at: float,
|
||||
body_hash: str,
|
||||
note_path: Path,
|
||||
body: str,
|
||||
):
|
||||
with self.db_conn:
|
||||
self.db_conn.execute(
|
||||
"""INSERT INTO notes (name, last_timestamp, hash, filepath, body)
|
||||
VALUES (?, ?, ?, ?, ?);""",
|
||||
(note_path.name, updated_at, body_hash, str(note_path), body),
|
||||
)
|
||||
|
||||
def _update_index_note(
|
||||
self,
|
||||
updated_at: float,
|
||||
body_hash: str,
|
||||
note_path: Path,
|
||||
body: str,
|
||||
):
|
||||
with self.db_conn:
|
||||
self.db_conn.execute(
|
||||
"""UPDATE notes SET
|
||||
last_timestamp = (?),
|
||||
hash = (?),
|
||||
body = (?)
|
||||
WHERE
|
||||
filepath = (?);
|
||||
""",
|
||||
(updated_at, body_hash, body, str(note_path)),
|
||||
)
|
||||
|
||||
def _index_notebooks(self):
|
||||
for note_path in self.traverse():
|
||||
updated_at = note_path.stat().st_mtime
|
||||
with note_path.open("rb") as f:
|
||||
body = self.encryptor.decrypt(f.read())
|
||||
body_hash = hashlib.sha512(body).hexdigest()
|
||||
original_hash, last_timestamp = self._search_note_in_db_by_path(note_path)
|
||||
if not original_hash:
|
||||
self._index_note(updated_at, body_hash, note_path, body.decode())
|
||||
continue
|
||||
|
||||
if hash != original_hash:
|
||||
self._update_index_note(updated_at, body_hash, note_path, body.decode())
|
||||
|
||||
def _search(self):
|
||||
with self.db_conn:
|
||||
cursor = self.db_conn.execute(
|
||||
"SELECT filepath, body FROM notes WHERE body MATCH ? ORDER BY rank;",
|
||||
(f"{self.term}*",),
|
||||
)
|
||||
results = cursor.fetchall()
|
||||
console = Console()
|
||||
search_regex = re.compile(re.escape(self.term), re.IGNORECASE)
|
||||
for result in results:
|
||||
filepath, body = result
|
||||
lines = body.split("\n")
|
||||
|
||||
for lineno, line in enumerate(lines, start=1):
|
||||
match = search_regex.search(line)
|
||||
if match:
|
||||
content_line = search_regex.sub("[bold red]\\g<0>[/bold red]", line)
|
||||
console.print(f"{filepath}:{lineno}: {content_line}")
|
||||
|
||||
def run(self):
|
||||
self._create_schema()
|
||||
if self.index:
|
||||
self._index_notebooks()
|
||||
self._search()
|
||||
self.db_conn.close()
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from pathlib import Path
|
||||
|
||||
from rich import print
|
||||
from rich.console import Console
|
||||
from rich.markdown import Markdown
|
||||
|
||||
|
|
@ -10,11 +11,12 @@ from halig.settings import Settings
|
|||
|
||||
|
||||
class ShowCommand(BaseCommand):
|
||||
def __init__(self, note_path: Path, settings: Settings):
|
||||
def __init__(self, note_path: Path, settings: Settings, plain: bool = False):
|
||||
super().__init__(settings)
|
||||
|
||||
self.note_path = self.settings.notebooks_root_path / note_path
|
||||
self.encryptor = Encryptor(self.settings)
|
||||
self.plain = plain
|
||||
self.console = Console()
|
||||
if self.note_path.is_dir():
|
||||
self.note_path /= f"{utils.now().date()}.age"
|
||||
|
|
@ -33,5 +35,9 @@ class ShowCommand(BaseCommand):
|
|||
return self.encryptor.decrypt(data).decode()
|
||||
|
||||
def run(self): # pragma: no cover
|
||||
md_contents = Markdown(self.decrypt())
|
||||
contents = self.decrypt()
|
||||
if self.plain:
|
||||
print(contents)
|
||||
return
|
||||
md_contents = Markdown(contents)
|
||||
self.console.print(md_contents)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
from pyrage import decrypt as rage_decrypt
|
||||
from pyrage import encrypt as rage_encrypt
|
||||
from pyrage import ssh, x25519
|
||||
from pyrage import decrypt as rage_decrypt # pyright: ignore [reportGeneralTypeIssues]
|
||||
from pyrage import encrypt as rage_encrypt # pyright: ignore [reportGeneralTypeIssues]
|
||||
from pyrage import ssh, x25519 # pyright: ignore [reportGeneralTypeIssues]
|
||||
|
||||
from halig.settings import Settings
|
||||
|
||||
|
|
@ -14,27 +14,27 @@ class Encryptor:
|
|||
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self.identities = []
|
||||
self.recipients = []
|
||||
|
||||
# load identity
|
||||
with settings.identity_path.open("r") as f:
|
||||
identity_contents = f.read()
|
||||
if identity_contents.startswith("-----BEGIN OPENSSH PRIVATE KEY-----"):
|
||||
self.identity = ssh.Identity.from_buffer(identity_contents.encode())
|
||||
else:
|
||||
self.identity = x25519.Identity.from_str(identity_contents)
|
||||
for key in settings.load_private_keys():
|
||||
if key.startswith("-----BEGIN OPENSSH PRIVATE KEY-----"):
|
||||
self.identities.append(ssh.Identity.from_buffer(key.encode()))
|
||||
else:
|
||||
self.identities.append(x25519.Identity.from_str(key))
|
||||
|
||||
# load recipient
|
||||
with settings.recipient_path.open("r") as f:
|
||||
recipient_contents = f.read()
|
||||
if recipient_contents.startswith("ssh-ed25519"):
|
||||
self.recipient = ssh.Recipient.from_str(recipient_contents)
|
||||
else:
|
||||
self.recipient = x25519.Recipient.from_str(recipient_contents)
|
||||
for key in settings.load_public_keys():
|
||||
if key.startswith("ssh-ed25519"):
|
||||
self.recipients.append(ssh.Recipient.from_str(key))
|
||||
else:
|
||||
self.recipients.append(x25519.Recipient.from_str(key))
|
||||
|
||||
def encrypt(self, data: str | bytes) -> bytes:
|
||||
if isinstance(data, str):
|
||||
data = data.encode()
|
||||
return rage_encrypt(data, [self.recipient]) # type: ignore[no-any-return]
|
||||
return rage_encrypt(data, self.recipients) # type: ignore[no-any-return]
|
||||
|
||||
def decrypt(self, data: bytes) -> bytes:
|
||||
return rage_decrypt(data, [self.identity]) # type: ignore[no-any-return]
|
||||
if not len(data):
|
||||
return data
|
||||
return rage_decrypt(data, self.identities) # type: ignore[no-any-return]
|
||||
|
|
|
|||
|
|
@ -3,13 +3,28 @@ COMMANDS_NOTEBOOKS_HELP = "List all notebooks and notes, tree-style"
|
|||
COMMANDS_EDIT_HELP = "Edit or add a note into a notebook"
|
||||
COMMANDS_SHOW_HELP = "Show a note's contents"
|
||||
COMMANDS_VERSION = "Show halig's version"
|
||||
COMMANDS_IMPORT_HELP = "Encrypt existing unencrypted files"
|
||||
COMMANDS_SEARCH_HELP = """Perform a full-text search against all your notes,
|
||||
which are indexed into a SQLite FTS5 database located at `~/.cache/halig/halig.db`
|
||||
"""
|
||||
COMMANDS_REENCRYPT_HELP = """Reencrypt all available notes. This operation is useful
|
||||
when new public keys have been added to the config file and you want the notes
|
||||
to be seen by the new pairing private keys"""
|
||||
|
||||
# OPTIONS
|
||||
OPTION_CONFIG_HELP = "Configuration file. Must be YAML and schema compatible"
|
||||
OPTION_LEVEL_HELP = (
|
||||
"Tree max recursion level; negative numbers indicate a value of infinity"
|
||||
)
|
||||
|
||||
OPTION_UNLINK_HELP = """Setting this will remove the original markdown files;
|
||||
only the newly encrypted .age files will be preserved. Backup your data first
|
||||
"""
|
||||
OPTION_INDEX_HELP = """Index the SQLite database with your notes contents. The first
|
||||
time you perform a search, this flag should be set. Afterwards, you should only index
|
||||
when new notes have been added or older ones have been changed, since it's a slow
|
||||
operation"""
|
||||
OPTION_PLAIN_HELP = "Show the note as plaintext"
|
||||
OPTION_INCLUDE_NODES_HELP = "Include each notebook's notes when listing"
|
||||
# ARGUMENTS
|
||||
ARGUMENT_EDIT_NOTE_HELP = """A valid, settings-relative path.
|
||||
Be aware that valid can also mean implicit notes, that is, pointing to a
|
||||
|
|
|
|||
124
halig/main.py
124
halig/main.py
|
|
@ -2,48 +2,78 @@
|
|||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from rich import print
|
||||
from rich.prompt import Prompt
|
||||
from typer import Argument, Option, Typer
|
||||
|
||||
from halig import literals
|
||||
from halig.__version__ import __version__
|
||||
from halig.commands.edit import EditCommand
|
||||
from halig.commands.import_unencrypted import ImportCommand
|
||||
from halig.commands.notebooks import NotebooksCommand
|
||||
from halig.commands.reencrypt import ReencryptCommand
|
||||
from halig.commands.search import SearchCommand
|
||||
from halig.commands.show import ShowCommand
|
||||
from halig.settings import load_from_file
|
||||
from halig.utils import capture
|
||||
from halig.__version__ import __version__
|
||||
from rich import print
|
||||
|
||||
app = Typer(pretty_exceptions_enable=False, pretty_exceptions_show_locals=False)
|
||||
|
||||
config_option = Option(None, "--config", "-c", help=literals.OPTION_CONFIG_HELP)
|
||||
|
||||
|
||||
def complete_note_path(incomplete: str):
|
||||
"""Build `path = Path(settings.notebooks_root_path / incomplete)`, and complete if:
|
||||
- `path` exists:
|
||||
- if `path` is a file and ends with `.age`: return it
|
||||
- if `path` is a dir: return all its children, non-recursively
|
||||
- `path` does not exist: return all occurrences of path.glob("*")
|
||||
"""
|
||||
settings = load_from_file()
|
||||
path = settings.notebooks_root_path / incomplete
|
||||
if path.exists():
|
||||
if path.is_dir():
|
||||
for child in path.iterdir():
|
||||
yield str(child.relative_to(settings.notebooks_root_path))
|
||||
elif path.name.endswith(".age"):
|
||||
return str(path.relative_to(settings.notebooks_root_path))
|
||||
glob = settings.notebooks_root_path.glob(f"{incomplete}*")
|
||||
for path in glob:
|
||||
yield str(path.relative_to(settings.notebooks_root_path))
|
||||
|
||||
|
||||
@app.command(help=literals.COMMANDS_NOTEBOOKS_HELP)
|
||||
@capture
|
||||
def notebooks(
|
||||
level: int = Option( # noqa: B008
|
||||
-1,
|
||||
"--level",
|
||||
"-l",
|
||||
help=literals.OPTION_LEVEL_HELP,
|
||||
),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
def tree(
|
||||
level: int = Option( # B008
|
||||
-1,
|
||||
"--level",
|
||||
"-l",
|
||||
help=literals.OPTION_LEVEL_HELP,
|
||||
),
|
||||
include_notes: bool = Option(False, help=literals.OPTION_INCLUDE_NODES_HELP),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
):
|
||||
if level < 0:
|
||||
level = float("inf") # type: ignore[assignment]
|
||||
settings = load_from_file(config)
|
||||
command = NotebooksCommand(settings=settings, max_depth=level)
|
||||
command = NotebooksCommand(
|
||||
settings=settings,
|
||||
max_depth=level,
|
||||
include_notes=include_notes,
|
||||
)
|
||||
command.run()
|
||||
|
||||
|
||||
@app.command(help=literals.COMMANDS_EDIT_HELP)
|
||||
@capture
|
||||
def edit(
|
||||
note: Path = Argument( # noqa: B008
|
||||
...,
|
||||
help=literals.ARGUMENT_EDIT_NOTE_HELP,
|
||||
),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
note: Path = Argument( # noqa: B008
|
||||
...,
|
||||
help=literals.ARGUMENT_EDIT_NOTE_HELP,
|
||||
autocompletion=complete_note_path,
|
||||
),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
):
|
||||
settings = load_from_file(config)
|
||||
command = EditCommand(settings=settings, note_path=note)
|
||||
|
|
@ -53,14 +83,64 @@ def edit(
|
|||
@app.command(help=literals.COMMANDS_SHOW_HELP)
|
||||
@capture
|
||||
def show(
|
||||
note: Path = Argument( # noqa: B008
|
||||
...,
|
||||
help=literals.ARGUMENT_SHOW_NOTE_HELP,
|
||||
),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
note: Path = Argument( # noqa: B008
|
||||
...,
|
||||
help=literals.ARGUMENT_SHOW_NOTE_HELP,
|
||||
autocompletion=complete_note_path,
|
||||
),
|
||||
plain: bool = Option(False, help=literals.OPTION_PLAIN_HELP), # B008
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
):
|
||||
settings = load_from_file(config)
|
||||
command = ShowCommand(settings=settings, note_path=note)
|
||||
command = ShowCommand(settings=settings, note_path=note, plain=plain)
|
||||
command.run()
|
||||
|
||||
|
||||
@app.command(name="import", help=literals.COMMANDS_IMPORT_HELP)
|
||||
@capture
|
||||
def import_unencrypted(
|
||||
unlink: bool = Option(
|
||||
False,
|
||||
help=literals.OPTION_UNLINK_HELP,
|
||||
),
|
||||
config: Optional[Path] = config_option, # noqa: UP007
|
||||
):
|
||||
settings = load_from_file(config)
|
||||
command = ImportCommand(settings=settings, unlink=unlink)
|
||||
files_to_unlink = list(command.get_importables())
|
||||
if files_to_unlink:
|
||||
if unlink:
|
||||
should_unlink = Prompt.ask(
|
||||
f"""Unlink flag set, will delete {len(files_to_unlink)} files.
|
||||
Have you backed up your data?""",
|
||||
choices=["y", "Y", "N", "n"],
|
||||
)
|
||||
if should_unlink in ["N", "n"]:
|
||||
command.unlink = False
|
||||
|
||||
command.run()
|
||||
|
||||
|
||||
@app.command(help=literals.COMMANDS_SEARCH_HELP)
|
||||
def search(
|
||||
term: str,
|
||||
index: bool = Option(False, help=literals.OPTION_INDEX_HELP),
|
||||
):
|
||||
settings = load_from_file()
|
||||
command = SearchCommand(
|
||||
term=term,
|
||||
index=index,
|
||||
settings=settings,
|
||||
)
|
||||
command.run()
|
||||
|
||||
|
||||
@app.command(help=literals.COMMANDS_REENCRYPT_HELP)
|
||||
def reencrypt():
|
||||
settings = load_from_file()
|
||||
command = ReencryptCommand(
|
||||
settings=settings,
|
||||
)
|
||||
command.run()
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +1,15 @@
|
|||
import os
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import hishel
|
||||
import httpx
|
||||
import platformdirs
|
||||
import yaml
|
||||
from pydantic import BaseSettings, DirectoryPath, FilePath
|
||||
from pydantic import DirectoryPath, Field, FilePath, HttpUrl, field_validator
|
||||
from pydantic_core import Url
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from rich import print
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
|
|
@ -14,29 +20,101 @@ class Settings(BaseSettings):
|
|||
Attributes:
|
||||
notebooks_root_path (DirectoryPath): a *valid* path to a directory that
|
||||
may contain notes or other notebooks
|
||||
identity_path (FilePath): a *valid* path to an identity file, which usually
|
||||
is understood as a private key. Defaults to `~/.ssh/id_ed25519`
|
||||
recipient_path (FilePath): a *valid* path to a recipient file, which usually
|
||||
is understood as a public key. Defaults to `~/.ssh/id_ed25519.pub`
|
||||
identity_paths (list[FilePath]): a list of *valid* paths of private keys.
|
||||
Defaults to `[~/.ssh/id_ed25519]`
|
||||
recipient_paths (list[FilePath|HttpUrl]): a list *valid* paths of public keys,
|
||||
which usually is understood as a public key. Defaults to
|
||||
`[~/.ssh/id_ed25519.pub]`
|
||||
cache_path (DirectoryPath): a *valid* path used to cache some stuff,
|
||||
particularly remote public keys. Defaults to $XDG_CACHE_HOME/halig
|
||||
remote_public_keys_timeout (float): time after which the retrieval of external public keys
|
||||
(e.g. github ssh keys) should be interrupted. Defaults to 0.5.
|
||||
"""
|
||||
|
||||
notebooks_root_path: DirectoryPath
|
||||
identity_path: FilePath = Path("~/.ssh/id_ed25519").expanduser()
|
||||
recipient_path: FilePath = Path("~/.ssh/id_ed25519.pub").expanduser()
|
||||
identity_paths: list[FilePath] = Field(
|
||||
default=[Path("~/.ssh/id_ed25519").expanduser()],
|
||||
)
|
||||
recipient_paths: list[FilePath | HttpUrl] = Field(
|
||||
default=[
|
||||
Path("~/.ssh/id_ed25519.pub").expanduser(),
|
||||
],
|
||||
)
|
||||
cache_path: DirectoryPath = Field(
|
||||
default_factory=lambda: platformdirs.user_cache_path(
|
||||
"halig",
|
||||
ensure_exists=True,
|
||||
),
|
||||
)
|
||||
remote_public_keys_timeout: float = 0.5
|
||||
|
||||
class Config:
|
||||
env_prefix = "halig_"
|
||||
@field_validator("identity_paths", "recipient_paths", mode="before")
|
||||
@classmethod
|
||||
def validate_paths(cls, v: Any):
|
||||
if not isinstance(v, list):
|
||||
v = [v]
|
||||
new_v = []
|
||||
for path in v:
|
||||
new_path = path
|
||||
if isinstance(path, str) and not path.startswith("http"):
|
||||
new_path = Path(path)
|
||||
new_v.append(
|
||||
new_path.expanduser() if isinstance(new_path, Path) else new_path,
|
||||
)
|
||||
return new_v
|
||||
|
||||
@field_validator("notebooks_root_path", mode="before")
|
||||
@classmethod
|
||||
def validate_notebooks_path(cls, v: Any):
|
||||
if isinstance(v, str):
|
||||
return Path(v).expanduser()
|
||||
if isinstance(v, Path):
|
||||
return v.expanduser()
|
||||
return v
|
||||
|
||||
def load_private_keys(self) -> set[str]:
|
||||
keys = set()
|
||||
for path in self.identity_paths:
|
||||
with path.open("r") as f:
|
||||
keys.add(f.read())
|
||||
return keys
|
||||
|
||||
def load_public_keys(self) -> set[str]:
|
||||
keys = set()
|
||||
for path in self.recipient_paths:
|
||||
if isinstance(path, Url):
|
||||
try:
|
||||
with hishel.CacheClient(
|
||||
storage=hishel.FileStorage(
|
||||
base_path=self.cache_path / "hishel"
|
||||
),
|
||||
timeout=3,
|
||||
) as client:
|
||||
response = client.get(
|
||||
str(path),
|
||||
timeout=self.remote_public_keys_timeout,
|
||||
)
|
||||
if response.status_code == httpx.codes.OK:
|
||||
for line in response.content.decode().split("\n"):
|
||||
if line:
|
||||
keys.add(line)
|
||||
except Exception as e: # noqa: BLE001
|
||||
print(
|
||||
f"[yellow] Could not retrieve public key from {path}. Ignoring error: '{e}'"
|
||||
)
|
||||
elif isinstance(path, Path):
|
||||
with path.open("r") as f:
|
||||
keys.add(f.read())
|
||||
return keys
|
||||
|
||||
model_config = SettingsConfigDict(env_prefix="halig_")
|
||||
|
||||
|
||||
@lru_cache
|
||||
def load_from_file(file_path: Path | None = None) -> Settings:
|
||||
if file_path is None:
|
||||
xdg_config_home = Path(os.getenv("XDG_CONFIG_HOME", "~/.config")).expanduser()
|
||||
if not xdg_config_home.exists():
|
||||
err = f"File {xdg_config_home} does not exist"
|
||||
raise FileNotFoundError(err)
|
||||
|
||||
file_path = xdg_config_home / "halig" / "halig.yml"
|
||||
halig_config_home = platformdirs.user_config_path("halig", ensure_exists=True)
|
||||
file_path = halig_config_home / "halig.yml"
|
||||
file_path.touch(exist_ok=True)
|
||||
elif not file_path.exists():
|
||||
err = f"File {file_path} does not exist"
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ from rich import print
|
|||
|
||||
def now():
|
||||
tz = local_timezone()
|
||||
return pendulum.now(tz)
|
||||
return pendulum.now(tz) # type: ignore[reportArgumentType]
|
||||
|
||||
|
||||
def capture(fn: Callable):
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import nox
|
||||
|
||||
VERSIONS = ["3.10", "3.11"]
|
||||
VERSIONS = ["3.10", "3.11", "3.12"]
|
||||
|
||||
|
||||
@nox.session(python=VERSIONS)
|
||||
|
|
|
|||
|
|
@ -4,12 +4,16 @@ authors = [
|
|||
]
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"typer<1.0.0,>=0.6.1",
|
||||
"rich>=13.3.3",
|
||||
"pydantic>=1.10.7",
|
||||
"typer>=0.12",
|
||||
"rich>=13.3",
|
||||
"pydantic>=2.7",
|
||||
"pyyaml>=6.0",
|
||||
"pyrage>=1.0.3",
|
||||
"pendulum>=2.1.2",
|
||||
"pyrage>=1.1",
|
||||
"pendulum>=3.0",
|
||||
"httpx>=0.27",
|
||||
"platformdirs>=4.2",
|
||||
"pydantic-settings>=2.0",
|
||||
"hishel>=0.0",
|
||||
]
|
||||
name = "halig"
|
||||
dynamic = ["version"]
|
||||
|
|
@ -25,6 +29,7 @@ classifiers = [
|
|||
"Operating System :: POSIX :: Linux",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Topic :: Terminals",
|
||||
"Topic :: Utilities",
|
||||
"Typing :: Typed"
|
||||
|
|
@ -64,7 +69,7 @@ testing = [
|
|||
]
|
||||
linting = [
|
||||
"black>=23.3.0",
|
||||
"ruff>=0.0.260",
|
||||
"ruff>=0.1.0",
|
||||
"pyright>=1.1.301",
|
||||
"mypy>=1.1.1",
|
||||
"types-PyYAML>=6.0.12.9",
|
||||
|
|
@ -73,25 +78,32 @@ docs = [
|
|||
"mkdocs-material>=9.1.5",
|
||||
"mkdocstrings[python]>=0.20.0",
|
||||
]
|
||||
dev = [
|
||||
"bump-pydantic>=0.6.0",
|
||||
]
|
||||
[tool.pytest]
|
||||
mock_use_standalone_module = true
|
||||
|
||||
[tool.pyright]
|
||||
reportMissingImports = false
|
||||
reportMissingTypeStubs = false
|
||||
reportAttributeAccessIssue = false
|
||||
|
||||
[tool.ruff]
|
||||
extend-select = ["W", "C90", "I", "N", "UP", "S", "BLE", "FBT", "B", "A", "COM", "C4", "DTZ", "T10", "EM", "ISC", "T20", "PT", "RSE", "RET", "SIM", "PTH", "ERA", "PGH", "PL", "TRY", "RUF"]
|
||||
extend-ignore = ["S101", "ISC002"]
|
||||
[tool.ruff.lint]
|
||||
extend-select = ["W", "C90", "I", "N", "UP", "S", "BLE", "B", "A", "COM", "C4", "DTZ", "T10", "EM", "ISC", "T20", "PT", "RSE", "RET", "SIM", "PTH", "ERA", "PGH", "PL", "TRY", "RUF", "FURB", "PERF"]
|
||||
extend-ignore = ["S101", "ISC002", "COM812", "ISC001"]
|
||||
|
||||
[tool.mypy]
|
||||
python_version = "3.11"
|
||||
warn_return_any = true
|
||||
warn_unused_configs = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = [
|
||||
"pyrage",
|
||||
"pyrage.ssh",
|
||||
"pyrage.x25519",
|
||||
"attr",
|
||||
"httpx_cache"
|
||||
]
|
||||
ignore_missing_imports = true
|
||||
|
|
|
|||
|
|
@ -30,7 +30,26 @@ def notes(notebooks_path: Path):
|
|||
(dailies / f"{dt.date()}.age").touch()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture()
|
||||
def unencrypted_notes(notebooks_path):
|
||||
unencrypted_root_path = notebooks_path / "unencrypted"
|
||||
unencrypted_root_path.mkdir()
|
||||
for i in range(5):
|
||||
note = unencrypted_root_path / f"note_{i}.md"
|
||||
note.touch()
|
||||
subnote_path = unencrypted_root_path / f"inner_{i}"
|
||||
subnote_path.mkdir()
|
||||
for j in range(2):
|
||||
subnote = subnote_path / f"note_{i}_{j}.md"
|
||||
subnote.touch()
|
||||
with subnote.open("w") as f:
|
||||
f.write(f"subnote {i} {j}")
|
||||
with note.open("w") as f:
|
||||
f.write(f"note {i}")
|
||||
return unencrypted_root_path
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def notebooks_command(settings: Settings):
|
||||
return NotebooksCommand(max_depth=float("inf"), settings=settings)
|
||||
|
||||
|
|
@ -39,27 +58,27 @@ def notebooks_command(settings: Settings):
|
|||
def current_note(notes, settings, encryptor) -> Path:
|
||||
note_path = settings.notebooks_root_path / f"{utils.now().date()}.age"
|
||||
note_path.touch()
|
||||
data = encryptor.encrypt("foo".encode())
|
||||
data = encryptor.encrypt(b"foo")
|
||||
with note_path.open("wb") as f:
|
||||
f.write(data)
|
||||
return note_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture()
|
||||
def current_daily(notes, settings, encryptor) -> Path:
|
||||
note_path = (
|
||||
settings.notebooks_root_path / "Work" / "Dailies" / f"{utils.now().date()}.age"
|
||||
)
|
||||
data = encryptor.encrypt("foo".encode())
|
||||
data = encryptor.encrypt(b"foo")
|
||||
with note_path.open("wb") as f:
|
||||
f.write(data)
|
||||
return note_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture()
|
||||
def mock_edit(mocker):
|
||||
def edit(callargs: list):
|
||||
with open(callargs[1], "wb") as f:
|
||||
f.write("edited".encode())
|
||||
f.write(b"edited")
|
||||
|
||||
mocker.patch("halig.commands.edit.subprocess.call", side_effect=edit)
|
||||
|
|
|
|||
|
|
@ -16,7 +16,8 @@ def test_edit_raises_invalid_age_file(notes, settings: Settings):
|
|||
|
||||
def test_edit_current_note(mock_edit, current_note, settings: Settings, encryptor):
|
||||
edit_command = EditCommand(
|
||||
note_path=settings.notebooks_root_path, settings=settings
|
||||
note_path=settings.notebooks_root_path,
|
||||
settings=settings,
|
||||
)
|
||||
assert edit_command.note_path == current_note
|
||||
edit_command.run()
|
||||
|
|
|
|||
47
tests/commands/test_import.py
Normal file
47
tests/commands/test_import.py
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from halig.commands.import_unencrypted import ImportCommand
|
||||
from halig.encryption import Encryptor
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def command(settings) -> ImportCommand:
|
||||
return ImportCommand(settings)
|
||||
|
||||
|
||||
def test_get_importables(unencrypted_notes: Path, command: ImportCommand):
|
||||
notes = list(command.get_importables())
|
||||
assert len(notes) == 15
|
||||
assert notes == list(unencrypted_notes.glob("**/*.md"))
|
||||
|
||||
|
||||
def test_import(unencrypted_notes: Path, command: ImportCommand, encryptor: Encryptor):
|
||||
command.run()
|
||||
notes = command.get_importables()
|
||||
encrypted_notes = list(unencrypted_notes.glob("**/*.age"))
|
||||
assert len(encrypted_notes) == len(list(notes)) == 15
|
||||
for note in encrypted_notes:
|
||||
with note.open("rb") as f:
|
||||
encrypted_data = f.read()
|
||||
data = encryptor.decrypt(encrypted_data)
|
||||
if "inner" in str(note):
|
||||
assert (
|
||||
f'sub{note.name.replace(".age", "").replace("_"," ")}' == data.decode()
|
||||
)
|
||||
else:
|
||||
assert note.name.replace(".age", "").replace("_", " ") == data.decode()
|
||||
|
||||
|
||||
def test_import_unlink(
|
||||
unencrypted_notes: Path,
|
||||
command: ImportCommand,
|
||||
encryptor: Encryptor,
|
||||
):
|
||||
command.unlink = True
|
||||
command.run()
|
||||
notes = command.get_importables()
|
||||
encrypted_notes = list(unencrypted_notes.glob("**/*.age"))
|
||||
assert len(encrypted_notes) == 15
|
||||
assert len(list(unencrypted_notes.glob("**/*.md"))) == 0
|
||||
|
|
@ -25,12 +25,14 @@ def test_build_tree_max_depth_2(notes, notebooks_command: NotebooksCommand):
|
|||
work = tree.children[1]
|
||||
assert personal.label == "Personal"
|
||||
assert work.label == "Work"
|
||||
assert len(work.children) == 2
|
||||
assert len(personal.children) == 1
|
||||
assert len(work.children) == 1
|
||||
assert len(personal.children) == 0
|
||||
|
||||
|
||||
def test_build_tree_max_depth_inf(notes, notebooks_command: NotebooksCommand):
|
||||
tree = notebooks_command.build_tree(notebooks_command.settings.notebooks_root_path)
|
||||
def test_build_tree_max_depth_inf(notes, settings):
|
||||
tree = NotebooksCommand(max_depth=float("inf"), settings=settings, include_notes=True).build_tree(
|
||||
settings.notebooks_root_path
|
||||
)
|
||||
personal = tree.children[0]
|
||||
work = tree.children[1]
|
||||
assert personal.label == "Personal"
|
||||
|
|
|
|||
25
tests/commands/test_reencrypt.py
Normal file
25
tests/commands/test_reencrypt.py
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
import pytest
|
||||
|
||||
from halig import utils
|
||||
from halig.commands.reencrypt import ReencryptCommand
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def reencrypt_command(settings):
|
||||
return ReencryptCommand(settings)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("notes")
|
||||
def test_reencrypt(reencrypt_command):
|
||||
reencrypt_command.run()
|
||||
for note_path in reencrypt_command.traverse():
|
||||
with note_path.open("rb") as f:
|
||||
assert reencrypt_command.encryptor.decrypt(f.read()) == b""
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("current_daily")
|
||||
def test_reencrypt_warns_no_matching_key(reencrypt_command, halig_ssh_path, capfd):
|
||||
reencrypt_command.encryptor.identities = []
|
||||
reencrypt_command.run()
|
||||
out, _ = capfd.readouterr()
|
||||
assert f'because no matching keys were found, skipping ...' in out
|
||||
|
|
@ -26,7 +26,8 @@ def test_show_raises_note_path_is_not_age_valid(notes, settings: Settings):
|
|||
|
||||
def test_show_current_note(current_note, settings):
|
||||
show_command = ShowCommand(
|
||||
note_path=settings.notebooks_root_path, settings=settings
|
||||
note_path=settings.notebooks_root_path,
|
||||
settings=settings,
|
||||
)
|
||||
assert show_command.note_path == current_note
|
||||
assert show_command.decrypt() == "foo"
|
||||
|
|
|
|||
|
|
@ -42,10 +42,9 @@ def ssh_recipient(halig_ssh_public_key: str) -> Recipient:
|
|||
|
||||
|
||||
@pytest.fixture()
|
||||
def halig_path(fs, halig_ssh_public_key, halig_ssh_private_key) -> Path:
|
||||
fs.add_real_paths(["/etc/localtime"])
|
||||
ssh_path = Path("~/.ssh").expanduser()
|
||||
ssh_path.mkdir(parents=True)
|
||||
def halig_ssh_path(tmp_path: Path, halig_ssh_public_key, halig_ssh_private_key) -> Path:
|
||||
ssh_path = tmp_path / ".ssh"
|
||||
ssh_path.mkdir()
|
||||
|
||||
with (ssh_path / "id_ed25519").open("w") as f:
|
||||
f.write(halig_ssh_private_key)
|
||||
|
|
@ -53,38 +52,47 @@ def halig_path(fs, halig_ssh_public_key, halig_ssh_private_key) -> Path:
|
|||
with (ssh_path / "id_ed25519.pub").open("w") as f:
|
||||
f.write(halig_ssh_public_key)
|
||||
|
||||
halig_path = Path("~/.config/halig").expanduser()
|
||||
return ssh_path
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def halig_config_path(tmp_path: Path):
|
||||
halig_path = tmp_path / ".config/halig"
|
||||
halig_path.mkdir(parents=True)
|
||||
return halig_path
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def notebooks_path(halig_path) -> Path:
|
||||
notebooks_path = Path("~/Notebooks").expanduser()
|
||||
notebooks_path.mkdir(parents=True)
|
||||
def notebooks_path(tmp_path) -> Path:
|
||||
notebooks_path = tmp_path / "Notebooks"
|
||||
notebooks_path.mkdir()
|
||||
return notebooks_path
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def settings(notebooks_path: Path) -> Settings:
|
||||
return Settings(notebooks_root_path=notebooks_path)
|
||||
def settings(notebooks_path: Path, halig_ssh_path) -> Settings:
|
||||
return Settings(
|
||||
notebooks_root_path=notebooks_path,
|
||||
identity_paths=[halig_ssh_path / "id_ed25519"],
|
||||
recipient_paths=[halig_ssh_path / "id_ed25519.pub"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def settings_file_path(halig_path: Path, notebooks_path: Path) -> Path:
|
||||
yaml_file = halig_path / "halig.yml"
|
||||
def settings_file_path(halig_config_path: Path, notebooks_path: Path) -> Path:
|
||||
yaml_file = halig_config_path / "halig.yml"
|
||||
yaml_file.touch()
|
||||
s = Settings(notebooks_root_path=notebooks_path)
|
||||
# `.dict()` doesn't serialize some fields that yaml doesn't understand
|
||||
serialized = json.loads(s.json())
|
||||
serialized = json.loads(s.model_dump_json())
|
||||
with yaml_file.open("w") as f:
|
||||
yaml.safe_dump(serialized, f)
|
||||
return yaml_file
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def empty_file_path(halig_path: Path) -> Path:
|
||||
empty_path = halig_path / "empty"
|
||||
def empty_file_path(halig_config_path: Path) -> Path:
|
||||
empty_path = halig_config_path / "empty"
|
||||
empty_path.touch()
|
||||
return empty_path
|
||||
|
||||
|
|
|
|||
|
|
@ -4,30 +4,42 @@ from halig.encryption import Encryptor
|
|||
from halig.settings import Settings
|
||||
|
||||
|
||||
def test_instance_encryptor_from_age_keys(halig_path, notebooks_path):
|
||||
identity = x25519.Identity.generate()
|
||||
identity_path = halig_path / "identity.key"
|
||||
identity_path.touch()
|
||||
recipient_path = halig_path / "recipient.key"
|
||||
recipient_path.touch()
|
||||
with identity_path.open("w") as f:
|
||||
f.write(str(identity))
|
||||
def test_instance_encryptor_from_age_keys(notebooks_path, halig_config_path):
|
||||
identity_paths = []
|
||||
recipient_paths = []
|
||||
identities = []
|
||||
for i in range(5):
|
||||
identity = x25519.Identity.generate()
|
||||
identities.append(identity)
|
||||
identity_path = halig_config_path / f"identity_{i}.key"
|
||||
identity_path.touch()
|
||||
with identity_path.open("w") as f:
|
||||
f.write(str(identity))
|
||||
identity_paths.append(identity_path)
|
||||
|
||||
with recipient_path.open("w") as f:
|
||||
f.write(str(identity.to_public()))
|
||||
recipient_path = halig_config_path / f"recipient_{i}.key"
|
||||
recipient_path.touch()
|
||||
|
||||
with recipient_path.open("w") as f:
|
||||
f.write(str(identity.to_public()))
|
||||
|
||||
recipient_paths.append(recipient_path)
|
||||
settings = Settings(
|
||||
notebooks_root_path=notebooks_path,
|
||||
identity_path=identity_path,
|
||||
recipient_path=recipient_path,
|
||||
identity_paths=identity_paths,
|
||||
recipient_paths=recipient_paths,
|
||||
)
|
||||
assert Encryptor(settings)
|
||||
encryptor = Encryptor(settings)
|
||||
for identity in identities:
|
||||
assert str(identity) in [str(identity) for identity in encryptor.identities]
|
||||
assert str(identity.to_public()) in [
|
||||
str(recipient) for recipient in encryptor.recipients
|
||||
]
|
||||
|
||||
|
||||
def test_encrypt(encryptor: Encryptor, ssh_identity):
|
||||
unencrypted_data = "foo"
|
||||
encrypted_data = encryptor.encrypt(unencrypted_data)
|
||||
|
||||
assert isinstance(encrypted_data, bytes)
|
||||
assert unencrypted_data == decrypt(encrypted_data, [ssh_identity]).decode()
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ def test_settings_from_env(settings: Settings, notebooks_root_path_envvar):
|
|||
|
||||
|
||||
def test_settings_from_non_existing_file_raises_value_error():
|
||||
with pytest.raises(ValueError, match="field required"):
|
||||
with pytest.raises(ValueError, match="Field required"):
|
||||
Settings() # type: ignore[call-arg]
|
||||
|
||||
|
||||
|
|
@ -20,23 +20,12 @@ def test_load_from_file(notebooks_path: Path, settings_file_path: Path):
|
|||
assert settings.notebooks_root_path == notebooks_path
|
||||
|
||||
|
||||
def test_load_from_non_xdg_home_config_raises_file_not_found_error(fs):
|
||||
path = Path("~/.config").expanduser()
|
||||
with pytest.raises(FileNotFoundError, match=f"File {path} does not exist"):
|
||||
load_from_file()
|
||||
|
||||
|
||||
def test_load_from_existing_standard_file(settings_file_path: Path, settings: Settings):
|
||||
standard_settings = load_from_file()
|
||||
assert standard_settings.notebooks_root_path == settings.notebooks_root_path
|
||||
|
||||
|
||||
def test_load_from_empty_file_raises_value_error(empty_file_path: Path):
|
||||
with pytest.raises(ValueError, match=f"File {empty_file_path} is empty"):
|
||||
load_from_file(empty_file_path)
|
||||
|
||||
|
||||
def test_load_from_non_existing_file_path_raises_file_not_found_error(halig_path: Path):
|
||||
file = halig_path / "some_invalid_file.yml"
|
||||
def test_load_from_non_existing_file_path_raises_file_not_found_error(halig_config_path: Path):
|
||||
file = halig_config_path / "some_invalid_file.yml"
|
||||
with pytest.raises(FileNotFoundError, match=f"File {file} does not exist"):
|
||||
load_from_file(file)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,4 @@
|
|||
from typing import Callable
|
||||
|
||||
import pytest
|
||||
from collections.abc import Callable
|
||||
|
||||
from halig.utils import capture
|
||||
|
||||
|
|
@ -10,7 +8,8 @@ def exec_capture(func: Callable):
|
|||
|
||||
|
||||
def test_capture():
|
||||
def func(): return 1
|
||||
def func():
|
||||
return 1
|
||||
|
||||
assert exec_capture(func) == 1
|
||||
|
||||
|
|
@ -22,9 +21,10 @@ def test_capture_exits_with_custom_os_error(mocker):
|
|||
nonlocal exit_code
|
||||
exit_code = code
|
||||
|
||||
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
|
||||
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
|
||||
|
||||
def func(): raise OSError(2, "os_error_func")
|
||||
def func():
|
||||
raise OSError(2, "os_error_func")
|
||||
|
||||
exec_capture(func)
|
||||
assert exit_code == 2
|
||||
|
|
@ -37,9 +37,10 @@ def test_capture_exits_with_os_error(mocker):
|
|||
nonlocal exit_code
|
||||
exit_code = code
|
||||
|
||||
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
|
||||
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
|
||||
|
||||
def func(): raise OSError
|
||||
def func():
|
||||
raise OSError
|
||||
|
||||
exec_capture(func)
|
||||
assert not exit_code
|
||||
|
|
@ -52,9 +53,10 @@ def test_capture_exits_with_value_error(mocker):
|
|||
nonlocal exit_code
|
||||
exit_code = code
|
||||
|
||||
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
|
||||
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
|
||||
|
||||
def func(): raise ValueError("value_error_func")
|
||||
def func():
|
||||
raise ValueError("value_error_func")
|
||||
|
||||
exec_capture(func)
|
||||
assert exit_code == 1
|
||||
|
|
@ -67,9 +69,10 @@ def test_capture_exits_with_other_error(mocker):
|
|||
nonlocal exit_code
|
||||
exit_code = code
|
||||
|
||||
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
|
||||
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
|
||||
|
||||
def func(): raise ArithmeticError("arithmetic_error_func")
|
||||
def func():
|
||||
raise ArithmeticError("arithmetic_error_func")
|
||||
|
||||
exec_capture(func)
|
||||
assert exit_code == 2
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue