Coverage for /opt/hostedtoolcache/Python/3.11.10/x64/lib/python3.11/site-packages/pysagas/cfd/cart3d.py: 0%

226 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-30 04:27 +0000

1import os 

2import time 

3import shutil 

4import subprocess 

5import numpy as np 

6from pysagas import FlowState, Vector 

7from typing import Dict, List, Optional 

8from pysagas.optimisation.cart3d.utilities import C3DPrep 

9from pysagas.sensitivity import Cart3DSensitivityCalculator 

10from pysagas.cfd.solver import FlowSolver, FlowResults, SensitivityResults 

11 

12 

13class Cart3D(FlowSolver): 

14 method = "Cart3D" 

15 

16 _C3D_errors = [ 

17 "==> ADAPT failed", 

18 "Check cart3d.out in AD_A_J for more clues", 

19 "==> adjointErrorEst_quad failed again, status = 1", 

20 "ERROR: CUBES failed", 

21 "ERROR: ADAPT failed with status = 1", 

22 "ERROR", 

23 "Try decreasing etol, increasing mesh growth or switching to 'a' cycles", 

24 ] 

25 _C3D_done = "Done aero.csh" 

26 

27 def __init__( 

28 self, 

29 stl_files: List[str], 

30 aero_csh: str = "aero.csh", 

31 input_cntl: str = "input.cntl", 

32 freestream: Optional[FlowState] = None, 

33 verbosity: Optional[int] = 1, 

34 c3d_log_name: str = "C3D_log", 

35 c3d_info_file: str = None, 

36 write_config_xml: bool = True, 

37 ref_area: Optional[float] = 1.0, 

38 ref_length: Optional[float] = 1.0, 

39 ) -> None: 

40 """Instantiate the Cart3D solver wrapper. 

41 

42 Parameters 

43 ----------- 

44 stl_files : list[str] 

45 A list of filepaths to the STL files to intersect and 

46 simulate. 

47 

48 aero_csh : str 

49 The filepath to the reference aero.csh file. The default 

50 option will look in the current working directory. 

51 

52 c3d_log_name : str, optional 

53 The name to use for the Cart3D logfile. The default is C3D_log. 

54 

55 write_config_xml : bool, optional 

56 A boolean flag to write the Cart3D Config.xml file when running 

57 comp2tri. If the geometry may be perturbed to a state where one 

58 component is no longer part of the wetted surface, writing the 

59 comp2tri file can cause set-up issues, and so it can be beneficial 

60 to turn it off. The default is True. 

61 """ 

62 # Save STL files for processing 

63 self.stl_files = stl_files 

64 self._root_dir = os.getcwd() 

65 self._aerocsh_path = os.path.join(self._root_dir, aero_csh) 

66 self._inputcntl_path = os.path.join(self._root_dir, input_cntl) 

67 

68 # Instantiate helper 

69 self.c3d_log_name = c3d_log_name 

70 self._prepper = C3DPrep( 

71 logfile=c3d_log_name, 

72 info_file=c3d_info_file, 

73 write_config=write_config_xml, 

74 ) 

75 

76 # Dimensional properties 

77 self.ref_area = ref_area 

78 self.ref_length = ref_length 

79 

80 # Private attributes 

81 self._sim_dir = None 

82 self._killed = False 

83 self._donefile = None 

84 

85 # Complete instantiation 

86 super().__init__(None, freestream, verbosity) 

87 

88 def solve( 

89 self, 

90 freestream: Optional[FlowState] = None, 

91 mach: Optional[float] = None, 

92 aoa: Optional[float] = None, 

93 ) -> FlowResults: 

94 already_run = super().solve(freestream=freestream, mach=mach, aoa=aoa) 

95 if already_run: 

96 # Already have a result 

97 if self.verbosity > 1: 

98 print("Attention: this flowstate has already been solved.") 

99 result = self.flow_result 

100 

101 else: 

102 # Get flow 

103 flow = self._last_solve_freestream 

104 loads = self._run_sim(mach=flow.M, aoa=flow.aoa) 

105 

106 # TODO - need to be careful and aware of the reference coordinates here 

107 C_F = np.array( 

108 [loads["C_A-entire"], loads["C_N-entire"], loads["C_Y-entire"]] 

109 ) 

110 C_M = np.array( 

111 [loads["C_M_x-entire"], loads["C_M_z-entire"], -loads["C_M_y-entire"]] 

112 ) 

