1 from cherrypy.test import test
2 test.prefer_parent_path()
3
4 import httplib
5 import os
6 localDir = os.path.dirname(__file__)
7 import sys
8 import threading
9 import time
10
11 import cherrypy
12 from cherrypy.lib import sessions
13
19
20 cherrypy.tools.allow = cherrypy.Tool('on_start_resource', http_methods_allowed)
21
22
24
25 class Root:
26
27 _cp_config = {'tools.sessions.on': True,
28 'tools.sessions.storage_type' : 'ram',
29 'tools.sessions.storage_path' : localDir,
30 'tools.sessions.timeout': (1.0 / 60),
31 'tools.sessions.clean_freq': (1.0 / 60),
32 }
33
34 def clear(self):
35 cherrypy.session.cache.clear()
36 clear.exposed = True
37
38 def testGen(self):
39 counter = cherrypy.session.get('counter', 0) + 1
40 cherrypy.session['counter'] = counter
41 yield str(counter)
42 testGen.exposed = True
43
44 def testStr(self):
45 counter = cherrypy.session.get('counter', 0) + 1
46 cherrypy.session['counter'] = counter
47 return str(counter)
48 testStr.exposed = True
49
50 def setsessiontype(self, newtype):
51 self.__class__._cp_config.update({'tools.sessions.storage_type': newtype})
52 if hasattr(cherrypy, "session"):
53 del cherrypy.session
54 setsessiontype.exposed = True
55 setsessiontype._cp_config = {'tools.sessions.on': False}
56
57 def index(self):
58 sess = cherrypy.session
59 c = sess.get('counter', 0) + 1
60 time.sleep(0.01)
61 sess['counter'] = c
62 return str(c)
63 index.exposed = True
64
65 def keyin(self, key):
66 return str(key in cherrypy.session)
67 keyin.exposed = True
68
69 def delete(self):
70 cherrypy.session.delete()
71 sessions.expire()
72 return "done"
73 delete.exposed = True
74
75 def delkey(self, key):
76 del cherrypy.session[key]
77 return "OK"
78 delkey.exposed = True
79
80 def blah(self):
81 return self._cp_config['tools.sessions.storage_type']
82 blah.exposed = True
83
84 def iredir(self):
85 raise cherrypy.InternalRedirect('/blah')
86 iredir.exposed = True
87
88 def restricted(self):
89 return cherrypy.request.method
90 restricted.exposed = True
91 restricted._cp_config = {'tools.allow.on': True,
92 'tools.allow.methods': ['GET']}
93
94 def regen(self):
95 cherrypy.tools.sessions.regenerate()
96 return "logged in"
97 regen.exposed = True
98
99 def length(self):
100 return str(len(cherrypy.session))
101 length.exposed = True
102
103 cherrypy.tree.mount(Root())
104 cherrypy.config.update({'environment': 'test_suite'})
105
106
107 from cherrypy.test import helper
108
110
116
118 self.getPage('/setsessiontype/ram')
119 self.getPage('/clear')
120
121 self.getPage('/testStr')
122 self.assertBody('1')
123 self.getPage('/testGen', self.cookies)
124 self.assertBody('2')
125 self.getPage('/testStr', self.cookies)
126 self.assertBody('3')
127 self.getPage('/length', self.cookies)
128 self.assertBody('1')
129 self.getPage('/delkey?key=counter', self.cookies)
130 self.assertStatus(200)
131
132 self.getPage('/setsessiontype/file')
133 self.getPage('/testStr')
134 self.assertBody('1')
135 self.getPage('/testGen', self.cookies)
136 self.assertBody('2')
137 self.getPage('/testStr', self.cookies)
138 self.assertBody('3')
139 self.getPage('/delkey?key=counter', self.cookies)
140 self.assertStatus(200)
141
142
143 time.sleep(2)
144 self.getPage('/')
145 self.assertBody('1')
146 self.getPage('/length', self.cookies)
147 self.assertBody('1')
148
149
150 self.getPage('/keyin?key=counter', self.cookies)
151 self.assertBody("True")
152 cookieset1 = self.cookies
153
154
155 self.getPage('/')
156 self.getPage('/length', self.cookies)
157 self.assertBody('2')
158
159
160 self.getPage('/delete', self.cookies)
161 self.assertBody("done")
162 self.getPage('/delete', cookieset1)
163 self.assertBody("done")
164 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')]
165 self.assertEqual(f(), [])
166
167
168 self.getPage('/')
169 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')]
170 self.assertNotEqual(f(), [])
171 time.sleep(2)
172 self.assertEqual(f(), [])
173
177
181
183 client_thread_count = 5
184 request_count = 30
185
186
187 self.getPage("/")
188 self.assertBody("1")
189 cookies = self.cookies
190
191 data_dict = {}
192 errors = []
193
194 def request(index):
195 if self.scheme == 'https':
196 c = httplib.HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
197 else:
198 c = httplib.HTTPConnection('%s:%s' % (self.interface(), self.PORT))
199 for i in xrange(request_count):
200 c.putrequest('GET', '/')
201 for k, v in cookies:
202 c.putheader(k, v)
203 c.endheaders()
204 response = c.getresponse()
205 body = response.read()
206 if response.status != 200 or not body.isdigit():
207 errors.append((response.status, body))
208 else:
209 data_dict[index] = max(data_dict[index], int(body))
210
211
212
213
214
215 ts = []
216 for c in xrange(client_thread_count):
217 data_dict[c] = 0
218 t = threading.Thread(target=request, args=(c,))
219 ts.append(t)
220 t.start()
221
222 for t in ts:
223 t.join()
224
225 hitcount = max(data_dict.values())
226 expected = 1 + (client_thread_count * request_count)
227
228 for e in errors:
229 print e
230 self.assertEqual(hitcount, expected)
231
237
239
240 self.getPage('/testStr')
241
242 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
243 path = os.path.join(localDir, "session-" + id)
244 os.unlink(path)
245 self.getPage('/testStr', self.cookies)
246
248 self.getPage('/unknown/page')
249 self.assertErrorPage(404, "The path '/unknown/page' was not found.")
250
251
252
253
254
255
256 self.getPage('/restricted', self.cookies, method='POST')
257 self.assertErrorPage(405, "Specified method is invalid for this server.")
258
260 self.getPage('/testStr')
261
262 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
263 self.getPage('/regen')
264 self.assertBody('logged in')
265 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
266 self.assertNotEqual(id1, id2)
267
268 self.getPage('/testStr')
269
270 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
271 self.getPage('/testStr',
272 headers=[('Cookie',
273 'session_id=maliciousid; '
274 'expires=Sat, 27 Oct 2017 04:18:28 GMT; Path=/;')])
275 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
276 self.assertNotEqual(id1, id2)
277 self.assertNotEqual(id2, 'maliciousid')
278
279
280 import socket
281 try:
282 import memcache
283
284 host, port = '127.0.0.1', 11211
285 for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
286 socket.SOCK_STREAM):
287 af, socktype, proto, canonname, sa = res
288 s = None
289 try:
290 s = socket.socket(af, socktype, proto)
291
292
293 s.settimeout(1.0)
294 s.connect((host, port))
295 s.close()
296 except socket.error:
297 if s:
298 s.close()
299 raise
300 break
301 except (ImportError, socket.error):
306 else:
308
336
338 client_thread_count = 5
339 request_count = 30
340
341
342 self.getPage("/")
343 self.assertBody("1")
344 cookies = self.cookies
345
346 data_dict = {}
347
348 def request(index):
349 for i in xrange(request_count):
350 self.getPage("/", cookies)
351
352
353 if not self.body.isdigit():
354 self.fail(self.body)
355 data_dict[index] = v = int(self.body)
356
357
358
359 ts = []
360 for c in xrange(client_thread_count):
361 data_dict[c] = 0
362 t = threading.Thread(target=request, args=(c,))
363 ts.append(t)
364 t.start()
365
366 for t in ts:
367 t.join()
368
369 hitcount = max(data_dict.values())
370 expected = 1 + (client_thread_count * request_count)
371 self.assertEqual(hitcount, expected)
372
378
380 self.getPage('/unknown/page')
381 self.assertErrorPage(404, "The path '/unknown/page' was not found.")
382
383
384
385
386
387
388 self.getPage('/restricted', self.cookies, method='POST')
389 self.assertErrorPage(405, "Specified method is invalid for this server.")
390
391
392
393 if __name__ == "__main__":
394 setup_server()
395 helper.testmain()
396