#!/usr/bin/env python # -*- coding: utf-8 -*- # # Implements a Subversion-to-Hudson notifier for the post-commit hook. # Using "svnlook changed" on a repository, the script filters all the # paths in a specified commit revision (usually the latest). # The paths are mapped into a normalised form used for Hudson project # names. These are then used to request the corresponding URLs from the # Hudson server. These requests cause builds to be triggered. # # Copyright (c) 2009 Rick Beton & Gustavo Niemeyer # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation # files (the "Software"), to deal in the Software without # restriction, including without limitation the rights to use, # copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the # Software is furnished to do so, subject to the following # conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. import sys, os import getopt import urllib2 import logging try: # Python >=3.0 from subprocess import getstatusoutput as subprocess_getstatusoutput except ImportError: # Python <3.0 from commands import getstatusoutput as subprocess_getstatusoutput try: my_getopt = getopt.gnu_getopt except AttributeError: my_getopt = getopt.getopt import re from array import array class Error(Exception): pass LOG_FILENAME = 'svn-hudson.log' SECTION = re.compile(r'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]') OPTION = re.compile(r'(\S+)\s*=\s*(.*)$') #{{{ Config class Config: def __init__(self, filename): # Options are stored in __sections_list like this: # [(sectname, [(optname, optval), ...]), ...] self._sections_list = [] self._sections_dict = {} self._read(filename) def _read(self, filename): # Use the same logic as in ConfigParser.__read() file = open(filename) cursectdict = None optname = None lineno = 0 for line in file: lineno = lineno + 1 if line.isspace() or line[0] == '#': continue if line[0].isspace() and cursectdict is not None and optname: value = line.strip() cursectdict[optname] = "%s %s" % (cursectdict[optname], value) cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value) else: m = SECTION.match(line) if m: sectname = m.group(1) parentsectname = m.group(2) if parentsectname is None: # No parent section defined, so start a new section cursectdict = self._sections_dict.setdefault( sectname, {} ) cursectlist = [] else: # Copy the parent section into the new section parentsectdict = self._sections_dict.get( parentsectname, {} ) cursectdict = self._sections_dict.setdefault( sectname, parentsectdict.copy() ) cursectlist = self.walk( parentsectname ) self._sections_list.append( (sectname, cursectlist) ) optname = None elif cursectdict is None: raise Error("%s:%d: no section header" % \ (filename, lineno)) else: m = OPTION.match(line) if m: optname, optval = m.groups() optval = optval.strip() cursectdict[optname] = optval cursectlist.append([optname, optval]) else: raise Error("%s:%d: parsing error" % \ (filename, lineno)) def sections(self): return list(self._sections_dict.keys()) def options(self, section): return list(self._sections_dict.get(section, {}).keys()) def get(self, section, option, default=None): return self._sections_dict.get(section, {}).get(option, default) def walk(self, section, option=None): ret = [] for sectname, options in self._sections_list: if sectname == section: for optname, value in options: if not option or optname == option: ret.append((optname, value)) return ret #}}} Config #{{{ SVNLook: Wraps /usr/bin/svnlook in Python API class SVNLook: def __init__(self, repospath, txn=None, rev=None): self.repospath = repospath self.txn = txn self.rev = rev def _execcmd(self, *cmd, **kwargs): cmdstr = " ".join(cmd) status, output = subprocess_getstatusoutput(cmdstr) if status != 0: sys.stderr.write(cmdstr) sys.stderr.write("\n") sys.stderr.write(output) raise Error("command failed: %s\n%s" % (cmdstr, output)) return status, output def _execsvnlook(self, cmd, *args, **kwargs): execcmd_args = ["svnlook", cmd, self.repospath] self._add_txnrev(execcmd_args, kwargs) execcmd_args += args execcmd_kwargs = {} keywords = ["show", "noerror"] for key in keywords: if key in kwargs: execcmd_kwargs[key] = kwargs[key] return self._execcmd(*execcmd_args, **execcmd_kwargs) def _add_txnrev(self, cmd_args, received_kwargs): if "txn" in received_kwargs: txn = received_kwargs.get("txn") if txn is not None: cmd_args += ["-t", txn] elif self.txn is not None: cmd_args += ["-t", self.txn] if "rev" in received_kwargs: rev = received_kwargs.get("rev") if rev is not None: cmd_args += ["-r", rev] elif self.rev is not None: cmd_args += ["-r", self.rev] def changed(self, **kwargs): status, output = self._execsvnlook("changed", **kwargs) if status != 0: return None changes = [] for line in output.splitlines(): line = line.rstrip() if not line: continue entry = [None, None, None] changedata, changeprop, path = None, None, None if line[0] != "_": changedata = line[0] if line[1] != " ": changeprop = line[1] path = line[4:] changes.append((changedata, changeprop, path)) return changes def author(self, **kwargs): status, output = self._execsvnlook("author", **kwargs) if status != 0: return None return output.strip() #==================== #}}} End of SVNlook # Gets a particular HTTP URL and returns the status code. def getUrl (url): try: try: # version 2.6+: short timeout urllib2.urlopen( url, None, 1 ) logging.info( " " + url ) return 200 except TypeError, e: # pre-version 2.6: no timeout urllib2.urlopen( url, None ) logging.info( " " + url ) return 200 except urllib2.HTTPError, e: if e.code == 404: logging.debug( " " + str(e) + ":" + url ) else: logging.error( str(e) + ":" + url ) return e.code except urllib2.URLError, e: logging.error( str(e) + ":" + url ) return 500 # Ask Hudson to start a build. def request_build (urlprefix, string, urlsuffix): if not urlprefix: print string return 200 elif not urlsuffix: return getUrl( urlprefix + string ) else: return getUrl( urlprefix + string + urlsuffix ) # Process a path and decide what builds to start. def process_path (opts, levels, reposName, path, done): steps = path.split('/') if len(steps)>0 and steps[len(steps)-1] != "": base = steps[0] if base in levels: levellist = levels[base] code = 404 logging.debug( reposName + " " + path ) for level in levellist: logging.debug( " " + base + " with " + str(level) + " levels" ) if code == 404 and len(steps)>level: string = opts.alias for i in range(level): string += opts.separator + steps[i] if not string in done: done[string] = 1 code = request_build( opts.urlprefix, string, opts.urlsuffix ) else: logging.debug( "unwanted " + reposName + " " + path ) else: logging.debug( "ignored " + reposName + " " + path ) # Construct a dict where every key is one of the options in the config file and # the value is a reverse-sorted array of integers. def construct_levels (config): levels = {} for option, value in config.walk("levels"): levellist = array('i') for a in value.split(): levellist.append( int(a) ) levels[option] = levellist levels[option].reverse() logging.debug( "level for " + option + "=" + value + " as " + str(levels[option]) ) return levels def notify_hudson (opts, config): for option, value in config.walk("general"): logging.debug( option + " = " + value ) reposParts = opts.repository.split('/') reposName = reposParts[len(reposParts) - 1] opts.alias = config.get( "general", "alias", reposName ) opts.separator = config.get( "general", "separator", '~' ) opts.urlprefix = config.get( "general", "urlprefix" ) opts.urlsuffix = config.get( "general", "urlsuffix" ) levels = construct_levels( config ) svnlook = SVNLook( opts.repository, txn=opts.transaction, rev=opts.revision ) changes = svnlook.changed() done = {} for changedata, changeprop, path in changes: process_path( opts, levels, reposName, path, done ) def init_logging (repoDir, loglevel): logdir = os.path.join( repoDir, "logs" ) if not os.path.isdir(logdir): os.mkdir( logdir, 0755 ); logfile = os.path.join( logdir, LOG_FILENAME ) #logging.basicConfig( filename=logfile, level=loglevel ) logger = logging.getLogger() logger.setLevel( loglevel ) # create console handler and set level to debug ch = logging.StreamHandler() # create formatter formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") # add formatter to ch ch.setFormatter( formatter ) # add ch to logger logger.addHandler( ch ) #handler = logging.handlers.RotatingFileHandler( # logfile, maxBytes=20, backupCount=5) #logger.addHandler( handler ) # Command: USAGE = """\ Usage: svn-hudson.py OPTIONS Options: -p PATH Use repository at PATH to check changes -f PATH Use PATH as configuration file (default is repository path + /conf/svn-hudson.conf) -t TXN Optional Subversion transaction TXN for commit information -r REV Optional Subversion revision REV for commit information (for tests) -v, -d Verbose or debug logging -h Show this message The path, transaction and revision options are passed to "svnlook changed". Example: svn-hudson.py -p repos """ class MissingArgumentsException(Exception): "Thrown when required arguments are missing." pass def parse_options(): try: opts, args = my_getopt(sys.argv[1:], "f:p:r:t:hvd", ["help"]) except getopt.GetoptError, e: raise Error(e.msg) class Options: pass obj = Options() obj.filename = None obj.repository = None obj.transaction = None obj.revision = None obj.loglevel = logging.WARNING obj.alias = None for opt, val in opts: if opt == "-f": obj.filename = val elif opt == "-p": obj.repository = val elif opt == "-t": obj.transaction = val elif opt == "-r": obj.revision = val elif opt == "-v": obj.loglevel = logging.INFO elif opt == "-d": obj.loglevel = logging.DEBUG elif opt in ["-h", "--help"]: sys.stdout.write(USAGE) sys.exit(0) missingopts = [] if not obj.repository: missingopts.append("repository") if missingopts: raise MissingArgumentsException("missing required option(s): " + ", ".join(missingopts)) if obj.filename is None: obj.filename = os.path.join(obj.repository, "conf", "svn-hudson.conf") obj.repository = os.path.abspath(obj.repository) if not (os.path.isdir(obj.repository) and os.path.isdir(os.path.join(obj.repository, "db")) and os.path.isdir(os.path.join(obj.repository, "hooks")) and os.path.isfile(os.path.join(obj.repository, "format"))): raise Error("path '%s' doesn't look like a repository" % \ obj.repository) init_logging( obj.repository, obj.loglevel ) return obj def main(): try: opts = parse_options() config = Config( opts.filename ) notify_hudson( opts, config ) except MissingArgumentsException, e: sys.stderr.write("%s\n" % str(e)) sys.stderr.write(USAGE) sys.exit(1) except Error, e: sys.stderr.write("error: %s\n" % str(e)) sys.exit(1) if __name__ == "__main__": main()