113 

114 # Dimensionalise results 

115 net_force = C_F * flow.q * self.ref_area 

116 net_moment = C_M * flow.q * self.ref_area * self.ref_length 

117 

118 # Convert to vectors 

119 net_force = Vector.from_coordinates(net_force) 

120 net_moment = Vector.from_coordinates(net_moment) 

121 

122 # Construct results 

123 result = FlowResults( 

124 freestream=flow, net_force=net_force, net_moment=net_moment 

125 ) 

126 

127 # Save 

128 self.flow_result = result 

129 

130 if self.verbosity > 1: 

131 print("Cart3D done.") 

132 

133 return result 

134 

135 @property 

136 def status(self): 

137 """Returns the status of the wrapper.""" 

138 running, error = self._c3d_running( 

139 os.path.join(self._sim_dir, self.c3d_log_name), self._donefile 

140 ) 

141 if error: 

142 print(f"Failed with error {error}.") 

143 else: 

144 if running: 

145 print("Running.") 

146 else: 

147 print("Complete.") 

148 

149 def stop(self): 

150 """Stop running Cart3D.""" 

151 self._killed = True 

152 if self.verbosity > 0: 

153 print("Kill signal received. Stopping Cart3D.") 

154 stopfile = os.path.join(self._sim_dir, "STOP") 

155 with open(stopfile, "w"): 

156 pass 

157 

158 def solve_sens( 

159 self, 

160 sensitivity_filepath: str, 

161 freestream: Optional[FlowState] = None, 

162 mach: Optional[float] = None, 

163 aoa: Optional[float] = None, 

164 ) -> SensitivityResults: 

165 already_run = super().solve_sens(freestream=freestream, mach=mach, aoa=aoa) 

166 if already_run: 

167 # Already have a result 

168 if self.verbosity > 1: 

169 print("Attention: this flowstate sensitivity has already been solved.") 

170 result = self.flow_sensitivity 

171 

172 else: 

173 # Get flow 

174 flow = self._last_sens_freestream 

175 

176 # Check that nominal flow condition has been solved already 

177 if self._last_solve_freestream: 

178 # Solver has run before 

179 if self._last_solve_freestream != flow: 

180 # Run flow solver first at the nominal flow conditions 

181 if self.verbosity > 0: 

182 print("Running flow solver to prepare sensitivities.") 

183 self.solve(flow) 

184 else: 

185 if self.verbosity > 0: 

186 print("Using previously run flow solution.") 

187 else: 

188 # Solver has not run yet at all, run it now 

189 if self.verbosity > 0: 

190 print("Running flow solver to prepare sensitivities.") 

191 self.solve(flow) 

192 

193 # Create sensitivity wrapper 

194 components_filepath = os.path.join( 

195 self._sim_dir, "BEST/FLOW/Components.i.plt" 

196 ) 

197 wrapper = Cart3DSensitivityCalculator( 

198 freestream=flow, 

199 sensitivity_filepath=sensitivity_filepath, 

200 components_filepath=components_filepath, 

201 verbosity=0, 

202 ) 

203 

204 # Calculate sensitivities 

205 df_f, df_m = wrapper.calculate() 

206 

207 # Construct results 

208 result = SensitivityResults( 

209 f_sens=df_f, 

210 m_sens=df_m, 

211 freestream=flow, 

212 ) 

213 

214 # Save 

215 self.flow_sensitivity = result 

216 

217 if self.verbosity > 0: 

218 print(result) 

219 

220 return result 

221 

222 def running(self) -> bool: 

223 """Returns True if Cart3D is actively running, else False.""" 

224 if self._donefile: 

225 # Donefile has been assigned 

226 return self._c3d_running( 

227 os.path.join(self._sim_dir, self.c3d_log_name), self._donefile 

228 )[0] 

229 else: 

230 # Donefile has not been assigned 

231 return True 

232 

233 def save(self, name: str, attributes: Dict[str, list]): 

234 raise NotImplementedError("Coming soon.") 

235 

236 def _prepare_sim_dir(self, sim_dir_name: dir): 

237 """Prepares a new directory to run the simulation in.""" 

238 # Make simulation directory 

239 sim_dir = os.path.join(self._root_dir, sim_dir_name) 

240 run_intersect = False 

241 components_filepath = os.path.join(sim_dir, "Components.i.tri") 

