// secret/internal/secret/pgpunlock_test.go
//
// 527 lines
// 15 KiB
// Go

package secret_test
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"filippo.io/age"
"git.eeqj.de/sneak/secret/internal/secret"
"git.eeqj.de/sneak/secret/internal/vault"
"git.eeqj.de/sneak/secret/pkg/agehd"
"github.com/spf13/afero"
)
// Register vault with secret package for testing
func init() {
// Register the vault.GetCurrentVault function with the secret package
secret.RegisterGetCurrentVaultFunc(func(fs afero.Fs, stateDir string) (secret.VaultInterface, error) {
return vault.GetCurrentVault(fs, stateDir)
})
}
// setupNonInteractiveGPG configures gnupgHomeDir for unattended use and
// replaces the secret package's GPG hooks with versions that shell out to gpg.
//
// It writes a gpg.conf into gnupgHomeDir enabling batch mode, no TTY and
// loopback pinentry, then points secret.GPGEncryptFunc and
// secret.GPGDecryptFunc at functions that run gpg with --homedir gnupgHomeDir
// and pass passphrase on the command line. The previous hooks are restored
// through t.Cleanup. The test is failed immediately if the config file cannot
// be written. tempDir is accepted but not used.
func setupNonInteractiveGPG(t *testing.T, tempDir, passphrase, gnupgHomeDir string) {
	// Attribute failures raised here to the calling test, not this helper.
	t.Helper()

	// Create GPG config file for non-interactive operation
	gpgConfPath := filepath.Join(gnupgHomeDir, "gpg.conf")
	gpgConfContent := `batch
no-tty
pinentry-mode loopback
`
	if err := os.WriteFile(gpgConfPath, []byte(gpgConfContent), 0600); err != nil {
		t.Fatalf("Failed to write GPG config file: %v", err)
	}

	// runGPG runs gpg with the flags common to both hooks followed by
	// extraArgs, feeding input on stdin and returning stdout. On failure the
	// error is prefixed with failure and carries gpg's stderr.
	runGPG := func(failure string, input []byte, extraArgs ...string) ([]byte, error) {
		args := []string{
			"--homedir", gnupgHomeDir,
			"--batch",
			"--yes",
			"--pinentry-mode", "loopback",
			"--passphrase", passphrase,
		}
		args = append(args, extraArgs...)

		cmd := exec.Command("gpg", args...)
		var stdout, stderr bytes.Buffer
		cmd.Stdout = &stdout
		cmd.Stderr = &stderr
		cmd.Stdin = bytes.NewReader(input)
		if err := cmd.Run(); err != nil {
			return nil, fmt.Errorf("%s: %w\nStderr: %s", failure, err, stderr.String())
		}
		return stdout.Bytes(), nil
	}

	// Remember the current hooks so they can be put back after the test.
	origEncryptFunc := secret.GPGEncryptFunc
	origDecryptFunc := secret.GPGDecryptFunc

	// Set custom GPG functions for this test
	secret.GPGEncryptFunc = func(data []byte, keyID string) ([]byte, error) {
		return runGPG("GPG encryption failed", data,
			"--trust-model", "always",
			"--armor",
			"--encrypt",
			"-r", keyID)
	}
	secret.GPGDecryptFunc = func(encryptedData []byte) ([]byte, error) {
		return runGPG("GPG decryption failed", encryptedData,
			"--quiet",
			"--decrypt")
	}

	// Restore original functions after test
	t.Cleanup(func() {
		secret.GPGEncryptFunc = origEncryptFunc
		secret.GPGDecryptFunc = origDecryptFunc
	})
}
// runGPGWithPassphrase invokes gpg in batch mode against the home directory
// gnupgHome, supplying passphrase via loopback pinentry flags, followed by the
// caller's args. input, if non-nil, is connected to gpg's stdin. It returns
// gpg's stdout, or an error that wraps the run failure and includes stderr.
func runGPGWithPassphrase(gnupgHome, passphrase string, args []string, input io.Reader) ([]byte, error) {
	// Fixed flags first, then whatever the caller asked for.
	fullArgs := make([]string, 0, 7+len(args))
	fullArgs = append(fullArgs,
		"--homedir="+gnupgHome,
		"--batch",
		"--yes",
		"--pinentry-mode", "loopback",
		"--passphrase", passphrase,
	)
	fullArgs = append(fullArgs, args...)

	var outBuf, errBuf bytes.Buffer
	gpg := exec.Command("gpg", fullArgs...)
	gpg.Stdin = input
	gpg.Stdout = &outBuf
	gpg.Stderr = &errBuf

	if runErr := gpg.Run(); runErr != nil {
		return nil, fmt.Errorf("GPG command failed: %w\nStderr: %s", runErr, errBuf.String())
	}
	return outBuf.Bytes(), nil
}
// TestPGPUnlockerWithRealFS exercises the PGP unlocker against the real
// filesystem and a real gpg binary; it is skipped when gpg is not on PATH.
//
// Setup creates a temporary state directory containing a private GNUPGHOME,
// installs the non-interactive GPG hooks, generates a passphrase-protected RSA
// key and reads its key ID from gpg's colon-delimited listing. The subtests
// then run in order and share that state:
//   - CreatePGPUnlocker builds a vault, adds a passphrase unlocker, creates a
//     PGP unlocker and checks its type, ID, on-disk files and metadata.
//   - GetGPGKeyID writes a metadata file and reads the key ID back.
//   - GetIdentity stores a GPG-encrypted age identity and recovers it.
//   - RemoveUnlocker removes the unlocker directory.
func TestPGPUnlockerWithRealFS(t *testing.T) {
	// Skip tests if gpg is not available
	if _, err := exec.LookPath("gpg"); err != nil {
		t.Skip("GPG not available, skipping PGP unlock key tests")
	}

	// Create a temporary directory for our tests
	tempDir, err := os.MkdirTemp("", "secret-pgp-test-")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir) // Clean up after test

	// Create a temporary GNUPGHOME
	gnupgHomeDir := filepath.Join(tempDir, "gnupg")
	if err := os.MkdirAll(gnupgHomeDir, 0700); err != nil {
		t.Fatalf("Failed to create GNUPGHOME: %v", err)
	}

	// Point gpg at the throwaway home. t.Setenv puts back the previous value
	// (or the unset state) when the test finishes.
	t.Setenv("GNUPGHOME", gnupgHomeDir)

	// Test passphrase for GPG key
	testPassphrase := "test123"

	// Setup non-interactive GPG with custom functions
	setupNonInteractiveGPG(t, tempDir, testPassphrase, gnupgHomeDir)

	// Create GPG batch file for key generation
	batchFile := filepath.Join(tempDir, "gen-key-batch")
	batchContent := `%echo Generating a test key
Key-Type: RSA
Key-Length: 2048
Name-Real: Test User
Name-Email: test@example.com
Expire-Date: 0
Passphrase: ` + testPassphrase + `
%commit
%echo Key generation completed
`
	if err := os.WriteFile(batchFile, []byte(batchContent), 0600); err != nil {
		t.Fatalf("Failed to write batch file: %v", err)
	}

	// Generate GPG key with batch mode
	t.Log("Generating GPG key...")
	_, err = runGPGWithPassphrase(gnupgHomeDir, testPassphrase,
		[]string{"--gen-key", batchFile}, nil)
	if err != nil {
		t.Fatalf("Failed to generate GPG key: %v", err)
	}
	t.Log("GPG key generated successfully")

	// Get the key ID
	output, err := runGPGWithPassphrase(gnupgHomeDir, testPassphrase,
		[]string{"--list-secret-keys", "--with-colons"}, nil)
	if err != nil {
		t.Fatalf("Failed to list GPG keys: %v", err)
	}

	// Parse output to get key ID: take the fifth colon-separated field of the
	// first "sec:" record.
	var keyID string
	lines := strings.Split(string(output), "\n")
	for _, line := range lines {
		if strings.HasPrefix(line, "sec:") {
			fields := strings.Split(line, ":")
			if len(fields) >= 5 {
				keyID = fields[4]
				break
			}
		}
	}
	if keyID == "" {
		t.Fatalf("Failed to find GPG key ID in output: %s", output)
	}
	t.Logf("Generated GPG key ID: %s", keyID)

	// Set the GPG_AGENT_INFO to empty to ensure gpg-agent doesn't interfere
	t.Setenv("GPG_AGENT_INFO", "")

	// Use the real filesystem
	fs := afero.NewOsFs()

	// Test data
	testMnemonic := "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"

	// Set test environment variables; restored automatically after the test.
	t.Setenv(secret.EnvMnemonic, testMnemonic)
	t.Setenv(secret.EnvGPGKeyID, keyID)

	// Set up vault structure for testing
	stateDir := tempDir
	vaultName := "test-vault"

	// Test creation of a PGP unlock key through a vault
	t.Run("CreatePGPUnlocker", func(t *testing.T) {
		// Flag a hang after 30 seconds. The callback runs on its own
		// goroutine, where FailNow-based calls such as t.Fatalf are not
		// permitted, so the timeout is reported with t.Errorf instead.
		timer := time.AfterFunc(30*time.Second, func() {
			t.Errorf("Test timed out after 30 seconds")
		})
		defer timer.Stop()

		// Create a test vault directory structure
		vlt, err := vault.CreateVault(fs, stateDir, vaultName)
		if err != nil {
			t.Fatalf("Failed to create vault: %v", err)
		}

		// Set the current vault
		err = vault.SelectVault(fs, stateDir, vaultName)
		if err != nil {
			t.Fatalf("Failed to select vault: %v", err)
		}

		// Derive long-term key from mnemonic
		ltIdentity, err := agehd.DeriveIdentity(testMnemonic, 0)
		if err != nil {
			t.Fatalf("Failed to derive long-term key: %v", err)
		}

		// Get the vault directory
		vaultDir, err := vlt.GetDirectory()
		if err != nil {
			t.Fatalf("Failed to get vault directory: %v", err)
		}

		// Write long-term public key
		ltPubKeyPath := filepath.Join(vaultDir, "pub.age")
		if err := afero.WriteFile(fs, ltPubKeyPath, []byte(ltIdentity.Recipient().String()), secret.FilePerms); err != nil {
			t.Fatalf("Failed to write long-term public key: %v", err)
		}

		// Unlock the vault
		vlt.Unlock(ltIdentity)

		// Create a passphrase unlocker first (to have current unlocker)
		passUnlocker, err := vlt.CreatePassphraseUnlocker("test-passphrase")
		if err != nil {
			t.Fatalf("Failed to create passphrase unlocker: %v", err)
		}

		// Verify passphrase unlocker was created
		if passUnlocker == nil {
			t.Fatal("Passphrase unlocker is nil")
		}

		// Now create a PGP unlock key (this will use our custom GPGEncryptFunc)
		pgpUnlocker, err := secret.CreatePGPUnlocker(fs, stateDir, keyID)
		if err != nil {
			t.Fatalf("Failed to create PGP unlock key: %v", err)
		}

		// Verify the PGP unlock key was created
		if pgpUnlocker == nil {
			t.Fatal("PGP unlock key is nil")
		}

		// Check if the key has the correct type
		if pgpUnlocker.GetType() != "pgp" {
			t.Errorf("Expected PGP unlock key type 'pgp', got '%s'", pgpUnlocker.GetType())
		}

		// Check if the key ID includes the GPG key ID
		if !strings.Contains(pgpUnlocker.GetID(), keyID) {
			t.Errorf("PGP unlock key ID '%s' does not contain GPG key ID '%s'", pgpUnlocker.GetID(), keyID)
		}

		// Check if the key directory exists
		unlockerDir := pgpUnlocker.GetDirectory()
		keyExists, err := afero.DirExists(fs, unlockerDir)
		if err != nil {
			t.Fatalf("Failed to check if PGP key directory exists: %v", err)
		}
		if !keyExists {
			t.Errorf("PGP unlock key directory does not exist: %s", unlockerDir)
		}

		// Check if required files exist. A lookup error aborts the subtest; a
		// missing file is recorded and the remaining files are still checked.
		metadataPath := filepath.Join(unlockerDir, "unlocker-metadata.json")
		requiredFiles := []struct {
			label string
			path  string
		}{
			{"public key", filepath.Join(unlockerDir, "pub.age")},
			{"private key", filepath.Join(unlockerDir, "priv.age.gpg")},
			{"metadata", metadataPath},
			{"longterm key", filepath.Join(unlockerDir, "longterm.age")},
		}
		for _, f := range requiredFiles {
			exists, err := afero.Exists(fs, f.path)
			if err != nil {
				t.Fatalf("Failed to check if %s file exists: %v", f.label, err)
			}
			if !exists {
				t.Errorf("PGP unlock key %s file does not exist: %s", f.label, f.path)
			}
		}

		// Read and verify metadata
		metadataBytes, err := afero.ReadFile(fs, metadataPath)
		if err != nil {
			t.Fatalf("Failed to read metadata: %v", err)
		}
		var metadata struct {
			ID        string    `json:"id"`
			Type      string    `json:"type"`
			CreatedAt time.Time `json:"created_at"`
			Flags     []string  `json:"flags"`
			GPGKeyID  string    `json:"gpg_key_id"`
		}
		if err := json.Unmarshal(metadataBytes, &metadata); err != nil {
			t.Fatalf("Failed to parse metadata: %v", err)
		}
		if metadata.Type != "pgp" {
			t.Errorf("Expected metadata type 'pgp', got '%s'", metadata.Type)
		}
		if metadata.GPGKeyID != keyID {
			t.Errorf("Expected GPG key ID '%s', got '%s'", keyID, metadata.GPGKeyID)
		}
	})

	// Set up key directory for individual tests
	unlockerDir := filepath.Join(tempDir, "unlocker")
	if err := os.MkdirAll(unlockerDir, secret.DirPerms); err != nil {
		t.Fatalf("Failed to create unlocker directory: %v", err)
	}

	// Set up test metadata
	metadata := secret.UnlockerMetadata{
		ID:        fmt.Sprintf("%s-pgp", keyID),
		Type:      "pgp",
		CreatedAt: time.Now(),
		Flags:     []string{"gpg", "encrypted"},
	}

	// Create a PGP unlocker for the remaining tests
	unlocker := secret.NewPGPUnlocker(fs, unlockerDir, metadata)

	// Test getting GPG key ID
	t.Run("GetGPGKeyID", func(t *testing.T) {
		// Create PGP metadata with GPG key ID
		type PGPUnlockerMetadata struct {
			secret.UnlockerMetadata
			GPGKeyID string `json:"gpg_key_id"`
		}
		pgpMetadata := PGPUnlockerMetadata{
			UnlockerMetadata: metadata,
			GPGKeyID:         keyID,
		}

		// Write metadata file
		metadataPath := filepath.Join(unlockerDir, "unlocker-metadata.json")
		metadataBytes, err := json.MarshalIndent(pgpMetadata, "", " ")
		if err != nil {
			t.Fatalf("Failed to marshal metadata: %v", err)
		}
		if err := afero.WriteFile(fs, metadataPath, metadataBytes, secret.FilePerms); err != nil {
			t.Fatalf("Failed to write metadata: %v", err)
		}

		// Get GPG key ID
		retrievedKeyID, err := unlocker.GetGPGKeyID()
		if err != nil {
			t.Fatalf("Failed to get GPG key ID: %v", err)
		}

		// Verify key ID
		if retrievedKeyID != keyID {
			t.Errorf("Expected GPG key ID '%s', got '%s'", keyID, retrievedKeyID)
		}
	})

	// Test getting identity from PGP unlocker
	t.Run("GetIdentity", func(t *testing.T) {
		// Generate an age identity for testing
		ageIdentity, err := age.GenerateX25519Identity()
		if err != nil {
			t.Fatalf("Failed to generate age identity: %v", err)
		}

		// Write the public key
		pubKeyPath := filepath.Join(unlockerDir, "pub.age")
		if err := afero.WriteFile(fs, pubKeyPath, []byte(ageIdentity.Recipient().String()), secret.FilePerms); err != nil {
			t.Fatalf("Failed to write public key: %v", err)
		}

		// GPG encrypt the private key using our custom encrypt function
		privKeyData := []byte(ageIdentity.String())
		encryptedOutput, err := secret.GPGEncryptFunc(privKeyData, keyID)
		if err != nil {
			t.Fatalf("Failed to encrypt with GPG: %v", err)
		}

		// Write the encrypted data to a file
		encryptedPath := filepath.Join(unlockerDir, "priv.age.gpg")
		if err := afero.WriteFile(fs, encryptedPath, encryptedOutput, secret.FilePerms); err != nil {
			t.Fatalf("Failed to write encrypted private key: %v", err)
		}

		// Now try to get the identity - this will use our custom GPGDecryptFunc
		identity, err := unlocker.GetIdentity()
		if err != nil {
			t.Fatalf("Failed to get identity: %v", err)
		}

		// Verify the identity matches
		expectedPubKey := ageIdentity.Recipient().String()
		actualPubKey := identity.Recipient().String()
		if actualPubKey != expectedPubKey {
			t.Errorf("Expected public key '%s', got '%s'", expectedPubKey, actualPubKey)
		}
	})

	// Test removing the unlocker
	t.Run("RemoveUnlocker", func(t *testing.T) {
		// Ensure unlocker directory exists before removal
		keyExists, err := afero.DirExists(fs, unlockerDir)
		if err != nil {
			t.Fatalf("Failed to check if unlocker directory exists: %v", err)
		}
		if !keyExists {
			t.Fatalf("Unlocker directory does not exist: %s", unlockerDir)
		}

		// Remove unlocker
		err = unlocker.Remove()
		if err != nil {
			t.Fatalf("Failed to remove unlocker: %v", err)
		}

		// Verify directory is gone
		keyExists, err = afero.DirExists(fs, unlockerDir)
		if err != nil {
			t.Fatalf("Failed to check if unlocker directory exists: %v", err)
		}
		if keyExists {
			t.Errorf("Unlocker directory still exists after removal: %s", unlockerDir)
		}
	})
}