[csw-devel] SF.net SVN: gar:[22145] csw/mgar/gar/v2
guengel at users.sourceforge.net
guengel at users.sourceforge.net
Sun Oct 6 20:28:24 CEST 2013
Revision: 22145
http://gar.svn.sourceforge.net/gar/?rev=22145&view=rev
Author: guengel
Date: 2013-10-06 18:28:23 +0000 (Sun, 06 Oct 2013)
Log Message:
-----------
Added chkdbcat and associated module. It is used to check the database catalog for errors.
Added Paths:
-----------
csw/mgar/gar/v2/bin/chkdbcat
csw/mgar/gar/v2/lib/python/chkdbcat.py
csw/mgar/gar/v2/lib/python/chkdbcat_test.py
Added: csw/mgar/gar/v2/bin/chkdbcat
===================================================================
--- csw/mgar/gar/v2/bin/chkdbcat (rev 0)
+++ csw/mgar/gar/v2/bin/chkdbcat 2013-10-06 18:28:23 UTC (rev 22145)
@@ -0,0 +1,53 @@
+#!/opt/csw/bin/python2.6
+# -*- python -*-
+
+import argparse
+import logging
+from lib.python import chkdbcat
+
class MyCheckDBCatalog(chkdbcat.CheckDBCatalog):
    """CheckDBCatalog variant that reports notifications on stdout.

    Overrides CheckDBCatalog.notify(). Real notification delivery is
    still to be decided (see the TODO in notify()).

    """

    def __init__(self, catrel, arch, osrel, fn_ts, chkcat, verbose=False):
        """Constructor.

        "catrel", "arch", "osrel", "fn_ts" and "chkcat" are handed
        unchanged to CheckDBCatalog.__init__().

        "verbose" enables the additional `Notify <addr>' line printed
        by notify().

        """
        super(MyCheckDBCatalog, self).__init__(catrel, arch, osrel, fn_ts, chkcat)
        self.__verbose = verbose

    def notify(self, date, addr, pkginfo):
        """Print the notification data for "addr" to stdout."""
        # TODO: Do actual notification. To be discussed.
        if self.__verbose:
            print('Notify %s' % addr)
        # This line used to reference the undefined name `add', which
        # raised NameError on the first notification.
        print(date, addr, pkginfo)
+
+
def argparser(argv=None):
    """Parse the command line of chkdbcat.

    "argv" is the list of arguments to parse. When it is None (the
    default) argparse falls back to sys.argv[1:], which is what the
    script itself relies on.

    Returns the argparse namespace with the attributes debug, verbose,
    arch, catalog_release, os_release, timestamp_file and chkcat_path.

    """
    parser = argparse.ArgumentParser(description='Check Database Catalog.')
    # `store_true' implies default=False; options are never required
    # unless explicitly marked so.
    parser.add_argument('--debug', help='enable debug output', action='store_true')
    parser.add_argument('--verbose', help='be verbose', action='store_true')
    parser.add_argument('--arch', required=True)
    parser.add_argument('--catalog-release', required=True)
    parser.add_argument('--os-release', required=True)
    parser.add_argument('--timestamp-file', default='/var/cache/chkdbcat/timestamp.json')
    parser.add_argument('--chkcat-path', default='/opt/csw/bin/chkcat')
    return parser.parse_args(argv)
+
def main():
    """Run the catalog check selected on the command line.

    Exits with status 0 when the check succeeded and with status 1
    otherwise. With --verbose the outcome (and, on failure, the stderr
    output captured from chkcat) is printed to stdout.

    """
    # Local import: the script's import block does not provide sys.
    import sys

    args = argparser()

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)

    # Printed order is: catalog release, OS release, architecture.
    catalog_id = "{0} {1} {2}".format(args.catalog_release,
                                      args.os_release,
                                      args.arch)

    if args.verbose:
        print("Checking Database Catalog " + catalog_id)

    with MyCheckDBCatalog(args.catalog_release, args.arch,
                          args.os_release, args.timestamp_file,
                          args.chkcat_path, args.verbose) as checker:
        success = checker.check()
        if args.verbose:
            if success:
                print("Database Catalog " + catalog_id + ": OK")
            else:
                print("Database Catalog " + catalog_id + ": FAILED")
                print("chkcat output")
                # Was a Python 2 only print statement.
                print(checker.stderr)

    # sys.exit() instead of exit(): the latter is a helper installed by
    # the site module and is not meant for use in programs.
    sys.exit(0 if success else 1)


