1
2
3
4 from cStringIO import StringIO
5 import string
6 from Martel import RecordReader
7 from mx import TextTools as TT
8
10 i = 0
11
12 while 1:
13 x = reader.next()
14
15 if x is None:
16 break
17 i = i + 1
18 return i
19
20
22 return string.join(TT.splitlines(s), "\n")
23
24
25
26 data1 = """\
27 ID Q1234
28 DE Some protein
29 SQ blah
30 ABCDE FGHIJ
31 //
32 ID Q2345
33 DE ID
34 SQ lahb
35 ID just checking
36 BCDEF GHIJA
37 //
38 ID Q3456
39 DE me proteinSo
40 SQ ahbl
41 CDEFG HIJID
42 //
43 """
44 data1 = normalize(data1)
45
46
47
49 to_eol_data = (("A\n", 1),
50 ("AA\n", 1),
51 ("A\nB\n", 1),
52 ("A\nB\nB\n", 1),
53 ("A\nB\nB\nA\n", 2),
54 ("A\nA\nB\nA\n", 3),
55 ("A\n A\nA\n", 2),
56 ("A A A A A A A A A A A A A A\nB\nA\n", 2),
57 )
58 for s, expected in to_eol_data:
59 reader = RecordReader.StartsWith(StringIO(s), "A")
60 count = count_records(reader)
61 assert count == expected, (s, expected, count)
62
63
65
66 for ending in ("\n", "\r", "\r\n"):
67 s = string.replace(data1, "\n", ending)
68 for final in ("", ending):
69 d = s + final
70
71 for i in range(5, 20):
72 infile = StringIO(d)
73 reader = RecordReader.StartsWith(infile, "ID", i)
74 count = count_records(reader)
75 assert count == 3, (ending, final, i, count, d)
76
77 for i in range(6, 20):
78 infile = StringIO(d)
79 reader = RecordReader.StartsWith(infile, "ID ", i)
80 count = count_records(reader)
81 assert count == 3, (ending, final, i, count, d)
82
84
85
86 for base in ("A" + ending, "A" + ending + "BA" + ending):
87 for repeat in range(0, 15):
88 s = base * repeat
89 infile = StringIO(s)
90
91 for marker in ("A",):
92 for look in range(5):
93 lookahead = base * look
94 for readhint in range(4, 10):
95 infile.seek(0)
96 reader = RecordReader.StartsWith(\
97 infile, marker, readhint, lookahead)
98 count = count_records(reader)
99 assert count == repeat + look, \
100 (count, ending, base, repeat, marker,
101 look, readhint)
102 infile.seek(0)
103 reader = RecordReader.StartsWith(infile, marker,
104 lookahead = lookahead)
105 count = count_records(reader)
106 assert count == repeat + look, \
107 (count, ending, repeat, marker, look)
108
110
111 for repeat in range(20):
112 vals = map(lambda x: "A\n%d\n" % x, range(repeat))
113 data = string.join(vals, "")
114 infile = StringIO(data)
115 for look in range(10) + range(10, len(data), 5):
116 infile.seek(look)
117 lookahead = data[:look]
118 reader = RecordReader.StartsWith(infile, "A",
119 lookahead = lookahead)
120 all = ""
121 while 1:
122 file, lh = reader.remainder()
123 pos = file.tell()
124 rest = file.read()
125 assert all + lh + rest == data, (all, lh, rest, data)
126 file.seek(pos)
127 assert data.startswith(all), (data, all)
128 record = reader.next()
129 if record is None:
130 break
131 all = all + record
132 assert all == data, (all, data)
133
134
136
137
138
139 for s in ("B", "B\n", " A\n", " A", "B\nA\n"):
140 try:
141 infile = StringIO(s)
142
143
144
145 reader = RecordReader.StartsWith(infile, "A")
146 rec = reader.next()
147 raise AssertionError, "should not allow %r" % s
148 except RecordReader.ReaderError:
149 pass
150 else:
151 raise AssertionError, "should not get here"
152
171
172
173
175 to_eol_data = (("A\n", 1),
176 ("AA\n", 1),
177 ("B\nA\n", 1),
178 ("B\nB\nA\n", 1),
179 ("A\nA\n", 2),
180 ("A A\nA\n", 2),
181 ("A", 1),
182 ("A\nA A\nA\n", 3),
183 ("A\nA A\nA", 3),
184 )
185 for s, expected in to_eol_data:
186 reader = RecordReader.EndsWith(StringIO(s), "A")
187 count = count_records(reader)
188 assert count == expected, (s, expected, count)
189
190
191 newline_data = (("A\n", 1),
192
193 ("B\nA\n", 1),
194 ("B\nB\nA\n", 1),
195 ("A\nA\n", 2),
196 ("A A\nA\n", 1),
197 ("A", 1),
198 ("A\nA A\nA\n", 2),
199 ("A\nA A\nA", 2),
200 )
201 for s, expected in newline_data:
202 reader = RecordReader.EndsWith(StringIO(s), "A\n")
203 count = count_records(reader)
204 assert count == expected, (s, expected, count)
205
206
208
209 for ending in ("\n", "\r", "\r\n"):
210 s = string.replace(data1, "\n", ending)
211 for final in ("", ending):
212 d = s + final
213
214 loop = 0
215 for i in range(5, 20):
216 infile = StringIO(d)
217 reader = RecordReader.EndsWith(infile, "//", i)
218 count = count_records(reader)
219 assert count == 3, (ending, final, i, count, d)
220
221 for i in range(5, 20):
222 infile = StringIO(d)
223 reader = RecordReader.EndsWith(infile, "//\n", i)
224 count = count_records(reader)
225 assert count == 3, (ending, final, i, count, d)
226
227
229
230
231 for base in ("A" + ending, "BA" + ending + "A" + ending):
232 for repeat in range(0, 15):
233 s = base * repeat
234 infile = StringIO(s)
235 for marker in ("A", "A\n"):
236 for look in range(5):
237 lookahead = base * look
238 for readhint in range(4, 10):
239 infile.seek(0)
240 reader = RecordReader.EndsWith(\
241 infile, marker, readhint, lookahead)
242 count = count_records(reader)
243 assert count == repeat + look, \
244 (count, ending, base, repeat, marker,
245 look, readhint)
246 infile.seek(0)
247 reader = RecordReader.EndsWith(infile, marker,
248 lookahead = lookahead)
249 count = count_records(reader)
250 assert count == repeat + look, \
251 (count, ending, repeat, marker, look)
252
254
255 for repeat in range(20):
256 vals = map(lambda x: "%d\nA\n" % x, range(repeat))
257 data = string.join(vals, "")
258 infile = StringIO(data)
259 for look in range(10) + range(10, len(data), 5):
260 infile.seek(look)
261 lookahead = data[:look]
262 reader = RecordReader.EndsWith(infile, "A",
263 lookahead = lookahead)
264 all = ""
265 while 1:
266 file, lh = reader.remainder()
267 pos = file.tell()
268 rest = file.read()
269 assert all + lh + rest == data, (all, lh, rest, data)
270 file.seek(pos)
271 assert data.startswith(all), (data, all)
272 record = reader.next()
273 if record is None:
274 break
275 all = all + record
276 assert all == data, (all, data)
277
279
280
281
282
283
284 for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n ", "AA",
285 "AA\n", "A\nB\nA X\n"):
286 has_error = 0
287 infile = StringIO(s)
288 try:
289 reader = RecordReader.EndsWith(infile, "A\n")
290 except RecordReader.ReaderError:
291 has_error = 1
292
293 if not has_error:
294 while not has_error:
295 try:
296 rec = reader.next()
297 except RecordReader.ReaderError:
298 has_error = 1
299 if not has_error and rec is None:
300 break
301 if not has_error:
302 raise AssertionError, "should not get here with %r" % s
303
304
305
306
307 for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n "):
308 has_error = 0
309 infile = StringIO(s)
310 try:
311 reader = RecordReader.EndsWith(infile, "A")
312 except RecordReader.ReaderError:
313 has_error = 1
314
315 if not has_error:
316 while not has_error:
317 try:
318 rec = reader.next()
319 except RecordReader.ReaderError:
320 has_error = 1
321 if not has_error and rec is None:
322 break
323 if not has_error:
324 raise AssertionError, "should not get here with %r" % s
325
326
327
346
347
348
349
350
352
353 print "Testing Until"
354 test_data = ("A",
355 "A\n",
356 "A\nB",
357 "A\nB\n",
358 "A\nBCDE\nQWE\nTRYU\nA\n",
359 "A\nA\nA\nA\nA\nA\nA\n",
360 "AB",
361 "AB\n",
362 "AB\nAC",
363 "AB\nAC\n",
364 )
365
366 for ending in ("\n", "\r", "\r\n"):
367 for pre in ("", "B\n", "BA\n", "CA\nBA\n"):
368 pre_nl = string.replace(pre, "\n", ending)
369 for look in (0, 1, 3):
370 for text in test_data:
371 text_nl = string.replace(text, "\n", ending)
372 s = pre_nl + text_nl
373 reader = RecordReader.Until(StringIO(s[look:]),
374 "A",
375 lookahead = s[:look],
376 sizehint = 4)
377 found_record = None
378 while 1:
379 rec = reader.next()
380 if rec is None:
381 break
382 assert not found_record, \
383 "Already found %r but also found %r in %r" % \
384 (found_record, rec, s)
385 found_record = rec
386
387 assert found_record == pre_nl, \
388 "Expecting record %r, found %r in %r" % \
389 (pre_nl, found_record, s)
390 infile, remainder = reader.remainder()
391 remainder = remainder + infile.read()
392 assert remainder == text_nl, \
393 "Expecting remainder %r, found %r in %r" % \
394 (text_nl, remainder, s)
395
396
397
399
400
401
402
403
404 print "Testing CountLines"
405 for ending in ("\n", "\r", "\r\n"):
406 print " ... exhaustive testing against %r" % ending
407 s = ""
408 for i in range(25):
409 for count in range(1,i+1):
410 for look in (0, 2, 5):
411 rep, final = divmod(i, count)
412 reader = RecordReader.CountLines(StringIO(s[look:]),
413 count,
414 lookahead = s[:look],
415 sizehint = 1)
416 all = ""
417 while rep > 0:
418 rec = reader.next()
419 lines = string.split(rec, ending)
420 assert len(lines)-1 == count, \
421 "Expecting %d lines, got %d in %r using %r" % \
422 (count, len(lines)-1, rec, ending)
423 all = all + rec
424 rep = rep - 1
425
426 if final == 0:
427
428 rec = reader.next()
429 assert rec is None, \
430 "Should be at end of reader, got %r" % rec
431 rec = reader.next()
432 assert rec is None, \
433 "data after end of reader, got %r" % rec
434 else:
435
436 infile, remainder = reader.remainder()
437 text = remainder + infile.read()
438 all = all + text
439 lines = string.split(text, ending)
440 assert len(lines)-1 == final, \
441 "Expecting %d final lines, got %d" % \
442 (final, len(lines)-1)
443 try:
444 rec = reader.next()
445 raise AssertionError, \
446 "Got unexpected final record, %r" % rec
447 except RecordReader.ReaderError:
448 pass
449 assert all == s, \
450 "record data %r doesn't rebuild input %r" % \
451 (all, s)
452 s = s + str(i) + ending
453
454
455
457 print "Testing Nothing"
458
459 s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
460 for ending in ("\n", "\r", "\r\n"):
461 data = string.replace(s, "\n", ending)
462 for look in (0, 1, 2, 5):
463 reader = RecordReader.Nothing(StringIO(data[look:]),
464 sizehint = 1,
465 lookahead = data[:look])
466 rec = reader.next()
467 assert rec is None, "should be empty, not %r" % rec
468 rec = reader.next()
469 assert rec is None, "2nd time should also be empty, not %r" % rec
470
471 infile, remainder = reader.remainder()
472 remainder = remainder + infile.read()
473 assert remainder == data, "Why %r when input was %r?" % \
474 (remainder, data)
475
476
477
479 print "Testing Everything"
480
481 s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
482 for ending in ("\n", "\r", "\r\n"):
483 data = string.replace(s, "\n", ending)
484 for look in (0, 1, 2, 5):
485 reader = RecordReader.Everything(StringIO(data[look:]),
486 sizehint = 1,
487 lookahead = data[:look])
488 rec = reader.next()
489 assert rec == data, "Record %r is not same as input %r" % \
490 (rec, data)
491 infile, remainder = reader.remainder()
492 remainder = remainder + infile.read()
493 assert not remainder, "Why is there a remainder of %r?" % \
494 remainder
495
496 rec = reader.next()
497 assert rec is None, "Expecting None after final read, got %r" % \
498 rec
499 rec = reader.next()
500 assert rec is None, "Expecting None (again), got %r" % rec
501
502
503
504
505
513
514 if __name__ == "__main__":
515 test()
516