initial code for dumping imessages in a reasonable format

This commit is contained in:
Jeffrey Paul
2014-02-09 00:30:49 +01:00
parent c0021efb13
commit 9dd7628f04
157 changed files with 24178 additions and 0 deletions

View File

@@ -0,0 +1,12 @@
import sqlite3
from keychain3 import Keychain3
from keychain4 import Keychain4
def keychain_load(filename, keybag, key835):
version = sqlite3.connect(filename).execute("SELECT version FROM tversion").fetchone()[0]
#print "Keychain version : %d" % version
if version == 3:
return Keychain3(filename, key835)
elif version >= 4:
return Keychain4(filename, keybag)
raise Exception("Unknown keychain version %d" % version)

View File

@@ -0,0 +1,237 @@
from store import PlistKeychain, SQLiteKeychain
from util import write_file
from util.asciitables import print_table
from util.bplist import BPlistReader
from util.cert import RSA_KEY_DER_to_PEM, CERT_DER_to_PEM
import M2Crypto
import hashlib
import plistlib
import sqlite3
import string
import struct
KSECATTRACCESSIBLE = {
6: "kSecAttrAccessibleWhenUnlocked",
7: "kSecAttrAccessibleAfterFirstUnlock",
8: "kSecAttrAccessibleAlways",
9: "kSecAttrAccessibleWhenUnlockedThisDeviceOnly",
10: "kSecAttrAccessibleAfterFirstUnlockThisDeviceOnly",
11: "kSecAttrAccessibleAlwaysThisDeviceOnly"
}
printset = set(string.printable)
def render_password(p):
data = p["data"]
if data != None and data.startswith("bplist") and data.find("\x00") != -1:
pl = BPlistReader.plistWithString(p["data"])
filename = "%s_%s_%d.plist" % (p["svce"],p["acct"],p["rowid"])
plistlib.writePlist(pl, filename)
#write_file("bin_"+filename, p["data"])
data = filename
if p.has_key("srvr"):
return "%s:%d;%s;%s" % (p["srvr"],p["port"],p["acct"],data)
else:
return "%s;%s;%s" % (p["svce"],p["acct"],data)
class Keychain(object):
def __init__(self, filename):
magic = open(filename, "rb").read(16)
if magic.startswith("SQLite"):
self.store = SQLiteKeychain(filename)
elif magic.startswith("bplist"):
self.store = PlistKeychain(filename)
else:
raise Exception("Unknown keychain format for %s" % filename)
self.bsanitize = True
self.items = {"genp": None, "inet": None, "cert": None, "keys": None}
def decrypt_data(self, data):
return data #override this method
def decrypt_item(self, res):
res["data"] = self.decrypt_data(res["data"])
if not res["data"]:
return {}
return res
def get_items(self, table):
if self.items[table]:
return self.items[table]
self.items[table] = filter(lambda x:x!={}, map(self.decrypt_item, self.store.get_items(table)))
return self.items[table]
def get_passwords(self):
return self.get_items("genp")
def get_inet_passwords(self):
return self.get_items("inet")
def get_keys(self):
return self.get_items("keys")
def get_cert(self):
return self.get_items("cert")
def get_certs(self):
certs = {}
pkeys = {}
keys = self.get_keys()
for row in self.get_cert():
cert = M2Crypto.X509.load_cert_der_string(row["data"])
subject = cert.get_subject().as_text()
common_name = cert.get_subject().get_entries_by_nid(M2Crypto.X509.X509_Name.nid['CN'])
if len(common_name):
subject = str(common_name[0].get_data())
else:
subject = "cn_unknown_%d" % row["rowid"]
certs[subject+ "_%s" % row["agrp"]] = cert
#print subject
#print "Access :\t" + KSECATTRACCESSIBLE.get(row["clas"])
for k in keys:
if k["agrp"] == row["agrp"] and k["klbl"] == row["pkhh"]:
pkey_der = k["data"]
pkey_der = RSA_KEY_DER_to_PEM(pkey_der)
pkeys[subject + "_%s" % row["agrp"]] = pkey_der
break
return certs, pkeys
def save_passwords(self):
passwords = "\n".join(map(render_password, self.get_passwords()))
inetpasswords = "\n".join(map(render_password, self.get_inet_passwords()))
print "Writing passwords to keychain.csv"
write_file("keychain.csv", "Passwords;;\n"+passwords+"\nInternet passwords;;\n"+ inetpasswords)
def save_certs_keys(self):
certs, pkeys = self.get_certs()
for c in certs:
filename = c + ".crt"
print "Saving certificate %s" % filename
certs[c].save_pem(filename)
for k in pkeys:
filename = k + ".key"
print "Saving key %s" % filename
write_file(filename, pkeys[k])
def sanitize(self, pw):
if pw.startswith("bplist"):
return "<binary plist data>"
elif not set(pw).issubset(printset):
pw = ">"+ pw.encode("hex")
#pw = "<binary data> : " + pw.encode("hex")
if self.bsanitize:
return pw[:2] + ("*" * (len(pw) - 2))
return pw
def print_all(self, sanitize=True):
self.bsanitize = sanitize
headers = ["Service", "Account", "Data", "Access group", "Protection class"]
rows = []
for p in self.get_passwords():
row = [p.get("svce","?"),
str(p.get("acct","?"))[:40],
self.sanitize(p.get("data","?"))[:20],
p.get("agrp","?"),
KSECATTRACCESSIBLE.get(p["clas"])[18:]]
rows.append(row)
print_table("Passwords", headers, rows)
headers = ["Server", "Account", "Data", "Access group", "Protection class"]
rows = []
for p in self.get_inet_passwords():
addr = "?"
if p.has_key("srvr"):
addr = p["srvr"] + ":" + str(p["port"])
row = [addr,
str(p.get("acct","?")),
self.sanitize(p.get("data","?"))[:20],
p.get("agrp","?"),
KSECATTRACCESSIBLE.get(p["clas"])[18:]]
rows.append(row)
print_table("Internet Passwords", headers, rows)
headers = ["Id", "Common Name", "Access group", "Protection class"]
rows = []
c = {}
for row in self.get_cert():
subject = "?"
if row.has_key("data"):
cert = M2Crypto.X509.load_cert_der_string(row["data"])
subject = cert.get_subject().as_text()
common_name = cert.get_subject().get_entries_by_nid(M2Crypto.X509.X509_Name.nid['CN'])
if len(common_name):
subject = str(common_name[0].get_data())
else:
subject = "cn_unknown_%d" % row["rowid"]
c[hashlib.sha1(str(row["pkhh"])).hexdigest() + row["agrp"]] = subject
row = [str(row["rowid"]),
subject[:81],
row.get("agrp","?")[:31],
KSECATTRACCESSIBLE.get(row["clas"])[18:]
]
rows.append(row)
print_table("Certificates", headers, rows)
headers = ["Id", "Label", "Common Name", "Access group", "Protection class"]
rows = []
for row in self.get_keys():
subject = ""
if row.has_key("klbl"):
subject = c.get(hashlib.sha1(str(row["klbl"])).hexdigest() + row["agrp"], "")
row = [str(row["rowid"]), row.get("labl", "?")[:30], subject[:39], row.get("agrp","?")[:31],
KSECATTRACCESSIBLE.get(row["clas"])[18:]]
rows.append(row)
print_table("Keys", headers, rows)
def get_push_token(self):
for p in self.get_passwords():
if p["svce"] == "push.apple.com":
return p["data"]
def get_managed_configuration(self):
for p in self.get_passwords():
if p["acct"] == "Private" and p["svce"] == "com.apple.managedconfiguration" and p["agrp"] == "apple":
return BPlistReader.plistWithString(p["data"])
def _diff(self, older, res, func, key):
res.setdefault(key, [])
current = func(self)
for p in func(older):
if not p in current and not p in res[key]:
res[key].append(p)
def diff(self, older, res):
self._diff(older, res, Keychain.get_passwords, "genp")
self._diff(older, res, Keychain.get_inet_passwords, "inet")
self._diff(older, res, Keychain.get_cert, "cert")
self._diff(older, res, Keychain.get_keys, "keys")
def cert(self, rowid, filename=""):
for row in self.get_cert():
if row["rowid"] == rowid:
blob = CERT_DER_to_PEM(row["data"])
if filename:
write_file(filename, blob)
cert = M2Crypto.X509.load_cert_der_string(row["data"])
print cert.as_text()
return
def key(self, rowid, filename=""):
for row in self.get_keys():
if row["rowid"] == rowid:
blob = RSA_KEY_DER_to_PEM(row["data"])
if filename:
write_file(filename, blob)
#k = M2Crypto.RSA.load_key_string(blob)
print blob
return