if __name__ == '__main__':
    main()
Added: csw/mgar/gar/v2/lib/python/chkdbcat.py
===================================================================
--- csw/mgar/gar/v2/lib/python/chkdbcat.py (rev 0)
+++ csw/mgar/gar/v2/lib/python/chkdbcat.py 2013-10-06 18:28:23 UTC (rev 22145)
@@ -0,0 +1,409 @@
+"""Check catalog data stored in the database for consistency.
+
+Check catalog data stored in the database for consistency using Peter
+Bonivart's chkcat.
+
+Notifications for invalid catalogs can be customized by overriding
+CheckDBCatalog.notify().
+
+"""
+import cjson
+import datetime
+import dateutil.parser
+import logging
+import os.path
+import shutil
+import subprocess
+import sys
+import tempfile
+from lib.python.shell import ShellCommand
+from lib.python.rest import RestClient
+from lib.python.generate_catalog_file import CatalogFileGenerator
+
class FSLock(object):
    """Simple Lock class

    Provide a simple locking mechanism using a directory as lock.

    The lock is taken by creating the directory and released by removing
    it again. Taking the lock does not wait: if the directory already
    exists, os.mkdir() raises OSError and the `with' body is not run.

    """

    def __init__(self, dirname):
        """Remember "dirname", the path of the directory used as lock."""
        self.__dirname = dirname

    def __enter__(self):
        """Create lock directory."""
        logging.debug("Create lock dir: %s", self.__dirname)
        os.mkdir(self.__dirname)
        return self

    def __exit__(self, t, v, tb):
        """Remove lock directory."""
        logging.debug("Remove lock dir: %s", self.__dirname)
        os.rmdir(self.__dirname)
+
+
class TimestampRecord(object):
    """Record Timestamp for a given Catalog, Architecture, and OS Release into a json encoded file."""

    def __init__(self, fn):
        """Constructor.

        "fn" is the filename to read from and write to. File will be
        created if necessary and is not required to exist prior to
        instantiation of the class.

        """
        self.__filename = fn
        if os.path.exists(self.__filename):
            self.__read_data()
        else:
            self.__ts_by_catalog = {}

    def __enter__(self):
        """Dummy."""
        return self

    def __exit__(self, exc_type, wdc1, wdc2):
        """Save data to file, unless the `with' body raised an exception."""
        if exc_type is None:
            logging.debug("Save TimestampRecord to file")
            self.save()

    def __read_data(self):
        """Read json data into memory.

        Read json data from file and create a dictionary with
        (catrel,arch,osrel) as key and string representation of
        timestamp as value.

        Dictionary is stored in self.__ts_by_catalog

        """
        with open(self.__filename, "r") as fp:
            keyval_list = cjson.decode(fp.read())
        # The file holds a list of [key, value] pairs. The keys are
        # turned into tuples here so that they can be used as dict keys.
        self.__ts_by_catalog = dict((tuple(key), stamp)
                                    for key, stamp in keyval_list)

    def save(self):
        """Save in memory data to file.

        Save self.__ts_by_catalog json encoded to file. If self.__ts_by_catalog is
        empty, nothing will be done.

        """
        # We don't create an empty json file in order to prevent
        # reading from an empty json file upon instantiation with
        # the same file name
        if not self.__ts_by_catalog:
            return

        with open(self.__filename, "w") as fp:
            # list(): items() is a list on Python 2 but only a view on
            # Python 3; the encoder is handed a real list either way.
            fp.write(cjson.encode(list(self.__ts_by_catalog.items())))

    def get(self, catrel, arch, osrel):
        """Get time stamp for given catrel, arch, and osrel.

        Time stamp will be returned as datetime. None will be
        returned if no time stamp has been recorded for catalog
        triple.

        """
        catkey = (catrel, arch, osrel)
        # `in' replaces dict.has_key(), which no longer exists in Python 3.
        if catkey not in self.__ts_by_catalog:
            return None
        return dateutil.parser.parse(self.__ts_by_catalog[catkey])

    def set(self, catrel, arch, osrel, date):
        """Set time stamp for a given catrel, arch, and osrel.

        If date is an instance of datetime, the value obtained by
        calling isoformat() will be stored (microseconds are dropped).

        If date is an instance of str, it has to be a date in iso format.

        Raises TypeError if date is neither a datetime nor a str.

        """
        assert date is not None

        catkey = (catrel, arch, osrel)
        if isinstance(date, datetime.datetime):
            self.__ts_by_catalog[catkey] = date.replace(microsecond=0).isoformat()
        elif isinstance(date, str):
            # try to convert string into datetime, so that we
            # know it has proper format.
            self.__ts_by_catalog[catkey] = dateutil.parser.parse(date).isoformat()
        else:
            raise TypeError("Expected instance of str or datetime, got %s" % str(type(date)))
