# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""pullbundle: automatic server side bundle caching

General principle
=================

This extension provides a means for servers to use pre-computed bundles for
serving arbitrary pulls. If missing, the necessary pre-computed bundles will
be generated on demand.

To maximize the reuse of existing cached bundles, each pull will be served
through multiple bundles. The bundles will be created using "standard ranges"
from the "stablerange" principle. The "stablerange" concept is already used
for obsmarkers discovery in the evolve extension.

Using pull bundle
=================

All configuration is only required server side.

The "stablerange" code currently still lives in the evolve extension, so for
now enabling that extension is required.

You need at minimum the following configuration:

    [extensions]
    evolve=yes
    pullbundle=yes
    [experimental]
    obshashrange.warm-cache = yes

If you do not want to use evolution server side, you should disable obsmarkers
exchange:

    [experimental]
    evolution.exchange=no

Extra Configuration
===================

    [pullbundle]
    # By default bundles are stored in `.hg/cache/pullbundles/`.
    # This can be changed with the following config:
    cache-directory=/absolute/path

Implementation status
=====================

Both stablerange and pullbundle use "simple" initial implementations. These
implementations focus on testing the algorithms and proving that the feature
works. Yet they are already useful and used in production.

Performance is expected to greatly improve in the final implementation,
especially if some of it ends up being compiled code.

This first implementation lacks the ability to serve the cached bundles from
a CDN. We'll want this limitation to be lifted quickly.

The way Mercurial core reports progress is designed for the reception of a
single changegroup. So currently, using pullbundle means flooding the user
with output. This will have to be fixed.

Why does this live in the same repository as evolve
===================================================

There is no fundamental reason to live in the same repository. However, the
stablerange data-structure lives in evolve, so it was simpler to put this new
extension next to it. As soon as stable ranges have been upstreamed, we won't
need the dependency on the evolve extension anymore.
"""
import collections
import errno
import random
import os

from mercurial import (
    changegroup,
    discovery,
    error,
    exchange,
    narrowspec,
    node as nodemod,
    registrar,
    scmutil,
    util,
)

from mercurial.i18n import _

__version__ = b'0.1.1'
testedwith = b'4.4 4.5 4.6 4.7.1'
minimumhgversion = b'4.4'
buglink = b'https://bz.mercurial-scm.org/'

cmdtable = {}
command = registrar.command(cmdtable)

configtable = {}
configitem = registrar.configitem(configtable)

configitem(b'pullbundle', b'cache-directory',
           default=None,
           )

# generic wrapping

def uisetup(ui):
    exchange.getbundle2partsmapping[b'changegroup'] = _getbundlechangegrouppart

def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None,
                              **kwargs):
    """add a changegroup part to the requested bundle"""
    if not kwargs.get(r'cg', True):
        return

    version = b'01'
    cgversions = b2caps.get(b'changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_(b'no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)
    else:
        filematcher = None

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions)
    # END OF ALTERED PART

    if kwargs.get(r'narrow', False) and (include or exclude):
        narrowspecpart = bundler.newpart(b'narrow:spec')
        if include:
            narrowspecpart.addparam(
                b'include', b'\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                b'exclude', b'\n'.join(exclude), mandatory=True)
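# How the wrapping above takes effect: Mercurial core registers its own
# changegroup part generator in `exchange.getbundle2partsmapping` (through
# the `getbundle2partsgenerator` decorator). The assignment in `uisetup`
# replaces that entry, so every "getbundle" request asking for a
# changegroup part is routed through `_getbundlechangegrouppart` and, from
# there, through the caching logic below.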
def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):

    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn(b'pullbundle: required extension "evolve" is missing, '
                     b'skipping pullbundle\n')
        pullbundle = False
    if filematcher:
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)
    else:
        start = util.timer()
        slices = sliceoutgoing(repo, outgoing)
        end = util.timer()
        msg = _(b'pullbundle-cache: "missing" set sliced into %d subranges '
                b'in %f seconds\n')
        repo.ui.write(msg % (len(slices), end - start))
        for sliceid, sliceout in slices:
            makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                          bundlecaps, filematcher, cgversions)

# stable range slicing

DEBUG = False

def sliceoutgoing(repo, outgoing):
    cl = repo.changelog
    rev = cl.nodemap.get
    node = cl.node
    revsort = repo.stablesort

    missingrevs = set(rev(n) for n in outgoing.missing)
    if DEBUG:
        ms = missingrevs.copy()
        ss = []
    allslices = []
    missingheads = [rev(n) for n in sorted(outgoing.missingheads,
                                           reverse=True)]
    for head in missingheads:
        localslices = []
        localmissing = set(repo.revs(b'%ld and ::%d', missingrevs, head))
        thisrunmissing = localmissing.copy()
        while localmissing:
            slicerevs = []
            for r in revsort.walkfrom(repo, head):
                if r not in thisrunmissing:
                    break
                slicerevs.append(r)
            slicenodes = [node(r) for r in slicerevs]
            localslices.append(canonicalslices(repo, slicenodes))
            if DEBUG:
                ss.append(slicerevs)
            missingrevs.difference_update(slicerevs)
            localmissing.difference_update(slicerevs)
            if localmissing:
                heads = list(repo.revs(b'heads(%ld)', localmissing))
                heads.sort(key=node)
                head = heads.pop()
                if heads:
                    thisrunmissing = repo.revs(b'%ld and only(%d, %ld)',
                                               localmissing, head, heads)
                else:
                    thisrunmissing = localmissing.copy()
        if DEBUG:
            for s in reversed(ss):
                ms -= set(s)
                missingbase = repo.revs(b'parents(%ld) and %ld', s, ms)
                if missingbase:
                    repo.ui.write_err(b'!!! rev bundled while parents '
                                      b'missing\n')
                    repo.ui.write_err(b' parent: %s\n' % list(missingbase))
                    pb = repo.revs(b'%ld and children(%ld)', s, missingbase)
                    repo.ui.write_err(b' children: %s\n' % list(pb))
                    h = repo.revs(b'heads(%ld)', s)
                    repo.ui.write_err(b' heads: %s\n' % list(h))
                    raise error.ProgrammingError(b'issuing a range before '
                                                 b'its parents')
        for s in reversed(localslices):
            allslices.extend(s)
    # unknown subranges might have had to be computed
    repo.stablerange.save(repo)
    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in allslices]

def canonicalslices(repo, nodes):
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    rangelength = lambda x: stablerange.rangelength(repo, x)
    headrev = repo.changelog.rev(nodes[0])
    nbrevs = len(nodes)
    headdepth = depth(headrev)
    skipped = headdepth - nbrevs
    rangeid = (headrev, skipped)

    subranges = canonicalsubranges(repo, stablerange, rangeid)
    idx = 0
    slices = []
    nodes.reverse()
    for rangeid in subranges:
        size = rangelength(rangeid)
        slices.append((rangeid, nodes[idx:idx + size]))
        idx += size

    ### slow code block to validate ranges content
    # rev = repo.changelog.nodemap.get
    # for ri, ns in slices:
    #     a = set(rev(n) for n in ns)
    #     b = set(repo.stablerange.revsfromrange(repo, ri))
    #     l = repo.stablerange.rangelength(repo, ri)
    #     repo.ui.write('range-length: %d-%d %s %s\n'
    #                   % (ri[0], ri[1], l, len(a)))
    #     if a != b:
    #         d = (ri[0], ri[1], b - a, a - b)
    #         repo.ui.write("mismatching content: %d-%d -%s +%s\n" % d)
    return slices
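# Worked illustration of the canonical decomposition performed by
# `canonicalsubranges` below (hypothetical numbers, assuming the usual
# power-of-two alignment of stable ranges): a slice of 13 changesets ending
# at a head of depth 13, i.e. rangeid == (headrev, 0), is decomposed into
# standard subranges of sizes 8, 4 and 1. Each standard subrange is a
# stable, client-independent unit, so a later pull covering part of the
# same history can be served from the same cached bundles.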
def canonicalsubranges(repo, stablerange, rangeid):
    """slice a set of nodes into the most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    ranges.

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find the first cut
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2

    # 2. optimise the bottom part
    if skip != cut:
        currentsize = tailsize = cut - skip
        assert 0 < tailsize, tailsize

        # we need to take several "standard cuts" in the bottom part
        #
        # This is similar to what we will do for the top part, but reusing
        # the existing structure is a bit more complex.
        allcuts = list(reversed(standardcut(tailsize)))
        prerange = (headrev, precut)
        ### slow code block to check we operate on the right data
        # rev = repo.changelog.nodemap.get
        # allrevs = [rev(n) for n in nodes]
        # allrevs.reverse()
        # prerevs = repo.stablerange.revsfromrange(repo, prerange)
        # assert allrevs == prerevs[(len(prerevs) - len(allrevs)):]
        # end of checks
        sub = list(stablerange.subranges(repo, prerange)[:-1])

        bottomranges = []
        # XXX we might be able to reuse core stable-range logic instead of
        # redoing this manually
        currentrange = sub.pop()
        currentsize = stablerange.rangelength(repo, currentrange)
        currentcut = None
        while allcuts or currentcut is not None:
            # get the next cut if needed
            if currentcut is None:
                currentcut = allcuts.pop()
            # attempt a cut
            if currentsize == currentcut:
                bottomranges.append(currentrange)
                currentcut = None
            elif currentsize < currentcut:
                bottomranges.append(currentrange)
                currentcut -= currentsize
            else:  # currentsize > currentcut
                newskip = currentrange[1] + (currentsize - currentcut)
                currentsub = stablerange._slicesrangeat(repo, currentrange,
                                                        newskip)
                bottomranges.append(currentsub.pop())
                sub.extend(currentsub)
                currentcut = None
            currentrange = sub.pop()
            currentsize = stablerange.rangelength(repo, currentrange)
        bottomranges.reverse()
        canonicals.extend(bottomranges)

    # 3. take recursive subranges until we get to a power of two size
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals

def standardcut(size):
    assert 0 < size
    # 0. find the first power of 2 higher than this size
    cut = 1
    while cut <= size:
        cut *= 2

    allcuts = []
    # 1. find all standard expected cuts
    while 1 < cut and size:
        cut //= 2
        if cut <= size:
            allcuts.append(cut)
            size -= cut
    return allcuts
def poweroftwo(num):
    return num and not num & (num - 1)

def outgoingfromnodes(repo, nodes):
    return discovery.outgoing(repo,
                              missingroots=nodes,
                              missingheads=nodes)

# changegroup part construction

def _changegroupinfo(repo, nodes, source):
    if repo.ui.verbose or source == b'bundle':
        repo.ui.status(_(b"%d changesets found\n") % len(nodes))

def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
    old = changegroup._changegroupinfo

    try:
        changegroup._changegroupinfo = _changegroupinfo
        if filematcher is not None:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps,
                                              filematcher=filematcher)
        else:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps)
    finally:
        changegroup._changegroupinfo = old

    nbchanges = len(outgoing.missing)
    pversion = None
    if cgversions:
        pversion = version
    return (cgstream, nbchanges, pversion)

def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    # same as upstream code
    part = newpart(b'changegroup', data=cgstream)

    if version:
        part.addparam(b'version', version)

    part.addparam(b'nbchanges', b'%d' % nbchanges,
                  mandatory=False)

    if b'treemanifest' in repo.requirements:
        part.addparam(b'treemanifest', b'1')

# cache management

def cachedir(repo):
    cachedir = repo.ui.config(b'pullbundle', b'cache-directory')
    if cachedir is not None:
        return cachedir
    return repo.cachevfs.join(b'pullbundles')

def getcache(repo, bundlename):
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    if not os.path.exists(bundlepath):
        return None

    # Delay file opening as much as possible. This introduces a small race
    # condition if someone removes the file before we actually use it.
    # However, opening too many files at once would not work.
    def data():
        with open(bundlepath, r'rb') as fd:
            for chunk in util.filechunkiter(fd):
                yield chunk
    return data()

def cachewriter(repo, bundlename, stream):
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk

BUNDLEMASK = b"%s-%s-%010iskip-%010isize.hg"

def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    bundlename = cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)
    if cachedata is None:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            cgstream = cachewriter(repo, bundlename, partdata[0])
            partdata = (cgstream,) + partdata[1:]
    else:
        if repo.ui.verbose or source == b'bundle':
            repo.ui.status(_(b"%d changesets found in caches\n") % nbchanges)
        pversion = None
        if cgversions:
            pversion = version
        partdata = (cachedata, nbchanges, pversion)
    return _makepartfromstream(newpart, repo, *partdata)
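# Example of the cache naming scheme used by `makeonecgpart` above (the
# node is hypothetical): with changegroup version '02', a head whose hex
# node starts with "a2e44a0b", 12 revisions skipped at the bottom of the
# range and 64 changesets in it, the cached bundle file would be named:
#
#   02-a2e44a0b<32 more hex digits>-0000000012skip-0000000064size.hg
#
# and stored under `.hg/cache/pullbundles/` (or the configured
# [pullbundle] cache-directory).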
@command(b'debugpullbundlecacheoverlap',
         [(b'', b'count', 100, _(b'number of "client" pulls to simulate')),
          (b'', b'min-cache', 1, _(b'minimum size of cached bundle')),
          ],
         _(b'hg debugpullbundlecacheoverlap [--count 100] REVSET'))
def debugpullbundlecacheoverlap(ui, repo, *revs, **opts):
    '''display statistics about bundle cache hits

    This command simulates pulls from multiple clients, each using a random
    subset of revisions defined by REVSET, and displays statistics about the
    overlap in the bundles necessary to serve them.
    '''
    actionrevs = scmutil.revrange(repo, revs)
    if not actionrevs:
        raise error.Abort(b'no revision selected')
    count = opts['count']
    min_cache = opts['min_cache']

    bundlehits = collections.defaultdict(lambda: 0)
    pullstats = []

    rlen = lambda rangeid: repo.stablerange.rangelength(repo, rangeid)

    repo.ui.write(b"gathering %d sample pulls within %d revisions\n"
                  % (count, len(actionrevs)))
    if 1 < min_cache:
        repo.ui.write(b" not caching ranges smaller than %d changesets\n"
                      % min_cache)
    for i in range(count):
        repo.ui.progress(b'gathering data', i, total=count)
        outgoing = takeonesample(repo, actionrevs)
        ranges = sliceoutgoing(repo, outgoing)
        hitranges = 0
        hitchanges = 0
        totalchanges = 0
        largeranges = []
        for rangeid, __ in ranges:
            length = rlen(rangeid)
            totalchanges += length
            if bundlehits[rangeid]:
                hitranges += 1
                hitchanges += rlen(rangeid)
            if min_cache <= length:
                bundlehits[rangeid] += 1
                largeranges.append(rangeid)

        stats = (len(outgoing.missing),
                 totalchanges,
                 hitchanges,
                 len(largeranges),
                 hitranges,
                 )
        pullstats.append(stats)

    repo.ui.progress(b'gathering data', None)

    sizes = []
    changesmissing = []
    totalchanges = 0
    totalcached = 0
    changesratio = []
    rangesratio = []
    bundlecount = []
    for entry in pullstats:
        sizes.append(entry[0])
        changesmissing.append(entry[1] - entry[2])
        changesratio.append(entry[2] / float(entry[1]))
        if entry[3]:
            rangesratio.append(entry[4] / float(entry[3]))
        else:
            rangesratio.append(1)
        bundlecount.append(entry[3])
        totalchanges += entry[1]
        totalcached += entry[2]

    cachedsizes = []
    cachedhits = []
    for rangeid, hits in bundlehits.items():
        if hits <= 0:
            continue
        length = rlen(rangeid)
        cachedsizes.append(length)
        cachedhits.append(hits)

    sizesdist = distribution(sizes)
    repo.ui.write(fmtdist(b'pull size', sizesdist))

    changesmissingdist = distribution(changesmissing)
    repo.ui.write(fmtdist(b'non-cached changesets', changesmissingdist))

    changesratiodist = distribution(changesratio)
    repo.ui.write(fmtdist(b'ratio of cached changesets', changesratiodist))

    bundlecountdist = distribution(bundlecount)
    repo.ui.write(fmtdist(b'bundle count', bundlecountdist))

    rangesratiodist = distribution(rangesratio)
    repo.ui.write(fmtdist(b'ratio of cached bundles', rangesratiodist))

    repo.ui.write(b'changesets served:\n')
    repo.ui.write(b'  total:      %7d\n' % totalchanges)
    repo.ui.write(b'  from cache: %7d (%2d%%)\n'
                  % (totalcached, (totalcached * 100 // totalchanges)))
    repo.ui.write(b'  bundle:     %7d\n' % sum(bundlecount))

    cachedsizesdist = distribution(cachedsizes)
    repo.ui.write(fmtdist(b'size of cached bundles', cachedsizesdist))

    cachedhitsdist = distribution(cachedhits)
    repo.ui.write(fmtdist(b'hit on cached bundles', cachedhitsdist))

def takeonesample(repo, revs):
    node = repo.changelog.node
    pulled = random.sample(revs, max(4, len(revs) // 1000))
    pulled = repo.revs(b'%ld::%ld', pulled, pulled)
    nodes = [node(r) for r in pulled]
    return outgoingfromnodes(repo, nodes)

def distribution(data):
    data.sort()
    length = len(data)
    return {
        b'min': data[0],
        b'10%': data[length // 10],
        b'25%': data[length // 4],
        b'50%': data[length // 2],
        b'75%': data[(length // 4) * 3],
        b'90%': data[(length // 10) * 9],
        b'95%': data[(length // 20) * 19],
        b'max': data[-1],
    }

STATSFORMAT = b"""{name}:
  min: {min}
  10%: {10%}
  25%: {25%}
  50%: {50%}
  75%: {75%}
  90%: {90%}
  95%: {95%}
  max: {max}
"""

def fmtdist(name, data):
    return STATSFORMAT.format(name=name, **data)
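# Hand-checked example for `distribution` (verifiable in a plain Python
# session): for the input 1..100 the percentile indexes resolve to
#
#   d = distribution(list(range(1, 101)))
#   (d[b'min'], d[b'50%'], d[b'95%'], d[b'max']) == (1, 51, 96, 100)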