# Make SQLGenAnnotator work with Google App Engine.
from logilab.common.testlib import TestCase, mock_object
import os, os.path as osp
import time
from shutil import copy
# additional monkey patches necessary in regular cubicweb environment
#
# Rebind the `sqlgen_annotate` and `set_qdata` attributes of
# `cubicweb.server.rqlannotation` to the objects of the same name found in
# `cubicweb.goa.overrides.rqlannotation`. This runs at import time, so the
# replacement is in effect for anything that looks these names up on the
# server module afterwards.
from cubicweb.server import rqlannotation
from cubicweb.goa.overrides import rqlannotation as goarqlannotation
rqlannotation.sqlgen_annotate = goarqlannotation.sqlgen_annotate
rqlannotation.set_qdata = goarqlannotation.set_qdata
# Import the App Engine SDK and the cubicweb.goa layer. When any of these
# imports fails, fall back to inert placeholders and record the reason in
# `import_appengine_failed` so that test cases can skip themselves instead
# of the whole module failing to import.
try:
    from google.appengine.api import apiproxy_stub_map
    from google.appengine.api import datastore_file_stub
    from google.appengine.ext import db as gdb
    from cubicweb.goa import db, do_monkey_patch
    from cubicweb.goa.dbmyams import load_schema
    import_appengine_failed = None
except ImportError:
    # The handler used to start with a bare `raise`, which made everything
    # below unreachable and the skip logic relying on
    # `import_appengine_failed` dead code.
    # sys.exc_info() is used to get the exception so this handler parses the
    # same way whatever the Python version.
    import sys
    exc = sys.exc_info()[1]

    class db:
        """Placeholder namespace exposing the `db` names used by test models."""

        class Model:
            pass

        class DummyProperty:
            """Property stand-in accepting and ignoring any arguments."""

            def __init__(self, *args, **kwargs):
                pass

        TextProperty = DummyProperty
        StringProperty = DummyProperty
        BlobProperty = DummyProperty
        DateProperty = DummyProperty
        ReferenceProperty = DummyProperty
        SelfReferenceProperty = DummyProperty

    def do_monkey_patch():
        """Do nothing: there is nothing to patch without App Engine."""

    import_appengine_failed = 'cannot import appengine: %s' % exc