+
+
class CatalogTiming(object):
    """Fetch Catalog Timing information.

    Fetch Catalog Timing information, such as when packages have
    been built and uploaded.

    """
    # Stand-in for missing (null) time stamps, so that such records sort
    # before every real date.
    _epoch_start = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)

    # Mapping of key name to list index with optional transformation.
    # Used by __list_to_dict_generator()
    __list_to_dict_translation = (
        ('pkg', 0),
        ('version', 1),
        ('spkg', 2),
        ('fullname', 3),
        ('md5', 4),
        ('built', 10,
         lambda x: CatalogTiming._epoch_start if x is None else dateutil.parser.parse(x)),
        ('upload', 11,
         lambda x: CatalogTiming._epoch_start if x is None else dateutil.parser.parse(x)),
        ('uploadby', 12)
    )

    def __init__(self, catrel, arch, osrel):
        """Fetch and decode the timing data for (catrel, arch, osrel)."""
        self.__catrel = catrel
        self.__arch = arch
        self.__osrel = osrel
        self.__timing_data = self.__json_list_to_dict(self.fetch())

    def __list_to_dict_generator(self, l):
        """Generate tuples suitable to create a dictionary.

        It uses the tuples t in self.__list_to_dict_translation to
        extract the value at position t[1] in l using t[0] as key name.

        If len(t)==3, it assumes t[2] is callable and calls t[2]
        with the value at position t[1] in l as argument.

        """
        for tr in self.__list_to_dict_translation:
            if len(tr) == 2:
                yield (tr[0], l[tr[1]])
            elif len(tr) == 3:
                yield (tr[0], tr[2](l[tr[1]]))

    def __json_list_to_dict(self, jdat):
        """Translate decoded json timing data to a list of dictionaries.

        Returns a list of dicts built from the decoded json input.

        Input "jdat" is expected to have the form:

        [
          [
            "libz1",
            "1.2.8,REV=2013.09.17",
            "CSWlibz1",
            "libz1-1.2.8,REV=2013.09.17-SunOS5.10-sparc-CSW.pkg.gz",
            "7684a5d3a096900f89f78c3c2dda3ff3",
            197630,
            [...],
            [...],
            "libz1 - Zlib data compression library, libz.so.1",
            125,
            "2013-09-17T07:13:30",
            "2013-09-17T10:36:15",
            "raos"
          ],
          . . .
        ]


        Output is a list of dictionaries ("built" and "upload" are
        converted to datetime objects):

        [
          { "pkg": "libz1",
            "version": "1.2.8,REV=2013.09.17",
            "spkg": "CSWlibz1",
            "fullname": "libz1-1.2.8,REV=2013.09.17-SunOS5.10-sparc-CSW.pkg.gz",
            "md5": "7684a5d3a096900f89f78c3c2dda3ff3",
            "built": "2013-09-17T07:13:30",
            "upload": "2013-09-17T10:36:15",
            "uploadby": "raos"
          },
          . . .
        ]

        """
        # The method name was misspelled (`__list_to_dict_genrator') in
        # this call, which raised AttributeError on every construction.
        return [dict(self.__list_to_dict_generator(v)) for v in jdat]

    def fetch(self):
        """Fetch timing information using REST."""
        return RestClient().GetCatalogTimingInformation(self.__catrel,
                                                        self.__arch,
                                                        self.__osrel)

    def upload_newer_than(self, date):
        """Retrieve timing information on upload newer than "date".

        Returns the records whose `upload' time stamp is equal to or
        later than "date", ordered by ascending upload time. The
        internal list is sorted in place as a side effect.

        """
        self.__timing_data.sort(key=lambda x: x['upload'])

        # The list is now in ascending order, so the matching records
        # form its tail; filtering keeps exactly that tail.
        return [rec for rec in self.__timing_data if rec['upload'] >= date]
