[dataimport] remove dead code
The only caller of ObjectStore._put is ObjectStore.create_entity, which
RQLObjectStore overrides (and doesn't call up).
# -*- coding: utf-8 -*-# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.importcPickleimporttracebackfromthreadingimportThreadfromloggingimportgetLoggerimportzmqfromzmq.eventloopimportioloopimportzmq.eventloop.zmqstreamfromcubicwebimportset_log_methodsfromcubicweb.server.serverimportQuitEvent,Finishedctx=zmq.Context()defcwproto_to_zmqaddr(address):""" converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>) into a proper zmq address (tcp://<ip>:<port>) """assertaddress.startswith('zmqpickle-'),'bad protocol string %s'%addressreturnaddress.split('-',1)[1]# chop the `zmqpickle-` prefixclassZMQComm(object):""" A simple ZMQ-based notification bus. There should at most one instance of this class attached to a Repository. A typical usage may be something like:: def callback(msg): self.info('received message: %s', ' '.join(msg)) repo.app_instances_bus.subscribe('hello', callback) to subsribe to the 'hello' kind of message. On the other side, to emit a notification, call:: repo.app_instances_bus.publish(['hello', 'world']) See http://docs.cubicweb.org for more details. """def__init__(self):self.ioloop=ioloop.IOLoop()self._topics={}self._subscribers=[]self.publisher=Nonedefadd_publisher(self,address):assertself.publisherisNone,"more than one publisher is not supported"self.publisher=Publisher(self.ioloop,address)defadd_subscription(self,topic,callback):forsubscriberinself._subscribers:subscriber.subscribe(topic,callback)self._topics[topic]=callbackdefadd_subscriber(self,address):subscriber=Subscriber(self.ioloop,address)fortopic,callbackinself._topics.iteritems():subscriber.subscribe(topic,callback)self._subscribers.append(subscriber)defpublish(self,msg):ifself.publisherisNone:returnself.publisher.send(msg)defstart(self):Thread(target=self.ioloop.start).start()defstop(self):self.ioloop.add_callback(self.ioloop.stop)def__del__(self):self.ioloop.close()classPublisher(object):def__init__(self,ioloop,address):self.address=addressself._topics={}self._subscribers=[]self.ioloop=ioloopdefcallback():s=ctx.socket(zmq.PUB)self.stream=zmq.eventloop.zmqstream.ZMQStream(s,io_loop=ioloop)self.stream.bind(self.address)self.debug('start publisher on %s',self.address)ioloop.add_callback(callback)defsend(self,msg):self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))classSubscriber(object):def__init__(self,ioloop,address):self.address=addressself.dispatch_table={}self.ioloop=ioloopdefcallback():s=ctx.socket(zmq.SUB)self.stream=zmq.eventloop.zmqstream.ZMQStream(s,io_loop=ioloop)self.stream.on_recv(self.dispatch)self.stream.connect(self.address)self.debug('start subscriber on %s',self.address)ioloop.add_callback(callback)defdispatch(self,msg):try:f=self.dispatch_table[msg[0]]exceptKeyError:returnf(msg)defsubscribe(self,topic,callback):self.dispatch_table[topic]=callbackself.ioloop.add_callback(lambda:self.stream.setsockopt(zmq.SUBSCRIBE,topic))classZMQRepositoryServer(object):def__init__(self,repository):"""make the repository available as a PyRO object"""self.address=Noneself.repo=repositoryself.socket=Noneself.stream=Noneself.loop=ioloop.IOLoop()# event queueself.events=[]defconnect(self,address):self.address=cwproto_to_zmqaddr(address)defrun(self):"""enter the service loop"""# start repository looping tasksself.socket=ctx.socket(zmq.REP)self.stream=zmq.eventloop.zmqstream.ZMQStream(self.socket,io_loop=self.loop)self.stream.bind(self.address)self.info('ZMQ server bound on: %s',self.address)self.stream.on_recv(self.process_cmds)try:self.loop.start()exceptzmq.ZMQError:self.warning('ZMQ event loop killed')self.quit()deftrigger_events(self):"""trigger ready events"""foreventinself.events[:]:ifevent.is_ready():self.info('starting event %s',event)event.fire(self)try:event.update()exceptFinished:self.events.remove(event)defprocess_cmd(self,cmd):"""Delegate the given command to the repository. ``cmd`` is a list of (method_name, args, kwargs) where ``args`` is a list of positional arguments and ``kwargs`` is a dictionnary of named arguments. >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}]) :note1: ``kwargs`` may be ommited >>> rset = delegate_to_repo(["execute", [sessionid, rql]]) :note2: both ``args`` and ``kwargs`` may be omitted >>> schema = delegate_to_repo(["get_schema"]) >>> schema = delegate_to_repo("get_schema") # also allowed """cmd=cPickle.loads(cmd)ifnotcmd:raiseAttributeError('function name required')ifisinstance(cmd,basestring):cmd=[cmd]iflen(cmd)<2:cmd.append(())iflen(cmd)<3:cmd.append({})cmd=list(cmd)+[(),{}]funcname,args,kwargs=cmd[:3]result=getattr(self.repo,funcname)(*args,**kwargs)returnresultdefprocess_cmds(self,cmds):"""Callback intended to be used with ``on_recv``. Call ``delegate_to_repo`` on each command and send a pickled of each result recursively. Any exception are catched, pickled and sent. """try:forcmdincmds:result=self.process_cmd(cmd)self.send_data(result)exceptExceptionasexc:traceback.print_exc()self.send_data(exc)defsend_data(self,data):self.socket.send_pyobj(data)defquit(self,shutdown_repo=False):"""stop the server"""self.info('Quitting ZMQ server')try:self.loop.add_callback(self.loop.stop)self.stream.on_recv(None)self.stream.close()exceptExceptionase:printepassifshutdown_repoandnotself.repo.shutting_down:event=QuitEvent()event.fire(self)# server utilitities ######################################################definstall_sig_handlers(self):"""install signal handlers"""importsignalself.info('installing signal handlers')signal.signal(signal.SIGINT,lambdax,y,s=self:s.quit(shutdown_repo=True))signal.signal(signal.SIGTERM,lambdax,y,s=self:s.quit(shutdown_repo=True))# these are overridden by set_log_methods below# only defining here to prevent pylint from complaining@classmethoddefinfo(cls,msg,*a,**kw):passset_log_methods(Publisher,getLogger('cubicweb.zmq.pub'))set_log_methods(Subscriber,getLogger('cubicweb.zmq.sub'))set_log_methods(ZMQRepositoryServer,getLogger('cubicweb.zmq.repo'))