Expand Up @@ -111,11 +111,11 @@ class for a set of subclasses that are used to implement specific The :class:`~control.InputOuputSystem` class (and its subclasses) makes use of two special methods for implementing much of the work of the class: *_rhs (t, x, u): compute the right hand side of the differential or *dynamics (t, x, u): compute the right hand side of the differential or difference equation for the system. This must be specified by the subclass for the system. *_out (t, x, u): compute the output for the current state of the system. *output (t, x, u): compute the output for the current state of the system. The default is to return the entire system state. """ Expand Down Expand Up @@ -353,28 +353,69 @@ def _process_signal_list(self, signals, prefix='s'): # Find a signal by name def _find_signal(self, name, sigdict): return sigdict.get(name, None) # Update parameters used for_rhs , _out (used by subclasses) # Update parameters used fordynamics , _out (used by subclasses) def _update_params(self, params, warning=False): if (warning): warn("Parameters passed to InputOutputSystem ignored.") def_rhs (self, t, x, u): """Evaluate right hand side of a differential or difference equation. defdynamics (self, t, x, u): """Compute the dynamics of a differential or difference equation. Private function used to compute the right hand side ofan input/output systemmodel. Given time `t`, input `u` and state `x`, returns the dynamics ofthe system. If the systemis continuous, returns the time derivative ``dx/dt = f(t, x, u)`` where `f` is the system's (possibly nonlinear) dynamics function. If the system is discrete-time, returns the next value of `x`: ``x[t+dt] = f(t, x[t], u[t])`` Where `t` is a scalar. The inputs `x` and `u` must be of the correct length. Parameters ---------- t : float the time at which to evaluate x : array_like current state u : array_like input Returns ------- `dx/dt` or `x[t+dt]` : ndarray """ NotImplemented("Evaluation not implemented for system of type ", NotImplemented("Dynamics not implemented for system of type ", type(self)) def_out (self, t, x, u, params={}): """Evaluate the output ofa system at a given state, input, and time defoutput (self, t, x, u, params={}): """Compute the output ofthe system Private function used to compute the output of of an input/ output system model given the state, input, parameters, and time. Given time `t`, input `u` and state `x`, returns the output of the system: ``y = g(t, x, u)`` The inputs `x` and `u` must be of the correct length. Parameters ---------- t : float the time at which to evaluate x : array_like current state u : array_like input Returns ------- y : ndarray """ # If no output function was defined in subclass, return state return x Expand Down Expand Up @@ -533,7 +574,7 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, """ # # If the linearization is not defined by the subclass, perform a # numerical linearization use the `_rhs ()` and `_out ()` member # numerical linearization use the `dynamics ()` and `output ()` member # functions. # Expand All @@ -548,14 +589,14 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, u0 = np.ones((ninputs,)) * u0 # Compute number of outputs by evaluating the output function noutputs = _find_size(self.noutputs, self._out (t, x0, u0)) noutputs = _find_size(self.noutputs, self.output (t, x0, u0)) # Update the current parameters self._update_params(params) # Compute the nominal value of the update law and output F0 = self._rhs (t, x0, u0) H0 = self._out (t, x0, u0) F0 = self.dynamics (t, x0, u0) H0 = self.output (t, x0, u0) # Create empty matrices that we can fill up with linearizations A = np.zeros((nstates, nstates)) # Dynamics matrix Expand All @@ -567,15 +608,15 @@ def linearize(self, x0, u0, t=0, params={}, eps=1e-6, for i in range(nstates): dx = np.zeros((nstates,)) dx[i] = eps A[:, i] = (self._rhs (t, x0 + dx, u0) - F0) / eps C[:, i] = (self._out (t, x0 + dx, u0) - H0) / eps A[:, i] = (self.dynamics (t, x0 + dx, u0) - F0) / eps C[:, i] = (self.output (t, x0 + dx, u0) - H0) / eps # Perturb each of the input variables and compute linearization for i in range(ninputs): du = np.zeros((ninputs,)) du[i] = eps B[:, i] = (self._rhs (t, x0, u0 + du) - F0) / eps D[:, i] = (self._out (t, x0, u0 + du) - H0) / eps B[:, i] = (self.dynamics (t, x0, u0 + du) - F0) / eps D[:, i] = (self.output (t, x0, u0 + du) - H0) / eps # Create the state space system linsys = LinearIOSystem( Expand Down Expand Up @@ -694,17 +735,8 @@ def _update_params(self, params={}, warning=True): if params and warning: warn("Parameters passed to LinearIOSystems are ignored.") def _rhs(self, t, x, u): # Convert input to column vector and then change output to 1D array xdot = np.dot(self.A, np.reshape(x, (-1, 1))) \ + np.dot(self.B, np.reshape(u, (-1, 1))) return np.array(xdot).reshape((-1,)) def _out(self, t, x, u): # Convert input to column vector and then change output to 1D array y = np.dot(self.C, np.reshape(x, (-1, 1))) \ + np.dot(self.D, np.reshape(u, (-1, 1))) return np.array(y).reshape((-1,)) dynamics = StateSpace.dynamics output = StateSpace.output class NonlinearIOSystem(InputOutputSystem): Expand Down Expand Up @@ -849,12 +881,12 @@ def __call__(sys, u, params=None, squeeze=None): "function evaluation is only supported for static " "input/output systems") # If we received any parameters, update them before calling_out () # If we received any parameters, update them before callingoutput () if params is not None: sys._update_params(params) # Evaluate the function on the argument out = sys._out (0, np.array((0,)), np.asarray(u)) out = sys.output (0, np.array((0,)), np.asarray(u)) _, out = _process_time_response(sys, None, out, None, squeeze=squeeze) return out Expand All @@ -863,12 +895,12 @@ def _update_params(self, params, warning=False): self._current_params = self.params.copy() self._current_params.update(params) def_rhs (self, t, x, u): defdynamics (self, t, x, u): xdot = self.updfcn(t, x, u, self._current_params) \ if self.updfcn is not None else [] return np.array(xdot).reshape((-1,)) def_out (self, t, x, u): defoutput (self, t, x, u): y = self.outfcn(t, x, u, self._current_params) \ if self.outfcn is not None else x return np.array(y).reshape((-1,)) Expand Down Expand Up @@ -1033,7 +1065,7 @@ def _update_params(self, params, warning=False): local.update(params) # update with locally passed parameters sys._update_params(local, warning=warning) def_rhs (self, t, x, u): defdynamics (self, t, x, u): # Make sure state and input are vectors x = np.array(x, ndmin=1) u = np.array(u, ndmin=1) Expand All @@ -1047,7 +1079,7 @@ def _rhs(self, t, x, u): for sys in self.syslist: # Update the right hand side for this subsystem if sys.nstates != 0: xdot[state_index:state_index + sys.nstates] = sys._rhs ( xdot[state_index:state_index + sys.nstates] = sys.dynamics ( t, x[state_index:state_index + sys.nstates], ulist[input_index:input_index + sys.ninputs]) Expand All @@ -1057,7 +1089,7 @@ def _rhs(self, t, x, u): return xdot def_out (self, t, x, u): defoutput (self, t, x, u): # Make sure state and input are vectors x = np.array(x, ndmin=1) u = np.array(u, ndmin=1) Expand Down Expand Up @@ -1089,7 +1121,7 @@ def _compute_static_io(self, t, x, u): state_index, input_index, output_index = 0, 0, 0 for sys in self.syslist: # Compute outputs for each system from current state ysys = sys._out ( ysys = sys.output ( t, x[state_index:state_index + sys.nstates], ulist[input_index:input_index + sys.ninputs]) Expand Down Expand Up @@ -1480,10 +1512,10 @@ def input_output_response(sys, T, U=0., X0=0, params={}, method='RK45', if nstates == 0: # No states => map input to output u = U[0] if len(U.shape) == 1 else U[:, 0] y = np.zeros((np.shape(sys._out (T[0], X0, u))[0], len(T))) y = np.zeros((np.shape(sys.output (T[0], X0, u))[0], len(T))) for i in range(len(T)): u = U[i] if len(U.shape) == 1 else U[:, i] y[:, i] = sys._out (T[i], [], u) y[:, i] = sys.output (T[i], [], u) return _process_time_response( sys, T, y, np.array((0, 0, np.asarray(T).size)), transpose=transpose, return_x=return_x, squeeze=squeeze) Expand All @@ -1497,23 +1529,23 @@ def input_output_response(sys, T, U=0., X0=0, params={}, method='RK45', # Create a lambda function for the right hand side u = sp.interpolate.interp1d(T, U, fill_value="extrapolate") defivp_rhs (t, x): return sys._rhs (t, x, u(t)) defivp_dynamics (t, x): return sys.dynamics (t, x, u(t)) # Perform the simulation if isctime(sys): if not hasattr(sp.integrate, 'solve_ivp'): raise NameError("scipy.integrate.solve_ivp not found; " "use SciPy 1.0 or greater") soln = sp.integrate.solve_ivp(ivp_rhs , (T0, Tf), X0, t_eval=T, soln = sp.integrate.solve_ivp(ivp_dynamics , (T0, Tf), X0, t_eval=T, method=method, vectorized=False) # Compute the output associated with the state (and use sys.out to # figure out the number of outputs just in case it wasn't specified) u = U[0] if len(U.shape) == 1 else U[:, 0] y = np.zeros((np.shape(sys._out (T[0], X0, u))[0], len(T))) y = np.zeros((np.shape(sys.output (T[0], X0, u))[0], len(T))) for i in range(len(T)): u = U[i] if len(U.shape) == 1 else U[:, i] y[:, i] = sys._out (T[i], soln.y[:, i], u) y[:, i] = sys.output (T[i], soln.y[:, i], u) elif isdtime(sys): # Make sure the time vector is uniformly spaced Expand Down Expand Up @@ -1546,10 +1578,10 @@ def ivp_rhs(t, x): return sys._rhs(t, x, u(t)) for i in range(len(T)): # Store the current state and output soln.y.append(x) y.append(sys._out (T[i], x, u(T[i]))) y.append(sys.output (T[i], x, u(T[i]))) # Update the state for the next iteration x = sys._rhs (T[i], x, u(T[i])) x = sys.dynamics (T[i], x, u(T[i])) # Convert output to numpy arrays soln.y = np.transpose(np.array(soln.y)) Expand Down Expand Up @@ -1670,21 +1702,21 @@ def find_eqpt(sys, x0, u0=[], y0=None, t=0, params={}, if y0 is None: # Take u0 as fixed and minimize over x # TODO: update to allow discrete time systems defode_rhs (z): return sys._rhs (t, z, u0) result = root(ode_rhs , x0, **kw) z = (result.x, u0, sys._out (t, result.x, u0)) defode_dynamics (z): return sys.dynamics (t, z, u0) result = root(ode_dynamics , x0, **kw) z = (result.x, u0, sys.output (t, result.x, u0)) else: # Take y0 as fixed and minimize over x and u def rootfun(z): # Split z into x and u x, u = np.split(z, [nstates]) # TODO: update to allow discrete time systems return np.concatenate( (sys._rhs (t, x, u), sys._out (t, x, u) - y0), axis=0) (sys.dynamics (t, x, u), sys.output (t, x, u) - y0), axis=0) z0 = np.concatenate((x0, u0), axis=0) # Put variables together result = root(rootfun, z0, **kw) # Find the eq point x, u = np.split(result.x, [nstates]) # Split result back in two z = (x, u, sys._out (t, x, u)) z = (x, u, sys.output (t, x, u)) else: # General case: figure out what variables to constrain Expand Down Expand Up @@ -1782,10 +1814,10 @@ def rootfun(z): u[input_vars] = z[nstate_vars:] # Compute the update and output maps dx = sys._rhs (t, x, u) - dx0 dx = sys.dynamics (t, x, u) - dx0 if dtime: dx -= x # TODO: check dy = sys._out (t, x, u) - y0 dy = sys.output (t, x, u) - y0 # Map the results into the constrained variables return np.concatenate((dx[deriv_vars], dy[output_vars]), axis=0) Expand All @@ -1799,7 +1831,7 @@ def rootfun(z): # Extract out the results and insert into x and u x[state_vars] = result.x[:nstate_vars] u[input_vars] = result.x[nstate_vars:] z = (x, u, sys._out (t, x, u)) z = (x, u, sys.output (t, x, u)) # Return the result based on what the user wants and what we found if not return_y: Expand Down