Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Optimal control enhancements#712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
murrayrm merged 4 commits intopython-control:masterfrommurrayrm:optimal_18Mar2022
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletionscontrol/config.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -103,6 +103,9 @@ def reset_defaults():
from .iosys import _iosys_defaults
defaults.update(_iosys_defaults)

from .optimal import _optimal_defaults
defaults.update(_optimal_defaults)


def _get_param(module, param, argval=None, defval=None, pop=False, last=False):
"""Return the default value for a configuration option.
Expand Down
126 changes: 80 additions & 46 deletionscontrol/iosys.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1571,7 +1571,7 @@ def __init__(self, io_sys, ss_sys=None):
def input_output_response(
sys, T, U=0., X0=0, params={},
transpose=False, return_x=False, squeeze=None,
solve_ivp_kwargs={}, **kwargs):
solve_ivp_kwargs={},t_eval='T',**kwargs):
"""Compute the output response of a system to a given input.

Simulate a dynamical system with a given input and return its output
Expand DownExpand Up@@ -1654,50 +1654,57 @@ def input_output_response(
if kwargs.get('solve_ivp_method', None):
if kwargs.get('method', None):
raise ValueError("ivp_method specified more than once")
solve_ivp_kwargs['method'] = kwargs['solve_ivp_method']
solve_ivp_kwargs['method'] = kwargs.pop('solve_ivp_method')

# Set the default method to 'RK45'
if solve_ivp_kwargs.get('method', None) is None:
solve_ivp_kwargs['method'] = 'RK45'

# Make sure all input arguments got parsed
if kwargs:
raise TypeError("unknown parameters %s" % kwargs)

# Sanity checking on the input
if not isinstance(sys, InputOutputSystem):
raise TypeError("System of type ", type(sys), " not valid")

# Compute the time interval and number of steps
T0, Tf = T[0], T[-1]
n_steps = len(T)
ntimepts = len(T)

# Figure out simulation times (t_eval)
if solve_ivp_kwargs.get('t_eval'):
if t_eval == 'T':
# Override the default with the solve_ivp keyword
t_eval = solve_ivp_kwargs.pop('t_eval')
else:
raise ValueError("t_eval specified more than once")
if isinstance(t_eval, str) and t_eval == 'T':
# Use the input time points as the output time points
t_eval = T

# Check and convert the input, if needed
# TODO: improve MIMO ninputs check (choose from U)
if sys.ninputs is None or sys.ninputs == 1:
legal_shapes = [(n_steps,), (1,n_steps)]
legal_shapes = [(ntimepts,), (1,ntimepts)]
else:
legal_shapes = [(sys.ninputs,n_steps)]
legal_shapes = [(sys.ninputs,ntimepts)]
U = _check_convert_array(U, legal_shapes,
'Parameter ``U``: ', squeeze=False)
U = U.reshape(-1, n_steps)

# Check to make sure this is not a static function
nstates = _find_size(sys.nstates, X0)
if nstates == 0:
# No states => map input to output
u = U[0] if len(U.shape) == 1 else U[:, 0]
y = np.zeros((np.shape(sys._out(T[0], X0, u))[0], len(T)))
for i in range(len(T)):
u = U[i] if len(U.shape) == 1 else U[:, i]
y[:, i] = sys._out(T[i], [], u)
return TimeResponseData(
T, y, None, U, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)
U = U.reshape(-1, ntimepts)
ninputs = U.shape[0]

# create X0 if not given, test if X0 has correct shape
nstates = _find_size(sys.nstates, X0)
X0 = _check_convert_array(X0, [(nstates,), (nstates, 1)],
'Parameter ``X0``: ', squeeze=True)

# Update the parameter values
sys._update_params(params)
# Figure out the number of outputs
if sys.noutputs is None:
# Evaluate the output function to find number of outputs
noutputs = np.shape(sys._out(T[0], X0, U[:, 0]))[0]
else:
noutputs = sys.noutputs

#
# Define a function to evaluate the input at an arbitrary time
Expand All@@ -1714,6 +1721,31 @@ def ufun(t):
dt = (t - T[idx-1]) / (T[idx] - T[idx-1])
return U[..., idx-1] * (1. - dt) + U[..., idx] * dt

# Check to make sure this is not a static function
if nstates == 0: # No states => map input to output
# Make sure the user gave a time vector for evaluation (or 'T')
if t_eval is None:
# User overrode t_eval with None, but didn't give us the times...
warn("t_eval set to None, but no dynamics; using T instead")
t_eval = T

# Allocate space for the inputs and outputs
u = np.zeros((ninputs, len(t_eval)))
y = np.zeros((noutputs, len(t_eval)))

# Compute the input and output at each point in time
for i, t in enumerate(t_eval):
u[:, i] = ufun(t)
y[:, i] = sys._out(t, [], u[:, i])

return TimeResponseData(
t_eval, y, None, u, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)

# Update the parameter values
sys._update_params(params)

# Create a lambda function for the right hand side
def ivp_rhs(t, x):
return sys._rhs(t, x, ufun(t))
Expand All@@ -1724,27 +1756,27 @@ def ivp_rhs(t, x):
raise NameError("scipy.integrate.solve_ivp not found; "
"use SciPy 1.0 or greater")
soln = sp.integrate.solve_ivp(
ivp_rhs, (T0, Tf), X0, t_eval=T,
ivp_rhs, (T0, Tf), X0, t_eval=t_eval,
vectorized=False, **solve_ivp_kwargs)
if not soln.success:
raise RuntimeError("solve_ivp failed: " + soln.message)

if not soln.success or soln.status != 0:
# Something went wrong
warn("sp.integrate.solve_ivp failed")
print("Return bunch:", soln)

# Compute the output associated with the state (and use sys.out to
# figure out the number of outputs just in case it wasn't specified)
u = U[0] if len(U.shape) == 1 else U[:, 0]
y = np.zeros((np.shape(sys._out(T[0], X0, u))[0], len(T)))
for i in range(len(T)):
u = U[i] if len(U.shape) == 1 else U[:, i]
y[:, i] = sys._out(T[i], soln.y[:, i], u)
# Compute inputs and outputs for each time point
u = np.zeros((ninputs, len(soln.t)))
y = np.zeros((noutputs, len(soln.t)))
for i, t in enumerate(soln.t):
u[:, i] = ufun(t)
y[:, i] = sys._out(t, soln.y[:, i], u[:, i])

elif isdtime(sys):
# If t_eval was not specified, use the sampling time
if t_eval is None:
t_eval = np.arange(T[0], T[1] + sys.dt, sys.dt)

# Make sure the time vector is uniformly spaced
dt =T[1] -T[0]
if not np.allclose(T[1:] -T[:-1], dt):
raise ValueError("Parameter ``T``: time values must be "
dt =t_eval[1] -t_eval[0]
if not np.allclose(t_eval[1:] -t_eval[:-1], dt):
raise ValueError("Parameter ``t_eval``: time values must be "
"equally spaced.")

# Make sure the sample time matches the given time
Expand All@@ -1764,21 +1796,23 @@ def ivp_rhs(t, x):

# Compute the solution
soln = sp.optimize.OptimizeResult()
soln.t =T # Store the time vector directly
x =X0 # Initilize state
soln.t =t_eval # Store the time vector directly
x =np.array(X0)# State vector (store as floats)
soln.y = [] # Solution, following scipy convention
y = []# System output
fori inrange(len(T)):
# Store the current state and output
u,y = [], [] # System input, output
fort int_eval:
# Store the currentinput,state, and output
soln.y.append(x)
y.append(sys._out(T[i], x, ufun(T[i])))
u.append(ufun(t))
y.append(sys._out(t, x, u[-1]))

# Update the state for the next iteration
x = sys._rhs(T[i], x,ufun(T[i]))
x = sys._rhs(t, x,u[-1])

# Convert output to numpy arrays
soln.y = np.transpose(np.array(soln.y))
y = np.transpose(np.array(y))
u = np.transpose(np.array(u))

# Mark solution as successful
soln.success = True # No way to fail
Expand All@@ -1787,7 +1821,7 @@ def ivp_rhs(t, x):
raise TypeError("Can't determine system type")

return TimeResponseData(
soln.t, y, soln.y,U, issiso=sys.issiso(),
soln.t, y, soln.y,u, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
state_labels=sys.state_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)
Expand Down
70 changes: 55 additions & 15 deletionscontrol/optimal.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -16,10 +16,21 @@
import logging
import time

from . import config
from .exception import ControlNotImplemented
from .timeresp import TimeResponseData

__all__ = ['find_optimal_input']

# Define module default parameter values
_optimal_defaults = {
'optimal.minimize_method': None,
'optimal.minimize_options': {},
'optimal.minimize_kwargs': {},
'optimal.solve_ivp_method': None,
'optimal.solve_ivp_options': {},
}


class OptimalControlProblem():
"""Description of a finite horizon, optimal control problem.
Expand DownExpand Up@@ -110,6 +121,10 @@ class OptimalControlProblem():
values of the input at the specified times (using linear interpolation for
continuous systems).

The default values for ``minimize_method``, ``minimize_options``,
``minimize_kwargs``, ``solve_ivp_method``, and ``solve_ivp_options`` can
be set using config.defaults['optimal.<keyword>'].

"""
def __init__(
self, sys, timepts, integral_cost, trajectory_constraints=[],
Expand All@@ -126,17 +141,22 @@ def __init__(

# Process keyword arguments
self.solve_ivp_kwargs = {}
self.solve_ivp_kwargs['method'] = kwargs.pop('solve_ivp_method', None)
self.solve_ivp_kwargs.update(kwargs.pop('solve_ivp_kwargs', {}))
self.solve_ivp_kwargs['method'] = kwargs.pop(
'solve_ivp_method', config.defaults['optimal.solve_ivp_method'])
self.solve_ivp_kwargs.update(kwargs.pop(
'solve_ivp_kwargs', config.defaults['optimal.solve_ivp_options']))

self.minimize_kwargs = {}
self.minimize_kwargs['method'] = kwargs.pop('minimize_method', None)
self.minimize_kwargs['options'] = kwargs.pop('minimize_options', {})
self.minimize_kwargs.update(kwargs.pop('minimize_kwargs', {}))
self.minimize_kwargs['method'] = kwargs.pop(
'minimize_method', config.defaults['optimal.minimize_method'])
self.minimize_kwargs['options'] = kwargs.pop(
'minimize_options', config.defaults['optimal.minimize_options'])
self.minimize_kwargs.update(kwargs.pop(
'minimize_kwargs', config.defaults['optimal.minimize_kwargs']))

if len(kwargs) > 0:
raise ValueError(
f'unrecognized keyword(s):{list(kwargs.keys())}')
# Make sure all input arguments got parsed
if kwargs:
raise TypeError("unrecognized keyword(s):", str(kwargs))

# Process trajectory constraints
if isinstance(trajectory_constraints, tuple):
Expand DownExpand Up@@ -271,9 +291,10 @@ def _cost_function(self, coeffs):
logging.debug("initial input[0:3] =\n" + str(inputs[:, 0:3]))

# Simulate the system to get the state
# TODO: try calling solve_ivp directly for better speed?
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand DownExpand Up@@ -393,7 +414,7 @@ def _constraint_function(self, coeffs):
# Simulate the system to get the state
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand DownExpand Up@@ -475,7 +496,7 @@ def _eqconst_function(self, coeffs):
# Simulate the system to get the state
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand DownExpand Up@@ -548,7 +569,7 @@ def _process_initial_guess(self, initial_guess):
initial_guess = np.atleast_1d(initial_guess)

# See whether we got entire guess or just first time point
iflen(initial_guess.shape) == 1:
if initial_guess.ndim == 1:
# Broadcast inputs to entire time vector
try:
initial_guess = np.broadcast_to(
Expand DownExpand Up@@ -804,6 +825,15 @@ class OptimalControlResult(sp.optimize.OptimizeResult):
Whether or not the optimizer exited successful.
problem : OptimalControlProblem
Optimal control problem that generated this solution.
cost : float
Final cost of the return solution.
system_simulations, {cost, constraint, eqconst}_evaluations : int
Number of system simulations and evaluations of the cost function,
(inequality) constraint function, and equality constraint function
performed during the optimzation.
{cost, constraint, eqconst}_process_time : float
If logging was enabled, the amount of time spent evaluating the cost
and constraint functions.

"""
def __init__(
Expand DownExpand Up@@ -833,15 +863,19 @@ def __init__(
"unable to solve optimal control problem\n"
"scipy.optimize.minimize returned " + res.message, UserWarning)

# Save the final cost
self.cost = res.fun

# Optionally print summary information
if print_summary:
ocp._print_statistics()
print("* Final cost:", self.cost)

if return_states and inputs.shape[1] == ocp.timepts.shape[0]:
# Simulate the system if we need the state back
_, _, states = ct.input_output_response(
ocp.system, ocp.timepts, inputs, ocp.x, return_x=True,
solve_ivp_kwargs=ocp.solve_ivp_kwargs)
solve_ivp_kwargs=ocp.solve_ivp_kwargs, t_eval=ocp.timepts)
ocp.system_simulations += 1
else:
states = None
Expand All@@ -858,7 +892,7 @@ def __init__(

# Compute the input for a nonlinear, (constrained) optimal control problem
def solve_ocp(
sys, horizon, X0, cost, trajectory_constraints=[], terminal_cost=None,
sys, horizon, X0, cost, trajectory_constraints=None, terminal_cost=None,
terminal_constraints=[], initial_guess=None, basis=None, squeeze=None,
transpose=None, return_states=False, log=False, **kwargs):

Expand DownExpand Up@@ -960,12 +994,18 @@ def solve_ocp(
# Process keyword arguments
if trajectory_constraints is None:
# Backwards compatibility
trajectory_constraints = kwargs.pop('constraints',None)
trajectory_constraints = kwargs.pop('constraints',[])

# Allow 'return_x` as a synonym for 'return_states'
return_states = ct.config._get_param(
'optimal', 'return_x', kwargs, return_states, pop=True)

# Process (legacy) method keyword
if kwargs.get('method'):
if kwargs.get('minimize_method'):
raise ValueError("'minimize_method' specified more than once")
kwargs['minimize_method'] = kwargs.pop('method')

# Set up the optimal control problem
ocp = OptimalControlProblem(
sys, horizon, cost, trajectory_constraints=trajectory_constraints,
Expand Down
2 changes: 1 addition & 1 deletioncontrol/statefbk.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -734,7 +734,7 @@ def dlqr(*args, **kwargs):
The dlqr() function computes the optimal state feedback controller
u[n] = - K x[n] that minimizes the quadratic cost

.. math:: J = \\Sum_0^\\infty (x[n]' Q x[n] + u[n]' R u[n] + 2 x[n]' N u[n])
.. math:: J = \\sum_0^\\infty (x[n]' Q x[n] + u[n]' R u[n] + 2 x[n]' N u[n])

The function can be called with either 3, 4, or 5 arguments:

Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp