Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Optimization-based and moving horizon estimation#877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
murrayrm merged 14 commits intopython-control:mainfrommurrayrm:oep_mhe-26Feb2023
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
14 commits
Select commitHold shift + click to select a range
b0b6121
add pytest --skipslow
murrayrmMar 11, 2023
823dfbc
mark selected optimal, rlocus tests as slow
murrayrmMar 11, 2023
cf45109
initial implementation of optimal estimation problem
murrayrmMar 6, 2023
bd7ba6d
updated documentation + Jupyter notebook
murrayrmMar 6, 2023
8375127
add standard _process_indices function
murrayrmMar 11, 2023
e165dd5
add {control, disturbance}_indices to create_estimator_iosystem
murrayrmMar 11, 2023
1c1ce0c
implement {control, disturbance}_indices in oep
murrayrmMar 13, 2023
ebedf19
regularize *_labels processing across create_*_iosystem
murrayrmMar 13, 2023
df4508c
updated docstrings and docs, trim slow unit tests
murrayrmMar 14, 2023
039b22d
remove pytest --skipslow; use pytest -m "not slow" instead
murrayrmMar 24, 2023
9373f8e
fix dtime integral cost calculation; trim unit tests for speed
murrayrmMar 25, 2023
2a42fe8
add benchmarks for optimal estimation
murrayrmMar 26, 2023
1599f44
Update doc/optimal.rst
murrayrmMar 31, 2023
a329323
add executed mhe-pvtol.ipynb to fix doctest failure
murrayrmMar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions.gitignore
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
*.pyc
__pycache__/
build/
dist/
htmlcov/
Expand Down
87 changes: 87 additions & 0 deletionsbenchmarks/optestim_bench.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
# optestim_bench.py - benchmarks for optimal/moving horizon estimation
# RMM, 14 Mar 2023
#
# This benchmark tests the timing for the optimal estimation routines and
# is intended to be used for helping tune the performance of the functions
# used for optimization-based estimation.

import numpy as np
import math
import control as ct
import control.optimal as opt

minimizer_table = {
'default': (None, {}),
'trust': ('trust-constr', {}),
'trust_bigstep': ('trust-constr', {'finite_diff_rel_step': 0.01}),
'SLSQP': ('SLSQP', {}),
'SLSQP_bigstep': ('SLSQP', {'eps': 0.01}),
'COBYLA': ('COBYLA', {}),
}

# Table to turn on and off process disturbances and measurement noise
noise_table = {
'noisy': (1e-1, 1e-3),
'nodist': (0, 1e-3),
'nomeas': (1e-1, 0),
'clean': (0, 0)
}


# Assess performance as a function of optimization and integration methods
def time_oep_minimizer_methods(minimizer_name, noise_name, initial_guess):
# Use fixed system to avoid randome errors (was csys = ct.rss(4, 2, 5))
csys = ct.ss(
[[-0.5, 1, 0, 0], [0, -1, 1, 0], [0, 0, -2, 1], [0, 0, 0, -3]], # A
[[0, 0.1], [0, 0.1], [0, 0.1], [1, 0.1]], # B
[[1, 0, 0, 0], [0, 0, 1, 0]], # C
0, dt=0)
# dsys = ct.c2d(csys, dt)
# sys = csys if dt == 0 else dsys
sys = csys

# Decide on process disturbances and measurement noise
dist_mag, meas_mag = noise_table[noise_name]

# Create disturbances and noise (fixed, to avoid random errors)
Rv = 0.1 * np.eye(1) # scalar disturbance
Rw = 0.01 * np.eye(sys.noutputs)
timepts = np.arange(0, 10.1, 1)
V = np.array(
[0 if t % 2 == 1 else 1 if t % 4 == 0 else -1 for t in timepts]
).reshape(1, -1) * dist_mag
W = np.vstack([np.sin(2*timepts), np.cos(3*timepts)]) * meas_mag

# Generate system data
U = np.sin(timepts).reshape(1, -1)
res = ct.input_output_response(sys, timepts, [U, V])
Y = res.outputs + W

# Decide on the initial guess to use
if initial_guess == 'xhat':
initial_guess = (res.states, V*0)
elif initial_guess == 'both':
initial_guess = (res.states, V)
else:
initial_guess = None


# Set up optimal estimation function using Gaussian likelihoods for cost
traj_cost = opt.gaussian_likelihood_cost(sys, Rv, Rw)
init_cost = lambda xhat, x: (xhat - x) @ (xhat - x)
oep = opt.OptimalEstimationProblem(
sys, timepts, traj_cost, terminal_cost=init_cost)

# Noise and disturbances (the standard case)
est = oep.compute_estimate(Y, U, initial_guess=initial_guess)
assert est.success
np.testing.assert_allclose(
est.states[:, -1], res.states[:, -1], atol=1e-1, rtol=1e-2)


# Parameterize the test against different choices of integrator and minimizer
time_oep_minimizer_methods.param_names = ['minimizer', 'noise', 'initial']
time_oep_minimizer_methods.params = (
['default', 'trust', 'SLSQP', 'COBYLA'],
['noisy', 'nodist', 'nomeas', 'clean'],
['none', 'xhat', 'both'])
23 changes: 23 additions & 0 deletionscontrol/config.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -10,6 +10,7 @@