242 if not os.path.exists(sim_dir): 

243 os.mkdir(sim_dir) 

244 run_intersect = True 

245 intersected = False 

246 else: 

247 # Check for intersected file 

248 run_intersect = not os.path.exists(components_filepath) 

249 intersected = True 

250 

251 # Copy STL files into sim directory 

252 for file in self.stl_files: 

253 shutil.copyfile( 

254 os.path.join(self._root_dir, file), 

255 os.path.join(sim_dir, file), 

256 ) 

257 

258 if self.verbosity > 0: 

259 print(f"Running simulation in {sim_dir}.") 

260 

261 return run_intersect, intersected 

262 

263 def _run_sim(self, mach: float, aoa: float): 

264 """Run the simulation.""" 

265 # Prepare sim directory 

266 sim_dir = os.path.join(self._root_dir, f"M{mach}A{aoa}") 

267 self._sim_dir = sim_dir 

268 run_intersect, intersected = self._prepare_sim_dir(sim_dir) 

269 

270 # Move into sim directory 

271 os.chdir(sim_dir) 

272 

273 # Attempt simulation 

274 sim_success = False 

275 no_attempts = 3 

276 for attempt in range(no_attempts): 

277 # Run intersect 

278 if run_intersect: 

279 self._prepper._log(f"SHAPEOPT INTERSECT ATTEMPT {attempt+1}") 

280 intersected = self._prepper.intersect_stls(stl_files=self.stl_files) 

281 

282 # Check for intersection 

283 if intersected: 

284 # Prepare rest of simulation directory 

285 if not os.path.exists(os.path.join(sim_dir, "input.cntl")): 

286 # Move files to simulation directory (including Components.i.tri) 

287 self._prepper.run_autoinputs() 

288 # os.system( 

289 # f"mv *.tri Config.xml input.c3d preSpec.c3d.cntl {sim_dir} >> {self.c3d_log_name} 2>&1" 

290 # ) 

291 

292 # Copy sim files and permissions 

293 for filename, fp in { 

294 "input.cntl": self._inputcntl_path, 

295 "aero.csh": self._aerocsh_path, 

296 }.items(): 

297 shutil.copyfile( 

298 os.path.join(self._root_dir, filename), 

299 os.path.join(sim_dir, filename), 

300 ) 

301 shutil.copymode( 

302 os.path.join(self._root_dir, filename), 

303 os.path.join(sim_dir, filename), 

304 ) 

305 

306 # Update flow conditions 

307 new_input_cntl = os.path.join(sim_dir, "input.cntl") 

308 self._edit_input_cntl( 

309 mach=mach, 

310 aoa=aoa, 

311 original=self._inputcntl_path, 

312 new=new_input_cntl, 

313 ) 

314 

315 # Run Cart3D and await result 

316 target_adapt = self._infer_adapt(self._aerocsh_path) 

317 c3d_donefile = os.path.join(sim_dir, target_adapt, "FLOW", "DONE") 

318 self._donefile = c3d_donefile 

319 run_cmd = "./aero.csh restart" 

320 _restarts = 0 

321 if not os.path.exists(c3d_donefile): 

322 with open(self.c3d_log_name, "a") as f: 

323 subprocess.run( 

324 run_cmd, shell=True, stdout=f, stderr=subprocess.STDOUT 

325 ) 

326 while not os.path.exists(c3d_donefile): 

327 # Wait... 

328 time.sleep(5) 

329 

330 # Check for C3D failure 

331 running, e = self._c3d_running( 

332 c3d_log_name=self.c3d_log_name, donefile=c3d_donefile 

333 ) 

334 

335 # Check for kill status 

336 if self._killed: 

337 if self.verbosity > 1: 

338 print("Kill signal received, exiting simulation loop.") 

339 sim_success = False 

340 break 

341 

342 if not running: 

343 # C3D failed, try restart it 

344 if _restarts > 3: 

345 return False 

346 

347 if self.verbosity > 0: 

348 print(f"Warning: Cart3D failed with error '{e}'") 

349 

350 f = open(self.c3d_log_name, "a") 

351 subprocess.run( 

352 run_cmd, shell=True, stdout=f, stderr=subprocess.STDOUT 

353 ) 

354 f.close() 

355 _restarts += 1 

356 

357 sim_success = True 

358 break 

359 

360 if sim_success: 

361 # Sim finished successfully, read loads file 