+
+
class CheckDBCatalog(object):
    """Check a catalog retrieved from the database.

    It retrieves a catalog from the database according to the
    triple (catrel,arch,osrel) and runs `chkcat' on the catalog. If
    `chkcat' reports no errors, a time stamp will be written into a
    file marking the last successful check.

    If `chkcat' finds errors, the last successful time stamp will be
    used to find all newly uploaded packages since this last
    successful check, using "CatalogTiming" class. In that case, the
    time stamp will NOT be updated.

    Each new catalog to be checked is required to pass the check
    initially, since no last successful time stamp is available.

    The class has to be used in a `with' statement, since the
    temporary directory required is created by __enter__() and
    removed by __exit__().

    """

    def __init__(self, catrel, arch, osrel, fn_ts, chkcat="/opt/csw/bin/chkcat",
                 cattiming_class=CatalogTiming,
                 tsrecord_class=TimestampRecord):
        """Constructor.

        "fn_ts" is the path name to the time stamp file.

        "chkcat" is the path to `chkcat'. By default
        `/opt/csw/bin/chkcat'.

        "cattiming_class" and "tsrecord_class" are mainly used for
        unit tests in order to provide classes with overridden
        methods.

        """
        self.__catalogfgen = CatalogFileGenerator(catrel, arch, osrel)
        self.__chkcat = chkcat

        # store for later use
        self.__catrel = catrel
        self.__arch = arch
        self.__osrel = osrel

        # will be set by __enter__ and unset by __exit__
        self.tmpdir = None

        # Output of the last chkcat run; filled in by run_chkcat().
        # Defined here so that readers of these attributes do not hit
        # AttributeError when chkcat has not been run.
        self.stdout = None
        self.stderr = None

        self.__tsrecord_class = tsrecord_class
        self.__cattiming_class = cattiming_class
        self.__timestamp_record = tsrecord_class(fn_ts)

    def __enter__(self):
        """Create the temporary directory the catalog is stored in."""
        assert self.tmpdir is None
        self.tmpdir = tempfile.mkdtemp(dir='/var/tmp')
        logging.debug("Created temp dir %s" % self.tmpdir)
        return self

    def __exit__(self, wdc1, wdc2, wdc3):
        """Cleanup temporary directory where catalog has been stored."""
        assert self.tmpdir is not None
        logging.debug("Remove tmp directory %s" % (self.tmpdir,))
        shutil.rmtree(self.tmpdir)
        self.tmpdir = None

    def __get_notification_address(self, pkginfo):
        """Returns the address where the notification will be sent to.

        Find the email address according to package information "pkginfo".

        """
        # In case 'uploadby' is non-conclusive, fall back to
        # retrieve email address of maintainer.
        if pkginfo['uploadby'] is not None and pkginfo['uploadby'] != "web":
            return pkginfo['uploadby'] + '@opencsw.org'
        return RestClient().GetMaintainerByMd5(pkginfo['md5'])['maintainer_email']

    def notify(self, date, addr, pkginfo):
        """Notification.

        Will be called for each "addr" once. "pkginfo" is a list
        with packages as retrieved by 'CatalogTiming' since last
        successful check. "date" is the time stamp of that last
        successful check.

        This default implementation only writes log records; override
        it to deliver real notifications.

        """
        logging.info("TO: %s" % addr)
        for p in pkginfo:
            logging.info("packge %s uploaded since %s might have caused catalog break" % (p['fullname'], str(date)))

    def fetch_db_cat(self):
        """Fetch catalog stored in database into temporary directory."""
        assert self.tmpdir is not None
        self.__catalogfgen.GenerateCatalog(self.tmpdir)

    def run_chkcat(self):
        """Run `chkcat' on catalog retrieved into temporary directory.

        The return code and the captured output are kept on the
        instance; "stdout" and "stderr" are public attributes.

        Returns True if the return code of chkcat is 0 or 1, False
        otherwise.

        """
        assert self.tmpdir is not None

        catalog_path = os.path.join(self.tmpdir, "catalog")
        logging.debug("Run chkcat on %s" % catalog_path)
        (self.__chkcat_retval,
         self.stdout,
         self.stderr) = ShellCommand([self.__chkcat, "-p", catalog_path],
                                     allow_error=True)
        # see `/opt/csw/bin/chkcat -h' for details on the return code
        return self.__chkcat_retval in (0, 1)

    def check(self):
        """Download the catalog and use chkcat on it.

        In case the catalog is invalid, a list of packages by
        `uploader' will be composed and notify() called for each
        uploader.

        Returns True upon successful check, False otherwise.
        """
        retval = False
        # NOTE(review): the archived patch lost its indentation, so how
        # much of this method ran under the lock cannot be told from it.
        # Here the lock covers the catalog download, the chkcat run and
        # the time stamp update -- confirm against the repository.
        with FSLock('/tmp/CheckDBCat.lock'):
            self.fetch_db_cat()
            retval = self.run_chkcat()

            # Only record successful checks.
            if retval:
                with self.__timestamp_record:
                    self.__timestamp_record.set(self.__catrel,
                                                self.__arch,
                                                self.__osrel,
                                                datetime.datetime.now())

        if retval:
            return retval

        lastsuccessful = self.__timestamp_record.get(self.__catrel,
                                                     self.__arch,
                                                     self.__osrel)

        if lastsuccessful is None:
            # Without a reference point the culprits cannot be narrowed
            # down, so nobody is notified.
            logging.warning("No successful catalog check recorded for %s,%s,%s" %
                            (self.__catrel, self.__arch, self.__osrel))
            return retval

        newpkgs = self.__cattiming_class(self.__catrel,
                                         self.__arch,
                                         self.__osrel).upload_newer_than(lastsuccessful)

        # Group the packages uploaded since the last successful check
        # by address, so that each email address is notified exactly
        # once, even when several packages are affected.
        notifications = {}
        for np in newpkgs:
            addr = self.__get_notification_address(np)
            notifications.setdefault(addr, []).append(np)

        for addr in notifications:
            self.notify(lastsuccessful, addr, notifications[addr])

        return retval
