hooks/test/unittest_syncschema.py
branchstable
changeset 4667 6c8eccb1b695
child 4681 5f72584ab1d7
equal deleted inserted replaced
4666:737cbdb87e87 4667:6c8eccb1b695
       
     1 from logilab.common.testlib import TestCase, unittest_main
       
     2 from cubicweb.devtools.testlib import CubicWebTC
       
     3 
       
     4 #################
       
     5 # <required  ?> #
       
     6 #################
       
     7 
       
     8 
       
     9 from datetime import datetime
       
    10 
       
    11 from cubicweb import (ConnectionError, ValidationError, AuthenticationError,
       
    12                       BadConnectionId)
       
    13 from cubicweb.devtools.testlib import get_versions
       
    14 
       
    15 from cubicweb.server.sqlutils import SQL_PREFIX
       
    16 from cubicweb.server.repository import Repository
       
    17 
       
    18 orig_get_versions = Repository.get_versions
       
    19 #################
       
    20 # </required ?> #
       
    21 #################
       
    22 
       
    23 def setup_module(*args):
       
    24     Repository.get_versions = get_versions
       
    25 
       
    26 def teardown_module(*args):
       
    27     Repository.get_versions = orig_get_versions
       
    28 
       
    29 class SchemaModificationHooksTC(CubicWebTC):
       
    30 
       
    @classmethod
    def init_config(cls, config):
        """Run the inherited configuration setup, reset ``_cubes``, fill the schema.

        :param config: configuration object handed over by the test framework.
        """
        parent = super(SchemaModificationHooksTC, cls)
        parent.init_config(config)
        # NOTE(review): presumably drops a cached cubes list so it is
        # recomputed -- confirm against the configuration class.
        config._cubes = None
        repo = cls.repo
        repo.fill_schema()
       
    36 
       
    def index_exists(self, etype, attr, unique=False):
        """Ask the system source's db helper whether an index exists.

        :param etype: entity type name; ``SQL_PREFIX`` is prepended to get
            the table name.
        :param attr: attribute name; ``SQL_PREFIX`` is prepended to get the
            column name.
        :param unique: forwarded unchanged to the helper's ``index_exists``.
        :return: whatever the helper's ``index_exists`` returns.
        """
        session = self.session
        session.set_pool()
        pool = session.pool
        helper = pool.source('system').dbhelper
        cursor = pool['system']
        table = SQL_PREFIX + etype
        column = SQL_PREFIX + attr
        return helper.index_exists(cursor, table, column, unique=unique)
       
    42 
       
    def _set_perms(self, eid):
        """Attach read/add/delete permissions to the schema entity ``eid``.

        Read goes to every CWGroup, add to the "managers" group and delete to
        the "owners" group, one RQL query per permission.
        """
        queries = (
            'SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
            'SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
            'SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"',
        )
        for rql in queries:
            # a fresh argument dict per query, as in three separate calls
            self.execute(rql, {'x': eid}, 'x')
       
    50 
       
    def _set_attr_perms(self, eid):
        """Attach read/update permissions to the attribute definition ``eid``.

        Read goes to every CWGroup and update to the "managers" group, one RQL
        query per permission.
        """
        queries = (
            'SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
            'SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
        )
        for rql in queries:
            # a fresh argument dict per query, as in two separate calls
            self.execute(rql, {'x': eid}, 'x')
       
    56 
       
    def test_base(self):
        """Create then delete an entity type and a relation type through RQL.

        At each step the test checks that the in-memory schema (and the SQL
        index on ``Societe2.name``) is left untouched until ``commit()`` and
        reflects the change afterwards.

        ``concerne2`` is a relation type, so its absence is checked with
        ``has_relation``; ``has_entity('concerne2')`` could never be true and
        made those assertions vacuous.
        """
        schema = self.repo.schema
        # NOTE(review): kept from the original; index_exists() acquires the
        # pool itself, so this call is probably redundant -- confirm.
        self.session.set_pool()
        self.assertFalse(schema.has_entity('Societe2'))
        self.assertFalse(schema.has_relation('concerne2'))
        # schema should be updated on insertion (after commit)
        eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
        self._set_perms(eeid)
        self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE')
        self.assertFalse(schema.has_entity('Societe2'))
        self.assertFalse(schema.has_relation('concerne2'))
        # have to commit before adding definition relations
        self.commit()
        self.assertTrue(schema.has_entity('Societe2'))
        self.assertTrue(schema.has_relation('concerne2'))
        attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
                               '   X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
                               'WHERE RT name "name", E name "Societe2", F name "String"')[0][0]
        self._set_attr_perms(attreid)
        concerne2_rdef_eid = self.execute(
            'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
            'WHERE RT name "concerne2", E name "Societe2"')[0][0]
        self._set_perms(concerne2_rdef_eid)
        # definitions are not visible in the schema (nor in SQL) before commit
        self.assertFalse('name' in schema['Societe2'].subject_relations())
        self.assertFalse('concerne2' in schema['Societe2'].subject_relations())
        self.assertFalse(self.index_exists('Societe2', 'name'))
        self.commit()
        self.assertTrue('name' in schema['Societe2'].subject_relations())
        self.assertTrue('concerne2' in schema['Societe2'].subject_relations())
        self.assertTrue(self.index_exists('Societe2', 'name'))
        # now we should be able to insert and query Societe2
        s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
        self.execute('Societe2 X WHERE X name "logilab"')
        self.execute('SET X concerne2 X WHERE X name "logilab"')
        rset = self.execute('Any X WHERE X concerne2 Y')
        self.assertEqual(rset.rows, [[s2eid]])
        # check that when a relation definition is deleted, existing relations are deleted
        rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
                               '   X from_entity E, X to_entity E '
                               'WHERE RT name "concerne2", E name "CWUser"')[0][0]
        self._set_perms(rdefeid)
        self.commit()
        self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x')
        self.commit()
        self.assertTrue('concerne2' in schema['CWUser'].subject_relations())
        self.assertFalse('concerne2' in schema['Societe2'].subject_relations())
        self.assertFalse(self.execute('Any X WHERE X concerne2 Y'))
        # schema should be cleaned on delete (after commit)
        self.execute('DELETE CWEType X WHERE X name "Societe2"')
        self.execute('DELETE CWRType X WHERE X name "concerne2"')
        self.assertTrue(self.index_exists('Societe2', 'name'))
        self.assertTrue(schema.has_entity('Societe2'))
        self.assertTrue(schema.has_relation('concerne2'))
        self.commit()
        self.assertFalse(self.index_exists('Societe2', 'name'))
        self.assertFalse(schema.has_entity('Societe2'))
        self.assertFalse(schema.has_relation('concerne2'))
        self.assertFalse('concerne2' in schema['CWUser'].subject_relations())
       
   117 
       
   118     def test_is_instance_of_insertions(self):
       
   119         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
       
   120         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
       
   121         self.assertEquals(is_etypes, ['Transition'])
       
   122         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
       
   123         self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
       
   124         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
       
   125         self.failIf('subdiv' in snames)
       
   126         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
       
   127         self.failUnless('subdiv' in snames)
       
   128 
       
   129 
       
   130     def test_perms_synchronization_1(self):
       
   131         schema = self.repo.schema
       
   132         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
       
   133         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
       
   134         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
       
   135         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
       
   136         self.commit()
       
   137         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', )))
       
   138         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
       
   139         self.commit()
       
   140         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))
       
   141 
       
   142     def test_perms_synchronization_2(self):
       
   143         schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
       
   144         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   145         self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
       
   146         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   147         self.commit()
       
   148         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
       
   149         self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
       
   150         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
       
   151         self.commit()
       
   152         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
       
   153 
       
   154     def test_nonregr_user_edit_itself(self):
       
   155         ueid = self.session.user.eid
       
   156         groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
       
   157         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
       
   158         self.execute('SET X surname "toto" WHERE X eid %s' % ueid)
       
   159         self.execute('SET X in_group Y WHERE X eid %s, Y name "managers"' % ueid)
       
   160         self.commit()
       
   161         eeid = self.execute('Any X WHERE X is CWEType, X name "CWEType"')[0][0]
       
   162         self.execute('DELETE X read_permission Y WHERE X eid %s' % eeid)
       
   163         self.execute('SET X final FALSE WHERE X eid %s' % eeid)
       
   164         self.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)'
       
   165                      % (eeid, groupeids[0], groupeids[1]))
       
   166         self.commit()
       
   167         self.execute('Any X WHERE X is CWEType, X name "CWEType"')
       
   168 
       
   169     # schema modification hooks tests #########################################
       
   170 
       
   171     def test_uninline_relation(self):
       
   172         self.session.set_pool()
       
   173         dbhelper = self.session.pool.source('system').dbhelper
       
   174         sqlcursor = self.session.pool['system']
       
   175         self.failUnless(self.schema['state_of'].inlined)
       
   176         try:
       
   177             self.execute('SET X inlined FALSE WHERE X name "state_of"')
       
   178             self.failUnless(self.schema['state_of'].inlined)
       
   179             self.commit()
       
   180             self.failIf(self.schema['state_of'].inlined)
       
   181             self.failIf(self.index_exists('State', 'state_of'))
       
   182             rset = self.execute('Any X, Y WHERE X state_of Y')
       
   183             self.assertEquals(len(rset), 2) # user states
       
   184         finally:
       
   185             self.execute('SET X inlined TRUE WHERE X name "state_of"')
       
   186             self.failIf(self.schema['state_of'].inlined)
       
   187             self.commit()
       
   188             self.failUnless(self.schema['state_of'].inlined)
       
   189             self.failUnless(self.index_exists('State', 'state_of'))
       
   190             rset = self.execute('Any X, Y WHERE X state_of Y')
       
   191             self.assertEquals(len(rset), 2)
       
   192 
       
   193     def test_indexed_change(self):
       
   194         self.session.set_pool()
       
   195         dbhelper = self.session.pool.source('system').dbhelper
       
   196         sqlcursor = self.session.pool['system']
       
   197         try:
       
   198             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
       
   199             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
       
   200             self.failUnless(self.index_exists('Workflow', 'name'))
       
   201             self.commit()
       
   202             self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
       
   203             self.failIf(self.index_exists('Workflow', 'name'))
       
   204         finally:
       
   205             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"')
       
   206             self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
       
   207             self.failIf(self.index_exists('Workflow', 'name'))
       
   208             self.commit()
       
   209             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
       
   210             self.failUnless(self.index_exists('Workflow', 'name'))
       
   211 
       
   212     def test_unique_change(self):
       
   213         self.session.set_pool()
       
   214         dbhelper = self.session.pool.source('system').dbhelper
       
   215         sqlcursor = self.session.pool['system']
       
   216         try:
       
   217             self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
       
   218                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
       
   219                          'RT name "name", E name "Workflow"')
       
   220             self.failIf(self.schema['Workflow'].has_unique_values('name'))
       
   221             self.failIf(self.index_exists('Workflow', 'name', unique=True))
       
   222             self.commit()
       
   223             self.failUnless(self.schema['Workflow'].has_unique_values('name'))
       
   224             self.failUnless(self.index_exists('Workflow', 'name', unique=True))
       
   225         finally:
       
   226             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
       
   227                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
       
   228                          'RT name "name", E name "Workflow"')
       
   229             self.failUnless(self.schema['Workflow'].has_unique_values('name'))
       
   230             self.failUnless(self.index_exists('Workflow', 'name', unique=True))
       
   231             self.commit()
       
   232             self.failIf(self.schema['Workflow'].has_unique_values('name'))
       
   233             self.failIf(self.index_exists('Workflow', 'name', unique=True))
       
   234 
       
   235     def test_required_change_1(self):
       
   236         self.execute('SET DEF cardinality "?1" '
       
   237                      'WHERE DEF relation_type RT, DEF from_entity E,'
       
   238                      'RT name "title", E name "Bookmark"')
       
   239         self.commit()
       
   240         # should now be able to add bookmark without title
       
   241         self.execute('INSERT Bookmark X: X path "/view"')
       
   242         self.commit()
       
   243 
       
   244     def test_required_change_2(self):
       
   245         self.execute('SET DEF cardinality "11" '
       
   246                      'WHERE DEF relation_type RT, DEF from_entity E,'
       
   247                      'RT name "surname", E name "CWUser"')
       
   248         self.commit()
       
   249         # should not be able anymore to add cwuser without surname
       
   250         self.assertRaises(ValidationError, self.create_user, "toto")
       
   251         self.execute('SET DEF cardinality "?1" '
       
   252                      'WHERE DEF relation_type RT, DEF from_entity E,'
       
   253                      'RT name "surname", E name "CWUser"')
       
   254         self.commit()
       
   255 
       
   256 
       
   257     def test_add_attribute_to_base_class(self):
       
   258         attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
       
   259                                'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0]
       
   260         assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
       
   261                      {'x': attreid}, 'x')
       
   262         self.commit()
       
   263         self.schema.rebuild_infered_relations()
       
   264         self.failUnless('Transition' in self.schema['messageid'].subjects())
       
   265         self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects())
       
   266         self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"')
       
   267 
       
   268     def test_change_fulltextindexed(self):
       
   269         target = self.request().create_entity(u'EmailAddress', address=u'rick.roll@dance.com')
       
   270         self.commit()
       
   271         rset = self.execute('Any X Where X has_text "rick.roll"')
       
   272         self.assertIn(target.eid, [item[0] for item in rset])
       
   273 
       
   274         assert self.execute('''SET A fulltextindexed False
       
   275                         WHERE E is CWEType,
       
   276                               E name "EmailAddress",
       
   277                               A is CWAttribute,
       
   278                               A from_entity E,
       
   279                               A relation_type R,
       
   280                               R name "address"
       
   281                     ''')
       
   282         self.commit()
       
   283         rset = self.execute('Any X Where X has_text "rick.roll"')
       
   284         self.assertNotIn(target.eid, [item[0] for item in rset])
       
   285