implement a simple statsd logger (closes #5488711)
authorDavid Douard <david.douard@logilab.fr>
Mon, 04 May 2015 17:37:43 +0200
changeset 10477 ee21c559f94f
parent 10476 62251bfdfd79
child 10478 026b4ee032fb
implement a simple statsd logger (closes #5488711) and add a few timing probes on: - etwist.server.CubicWebRootResource.render_request() - server.querier.QuerierHelper.execute() - server.sources.native.NativeSQLSource: - sqlexec() - compile_rql() - authenticate() - doexec() - doexecmany() as well as a pair of counters on server.querier.QuerierHelper cache (hit/miss).
cwconfig.py
etwist/server.py
server/querier.py
server/sources/native.py
statsd_logger.py
--- a/cwconfig.py	Wed Jul 01 21:00:13 2015 +0200
+++ b/cwconfig.py	Mon May 04 17:37:43 2015 +0200
@@ -843,6 +843,13 @@
           'help': 'file where output logs should be written',
           'group': 'main', 'level': 2,
           }),
+        ('statsd-endpoint',
+         {'type' : 'string',
+          'default': '',
+          'help': 'UDP address of the statsd endpoint; it must be formatted'
+                  'like <ip>:<port>; disabled is unset.',
+          'group': 'main', 'level': 2,
+          }),
         # email configuration
         ('smtp-host',
          {'type' : 'string',
@@ -971,7 +978,7 @@
         if logfile.endswith('.log'):
             logfile = logfile[:-4]
         return logfile + '.stats'
-        
+
     def default_pid_file(self):
         """return default path to the pid file of the instance'server"""
         if self.mode == 'system':
@@ -1121,6 +1128,17 @@
         logconfig = join(self.apphome, 'logging.conf')
         if exists(logconfig):
             logging.config.fileConfig(logconfig)
+        # set the statsd address, if any
+        if self.get('statsd-endpoint'):
+            try:
+                address, port = self.get('statsd-endpoint').split(':')
+                port = int(port)
+            except:
+                self.error('statsd-endpoint: invalid address format ({}); '
+                           'it should be "ip:port"'.format(self.get('statsd-endpoint')))
+            else:
+                import statsd_logger
+                statsd_logger.setup('cubicweb.%s' % self.appid, (address, port))
 
     def available_languages(self, *args):
         """return available translation for an instance, by looking for
--- a/etwist/server.py	Wed Jul 01 21:00:13 2015 +0200
+++ b/etwist/server.py	Mon May 04 17:37:43 2015 +0200
@@ -24,6 +24,7 @@
 import threading
 from urlparse import urlsplit, urlunsplit
 from cgi import FieldStorage, parse_header
+from cubicweb.statsd_logger import statsd_timeit
 
 from twisted.internet import reactor, task, threads
 from twisted.web import http, server
@@ -103,6 +104,7 @@
             deferred = threads.deferToThread(self.render_request, request)
             return NOT_DONE_YET
 
+    @statsd_timeit
     def render_request(self, request):
         try:
             # processing HUGE files (hundred of megabytes) in http.processReceived
--- a/server/querier.py	Wed Jul 01 21:00:13 2015 +0200
+++ b/server/querier.py	Mon May 04 17:37:43 2015 +0200
@@ -37,6 +37,7 @@
 from cubicweb.server.ssplanner import READ_ONLY_RTYPES, add_types_restriction
 from cubicweb.server.edition import EditedEntity
 from cubicweb.server.ssplanner import SSPlanner
+from cubicweb.statsd_logger import statsd_timeit, statsd_c
 
 ETYPE_PYOBJ_MAP[Binary] = 'Bytes'
 
@@ -516,6 +517,7 @@
             return InsertPlan(self, rqlst, args, cnx)
         return ExecutionPlan(self, rqlst, args, cnx)
 
+    @statsd_timeit
     def execute(self, cnx, rql, args=None, build_descr=True):
         """execute a rql query, return resulting rows and their description in
         a `ResultSet` object
@@ -558,8 +560,10 @@
                         return empty_rset(rql, args)
             rqlst = self._rql_cache[cachekey]
             self.cache_hit += 1
+            statsd_c('cache_hit')
         except KeyError:
             self.cache_miss += 1
+            statsd_c('cache_miss')
             rqlst = self.parse(rql)
             try:
                 # compute solutions for rqlst and return named args in query
--- a/server/sources/native.py	Wed Jul 01 21:00:13 2015 +0200
+++ b/server/sources/native.py	Mon May 04 17:37:43 2015 +0200
@@ -60,6 +60,7 @@
 from cubicweb.server.edition import EditedEntity
 from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
 from cubicweb.server.sources.rql2sql import SQLGenerator
+from cubicweb.statsd_logger import statsd_timeit
 
 
 ATTR_MAP = {}
@@ -376,6 +377,7 @@
         self._cache.pop('Any X WHERE X eid %s' % eid, None)
         self._cache.pop('Any %s' % eid, None)
 
+    @statsd_timeit
     def sqlexec(self, cnx, sql, args=None):
         """execute the query and return its result"""
         return self.process_result(self.doexec(cnx, sql, args))
@@ -480,6 +482,7 @@
 
     # ISource interface #######################################################
 
+    @statsd_timeit
     def compile_rql(self, rql, sols):
         rqlst = self.repo.vreg.rqlhelper.parse(rql)
         rqlst.restricted_vars = ()
@@ -517,6 +520,7 @@
         # can't claim not supporting a relation
         return True #not rtype == 'content_for'
 
+    @statsd_timeit
     def authenticate(self, cnx, login, **kwargs):
         """return CWUser eid for the given login and other authentication
         information found in kwargs, else raise `AuthenticationError`
@@ -687,6 +691,7 @@
             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
         self.doexec(cnx, sql, attrs)
 
+    @statsd_timeit
     def doexec(self, cnx, query, args=None, rollback=True):
         """Execute a query.
         it's a function just so that it shows up in profiling
@@ -746,6 +751,7 @@
             raise
         return cursor
 
+    @statsd_timeit
     def doexecmany(self, cnx, query, args):
         """Execute a query.
         it's a function just so that it shows up in profiling
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/statsd_logger.py	Mon May 04 17:37:43 2015 +0200
@@ -0,0 +1,126 @@
+# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+
+"""Simple statsd_ logger for cubicweb.
+
+This module is meant to be configured by setting a couple of global variables:
+
+- ``bucket`` global variable will be used as statsd bucket in every
+statsd_ UDP sent packet.
+
+`- `address`` is a pair (IP, port) specifying the address of the
+statsd_ server
+
+
+There are 3 kinds of statds_ message::
+
+- ``statsd_c(context, n)`` is a simple function to send statsd_
+  counter-type of messages like::
+
+    <bucket>.<context>:<n>|c\n
+
+- ``statsd_g(context, value)`` to send statsd_ gauge-type of messages
+  like::
+
+    <bucket>.<context>:<n>|g\n
+
+- ``statsd_t(context, ms)`` to send statsd_ time-type of messages
+  like::
+
+    <bucket>.<context>:<ms>|ms\n
+
+There is also a decorator (``statsd_timeit``) that may be used to
+measure and send to the statsd_ server the time passed in a function
+or a method and the number of calls. It will send a message like::
+   
+    <bucket>.<funcname>:<ms>|ms\n<bucket>.<funcname>:1|c\n
+
+
+.. _statsd: https://github.com/etsy/statsd
+
+"""
+
+__docformat__ = "restructuredtext en"
+
+import time
+import socket
+
+_bucket = 'cubicweb'
+_address = None
+_socket = None
+
+
+def setup(bucket, address):
+    """Configure the statsd endpoint
+
+    :param bucket: the name of the statsd bucket that will be used to
+                   build messages.
+
+    :param address: the UDP endpoint of the statsd server. Must a
+                    couple (ip, port).
+    """
+    global _bucket, _address, _socket
+    _bucket, _address = bucket, address
+    _socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+
+def statsd_c(context, n=1):
+    if _address is not None:
+        _socket.sendto('{0}.{1}:{2}|c\n'.format(_bucket, context, n), _address)
+
+
+def statsd_g(context, value):
+    if _address is not None:
+        _socket.sendto('{0}.{1}:{2}|g\n'.format(_bucket, context, value), _address)
+
+
+def statsd_t(context, value):
+    if _address is not None:
+        _socket.sendto('{0}.{1}:{2:.4f}|ms\n'.format(_bucket, context, value), _address)
+
+
+class statsd_timeit(object):
+    __slots__ = ('callable',)
+
+    def __init__(self, callableobj):
+        self.callable = callableobj
+
+    @property
+    def __doc__(self):
+        return self.callable.__doc__
+    @property
+    def __name__(self):
+        return self.callable.__name__
+    
+    def __call__(self, *args, **kw):
+        if _address is None:
+            return self.callable(*args, **kw)
+        t0 = time.time()
+        try:
+            return self.callable(*args, **kw)
+        finally:
+            dt = 1000*(time.time()-t0)
+            msg = '{0}.{1}:{2:.4f}|ms\n{0}.{1}:1|c\n'.format(_bucket, self.__name__, dt)
+            _socket.sendto(msg, _address)
+                
+    def __get__(self, obj, objtype):
+        """Support instance methods."""
+        if obj is None: # class method or some already wrapped method
+            return self
+        import functools
+        return functools.partial(self.__call__, obj)