View File

@@ -0,0 +1,44 @@
from keychain import Keychain
from crypto.aes import AESdecryptCBC, AESencryptCBC
import hashlib
class Keychain3(Keychain):
def __init__(self, filename, key835=None):
Keychain.__init__(self, filename)
self.key835 = key835
def decrypt_data(self, data):
if data == None:
return ""
data = str(data)
if not self.key835:
print "Key 835 not availaible"
return ""
data = AESdecryptCBC(data[16:], self.key835, data[:16], padding=True)
#data_column = iv + AES128_K835(iv, data + sha1(data))
if hashlib.sha1(data[:-20]).digest() != data[-20:]:
print "data field hash mismatch : bad key ?"
return "ERROR decrypting data : bad key ?"
return data[:-20]
def change_key835(self, newkey):
tables = {"genp": "SELECT rowid, data FROM genp",
"inet": "SELECT rowid, data FROM inet",
"cert": "SELECT rowid, data FROM cert",
"keys": "SELECT rowid, data FROM keys"}
for t in tables.keys():
for row in self.conn.execute(tables[t]):
rowid = row["rowid"]
data = str(row["data"])
iv = data[:16]
data = AESdecryptCBC(data[16:], self.key835, iv)
data = AESencryptCBC(data, newkey, iv)
data = iv + data
data = buffer(data)
self.conn.execute("UPDATE %s SET data=? WHERE rowid=?" % t, (data, rowid))
self.conn.commit()

