diff --git a/README.md b/README.md index 8f74d69..58aac33 100644 --- a/README.md +++ b/README.md @@ -18,14 +18,16 @@ Each-session setup cd $HOME/scratch0/prod_dir/ source RazorProduction/cert.sh +A CMSSW python environment will have to be available for the crabAPI to function, so be sure to have it sourced properly. + Campaign setup -------------- See at the bottom Vizualisation of on-going production -------------- -Browse to https://cms-caltech-db.cern.ch/ from within cern network -Restricted acces to "razor-cms" e-group. +Browse to https://cms-caltech-db.cern.ch/ +Restricted access to "razor-cms" e-group (request to be added if you need access). Specifying specific tasks -------------- @@ -112,7 +114,6 @@ To create a new production setup The mandatory parameters are better provided by the ./setup.py --help, but some are listed here * --installation is a command line to install the production software - * --setup is a command line to set oneself into the production software * --admins is a coma separated list of username as admins The optional parameters are better liste by the ./setup.py --help, but some are listed here diff --git a/cert.sh b/cert.sh index 545629a..d171652 100755 --- a/cert.sh +++ b/cert.sh @@ -14,5 +14,5 @@ export X509_USER_PROXY=$HOME/cert/voms_proxy.cert ## create a long lived proxy in the designated location cat $HOME/private/$USER.txt | voms-proxy-init -voms cms --valid 48:00 -pwstdin -## make sure the protection is right -#chmod 400 $HOME/cert/voms_proxy.cert +## and get crab setup +source /cvmfs/cms.cern.ch/crab3/crab.sh diff --git a/db.py b/db.py index 4038dca..fe8b72f 100644 --- a/db.py +++ b/db.py @@ -124,9 +124,9 @@ def show(self): print rid def get_campaigns( self ): - print "available productions" - for dn in self.cdb.view('prods/label-version'): - print "\t",dn['key'][0],dn['key'][1] + print len(self.cdb.view('prods/label-version')),"available productions" + #for dn in self.cdb.view('prods/label-version'): + # print 
"\t",dn['key'][0],dn['key'][1] def get_campaign( self, label, version=None,status=None): diff --git a/prod2list.py b/prod2list.py new file mode 100755 index 0000000..27765b0 --- /dev/null +++ b/prod2list.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +import os +import sys +from optparse import OptionParser +from db import db +import ssl +ssl.match_hostname = lambda cert, hostname: hostname == cert['subjectAltName'][0][1] + +parser = OptionParser("") +parser.add_option("--label") +parser.add_option("--version",default=1,type=int) +parser.add_option("--prepend",default='root://eoscms//eos/cms/') +(options,args) = parser.parse_args() + +d = db() +tasks = d.tasks(label=options.label, version=options.version) + +for t in tasks: + files = d.filelist( t['_id'] ) + filelist = open( t['_id']+'.txt','w') + filelist.write('\n'.join([options.prepend+s for s in files]+[''])) + filelist.close() + diff --git a/production.py b/production.py index a727694..a676eb1 100755 --- a/production.py +++ b/production.py @@ -11,12 +11,21 @@ import copy import hashlib -def get_from_log( log, field): - interest=filter(lambda l : l.startswith(field),log) - if len(interest): - value = interest[-1].split()[-1] - return value - return None +import ssl +ssl.match_hostname = lambda cert, hostname: hostname == cert['subjectAltName'][0][1] + +try: + from CRABAPI.RawCommand import crabCommand +except: + print "source the crab3 env before" + sys.exit(1) + +#def get_from_log( log, field): +# interest=filter(lambda l : l.startswith(field),log) +# if len(interest): +# value = interest[-1].split()[-1] +# return value +# return None class X509CertAuth(httplib.HTTPSConnection): def __init__(self, host, *args, **kwargs): @@ -56,86 +65,48 @@ def generic_call(url,header=None, load=True, data=None, delete=False): print traceback.format_exc() return None -def getReport( task_name ): +def getReport( task_object ): certPrivilege() - data = '&'.join(["workflow=%s"%(task_name) , "limit=-1", "subresource=report" ]) - 
outputs = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow/?'+data, header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}) - - input_mask = outputs['result'][0]['lumiMask'] - #pprint.pprint( input_mask ) - - poolInOnlyRes = {} - for jn, val in outputs['result'][0]['runsAndLumis'].iteritems(): - poolInOnlyRes[jn] = [f for f in val if f['type'] == 'POOLIN'] - - - #mergedLumis = set() - #doubleLumis = set() - - dLumisDict = defaultdict( set ) - mLumisDict = defaultdict( set ) - - for reports in poolInOnlyRes.values(): - for report in reports: - rep = eval( report['runlumi'] ) - for run, lumis in rep.iteritems(): - run=int(run) - for lumi in map(int,lumis): - #if (run,lumi) in mergedLumis: - # doubleLumis.add((run,lumi)) - #mergedLumis.add((run,lumi)) - if lumi in mLumisDict: - dLumisDict[run].add( lumi ) - mLumisDict[run].add( lumi ) - - def compact( lumis ): - #return a compact list of pairs - ret=[] - firstLumi=None - lastLumi=None - for lumi in lumis: - if firstLumi==None: - firstLumi=lumi - lastLumi=lumi - else: - if lastLumi==None or lumi == lastLumi+1: - ## still in the range - lastLumi = lumi - else: - ret.append([firstLumi,lastLumi]) - lastLumi=None - firstLumi=lumi - - if firstLumi: - if lastLumi: - ret.append([firstLumi,lastLumi]) - else: - ret.append([firstLumi,firstLumi]) - return ret - #pprint.pprint( len(mergedLumis) ) - #pprint.pprint( doubleLumis ) + task_name = task_object['taskname'] + res = crabCommand('report', + dir = task_object['taskdir'], + proxy = os.getenv('X509_USER_PROXY')) + #print "Showing the report but reporting nothing" + #print json.dumps(res, indent=2) + print res.keys() - for r,lumis in mLumisDict.iteritems(): - mLumisDict[r] = compact( lumis ) - - for r,lumis in dLumisDict.iteritems(): - dLumisDict[r] = compact( lumis ) - + mLumisDict = res.get('processedLumis',{}) + dLumisDict = res.get('outputFilesDuplicateLumis',{}) return dict( mLumisDict ),dict( dLumisDict ) + -def getOutput( task_name ): +def getOutput( 
task_object ): return [] + ## the following is still not functional certPrivilege() - data = '&'.join(["workflow=%s"%(task_name) , "limit=-1", "subresource=data" ]) - outputs = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow/?'+data, header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}) - #print "answers",outputs - outs=[] - for out in outputs['result']: - outs.append(out['lfn']) - outs.sort() - return outs + task_name = task_object['taskname'] + ## to be fixed you want the lfns of all output files + final = [] + lim = 100 + start=1 + while True: + res = crabCommand('getoutput', + jobids = ",".join(map(str,range(start,start+lim))), + dir = task_object['taskdir'], + dump = True, + proxy = os.getenv('X509_USER_PROXY') + ) + start+=lim + add = res.get('lfn',[]) + if add: + final.append( add ) + else: + break + + #print json.dumps( res, indent=2) + return final def registerOutput( r ): if r['output'] and len(r['output']): @@ -155,7 +126,7 @@ def registerOutput( r ): else: print "Getting output from crab3" ## get a list of files - outs = getOutput( r['taskname'] ) + outs = getOutput( r ) if not outs: print "Empty outputs for",r['taskname'] return @@ -165,7 +136,9 @@ def registerOutput( r ): locations=r['outsite'], owner=r['_id'] ) - + if not r['output']: + r['output']= [] + r['output'] = sorted(set(outs+r['output'])) def fileSummary( dataset, dbs, summary=True): certPrivilege() @@ -178,55 +151,70 @@ def fileSummary( dataset, dbs, summary=True): ret = generic_call(urlds) return ret -def crabKill( task_name, user): - if not certPrivilege(user): - print os.environ.get('USER'),"cannot kill a task for",user - return False - data = '&'.join(["workflow=%s"%(task_name), '']) - killtask = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow/', data=data, header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}, delete=True) - if killtask: - print "\t task kill successfully" - print killtask - return True - else: - print "failed to kill" - return False 
- -def crabResubmit( task_name , jobs, black_sites, user): - black_sites = filter(lambda s : not s in ['Unknown'], black_sites) +def crabKill( task_object, user): + task_name = task_object['taskname'] if not certPrivilege(user): - print os.environ.get('USER'),"cannot resubmit a task for",user + print os.environ.get('USER'),"cannot kill a task for",user return False - data='&'.join( ["workflow=%s"%(task_name)]+['jobids=%s'%job for job in jobs]+['siteblacklist=%s'%bs for bs in black_sites]) - #print data - resub = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow', header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}, data = data) - #print "answers from resubmit",resub - if resub: - print "\t resubmission sucessful" + + res = crabCommand('kill', + dir = task_object['taskdir'], + proxy = os.getenv('X509_USER_PROXY') + ) + if res.get("status",None) == "SUCCESS": + print "\t task kill successfully" return True - else: - print "failed resubmission" - print data - print resub + else: + print "failed to kill" + print json.dumps( res ,indent=2) + return False ## for now + +def crabResubmit( task_object , jobs, black_sites, user): + task_name = task_object['taskname'] + black_sites = filter(lambda s : not s in ['Unknown'], black_sites) + if not certPrivilege(user): + print os.environ.get('USER'),"cannot resubmit a task for",user return False + + res = crabCommand('resubmit', + dir = task_object['taskdir'], + jobids = ",".join(jobs), + siteblacklist = ",".join(black_sites), + proxy = os.getenv('X509_USER_PROXY') + ) + print json.dumps(res) + #print "\t resubmission sucessful" + #return True + print "failed resubmission" + return False -def crabStatus( task_name): +def crabStatus( task_object ): certPrivilege() - ## can be done by anyone - status = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow/?workflow=%s&verbose=1'% task_name ,header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}) - if not status or not 'result' in status: - print "\tLong 
status not available for",task_name,"falling back to short" - status = generic_call('https://cmsweb.cern.ch/crabserver/prod/workflow/?workflow=%s'% task_name ,header={"User-agent":"CRABClient/3.3.9","Accept": "*/*"}) - if not status or not 'result' in status: - print "\t\tShort status not available for",task_name,"falling back to short" - return None + task_name = task_object['taskname'] - status=status['result'] - if len(status)!=1: - print "Wrong result for",task_name - return None - status=status[0] - return status + try: + res = crabCommand('status', + long=True, + dir = task_object['taskdir'], + proxy = os.getenv('X509_USER_PROXY')) + #print json.dumps( res , indent = 2 ) + print res.keys() + status = res['status'] + return res + except: + try: + res = crabCommand('status', + long=False, + dir = task_object['taskdir'], + proxy = os.getenv('X509_USER_PROXY')) + #print json.dumps( res , indent = 2 ) + print res.keys() + status = res['status'] + return res + except: + print "no status for",task_name + return None + def privilege(who=['vlimant'],what=''): if not privilegeB(who,what): @@ -310,11 +298,13 @@ def write_crab( r, crab_py ): config.Data.ignoreLocality = True config.Data.publication = False config.Data.inputDataset = "%s" -config.Data.splitting = "LumiBased" -config.Data.unitsPerJob = %d +#config.Data.splitting = "LumiBased" +config.Data.splitting = 'Automatic' +#config.Data.unitsPerJob = %d config.Data.outputDatasetTag = "%s_%s_v%d_v%d" config.section_("Site") +config.Site.whitelist = ["T2_*"] config.Site.storageSite = "%s" '''%( r['label'], r['version'], @@ -362,38 +352,28 @@ def submit(d, r, crab_py, user): print "could not find",r['label'] return False - test = os.system(c['setup']) - if test!=0: - print "The setup command failed %d \n %s. You probably need to install the production" %( test, c['setup']) - return False + #test = os.system(c['setup']) + #if test!=0: + # print "The setup command failed %d \n %s. 
You probably need to install the production" %( test, c['setup']) + # return False - command = c['setup']+'\n' - command += 'export X509_USER_PROXY=%s \n' % (X509CertAuth.proxy) - command += 'source /cvmfs/cms.cern.ch/crab3/crab.sh \n' - command += 'crab submit -c %s \n' % ( crab_py ) - retry=True + retry = True while retry: - logf=os.popen(command).read() - log=logf.split('\n') - ## retrieve the taskname - taskname=get_from_log(log,'Task name:') - r['taskname'] = taskname - if taskname: + res = crabCommand('submit', config = crab_py , proxy = os.getenv('X509_USER_PROXY')) + taskname = res.get('uniquerequestname', None) + reqname = res.get('requestname', None) + if taskname and reqname: + r['taskname'] = taskname r['status']='submitted' - r['taskdir'] = get_from_log(log,'Log file').rsplit('/',1)[0] - print r['id'],'is',r['status'] + ## or adjust the directory + r['taskdir']=os.getenv('PWD')+'/crab_prod/'+reqname retry=False else: retry=False - ## task evasive actions - for l in logf.split('\n'): - if 'Working area' in l and 'already exists' in l: - wd = l.split("'")[1] - print "Removing the existing task directory %s "% (wd) - os.system('rm -rf %s'%( wd)) - retry=True - continue - print "could not submit\n",command + ## try to remove the directory + print json.dumps(res, indent=2) + print "could not submit with",crab_py + d.save_task( r ) return True @@ -407,10 +387,12 @@ def resubmit(r, do=True): failed.add(jid) if 'SiteHistory' in jst: failed_sites.update(jst['SiteHistory']) - print failed,failed_sites + #print failed,failed_sites + print len(failed),"failed jobs at",len(failed_sites),"sites" + if failed: print "Resubmitting for",r['id'] - return crabResubmit( r['taskname'], failed , failed_sites, r['assignee']) + return crabResubmit( r, failed , failed_sites, r['assignee']) else: print "No failed jobs to resubmit" return False @@ -495,7 +477,7 @@ def resubmit(r, do=True): check_schema = {'admins':[], 'dataset':[], 'installation':None, - 'setup' : None, + 
#'setup' : None, 'label':None, 'version' : None, 'participants':[], @@ -668,6 +650,7 @@ def resubmit(r, do=True): print "%d/%d]"%(ri,l)+100*"-" if options.do == 'list': for (k,v) in r.items(): + if k in ['output']: continue if k.startswith('_'): continue if k in []: continue if type(v) != dict: @@ -690,9 +673,12 @@ def resubmit(r, do=True): #if 'failed' in jobs: #print "failed",','.join(jobs['failed']) #pprint.pprint( r ) - outs = getOutput( r['taskname'] ) - print len(outs),"outputs" - print '\n'.join(outs) + ## this is clearly too heavy for now + #outs = getOutput( r ) + outs = r.get('output',[]) + if outs: + print len(outs),"outputs" + print '\n'.join(outs) continue if options.do == 'reset': if not privilegeB(r['assignee'],'reset'): @@ -702,7 +688,7 @@ def resubmit(r, do=True): if K.lower() in ['y','yes']: K=raw_input("Confirm that you want to kill all for task %s ? Y/N :"%( r['taskname'])) if K.lower() in ['y','yes']: - dead = crabKill( r['taskname'], r['assignee']) + dead = crabKill( r, r['assignee']) print "Crab kill output",dead if not dead: print "Crab kill failed. Please retry to reset again" @@ -763,22 +749,12 @@ def resubmit(r, do=True): if r['status'] in ['registered']: print r['id'],'is',r['status'] - #outs = d.filelist(r['_id']) - #if not outs: - # print "\t the list of registered files is empty. 
need to re-fetch output" - #else: - #if not 'ranlumis' in r: - # print "Getting report from crab3 since the information is not there yet" - # (ran,twice) = getReport( r['taskname'] ) - # r['ranlumis'] = ran - # r['duplicatelumis'] = twice - # d.save_task(r) continue if r['status'] in ['done']: print r['id'],'is',r['status'] - print "Getting report from crab3" - (ran,twice) = getReport( r['taskname'] ) + print r['status'],"Getting report from crab3" + (ran,twice) = getReport( r ) r['ranlumis'] = ran open(r['taskdir']+'/lumiSummary.json','w').write(json.dumps(ran)) r['duplicatelumis'] = twice @@ -808,7 +784,8 @@ def resubmit(r, do=True): continue ## get the info from crab-status - info=crabStatus(r['taskname']) + #info=crabStatus(r['taskname']) + info=crabStatus(r) if not info: print "could not find anything for",r['taskname'] @@ -828,9 +805,9 @@ def resubmit(r, do=True): if r['status'] == 'submitted': print r['id'],'is done' - print "Getting report from crab3" # get the lumi mask while progressing on the processing - (ran,twice) = getReport( r['taskname'] ) + info['status'],"Getting report from crab3" + (ran,twice) = getReport( r ) r['ranlumis'] = ran r['duplicatelumis'] = twice @@ -843,9 +820,10 @@ def resubmit(r, do=True): elif info['status'] == 'SUBMITTED': r['status']='submitted' - print "Getting report from crab3" + print info['status'],"Getting report from crab3" # get the lumi mask while progressing on the processing - (ran,twice) = getReport( r['taskname'] ) + #(ran,twice) = getReport( r['taskname'] ) + (ran,twice) = getReport( r ) r['ranlumis'] = ran r['duplicatelumis'] = twice ## register the output as we go @@ -857,7 +835,7 @@ def resubmit(r, do=True): elif info['status'] in ['FAILED', 'RESUBMITFAILED']: ## try to figure out what to do. 
resubmit or scratch it - if not info['jobSetID'] and options.unlimited: + if not info.get('jobSetID',None) and options.unlimited: ### needs fixing somehow ## the taskk was just never started : scratch print "The task was never started. Resetting",r['id'] r['status'] = 'new' diff --git a/setup.py b/setup.py index dbfc8fa..d651a1d 100755 --- a/setup.py +++ b/setup.py @@ -2,15 +2,18 @@ import os import sys +import json import pprint from optparse import OptionParser from db import db +import ssl +ssl.match_hostname = lambda cert, hostname: hostname == cert['subjectAltName'][0][1] production_schema={ 'admins': (None, "the coma separated list of username being able to make changes in the production" ,str,list), 'dataset':("", "the list of datasets to be processed in the production",str,list) , 'installation': ( None, "the set of instructions to instal and compile the software for the production",str,str), - 'setup' : (None,"the list of instructions to setup the production",str,str), + #'setup' : (None,"the list of instructions to setup the production",str,str), #'timing' : (None,"The average time to process an event",float,float),## NOT USED YET 'label': (None,"the name of the production",str,str), 'version' : (None,"the version of the production",int,int), @@ -38,7 +41,7 @@ else: ## the ones that do not need anything a_doc[key] = c_type(defv) - +parser.add_option('-v', help="view the information in the db",dest="view_",default=False, action="store_true") parser.add_option('-u',help="update the information in the db",dest="update_",default=False,action="store_true") parser.add_option('-c',help="read the information from a card",dest="card_",default=False,action="store_true") parser.add_option('-a',help="add the provided information to the existing config",dest="add_",default=False,action='store_true') @@ -52,9 +55,13 @@ sys.exit(2) d = db() -if options.update_: +if options.update_ or options.view_: a_doc = d.get_campaign( options.label, options.version ) - + +if 
options.view_: + print json.dumps( a_doc, indent=2) + sys.exit(0) + if options.card_: card=open("%s_v%d.card"%(options.label,options.version)) diff --git a/task2files.py b/task2files.py index a495ac0..8d2823a 100755 --- a/task2files.py +++ b/task2files.py @@ -3,6 +3,9 @@ import os from db import db +import ssl +ssl.match_hostname = lambda cert, hostname: hostname == cert['subjectAltName'][0][1] + class task2files: def __init__(self, localdir=None, eos=False): diff --git a/task2mask.py b/task2mask.py index 6520947..4490854 100755 --- a/task2mask.py +++ b/task2mask.py @@ -3,6 +3,8 @@ from db import db import sys import json +import ssl +ssl.match_hostname = lambda cert, hostname: hostname == cert['subjectAltName'][0][1] class task2mask: def __init__(self):