Package Bio :: Package SeqIO :: Module InsdcIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.InsdcIO

  1  # Copyright 2007-2009 by Peter Cock.  All rights reserved. 
  2  # 
  3  # This code is part of the Biopython distribution and governed by its 
  4  # license.  Please see the LICENSE file that should have been included 
  5  # as part of this package.. 
  6   
  7  """Bio.SeqIO support for the "genbank" and "embl" file formats. 
  8   
  9  You are expected to use this module via the Bio.SeqIO functions. 
 10  Note that internally this module calls Bio.GenBank to do the actual 
 11  parsing of both GenBank and EMBL files. 
 12   
 13  See also: 
 14   
 15  International Nucleotide Sequence Database Collaboration 
 16  http://www.insdc.org/ 
 17    
 18  GenBank 
 19  http://www.ncbi.nlm.nih.gov/Genbank/ 
 20   
 21  EMBL Nucleotide Sequence Database 
 22  http://www.ebi.ac.uk/embl/ 
 23   
 24  DDBJ (DNA Data Bank of Japan) 
 25  http://www.ddbj.nig.ac.jp/ 
 26  """ 
 27   
 28  from Bio.Seq import UnknownSeq 
 29  from Bio.GenBank.Scanner import GenBankScanner, EmblScanner 
 30  from Bio import Alphabet 
 31  from Interfaces import SequentialSequenceWriter 
 32   
 33  # NOTE 
 34  # ==== 
 35  # The "brains" for parsing GenBank and EMBL files (and any 
 36  # other flat file variants from the INSDC in future) is in 
 37  # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank) 
 38   
39 -def GenBankIterator(handle) :
40 """Breaks up a Genbank file into SeqRecord objects. 41 42 Every section from the LOCUS line to the terminating // becomes 43 a single SeqRecord with associated annotation and features. 44 45 Note that for genomes or chromosomes, there is typically only 46 one record.""" 47 #This calls a generator function: 48 return GenBankScanner(debug=0).parse_records(handle)
49
50 -def EmblIterator(handle) :
51 """Breaks up an EMBL file into SeqRecord objects. 52 53 Every section from the LOCUS line to the terminating // becomes 54 a single SeqRecord with associated annotation and features. 55 56 Note that for genomes or chromosomes, there is typically only 57 one record.""" 58 #This calls a generator function: 59 return EmblScanner(debug=0).parse_records(handle)
60
61 -def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein) :
62 """Breaks up a Genbank file into SeqRecord objects for each CDS feature. 63 64 Every section from the LOCUS line to the terminating // can contain 65 many CDS features. These are returned as with the stated amino acid 66 translation sequence (if given). 67 """ 68 #This calls a generator function: 69 return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
70
71 -def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein) :
72 """Breaks up a EMBL file into SeqRecord objects for each CDS feature. 73 74 Every section from the LOCUS line to the terminating // can contain 75 many CDS features. These are returned as with the stated amino acid 76 translation sequence (if given). 77 """ 78 #This calls a generator function: 79 return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
80
81 -class GenBankWriter(SequentialSequenceWriter) :
82 HEADER_WIDTH = 12 83 MAX_WIDTH = 80 84
85 - def _write_single_line(self, tag, text) :
86 "Used in the the 'header' of each GenBank record.""" 87 assert len(tag) < self.HEADER_WIDTH 88 assert len(text) < self.MAX_WIDTH - self.HEADER_WIDTH, \ 89 "Annotation %s too long for %s line" % (repr(text), tag) 90 self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH), 91 text.replace("\n"," ")))
92
93 - def _write_multi_line(self, tag, text) :
94 "Used in the the 'header' of each GenBank record.""" 95 #TODO - Do the line spliting while preserving white space? 96 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 97 assert len(tag) < self.HEADER_WIDTH 98 text = text.strip() 99 if len(text) < max_len : 100 self._write_single_line(tag, text) 101 return 102 103 words = text.split() 104 assert max([len(w) for w in words]) < max_len, \ 105 "Your description cannot be broken into nice lines!" 106 text = "" 107 while words and len(text) + 1 + len(words[0]) < max_len : 108 text += " " + words.pop(0) 109 text = text.strip() 110 assert len(text) < max_len 111 self._write_single_line(tag, text) 112 while words : 113 text = "" 114 while words and len(text) + 1 + len(words[0]) < max_len : 115 text += " " + words.pop(0) 116 text = text.strip() 117 assert len(text) < max_len 118 self._write_single_line("", text) 119 assert not words
120
121 - def _write_the_first_line(self, record) :
122 """Write the LOCUS line.""" 123 124 locus = record.name 125 if not locus or locus == "<unknown name>" : 126 locus = record.id 127 if not locus or locus == "<unknown id>" : 128 locus = self._get_annotation_str(record, "accession", just_first=True) 129 if len(locus) > 16 : 130 raise ValueError("Locus identifier %s is too long" % repr(locus)) 131 132 if len(record) > 99999999999 : 133 #Currently GenBank only officially support up to 350000, but 134 #the length field can take eleven digits 135 raise ValueError("Sequence too long!") 136 137 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 138 a = Alphabet._get_base_alphabet(record.seq.alphabet) 139 if not isinstance(a, Alphabet.Alphabet) : 140 raise TypeError("Invalid alphabet") 141 elif isinstance(a, Alphabet.ProteinAlphabet) : 142 units = "bp" 143 elif isinstance(a, Alphabet.NucleotideAlphabet) : 144 units = "aa" 145 else : 146 #Must be something like NucleotideAlphabet or 147 #just the generic Alphabet (default for fasta files) 148 raise ValueError("Need a Nucleotide or Protein alphabet") 149 150 #Get the molecule type 151 #TODO - record this explicitly in the parser? 152 if isinstance(a, Alphabet.ProteinAlphabet) : 153 mol_type = "" 154 elif isinstance(a, Alphabet.DNAAlphabet) : 155 mol_type = "DNA" 156 elif isinstance(a, Alphabet.RNAAlphabet) : 157 mol_type = "RNA" 158 else : 159 #Must be something like NucleotideAlphabet or 160 #just the generic Alphabet (default for fasta files) 161 raise ValueError("Need a DNA, RNA or Protein alphabet") 162 163 try : 164 division = record.annotations["data_file_division"] 165 except KeyError : 166 division = "UNK" 167 if division not in ["PRI","ROD","MAM","VRT","INV","PLN","BCT", 168 "VRL","PHG","SYN","UNA","EST","PAT","STS", 169 "GSS","HTG","HTC","ENV"] : 170 division = "UNK" 171 172 assert len(units) == 2 173 assert len(division) == 3 174 #TODO - date 175 #TODO - mol_type 176 line = "LOCUS %s %s %s %s %s 01-JAN-1980\n" \ 177 % (locus.ljust(16), 178 str(len(record)).rjust(11), 179 units, 180 mol_type.ljust(6), 181 division) 182 assert len(line) == 79+1, repr(line) #plus one for new line 183 184 assert line[12:28].rstrip() == locus, \ 185 'LOCUS line does not contain the locus at the expected position:\n' + line 186 assert line[28:29] == " " 187 assert line[29:40].lstrip() == str(len(record)), \ 188 'LOCUS line does not contain the length at the expected position:\n' + line 189 190 #Tests copied from Bio.GenBank.Scanner 191 assert line[40:44] in [' bp ', ' aa '] , \ 192 'LOCUS line does not contain size units at expected position:\n' + line 193 assert line[44:47] in [' ', 'ss-', 'ds-', 'ms-'], \ 194 'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line 195 assert line[47:54].strip() == "" \ 196 or line[47:54].strip().find('DNA') != -1 \ 197 or line[47:54].strip().find('RNA') != -1, \ 198 'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line 199 assert line[54:55] == ' ', \ 200 'LOCUS line does not contain space at position 55:\n' + line 201 assert line[55:63].strip() in ['','linear','circular'], \ 202 'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line 203 assert line[63:64] == ' ', \ 204 'LOCUS line does not contain space at position 64:\n' + line 205 assert line[67:68] == ' ', \ 206 'LOCUS line does not contain space at position 68:\n' + line 207 assert line[70:71] == '-', \ 208 'LOCUS line does not contain - at position 71 in date:\n' + line 209 assert line[74:75] == '-', \ 210 'LOCUS line does not contain - at position 75 in date:\n' + line 211 212 self.handle.write(line)
213
214 - def _get_annotation_str(self, record, key, default=".", just_first=False) :
215 """Get an annotation dictionary entry (as a string). 216 217 Some entries are lists, in which case if just_first=True the first entry 218 is returned. If just_first=False (default) this verifies there is only 219 one entry before returning it.""" 220 try : 221 answer = record.annotations[key] 222 except KeyError : 223 return default 224 if isinstance(answer, list) : 225 if not just_first : assert len(answer) == 1 226 return str(answer[0]) 227 else : 228 return str(answer)
229
230 - def _write_sequence(self, record):
231 #Loosely based on code from Howard Salis 232 #TODO - Force lower case? 233 LETTERS_PER_LINE = 60 234 SEQUENCE_INDENT = 9 235 236 if isinstance(record.seq, UnknownSeq) : 237 #We have already recorded the length, and there is no need 238 #to record a long sequence of NNNNNNN...NNN or whatever. 239 return 240 241 data = self._get_seq_string(record) #Catches sequence being None 242 seq_len = len(data) 243 for line_number in range(0,seq_len,LETTERS_PER_LINE): 244 self.handle.write(str(line_number+1).rjust(SEQUENCE_INDENT)) 245 for words in range(line_number,min(line_number+LETTERS_PER_LINE,seq_len),10): 246 self.handle.write(" %s" % data[words:words+10]) 247 self.handle.write("\n")
248
249 - def write_record(self, record):
250 """Write a single record to the output file.""" 251 handle = self.handle 252 self._write_the_first_line(record) 253 254 accession = self._get_annotation_str(record, "accession", 255 record.id.split(".",1)[0], 256 just_first=True) 257 acc_with_version = accession 258 if record.id.startswith(accession+".") : 259 try : 260 acc_with_version = "%s.%i" \ 261 % (accession, int(record.id.split(".",1)[1])) 262 except ValueError : 263 pass 264 gi = self._get_annotation_str(record, "gi", just_first=True) 265 266 descr = record.description 267 if descr == "<unknown description>" : descr = "." 268 self._write_multi_line("DEFINITION", descr) 269 270 self._write_single_line("ACCESSION", accession) 271 if gi != "." : 272 self._write_single_line("VERSION", "%s GI:%s" % (acc_with_version,gi)) 273 else : 274 self._write_single_line("VERSION", "%s" % (acc_with_version)) 275 276 try : 277 #List of strings 278 keywords = "; ".join(record.annotations["keywords"]) 279 except KeyError : 280 keywords = "." 281 self._write_multi_line("KEYWORDS", keywords) 282 283 self._write_multi_line("SOURCE", \ 284 self._get_annotation_str(record, "source")) 285 #The ORGANISM line MUST be a single line, as any continuation is the taxonomy 286 org = self._get_annotation_str(record, "organism") 287 if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH : 288 org = org[:self.MAX_WIDTH - self.HEADER_WIDTH-4]+"..." 289 self._write_single_line(" ORGANISM", org) 290 try : 291 #List of strings 292 taxonomy = "; ".join(record.annotations["taxonomy"]) 293 except KeyError : 294 taxonomy = "." 295 self._write_multi_line("", taxonomy) 296 297 #TODO - References... 298 handle.write("FEATURES Location/Qualifiers\n") 299 for feature in record.features : 300 self._write_feature(feature) 301 handle.write("ORIGIN\n") 302 self._write_sequence(record) 303 handle.write("//\n")
304
305 - def _write_feature(self, feature):
306 """Write a single SeqFeature object to features table. 307 308 Not implemented yet, but this stub exists in the short term to 309 facilitate working on writing GenBank files with a sub-class.""" 310 #TODO - Features... 311 pass
312 313 if __name__ == "__main__" : 314 print "Quick self test" 315 import os 316 from StringIO import StringIO 317
318 - def check_genbank_writer(records) :
319 handle = StringIO() 320 GenBankWriter(handle).write_file(records) 321 handle.seek(0) 322 323 records2 = list(GenBankIterator(handle)) 324 325 assert len(records) == len(records2) 326 for r1, r2 in zip(records, records2) : 327 #The SwissProt parser may leave \n in the description... 328 assert r1.description.replace("\n", " ") == r2.description 329 assert r1.id == r2.id 330 assert r1.name == r2.name 331 assert str(r1.seq) == str(r2.seq) 332 for key in ["gi", "keywords", "source", "taxonomy"] : 333 if key in r1.annotations : 334 assert r1.annotations[key] == r2.annotations[key], key 335 for key in ["organism"] : 336 if key in r1.annotations : 337 v1 = r1.annotations[key] 338 v2 = r2.annotations[key] 339 assert isinstance(v1, str) and isinstance(v2, str) 340 #SwissProt organism can be too long to record in GenBank format 341 assert v1 == v2 or \ 342 (v2.endswith("...") and v1.startswith(v2[:-3])), key
343 344 for filename in os.listdir("../../Tests/GenBank") : 345 if not filename.endswith(".gbk") and not filename.endswith(".gb") : 346 continue 347 print filename 348 349 handle = open("../../Tests/GenBank/%s" % filename) 350 records = list(GenBankIterator(handle)) 351 handle.close() 352 353 check_genbank_writer(records) 354 355 for filename in os.listdir("../../Tests/EMBL") : 356 if not filename.endswith(".embl") : 357 continue 358 print filename 359 360 handle = open("../../Tests/EMBL/%s" % filename) 361 records = list(EmblIterator(handle)) 362 handle.close() 363 364 check_genbank_writer(records) 365 366 from Bio import SeqIO 367 for filename in os.listdir("../../Tests/SwissProt") : 368 if not filename.startswith("sp") : 369 continue 370 print filename 371 372 handle = open("../../Tests/SwissProt/%s" % filename) 373 records = list(SeqIO.parse(handle,"swiss")) 374 handle.close() 375 376 check_genbank_writer(records) 377