wip: add full-text search against SQLite's FTS5
This commit is contained in:
parent
d328e640e0
commit
4eb438bab3
2 changed files with 183 additions and 0 deletions
169
halig/commands/search.py
Normal file
169
halig/commands/search.py
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
import platformdirs
|
||||
from rich.console import Console
|
||||
|
||||
from halig.commands.base import BaseCommand
|
||||
from halig.encryption import Encryptor
|
||||
from halig.settings import Settings
|
||||
|
||||
|
||||
class SearchCommand(BaseCommand):
    """Full-text search over the notes, backed by a SQLite FTS5 index.

    The index is a ``halig.db`` file stored in the user cache directory that
    ``platformdirs`` reports for the ``halig`` application. Its schema is a
    single FTS5 virtual table:

        CREATE VIRTUAL TABLE notes USING fts5(last_timestamp, hash, filepath, body);

    NOTE: the ``body`` column holds the *decrypted* contents of every note,
    so the cache file contains plaintext.
    """

    def __init__(
        self,
        search_term: str,
        settings: Settings,
        should_index: bool = False,
    ):
        """Store the query and build the encryptor used to read the notes.

        Args:
            search_term: Text handed to the FTS5 ``MATCH`` operator and also
                highlighted (case-insensitively, as a literal) in the output.
            settings: Application settings; provides the notebooks root path
                and is passed to ``Encryptor``.
            should_index: When true, ``run`` rebuilds the index even if it
                already contains rows.
        """
        self.search_term = search_term
        self.settings = settings
        self.should_index = should_index
        self.encryptor = Encryptor(self.settings)

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the index database."""
        return sqlite3.connect(self._get_database_path())

    def _create_schema(self):
        """Create the FTS5 ``notes`` table if it does not exist yet."""
        conn = self._connect()
        try:
            # ``with conn`` commits on success and rolls back on error; it
            # does not close the connection, hence the surrounding finally.
            with conn:
                conn.execute(
                    """CREATE VIRTUAL TABLE IF NOT EXISTS notes
                    USING fts5(last_timestamp, hash, filepath, body);
                    """,
                )
        finally:
            conn.close()

    def _check_index_status(self) -> bool:
        """Return True when the index already contains at least one note."""
        conn = self._connect()
        try:
            (count,) = conn.execute("SELECT COUNT(*) FROM notes;").fetchone()
        finally:
            conn.close()
        return count > 0

    def _do_index(self):
        """Rebuild the whole index from the ``*.age`` files in the notebooks.

        The existing rows are deleted and every note is decrypted and inserted
        again inside one transaction, so a failure half-way through (e.g. a
        note that cannot be decrypted) leaves the previous index untouched.
        """
        conn = self._connect()
        try:
            with conn:
                # Drop the previous contents before re-indexing everything.
                conn.execute("DELETE FROM notes;")

                for path in self._get_notebook_files():
                    encrypted_data = self._read_encrypted_file(path)
                    decrypted_data = self.encryptor.decrypt(encrypted_data)

                    conn.execute(
                        """INSERT INTO notes (last_timestamp, hash, filepath, body)
                        VALUES (?, ?, ?, ?);""",
                        (
                            os.path.getmtime(path),
                            self._calculate_hash(decrypted_data),
                            str(path),
                            decrypted_data,
                        ),
                    )
        finally:
            conn.close()

    def run(self):
        """`halig search` entrypoint.

        1. Make sure the database and its schema exist
        2. Rebuild the index when ``should_index`` is set or the index is
           empty
        3. Run the query and print every matching line as
           ``filepath:lineno: line``, similar to `grep -rin` output
        """
        self._create_schema()

        # An empty index is treated the same as an explicit reindex request.
        if self.should_index or not self._check_index_status():
            self._do_index()

        self._perform_search()

    def _perform_search(self):
        """Query the index and print the matching lines, term highlighted."""
        conn = self._connect()
        try:
            results = conn.execute(
                "SELECT filepath, body FROM notes WHERE body MATCH ?;",
                (self.search_term,),
            ).fetchall()
        finally:
            conn.close()

        console = Console()
        # The term is highlighted as a literal, case-insensitive substring.
        search_regex = re.compile(re.escape(self.search_term), re.IGNORECASE)

        for filepath, body in results:
            # Bodies are inserted as returned by the encryptor; accept both
            # bytes and text coming back from SQLite.
            text = body.decode() if isinstance(body, bytes) else body

            for lineno, line in enumerate(text.split("\n"), start=1):
                if search_regex.search(line):
                    console.print(
                        self._format_match(search_regex, filepath, lineno, line),
                    )

    @staticmethod
    def _format_match(pattern: re.Pattern, filepath: str, lineno: int, line: str) -> str:
        """Build the console markup for one matching line.

        Everything that comes from the note or its path is escaped so that
        square brackets in the content are printed verbatim instead of being
        parsed as Rich markup; only the ``[bold red]`` tags added here around
        each occurrence of the search term are interpreted.
        """
        # Local import keeps this fix self-contained within the class.
        from rich.markup import escape

        pieces = []
        cursor = 0
        for match in pattern.finditer(line):
            pieces.append(escape(line[cursor:match.start()]))
            pieces.append(f"[bold red]{escape(match.group(0))}[/bold red]")
            cursor = match.end()
        pieces.append(escape(line[cursor:]))

        return f"{escape(str(filepath))}:{lineno}: {''.join(pieces)}"

    def _get_database_path(self) -> Path:
        """Return the path to the SQLite database, creating it if needed."""
        cache_dir = platformdirs.user_cache_path("halig", ensure_exists=True)
        db_path = cache_dir / "halig.db"
        db_path.touch()
        return db_path

    def _get_notebook_files(self) -> Generator[Path, None, None]:
        """Yield every ``*.age`` file found under the notebooks root path."""
        return self.settings.notebooks_root_path.glob("**/*.age")

    def _read_encrypted_file(self, file_path: Path) -> bytes:
        """Return the raw (still encrypted) contents of ``file_path``."""
        return file_path.read_bytes()

    def _calculate_hash(self, data: bytes) -> str:
        """Return the hexadecimal SHA-256 digest of ``data``."""
        return hashlib.sha256(data).hexdigest()
|
||||
|
|
@ -11,6 +11,7 @@ from halig.__version__ import __version__
|
|||
from halig.commands.edit import EditCommand
|
||||
from halig.commands.import_unencrypted import ImportCommand
|
||||
from halig.commands.notebooks import NotebooksCommand
|
||||
from halig.commands.search import SearchCommand
|
||||
from halig.commands.show import ShowCommand
|
||||
from halig.settings import load_from_file
|
||||
from halig.utils import capture
|
||||
|
|
@ -114,6 +115,19 @@ def import_unencrypted(
|
|||
command.run()
|
||||
|
||||
|
||||
@app.command()
def search(
    search_term: str,
):
    # CLI entry point for the search command: load the settings from file and
    # run SearchCommand for the given term, always requesting a reindex.
    SearchCommand(
        search_term=search_term,
        should_index=True,
        settings=load_from_file(),
    ).run()
|
||||
|
||||
|
||||
@app.command(help=literals.COMMANDS_VERSION)
|
||||
@capture
|
||||
def version():
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue