import hashlib
import os
import re
import sqlite3
from collections.abc import Generator
from contextlib import closing
from pathlib import Path

import platformdirs
from rich.console import Console
from rich.text import Text

from halig.commands.base import BaseCommand
from halig.encryption import Encryptor
from halig.settings import Settings


class SearchCommand(BaseCommand):
    """Full text search over decrypted notes, backed by a SQLite FTS5 index.

    The index lives in ``halig.db`` inside the per-user cache directory
    returned by ``platformdirs.user_cache_path("halig")``.

    The schema is a single FTS5 virtual table:

        CREATE VIRTUAL TABLE notes USING fts5(last_timestamp, hash, filepath, body);
    """

    def __init__(
        self,
        search_term: str,
        settings: Settings,
        should_index: bool = False,
    ):
        """Store the query and build the encryptor used to decrypt notes.

        Args:
            search_term: Text handed to the FTS5 ``MATCH`` operator and also
                highlighted (case-insensitively, as a literal) in the output.
            settings: Project settings; ``notebooks_root_path`` is read from
                it and it is passed to ``Encryptor``.
            should_index: When true, ``run`` rebuilds the index before
                searching even if the index already contains rows.
        """
        self.search_term = search_term
        self.settings = settings
        self.should_index = should_index
        self.encryptor = Encryptor(self.settings)

    def _connect(self):
        """Return a context manager yielding a connection to the index.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back; wrapping it in ``closing`` guarantees the
        connection is closed even when a statement raises.
        """
        return closing(sqlite3.connect(self._get_database_path()))

    def _create_schema(self) -> None:
        """Create the ``notes`` FTS5 table if it does not exist yet."""
        with self._connect() as conn:
            conn.execute(
                """CREATE VIRTUAL TABLE IF NOT EXISTS notes
                USING fts5(last_timestamp, hash, filepath, body);
                """,
            )
            conn.commit()

    def _check_index_status(self) -> bool:
        """Return True when the index holds at least one note.

        Only the row count is inspected; the stored hash and timestamp are
        not compared against the files on disk.
        """
        with self._connect() as conn:
            (count,) = conn.execute("SELECT COUNT(*) FROM notes;").fetchone()
        return count > 0

    def _do_index(self) -> None:
        """Rebuild the whole index from the ``*.age`` files in the notebooks.

        Every existing row is deleted and every note is decrypted and
        inserted again. The work runs in one transaction, so a failure while
        reading or decrypting a note rolls back and leaves the previous
        index contents in place.
        """
        # `conn` as the second context manager commits on success and rolls
        # back on error; the outer `closing` then closes the connection.
        with self._connect() as conn, conn:
            conn.execute("DELETE FROM notes;")

            for path in self._get_notebook_files():
                decrypted_data = self.encryptor.decrypt(
                    self._read_encrypted_file(path),
                )
                conn.execute(
                    """INSERT INTO notes (last_timestamp, hash, filepath, body)
                    VALUES (?, ?, ?, ?);""",
                    (
                        os.path.getmtime(path),
                        self._calculate_hash(decrypted_data),
                        str(path),
                        decrypted_data,
                    ),
                )

    def run(self) -> None:
        """`halig search` entrypoint.

        1. Make sure the schema exists.
        2. Rebuild the index when ``should_index`` is set or the index is
           empty (no confirmation is asked for).
        3. Run the query and print the hits in a `grep -rin`-like format.
        """
        self._create_schema()
        if self.should_index or not self._check_index_status():
            self._do_index()

        self._perform_search()

    def _perform_search(self) -> None:
        """Run the FTS5 query and print every matching line.

        Each printed line has the form ``<filepath>:<lineno>: <line>`` with
        the occurrences of the search term rendered in bold red.
        """
        # NOTE(review): the term is passed to MATCH verbatim, so it is parsed
        # with FTS5 query syntax; input that is not a valid FTS5 expression
        # makes SQLite raise sqlite3.OperationalError.
        with self._connect() as conn:
            rows = conn.execute(
                "SELECT filepath, body FROM notes WHERE body MATCH ?;",
                (self.search_term,),
            ).fetchall()

        console = Console()
        search_regex = re.compile(re.escape(self.search_term), re.IGNORECASE)

        for filepath, body in rows:
            # The body is stored exactly as returned by the encryptor, so it
            # may come back as bytes; tolerate text and undecodable bytes too.
            if isinstance(body, bytes):
                text = body.decode("utf-8", errors="replace")
            else:
                text = str(body)

            for lineno, line in enumerate(text.split("\n"), start=1):
                matches = list(search_regex.finditer(line))
                if not matches:
                    continue
                console.print(
                    self._highlight(f"{filepath}:{lineno}: ", line, matches),
                )

    @staticmethod
    def _highlight(prefix: str, line: str, matches: list) -> Text:
        """Build the printable form of one hit with its matches in bold red.

        A ``Text`` object is assembled piece by piece instead of a markup
        string, so square brackets in the note or in the file path are
        printed literally rather than being parsed as Rich markup tags.

        Args:
            prefix: Text printed before the line (file path and line number).
            line: The note line that contains the matches.
            matches: Non-overlapping ``re.Match`` objects found in ``line``,
                in left-to-right order.
        """
        rendered = Text(prefix)
        position = 0
        for match in matches:
            rendered.append(line[position:match.start()])
            rendered.append(match.group(0), style="bold red")
            position = match.end()
        rendered.append(line[position:])
        return rendered

    def _get_database_path(self) -> Path:
        """Return the path of the SQLite database, creating the file if needed."""
        cache_dir = platformdirs.user_cache_path("halig", ensure_exists=True)
        db_path = cache_dir / "halig.db"
        db_path.touch()
        return db_path

    def _get_notebook_files(self) -> Generator[Path, None, None]:
        """Yield every ``*.age`` file found recursively under the notebooks root."""
        return self.settings.notebooks_root_path.glob("**/*.age")

    def _read_encrypted_file(self, file_path: Path) -> bytes:
        """Return the raw (still encrypted) bytes of ``file_path``."""
        return file_path.read_bytes()

    def _calculate_hash(self, data: bytes) -> str:
        """Return the hexadecimal SHA-256 digest of ``data``."""
        return hashlib.sha256(data).hexdigest()


# ---------------------------------------------------------------------------
# The same patch also registers the command in halig/main.py (a separate
# file). Kept here for reference only:
#
#     from halig.commands.search import SearchCommand
#
#     @app.command()
#     def search(
#         search_term: str,
#     ):
#         settings = load_from_file()
#         command = SearchCommand(
#             search_term=search_term,
#             should_index=True,
#             settings=settings,
#         )
#         command.run()
#
# NOTE(review): should_index=True makes every search rebuild the index,
# which decrypts every note on each invocation.
# ---------------------------------------------------------------------------