362 loads_dict = self._read_c3d_loads( 

363 os.path.join(sim_dir, "BEST/FLOW/loadsCC.dat"), 

364 tag="entire", 

365 ) 

366 

367 else: 

368 loads_dict = None 

369 

370 # Change back to working directory 

371 os.chdir(self._root_dir) 

372 

373 return loads_dict 

374 

375 @staticmethod 

376 def _c3d_running(c3d_log_name: str, donefile: str) -> bool: 

377 """Watches the Cart3D log file to check for errors and return False 

378 if Cart3D has stopped running. 

379 

380 Returns 

381 ------- 

382 running : bool 

383 Whether Cart3D is still running. 

384 

385 error : str 

386 The error message, if an error has occured. 

387 """ 

388 if os.path.exists(donefile): 

389 return False, None 

390 

391 # DONE file doesn't exist, read logfile for clues 

392 if os.path.exists(c3d_log_name): 

393 try: 

394 with open(c3d_log_name) as f: 

395 # Get last line in log file 

396 for line in f: 

397 pass 

398 

399 # Check if if it is in the known errors 

400 for e in Cart3D._C3D_errors: 

401 if e in line: 

402 return False, e 

403 

404 # Check if it says done 

405 if Cart3D._C3D_done in line: 

406 return False, None 

407 except: 

408 # Threading read error 

409 pass 

410 

411 # No errors 

412 return True, None 

413 

414 @staticmethod 

415 def _infer_adapt(aero_csh_fp: str) -> str: 

416 """Returns the target adapt number by reading the aero.csh file.""" 

417 with open(aero_csh_fp, "r") as f: 

418 lines = f.readlines() 

419 

420 for line in lines: 

421 if line.find("set n_adapt_cycles") != -1: 

422 return f"adapt{int(line.split('=')[-1]):02d}" 

423 

424 @staticmethod 

425 def _edit_input_cntl(mach: float, aoa: float, original: str, new: str): 

426 """Edits the Mach number and angle of attack in the input.cntl file.""" 

427 with open(new, "w+") as new_file: 

428 with open(original, "r") as old_file: 

429 for line in old_file: 

430 if line.startswith("Mach"): 

431 line = f"Mach {mach}\n" 

432 elif line.startswith("alpha"): 

433 line = f"alpha {aoa}\n" 

434 

435 # Write line 

436 new_file.write(line) 

437 

438 @staticmethod 

439 def _read_c3d_loads( 

440 loadsCC_filepath: str, 

441 b_frame: bool = True, 

442 v_frame: bool = True, 

443 moments: bool = True, 

444 tag: str = None, 

445 ) -> dict: 

446 load_dict = {} 

447 with open(loadsCC_filepath, "r") as file: 

448 for line in file: 

449 if line[0] == "#": 

450 # Commented line, skip 

451 continue 

452 

453 # Remove linebreaks and multiple spaces 

454 line = " ".join(line.split()) 

455 words = line.split(":") 

456 

457 if len(words) == 0 or len(words) == 1: # skip if empty line 

458 continue 

459 

460 text = words[0] 

461 number = float(words[1]) 

462 word = text.split(" ") 

463 _tag = word[0] 

464 

465 if (not tag) or (tag is not None and tag == _tag): 

466 if not tag: 

467 tag = _tag 

468 coeff = word[-1] 

469 short_coeff = coeff[1:4] 

470 long_coeff = coeff[1:6] 

471 if b_frame is True: 

472 if short_coeff in ["C_A", "C_Y", "C_N"]: # get force in b_frame 

473 load_dict["{0}-{1}".format(short_coeff, tag)] = number 

474 if v_frame is True: 

475 if short_coeff in ["C_D", "C_S", "C_L"]: # get force in v_frame 

476 load_dict["{0}-{1}".format(short_coeff, tag)] = number 

477 if moments is True: 

478 if short_coeff in [ 

479 "C_l", 

480 "C_m", 

481 "C_n", 

482 "C_M", 

483 ]: # get moment coeff 

484 load_dict["{0}-{1}".format(short_coeff, tag)] = number 

485 if b_frame is True: 

486 if long_coeff in [ 

487 "C_M_x", 

488 "C_M_y", 

489 "C_M_z", 

490 ]: # get force in b_frame 

491 load_dict["{0}-{1}".format(long_coeff, tag)] = number 

492 

493 return load_dict