Property changes on: csw/mgar/gar/v2/lib/python/chkdbcat.py
___________________________________________________________________
Added: svn:keywords
+ Date Revision Author HeadURL Id
Added: svn:eol-style
+ native
Added: csw/mgar/gar/v2/lib/python/chkdbcat_test.py
===================================================================
--- csw/mgar/gar/v2/lib/python/chkdbcat_test.py (rev 0)
+++ csw/mgar/gar/v2/lib/python/chkdbcat_test.py 2013-10-06 18:28:23 UTC (rev 22145)
@@ -0,0 +1,288 @@
+"""Tests for chkdbcat.py."""
+
+import os
+import cjson
+import datetime
+import logging
+import unittest
+from lib.python.chkdbcat import TimestampRecord, CatalogTiming, CheckDBCatalog
+
+##
+## Unit tests
+##
class TCatalogTiming(CatalogTiming):
    """Override fetch() for CatalogTiming.

    Class overrides fetch() for use in tests, so that no REST request
    is made.

    """

    def fetch(self):
        """Provide static data for unit test.

        The data holds five packages: three with an upload time stamp
        (two from September 2013, one from July 2013) and two where both
        the upload time stamp and the uploader are null.

        """
        return cjson.decode("""[
    [
        "libz1",
        "1.2.8,REV=2013.09.17",
        "CSWlibz1",
        "libz1-1.2.8,REV=2013.09.17-SunOS5.10-sparc-CSW.pkg.gz",
        "7684a5d3a096900f89f78c3c2dda3ff3",
        197630,
        [],
        [],
        "libz1 - Zlib data compression library, libz.so.1",
        125,
        "2013-09-17T07:13:30",
        "2013-09-17T10:36:15",
        "raos"
    ],
    [
        "zlib_stub",
        "1.2.8,REV=2013.09.17",
        "CSWzlib",
        "zlib_stub-1.2.8,REV=2013.09.17-SunOS5.10-all-CSW.pkg.gz",
        "93f73a862bf07339badd723cf11eb0ca",
        1857,
        [],
        [],
        "zlib_stub - Transitional package. Content moved to CSWlibz1",
        125,
        "2013-09-17T07:16:39",
        "2013-09-17T10:36:28",
        "raos"
    ],
    [
        "389_admin",
        "1.1.30,REV=2013.01.07",
        "CSW389-admin",
        "389_admin-1.1.30,REV=2013.01.07-SunOS5.10-sparc-CSW.pkg.gz",
        "f7d9c15d13118a3d837849d581339a65",
        396732,
        [],
        [],
        "389_admin - The 389 LDAP server Admin Tools",
        126,
        "2013-01-07T11:53:26",
        null,
        null
    ],
    [
        "zsh",
        "5.0.1,REV=2012.12.21",
        "CSWzsh",
        "zsh-5.0.1,REV=2012.12.21-SunOS5.10-sparc-CSW.pkg.gz",
        "78420c21d8772b3ce72518ad91c82813",
        2176170,
        [],
        [],
        "zsh - Powerful UNIX shell",
        98,
        "2012-12-21T00:25:14",
        null,
        null
    ],
    [
        "zutils",
        "1.0,REV=2013.07.05",
        "CSWzutils",
        "zutils-1.0,REV=2013.07.05-SunOS5.10-sparc-CSW.pkg.gz",
        "82c42dd606530aebc230d813f324c3e5",
        161930,
        [],
        [],
        "zutils - Utilities to deal with compressed and non-compressed files",
        3,
        "2013-07-05T11:28:42",
        "2013-07-05T13:31:57",
        "dam"
    ]
]
""")
+
+
class TestTimestampRecord(unittest.TestCase):
    """Tests writing time stamps to a file and reading them back."""

    def setUp(self):
        self.__tmpfile = "/tmp/TestTimestampRecord"
        self.__now = datetime.datetime.now().replace(microsecond=0)
        # Every (catrel, arch, osrel) combination used by the
        # multiple pass tests.
        self.__fixture = [(catrel, arch, osrel)
                          for osrel in ('SunOS5.10', 'SunOS5.11')
                          for arch in ('i386', 'sparc')
                          for catrel in ('unstable', 'kiel', 'testing')]
        # Start every test without a left-over file from a previous run.
        self.__remove_tmpfile()

    def tearDown(self):
        self.__remove_tmpfile()

    def __remove_tmpfile(self):
        """Remove the time stamp file; a missing file is not an error."""
        try:
            os.unlink(self.__tmpfile)
        except OSError:
            pass

    def __assert_roundtrip(self, catalogs, date):
        """Store "date" for each triple in "catalogs" and read it back."""
        # With statement used to make sure data is saved to disk
        with TimestampRecord(self.__tmpfile) as obj:
            for catrel, arch, osrel in catalogs:
                obj.set(catrel, arch, osrel, date)

        obj = TimestampRecord(self.__tmpfile)
        for catrel, arch, osrel in catalogs:
            self.assertEqual(self.__now.isoformat(),
                             obj.get(catrel, arch, osrel).isoformat())

    def test_LastSuccessfulCheck_str(self):
        """Single pass test writing and retrieving data from a file using string date."""
        self.__assert_roundtrip([('unstable', 'sparc', 'SunOS5.10')],
                                self.__now.isoformat())

    def test_Many_str(self):
        """Multiple pass test writing and retrieving data from a file using string date."""
        self.__assert_roundtrip(self.__fixture, self.__now.isoformat())

    def test_LastSuccessfulCheck_obj(self):
        """Single pass test writing and retrieving data from a file using datetime object."""
        self.__assert_roundtrip([('unstable', 'sparc', 'SunOS5.10')],
                                self.__now)

    def test_Many_obj(self):
        """Multiple pass test writing and retrieving data from a file using datetime obj."""
        self.__assert_roundtrip(self.__fixture, self.__now)

    def test_SetInvalidType(self):
        """Test set() with an unparsable string and with an invalid type."""
        with TimestampRecord(self.__tmpfile) as obj:
            self.assertRaises(ValueError, obj.set, 'unstable', 'i386', 'SunOS5.11', "abcd")

        with TimestampRecord(self.__tmpfile) as obj:
            self.assertRaises(TypeError, obj.set, 'unstable', 'i386', 'SunOS5.11', 1)
