|
1 # -*- coding: utf-8 -*- |
|
2 # copyright 2018 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 """unit tests for module cubicweb.statsd_logger""" |
|
20 |
|
21 import threading |
|
22 import socket |
|
23 import time |
|
24 import re |
|
25 |
|
26 from unittest import TestCase |
|
27 from cubicweb import statsd_logger as statsd |
|
28 |
|
29 |
|
# State shared between the test cases and the fake statsd receiver thread.
DATA = []        # decoded rows received so far (filled by statsd_rcv)
RUNNING = True   # statsd_rcv keeps looping while this is true
STATSD = None    # receiver thread, assigned in setUpModule
UDP_PORT = None  # port SOCK is bound to, assigned in setUpModule

# UDP socket the fake statsd server listens on.  The short timeout makes
# recvfrom() return regularly so the receiver loop can re-check RUNNING.
SOCK = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
SOCK.settimeout(0.1)
|
37 |
|
38 |
|
def statsd_rcv():
    """Receive datagrams on SOCK and append their decoded lines to DATA.

    Runs until the module-level RUNNING flag becomes false.  A receive
    timeout is not an error: it only gives the loop a chance to look at
    RUNNING again.
    """
    while RUNNING:
        try:
            payload, _sender = SOCK.recvfrom(1024)
        except socket.timeout:
            continue
        if not payload:
            continue
        # One datagram may carry several newline-separated rows.
        rows = [line.strip().decode() for line in payload.splitlines()]
        DATA.extend(rows)
|
48 |
|
49 |
|
def setUpModule(*args):
    """Start the fake statsd server and point the statsd logger at it.

    Binds SOCK to an ephemeral port on 127.0.0.1, stores that port in
    UDP_PORT, starts the receiver thread (stored in STATSD) and finally
    calls ``statsd.setup`` with 'test' and the listening address.  The
    test cases expect every received metric name to start with 'test.'.
    """
    global UDP_PORT, STATSD
    # Port 0 lets the OS pick a free port; read it back from the socket.
    SOCK.bind(('127.0.0.1', 0))
    UDP_PORT = SOCK.getsockname()[1]
    STATSD = threading.Thread(target=statsd_rcv)
    # unittest does not run tearDownModule when setUpModule raises, so
    # RUNNING would never be cleared if statsd.setup() below failed.  A
    # daemon thread cannot keep the interpreter alive in that case.
    STATSD.daemon = True
    STATSD.start()
    statsd.setup('test', ('127.0.0.1', UDP_PORT))
|
57 |
|
58 |
|
def tearDownModule(*args):
    """Stop the fake statsd server and undo ``statsd.setup``.

    Clears RUNNING so the receiver loop exits, waits for the receiver
    thread, closes the listening socket and calls ``statsd.teardown``.
    """
    global RUNNING
    RUNNING = False
    STATSD.join()
    # Close only after the join: until then the receiver thread may still
    # be blocked in SOCK.recvfrom().  Without this the socket is leaked.
    SOCK.close()
    statsd.teardown()
|
64 |
|
65 |
|
class StatsdTC(TestCase):
    """Check the messages emitted by the ``statsd`` helpers.

    Messages are sent over UDP and collected asynchronously into the
    module-level DATA list by the receiver thread, so every check polls
    DATA for a short while before failing.
    """

    # Polling budget used by the check_* helpers: number of attempts and
    # pause (in seconds) between two attempts.
    _POLL_ATTEMPTS = 10
    _POLL_DELAY = 0.01

    def setUp(self):
        super(StatsdTC, self).setUp()
        # Empty the list in place: the receiver thread extends this very
        # object, so it must not be rebound.
        del DATA[:]

    def _wait_until(self, predicate):
        """Poll ``predicate`` until it returns a true value.

        Returns True as soon as the predicate holds, False once the polling
        budget is exhausted.
        """
        for _ in range(self._POLL_ATTEMPTS):
            if predicate():
                return True
            time.sleep(self._POLL_DELAY)
        return False

    @staticmethod
    def _ms_pattern(template):
        """Compile ``template`` into a regular expression.

        Each '?' in the template stands for exactly one digit; every other
        character is matched literally.  Escaping matters because the
        templates contain '.' and '|', which are regex metacharacters.
        """
        literal_parts = (re.escape(part) for part in template.split('?'))
        return re.compile(r'\d'.join(literal_parts))

    def check_received(self, value):
        """Assert that the exact row ``value`` shows up in DATA."""
        if not self._wait_until(lambda: value in DATA):
            # Last look after the final pause; reports DATA on failure.
            self.assertIn(value, DATA)

    def check_received_ms(self, value):
        """Assert that a row matching the template ``value`` shows up in DATA.

        ``value`` is a template where each '?' stands for one digit; rows
        are matched from their start.
        """
        pattern = self._ms_pattern(value)

        def matching_rows():
            return [row for row in DATA if pattern.match(row)]

        if not self._wait_until(matching_rows):
            # Last look after the final pause; DATA is the failure message.
            self.assertTrue(matching_rows(), DATA)

    def test_statsd_c(self):
        # Counter: the count defaults to 1 when omitted.
        statsd.statsd_c('context')
        self.check_received('test.context:1|c')
        statsd.statsd_c('context', 10)
        self.check_received('test.context:10|c')

    def test_statsd_g(self):
        # Gauge: the value is sent as given, numeric or not.
        statsd.statsd_g('context', 42)
        self.check_received('test.context:42|g')
        statsd.statsd_g('context', 'Igorrr')
        self.check_received('test.context:Igorrr|g')

    def test_statsd_t(self):
        # Timer: values are expected with four decimal places.
        statsd.statsd_t('context', 1)
        self.check_received('test.context:1.0000|ms')
        statsd.statsd_t('context', 10)
        self.check_received('test.context:10.0000|ms')
        statsd.statsd_t('context', 0.12344)
        self.check_received('test.context:0.1234|ms')
        statsd.statsd_t('context', 0.12345)
        self.check_received('test.context:0.1235|ms')

    def test_decorator(self):

        @statsd.statsd_timeit
        def measure_me_please():
            "some nice function"
            return 42

        # The decorator must preserve the wrapped function's docstring.
        self.assertEqual(measure_me_please.__doc__,
                         "some nice function")

        measure_me_please()
        # One call is expected to produce a timer row and a counter row.
        # NOTE(review): the template only matches values of the form 0.0ddd,
        # which may be fragile on a heavily loaded machine.
        self.check_received_ms('test.measure_me_please:0.0???|ms')
        self.check_received('test.measure_me_please:1|c')

    def test_context_manager(self):

        with statsd.statsd_timethis('cm'):
            time.sleep(0.1)

        # NOTE(review): the template only matches values of the form
        # 100.dddd, so a sleep overshooting by 1 ms or more fails the test.
        self.check_received_ms('test.cm:100.????|ms')
        self.check_received('test.cm:1|c')
|
132 |
|
133 |
|
# Allow running this test module directly as a script.
if __name__ == '__main__':
    import unittest
    unittest.main()