--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/releasing/blocks/framework/src/Blocks/debfile.py Thu Sep 02 15:02:14 2010 +0800
@@ -0,0 +1,248 @@
+#
+# Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+# All rights reserved.
+# This component and the accompanying materials are made available
+# under the terms of "Eclipse Public License v1.0"
+# which accompanies this distribution, and is available
+# at the URL "http://www.eclipse.org/legal/epl-v10.html".
+#
+# Initial Contributors:
+# Nokia Corporation - initial contribution.
+#
+# Contributors:
+#
+# Description:
+# Deb-file manager
+#
+
+import os
+import tarfile
+import tempfile
+from collections import namedtuple
+import shutil
+
+from Blocks.arfile import ArFile, ArInfo, ArError, _FILE_HEADER_STRUCT
+import Blocks.gpg as gpg
+
+class DebError(Exception):
+ ''' Debian package error '''
+ def __init__(self, error):
+ Exception.__init__(self, "Debian package error: %s" % error)
+ self.error = error
+
+MetaFiles = namedtuple("ControlFiles", "control, copyright, md5sum")
+Diff = namedtuple("Diff", "new, removed, changed")
+VerifySignStatus = namedtuple("VerifySignStatus", "type, status")
+
+class DebFile(ArFile):
+ ''' Manage debian package files (.deb) '''
+ def __init__(self, name=None, mode="r", fileobj=None):
+ try:
+ ArFile.__init__(self, name, mode, fileobj)
+ except ArError, ex:
+ raise DebError(ex.error)
+ self.datafiles = None
+ self._metadata = {}
+ self.md5sums = {}
+
+ def _initMetaData(self):
+ if not self._metadata:
+ with tempfile.SpooledTemporaryFile(1024*1024, prefix="debfile") as debtemp:
+ self.extract("control.tar.gz", debtemp)
+ debtemp.seek(0)
+ tar = tarfile.open(fileobj=debtemp)
+ metafiles = MetaFiles(tar.extractfile("control"), tar.extractfile("copyright"), tar.extractfile("md5sum"))
+ self._parseMetaFile(metafiles.control)
+ self._parseMetaFile(metafiles.copyright)
+ self.md5sums = dict(reversed(line.rstrip().split(" ", 1)) for line in metafiles.md5sum)
+
+ def _parseMetaFile(self, fileobj):
+ line = fileobj.readline()
+ part = line.partition(":")
+ metaContent = part[2].strip()
+ self._metadata[part[0]] = metaContent
+ for line in fileobj:
+ if line[0] in (" ", "\t"):
+ metaContent += "\n" + line.strip()
+ else:
+ self._metadata[part[0]] = metaContent
+ part = line.partition(":")
+ metaContent = part[2].strip()
+ self._metadata[part[0]] = metaContent
+
+ @property
+ def metadata(self):
+ self._initMetaData()
+ return self._metadata.copy()
+
+ def compareMetaData(self, other, doNotCompare=None):
+ self._initMetaData()
+ other._initMetaData()
+
+ if doNotCompare is None:
+ doNotCompare = ["Installed-Size"]
+ keys = set(self._metadata.keys())
+ otherkeys = set(other.metadata.keys())
+ new = otherkeys - keys
+ removed = keys - otherkeys
+ same = otherkeys - new
+ changed = tuple(key for key in same
+ if key not in doNotCompare and
+ self._metadata[key] != other.metadata[key])
+ return Diff(tuple(new), tuple(removed), changed)
+
+ def compareFiles(self, other):
+ self._initMetaData()
+ other._initMetaData()
+
+ files = set(self.md5sums.keys())
+ otherfiles = set(other.md5sums.keys())
+ new = otherfiles - files
+ removed = files - otherfiles
+ same = otherfiles - new
+ changed = tuple(fname for fname in same if self.md5sums[fname] != other.md5sums[fname])
+ return Diff(tuple(new), tuple(removed), changed)
+
+ def compare(self, other):
+ ''' Returns tuple (metadiff, filediff) '''
+ metadiff = self.compareMetaData(other)
+ filediff = self.compareFiles(other)
+ return (metadiff, filediff)
+
+ def getDataPackageName(self):
+ '''
+ @return: Name of data package
+ '''
+ try:
+ datafile = [name for name in self.getNames() if name.startswith("data.tar")][0]
+ except IndexError:
+ raise DebError("No data file found")
+ formats = ("tar", "tar.gz", "tar.bz2")
+ if not datafile.endswith(formats):
+ raise DebError("Data file not in supported%s format. Format: %s" % (formats, datafile))
+ return datafile
+
+ def getControlPackageName(self):
+ '''
+ @return: Name of control package
+ '''
+ try:
+ controlfile = [name for name in self.getNames() if name.startswith("control.tar")][0]
+ except IndexError:
+ raise DebError("No data file found")
+ formats = ("tar", "tar.gz", "tar.bz2")
+ if not controlfile.endswith(formats):
+ raise DebError("Control file not in supported%s format. Format: %s" % (formats, controlfile))
+ return controlfile
+
+ def extractDataPackage(self, fileObj):
+ '''
+ Extracts data package containing all install files
+ fileObj can be either path or file like object
+ '''
+ self.extract(self.getDataPackageName(), fileObj)
+
+ def extractData(self, filename, path):
+ if not self.datafiles:
+ debtemp = tempfile.SpooledTemporaryFile(1024*1024, prefix="debfile")
+ self.extractDataPackage(debtemp)
+ debtemp.seek(0)
+ self.datafiles = tarfile.open(fileobj=debtemp)
+ self.datafiles.extract(filename, path)
+
+ def getControl(self):
+ '''
+ Return the contents of the control tarball members as a dictionary.
+ File names are keys.
+ '''
+ ret = {}
+ self.extractfile(self.getControlPackageName())
+ t = tarfile.open("r:gz", fileobj=self)
+ for member in ("control", "copyright", "md5sum"):
+ c = t.extractfile(member)
+ ret[member] = ""
+ while True:
+ buf = c.read()
+ if not buf:
+ break
+ ret[member] = ret[member] + buf
+ t.close()
+ return ret
+
+ def signatureExists(self, signtype):
+ signfilename = "_gpg" + signtype
+ return signfilename in self.getNames()
+
+ def addSignature(self, signtype, gpgHome=None, gpgBatchMode=False, gpgPassfile=None):
+ self._endFile()
+
+ if self.signatureExists(signtype):
+ raise DebError("Signature type '%s' already exists" % signtype)
+
+ self.extractfile("debian-binary", "control.tar.gz", self.getDataPackageName())
+ try:
+ sig = gpg.sign(self, None, gpgHome, gpgBatchMode, gpgPassfile)
+ except gpg.GpgError, ex:
+ raise DebError(ex.output)
+ signfilename = "_gpg" + signtype
+ self.addfile(ArInfo(signfilename))
+ self.write(sig)
+
+ def removeSignature(self, signtype=None):
+ '''
+ Remove signature(s)
+ If signtype is None all signatures are removed
+ Assumes that signatures are in the end of file
+ '''
+ gpgFileNames = [n for n in self.files.iterkeys() if n.startswith("_gpg")]
+ if signtype:
+ try:
+ self.remove("_gpg" + signtype)
+ except ArError:
+ raise DebError("Sign type '%s' not found" % signtype)
+ elif gpgFileNames:
+ signOffsets = [self.files[n].offset for n in gpgFileNames]
+ self.archive.truncate(min(signOffsets) - _FILE_HEADER_STRUCT.size)
+ for name in gpgFileNames:
+ del self.files[name]
+ else:
+ raise DebError("No signatures to remove")
+
+ def getSignatures(self):
+ return [n[4:] for n in self.getNames() if n.startswith("_gpg")]
+
+ def verifySignature(self, signtype=None, homedir=None):
+ ''' Verify signature(s) '''
+ verifyDataPath = None
+ temppath = None
+ status = []
+ try:
+ if signtype:
+ signfile = "_gpg" + signtype
+ signfiles = [signfile] if signfile in self.getNames() else []
+ else:
+ signfiles = [n for n in self.getNames() if n.startswith("_gpg")]
+ if not signfiles:
+ if signtype:
+ raise DebError("Signature type '%s' not found." % signtype)
+ else:
+ raise DebError("Signatures not found")
+
+ with tempfile.NamedTemporaryFile(prefix="deb_sign", delete=False) as debtemp:
+ for name in ("debian-binary", "control.tar.gz"):
+ self.extract(name, debtemp)
+ self.extractDataPackage(debtemp)
+ verifyDataPath = debtemp.name
+ temppath = tempfile.mkdtemp(prefix="deb_sign_gpg")
+ for signname in signfiles:
+ typename = signname[4:]
+ self.extract(signname, temppath)
+ signFilePath = os.path.join(temppath, signname)
+ verifystatus = gpg.verify(signFilePath, verifyDataPath, homedir)
+ status.append(VerifySignStatus(typename, verifystatus))
+ finally:
+ if verifyDataPath:
+ os.remove(verifyDataPath)
+ if temppath:
+ shutil.rmtree(temppath)
+ return status
\ No newline at end of file