Package Bio :: Package Sequencing :: Module Phd
[hide private]
[frames] | no frames]

Source Code for Module Bio.Sequencing.Phd

  1  # Copyright 2004 by Cymon J. Cox and Frank Kauff.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5  """ 
  6  Parser for PHD files output by PHRED and used by PHRAP and CONSED. 
  7   
  8  Works fine with PHRED 0.020425.c 
  9   
 10  Version 1.1, 03/09/2004 
 11  written by Cymon J. Cox (cymon@duke.edu) and Frank Kauff (fkauff@duke.edu) 
 12  Comments, bugs, problems, suggestions to one of us are welcome! 
 13   
 14  Uses the Biopython Parser interface for parsing: ParserSupport.py 
 15   
 16  """ 
 17   
 18  import os 
 19  from Bio import File 
 20  from Bio import Seq 
 21  from Bio.ParserSupport import * 
 22  from Bio.Alphabet import IUPAC 
 23   
 24  CKEYWORDS=['CHROMAT_FILE','ABI_THUMBPRINT','PHRED_VERSION','CALL_METHOD',\ 
 25          'QUALITY_LEVELS','TIME','TRACE_ARRAY_MIN_INDEX','TRACE_ARRAY_MAX_INDEX',\ 
 26          'TRIM','TRACE_PEAK_AREA_RATIO','CHEM','DYE'] 
 27   
class Record:
    """Container for the information read from one PHD record.

    Attributes:
    file_name    Starts as ''.
    comments     Dictionary with one entry per keyword in CKEYWORDS, keyed
                 by the lower-cased keyword; every value starts as None.
    sites        Starts as an empty list.
    seq          Starts as ''.
    seq_trimmed  Starts as ''.
    """
    def __init__(self):
        self.file_name = ''
        # One slot per known comment keyword, all initially unset.
        self.comments = dict.fromkeys([keyword.lower() for keyword in CKEYWORDS])
        self.sites = []
        self.seq = ''
        self.seq_trimmed = ''
38 39
class Iterator:
    """Step through a stream that holds one or more PHD records.

    Methods:
    next   Return the next record from the stream, or None.
    """

    def __init__(self, handle, parser=None):
        """__init__(self, handle, parser=None)

        handle is a file-like object.  parser is an optional object whose
        parse method is applied to each record before it is returned; when
        parser is None the raw text of each record is returned instead.
        """
        self._parser = parser
        self._uhandle = File.UndoHandle(handle)

    def next(self):
        """next(self) -> object

        Return the next PHD record from the file, or None once the stream
        has no more lines.
        """
        collected = []
        line = self._uhandle.readline()
        while line:
            # A BEGIN_SEQUENCE line met after some content has already been
            # gathered opens the following record: push it back and stop.
            if collected and line.startswith('BEGIN_SEQUENCE'):
                self._uhandle.saveline(line)
                break
            collected.append(line)
            line = self._uhandle.readline()

        if not collected:
            return None

        text = ''.join(collected)
        if self._parser is None:
            return text
        return self._parser.parse(File.StringHandle(text))

    def __iter__(self):
        """Iterate over the PHD file record by record."""
        # next() signals exhaustion with None, which serves as the sentinel.
        return iter(self.next, None)
86
class RecordParser(AbstractParser):
    """Parses PHD file data into a Record object."""
    def __init__(self):
        self._scanner = _Scanner()
        self._consumer = _RecordConsumer()

    def parse(self, handle):
        """Parse one PHD record from handle and return the consumer's result.

        handle is a file-like object; it is wrapped in a File.UndoHandle
        unless it already is one.  Returns whatever the consumer built
        while the scanner fed it, or None when the scanner produced
        nothing (the scanner skips scanning when the handle has no data).
        """
        if isinstance(handle, File.UndoHandle):
            uhandle = handle
        else:
            uhandle = File.UndoHandle(handle)
        # The consumer is shared between calls.  Clear the previous result
        # so that an empty handle yields None instead of silently returning
        # the record left over from an earlier parse() call.
        self._consumer.data = None
        self._scanner.feed(uhandle, self._consumer)
        return self._consumer.data
100 101
class _Scanner:
    """Scans a PHD-formatted file.

    Methods:
    feed - Feed one PHD record.
    """
    def feed(self, handle, consumer):
        """Reads in PHD data from the handle for scanning.

        Feed in PHD data for scanning.  handle is a file-like object
        containing PHD data.  consumer is a Consumer object that will
        receive events as the PHD data is scanned.  Nothing is scanned
        when the handle has no line left to peek at.
        """
        assert isinstance(handle, File.UndoHandle), \
            "handle must be an UndoHandle"
        if handle.peekline():
            self._scan_record(handle, consumer)

    def _scan_record(self, uhandle, consumer):
        """Scan the header, the comment section and the DNA section in turn."""
        self._scan_begin_sequence(uhandle, consumer)
        self._scan_comments(uhandle, consumer)
        self._scan_dna(uhandle, consumer)
        consumer.end_sequence()

    def _scan_begin_sequence(self, uhandle, consumer):
        """Pass the BEGIN_SEQUENCE line to consumer.begin_sequence."""
        read_and_call(uhandle, consumer.begin_sequence, start='BEGIN_SEQUENCE')

    def _scan_comments(self, uhandle, consumer):
        """Scan the BEGIN_COMMENT ... END_COMMENT section.

        Each line starting with 'KEYWORD:' for a keyword in CKEYWORDS is
        handed to the consumer method named after the lower-cased keyword.
        """
        read_and_call_while(uhandle, consumer.noevent, blank=1)
        read_and_call(uhandle, consumer.noevent, start='BEGIN_COMMENT')
        read_and_call_while(uhandle, consumer.noevent, blank=1)

        while True:
            for kw in CKEYWORDS:
                if attempt_read_and_call(uhandle, getattr(consumer, kw.lower()),
                                         start=kw + ':'):
                    break  # recognized keyword: end for loop and do another while
            else:
                break  # no keywords found: end while loop

        read_and_call_while(uhandle, consumer.noevent, blank=1)
        read_and_call(uhandle, consumer.noevent, start='END_COMMENT')

    def _scan_dna(self, uhandle, consumer):
        """Hand every line between BEGIN_DNA and END_DNA to consumer.read_dna.

        Blank lines and the BEGIN_DNA marker are skipped; scanning stops at
        the END_DNA marker.  Markers are matched after stripping surrounding
        whitespace, so '\\r\\n' line endings are accepted.

        Raises ValueError if the data ends before END_DNA is seen.
        """
        while True:
            line = uhandle.readline()
            if not line:
                # readline() returns '' at end of file; without this check
                # a truncated record would make the loop spin forever.
                raise ValueError("Unexpected end of PHD data: END_DNA not found")
            marker = line.strip()
            if not marker or marker == 'BEGIN_DNA':
                continue
            if marker == 'END_DNA':
                break
            consumer.read_dna(line)
153 154
class _RecordConsumer(AbstractConsumer):
    """Consumer that converts a PHD record to a Record object.

    The finished Record is available as the data attribute, which is None
    until begin_sequence has been called.
    """
    def __init__(self):
        self.data = None

    def _comment_value(self, line):
        """Return the stripped text that follows the first ':' on line.

        Used for the 'KEYWORD: value' comment lines.  Splitting on the
        colon instead of slicing at hand-counted offsets keeps the whole
        value even when the line has no trailing newline.
        """
        return line.split(':', 1)[1].strip()

    def begin_sequence(self, line):
        """Start a new Record; the text after 'BEGIN_SEQUENCE ' is its name."""
        self.data = Record()
        self.data.file_name = line[15:].rstrip()

    def end_sequence(self):
        """Build seq from the collected sites and, if TRIM was given, seq_trimmed."""
        bases = ''.join([site[0] for site in self.data.sites])
        self.data.seq = Seq.Seq(bases, IUPAC.IUPACAmbiguousDNA())
        trim = self.data.comments['trim']
        if trim is not None:
            # trim is (first, last, probability); the first two are used as
            # slice bounds on the full sequence.
            first = trim[0]
            last = trim[1]
            self.data.seq_trimmed = self.data.seq[first:last]

    def chromat_file(self, line):
        """Store the CHROMAT_FILE value as a string."""
        self.data.comments['chromat_file'] = self._comment_value(line)

    def abi_thumbprint(self, line):
        """Store the ABI_THUMBPRINT value as an int."""
        self.data.comments['abi_thumbprint'] = int(self._comment_value(line))

    def phred_version(self, line):
        """Store the PHRED_VERSION value as a string."""
        self.data.comments['phred_version'] = self._comment_value(line)

    def call_method(self, line):
        """Store the CALL_METHOD value as a string."""
        self.data.comments['call_method'] = self._comment_value(line)

    def quality_levels(self, line):
        """Store the QUALITY_LEVELS value as an int."""
        self.data.comments['quality_levels'] = int(self._comment_value(line))

    def time(self, line):
        """Store the TIME value as a string (it may itself contain colons)."""
        self.data.comments['time'] = self._comment_value(line)

    def trace_array_min_index(self, line):
        """Store the TRACE_ARRAY_MIN_INDEX value as an int."""
        self.data.comments['trace_array_min_index'] = int(self._comment_value(line))

    def trace_array_max_index(self, line):
        """Store the TRACE_ARRAY_MAX_INDEX value as an int."""
        self.data.comments['trace_array_max_index'] = int(self._comment_value(line))

    def trim(self, line):
        """Store the TRIM value as an (int, int, float) tuple."""
        first, last, prob = self._comment_value(line).split()
        self.data.comments['trim'] = (int(first), int(last), float(prob))

    def trace_peak_area_ratio(self, line):
        """Store the TRACE_PEAK_AREA_RATIO value as a float."""
        self.data.comments['trace_peak_area_ratio'] = float(self._comment_value(line))

    def chem(self, line):
        """Store the CHEM value as a string."""
        self.data.comments['chem'] = self._comment_value(line)

    def dye(self, line):
        """Store the DYE value as a string."""
        self.data.comments['dye'] = self._comment_value(line)

    def read_dna(self, line):
        """Append one DNA line as a (base, quality, location) tuple of strings.

        The line must hold exactly three whitespace-separated fields.
        """
        base, quality, location = line.split()
        self.data.sites.append((base, quality, location))
if __name__ == "__main__":
    # Quick self test: iterate over a sample PHD file and report the name
    # and sequence length of each record.
    print("Quick self test")
    # Test the iterator,
    handle = open("../../Tests/Phd/phd1")
    try:
        recordparser = RecordParser()
        iterator = Iterator(handle, recordparser)
        for record in iterator:
            print("%s %i" % (record.file_name, len(record.seq)))
    finally:
        # Close the file even if parsing raises.
        handle.close()
    print("Done")