Compare commits

...

26 commits

Author SHA1 Message Date
f611fa5f2a
ci: add pre-commit and tests workflows
Some checks failed
checks / tests (push) Waiting to run
checks / pre-commit (push) Failing after 1m30s
2024-08-04 10:54:59 +02:00
ee43046e59
chore: update deps 2024-06-19 18:55:16 +02:00
1b8220e6e1
feat: rename 'notebooks' command to 'tree' 2024-06-19 18:50:35 +02:00
812fee592e
feat: update dependencies and capture network errors when retrieving remote public keys 2024-05-17 19:05:44 +02:00
54a27a2f75
chore: bump version to v0.4.5 2024-05-17 18:49:23 +02:00
e156ea4108
chore: update linters and replace black with ruff-format 2024-05-17 18:47:56 +02:00
3d93be39d6
feat: make the inclusion of each notebook's note optional via --include-notes when listing notebooks 2024-05-17 18:45:22 +02:00
a591fe20e8
chore: update pyrage and other deps 2023-12-10 11:23:26 +01:00
c6d361f649
tests: remove pyfakefs due to issues with pathlib and sqlite in favor of a simple tempfolder where to store test files 2023-11-29 18:42:30 +01:00
4fb1fff521
feat: add remote_public_keys_timeout option in order to set the time after which the retrieval of external public keys should be interrupted 2023-11-29 18:15:34 +01:00
af9a6d82f4
feat: add --plain option to the show command in order to render notes as plaintext 2023-11-28 13:49:13 +01:00
fc81531f3c
chore: update deps 2023-11-24 08:34:26 +01:00
67595b3220
fix: remove extra quote when executing a note indexation and update deps 2023-11-17 13:31:50 +01:00
5908a70691
chore: update deps 2023-11-02 20:10:38 +01:00
ff00c70ce9
chore: bump version to 0.4.0 2023-07-24 19:53:44 +02:00
654d996a53
feat: add reencrypt command 2023-07-24 19:52:25 +02:00
02ca346eae
chore: update ruff to 0.0.280 2023-07-23 18:00:55 +02:00
701d79583d
refactor: clean the search command a bit 2023-07-23 17:40:03 +02:00
eeb1573f99
feat: update to pydantic v2 and refactor accordingly 2023-07-22 13:29:46 +02:00
4eb438bab3
wip: add full-text search against SQLite's FTS5 2023-05-17 19:44:30 +02:00
d328e640e0 chore: typo 2023-05-17 17:02:47 +00:00
be20284f78
feat: add import command 2023-05-17 19:00:42 +02:00
d37bffdc51
feat: cache downloaded public keys via httpx-cache 2023-05-13 11:12:14 +02:00
2398431a7b
fix: refactor and fix the recipient paths parsing that was missing the remote urls 2023-05-09 20:29:36 +02:00
c6ac0d6043
feat: add multiple, remote recipients support 2023-05-09 09:11:36 +02:00
e1c9ee0c9c
feat: add notes path autocompletion for show and edit 2023-04-24 13:57:11 +02:00
30 changed files with 1734 additions and 963 deletions

View file

@ -1,2 +0,0 @@
[flake8]
max-line-length = 89

View file

@ -0,0 +1,30 @@
---
name: checks
on: # yamllint disable-line rule:truthy
- 'push'
jobs:
pre-commit:
runs-on: ubuntu-22.04
steps:
- uses: https://code.forgejo.org/actions/checkout@v4
- uses: https://code.forgejo.org/actions/setup-python@v5
with:
python-version: '3.12'
- uses: opentofu/setup-opentofu@v1
with:
tofu_version: 1.7.0
- uses: pre-commit/action@v3.0.1
tests:
runs-on: ubuntu-22.04
steps:
- uses: https://code.forgejo.org/actions/checkout@v4
- uses: https://code.forgejo.org/actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pdm install -G testing
- run: poetry run tests

View file

@ -1,11 +1,8 @@
default_language_version:
python: python3.10
files: ^halig|tests$
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.6.0
hooks:
- id: trailing-whitespace
args: [ --markdown-linebreak-ext=md ]
@ -22,20 +19,14 @@ repos:
args: [ --fix=lf ]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.260
rev: v0.5.6
hooks:
- id: ruff
args:
- --fix
- --exit-non-zero-on-fix
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
pass_filenames: false
args:
- "halig"
- id: ruff-format
- repo: local
hooks:

View file

@ -11,13 +11,21 @@
it encrypts the new contents into an [age](https://github.com/FiloSottile/age) file that
you can store, _relatively_ safe, anywhere.
## Features
- Simple notebooks management with paths autocompletion
- Passphrase-less, fully-encrypted notes, compatible with existing SSH keys
- No external `age` binary needed
- Almost all `age` advantages, like having multiple keys for encryption and decryption
- Remote (HTTP) public keys import: e.g: github.com/\<username\>.keys
## Install
```shell
pipx install halig # or pip
```
## Setup TLDR
```shell
@ -26,9 +34,13 @@ ssh-keygen -t ed25519
mkdir -p "${XDG_CONFIG_HOME:-$HOME/.config}/halig"
cat << EOF > "${XDG_CONFIG_HOME:-$HOME/.config}/halig/halig.yml"
---
notebooks_root_path: /home/$(id -un)/Documents/Notebooks
identity_path: /home/$(id -un)/.ssh/id_ed25519
recipient_path: /home/$(id -un)/.ssh/id_ed25519.pub
notebooks_root_path: ~/Documents/Notebooks
identity_paths:
- ~/.ssh/id_ed25519
recipient_paths:
- ~/.ssh/id_ed25519.pub
- https://github.com/<username>.keys
- https://gitlab.com/<username>.keys
EOF
```
@ -38,4 +50,5 @@ EOF
halig edit some_notebook # edit today's note relative to <notebooks_root_path>/some_notebook
halig edit some_notebook/foo # edit <notebooks_root_path>/some_notebook/foo.age
halig notebooks # list current notebooks
```
```

View file

@ -1 +1 @@
__version__ = "0.1.8"
__version__ = "0.5.0"

View file

@ -5,10 +5,12 @@ from halig.settings import Settings
class ICommand(ABC):
@abstractmethod
def run(self):
... # pragma: no cover
def run(self): ... # pragma: no cover
class BaseCommand(ICommand):
def __init__(self, settings: Settings, *args, **kwargs):
self.settings = settings
def traverse(self):
return self.settings.notebooks_root_path.glob("./**/*.age")

View file

@ -58,7 +58,7 @@ class EditCommand(BaseCommand):
temp_path = Path(tf.name)
editor = os.environ.get("EDITOR", "vim")
subprocess.call([editor, temp_path])
subprocess.call([editor, temp_path]) # noqa: S603
with temp_path.open("rb") as f:
new_contents = f.read()

View file

@ -0,0 +1,41 @@
from collections.abc import Generator
from pathlib import Path
from halig.commands.base import BaseCommand
from halig.encryption import Encryptor
from halig.settings import Settings
class ImportCommand(BaseCommand):
def __init__(self, settings: Settings, unlink: bool = False):
super().__init__(settings)
self.encryptor = Encryptor(self.settings)
self.unlink = unlink
def get_importables(self) -> Generator[Path, None, None]:
"""Get all markdown files under self.settings.notebooks_root_path
Yields:
a list of Path objects matching any markdown files under
the notebooks root path
"""
return self.settings.notebooks_root_path.glob("**/*.md")
def run(self):
"""For every importable file, encrypt its contents inside
`filename.age`. If `self.unlink` is `True`, unlink the original
file
"""
for file_path in self.get_importables():
with file_path.open("rb") as f:
contents = f.read()
encrypted_contents = self.encryptor.encrypt(contents)
encrypted_file_path = file_path.with_suffix("").with_suffix(".age")
encrypted_file_path.touch()
with encrypted_file_path.open("wb") as f:
f.write(encrypted_contents)
if self.unlink:
file_path.unlink()

View file

@ -8,8 +8,15 @@ from halig.commands.base import BaseCommand
class NotebooksCommand(BaseCommand):
def __init__(self, max_depth: int | float, *args, **kwargs):
def __init__(
self,
max_depth: int | float,
include_notes: bool = False,
*args,
**kwargs,
):
self.max_depth = max_depth
self.include_notes = include_notes
super().__init__(*args, **kwargs)
def build_tree(self, root_path: Path):
@ -21,9 +28,10 @@ class NotebooksCommand(BaseCommand):
break
for item in sorted(current_folder_path.iterdir()):
if item.is_dir():
item_tree_node = current_tree_node.add(item.name)
q.append((item, item_tree_node, depth + 1))
else:
if item.name != ".git":
item_tree_node = current_tree_node.add(item.name)
q.append((item, item_tree_node, depth + 1))
elif self.include_notes and item.name.endswith(".age"):
current_tree_node.add(item.name)
return tree

View file

@ -0,0 +1,26 @@
import pyrage
from rich import print
from halig.commands.base import BaseCommand
from halig.encryption import Encryptor
from halig.settings import Settings
class ReencryptCommand(BaseCommand):
def __init__(self, settings: Settings, *args, **kwargs):
super().__init__(settings, *args, **kwargs)
self.encryptor = Encryptor(settings)
def run(self):
for note_path in self.traverse():
try:
with note_path.open("rb") as fr:
orig_data = self.encryptor.decrypt(fr.read())
new_data = self.encryptor.encrypt(orig_data)
with note_path.open("wb") as fw:
fw.write(new_data)
except pyrage.DecryptError: # type: ignore[reportGeneralTypeIssues] # noqa: PERF203
print(
f"[yellow] Could not reencrypt {note_path} because no matching keys"
f" were found, skipping ...",
)

111
halig/commands/search.py Normal file
View file

@ -0,0 +1,111 @@
import hashlib
import re
import sqlite3
from pathlib import Path
from rich.console import Console
from halig.commands.base import BaseCommand
from halig.encryption import Encryptor
from halig.settings import Settings
class SearchCommand(BaseCommand):
def __init__(self, term: str, index: bool, settings: Settings, *args, **kwargs):
super().__init__(settings, *args, **kwargs)
self.encryptor = Encryptor(settings)
self.term = term
self.index = index
self.db_path = self.settings.cache_path / "halig.db"
self.db_conn = sqlite3.connect(self.db_path)
def _create_schema(self):
with self.db_conn:
self.db_conn.execute(
"""CREATE VIRTUAL TABLE IF NOT EXISTS notes
USING fts5(name, last_timestamp, hash, filepath, body);""",
)
def _search_note_in_db_by_path(self, path: Path) -> tuple[str | None, str | None]:
with self.db_conn:
cursor = self.db_conn.execute(
"SELECT hash, last_timestamp FROM notes where filepath = ?",
(str(path),),
)
results = cursor.fetchall()
if not results:
return None, None
return results[0] # type: ignore[no-any-return]
def _index_note(
self,
updated_at: float,
body_hash: str,
note_path: Path,
body: str,
):
with self.db_conn:
self.db_conn.execute(
"""INSERT INTO notes (name, last_timestamp, hash, filepath, body)
VALUES (?, ?, ?, ?, ?);""",
(note_path.name, updated_at, body_hash, str(note_path), body),
)
def _update_index_note(
self,
updated_at: float,
body_hash: str,
note_path: Path,
body: str,
):
with self.db_conn:
self.db_conn.execute(
"""UPDATE notes SET
last_timestamp = (?),
hash = (?),
body = (?)
WHERE
filepath = (?);
""",
(updated_at, body_hash, body, str(note_path)),
)
def _index_notebooks(self):
for note_path in self.traverse():
updated_at = note_path.stat().st_mtime
with note_path.open("rb") as f:
body = self.encryptor.decrypt(f.read())
body_hash = hashlib.sha512(body).hexdigest()
original_hash, last_timestamp = self._search_note_in_db_by_path(note_path)
if not original_hash:
self._index_note(updated_at, body_hash, note_path, body.decode())
continue
if hash != original_hash:
self._update_index_note(updated_at, body_hash, note_path, body.decode())
def _search(self):
with self.db_conn:
cursor = self.db_conn.execute(
"SELECT filepath, body FROM notes WHERE body MATCH ? ORDER BY rank;",
(f"{self.term}*",),
)
results = cursor.fetchall()
console = Console()
search_regex = re.compile(re.escape(self.term), re.IGNORECASE)
for result in results:
filepath, body = result
lines = body.split("\n")
for lineno, line in enumerate(lines, start=1):
match = search_regex.search(line)
if match:
content_line = search_regex.sub("[bold red]\\g<0>[/bold red]", line)
console.print(f"{filepath}:{lineno}: {content_line}")
def run(self):
self._create_schema()
if self.index:
self._index_notebooks()
self._search()
self.db_conn.close()

View file

@ -1,5 +1,6 @@
from pathlib import Path
from rich import print
from rich.console import Console
from rich.markdown import Markdown
@ -10,11 +11,12 @@ from halig.settings import Settings
class ShowCommand(BaseCommand):
def __init__(self, note_path: Path, settings: Settings):
def __init__(self, note_path: Path, settings: Settings, plain: bool = False):
super().__init__(settings)
self.note_path = self.settings.notebooks_root_path / note_path
self.encryptor = Encryptor(self.settings)
self.plain = plain
self.console = Console()
if self.note_path.is_dir():
self.note_path /= f"{utils.now().date()}.age"
@ -33,5 +35,9 @@ class ShowCommand(BaseCommand):
return self.encryptor.decrypt(data).decode()
def run(self): # pragma: no cover
md_contents = Markdown(self.decrypt())
contents = self.decrypt()
if self.plain:
print(contents)
return
md_contents = Markdown(contents)
self.console.print(md_contents)

View file

@ -1,6 +1,6 @@
from pyrage import decrypt as rage_decrypt
from pyrage import encrypt as rage_encrypt
from pyrage import ssh, x25519
from pyrage import decrypt as rage_decrypt # pyright: ignore [reportGeneralTypeIssues]
from pyrage import encrypt as rage_encrypt # pyright: ignore [reportGeneralTypeIssues]
from pyrage import ssh, x25519 # pyright: ignore [reportGeneralTypeIssues]
from halig.settings import Settings
@ -14,27 +14,27 @@ class Encryptor:
def __init__(self, settings: Settings):
self.settings = settings
self.identities = []
self.recipients = []
# load identity
with settings.identity_path.open("r") as f:
identity_contents = f.read()
if identity_contents.startswith("-----BEGIN OPENSSH PRIVATE KEY-----"):
self.identity = ssh.Identity.from_buffer(identity_contents.encode())
else:
self.identity = x25519.Identity.from_str(identity_contents)
for key in settings.load_private_keys():
if key.startswith("-----BEGIN OPENSSH PRIVATE KEY-----"):
self.identities.append(ssh.Identity.from_buffer(key.encode()))
else:
self.identities.append(x25519.Identity.from_str(key))
# load recipient
with settings.recipient_path.open("r") as f:
recipient_contents = f.read()
if recipient_contents.startswith("ssh-ed25519"):
self.recipient = ssh.Recipient.from_str(recipient_contents)
else:
self.recipient = x25519.Recipient.from_str(recipient_contents)
for key in settings.load_public_keys():
if key.startswith("ssh-ed25519"):
self.recipients.append(ssh.Recipient.from_str(key))
else:
self.recipients.append(x25519.Recipient.from_str(key))
def encrypt(self, data: str | bytes) -> bytes:
if isinstance(data, str):
data = data.encode()
return rage_encrypt(data, [self.recipient]) # type: ignore[no-any-return]
return rage_encrypt(data, self.recipients) # type: ignore[no-any-return]
def decrypt(self, data: bytes) -> bytes:
return rage_decrypt(data, [self.identity]) # type: ignore[no-any-return]
if not len(data):
return data
return rage_decrypt(data, self.identities) # type: ignore[no-any-return]

View file

@ -3,13 +3,28 @@ COMMANDS_NOTEBOOKS_HELP = "List all notebooks and notes, tree-style"
COMMANDS_EDIT_HELP = "Edit or add a note into a notebook"
COMMANDS_SHOW_HELP = "Show a note's contents"
COMMANDS_VERSION = "Show halig's version"
COMMANDS_IMPORT_HELP = "Encrypt existing unencrypted files"
COMMANDS_SEARCH_HELP = """Perform a full-text search against all your notes,
which are indexed into a SQLite FTS5 database located at `~/.cache/halig/halig.db`
"""
COMMANDS_REENCRYPT_HELP = """Reencrypt all available notes. This operation is useful
when new public keys have been added to the config file and you want the notes
to be seen by the new pairing private keys"""
# OPTIONS
OPTION_CONFIG_HELP = "Configuration file. Must be YAML and schema compatible"
OPTION_LEVEL_HELP = (
"Tree max recursion level; negative numbers indicate a value of infinity"
)
OPTION_UNLINK_HELP = """Setting this will remove the original markdown files;
only the newly encrypted .age files will be preserved. Backup your data first
"""
OPTION_INDEX_HELP = """Index the SQLite database with your notes contents. The first
time you perform a search, this flag should be set. Afterwards, you should only index
when new notes have been added or older ones have been changed, since it's a slow
operation"""
OPTION_PLAIN_HELP = "Show the note as plaintext"
OPTION_INCLUDE_NODES_HELP = "Include each notebook's notes when listing"
# ARGUMENTS
ARGUMENT_EDIT_NOTE_HELP = """A valid, settings-relative path.
Be aware that valid can also mean implicit notes, that is, pointing to a

View file

@ -2,48 +2,78 @@
from pathlib import Path
from typing import Optional
from rich import print
from rich.prompt import Prompt
from typer import Argument, Option, Typer
from halig import literals
from halig.__version__ import __version__
from halig.commands.edit import EditCommand
from halig.commands.import_unencrypted import ImportCommand
from halig.commands.notebooks import NotebooksCommand
from halig.commands.reencrypt import ReencryptCommand
from halig.commands.search import SearchCommand
from halig.commands.show import ShowCommand
from halig.settings import load_from_file
from halig.utils import capture
from halig.__version__ import __version__
from rich import print
app = Typer(pretty_exceptions_enable=False, pretty_exceptions_show_locals=False)
config_option = Option(None, "--config", "-c", help=literals.OPTION_CONFIG_HELP)
def complete_note_path(incomplete: str):
"""Build `path = Path(settings.notebooks_root_path / incomplete)`, and complete if:
- `path` exists:
- if `path` is a file and ends with `.age`: return it
- if `path` is a dir: return all its children, non-recursively
- `path` does not exist: return all occurrences of path.glob("*")
"""
settings = load_from_file()
path = settings.notebooks_root_path / incomplete
if path.exists():
if path.is_dir():
for child in path.iterdir():
yield str(child.relative_to(settings.notebooks_root_path))
elif path.name.endswith(".age"):
return str(path.relative_to(settings.notebooks_root_path))
glob = settings.notebooks_root_path.glob(f"{incomplete}*")
for path in glob:
yield str(path.relative_to(settings.notebooks_root_path))
@app.command(help=literals.COMMANDS_NOTEBOOKS_HELP)
@capture
def notebooks(
level: int = Option( # noqa: B008
-1,
"--level",
"-l",
help=literals.OPTION_LEVEL_HELP,
),
config: Optional[Path] = config_option, # noqa: UP007
def tree(
level: int = Option( # B008
-1,
"--level",
"-l",
help=literals.OPTION_LEVEL_HELP,
),
include_notes: bool = Option(False, help=literals.OPTION_INCLUDE_NODES_HELP),
config: Optional[Path] = config_option, # noqa: UP007
):
if level < 0:
level = float("inf") # type: ignore[assignment]
settings = load_from_file(config)
command = NotebooksCommand(settings=settings, max_depth=level)
command = NotebooksCommand(
settings=settings,
max_depth=level,
include_notes=include_notes,
)
command.run()
@app.command(help=literals.COMMANDS_EDIT_HELP)
@capture
def edit(
note: Path = Argument( # noqa: B008
...,
help=literals.ARGUMENT_EDIT_NOTE_HELP,
),
config: Optional[Path] = config_option, # noqa: UP007
note: Path = Argument( # noqa: B008
...,
help=literals.ARGUMENT_EDIT_NOTE_HELP,
autocompletion=complete_note_path,
),
config: Optional[Path] = config_option, # noqa: UP007
):
settings = load_from_file(config)
command = EditCommand(settings=settings, note_path=note)
@ -53,14 +83,64 @@ def edit(
@app.command(help=literals.COMMANDS_SHOW_HELP)
@capture
def show(
note: Path = Argument( # noqa: B008
...,
help=literals.ARGUMENT_SHOW_NOTE_HELP,
),
config: Optional[Path] = config_option, # noqa: UP007
note: Path = Argument( # noqa: B008
...,
help=literals.ARGUMENT_SHOW_NOTE_HELP,
autocompletion=complete_note_path,
),
plain: bool = Option(False, help=literals.OPTION_PLAIN_HELP), # B008
config: Optional[Path] = config_option, # noqa: UP007
):
settings = load_from_file(config)
command = ShowCommand(settings=settings, note_path=note)
command = ShowCommand(settings=settings, note_path=note, plain=plain)
command.run()
@app.command(name="import", help=literals.COMMANDS_IMPORT_HELP)
@capture
def import_unencrypted(
unlink: bool = Option(
False,
help=literals.OPTION_UNLINK_HELP,
),
config: Optional[Path] = config_option, # noqa: UP007
):
settings = load_from_file(config)
command = ImportCommand(settings=settings, unlink=unlink)
files_to_unlink = list(command.get_importables())
if files_to_unlink:
if unlink:
should_unlink = Prompt.ask(
f"""Unlink flag set, will delete {len(files_to_unlink)} files.
Have you backed up your data?""",
choices=["y", "Y", "N", "n"],
)
if should_unlink in ["N", "n"]:
command.unlink = False
command.run()
@app.command(help=literals.COMMANDS_SEARCH_HELP)
def search(
term: str,
index: bool = Option(False, help=literals.OPTION_INDEX_HELP),
):
settings = load_from_file()
command = SearchCommand(
term=term,
index=index,
settings=settings,
)
command.run()
@app.command(help=literals.COMMANDS_REENCRYPT_HELP)
def reencrypt():
settings = load_from_file()
command = ReencryptCommand(
settings=settings,
)
command.run()

View file

@ -1,9 +1,15 @@
import os
from functools import lru_cache
from pathlib import Path
from typing import Any
import hishel
import httpx
import platformdirs
import yaml
from pydantic import BaseSettings, DirectoryPath, FilePath
from pydantic import DirectoryPath, Field, FilePath, HttpUrl, field_validator
from pydantic_core import Url
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich import print
class Settings(BaseSettings):
@ -14,29 +20,101 @@ class Settings(BaseSettings):
Attributes:
notebooks_root_path (DirectoryPath): a *valid* path to a directory that
may contain notes or other notebooks
identity_path (FilePath): a *valid* path to an identity file, which usually
is understood as a private key. Defaults to `~/.ssh/id_ed25519`
recipient_path (FilePath): a *valid* path to a recipient file, which usually
is understood as a public key. Defaults to `~/.ssh/id_ed25519.pub`
identity_paths (list[FilePath]): a list of *valid* paths of private keys.
Defaults to `[~/.ssh/id_ed25519]`
recipient_paths (list[FilePath|HttpUrl]): a list *valid* paths of public keys,
which usually is understood as a public key. Defaults to
`[~/.ssh/id_ed25519.pub]`
cache_path (DirectoryPath): a *valid* path used to cache some stuff,
particularly remote public keys. Defaults to $XDG_CACHE_HOME/halig
remote_public_keys_timeout (float): time after which the retrieval of external public keys
(e.g. github ssh keys) should be interrupted. Defaults to 0.5.
"""
notebooks_root_path: DirectoryPath
identity_path: FilePath = Path("~/.ssh/id_ed25519").expanduser()
recipient_path: FilePath = Path("~/.ssh/id_ed25519.pub").expanduser()
identity_paths: list[FilePath] = Field(
default=[Path("~/.ssh/id_ed25519").expanduser()],
)
recipient_paths: list[FilePath | HttpUrl] = Field(
default=[
Path("~/.ssh/id_ed25519.pub").expanduser(),
],
)
cache_path: DirectoryPath = Field(
default_factory=lambda: platformdirs.user_cache_path(
"halig",
ensure_exists=True,
),
)
remote_public_keys_timeout: float = 0.5
class Config:
env_prefix = "halig_"
@field_validator("identity_paths", "recipient_paths", mode="before")
@classmethod
def validate_paths(cls, v: Any):
if not isinstance(v, list):
v = [v]
new_v = []
for path in v:
new_path = path
if isinstance(path, str) and not path.startswith("http"):
new_path = Path(path)
new_v.append(
new_path.expanduser() if isinstance(new_path, Path) else new_path,
)
return new_v
@field_validator("notebooks_root_path", mode="before")
@classmethod
def validate_notebooks_path(cls, v: Any):
if isinstance(v, str):
return Path(v).expanduser()
if isinstance(v, Path):
return v.expanduser()
return v
def load_private_keys(self) -> set[str]:
keys = set()
for path in self.identity_paths:
with path.open("r") as f:
keys.add(f.read())
return keys
def load_public_keys(self) -> set[str]:
keys = set()
for path in self.recipient_paths:
if isinstance(path, Url):
try:
with hishel.CacheClient(
storage=hishel.FileStorage(
base_path=self.cache_path / "hishel"
),
timeout=3,
) as client:
response = client.get(
str(path),
timeout=self.remote_public_keys_timeout,
)
if response.status_code == httpx.codes.OK:
for line in response.content.decode().split("\n"):
if line:
keys.add(line)
except Exception as e: # noqa: BLE001
print(
f"[yellow] Could not retrieve public key from {path}. Ignoring error: '{e}'"
)
elif isinstance(path, Path):
with path.open("r") as f:
keys.add(f.read())
return keys
model_config = SettingsConfigDict(env_prefix="halig_")
@lru_cache
def load_from_file(file_path: Path | None = None) -> Settings:
if file_path is None:
xdg_config_home = Path(os.getenv("XDG_CONFIG_HOME", "~/.config")).expanduser()
if not xdg_config_home.exists():
err = f"File {xdg_config_home} does not exist"
raise FileNotFoundError(err)
file_path = xdg_config_home / "halig" / "halig.yml"
halig_config_home = platformdirs.user_config_path("halig", ensure_exists=True)
file_path = halig_config_home / "halig.yml"
file_path.touch(exist_ok=True)
elif not file_path.exists():
err = f"File {file_path} does not exist"

View file

@ -9,7 +9,7 @@ from rich import print
def now():
tz = local_timezone()
return pendulum.now(tz)
return pendulum.now(tz) # type: ignore[reportArgumentType]
def capture(fn: Callable):

View file

@ -1,6 +1,6 @@
import nox
VERSIONS = ["3.10", "3.11"]
VERSIONS = ["3.10", "3.11", "3.12"]
@nox.session(python=VERSIONS)

1843
pdm.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,12 +4,16 @@ authors = [
]
requires-python = ">=3.10"
dependencies = [
"typer<1.0.0,>=0.6.1",
"rich>=13.3.3",
"pydantic>=1.10.7",
"typer>=0.12",
"rich>=13.3",
"pydantic>=2.7",
"pyyaml>=6.0",
"pyrage>=1.0.3",
"pendulum>=2.1.2",
"pyrage>=1.1",
"pendulum>=3.0",
"httpx>=0.27",
"platformdirs>=4.2",
"pydantic-settings>=2.0",
"hishel>=0.0",
]
name = "halig"
dynamic = ["version"]
@ -25,6 +29,7 @@ classifiers = [
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Terminals",
"Topic :: Utilities",
"Typing :: Typed"
@ -64,7 +69,7 @@ testing = [
]
linting = [
"black>=23.3.0",
"ruff>=0.0.260",
"ruff>=0.1.0",
"pyright>=1.1.301",
"mypy>=1.1.1",
"types-PyYAML>=6.0.12.9",
@ -73,25 +78,32 @@ docs = [
"mkdocs-material>=9.1.5",
"mkdocstrings[python]>=0.20.0",
]
dev = [
"bump-pydantic>=0.6.0",
]
[tool.pytest]
mock_use_standalone_module = true
[tool.pyright]
reportMissingImports = false
reportMissingTypeStubs = false
reportAttributeAccessIssue = false
[tool.ruff]
extend-select = ["W", "C90", "I", "N", "UP", "S", "BLE", "FBT", "B", "A", "COM", "C4", "DTZ", "T10", "EM", "ISC", "T20", "PT", "RSE", "RET", "SIM", "PTH", "ERA", "PGH", "PL", "TRY", "RUF"]
extend-ignore = ["S101", "ISC002"]
[tool.ruff.lint]
extend-select = ["W", "C90", "I", "N", "UP", "S", "BLE", "B", "A", "COM", "C4", "DTZ", "T10", "EM", "ISC", "T20", "PT", "RSE", "RET", "SIM", "PTH", "ERA", "PGH", "PL", "TRY", "RUF", "FURB", "PERF"]
extend-ignore = ["S101", "ISC002", "COM812", "ISC001"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
[[tool.mypy.overrides]]
module = [
"pyrage",
"pyrage.ssh",
"pyrage.x25519",
"attr",
"httpx_cache"
]
ignore_missing_imports = true

View file

@ -30,7 +30,26 @@ def notes(notebooks_path: Path):
(dailies / f"{dt.date()}.age").touch()
@pytest.fixture
@pytest.fixture()
def unencrypted_notes(notebooks_path):
unencrypted_root_path = notebooks_path / "unencrypted"
unencrypted_root_path.mkdir()
for i in range(5):
note = unencrypted_root_path / f"note_{i}.md"
note.touch()
subnote_path = unencrypted_root_path / f"inner_{i}"
subnote_path.mkdir()
for j in range(2):
subnote = subnote_path / f"note_{i}_{j}.md"
subnote.touch()
with subnote.open("w") as f:
f.write(f"subnote {i} {j}")
with note.open("w") as f:
f.write(f"note {i}")
return unencrypted_root_path
@pytest.fixture()
def notebooks_command(settings: Settings):
return NotebooksCommand(max_depth=float("inf"), settings=settings)
@ -39,27 +58,27 @@ def notebooks_command(settings: Settings):
def current_note(notes, settings, encryptor) -> Path:
note_path = settings.notebooks_root_path / f"{utils.now().date()}.age"
note_path.touch()
data = encryptor.encrypt("foo".encode())
data = encryptor.encrypt(b"foo")
with note_path.open("wb") as f:
f.write(data)
return note_path
@pytest.fixture
@pytest.fixture()
def current_daily(notes, settings, encryptor) -> Path:
note_path = (
settings.notebooks_root_path / "Work" / "Dailies" / f"{utils.now().date()}.age"
)
data = encryptor.encrypt("foo".encode())
data = encryptor.encrypt(b"foo")
with note_path.open("wb") as f:
f.write(data)
return note_path
@pytest.fixture
@pytest.fixture()
def mock_edit(mocker):
def edit(callargs: list):
with open(callargs[1], "wb") as f:
f.write("edited".encode())
f.write(b"edited")
mocker.patch("halig.commands.edit.subprocess.call", side_effect=edit)

View file

@ -16,7 +16,8 @@ def test_edit_raises_invalid_age_file(notes, settings: Settings):
def test_edit_current_note(mock_edit, current_note, settings: Settings, encryptor):
edit_command = EditCommand(
note_path=settings.notebooks_root_path, settings=settings
note_path=settings.notebooks_root_path,
settings=settings,
)
assert edit_command.note_path == current_note
edit_command.run()

View file

@ -0,0 +1,47 @@
from pathlib import Path
import pytest
from halig.commands.import_unencrypted import ImportCommand
from halig.encryption import Encryptor
@pytest.fixture()
def command(settings) -> ImportCommand:
return ImportCommand(settings)
def test_get_importables(unencrypted_notes: Path, command: ImportCommand):
notes = list(command.get_importables())
assert len(notes) == 15
assert notes == list(unencrypted_notes.glob("**/*.md"))
def test_import(unencrypted_notes: Path, command: ImportCommand, encryptor: Encryptor):
command.run()
notes = command.get_importables()
encrypted_notes = list(unencrypted_notes.glob("**/*.age"))
assert len(encrypted_notes) == len(list(notes)) == 15
for note in encrypted_notes:
with note.open("rb") as f:
encrypted_data = f.read()
data = encryptor.decrypt(encrypted_data)
if "inner" in str(note):
assert (
f'sub{note.name.replace(".age", "").replace("_"," ")}' == data.decode()
)
else:
assert note.name.replace(".age", "").replace("_", " ") == data.decode()
def test_import_unlink(
unencrypted_notes: Path,
command: ImportCommand,
encryptor: Encryptor,
):
command.unlink = True
command.run()
notes = command.get_importables()
encrypted_notes = list(unencrypted_notes.glob("**/*.age"))
assert len(encrypted_notes) == 15
assert len(list(unencrypted_notes.glob("**/*.md"))) == 0

View file

@ -25,12 +25,14 @@ def test_build_tree_max_depth_2(notes, notebooks_command: NotebooksCommand):
work = tree.children[1]
assert personal.label == "Personal"
assert work.label == "Work"
assert len(work.children) == 2
assert len(personal.children) == 1
assert len(work.children) == 1
assert len(personal.children) == 0
def test_build_tree_max_depth_inf(notes, notebooks_command: NotebooksCommand):
tree = notebooks_command.build_tree(notebooks_command.settings.notebooks_root_path)
def test_build_tree_max_depth_inf(notes, settings):
tree = NotebooksCommand(max_depth=float("inf"), settings=settings, include_notes=True).build_tree(
settings.notebooks_root_path
)
personal = tree.children[0]
work = tree.children[1]
assert personal.label == "Personal"

View file

@ -0,0 +1,25 @@
import pytest
from halig import utils
from halig.commands.reencrypt import ReencryptCommand
@pytest.fixture()
def reencrypt_command(settings):
return ReencryptCommand(settings)
@pytest.mark.usefixtures("notes")
def test_reencrypt(reencrypt_command):
reencrypt_command.run()
for note_path in reencrypt_command.traverse():
with note_path.open("rb") as f:
assert reencrypt_command.encryptor.decrypt(f.read()) == b""
@pytest.mark.usefixtures("current_daily")
def test_reencrypt_warns_no_matching_key(reencrypt_command, halig_ssh_path, capfd):
reencrypt_command.encryptor.identities = []
reencrypt_command.run()
out, _ = capfd.readouterr()
assert f'because no matching keys were found, skipping ...' in out

View file

@ -26,7 +26,8 @@ def test_show_raises_note_path_is_not_age_valid(notes, settings: Settings):
def test_show_current_note(current_note, settings):
show_command = ShowCommand(
note_path=settings.notebooks_root_path, settings=settings
note_path=settings.notebooks_root_path,
settings=settings,
)
assert show_command.note_path == current_note
assert show_command.decrypt() == "foo"

View file

@ -42,10 +42,9 @@ def ssh_recipient(halig_ssh_public_key: str) -> Recipient:
@pytest.fixture()
def halig_path(fs, halig_ssh_public_key, halig_ssh_private_key) -> Path:
fs.add_real_paths(["/etc/localtime"])
ssh_path = Path("~/.ssh").expanduser()
ssh_path.mkdir(parents=True)
def halig_ssh_path(tmp_path: Path, halig_ssh_public_key, halig_ssh_private_key) -> Path:
ssh_path = tmp_path / ".ssh"
ssh_path.mkdir()
with (ssh_path / "id_ed25519").open("w") as f:
f.write(halig_ssh_private_key)
@ -53,38 +52,47 @@ def halig_path(fs, halig_ssh_public_key, halig_ssh_private_key) -> Path:
with (ssh_path / "id_ed25519.pub").open("w") as f:
f.write(halig_ssh_public_key)
halig_path = Path("~/.config/halig").expanduser()
return ssh_path
@pytest.fixture()
def halig_config_path(tmp_path: Path):
halig_path = tmp_path / ".config/halig"
halig_path.mkdir(parents=True)
return halig_path
@pytest.fixture()
def notebooks_path(halig_path) -> Path:
notebooks_path = Path("~/Notebooks").expanduser()
notebooks_path.mkdir(parents=True)
def notebooks_path(tmp_path) -> Path:
notebooks_path = tmp_path / "Notebooks"
notebooks_path.mkdir()
return notebooks_path
@pytest.fixture()
def settings(notebooks_path: Path) -> Settings:
return Settings(notebooks_root_path=notebooks_path)
def settings(notebooks_path: Path, halig_ssh_path) -> Settings:
return Settings(
notebooks_root_path=notebooks_path,
identity_paths=[halig_ssh_path / "id_ed25519"],
recipient_paths=[halig_ssh_path / "id_ed25519.pub"]
)
@pytest.fixture()
def settings_file_path(halig_path: Path, notebooks_path: Path) -> Path:
yaml_file = halig_path / "halig.yml"
def settings_file_path(halig_config_path: Path, notebooks_path: Path) -> Path:
yaml_file = halig_config_path / "halig.yml"
yaml_file.touch()
s = Settings(notebooks_root_path=notebooks_path)
# `.dict()` doesn't serialize some fields that yaml doesn't understand
serialized = json.loads(s.json())
serialized = json.loads(s.model_dump_json())
with yaml_file.open("w") as f:
yaml.safe_dump(serialized, f)
return yaml_file
@pytest.fixture()
def empty_file_path(halig_path: Path) -> Path:
empty_path = halig_path / "empty"
def empty_file_path(halig_config_path: Path) -> Path:
empty_path = halig_config_path / "empty"
empty_path.touch()
return empty_path

View file

@ -4,30 +4,42 @@ from halig.encryption import Encryptor
from halig.settings import Settings
def test_instance_encryptor_from_age_keys(halig_path, notebooks_path):
identity = x25519.Identity.generate()
identity_path = halig_path / "identity.key"
identity_path.touch()
recipient_path = halig_path / "recipient.key"
recipient_path.touch()
with identity_path.open("w") as f:
f.write(str(identity))
def test_instance_encryptor_from_age_keys(notebooks_path, halig_config_path):
identity_paths = []
recipient_paths = []
identities = []
for i in range(5):
identity = x25519.Identity.generate()
identities.append(identity)
identity_path = halig_config_path / f"identity_{i}.key"
identity_path.touch()
with identity_path.open("w") as f:
f.write(str(identity))
identity_paths.append(identity_path)
with recipient_path.open("w") as f:
f.write(str(identity.to_public()))
recipient_path = halig_config_path / f"recipient_{i}.key"
recipient_path.touch()
with recipient_path.open("w") as f:
f.write(str(identity.to_public()))
recipient_paths.append(recipient_path)
settings = Settings(
notebooks_root_path=notebooks_path,
identity_path=identity_path,
recipient_path=recipient_path,
identity_paths=identity_paths,
recipient_paths=recipient_paths,
)
assert Encryptor(settings)
encryptor = Encryptor(settings)
for identity in identities:
assert str(identity) in [str(identity) for identity in encryptor.identities]
assert str(identity.to_public()) in [
str(recipient) for recipient in encryptor.recipients
]
def test_encrypt(encryptor: Encryptor, ssh_identity):
unencrypted_data = "foo"
encrypted_data = encryptor.encrypt(unencrypted_data)
assert isinstance(encrypted_data, bytes)
assert unencrypted_data == decrypt(encrypted_data, [ssh_identity]).decode()

View file

@ -11,7 +11,7 @@ def test_settings_from_env(settings: Settings, notebooks_root_path_envvar):
def test_settings_from_non_existing_file_raises_value_error():
with pytest.raises(ValueError, match="field required"):
with pytest.raises(ValueError, match="Field required"):
Settings() # type: ignore[call-arg]
@ -20,23 +20,12 @@ def test_load_from_file(notebooks_path: Path, settings_file_path: Path):
assert settings.notebooks_root_path == notebooks_path
def test_load_from_non_xdg_home_config_raises_file_not_found_error(fs):
path = Path("~/.config").expanduser()
with pytest.raises(FileNotFoundError, match=f"File {path} does not exist"):
load_from_file()
def test_load_from_existing_standard_file(settings_file_path: Path, settings: Settings):
standard_settings = load_from_file()
assert standard_settings.notebooks_root_path == settings.notebooks_root_path
def test_load_from_empty_file_raises_value_error(empty_file_path: Path):
with pytest.raises(ValueError, match=f"File {empty_file_path} is empty"):
load_from_file(empty_file_path)
def test_load_from_non_existing_file_path_raises_file_not_found_error(halig_path: Path):
file = halig_path / "some_invalid_file.yml"
def test_load_from_non_existing_file_path_raises_file_not_found_error(halig_config_path: Path):
file = halig_config_path / "some_invalid_file.yml"
with pytest.raises(FileNotFoundError, match=f"File {file} does not exist"):
load_from_file(file)

View file

@ -1,6 +1,4 @@
from typing import Callable
import pytest
from collections.abc import Callable
from halig.utils import capture
@ -10,7 +8,8 @@ def exec_capture(func: Callable):
def test_capture():
def func(): return 1
def func():
return 1
assert exec_capture(func) == 1
@ -22,9 +21,10 @@ def test_capture_exits_with_custom_os_error(mocker):
nonlocal exit_code
exit_code = code
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
def func(): raise OSError(2, "os_error_func")
def func():
raise OSError(2, "os_error_func")
exec_capture(func)
assert exit_code == 2
@ -37,9 +37,10 @@ def test_capture_exits_with_os_error(mocker):
nonlocal exit_code
exit_code = code
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
def func(): raise OSError
def func():
raise OSError
exec_capture(func)
assert not exit_code
@ -52,9 +53,10 @@ def test_capture_exits_with_value_error(mocker):
nonlocal exit_code
exit_code = code
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
def func(): raise ValueError("value_error_func")
def func():
raise ValueError("value_error_func")
exec_capture(func)
assert exit_code == 1
@ -67,9 +69,10 @@ def test_capture_exits_with_other_error(mocker):
nonlocal exit_code
exit_code = code
mocker.patch('halig.utils.sys.exit', side_effect=mock_exit)
mocker.patch("halig.utils.sys.exit", side_effect=mock_exit)
def func(): raise ArithmeticError("arithmetic_error_func")
def func():
raise ArithmeticError("arithmetic_error_func")
exec_capture(func)
assert exit_code == 2