releasing/blocks/framework/src/Blocks/debfile.py
changeset 632 934f9131337b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/releasing/blocks/framework/src/Blocks/debfile.py	Thu Sep 02 15:02:14 2010 +0800
@@ -0,0 +1,248 @@
+#
+# Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+# All rights reserved.
+# This component and the accompanying materials are made available
+# under the terms of "Eclipse Public License v1.0"
+# which accompanies this distribution, and is available
+# at the URL "http://www.eclipse.org/legal/epl-v10.html".
+#
+# Initial Contributors:
+# Nokia Corporation - initial contribution.
+#
+# Contributors:
+#
+# Description:
+# Deb-file manager
+#
+
+import os
+import tarfile
+import tempfile
+from collections import namedtuple
+import shutil
+
+from Blocks.arfile import ArFile, ArInfo, ArError, _FILE_HEADER_STRUCT
+import Blocks.gpg as gpg
+
+class DebError(Exception):
+    ''' Debian package error '''
+    def __init__(self, error):
+        Exception.__init__(self, "Debian package error: %s" % error)
+        self.error = error
+
+MetaFiles = namedtuple("ControlFiles", "control, copyright, md5sum")
+Diff = namedtuple("Diff", "new, removed, changed")
+VerifySignStatus = namedtuple("VerifySignStatus", "type, status")
+
+class DebFile(ArFile):
+    ''' Manage debian package files (.deb) '''
+    def __init__(self, name=None, mode="r", fileobj=None):
+        try:
+            ArFile.__init__(self, name, mode, fileobj)
+        except ArError, ex:
+            raise DebError(ex.error)
+        self.datafiles = None
+        self._metadata = {}
+        self.md5sums = {}
+
+    def _initMetaData(self):
+        if not self._metadata:
+            with tempfile.SpooledTemporaryFile(1024*1024, prefix="debfile") as debtemp:
+                self.extract("control.tar.gz", debtemp)
+                debtemp.seek(0)
+                tar = tarfile.open(fileobj=debtemp)
+                metafiles = MetaFiles(tar.extractfile("control"), tar.extractfile("copyright"), tar.extractfile("md5sum"))
+                self._parseMetaFile(metafiles.control)
+                self._parseMetaFile(metafiles.copyright)
+                self.md5sums = dict(reversed(line.rstrip().split("  ", 1)) for line in metafiles.md5sum)
+
+    def _parseMetaFile(self, fileobj):
+        line = fileobj.readline()
+        part = line.partition(":")
+        metaContent = part[2].strip()
+        self._metadata[part[0]] = metaContent
+        for line in fileobj:
+            if line[0] in (" ", "\t"):
+                metaContent += "\n" + line.strip()
+            else:
+                self._metadata[part[0]] = metaContent
+                part = line.partition(":")
+                metaContent = part[2].strip()
+                self._metadata[part[0]] = metaContent
+
+    @property
+    def metadata(self):
+        self._initMetaData()
+        return self._metadata.copy()
+
+    def compareMetaData(self, other, doNotCompare=None):
+        self._initMetaData()
+        other._initMetaData()
+
+        if doNotCompare is None:
+            doNotCompare = ["Installed-Size"]
+        keys = set(self._metadata.keys())
+        otherkeys = set(other.metadata.keys())
+        new = otherkeys - keys
+        removed = keys - otherkeys
+        same = otherkeys - new
+        changed = tuple(key for key in same
+                        if key not in doNotCompare and
+                        self._metadata[key] != other.metadata[key])
+        return Diff(tuple(new), tuple(removed), changed)
+
+    def compareFiles(self, other):
+        self._initMetaData()
+        other._initMetaData()
+
+        files = set(self.md5sums.keys())
+        otherfiles = set(other.md5sums.keys())
+        new = otherfiles - files
+        removed = files - otherfiles
+        same = otherfiles - new
+        changed = tuple(fname for fname in same if self.md5sums[fname] != other.md5sums[fname])
+        return Diff(tuple(new), tuple(removed), changed)
+
+    def compare(self, other):
+        ''' Returns tuple (metadiff, filediff) '''
+        metadiff = self.compareMetaData(other)
+        filediff = self.compareFiles(other)
+        return (metadiff, filediff)
+
+    def getDataPackageName(self):
+        '''
+        @return: Name of data package
+        '''
+        try:
+            datafile = [name for name in self.getNames() if name.startswith("data.tar")][0]
+        except IndexError:
+            raise DebError("No data file found")
+        formats = ("tar", "tar.gz", "tar.bz2")
+        if not datafile.endswith(formats):
+            raise DebError("Data file not in supported%s format. Format: %s" % (formats, datafile))
+        return datafile
+
+    def getControlPackageName(self):
+        '''
+        @return: Name of control package
+        '''
+        try:
+            controlfile = [name for name in self.getNames() if name.startswith("control.tar")][0]
+        except IndexError:
+            raise DebError("No data file found")
+        formats = ("tar", "tar.gz", "tar.bz2")
+        if not controlfile.endswith(formats):
+            raise DebError("Control file not in supported%s format. Format: %s" % (formats, controlfile))
+        return controlfile
+
+    def extractDataPackage(self, fileObj):
+        '''
+        Extracts data package containing all install files
+        fileObj can be either path or file like object
+        '''
+        self.extract(self.getDataPackageName(), fileObj)
+
+    def extractData(self, filename, path):
+        if not self.datafiles:
+            debtemp = tempfile.SpooledTemporaryFile(1024*1024, prefix="debfile")
+            self.extractDataPackage(debtemp)
+            debtemp.seek(0)
+            self.datafiles = tarfile.open(fileobj=debtemp)
+        self.datafiles.extract(filename, path)
+
+    def getControl(self):
+        '''
+        Return the contents of the control tarball members as a dictionary.
+        File names are keys.
+        '''
+        ret = {}
+        self.extractfile(self.getControlPackageName())
+        t = tarfile.open("r:gz", fileobj=self)
+        for member in ("control", "copyright", "md5sum"):
+            c = t.extractfile(member)
+            ret[member] = ""
+            while True:
+                buf = c.read()
+                if not buf:
+                    break
+                ret[member] = ret[member] + buf
+        t.close()
+        return ret
+
+    def signatureExists(self, signtype):
+        signfilename = "_gpg" + signtype
+        return signfilename in self.getNames()
+
+    def addSignature(self, signtype, gpgHome=None, gpgBatchMode=False, gpgPassfile=None):
+        self._endFile()
+
+        if self.signatureExists(signtype):
+            raise DebError("Signature type '%s' already exists" % signtype)
+
+        self.extractfile("debian-binary", "control.tar.gz", self.getDataPackageName())
+        try:
+            sig = gpg.sign(self, None, gpgHome, gpgBatchMode, gpgPassfile)
+        except gpg.GpgError, ex:
+            raise DebError(ex.output)
+        signfilename = "_gpg" + signtype
+        self.addfile(ArInfo(signfilename))
+        self.write(sig)
+
+    def removeSignature(self, signtype=None):
+        '''
+        Remove signature(s)
+        If signtype is None all signatures are removed
+        Assumes that signatures are in the end of file
+        '''
+        gpgFileNames = [n for n in self.files.iterkeys() if n.startswith("_gpg")]
+        if signtype:
+            try:
+                self.remove("_gpg" + signtype)
+            except ArError:
+                raise DebError("Sign type '%s' not found" % signtype)
+        elif gpgFileNames:
+            signOffsets = [self.files[n].offset for n in gpgFileNames]
+            self.archive.truncate(min(signOffsets) - _FILE_HEADER_STRUCT.size)
+            for name in gpgFileNames:
+                del self.files[name]
+        else:
+            raise DebError("No signatures to remove")
+
+    def getSignatures(self):
+        return [n[4:] for n in self.getNames() if n.startswith("_gpg")]
+
+    def verifySignature(self, signtype=None, homedir=None):
+        ''' Verify signature(s) '''
+        verifyDataPath = None
+        temppath = None
+        status = []
+        try:
+            if signtype:
+                signfile = "_gpg" + signtype
+                signfiles = [signfile] if signfile in self.getNames() else []
+            else:
+                signfiles = [n for n in self.getNames() if n.startswith("_gpg")]
+            if not signfiles:
+                if signtype:
+                    raise DebError("Signature type '%s' not found." % signtype)
+                else:
+                    raise DebError("Signatures not found")
+
+            with tempfile.NamedTemporaryFile(prefix="deb_sign", delete=False) as debtemp:
+                for name in ("debian-binary", "control.tar.gz"):
+                    self.extract(name, debtemp)
+                self.extractDataPackage(debtemp)
+                verifyDataPath = debtemp.name
+            temppath = tempfile.mkdtemp(prefix="deb_sign_gpg")
+            for signname in signfiles:
+                typename = signname[4:]
+                self.extract(signname, temppath)
+                signFilePath = os.path.join(temppath, signname)
+                verifystatus = gpg.verify(signFilePath, verifyDataPath, homedir)
+                status.append(VerifySignStatus(typename, verifystatus))
+        finally:
+            if verifyDataPath:
+                os.remove(verifyDataPath)
+            if temppath:
+                shutil.rmtree(temppath)
+        return status
\ No newline at end of file