Coverage for /opt/hostedtoolcache/Python/3.11.10/x64/lib/python3.11/site-packages/pysagas/cfd/cart3d.py: 0%
226 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-30 04:27 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-30 04:27 +0000
1import os
2import time
3import shutil
4import subprocess
5import numpy as np
6from pysagas import FlowState, Vector
7from typing import Dict, List, Optional
8from pysagas.optimisation.cart3d.utilities import C3DPrep
9from pysagas.sensitivity import Cart3DSensitivityCalculator
10from pysagas.cfd.solver import FlowSolver, FlowResults, SensitivityResults
13class Cart3D(FlowSolver):
14 method = "Cart3D"
16 _C3D_errors = [
17 "==> ADAPT failed",
18 "Check cart3d.out in AD_A_J for more clues",
19 "==> adjointErrorEst_quad failed again, status = 1",
20 "ERROR: CUBES failed",
21 "ERROR: ADAPT failed with status = 1",
22 "ERROR",
23 "Try decreasing etol, increasing mesh growth or switching to 'a' cycles",
24 ]
25 _C3D_done = "Done aero.csh"
27 def __init__(
28 self,
29 stl_files: List[str],
30 aero_csh: str = "aero.csh",
31 input_cntl: str = "input.cntl",
32 freestream: Optional[FlowState] = None,
33 verbosity: Optional[int] = 1,
34 c3d_log_name: str = "C3D_log",
35 c3d_info_file: str = None,
36 write_config_xml: bool = True,
37 ref_area: Optional[float] = 1.0,
38 ref_length: Optional[float] = 1.0,
39 ) -> None:
40 """Instantiate the Cart3D solver wrapper.
42 Parameters
43 -----------
44 stl_files : list[str]
45 A list of filepaths to the STL files to intersect and
46 simulate.
48 aero_csh : str
49 The filepath to the reference aero.csh file. The default
50 option will look in the current working directory.
52 c3d_log_name : str, optional
53 The name to use for the Cart3D logfile. The default is C3D_log.
55 write_config_xml : bool, optional
56 A boolean flag to write the Cart3D Config.xml file when running
57 comp2tri. If the geometry may be perturbed to a state where one
58 component is no longer part of the wetted surface, writing the
59 comp2tri file can cause set-up issues, and so it can be beneficial
60 to turn it off. The default is True.
61 """
62 # Save STL files for processing
63 self.stl_files = stl_files
64 self._root_dir = os.getcwd()
65 self._aerocsh_path = os.path.join(self._root_dir, aero_csh)
66 self._inputcntl_path = os.path.join(self._root_dir, input_cntl)
68 # Instantiate helper
69 self.c3d_log_name = c3d_log_name
70 self._prepper = C3DPrep(
71 logfile=c3d_log_name,
72 info_file=c3d_info_file,
73 write_config=write_config_xml,
74 )
76 # Dimensional properties
77 self.ref_area = ref_area
78 self.ref_length = ref_length
80 # Private attributes
81 self._sim_dir = None
82 self._killed = False
83 self._donefile = None
85 # Complete instantiation
86 super().__init__(None, freestream, verbosity)
88 def solve(
89 self,
90 freestream: Optional[FlowState] = None,
91 mach: Optional[float] = None,
92 aoa: Optional[float] = None,
93 ) -> FlowResults:
94 already_run = super().solve(freestream=freestream, mach=mach, aoa=aoa)
95 if already_run:
96 # Already have a result
97 if self.verbosity > 1:
98 print("Attention: this flowstate has already been solved.")
99 result = self.flow_result
101 else:
102 # Get flow
103 flow = self._last_solve_freestream
104 loads = self._run_sim(mach=flow.M, aoa=flow.aoa)
106 # TODO - need to be careful and aware of the reference coordinates here
107 C_F = np.array(
108 [loads["C_A-entire"], loads["C_N-entire"], loads["C_Y-entire"]]
109 )
110 C_M = np.array(
111 [loads["C_M_x-entire"], loads["C_M_z-entire"], -loads["C_M_y-entire"]]
112 )
114 # Dimensionalise results
115 net_force = C_F * flow.q * self.ref_area
116 net_moment = C_M * flow.q * self.ref_area * self.ref_length
118 # Convert to vectors
119 net_force = Vector.from_coordinates(net_force)
120 net_moment = Vector.from_coordinates(net_moment)
122 # Construct results
123 result = FlowResults(
124 freestream=flow, net_force=net_force, net_moment=net_moment
125 )
127 # Save
128 self.flow_result = result
130 if self.verbosity > 1:
131 print("Cart3D done.")
133 return result
135 @property
136 def status(self):
137 """Returns the status of the wrapper."""
138 running, error = self._c3d_running(
139 os.path.join(self._sim_dir, self.c3d_log_name), self._donefile
140 )
141 if error:
142 print(f"Failed with error {error}.")
143 else:
144 if running:
145 print("Running.")
146 else:
147 print("Complete.")
149 def stop(self):
150 """Stop running Cart3D."""
151 self._killed = True
152 if self.verbosity > 0:
153 print("Kill signal received. Stopping Cart3D.")
154 stopfile = os.path.join(self._sim_dir, "STOP")
155 with open(stopfile, "w"):
156 pass
158 def solve_sens(
159 self,
160 sensitivity_filepath: str,
161 freestream: Optional[FlowState] = None,
162 mach: Optional[float] = None,
163 aoa: Optional[float] = None,
164 ) -> SensitivityResults:
165 already_run = super().solve_sens(freestream=freestream, mach=mach, aoa=aoa)
166 if already_run:
167 # Already have a result
168 if self.verbosity > 1:
169 print("Attention: this flowstate sensitivity has already been solved.")
170 result = self.flow_sensitivity
172 else:
173 # Get flow
174 flow = self._last_sens_freestream
176 # Check that nominal flow condition has been solved already
177 if self._last_solve_freestream:
178 # Solver has run before
179 if self._last_solve_freestream != flow:
180 # Run flow solver first at the nominal flow conditions
181 if self.verbosity > 0:
182 print("Running flow solver to prepare sensitivities.")
183 self.solve(flow)
184 else:
185 if self.verbosity > 0:
186 print("Using previously run flow solution.")
187 else:
188 # Solver has not run yet at all, run it now
189 if self.verbosity > 0:
190 print("Running flow solver to prepare sensitivities.")
191 self.solve(flow)
193 # Create sensitivity wrapper
194 components_filepath = os.path.join(
195 self._sim_dir, "BEST/FLOW/Components.i.plt"
196 )
197 wrapper = Cart3DSensitivityCalculator(
198 freestream=flow,
199 sensitivity_filepath=sensitivity_filepath,
200 components_filepath=components_filepath,
201 verbosity=0,
202 )
204 # Calculate sensitivities
205 df_f, df_m = wrapper.calculate()
207 # Construct results
208 result = SensitivityResults(
209 f_sens=df_f,
210 m_sens=df_m,
211 freestream=flow,
212 )
214 # Save
215 self.flow_sensitivity = result
217 if self.verbosity > 0:
218 print(result)
220 return result
222 def running(self) -> bool:
223 """Returns True if Cart3D is actively running, else False."""
224 if self._donefile:
225 # Donefile has been assigned
226 return self._c3d_running(
227 os.path.join(self._sim_dir, self.c3d_log_name), self._donefile
228 )[0]
229 else:
230 # Donefile has not been assigned
231 return True
233 def save(self, name: str, attributes: Dict[str, list]):
234 raise NotImplementedError("Coming soon.")
236 def _prepare_sim_dir(self, sim_dir_name: dir):
237 """Prepares a new directory to run the simulation in."""
238 # Make simulation directory
239 sim_dir = os.path.join(self._root_dir, sim_dir_name)
240 run_intersect = False
241 components_filepath = os.path.join(sim_dir, "Components.i.tri")
242 if not os.path.exists(sim_dir):
243 os.mkdir(sim_dir)
244 run_intersect = True
245 intersected = False
246 else:
247 # Check for intersected file
248 run_intersect = not os.path.exists(components_filepath)
249 intersected = True
251 # Copy STL files into sim directory
252 for file in self.stl_files:
253 shutil.copyfile(
254 os.path.join(self._root_dir, file),
255 os.path.join(sim_dir, file),
256 )
258 if self.verbosity > 0:
259 print(f"Running simulation in {sim_dir}.")
261 return run_intersect, intersected
263 def _run_sim(self, mach: float, aoa: float):
264 """Run the simulation."""
265 # Prepare sim directory
266 sim_dir = os.path.join(self._root_dir, f"M{mach}A{aoa}")
267 self._sim_dir = sim_dir
268 run_intersect, intersected = self._prepare_sim_dir(sim_dir)
270 # Move into sim directory
271 os.chdir(sim_dir)
273 # Attempt simulation
274 sim_success = False
275 no_attempts = 3
276 for attempt in range(no_attempts):
277 # Run intersect
278 if run_intersect:
279 self._prepper._log(f"SHAPEOPT INTERSECT ATTEMPT {attempt+1}")
280 intersected = self._prepper.intersect_stls(stl_files=self.stl_files)
282 # Check for intersection
283 if intersected:
284 # Prepare rest of simulation directory
285 if not os.path.exists(os.path.join(sim_dir, "input.cntl")):
286 # Move files to simulation directory (including Components.i.tri)
287 self._prepper.run_autoinputs()
288 # os.system(
289 # f"mv *.tri Config.xml input.c3d preSpec.c3d.cntl {sim_dir} >> {self.c3d_log_name} 2>&1"
290 # )
292 # Copy sim files and permissions
293 for filename, fp in {
294 "input.cntl": self._inputcntl_path,
295 "aero.csh": self._aerocsh_path,
296 }.items():
297 shutil.copyfile(
298 os.path.join(self._root_dir, filename),
299 os.path.join(sim_dir, filename),
300 )
301 shutil.copymode(
302 os.path.join(self._root_dir, filename),
303 os.path.join(sim_dir, filename),
304 )
306 # Update flow conditions
307 new_input_cntl = os.path.join(sim_dir, "input.cntl")
308 self._edit_input_cntl(
309 mach=mach,
310 aoa=aoa,
311 original=self._inputcntl_path,
312 new=new_input_cntl,
313 )
315 # Run Cart3D and await result
316 target_adapt = self._infer_adapt(self._aerocsh_path)
317 c3d_donefile = os.path.join(sim_dir, target_adapt, "FLOW", "DONE")
318 self._donefile = c3d_donefile
319 run_cmd = "./aero.csh restart"
320 _restarts = 0
321 if not os.path.exists(c3d_donefile):
322 with open(self.c3d_log_name, "a") as f:
323 subprocess.run(
324 run_cmd, shell=True, stdout=f, stderr=subprocess.STDOUT
325 )
326 while not os.path.exists(c3d_donefile):
327 # Wait...
328 time.sleep(5)
330 # Check for C3D failure
331 running, e = self._c3d_running(
332 c3d_log_name=self.c3d_log_name, donefile=c3d_donefile
333 )
335 # Check for kill status
336 if self._killed:
337 if self.verbosity > 1:
338 print("Kill signal received, exiting simulation loop.")
339 sim_success = False
340 break
342 if not running:
343 # C3D failed, try restart it
344 if _restarts > 3:
345 return False
347 if self.verbosity > 0:
348 print(f"Warning: Cart3D failed with error '{e}'")
350 f = open(self.c3d_log_name, "a")
351 subprocess.run(
352 run_cmd, shell=True, stdout=f, stderr=subprocess.STDOUT
353 )
354 f.close()
355 _restarts += 1
357 sim_success = True
358 break
360 if sim_success:
361 # Sim finished successfully, read loads file
362 loads_dict = self._read_c3d_loads(
363 os.path.join(sim_dir, "BEST/FLOW/loadsCC.dat"),
364 tag="entire",
365 )
367 else:
368 loads_dict = None
370 # Change back to working directory
371 os.chdir(self._root_dir)
373 return loads_dict
375 @staticmethod
376 def _c3d_running(c3d_log_name: str, donefile: str) -> bool:
377 """Watches the Cart3D log file to check for errors and return False
378 if Cart3D has stopped running.
380 Returns
381 -------
382 running : bool
383 Whether Cart3D is still running.
385 error : str
386 The error message, if an error has occured.
387 """
388 if os.path.exists(donefile):
389 return False, None
391 # DONE file doesn't exist, read logfile for clues
392 if os.path.exists(c3d_log_name):
393 try:
394 with open(c3d_log_name) as f:
395 # Get last line in log file
396 for line in f:
397 pass
399 # Check if if it is in the known errors
400 for e in Cart3D._C3D_errors:
401 if e in line:
402 return False, e
404 # Check if it says done
405 if Cart3D._C3D_done in line:
406 return False, None
407 except:
408 # Threading read error
409 pass
411 # No errors
412 return True, None
414 @staticmethod
415 def _infer_adapt(aero_csh_fp: str) -> str:
416 """Returns the target adapt number by reading the aero.csh file."""
417 with open(aero_csh_fp, "r") as f:
418 lines = f.readlines()
420 for line in lines:
421 if line.find("set n_adapt_cycles") != -1:
422 return f"adapt{int(line.split('=')[-1]):02d}"
424 @staticmethod
425 def _edit_input_cntl(mach: float, aoa: float, original: str, new: str):
426 """Edits the Mach number and angle of attack in the input.cntl file."""
427 with open(new, "w+") as new_file:
428 with open(original, "r") as old_file:
429 for line in old_file:
430 if line.startswith("Mach"):
431 line = f"Mach {mach}\n"
432 elif line.startswith("alpha"):
433 line = f"alpha {aoa}\n"
435 # Write line
436 new_file.write(line)
438 @staticmethod
439 def _read_c3d_loads(
440 loadsCC_filepath: str,
441 b_frame: bool = True,
442 v_frame: bool = True,
443 moments: bool = True,
444 tag: str = None,
445 ) -> dict:
446 load_dict = {}
447 with open(loadsCC_filepath, "r") as file:
448 for line in file:
449 if line[0] == "#":
450 # Commented line, skip
451 continue
453 # Remove linebreaks and multiple spaces
454 line = " ".join(line.split())
455 words = line.split(":")
457 if len(words) == 0 or len(words) == 1: # skip if empty line
458 continue
460 text = words[0]
461 number = float(words[1])
462 word = text.split(" ")
463 _tag = word[0]
465 if (not tag) or (tag is not None and tag == _tag):
466 if not tag:
467 tag = _tag
468 coeff = word[-1]
469 short_coeff = coeff[1:4]
470 long_coeff = coeff[1:6]
471 if b_frame is True:
472 if short_coeff in ["C_A", "C_Y", "C_N"]: # get force in b_frame
473 load_dict["{0}-{1}".format(short_coeff, tag)] = number
474 if v_frame is True:
475 if short_coeff in ["C_D", "C_S", "C_L"]: # get force in v_frame
476 load_dict["{0}-{1}".format(short_coeff, tag)] = number
477 if moments is True:
478 if short_coeff in [
479 "C_l",
480 "C_m",
481 "C_n",
482 "C_M",
483 ]: # get moment coeff
484 load_dict["{0}-{1}".format(short_coeff, tag)] = number
485 if b_frame is True:
486 if long_coeff in [
487 "C_M_x",
488 "C_M_y",
489 "C_M_z",
490 ]: # get force in b_frame
491 load_dict["{0}-{1}".format(long_coeff, tag)] = number
493 return load_dict