View File

@@ -0,0 +1,92 @@
from crypto.aes import AESdecryptCBC
import struct
"""
iOS 4 keychain-2.db data column format
version 0x00000000
key class 0x00000008
kSecAttrAccessibleWhenUnlocked 6
kSecAttrAccessibleAfterFirstUnlock 7
kSecAttrAccessibleAlways 8
kSecAttrAccessibleWhenUnlockedThisDeviceOnly 9
kSecAttrAccessibleAfterFirstUnlockThisDeviceOnly 10
kSecAttrAccessibleAlwaysThisDeviceOnly 11
wrapped AES256 key 0x28 bytes (passed to kAppleKeyStoreKeyUnwrap)
encrypted data (AES 256 CBC zero IV)
"""
from keychain import Keychain
from crypto.gcm import gcm_decrypt
from util.bplist import BPlistReader
KSECATTRACCESSIBLE = {
6: "kSecAttrAccessibleWhenUnlocked",
7: "kSecAttrAccessibleAfterFirstUnlock",
8: "kSecAttrAccessibleAlways",
9: "kSecAttrAccessibleWhenUnlockedThisDeviceOnly",
10: "kSecAttrAccessibleAfterFirstUnlockThisDeviceOnly",
11: "kSecAttrAccessibleAlwaysThisDeviceOnly"
}
class Keychain4(Keychain):
def __init__(self, filename, keybag):
if not keybag.unlocked:
print "Keychain object created with locked keybag, some items won't be decrypted"
Keychain.__init__(self, filename)
self.keybag = keybag
def decrypt_item(self, row):
version, clas = struct.unpack("<LL", row["data"][0:8])
if self.keybag.isBackupKeybag():
if clas >= 9 and not self.keybag.deviceKey:
return {}
if version >= 2:
dict = self.decrypt_blob(row["data"])
if not dict:
return {"clas": clas, "rowid": row["rowid"]}
if dict.has_key("v_Data"):
dict["data"] = dict["v_Data"].data
else:
dict["data"] = ""
dict["rowid"] = row["rowid"]
dict["clas"] = clas
return dict
row["clas"] = clas
return Keychain.decrypt_item(self, row)
def decrypt_data(self, data):
data = self.decrypt_blob(data)
if type(data) == dict:
return data["v_Data"].data
return data
def decrypt_blob(self, blob):
if blob == None:
return ""
if len(blob) < 48:
print "keychain blob length must be >= 48"
return
version, clas = struct.unpack("<LL",blob[0:8])
self.clas=clas
if version == 0:
wrappedkey = blob[8:8+40]
encrypted_data = blob[48:]
elif version == 2:
l = struct.unpack("<L",blob[8:12])[0]
wrappedkey = blob[12:12+l]
encrypted_data = blob[12+l:-16]
else:
raise Exception("unknown keychain verson ", version)
return
unwrappedkey = self.keybag.unwrapKeyForClass(clas, wrappedkey, False)
if not unwrappedkey:
return
if version == 0:
return AESdecryptCBC(encrypted_data, unwrappedkey, padding=True)
elif version == 2:
binaryplist = gcm_decrypt(unwrappedkey, "", encrypted_data, "", blob[-16:])
return BPlistReader(binaryplist).parse()