+
+
class TestCatalogTiming(unittest.TestCase):
    """Tests CatalogTiming.upload_newer_than() against the static data."""

    def __count_newer_than(self, date):
        """Return the number of packages uploaded at or after "date"."""
        obj = TCatalogTiming('unstable', 'sparc', 'SunOS5.10')
        return len(obj.upload_newer_than(date))

    def test_Newer1(self):
        """The earliest possible date selects all five packages."""
        self.assertEqual(
            self.__count_newer_than(datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)),
            5
        )

    def test_Newer2(self):
        """Only the two packages uploaded in September 2013 are selected."""
        self.assertEqual(
            self.__count_newer_than(datetime.datetime(2013, 9, 1, 0, 0, 0, 0)),
            2
        )

    def test_Newer3(self):
        """A date later than any upload selects nothing."""
        self.assertEqual(
            self.__count_newer_than(datetime.datetime(2040, 1, 1, 0, 0, 0, 0)),
            0
        )
+
+
class TestCheckDBCatalog(unittest.TestCase):
    """Tests CheckDBCatalog with locally generated catalogs.

    NOTE(review): check() runs the real chkcat (by default
    /opt/csw/bin/chkcat), so these tests presumably need it installed.

    """

    class TCheckDBCatalogInvalid(CheckDBCatalog):
        """Generate an invalid catalog."""

        def fetch_db_cat(self):
            logging.debug('Create catalog %s' %
                          os.path.join(self.tmpdir, 'catalog'))
            with open(os.path.join(self.tmpdir, 'catalog'), 'w') as fp:
                # The catalog is invalid because several dependencies are missing
                fp.write("""zsh 5.0.1,REV=2012.12.21 CSWzsh zsh-5.0.1,REV=2012.12.21-SunOS5.10-sparc-CSW.pkg.gz 78420c21d8772b3ce72518ad91c82813 2176170 CSWcommon|CSWlibiconv2|CSWlibncursesw5|CSWlibpcre1|CSWlibgdbm4 none none
zutils 1.0,REV=2013.07.05 CSWzutils zutils-1.0,REV=2013.07.05-SunOS5.10-sparc-CSW.pkg.gz 82c42dd606530aebc230d813f324c3e5 161930 CSWcas-texinfo|CSWcommon|CSWisaexec|CSWlzip none none""")

    class TCheckDBCatalogValid(CheckDBCatalog):
        """Generate a valid catalog."""

        def fetch_db_cat(self):
            """Generate a valid catalog"""
            logging.debug('Create catalog %s' %
                          os.path.join(self.tmpdir, 'catalog'))
            with open(os.path.join(self.tmpdir, 'catalog'), 'w') as fp:
                # Every dependency (CSWcommon) is part of the catalog.
                # NOTE(review): the archived patch lost the original
                # whitespace in front of the 2nd and 3rd entry; they
                # are written flush left here, like the entries of the
                # invalid catalog -- confirm against the repository.
                fp.write("""common 1.5,REV=2010.12.11 CSWcommon common-1.5,REV=2010.12.11-SunOS5.8-sparc-CSW.pkg f83ab71194e67e04d1eee5d8db094011 23040 none none none
zsh 5.0.1,REV=2012.12.21 CSWzsh zsh-5.0.1,REV=2012.12.21-SunOS5.10-sparc-CSW.pkg.gz 78420c21d8772b3ce72518ad91c82813 2176170 CSWcommon none none
zutils 1.0,REV=2013.07.05 CSWzutils zutils-1.0,REV=2013.07.05-SunOS5.10-sparc-CSW.pkg.gz 82c42dd606530aebc230d813f324c3e5 161930 CSWcommon none none""")

    class TCheckDBCatalogNotification(TCheckDBCatalogInvalid):
        """Invalid catalog whose notify() verifies what it is called with."""

        # The mailing list archive had rewritten these addresses to
        # `<name> at opencsw.org'. CheckDBCatalog builds them as
        # `<uploader>@opencsw.org', so that is what notify() receives.
        expected_notification_on = {
            'dam@opencsw.org': {
                'lastsuccessful': datetime.datetime(2013, 5, 17, 0, 0, 0),
                'newpkgs': [
                    'zutils-1.0,REV=2013.07.05-SunOS5.10-sparc-CSW.pkg.gz'
                ]
            },
            'raos@opencsw.org': {
                'lastsuccessful': datetime.datetime(2013, 5, 17, 0, 0, 0),
                'newpkgs': [
                    'libz1-1.2.8,REV=2013.09.17-SunOS5.10-sparc-CSW.pkg.gz',
                    'zlib_stub-1.2.8,REV=2013.09.17-SunOS5.10-all-CSW.pkg.gz'
                ]
            }
        }

        def notify(self, date, addr, pkginfo):
            if addr not in self.expected_notification_on:
                raise Exception("Unexpected address")

            expected = self.expected_notification_on[addr]
            assert date == expected['lastsuccessful']
            assert len(pkginfo) == len(expected['newpkgs'])
            for p in pkginfo:
                assert p['fullname'] in expected['newpkgs']

    def setUp(self):
        self.__timestamp_file = '/tmp/TestCheckDBCatalog.ts'

    def tearDown(self):
        try:
            os.unlink(self.__timestamp_file)
        except OSError:
            # No time stamp file has been written by the test.
            pass

    def test_InvalidCatalog(self):
        """Test a locally generated invalid catalog"""
        with self.TCheckDBCatalogInvalid('unstable', 'sparc', 'SunOS5.10', self.__timestamp_file, cattiming_class=TCatalogTiming) as test:
            self.assertFalse(test.check())

    def test_ValidCatalog(self):
        """Test a locally generated valid catalog"""
        with self.TCheckDBCatalogValid('unstable', 'sparc', 'SunOS5.10', self.__timestamp_file, cattiming_class=TCatalogTiming) as test:
            # check() returns True upon a successful check; this used
            # to assert False, contradicting the purpose of the test.
            self.assertTrue(test.check())

    def test_Notification(self):
        """Test notification for invalid catalog."""
        # Create last successful test time stamp for catalog to be tested
        with TimestampRecord(self.__timestamp_file) as tsobj:
            tsobj.set('unstable', 'sparc', 'SunOS5.10', datetime.datetime(2013, 5, 17, 0, 0, 0))

        with self.TCheckDBCatalogNotification('unstable', 'sparc', 'SunOS5.10', self.__timestamp_file, cattiming_class=TCatalogTiming) as test:
            self.assertFalse(test.check())


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
Property changes on: csw/mgar/gar/v2/lib/python/chkdbcat_test.py
___________________________________________________________________
Added: svn:keywords
+ Date Revision Author HeadURL Id
Added: svn:eol-style
+ native
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
More information about the devel
mailing list