|
1 """Base class for request/session |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 :license: Library General Public License version 2 - http://www.gnu.org/licenses |
|
7 """ |
|
8 __docformat__ = "restructuredtext en" |
|
9 |
|
from datetime import time
from urllib import quote as urlquote, unquote as urlunquote

from logilab.common.decorators import cached

from cubicweb import Unauthorized, typed_eid
from cubicweb.rset import ResultSet
from cubicweb.utils import ustrftime, strptime, todate, todatetime
|
17 |
|
class RequestSessionBase(object):
    """base class containing stuff shared by server session and web request

    This class does not define `user`, `execute` nor `actual_session`, which
    are used by `property_value` and `user_data`: concrete classes have to
    provide them, as well as an implementation for `base_url`,
    `decorate_rset` and `describe`.
    """

    def __init__(self, vreg):
        """initialize the object using the given registry

        :param vreg: the registry, queried for the 'ui.encoding' property.
          When this property can't be read, `self.encoding` defaults to
          'utf-8'.
        """
        self.vreg = vreg
        try:
            encoding = vreg.property_value('ui.encoding')
        except Exception: # no vreg or property not registered
            # `Exception` rather than a bare except, so that
            # KeyboardInterrupt / SystemExit are not silently swallowed
            encoding = 'utf-8'
        self.encoding = encoding
        # cache result of execution for (rql expr / eids),
        # should be emptied on commit/rollback of the server session / web
        # connection
        self.local_perm_cache = {}
        # `_` is called on user-facing messages (see `ensure_ro_rql`); by
        # default it only converts them to unicode
        self._ = unicode

    def property_value(self, key):
        """return value of the property with the given key, giving priority to
        user specific value if any, else using site value
        """
        if self.user:
            return self.user.property_value(key)
        return self.vreg.property_value(key)

    def etype_rset(self, etype, size=1):
        """return a fake result set for a particular entity type

        :param etype: name of the entity type
        :param size: number of (fake) rows in the result set
        """
        rset = ResultSet([('A',)]*size, '%s X' % etype,
                         description=[(etype,)]*size)
        # entities of a fake result set can't be fetched the regular way:
        # `get_entity` is replaced by a function which instantiates the
        # class registered for `etype` directly
        def get_entity(row, col=0, etype=etype, req=self, rset=rset):
            return req.vreg.etype_class(etype)(req, rset, row, col)
        rset.get_entity = get_entity
        return self.decorate_rset(rset)

    def eid_rset(self, eid, etype=None):
        """return a result set for the given eid without doing actual query
        (we have the eid, we can suppose it exists and user has access to the
        entity)

        :param eid: the entity's eid, converted using `typed_eid`
        :param etype: the entity's type; when None, the type is taken from the
          first item returned by `self.describe(eid)`
        """
        eid = typed_eid(eid)
        if etype is None:
            etype = self.describe(eid)[0]
        rset = ResultSet([(eid,)], 'Any X WHERE X eid %(x)s', {'x': eid},
                         [(etype,)])
        return self.decorate_rset(rset)

    def empty_rset(self):
        """return a decorated result set without any row. No query is done"""
        return self.decorate_rset(ResultSet([], 'Any X WHERE X eid -1'))

    def entity_from_eid(self, eid, etype=None):
        """return an entity instance for the given eid. No query is done

        The entity is taken from `entity_cache` when available, else it is
        built from `eid_rset` and given to `set_entity_cache`.
        """
        try:
            return self.entity_cache(eid)
        except KeyError:
            rset = self.eid_rset(eid, etype)
            entity = rset.get_entity(0, 0)
            self.set_entity_cache(entity)
            return entity

    def entity_cache(self, eid):
        """return the cached entity for the given eid, or raise KeyError

        This default implementation doesn't cache anything and hence always
        raises KeyError.
        """
        raise KeyError

    def set_entity_cache(self, entity):
        """add the given entity to the cache (does nothing by default)"""
        pass

    def ensure_ro_rql(self, rql):
        """raise an exception if the given rql is not a select query

        :raise Unauthorized: if the first word of the query is 'insert',
          'set' or 'delete' (case insensitive)
        """
        # split on any whitespace: splitting on ' ' only lets queries such as
        # ' DELETE ...' or 'DELETE\nPerson X' go through the check
        words = rql.split(None, 1)
        if words and words[0].lower() in ('insert', 'set', 'delete'):
            raise Unauthorized(self._('only select queries are authorized'))

    # url generation methods ##################################################

    def build_url(self, *args, **kwargs):
        """return an absolute URL using params dictionary key/values as URL
        parameters. Values are automatically URL quoted.

        Exactly one non-named argument is expected: the publishing method,
        which is used as path of the url, unless a `_restpath` named argument
        is given (only allowed with the 'view' method), in which case its
        value is used as path.

        The `base_url` named argument may be used to override
        `self.base_url()`. Remaining named arguments are encoded in the query
        string using `build_url_params`.
        """
        # use *args since we don't want first argument to be "anonymous" to
        # avoid potential clash with kwargs
        assert len(args) == 1, 'exactly one non-named argument expected'
        method = args[0]
        base_url = kwargs.pop('base_url', None)
        if base_url is None:
            base_url = self.base_url()
        if '_restpath' in kwargs:
            assert method == 'view', method
            path = kwargs.pop('_restpath')
        else:
            path = method
        if not kwargs:
            return u'%s%s' % (base_url, path)
        return u'%s%s?%s' % (base_url, path, self.build_url_params(**kwargs))

    def build_url_params(self, **kwargs):
        """return encoded params to incorporate them in an URL

        A list or tuple value gives one `param=value` pair per item.
        """
        args = []
        for param, values in kwargs.items():
            if not isinstance(values, (list, tuple)):
                values = (values,)
            for value in values:
                args.append(u'%s=%s' % (param, self.url_quote(value)))
        return '&'.join(args)

    def url_quote(self, value, safe=''):
        """urllib.quote is not unicode safe, use this method to do the
        necessary encoding / decoding. Also it's designed to quote each
        part of a url path and so the '/' character will be encoded as well.

        Unicode values are encoded using `self.encoding` before quoting and
        the result is returned as unicode; other values are converted using
        `str` and the quoted byte string is returned.
        """
        if isinstance(value, unicode):
            quoted = urlquote(value.encode(self.encoding), safe=safe)
            return unicode(quoted, self.encoding)
        return urlquote(str(value), safe=safe)

    def url_unquote(self, quoted):
        """returns a unicode unquoted string

        decoding is based on `self.encoding` which is the encoding
        used in `url_quote`
        """
        if isinstance(quoted, unicode):
            quoted = quoted.encode(self.encoding)
        try:
            return unicode(urlunquote(quoted), self.encoding)
        except UnicodeDecodeError: # might occurs on manually typed URLs
            return unicode(urlunquote(quoted), 'iso-8859-1')

    # bound user related methods ###############################################

    @cached
    def user_data(self):
        """returns a dictionary with this user's information

        The dictionary has the 'login', 'name' and 'email' keys. 'name' and
        'email' are None when the query fetching them returns no row.
        """
        userinfo = {}
        # `is_internal_session` is a method on this class, but it may as well
        # be a plain attribute (a bare `if self.is_internal_session` would
        # always be true for a bound method): support both
        internal = self.is_internal_session
        if callable(internal):
            internal = internal()
        if internal:
            userinfo['login'] = "cubicweb"
            userinfo['name'] = "cubicweb"
            userinfo['email'] = ""
            return userinfo
        user = self.actual_session().user
        rql = "Any F,S,A where U eid %(x)s, U firstname F, U surname S, U primary_email E, E address A"
        try:
            firstname, lastname, email = self.execute(rql, {'x': user.eid}, 'x')[0]
        except IndexError:
            # no row: no information available for this user
            userinfo['name'] = None
            userinfo['email'] = None
        else:
            # skip missing parts so the name never contains a literal "None"
            userinfo['name'] = u' '.join(part for part in (firstname, lastname)
                                         if part is not None)
            userinfo['email'] = email
        userinfo['login'] = user.login
        return userinfo

    def is_internal_session(self):
        """overridden on the server-side"""
        return False

    # formating methods #######################################################

    def format_date(self, date, date_format=None, time=False):
        """return a string for a date time according to instance's
        configuration

        :param date: the value to format; a false value gives an empty string
        :param date_format: explicit format; when None, the
          'ui.datetime-format' property is used if `time` is true, else the
          'ui.date-format' property
        :param time: flag telling if the time part should be displayed when no
          explicit format is given
        """
        if date:
            if date_format is None:
                if time:
                    date_format = self.property_value('ui.datetime-format')
                else:
                    date_format = self.property_value('ui.date-format')
            return ustrftime(date, date_format)
        return u''

    def format_time(self, time):
        """return a string for a time according to instance's
        configuration (the 'ui.time-format' property)

        None gives an empty string.
        """
        # explicit None test: midnight (`datetime.time(0, 0)`) is false in
        # boolean context with python 2 but must be formatted as well
        if time is not None:
            return ustrftime(time, self.property_value('ui.time-format'))
        return u''

    def format_float(self, num):
        """return a string for floating point number according to instance's
        configuration (the 'ui.float-format' property)

        None gives an empty string.
        """
        # explicit None test so that 0 is formatted as any other number
        if num is not None:
            return self.property_value('ui.float-format') % num
        return u''

    def parse_datetime(self, value, etype='Datetime'):
        """get a datetime or time from a string (according to etype)
        Datetime formatted as Date are accepted

        :param value: the string to parse
        :param etype: one of 'Datetime', 'Date' or 'Time'
        :raise ValueError: if the string doesn't match the expected format
        """
        assert etype in ('Datetime', 'Date', 'Time'), etype
        # XXX raise proper validation error
        if etype == 'Datetime':
            fmt = self.property_value('ui.datetime-format')
            try:
                return todatetime(strptime(value, fmt))
            except ValueError:
                # fall through to give a chance to the date format below
                pass
        elif etype == 'Time':
            fmt = self.property_value('ui.time-format')
            try:
                # (adim) I can't find a way to parse a Time with a custom format
                date = strptime(value, fmt) # this returns a DateTime
                return time(date.hour, date.minute, date.second)
            except ValueError:
                raise ValueError('can\'t parse %r (expected %s)' % (value, fmt))
        try:
            fmt = self.property_value('ui.date-format')
            dt = strptime(value, fmt)
            if etype == 'Datetime':
                return todatetime(dt)
            return todate(dt)
        except ValueError:
            raise ValueError('can\'t parse %r (expected %s)' % (value, fmt))

    # abstract methods to override according to the web front-end #############

    def base_url(self):
        """return the root url of the instance"""
        raise NotImplementedError

    def decorate_rset(self, rset):
        """add vreg/req (at least) attributes to the given result set """
        raise NotImplementedError

    def describe(self, eid):
        """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
        raise NotImplementedError