Coverage for /opt/hostedtoolcache/Python/3.11.10/x64/lib/python3.11/site-packages/pysagas/optimisation/opm/opm.py: 0%
173 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-30 04:27 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-10-30 04:27 +0000
1import os
2import glob
3import shutil
4import pickle
5import numpy as np
6import pandas as pd
7from pysagas.cfd import OPM
8from pysagas.flow import FlowState
9from hypervehicle.generator import Generator
10from pyoptsparse import Optimizer, Optimization
11from pysagas.geometry.parsers import PyMesh, STL
12from pysagas.optimisation.optimiser import ShapeOpt
13from typing import List, Dict, Optional, Optional, Any
14from pysagas.sensitivity import GenericSensitivityCalculator
15from hypervehicle.utilities import SensitivityStudy, merge_stls
18class OPMShapeOpt(ShapeOpt):
19 def __init__(
20 self,
21 optimiser: Optimizer,
22 vehicle_generator: Generator,
23 objective_callback: callable,
24 jacobian_callback: callable,
25 freestream: FlowState,
26 geometry_filename: str,
27 A_ref: Optional[float] = 1.0,
28 optimiser_options: Optional[Dict[str, Any]] = None,
29 sensitivity_filename: str = "all_components_sensitivity.csv",
30 working_dir_name: str = "working_dir",
31 basefiles_dir_name: str = "basefiles",
32 save_evolution: bool = True,
33 verbosity: int = 1,
34 sensitivity_kwargs: dict = None,
35 parser_kwargs: dict = None,
36 ) -> None:
37 # Declare global variables
38 global fs_flow
39 global generator
40 global working_dir
41 global obj_cb, jac_cb
42 global save_geom, evo_dir
43 global sens_filename, basefiles_dir
44 global geom_filename
45 global _A_ref
46 global verbose
48 # Dynamics global variables
49 global cells, solver
50 cells = None
51 solver = None
53 # Construct paths
54 # TODO - I dont think basefiles is needed.
55 home_dir = os.getcwd()
56 basefiles_dir = os.path.join(home_dir, basefiles_dir_name)
57 working_dir = os.path.join(home_dir, working_dir_name)
58 sens_filename = sensitivity_filename
59 generator = vehicle_generator
60 save_geom = save_evolution
61 verbose = verbosity
63 # Save sensitivity kwargs
64 global _sens_kwargs, _parser_kwargs
65 _sens_kwargs = sensitivity_kwargs if sensitivity_kwargs else {}
66 _parser_kwargs = parser_kwargs if parser_kwargs else {}
68 if save_geom:
69 # Create evolution history directory
70 evo_dir = os.path.join(home_dir, "evolution")
71 if not os.path.exists(evo_dir):
72 os.mkdir(evo_dir)
74 # Save callback functions
75 obj_cb = objective_callback
76 jac_cb = jacobian_callback
78 # Construct optimisation problem
79 self.opt_problem = Optimization(
80 name="Cart3D-PySAGAS Shape Optimisation",
81 objFun=evaluate_objective,
82 sens=evaluate_gradient,
83 )
85 # Other
86 fs_flow = freestream
87 geom_filename = geometry_filename
88 _A_ref = A_ref
90 # Complete initialisation
91 optimiser_options = optimiser_options if optimiser_options else {}
92 super().__init__(optimiser, working_dir, optimiser_options)
95def evaluate_objective(x: dict):
96 if verbose > 0:
97 print("\n\033[1mEvaluating objective function\033[0m")
99 # Pre-process parameters
100 _process_parameters(x)
102 # Run flow solver
103 loads_dict = _run_simulation()
105 # Load properties
106 properties_dir = glob.glob("*_properties")
107 if properties_dir:
108 # Load volume and mass
109 volmass = pd.read_csv(
110 glob.glob(os.path.join(properties_dir[0], "*volmass.csv"))[0],
111 index_col=0,
112 )
114 # Load COG data
115 cog = np.loadtxt(
116 glob.glob(os.path.join(properties_dir[0], "*cog.txt"))[0], delimiter=","
117 )
119 # Fetch user-defined properties
120 properties_file = glob.glob(os.path.join(properties_dir[0], "*properties.csv"))
121 if len(properties_file) > 0:
122 # File exists, load it
123 properties = pd.read_csv(
124 properties_file[0],
125 index_col=0,
126 )["0"]
127 else:
128 properties = None
130 else:
131 # No properties data found
132 volmass = None
133 properties = None
134 cog = None
136 # Evaluate objective function
137 funcs = obj_cb(
138 loads_dict=loads_dict,
139 volmass=volmass,
140 properties=properties,
141 parameters=x,
142 cog=cog,
143 )
144 failed = False
146 if verbose > 0:
147 print(" Done evaluating objective function.")
149 return funcs, failed
152def evaluate_gradient(x: dict, objective: dict):
153 if verbose > 0:
154 print("\n\033[1mEvaluating gradient function\033[0m")
156 # Pre-process parameters
157 _process_parameters(x)
159 # Run flow solver for gradient information
160 loads_dict, coef_sens = _run_sensitivities()
162 # Calculate Jacobian
163 properties_dir = glob.glob("*_properties")
164 if properties_dir:
165 scalar_sens_dir = os.path.join("scalar_sensitivities")
166 vm = pd.read_csv(
167 glob.glob(os.path.join(properties_dir[0], "*volmass.csv"))[0],
168 index_col=0,
169 )
170 vm_sens = pd.read_csv(
171 os.path.join(scalar_sens_dir, "volmass_sensitivity.csv"),
172 index_col=0,
173 )[x.keys()]
175 # Fetch user-defined properties and sensitivities
176 properties_file = glob.glob(os.path.join(properties_dir[0], "*properties.csv"))
177 if len(properties_file) > 0:
178 # File exists, load it
179 properties = pd.read_csv(
180 properties_file[0],
181 index_col=0,
182 )["0"]
184 # Also load sensitivity file
185 property_sens = pd.read_csv(
186 os.path.join(scalar_sens_dir, "property_sensitivity.csv"),
187 index_col=0,
188 )[x.keys()]
189 else:
190 properties = None
191 property_sens = None
193 else:
194 # No properties data found
195 vm = None
196 vm_sens = None
197 properties = None
198 property_sens = None
200 # Call function
201 jac = jac_cb(
202 parameters=x,
203 coef_sens=coef_sens,
204 loads_dict=loads_dict,
205 volmass=vm,
206 volmass_sens=vm_sens,
207 properties=properties,
208 property_sens=property_sens,
209 )
211 if verbose > 0:
212 print(" Done evaluating gradient function.")
214 return jac
217def _process_parameters(x):
218 # Move into working directory
219 os.chdir(working_dir)
221 # Load existing parameters to compare
222 already_started = _compare_parameters(x)
224 if already_started and verbose > 0:
225 print(" Picking up solution from file...")
227 if not already_started:
228 # These parameters haven't run yet, prepare the working directory
229 if save_geom:
230 # Move components file into evolution directory
231 geom_filepath = os.path.join(working_dir, geom_filename)
233 # Check if file exists
234 if os.path.exists(geom_filepath):
235 # Determine new name
236 suffix = geom_filename.split(".")[-1]
237 new_filename = f"{len(os.listdir(evo_dir)):04d}.{suffix}"
239 # Copy file over
240 shutil.copyfile(
241 geom_filepath,
242 os.path.join(evo_dir, new_filename),
243 )
245 # Delete any files from previous run
246 _clean_dir(working_dir)
248 # Dump design parameters to file
249 with open("parameters.pkl", "wb") as f:
250 pickle.dump(x, f)
252 # Generate vehicle and geometry sensitivities
253 if len(glob.glob("*sensitivity*")) == 0 or not already_started:
254 # No sensitivity files generated yet, or this is new geometry
255 if verbose > 0:
256 print(" Running geometric sensitivity study.")
257 parameters = _unwrap_x(x)
258 ss = SensitivityStudy(vehicle_constructor=generator, verbosity=0)
259 ss.dvdp(parameter_dict=parameters, perturbation=2, write_nominal_stl=True)
260 ss.to_csv()
262 # Merge STLs
263 vehicle = ss.nominal_vehicle_instance
264 stl_files = [f"{c}.stl" for c in vehicle.get_non_ghost_components().keys()]
265 if len(stl_files) > 1:
266 geom_file = merge_stls(
267 stl_files=stl_files, name=vehicle.name, verbosity=verbose
268 )
269 else:
270 geom_file = stl_files[0]
273def _run_simulation():
274 """Prepare and run the CFD simulation with the OPM solver."""
275 global cells, solver
277 cells = None
278 solver = None
280 # Load cells from geometry
281 if cells is None:
282 try:
283 cells = PyMesh.load_from_file(
284 geom_filename, verbosity=verbose, **_parser_kwargs
285 )
286 except:
287 cells = STL.load_from_file(
288 geom_filename, verbosity=verbose, **_parser_kwargs
289 )
291 # Run OPM solver
292 # if solver is None:
293 solver = OPM(cells=cells, freestream=fs_flow, verbosity=verbose)
295 if verbose > 0:
296 print(" Running OPM flow solver.")
298 sim_results = solver.solve()
300 # Construct coefficient dictionary
301 CL, CD, Cm = sim_results.coefficients()
302 coefficients = {"CL": CL, "CD": CD, "Cm": Cm}
304 return coefficients
307def _run_sensitivities():
308 global cells, solver
310 cells = None
311 solver = None
313 # Load cells from geometry
314 if cells is None:
315 try:
316 # cells = PyMesh.load_from_file(geom_filename, verbosity=verbose, **_parser_kwargs)
317 cells = PyMesh.load_from_file(
318 filepath=geom_filename,
319 geom_sensitivities=sens_filename,
320 verbosity=verbose,
321 **_parser_kwargs,
322 )
323 except:
324 cells = STL.load_from_file(
325 geom_filename, verbosity=verbose, **_parser_kwargs
326 )
328 # Run OPM to generate nominal aero data on cells
329 flow_solver = OPM(cells=cells, freestream=fs_flow, verbosity=verbose)
330 aero = flow_solver.solve(freestream=fs_flow)
332 # Solve for aerodynamic sensitivities
333 sens_solver = GenericSensitivityCalculator(
334 cells=cells,
335 sensitivity_filepath=sens_filename,
336 cells_have_sens_data=True,
337 verbosity=verbose,
338 )
339 result = sens_solver.calculate(**_sens_kwargs)
340 F_sense = result.f_sens
342 # Non-dimensionalise
343 coef_sens = F_sense / (fs_flow.q * _A_ref)
345 # # Run OPM solver for coefficients
346 # if solver is None:
347 # solver = OPM(cells=cells, freestream=fs_flow, verbosity=0)
349 # # TODO - how will sens combining be handled? Need to use merged STL
350 # # TODO - remove hard coding below
351 # if verbose > 0:
352 # print(" Running OPM sensitivity flow solver.")
353 # sens_results = solver.solve_sens(sensitivity_filepath="nose_sensitivity.csv")
355 # # Non-dimensionalise
356 # coef_sens = sens_results.f_sens / (fs_flow.q * _A_ref)
358 # Construct coefficient dictionary
359 CL, CD, Cm = aero.coefficients()
360 coefficients = {"CL": CL, "CD": CD, "Cm": Cm}
362 return coefficients, coef_sens
365def _compare_parameters(x):
366 """Compares the current parameters x to the last run simulation
367 parameters."""
368 try:
369 with open("parameters.pkl", "rb") as f:
370 try:
371 xp = pickle.load(f)
372 except EOFError:
373 return False
375 # Compare to current parameters
376 already_run = x == xp
378 except FileNotFoundError:
379 # Simulation not run yet
380 already_run = False
382 return already_run
385def _clean_dir(directory: str, keep: list = None):
386 """Deletes everything in a directory except for what is specified
387 in keep."""
388 global cells, solver
390 all_files = os.listdir(directory)
391 if keep is None:
392 # Convert to empty list
393 keep = []
395 # Define files to remove and remove them
396 rm_files = set(all_files) - set(keep)
397 for f in rm_files:
398 if os.path.isdir(f):
399 shutil.rmtree(f)
400 else:
401 os.remove(f)
403 # Also reset dynamic global variables
404 cells = None
405 solver = None
408def _unwrap_x(x: dict) -> dict:
409 """Unwraps an ordered dictionary."""
410 unwrapped = {}
411 for key, val in x.items():
412 if len(val) == 1:
413 unwrapped[key] = val[0]
414 else:
415 unwrapped[key] = val
416 return unwrapped