mfer/mfer/gpg_test.go
sneak 4a2060087d Add GPG signature verification on manifest load
- Implement gpgVerify function that creates a temporary keyring to verify
  detached signatures against embedded public keys
- Signature verification happens during deserialization after hash
  validation but before decompression
- Extract signatureString() as a method on manifest for generating the
  canonical signature string (MAGIC-UUID-MULTIHASH)
- Add --require-signature flag to check command to mandate signature from
  a specific GPG key ID
- Expose IsSigned() and Signer() methods on Checker for signature status
2025-12-18 05:56:16 -08:00

348 lines
9.2 KiB
Go

package mfer
import (
"bytes"
"context"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testGPGEnv provisions an isolated GnuPG environment for a single test.
//
// It skips the calling test when the gpg binary is not on PATH or when key
// generation fails. Otherwise it creates a private temporary directory,
// points the process-wide GNUPGHOME variable at it, generates a
// passphrase-less 2048-bit RSA key for test@mfer.test, and returns that key's
// fingerprint together with a cleanup function.
//
// The cleanup function restores the previous GNUPGHOME state and removes the
// temporary directory. It is idempotent and is additionally registered with
// t.Cleanup, so the environment is torn down even when an assertion inside
// this helper aborts the test before the caller could defer it.
//
// Because the helper mutates process-wide environment state, tests that use
// it must not run in parallel.
func testGPGEnv(t *testing.T) (GPGKeyID, func()) {
	t.Helper()

	// t.Skip ends the test immediately, so no return is needed after it.
	if _, err := exec.LookPath("gpg"); err != nil {
		t.Skip("gpg not installed, skipping signing test")
	}

	// Create temporary GPG home directory
	gpgHome, err := os.MkdirTemp("", "mfer-gpg-test-*")
	require.NoError(t, err)

	// LookupEnv distinguishes "unset" from "set to the empty string" so the
	// original state can be restored exactly.
	origGPGHome, hadGPGHome := os.LookupEnv("GNUPGHOME")

	cleaned := false
	cleanup := func() {
		if cleaned {
			return
		}
		cleaned = true
		// Teardown is best effort: nothing useful can be done here if
		// restoring the variable or deleting the directory fails.
		if hadGPGHome {
			_ = os.Setenv("GNUPGHOME", origGPGHome)
		} else {
			_ = os.Unsetenv("GNUPGHOME")
		}
		_ = os.RemoveAll(gpgHome)
	}
	// Registered before any further assertion so that a failure below cannot
	// leak the directory or leave GNUPGHOME pointing at it.
	t.Cleanup(cleanup)

	// Set restrictive permissions on GPG home
	require.NoError(t, os.Chmod(gpgHome, 0o700))
	require.NoError(t, os.Setenv("GNUPGHOME", gpgHome))

	// Generate a test key with no passphrase
	keyParams := `%no-protection
Key-Type: RSA
Key-Length: 2048
Name-Real: MFER Test Key
Name-Email: test@mfer.test
Expire-Date: 0
%commit
`
	paramsFile := filepath.Join(gpgHome, "key-params")
	require.NoError(t, os.WriteFile(paramsFile, []byte(keyParams), 0o600))

	cmd := exec.Command("gpg", "--batch", "--gen-key", paramsFile)
	cmd.Env = append(os.Environ(), "GNUPGHOME="+gpgHome)
	output, err := cmd.CombinedOutput()
	if err != nil {
		// Key generation failing is treated as an unusable environment
		// rather than a test failure.
		t.Skipf("failed to generate test GPG key: %v: %s", err, output)
	}

	// Get the key fingerprint
	cmd = exec.Command("gpg", "--list-keys", "--with-colons", "test@mfer.test")
	cmd.Env = append(os.Environ(), "GNUPGHOME="+gpgHome)
	output, err = cmd.Output()
	if err != nil {
		t.Fatalf("failed to list test key: %v", err)
	}

	// Parse fingerprint from output: take field 10 of the first "fpr" record.
	var keyID string
	for _, line := range strings.Split(string(output), "\n") {
		fields := strings.Split(line, ":")
		if len(fields) >= 10 && fields[0] == "fpr" {
			keyID = fields[9]
			break
		}
	}
	if keyID == "" {
		t.Fatal("failed to find test key fingerprint")
	}

	return GPGKeyID(keyID), cleanup
}
// TestGPGSign checks that gpgSign succeeds with the generated test key and
// returns a non-empty result carrying both PGP signature armor markers.
func TestGPGSign(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	signature, err := gpgSign([]byte("test data to sign"), signer)
	require.NoError(t, err)
	assert.NotEmpty(t, signature)

	armored := string(signature)
	assert.Contains(t, armored, "-----BEGIN PGP SIGNATURE-----")
	assert.Contains(t, armored, "-----END PGP SIGNATURE-----")
}
// TestGPGExportPublicKey checks that exporting the generated test key
// succeeds and yields a non-empty result carrying both PGP public key block
// armor markers.
func TestGPGExportPublicKey(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	exported, err := gpgExportPublicKey(signer)
	require.NoError(t, err)
	assert.NotEmpty(t, exported)

	armored := string(exported)
	assert.Contains(t, armored, "-----BEGIN PGP PUBLIC KEY BLOCK-----")
	assert.Contains(t, armored, "-----END PGP PUBLIC KEY BLOCK-----")
}
// TestGPGGetKeyFingerprint checks that looking up the fingerprint of the
// generated test key succeeds and returns a value of the expected length.
func TestGPGGetKeyFingerprint(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	fpr, err := gpgGetKeyFingerprint(signer)
	require.NoError(t, err)
	assert.NotEmpty(t, fpr)

	// Only the length is checked here, not the character set.
	assert.Len(t, fpr, 40, "fingerprint should be 40 hex chars")
}
// TestGPGSignInvalidKey checks that gpgSign reports an error when asked to
// sign with a key ID that was never created in the test keyring.
func TestGPGSignInvalidKey(t *testing.T) {
	// Only the isolated GNUPGHOME is needed; the generated key is ignored.
	_, teardown := testGPGEnv(t)
	defer teardown()

	bogusKey := GPGKeyID("NONEXISTENT_KEY_ID_12345")
	_, signErr := gpgSign([]byte("test data"), bogusKey)
	assert.Error(t, signErr)
}
// TestBuilderWithSigning builds a one-file manifest with signing enabled and
// checks that the parsed outer message carries a signature, a signer and a
// signing public key, with the expected PGP armor markers.
func TestBuilderWithSigning(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	// Builder configured to sign with the generated test key.
	builder := NewBuilder()
	builder.SetSigningOptions(&SigningOptions{KeyID: signer})

	// Register a single in-memory file.
	payload := []byte("test file content")
	_, err := builder.AddFile("test.txt", FileSize(len(payload)), ModTime{}, bytes.NewReader(payload), nil)
	require.NoError(t, err)

	// Serialize the manifest.
	var serialized bytes.Buffer
	require.NoError(t, builder.Build(&serialized))

	// Read it back and inspect the signature-related fields.
	loaded, err := NewManifestFromReader(&serialized)
	require.NoError(t, err)
	require.NotNil(t, loaded.pbOuter)

	outer := loaded.pbOuter
	assert.NotEmpty(t, outer.Signature, "signature should be populated")
	assert.NotEmpty(t, outer.Signer, "signer should be populated")
	assert.NotEmpty(t, outer.SigningPubKey, "signing public key should be populated")

	// Both blobs must at least start like armored PGP data.
	assert.Contains(t, string(outer.Signature), "-----BEGIN PGP SIGNATURE-----")
	assert.Contains(t, string(outer.SigningPubKey), "-----BEGIN PGP PUBLIC KEY BLOCK-----")
}
// TestScannerWithSigning scans two files from an in-memory filesystem with
// signing enabled and checks that the resulting manifest carries a signature,
// a signer and a signing public key.
func TestScannerWithSigning(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	// Populate an in-memory filesystem with two small files.
	memFs := afero.NewMemMapFs()
	require.NoError(t, memFs.MkdirAll("/testdir", 0o755))
	fixtures := []struct{ path, body string }{
		{"/testdir/file1.txt", "content1"},
		{"/testdir/file2.txt", "content2"},
	}
	for _, f := range fixtures {
		require.NoError(t, afero.WriteFile(memFs, f.path, []byte(f.body), 0o644))
	}

	// Scanner reading from that filesystem and signing with the test key.
	scanner := NewScannerWithOptions(&ScannerOptions{
		Fs:             memFs,
		SigningOptions: &SigningOptions{KeyID: signer},
	})

	// Both files must be discovered.
	require.NoError(t, scanner.EnumeratePath("/testdir", nil))
	assert.Equal(t, FileCount(2), scanner.FileCount())

	// Emit the signed manifest.
	var serialized bytes.Buffer
	require.NoError(t, scanner.ToManifest(context.Background(), &serialized, nil))

	// Read it back and confirm the signature fields are filled in.
	loaded, err := NewManifestFromReader(&serialized)
	require.NoError(t, err)
	assert.NotEmpty(t, loaded.pbOuter.Signature)
	assert.NotEmpty(t, loaded.pbOuter.Signer)
	assert.NotEmpty(t, loaded.pbOuter.SigningPubKey)
}
// TestGPGVerify signs a payload with the test key, exports the matching
// public key, and checks that gpgVerify accepts the combination.
func TestGPGVerify(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	payload := []byte("test data to sign and verify")

	signature, err := gpgSign(payload, signer)
	require.NoError(t, err)

	publicKey, err := gpgExportPublicKey(signer)
	require.NoError(t, err)

	// A signature over the same payload must verify cleanly.
	require.NoError(t, gpgVerify(payload, signature, publicKey))
}
// TestGPGVerifyInvalidSignature checks that a signature made over one payload
// is rejected when verified against a different payload.
func TestGPGVerifyInvalidSignature(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	signature, err := gpgSign([]byte("test data to sign"), signer)
	require.NoError(t, err)

	publicKey, err := gpgExportPublicKey(signer)
	require.NoError(t, err)

	// Verification against other bytes than the signed ones must fail.
	mismatched := []byte("different data")
	assert.Error(t, gpgVerify(mismatched, signature, publicKey))
}
// TestGPGVerifyBadPublicKey checks that gpgVerify reports an error when the
// supplied public key material is not a PGP key at all.
func TestGPGVerifyBadPublicKey(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	payload := []byte("test data")
	signature, err := gpgSign(payload, signer)
	require.NoError(t, err)

	// Arbitrary text stands in for the public key.
	garbageKey := []byte("not a valid public key")
	assert.Error(t, gpgVerify(payload, signature, garbageKey))
}
// TestManifestSignatureVerification builds a signed one-file manifest and
// checks that it loads without error and still carries its signature.
func TestManifestSignatureVerification(t *testing.T) {
	signer, teardown := testGPGEnv(t)
	defer teardown()

	// Builder configured to sign with the generated test key.
	builder := NewBuilder()
	builder.SetSigningOptions(&SigningOptions{KeyID: signer})

	// Register a single in-memory file.
	payload := []byte("test file content for verification")
	_, err := builder.AddFile("test.txt", FileSize(len(payload)), ModTime{}, bytes.NewReader(payload), nil)
	require.NoError(t, err)

	// Serialize the manifest.
	var serialized bytes.Buffer
	require.NoError(t, builder.Build(&serialized))

	// Loading is expected to succeed for an untouched signed manifest.
	loaded, err := NewManifestFromReader(&serialized)
	require.NoError(t, err)
	require.NotNil(t, loaded)

	// The signature must survive the round trip.
	assert.NotEmpty(t, loaded.pbOuter.Signature)
}
// TestManifestTamperedSignatureFails verifies that a signed manifest whose
// embedded signature has been altered is rejected on load.
//
// The tampering is anchored to the armored signature inside the serialized
// bytes, so the modified byte is guaranteed to belong to the signature and
// not to some other field that happens to contain a matching character.
func TestManifestTamperedSignatureFails(t *testing.T) {
	keyID, cleanup := testGPGEnv(t)
	defer cleanup()

	// Create a signed manifest
	b := NewBuilder()
	b.SetSigningOptions(&SigningOptions{
		KeyID: keyID,
	})
	content := []byte("test file content")
	_, err := b.AddFile("test.txt", FileSize(len(content)), ModTime{}, bytes.NewReader(content), nil)
	require.NoError(t, err)

	var buf bytes.Buffer
	require.NoError(t, b.Build(&buf))
	data := buf.Bytes()

	// Locate the armored signature in the serialized manifest.
	// NOTE(review): assumes the signature armor appears verbatim in the
	// serialized output; the test fails loudly below if it does not.
	const armorHeader = "-----BEGIN PGP SIGNATURE-----"
	armorStart := bytes.Index(data, []byte(armorHeader))
	require.NotEqual(t, -1, armorStart, "armored signature not found in serialized manifest")

	// The base64 payload begins after the blank line that terminates the
	// armor header section; accept either LF or CRLF line endings.
	rest := data[armorStart+len(armorHeader):]
	sep := []byte("\n\n")
	gap := bytes.Index(rest, sep)
	if gap < 0 {
		sep = []byte("\r\n\r\n")
		gap = bytes.Index(rest, sep)
	}
	require.NotEqual(t, -1, gap, "armored signature has no payload section")
	payload := rest[gap+len(sep):]
	require.NotEmpty(t, payload, "armored signature payload is empty")

	// Replace the first payload byte with a different, still valid base64
	// character. payload aliases data, so this edits the manifest in place
	// and always changes the byte.
	if payload[0] == 'A' {
		payload[0] = 'B'
	} else {
		payload[0] = 'A'
	}

	// Try to load the tampered manifest - should fail
	_, err = NewManifestFromReader(bytes.NewReader(data))
	assert.Error(t, err)
}
// TestBuilderWithoutSigning builds a one-file manifest with no signing
// options and checks that every signature-related field of the parsed outer
// message is left empty.
func TestBuilderWithoutSigning(t *testing.T) {
	// No signing options are configured on this builder.
	builder := NewBuilder()

	// Register a single in-memory file.
	payload := []byte("test file content")
	_, err := builder.AddFile("test.txt", FileSize(len(payload)), ModTime{}, bytes.NewReader(payload), nil)
	require.NoError(t, err)

	// Serialize the manifest.
	var serialized bytes.Buffer
	require.NoError(t, builder.Build(&serialized))

	// Read it back; none of the signing fields may be set.
	loaded, err := NewManifestFromReader(&serialized)
	require.NoError(t, err)
	require.NotNil(t, loaded.pbOuter)

	outer := loaded.pbOuter
	assert.Empty(t, outer.Signature, "signature should be empty when not signing")
	assert.Empty(t, outer.Signer, "signer should be empty when not signing")
	assert.Empty(t, outer.SigningPubKey, "signing public key should be empty when not signing")
}