Add comprehensive PGP unlock key testing with non-interactive GPG support

This commit is contained in:
Jeffrey Paul 2025-05-29 15:05:58 -07:00
parent 85d7ef21eb
commit 1a1b11c5a3
2 changed files with 339 additions and 28 deletions

View File

@ -15,6 +15,17 @@ import (
"github.com/spf13/afero"
)
// Variables to allow overriding in tests
var (
// GPGEncryptFunc is the function used for GPG encryption
// Can be overridden in tests to provide a non-interactive implementation
GPGEncryptFunc = gpgEncryptDefault
// GPGDecryptFunc is the function used for GPG decryption
// Can be overridden in tests to provide a non-interactive implementation
GPGDecryptFunc = gpgDecryptDefault
)
// PGPUnlockKeyMetadata extends UnlockKeyMetadata with PGP-specific data
type PGPUnlockKeyMetadata struct {
UnlockKeyMetadata
@ -56,7 +67,7 @@ func (p *PGPUnlockKey) GetIdentity() (*age.X25519Identity, error) {
// Step 2: Decrypt the age private key using GPG
Debug("Decrypting age private key with GPG", "key_id", p.GetID())
agePrivKeyData, err := gpgDecrypt(encryptedAgePrivKeyData)
agePrivKeyData, err := GPGDecryptFunc(encryptedAgePrivKeyData)
if err != nil {
Debug("Failed to decrypt age private key with GPG", "error", err, "key_id", p.GetID())
return nil, fmt.Errorf("failed to decrypt age private key with GPG: %w", err)
@ -271,7 +282,7 @@ func CreatePGPUnlockKey(fs afero.Fs, stateDir string, gpgKeyID string) (*PGPUnlo
// Step 8: Encrypt age private key to the GPG key ID
agePrivateKeyBytes := []byte(ageIdentity.String())
encryptedAgePrivKey, err := gpgEncrypt(agePrivateKeyBytes, gpgKeyID)
encryptedAgePrivKey, err := GPGEncryptFunc(agePrivateKeyBytes, gpgKeyID)
if err != nil {
return nil, fmt.Errorf("failed to encrypt age private key with GPG: %w", err)
}
@ -322,8 +333,8 @@ func checkGPGAvailable() error {
return nil
}
// gpgEncrypt encrypts data to the specified GPG key ID
func gpgEncrypt(data []byte, keyID string) ([]byte, error) {
// gpgEncryptDefault is the default implementation of GPG encryption
func gpgEncryptDefault(data []byte, keyID string) ([]byte, error) {
cmd := exec.Command("gpg", "--trust-model", "always", "--armor", "--encrypt", "-r", keyID)
cmd.Stdin = strings.NewReader(string(data))
@ -335,8 +346,8 @@ func gpgEncrypt(data []byte, keyID string) ([]byte, error) {
return output, nil
}
// gpgDecrypt decrypts GPG-encrypted data
func gpgDecrypt(encryptedData []byte) ([]byte, error) {
// gpgDecryptDefault is the default implementation of GPG decryption
func gpgDecryptDefault(encryptedData []byte) ([]byte, error) {
cmd := exec.Command("gpg", "--quiet", "--decrypt")
cmd.Stdin = strings.NewReader(string(encryptedData))

View File

@ -1,8 +1,10 @@
package secret_test
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
@ -10,10 +12,117 @@ import (
"testing"
"time"
"filippo.io/age"
"git.eeqj.de/sneak/secret/internal/secret"
"git.eeqj.de/sneak/secret/internal/vault"
"git.eeqj.de/sneak/secret/pkg/agehd"
"github.com/spf13/afero"
)
// Register vault with secret package for testing
func init() {
// Register the vault.GetCurrentVault function with the secret package
secret.RegisterGetCurrentVaultFunc(func(fs afero.Fs, stateDir string) (secret.VaultInterface, error) {
return vault.GetCurrentVault(fs, stateDir)
})
}
// setupNonInteractiveGPG creates a custom GPG environment for testing
func setupNonInteractiveGPG(t *testing.T, tempDir, passphrase, gnupgHomeDir string) {
// Create GPG config file for non-interactive operation
gpgConfPath := filepath.Join(gnupgHomeDir, "gpg.conf")
gpgConfContent := `batch
no-tty
pinentry-mode loopback
`
if err := os.WriteFile(gpgConfPath, []byte(gpgConfContent), 0600); err != nil {
t.Fatalf("Failed to write GPG config file: %v", err)
}
// Create a test-specific GPG implementation
origEncryptFunc := secret.GPGEncryptFunc
origDecryptFunc := secret.GPGDecryptFunc
// Set custom GPG functions for this test
secret.GPGEncryptFunc = func(data []byte, keyID string) ([]byte, error) {
cmd := exec.Command("gpg",
"--homedir", gnupgHomeDir,
"--batch",
"--yes",
"--pinentry-mode", "loopback",
"--passphrase", passphrase,
"--trust-model", "always",
"--armor",
"--encrypt",
"-r", keyID)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Stdin = bytes.NewReader(data)
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("GPG encryption failed: %w\nStderr: %s", err, stderr.String())
}
return stdout.Bytes(), nil
}
secret.GPGDecryptFunc = func(encryptedData []byte) ([]byte, error) {
cmd := exec.Command("gpg",
"--homedir", gnupgHomeDir,
"--batch",
"--yes",
"--pinentry-mode", "loopback",
"--passphrase", passphrase,
"--quiet",
"--decrypt")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Stdin = bytes.NewReader(encryptedData)
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("GPG decryption failed: %w\nStderr: %s", err, stderr.String())
}
return stdout.Bytes(), nil
}
// Restore original functions after test
t.Cleanup(func() {
secret.GPGEncryptFunc = origEncryptFunc
secret.GPGDecryptFunc = origDecryptFunc
})
}
// runGPGWithPassphrase executes a GPG command with the specified passphrase
func runGPGWithPassphrase(gnupgHome, passphrase string, args []string, input io.Reader) ([]byte, error) {
cmdArgs := []string{
"--homedir=" + gnupgHome,
"--batch",
"--yes",
"--pinentry-mode", "loopback",
"--passphrase", passphrase,
}
cmdArgs = append(cmdArgs, args...)
cmd := exec.Command("gpg", cmdArgs...)
cmd.Stdin = input
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return nil, fmt.Errorf("GPG command failed: %w\nStderr: %s", err, stderr.String())
}
return stdout.Bytes(), nil
}
func TestPGPUnlockKeyWithRealFS(t *testing.T) {
// Skip tests if gpg is not available
if _, err := exec.LookPath("gpg"); err != nil {
@ -48,6 +157,12 @@ func TestPGPUnlockKeyWithRealFS(t *testing.T) {
}
}()
// Test passphrase for GPG key
testPassphrase := "test123"
// Setup non-interactive GPG with custom functions
setupNonInteractiveGPG(t, tempDir, testPassphrase, gnupgHomeDir)
// Create GPG batch file for key generation
batchFile := filepath.Join(tempDir, "gen-key-batch")
batchContent := `%echo Generating a test key
@ -56,7 +171,7 @@ Key-Length: 2048
Name-Real: Test User
Name-Email: test@example.com
Expire-Date: 0
Passphrase: test123
Passphrase: ` + testPassphrase + `
%commit
%echo Key generation completed
`
@ -64,20 +179,20 @@ Passphrase: test123
t.Fatalf("Failed to write batch file: %v", err)
}
// Generate GPG key
// Generate GPG key with batch mode
t.Log("Generating GPG key...")
cmd := exec.Command("gpg", "--batch", "--gen-key", batchFile)
output, err := cmd.CombinedOutput()
output, err := runGPGWithPassphrase(gnupgHomeDir, testPassphrase,
[]string{"--gen-key", batchFile}, nil)
if err != nil {
t.Fatalf("Failed to generate GPG key: %v\nOutput: %s", err, output)
t.Fatalf("Failed to generate GPG key: %v", err)
}
t.Log("GPG key generated successfully")
// Get the key ID
cmd = exec.Command("gpg", "--list-secret-keys", "--with-colons")
output, err = cmd.CombinedOutput()
output, err = runGPGWithPassphrase(gnupgHomeDir, testPassphrase,
[]string{"--list-secret-keys", "--with-colons"}, nil)
if err != nil {
t.Fatalf("Failed to list GPG keys: %v\nOutput: %s", err, output)
t.Fatalf("Failed to list GPG keys: %v", err)
}
// Parse output to get key ID
@ -98,11 +213,16 @@ Passphrase: test123
}
t.Logf("Generated GPG key ID: %s", keyID)
// Export GNUPGHOME variable to ensure subprocesses inherit it
err = os.Setenv("GNUPGHOME", gnupgHomeDir)
if err != nil {
t.Fatalf("Failed to set GNUPGHOME environment variable: %v", err)
}
// Set the GPG_AGENT_INFO to empty to ensure gpg-agent doesn't interfere
oldAgentInfo := os.Getenv("GPG_AGENT_INFO")
os.Setenv("GPG_AGENT_INFO", "")
defer func() {
if oldAgentInfo != "" {
os.Setenv("GPG_AGENT_INFO", oldAgentInfo)
} else {
os.Unsetenv("GPG_AGENT_INFO")
}
}()
// Use the real filesystem
fs := afero.NewOsFs()
@ -133,7 +253,158 @@ Passphrase: test123
}
}()
// Create the directory structure for test
// Set up vault structure for testing
stateDir := tempDir
vaultName := "test-vault"
// Test creation of a PGP unlock key through a vault
t.Run("CreatePGPUnlockKey", func(t *testing.T) {
// Set a limited test timeout to avoid hanging
timer := time.AfterFunc(30*time.Second, func() {
t.Fatalf("Test timed out after 30 seconds")
})
defer timer.Stop()
// Create a test vault directory structure
vlt, err := vault.CreateVault(fs, stateDir, vaultName)
if err != nil {
t.Fatalf("Failed to create vault: %v", err)
}
// Set the current vault
err = vault.SelectVault(fs, stateDir, vaultName)
if err != nil {
t.Fatalf("Failed to select vault: %v", err)
}
// Derive long-term key from mnemonic
ltIdentity, err := agehd.DeriveIdentity(testMnemonic, 0)
if err != nil {
t.Fatalf("Failed to derive long-term key: %v", err)
}
// Get the vault directory
vaultDir, err := vlt.GetDirectory()
if err != nil {
t.Fatalf("Failed to get vault directory: %v", err)
}
// Write long-term public key
ltPubKeyPath := filepath.Join(vaultDir, "pub.age")
if err := afero.WriteFile(fs, ltPubKeyPath, []byte(ltIdentity.Recipient().String()), secret.FilePerms); err != nil {
t.Fatalf("Failed to write long-term public key: %v", err)
}
// Unlock the vault
vlt.Unlock(ltIdentity)
// Create a passphrase unlock key first (to have current unlock key)
passKey, err := vlt.CreatePassphraseKey("test-passphrase")
if err != nil {
t.Fatalf("Failed to create passphrase key: %v", err)
}
// Verify passphrase key was created
if passKey == nil {
t.Fatal("Passphrase key is nil")
}
// Now create a PGP unlock key (this will use our custom GPGEncryptFunc)
pgpKey, err := secret.CreatePGPUnlockKey(fs, stateDir, keyID)
if err != nil {
t.Fatalf("Failed to create PGP unlock key: %v", err)
}
// Verify the PGP unlock key was created
if pgpKey == nil {
t.Fatal("PGP unlock key is nil")
}
// Check if the key has the correct type
if pgpKey.GetType() != "pgp" {
t.Errorf("Expected PGP unlock key type 'pgp', got '%s'", pgpKey.GetType())
}
// Check if the key ID includes the GPG key ID
if !strings.Contains(pgpKey.GetID(), keyID) {
t.Errorf("PGP unlock key ID '%s' does not contain GPG key ID '%s'", pgpKey.GetID(), keyID)
}
// Check if the key directory exists
keyDir := pgpKey.GetDirectory()
keyExists, err := afero.DirExists(fs, keyDir)
if err != nil {
t.Fatalf("Failed to check if PGP key directory exists: %v", err)
}
if !keyExists {
t.Errorf("PGP unlock key directory does not exist: %s", keyDir)
}
// Check if required files exist
pubKeyPath := filepath.Join(keyDir, "pub.age")
pubKeyExists, err := afero.Exists(fs, pubKeyPath)
if err != nil {
t.Fatalf("Failed to check if public key file exists: %v", err)
}
if !pubKeyExists {
t.Errorf("PGP unlock key public key file does not exist: %s", pubKeyPath)
}
privKeyPath := filepath.Join(keyDir, "priv.age.gpg")
privKeyExists, err := afero.Exists(fs, privKeyPath)
if err != nil {
t.Fatalf("Failed to check if private key file exists: %v", err)
}
if !privKeyExists {
t.Errorf("PGP unlock key private key file does not exist: %s", privKeyPath)
}
metadataPath := filepath.Join(keyDir, "unlock-metadata.json")
metadataExists, err := afero.Exists(fs, metadataPath)
if err != nil {
t.Fatalf("Failed to check if metadata file exists: %v", err)
}
if !metadataExists {
t.Errorf("PGP unlock key metadata file does not exist: %s", metadataPath)
}
longtermPath := filepath.Join(keyDir, "longterm.age")
longtermExists, err := afero.Exists(fs, longtermPath)
if err != nil {
t.Fatalf("Failed to check if longterm key file exists: %v", err)
}
if !longtermExists {
t.Errorf("PGP unlock key longterm key file does not exist: %s", longtermPath)
}
// Read and verify metadata
metadataBytes, err := afero.ReadFile(fs, metadataPath)
if err != nil {
t.Fatalf("Failed to read metadata: %v", err)
}
var metadata struct {
ID string `json:"id"`
Type string `json:"type"`
CreatedAt time.Time `json:"created_at"`
Flags []string `json:"flags"`
GPGKeyID string `json:"gpg_key_id"`
}
if err := json.Unmarshal(metadataBytes, &metadata); err != nil {
t.Fatalf("Failed to parse metadata: %v", err)
}
if metadata.Type != "pgp" {
t.Errorf("Expected metadata type 'pgp', got '%s'", metadata.Type)
}
if metadata.GPGKeyID != keyID {
t.Errorf("Expected GPG key ID '%s', got '%s'", keyID, metadata.GPGKeyID)
}
})
// Set up key directory for individual tests
keyDir := filepath.Join(tempDir, "unlock-key")
if err := os.MkdirAll(keyDir, secret.DirPerms); err != nil {
t.Fatalf("Failed to create key directory: %v", err)
@ -147,11 +418,6 @@ Passphrase: test123
Flags: []string{"gpg", "encrypted"},
}
// We'll skip the CreatePGPUnlockKey test since it requires registered vault functions
t.Run("CreatePGPUnlockKey", func(t *testing.T) {
t.Skip("Skipping test that requires registered vault functions")
})
// Create a PGP unlock key for the remaining tests
unlockKey := secret.NewPGPUnlockKey(fs, keyDir, metadata)
@ -192,9 +458,43 @@ Passphrase: test123
// Test getting identity from PGP unlock key
t.Run("GetIdentity", func(t *testing.T) {
// For this test, we'll do a simplified version since GPG operations
// can be tricky in automated tests
t.Skip("Skipping GetIdentity test due to complex GPG operations in automated testing")
// Generate an age identity for testing
ageIdentity, err := age.GenerateX25519Identity()
if err != nil {
t.Fatalf("Failed to generate age identity: %v", err)
}
// Write the public key
pubKeyPath := filepath.Join(keyDir, "pub.age")
if err := afero.WriteFile(fs, pubKeyPath, []byte(ageIdentity.Recipient().String()), secret.FilePerms); err != nil {
t.Fatalf("Failed to write public key: %v", err)
}
// GPG encrypt the private key using our custom encrypt function
privKeyData := []byte(ageIdentity.String())
encryptedOutput, err := secret.GPGEncryptFunc(privKeyData, keyID)
if err != nil {
t.Fatalf("Failed to encrypt with GPG: %v", err)
}
// Write the encrypted data to a file
encryptedPath := filepath.Join(keyDir, "priv.age.gpg")
if err := afero.WriteFile(fs, encryptedPath, encryptedOutput, secret.FilePerms); err != nil {
t.Fatalf("Failed to write encrypted private key: %v", err)
}
// Now try to get the identity - this will use our custom GPGDecryptFunc
identity, err := unlockKey.GetIdentity()
if err != nil {
t.Fatalf("Failed to get identity: %v", err)
}
// Verify the identity matches
expectedPubKey := ageIdentity.Recipient().String()
actualPubKey := identity.Recipient().String()
if actualPubKey != expectedPubKey {
t.Errorf("Expected public key '%s', got '%s'", expectedPubKey, actualPubKey)
}
})
// Test removing the unlock key