1
2
3
4
5
6
"""
This module allows for asynchronous execution of FDist and
splitting of loads.

FDistAsync allows for the execution of FDist.

SplitFDist splits a single FDist execution in several, taking advantage
of multi-core architectures.

"""
17
18 import os
19 import thread
20 from time import sleep
21 from Bio.PopGen.Async import Local
22 from Bio.PopGen.FDist.Controller import FDistController
23
class FDistAsync(FDistController):
    """Asynchronous FDist execution.

    An FDistController whose runs can be started from a parameter
    dictionary through run_job.
    """

    def __init__(self, fdist_dir="", ext=None):
        """Constructor.

        Parameters:
        fdist_dir - Where fdist can be found; if "", then it
                    should be on the path.
        ext       - Extension of binary names (e.g. nothing on Unix,
                    ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Runs FDist asynchronously.

        Unpacks the usual FDist parameters from a dictionary and makes a
        "normal" (blocking) call to run_fdist. This is normally run
        inside a separate thread.

        Parameters:
        parameters  - dict with required keys 'npops', 'nsamples', 'fst'
                      and 'sample_size', and optional keys 'mut'
                      (default 0), 'num_sims' (default 20000) and
                      'data_dir' (default '.').
        input_files - not used.

        Returns a tuple (fst, output_files). fst is the value returned
        by run_fdist. output_files maps 'out.dat' to a file object
        opened for reading on data_dir/out.dat; it is not closed here,
        so whoever consumes the result must close it.
        """
        opt = parameters.get
        data_dir = opt('data_dir', '.')
        result_fst = self.run_fdist(
            parameters['npops'], parameters['nsamples'],
            parameters['fst'], parameters['sample_size'],
            opt('mut', 0), opt('num_sims', 20000), data_dir)
        out_path = data_dir + os.sep + 'out.dat'
        return result_fst, {'out.dat': open(out_path, 'r')}
58
60 """Splits a FDist run.
61
62 The idea is to split a certain number of simulations in smaller
63 numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
64 allows to run simulations in parallel, thus taking advantage
65 of multi-core CPUs.
66
67 Each SplitFDist object can only be used to run a single FDist
68 simulation.
69 """
70 - def __init__(self, report_fun = None,
71 num_thr = 2, split_size = 1000, fdist_dir = '', ext = None):
72 """Constructor.
73
74 Parameters:
75 report_fun - Function that is called when a single packet is
76 run, it should have a single parameter: Fst.
77 num_thr - Number of desired threads, typically the number
78 of cores.
79 split_size - Size that a full simulation will be split in.
80 ext - Binary extension name (e.g. nothing on Unix, '.exe' on
81 Windows).
82 """
83 self.async = Local.Local(num_thr)
84 self.async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
85 self.report_fun = report_fun
86 self.split_size = split_size
87
88
90 """Monitors and reports (using report_fun) execution.
91
92 Every time a partial simulation ends, calls report_fun.
93 IMPORTANT: monitor calls can be concurrent with other
94 events, ie, a tasks might end while report_fun is being
95 called. This means that report_fun should be consider that
96 other events might be happening while it is running (it
97 can call acquire/release if necessary).
98 """
99 while(True):
100 sleep(1)
101 self.async.access_ds.acquire()
102 keys = self.async.done.keys()[:]
103 self.async.access_ds.release()
104 for done in keys:
105 self.async.access_ds.acquire()
106 fst, files = self.async.done[done]
107 del self.async.done[done]
108 out_dat = files['out.dat']
109 f = open(self.data_dir + os.sep + 'out.dat','a')
110 f.writelines(out_dat.readlines())
111 f.close()
112 out_dat.close()
113 self.async.access_ds.release()
114 for file in os.listdir(self.parts[done]):
115 os.remove (self.parts[done] + os.sep + file)
116 os.rmdir(self.parts[done])
117
118 if self.report_fun:
119 self.report_fun(fst)
120 self.async.access_ds.acquire()
121 if len(self.async.waiting) == 0 and len(self.async.running) == 0 \
122 and len(self.async.done) == 0:
123 break
124 self.async.access_ds.release()
125
126
127
128
130 """Allows the external acquisition of the lock.
131 """
132 self.async.access_ds.acquire()
133
135 """Allows the external release of the lock.
136 """
137 self.async.access_ds.release()
138
139
140 - def run_fdist(self, npops, nsamples, fst, sample_size,
141 mut = 0, num_sims = 20000, data_dir='.'):
142 """Runs FDist.
143
144 Parameters can be seen on FDistController.run_fdist.
145
146 It will split a single execution in several parts and
147 create separated data directories.
148 """
149 num_parts = num_sims/self.split_size
150 self.parts = {}
151 self.data_dir = data_dir
152 for directory in range(num_parts):
153 full_path = data_dir + os.sep + str(directory)
154 try:
155 os.mkdir(full_path)
156 except OSError:
157 pass
158 id = self.async.run_program('fdist', {
159 'npops' : npops,
160 'nsamples' : nsamples,
161 'fst' : fst,
162 'sample_size' : sample_size,
163 'mut' : mut,
164 'num_sims' : self.split_size,
165 'data_dir' : full_path
166 }, {})
167 self.parts[id] = full_path
168 thread.start_new_thread(self.monitor, ())
169