from cubicweb import CW_SOFTWARE_ROOT
from cubicweb.server.utils import crypt_password
from cubicweb.devtools.fake import FakeRequest
from cubicweb.goa.goavreg import GAERegistry
from cubicweb.goa.goaconfig import GAEConfiguration
from cubicweb.goa.dbinit import (create_user, create_groups, fix_entities,
init_persistent_schema, insert_versions)
import logging
# `logging.getLogger()` without a name returns the root logger: raise its
# threshold so that only CRITICAL records get through.
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)
# Apply the patches provided by `cubicweb.goa` once, at import time, before
# any test case is defined.
do_monkey_patch()
class GAEBasedTC(TestCase):
    """Base test case running on top of a file-backed datastore stub.

    Subclasses must set `MODEL_CLASSES` to the model classes making up the
    schema and may set `LOAD_APP_MODULES` to extra modules to load into the
    registry.

    The first run for a given set of model classes initialises a template
    datastore file (`DS_TEMPL_FILE`); each test then works on a copy of it
    (`DS_FILE`).
    """
    APP_ID = u'test_app'
    AUTH_DOMAIN = 'gmail.com'
    # NOTE(review): the address below looks truncated/obfuscated ('t...');
    # confirm the intended value.
    LOGGED_IN_USER = u't...@example.com' # set to '' for no logged in user
    MODEL_CLASSES = None
    LOAD_APP_MODULES = None
    config = None
    _DS_TEMPL_FILE = 'tmpdb-template'

    def load_schema_hook(self, loader):
        """Hook given to the configuration's `load_schema`.

        Imports the 'data' yams template schema through `loader`.
        """
        loader.import_yams_template_schema('data')

    @property
    def DS_FILE(self):
        """Path of the working datastore file (template path without '-template')."""
        return self.DS_TEMPL_FILE.replace('-template', '')

    @property
    def DS_TEMPL_FILE(self):
        """Path of the template datastore file.

        Built from `_DS_TEMPL_FILE` followed by the sorted model class
        names joined with '_', so each set of models gets its own file.
        """
        model_names = sorted(cls.__name__ for cls in self.MODEL_CLASSES)
        return self._DS_TEMPL_FILE + '_'.join(model_names)

    def _set_ds_file(self, dsfile):
        """Install a fresh API proxy whose datastore stub is backed by `dsfile`."""
        # Start with a fresh api proxy.
        apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
        # Use a fresh stub datastore.
        stub = datastore_file_stub.DatastoreFileStub(self.APP_ID, dsfile,
                                                     dsfile + '.history')
        apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)

    def setUp(self):
        """Prepare datastore, configuration, registry and session for a test.

        Raises `Exception` when the subclass did not set `MODEL_CLASSES`.
        """
        if import_appengine_failed:
            self.skip(import_appengine_failed)
        # Validate the subclass first: DS_TEMPL_FILE iterates over
        # MODEL_CLASSES, so checking any later would surface as an obscure
        # TypeError instead of this explicit message.
        if self.MODEL_CLASSES is None:
            raise Exception('GAEBasedTC should set MODEL_CLASSES class attribute')
        # Ensure we're in UTC.
        os.environ['TZ'] = 'UTC'
        time.tzset()
        need_ds_init = not osp.exists(self.DS_TEMPL_FILE)
        if need_ds_init:
            # no template yet: work directly on it so that it gets populated
            self._set_ds_file(self.DS_TEMPL_FILE)
        else:
            copy(self.DS_TEMPL_FILE, self.DS_FILE)
            self._set_ds_file(self.DS_FILE)
        # NOTE: only the datastore stub is registered; no user, urlfetch or
        # mail service stub is set up. The current user is described
        # through the environment variables below.
        os.environ['AUTH_DOMAIN'] = self.AUTH_DOMAIN
        os.environ['USER_EMAIL'] = self.LOGGED_IN_USER
        gdb._kind_map = {}
        self.config = self.config or GAEConfiguration('toto')
        self.config.init_log(logging.CRITICAL)
        self.schema = self.config.load_schema(self.MODEL_CLASSES,
                                              self.load_schema_hook)
        self._init_registry()
        self.session_manager = self.vreg.select_component('sessionmanager')
        if need_ds_init:
            # create default groups and create entities according to the schema
            create_groups()
            if not self.config['use-google-auth']:
                # module-level `create_user` from dbinit, not the method below
                create_user(self.LOGGED_IN_USER, 'toto', ('users', 'managers'))
        self.session = self._open_default_session()
        self.user = self.session.user()
        if need_ds_init:
            self._finalize_template_datastore()

    def _init_registry(self):
        """Create `self.vreg` and load the modules and model classes into it."""
        self.vreg = GAERegistry(self.config)
        self.vreg.schema = self.schema
        self.vreg.load_module(db)
        from cubicweb.goa.appobjects import sessions
        self.vreg.load_module(sessions)
        from cubicweb.entities import authobjs, schemaobjs
        self.vreg.load_module(authobjs)
        self.vreg.load_module(schemaobjs)
        if self.config['use-google-auth']:
            from cubicweb.goa.appobjects import gauthservice
            self.vreg.load_module(gauthservice)
        if self.LOAD_APP_MODULES is not None:
            for module in self.LOAD_APP_MODULES:
                self.vreg.load_module(module)
        for cls in self.MODEL_CLASSES:
            self.vreg.load_object(cls)

    def _open_default_session(self):
        """Return a session for `LOGGED_IN_USER`.

        Uses a bare fake request with google authentication, an explicit
        login/password otherwise.
        """
        if self.config['use-google-auth']:
            req = FakeRequest(vreg=self.vreg)
            return self.session_manager.open_session(req)
        return self.login(self.LOGGED_IN_USER, 'toto')

    def _finalize_template_datastore(self):
        """Store schema and versions in the template, then switch to a copy of it."""
        ssession = self.config.repo_session(self.session.sessionid)
        ssession.set_pool()
        init_persistent_schema(ssession, self.schema)
        insert_versions(ssession, self.config)
        ssession.commit()
        fix_entities(self.schema)
        copy(self.DS_TEMPL_FILE, self.DS_FILE)
        self._set_ds_file(self.DS_FILE)

    def tearDown(self):
        """Close the session opened by `setUp`."""
        self.session.close()

    def request(self):
        """Return a fake request bound to the test session and user."""
        req = FakeRequest(vreg=self.vreg)
        req.set_connection(self.session, self.user)
        return req

    def add_entity(self, etype, **kwargs):
        """Insert an entity of type `etype` and return it.

        Each keyword argument becomes an 'X <name> %(<name>)s' restriction
        of the INSERT query, its value being passed as query argument.
        """
        cu = self.session.cursor()
        rql = 'INSERT %s X' % etype
        if kwargs:
            rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
        rset = cu.execute(rql, kwargs)
        return rset.get_entity(0, 0)

    def execute(self, *args):
        """Execute a query on a new cursor of the test session, return its result."""
        return self.session.cursor().execute(*args)

    def commit(self):
        """Commit the test session."""
        self.session.commit()

    def rollback(self):
        """Roll back the test session."""
        self.session.rollback()

    def create_user(self, login, groups=('users',), req=None):
        """Create an EUser whose password is its login and add it to `groups`.

        Only usable when google authentication is disabled. `req` is
        accepted but not used.
        """
        assert not self.config['use-google-auth']
        user = self.add_entity('EUser', upassword=str(login), login=unicode(login))
        cu = self.session.cursor()
        cu.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
                   % ','.join(repr(g) for g in groups),
                   {'x': user.eid}, 'x')
        return user

    def login(self, login, password=None):
        """Open and return a session for `login`.

        `password` defaults to the login itself. Only usable when google
        authentication is disabled.
        """
        assert not self.config['use-google-auth']
        req = FakeRequest(vreg=self.vreg)
        req.form['__login'] = login
        req.form['__password'] = password or login
        return self.session_manager.open_session(req)