View File

@@ -0,0 +1,27 @@
"""
0
1:MCSHA256DigestWithSalt
2:SecKeyFromPassphraseDataHMACSHA1
"""
from crypto.PBKDF2 import PBKDF2
import plistlib
import hashlib
SALT1 = "F92F024CA2CB9754".decode("hex")
hashMethods={
1: (lambda p,salt:hashlib.sha256(SALT1 + p)),
2: (lambda p,salt:PBKDF2(p, salt, iterations=1000).read(20))
}
def bruteforce_old_pass(h):
salt = h["salt"].data
hash = h["hash"].data
f = hashMethods.get(h["hashMethod"])
if f:
print "Bruteforcing hash %s (4 digits)" % hash.encode("hex")
for i in xrange(10000):
p = "%04d" % (i % 10000)
if f(p,salt) == hash:
return p

View File

@@ -0,0 +1,56 @@
import plistlib
import sqlite3
import struct
from util import readPlist
class KeychainStore(object):
def __init__(self):
pass
def convertDict(self, d):
return d
def returnResults(self, r):
for a in r:
yield self.convertDict(a)
def get_items(self, table):
return []
class SQLiteKeychain(KeychainStore):
def __init__(self, filename):
self.conn = sqlite3.connect(filename)
self.conn.row_factory = sqlite3.Row
def convertDict(self, row):
d = dict(row)
for k,v in d.items():
if type(v) == buffer:
d[k] = str(v)
return d
def get_items(self, table):
sql = {"genp": "SELECT rowid, data, svce, acct, agrp FROM genp",
"inet": "SELECT rowid, data, acct, srvr, port, agrp FROM inet",
"cert": "SELECT rowid, data, pkhh, agrp FROM cert",
"keys": "SELECT rowid, data, klbl, agrp FROM keys"}
return self.returnResults(self.conn.execute(sql[table]))
class PlistKeychain(KeychainStore):
def __init__(self, filename):
self.plist = readPlist(filename)
def convertDict(self, d):
for k, v in d.items():
if isinstance(v, plistlib.Data):
if k == "v_Data":
d["data"] = v.data
elif k == "v_PersistentRef":
#format tablename (4 chars) + rowid (64 bits)
d["rowid"] = struct.unpack("<Q", v.data[-8:])[0]
else:
d[k] = v.data
return d
def get_items(self, table):
return self.returnResults(self.plist.get(table, []))