import collections
import warnings
from .exception import ControlArgument

__all__ = ['defaults', 'set_defaults', 'reset_defaults',
'use_matlab_defaults', 'use_fbs_defaults',
Expand DownExpand Up@@ -310,3 +311,25 @@ def use_legacy_defaults(version):
set_defaults('nyquist', mirror_style='-')

return (major, minor, patch)


#
# Utility function for processing legacy keywords
#
# Use this function to handle a legacy keyword that has been renamed. This
# function pops the old keyword off of the kwargs dictionary and issues a
# warning. if both the old and new keyword are present, a ControlArgument
# exception is raised.
#
def _process_legacy_keyword(kwargs, oldkey, newkey, newval):
if kwargs.get(oldkey) is not None:
warnings.warn(
f"keyworld '{oldkey}' is deprecated; use '{newkey}'",
DeprecationWarning)
if newval is not None:
raise ControlArgument(
f"duplicate keywords '{oldkey}' and '{newkey}'")
else:
return kwargs.pop(oldkey)
else:
return newval
2 changes: 1 addition & 1 deletioncontrol/iosys.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -82,7 +82,7 @@ class for a set of subclasses that are used to implement specific
System name (used for specifying signals). If unspecified, a generic
name <sys[id]> is generated with a unique integer id.
params : dict, optional
Parameter values for thesystems. Passed to the evaluation functions
Parameter values for thesystem. Passed to the evaluation functions
for the system as default values, overriding internal defaults.

Attributes
Expand Down
104 changes: 102 additions & 2 deletionscontrol/namedio.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -22,8 +22,8 @@
'namedio.sampled_system_name_prefix': '',
'namedio.sampled_system_name_suffix': '$sampled'
}


class NamedIOSystem(object):
def __init__(
self, name=None, inputs=None, outputs=None, states=None, **kwargs):
Expand DownExpand Up@@ -584,3 +584,103 @@ def _process_signal_list(signals, prefix='s'):

else:
raise TypeError("Can't parse signal list %s" % str(signals))


#
# Utility functions to process signal indices
#
# Signal indices can be specified in one of four ways:
#
# 1. As a positive integer 'm', in which case we return a list
# corresponding to the first 'm' elements of a range of a given length
#
# 2. As a negative integer '-m', in which case we return a list
# corresponding to the last 'm' elements of a range of a given length
#
# 3. As a slice, in which case we return the a list corresponding to the
# indices specified by the slice of a range of a given length
#
# 4. As a list of ints or strings specifying specific indices. Strings are
# compared to a list of labels to determine the index.
#
def _process_indices(arg, name, labels, length):
# Default is to return indices up to a certain length
arg = length if arg is None else arg

if isinstance(arg, int):
# Return the start or end of the list of possible indices
return list(range(arg)) if arg > 0 else list(range(length))[arg:]

elif isinstance(arg, slice):
# Return the indices referenced by the slice
return list(range(length))[arg]

elif isinstance(arg, list):
# Make sure the length is OK
if len(arg) > length:
raise ValueError(
f"{name}_indices list is too long; max length = {length}")

# Return the list, replacing strings with corresponding indices
arg=arg.copy()
for i, idx in enumerate(arg):
if isinstance(idx, str):
arg[i] = labels.index(arg[i])
return arg

raise ValueError(f"invalid argument for {name}_indices")

#
# Process control and disturbance indices
#
# For systems with inputs and disturbances, the control_indices and
# disturbance_indices keywords are used to specify which is which. If only
# one is given, the other is assumed to be the remaining indices in the
# system input. If neither is given, the disturbance inputs are assumed to
# be the same as the control inputs.
#
def _process_control_disturbance_indices(
sys, control_indices, disturbance_indices):

if control_indices is None and disturbance_indices is None:
# Disturbances enter in the same place as the controls
dist_idx = ctrl_idx = list(range(sys.ninputs))

elif control_indices is not None:
# Process the control indices
ctrl_idx = _process_indices(
control_indices, 'control', sys.input_labels, sys.ninputs)

# Disturbance indices are the complement of control indices
dist_idx = [i for i in range(sys.ninputs) if i not in ctrl_idx]

else: # disturbance_indices is not None
# If passed an integer, count from the end of the input vector
arg = -disturbance_indices if isinstance(disturbance_indices, int) \
else disturbance_indices

dist_idx = _process_indices(
arg, 'disturbance', sys.input_labels, sys.ninputs)

# Set control indices to complement disturbance indices
ctrl_idx = [i for i in range(sys.ninputs) if i not in dist_idx]

return ctrl_idx, dist_idx


# Process labels
def _process_labels(labels, name, default):
if isinstance(labels, str):
labels = [labels.format(i=i) for i in range(len(default))]

if labels is None:
labels = default
elif isinstance(labels, list):
if len(labels) != len(default):
raise ValueError(
f"incorrect length of {name}_labels: {len(labels)}"
f" instead of {len(default)}")
else:
raise ValueError(f"{name}_labels should be a string or a list")

return labels
Loading

[8]ページ先頭

©2009-2025 Movatter.jp