# -----------------------------------------------------------------------------------------# (C) Copyright IBM Corp. 2023-2025.# https://opensource.org/licenses/BSD-3-Clause# -----------------------------------------------------------------------------------------from__future__importannotationsfromtypingimport(Literal,Iterable,Callable,Any,cast,TYPE_CHECKING,NoReturn,Generator,TypeAlias,AsyncGenerator,)importnumpyasnpimportjsonfromwarningsimportwarnfromenumimportEnumfromibm_watsonx_ai.utilsimport(print_text_header_h1,print_text_header_h2,StatusLogger,)fromibm_watsonx_ai.utils.utilsimport_get_id_from_deprecated_uidfromibm_watsonx_ai.wml_client_errorimport(WMLClientError,MissingValue,InvalidValue,ApiRequestFailure,)fromibm_watsonx_ai.href_definitionsimportis_idfromibm_watsonx_ai.wml_resourceimportWMLResourcefromibm_watsonx_ai.messages.messagesimportMessagesfromibm_watsonx_ai.metanamesimport(ScoringMetaNames,DecisionOptimizationMetaNames,DeploymentMetaNames,)fromibm_watsonx_ai.libs.repo.util.library_importsimportLibraryCheckerfromibm_watsonx_ai.utils.autoai.utilsimportall_logging_disabledfromurllib.parseimporturlparse,parse_qsifTYPE_CHECKING:fromibm_watsonx_aiimportAPIClientfromibm_watsonx_ai.lifecycleimportSpecStatesfromibm_watsonx_ai.foundation_models.inferenceimportModelInferenceimportpandaslib_checker=LibraryChecker()ListType:TypeAlias=list[docs]classDeployments(WMLResource):"""Deploy and score published artifacts (models and functions)."""DEFAULT_CONCURRENCY_LIMIT=8[docs]classHardwareRequestSizes(str,Enum):""" An enum class that represents the different hardware request sizes available. """Small="gpu_s"Medium="gpu_m"Large="gpu_l" def__init__(self,client:APIClient):WMLResource.__init__(self,__name__,client)self.ConfigurationMetaNames=DeploymentMetaNames()self.ScoringMetaNames=ScoringMetaNames()self.DecisionOptimizationMetaNames=DecisionOptimizationMetaNames()def_deployment_status_errors_handling(self,deployment_details:dict,operation_name:str,deployment_id:str)->NoReturn:try:if"failure"indeployment_details["entity"]["status"]:errors=deployment_details["entity"]["status"]["failure"]["errors"]forerrorinerrors:iftype(error)==str:try:error_obj=json.loads(error)print(error_obj["message"])except:print(error)eliftype(error)==dict:print(error["message"])else:print(error)raiseWMLClientError("Deployment "+operation_name+" failed for deployment id: "+deployment_id+". Errors: "+str(errors))else:print(deployment_details["entity"]["status"])raiseWMLClientError("Deployment "+operation_name+" failed for deployment id: "+deployment_id+". Error: "+str(deployment_details["entity"]["status"]["state"]))exceptWMLClientErrorase:raiseeexceptExceptionase:self._logger.debug("Deployment "+operation_name+" failed: "+str(e))print(deployment_details["entity"]["status"]["failure"])raiseWMLClientError("Deployment "+operation_name+" failed for deployment id: "+deployment_id+".")# TODO model_id and artifact_id should be changed to artifact_id only[docs]defcreate(self,artifact_id:str|None=None,meta_props:dict|None=None,rev_id:str|None=None,**kwargs:dict,)->dict:"""Create a deployment from an artifact. An artifact is a model or function that can be deployed. :param artifact_id: ID of the published artifact (the model or function ID) :type artifact_id: str :param meta_props: meta props. To see the available list of meta names, use: .. code-block:: python client.deployments.ConfigurationMetaNames.get() :type meta_props: dict, optional :param rev_id: revision ID of the deployment :type rev_id: str, optional :return: metadata of the created deployment :rtype: dict **Example:** .. code-block:: python meta_props = { client.deployments.ConfigurationMetaNames.NAME: "SAMPLE DEPLOYMENT NAME", client.deployments.ConfigurationMetaNames.ONLINE: {}, client.deployments.ConfigurationMetaNames.HARDWARE_SPEC : { "id": "e7ed1d6c-2e89-42d7-aed5-8sb972c1d2b"}, client.deployments.ConfigurationMetaNames.SERVING_NAME : 'sample_deployment' } deployment_details = client.deployments.create(artifact_id, meta_props) """artifact_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=artifact_id,resource_name="artifact")# Backward compatibility in past `rev_id` was an int.ifisinstance(rev_id,int):rev_id_as_int_deprecated=("`rev_id` parameter type as int is deprecated, ""please convert to str instead")warn(rev_id_as_int_deprecated,category=DeprecationWarning)rev_id=str(rev_id)Deployments._validate_type(artifact_id,"artifact_id",str,True)ifself._client.ICP_PLATFORM_SPACES:predictionUrl=self._credentials.urlifmeta_propsisNone:raiseWMLClientError("Invalid input. meta_props can not be empty.")ifself._client.CLOUD_PLATFORM_SPACESand"r_shiny"inmeta_props:raiseWMLClientError("Shiny is not supported in this release")ifself._client.CPD_version>=4.8orself._client.CLOUD_PLATFORM_SPACES:fromibm_watsonx_ai.foundation_models.utils.enumsimportModelTypesbase_model_id=meta_props.get(self.ConfigurationMetaNames.BASE_MODEL_ID)ifisinstance(base_model_id,ModelTypes):meta_props[self.ConfigurationMetaNames.BASE_MODEL_ID]=(base_model_id.value)metaProps=self.ConfigurationMetaNames._generate_resource_metadata(meta_props)if("serving_name"instr(metaProps)andmeta_props.get("serving_name",False)and"r_shiny"instr(metaProps)):if"parameters"inmetaProps["r_shiny"]:metaProps["r_shiny"]["parameters"]["serving_name"]=meta_props["serving_name"]else:metaProps["r_shiny"]["parameters"]={"serving_name":meta_props["serving_name"]}if"online"inmetaProps:delmetaProps["online"]if"wml_instance_id"inmeta_props:metaProps.update({"wml_instance_id":meta_props["wml_instance_id"]})##Check if default space is setmetaProps["asset"]=(metaProps.get("asset")ifmetaProps.get("asset")else{"id":artifact_id})ifrev_idisnotNone:metaProps["asset"].update({"rev":rev_id})ifself._client.default_project_id:metaProps["project_id"]=self._client.default_project_idelse:metaProps["space_id"]=self._client.default_space_id# note: checking if artifact_id points to prompt_templateifself._client.CPD_version>=4.8orself._client.CLOUD_PLATFORM_SPACES:withall_logging_disabled():try:fromibm_watsonx_ai.foundation_models.promptsimport(PromptTemplateManager,)model_id=(PromptTemplateManager(api_client=self._client).load_prompt(artifact_id).model_id)exceptException:pass# Foundation models scenario should not impact other ML models' deployment scenario.else:metaProps.pop("asset")metaProps["prompt_template"]={"id":artifact_id}if(DeploymentMetaNames.BASE_MODEL_IDnotinmetaPropsandDeploymentMetaNames.BASE_DEPLOYMENT_IDnotinmetaProps):metaProps.update({DeploymentMetaNames.BASE_MODEL_ID:model_id})# --- end noteurl=self._client._href_definitions.get_deployments_href()response=self._client.httpx_client.post(url,json=metaProps,params=self._client._params(),# version is mandatoryheaders=self._client._get_headers(),)## Post Deployment call executedifresponse.status_code==202:deployment_details=response.json()ifkwargs.get("background_mode"):background_mode_turned_on_warning=("Background mode is turn on and deployment scoring will be available only when status of deployment will be `ready`. ""To check deployment status run `client.deployment.get_details(deployment_id)")warn(background_mode_turned_on_warning)returndeployment_detailselse:ifself._client.ICP_PLATFORM_SPACES:if"online_url"indeployment_details["entity"]["status"]:scoringUrl=(deployment_details.get("entity").get("status").get("online_url").get("url").replace("https://ibm-nginx-svc:443",predictionUrl))deployment_details["entity"]["status"]["online_url"]["url"]=scoringUrldeployment_id=self.get_id(deployment_details)importtimeprint_text_header_h1("Synchronous deployment creation for id: '{}' started".format(artifact_id))status=deployment_details["entity"]["status"]["state"]notifications=[]withStatusLogger(status)asstatus_logger:whileTrue:time.sleep(5)deployment_details=self._client.deployments.get_details(deployment_id,_silent=True)# this is wrong , needs to update for ICPif"system"indeployment_details:notification=deployment_details["system"]["warnings"][0]["message"]ifnotificationnotinnotifications:print("\nNote: "+notification)notifications.append(notification)status=deployment_details["entity"]["status"]["state"]status_logger.log_state(status)ifstatus!="DEPLOY_IN_PROGRESS"andstatus!="initializing":breakifstatus=="DEPLOY_SUCCESS"orstatus=="ready":print("")print_text_header_h2("Successfully finished deployment creation, deployment_id='{}'".format(deployment_id))returndeployment_detailselse:print_text_header_h2("Deployment creation failed")self._deployment_status_errors_handling(deployment_details,"creation",deployment_id)else:error_msg="Deployment creation failed"reason=response.textprint_text_header_h2(error_msg)print(reason)raiseWMLClientError(error_msg+". Error: "+str(response.status_code)+". "+reason) [docs]@staticmethoddefget_uid(deployment_details:dict)->str:"""Get deployment_uid from the deployment details. *Deprecated:* Use ``get_id(deployment_details)`` instead. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment UID that is used to manage the deployment :rtype: str **Example:** .. code-block:: python deployment_uid = client.deployments.get_uid(deployment) """get_uid_deprecated_warning=("`get_uid()` is deprecated and will be removed in future. ""Instead, please use `get_id()`.")warn(get_uid_deprecated_warning,category=DeprecationWarning)returnDeployments.get_id(deployment_details) [docs]@staticmethoddefget_id(deployment_details:dict)->str:"""Get the deployment ID from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment ID that is used to manage the deployment :rtype: str **Example:** .. code-block:: python deployment_id = client.deployments.get_id(deployment) """Deployments._validate_type(deployment_details,"deployment_details",dict,True)try:if"id"indeployment_details["metadata"]:id=deployment_details.get("metadata",{}).get("id")else:id=deployment_details.get("metadata",{}).get("guid")exceptExceptionase:raiseWMLClientError("Getting deployment ID from deployment details failed.",e)ifidisNone:raiseMissingValue("deployment_details.metadata.id")returnid [docs]@staticmethoddefget_href(deployment_details:dict)->str:"""Get deployment_href from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment href that is used to manage the deployment :rtype: str **Example:** .. code-block:: python deployment_href = client.deployments.get_href(deployment) """Deployments._validate_type(deployment_details,"deployment_details",dict,True)try:if"href"indeployment_details["metadata"]:url=deployment_details.get("metadata",{}).get("href")else:url="/ml/v4/deployments/{}".format(deployment_details["metadata"]["id"])exceptExceptionase:raiseWMLClientError("Getting deployment url from deployment details failed.",e)ifurlisNone:raiseMissingValue("deployment_details.metadata.href")returnurl def_get_serving_name_info(self,serving_name:str)->tuple:"""Get info about the serving name :param serving_name: serving name that filters deployments :type serving_name: str :return: information about the serving name: (<status_code>, <response json if any>) :rtype: tuple **Example:** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """params={"serving_name":serving_name,"conflict":"true","version":self._client.version_param,}url=self._client._href_definitions.get_deployments_href()res=self._client.httpx_client.get(url,headers=self._client._get_headers(),params=params)ifres.status_code==409:response=res.json()else:response=Nonereturn(res.status_code,response)[docs]defis_serving_name_available(self,serving_name:str)->bool:"""Check if the serving name is available for use. :param serving_name: serving name that filters deployments :type serving_name: str :return: information about whether the serving name is available :rtype: bool **Example:** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """status_code,_=self._get_serving_name_info(serving_name)returnstatus_code!=409 [docs]defget_details(self,deployment_id:str|None=None,serving_name:str|None=None,limit:int|None=None,asynchronous:bool=False,get_all:bool=False,spec_state:SpecStates|None=None,_silent:bool=False,**kwargs:Any,)->dict:"""Get information about deployment(s). If deployment_id is not passed, all deployment details are returned. :param deployment_id: unique ID of the deployment :type deployment_id: str, optional :param serving_name: serving name that filters deployments :type serving_name: str, optional :param limit: limit number of fetched records :type limit: int, optional :param asynchronous: if True, it will work as a generator :type asynchronous: bool, optional :param get_all: if True, it will get all entries in 'limited' chunks :type get_all: bool, optional :param spec_state: software specification state, can be used only when `deployment_id` is None :type spec_state: SpecStates, optional :return: metadata of the deployment(s) :rtype: dict (if deployment_id is not None) or {"resources": [dict]} (if deployment_id is None) **Example:** .. code-block:: python deployment_details = client.deployments.get_details(deployment_id) deployment_details = client.deployments.get_details(deployment_id=deployment_id) deployments_details = client.deployments.get_details() deployments_details = client.deployments.get_details(limit=100) deployments_details = client.deployments.get_details(limit=100, get_all=True) deployments_details = [] for entry in client.deployments.get_details(limit=100, asynchronous=True, get_all=True): deployments_details.extend(entry) """deployment_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=deployment_id,resource_name="deployment",can_be_none=True,)ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<4.8:self._client._check_if_space_is_set()Deployments._validate_type(deployment_id,"deployment_id",str,False)ifdeployment_idisnotNoneandnotis_id(deployment_id):raiseWMLClientError("'deployment_id' is not an id: '{}'".format(deployment_id))url=self._client._href_definitions.get_deployments_href()query_params=self._client._params()ifserving_name:query_params["serving_name"]=serving_nameifdeployment_idisNone:filter_func=(self._get_filter_func_by_spec_state(spec_state)ifspec_stateelseNone)deployment_details=self._get_artifact_details(base_url=url,id=deployment_id,limit=limit,resource_name="deployments",query_params=query_params,_async=asynchronous,_all=get_all,_filter_func=filter_func,)else:deployment_details=self._get_artifact_details(url,deployment_id,limit,"deployments",query_params=query_params,)if(notisinstance(deployment_details,Generator)and"system"indeployment_detailsandnot_silent):print("Note: "+deployment_details["system"]["warnings"][0]["message"])returndeployment_details [docs]@staticmethoddefget_scoring_href(deployment_details:dict)->str:"""Get scoring URL from deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: scoring endpoint URL that is used to make scoring requests :rtype: str **Example:** .. code-block:: python scoring_href = client.deployments.get_scoring_href(deployment) """Deployments._validate_type(deployment_details,"deployment",dict,True)scoring_url=Nonetry:url=deployment_details["entity"]["status"].get("online_url")ifurlisnotNone:scoring_url=deployment_details["entity"]["status"]["online_url"]["url"]else:raiseMissingValue("Getting scoring url for deployment failed. This functionality is available only for sync deployments")exceptExceptionase:raiseWMLClientError("Getting scoring url for deployment failed. This functionality is available only for sync deployments",e,)ifscoring_urlisNone:raiseMissingValue("scoring_url missing in online_predictions")returnscoring_url [docs]@staticmethoddefget_serving_href(deployment_details:dict)->str:"""Get serving URL from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: serving endpoint URL that is used to make scoring requests :rtype: str **Example:** .. code-block:: python scoring_href = client.deployments.get_serving_href(deployment) """Deployments._validate_type(deployment_details,"deployment",dict,True)try:serving_name=(deployment_details["entity"]["online"].get("parameters").get("serving_name"))serving_url=[urlforurlindeployment_details["entity"].get("status").get("serving_urls")ifserving_name==url.split("/")[-2]][0]ifserving_url:returnserving_urlelse:raiseMissingValue("Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name.")exceptExceptionase:raiseWMLClientError("Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name.",e,) [docs]defdelete(self,deployment_id:str|None=None,**kwargs:Any)->str:"""Delete a deployment. :param deployment_id: unique ID of the deployment :type deployment_id: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example:** .. code-block:: python client.deployments.delete(deployment_id) """deployment_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=deployment_id,resource_name="deployment")ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<4.8:self._client._check_if_space_is_set()Deployments._validate_type(deployment_id,"deployment_id",str,True)ifdeployment_idisnotNoneandnotis_id(deployment_id):raiseWMLClientError("'deployment_id' is not an id: '{}'".format(deployment_id))deployment_url=self._client._href_definitions.get_deployment_href(deployment_id)response_delete=self._client.httpx_client.delete(deployment_url,params=self._client._params(),headers=self._client._get_headers(),)returnself._handle_response(204,"deployment deletion",response_delete,False) [docs]defscore(self,deployment_id:str,meta_props:dict,transaction_id:str|None=None)->dict:"""Make scoring requests against the deployed artifact. :param deployment_id: unique ID of the deployment to be scored :type deployment_id: str :param meta_props: meta props for scoring, use ``client.deployments.ScoringMetaNames.show()`` to view the list of ScoringMetaNames :type meta_props: dict :param transaction_id: transaction ID to be passed with the records during payload logging :type transaction_id: str, optional :return: scoring result that contains prediction and probability :rtype: dict .. note:: * *client.deployments.ScoringMetaNames.INPUT_DATA* is the only metaname valid for sync scoring. * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example:** .. code-block:: python scoring_payload = {client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [ ['M',23,'Single','Student'], ['M',55,'Single','Executive'] ] }] } predictions = client.deployments.score(deployment_id, scoring_payload) """ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<4.8:self._client._check_if_space_is_set()Deployments._validate_type(deployment_id,"deployment_id",str,True)Deployments._validate_type(meta_props,"meta_props",dict,True)ifmeta_props.get(self.ScoringMetaNames.INPUT_DATA)isNone:raiseWMLClientError("Scoring data input 'ScoringMetaNames.INPUT_DATA' is mandatory for synchronous ""scoring")scoring_data=meta_props[self.ScoringMetaNames.INPUT_DATA]ifscoring_dataisnotNone:score_payload=[]foreach_score_requestinscoring_data:lib_checker.check_lib(lib_name="pandas")importpandasaspdscoring_values=each_score_request["values"]# Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrixifisinstance(scoring_values,pd.DataFrame):scoring_values=scoring_values.where(pd.notnull(scoring_values),None)fields_names=scoring_values.columns.values.tolist()values=scoring_values.values.tolist()try:values[pd.isnull(values)]=None# note: above code fails when there is no null values in a dataframeexceptTypeError:passeach_score_request["values"]=valuesiffields_namesisnotNone:each_score_request["fields"]=fields_names## If payload is a numpy dataframeelifisinstance(scoring_values,np.ndarray):values=scoring_values.tolist()each_score_request["values"]=valuesscore_payload.append(each_score_request)##See if it is scoring or DecisionOptimizationJobpayload={}payload["input_data"]=score_payloadifmeta_props.get(self.ScoringMetaNames.SCORING_PARAMETERS)isnotNone:payload["scoring_parameters"]=meta_props.get(self.ScoringMetaNames.SCORING_PARAMETERS)headers=self._client._get_headers()iftransaction_idisnotNone:headers.update({"x-global-transaction-id":transaction_id})scoring_url=(self._credentials.url+"/ml/v4/deployments/"+deployment_id+"/predictions")params=self._client._params()delparams["space_id"]response_scoring=self._client.httpx_client.post(scoring_url,json=payload,params=params,# version parameter is mandatoryheaders=headers,)returnself._handle_response(200,"scoring",response_scoring) #########################################[docs]defget_download_url(self,deployment_details:dict)->str:"""Get deployment_download_url from the deployment details. :param deployment_details: created deployment details :type deployment_details: dict :return: deployment download URL that is used to get file deployment (for example: Core ML) :rtype: str **Example:** .. code-block:: python deployment_url = client.deployments.get_download_url(deployment) """ifself._client.ICP_PLATFORM_SPACES:raiseWMLClientError("Downloading virtual deployment is no longer supported in Cloud Pak for Data, versions 3.5 and later.")ifself._client.CLOUD_PLATFORM_SPACES:raiseWMLClientError("Downloading virtual deployment is no longer supported in Cloud Pak for Data as a Service.")Deployments._validate_type(deployment_details,"deployment_details",dict,True)try:virtual_deployment_detaails=(deployment_details.get("entity",{}).get("status",{}).get("virtual_deployment_downloads"))ifvirtual_deployment_detaailsisnotNone:url=virtual_deployment_detaails[0].get("url")else:url=NoneexceptExceptionase:raiseWMLClientError("Getting download url from deployment details failed.",e)ifurlisNone:raiseMissingValue("deployment_details.entity.virtual_deployment_downloads.url")returnurl [docs]deflist(self,limit:int|None=None,artifact_type:str|None=None)->pandas.DataFrame:"""Returns deployments in a table format. :param limit: limit number of fetched records :type limit: int, optional :param artifact_type: return only deployments with the specified artifact_type :type artifact_type: str, optional :return: pandas.DataFrame with the listed deployments :rtype: pandas.DataFrame **Example:** .. code-block:: python client.deployments.list() """ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<4.8:self._client._check_if_space_is_set()details=self.get_details(get_all=self._should_get_all_values(limit))resources=details["resources"]values=[]index=0defenrich_asset_with_type(asset_details:dict,asset_type:str)->dict:ifasset_type:asset_details["metadata"]["asset_type"]=asset_typereturnasset_detailsasset_info={el["metadata"]["id"]:enrich_asset_with_type(el,asset_type)forasset_type,resourcesin{"model":self._client._models.get_details(get_all=True),"function":self._client._functions.get_details(get_all=True),}.items()forelinresources["resources"]}forminresources:# Deployment service currently doesn't support limit querying# As a workaround, its filtered in python client# Ideally this needs to be on the server sideiflimitisnotNoneandindex==limit:breakasset_details=asset_info.get(m["entity"].get("asset",m["entity"].get("prompt_template"))["id"],{},)if(artifact_typeandm["entity"].get("deployed_asset_type","unknown")!=artifact_type):pass# filter by artifact_typeelse:values.append(((m["metadata"]["guid"]if"guid"inm["metadata"]elsem["metadata"]["id"]),m["entity"]["name"],m["entity"]["status"]["state"],m["metadata"]["created_at"],m["entity"].get("deployed_asset_type","unknown"),self._client.software_specifications._get_state(asset_details),self._client.software_specifications._get_replacement(asset_details),))index=index+1table=self._list(values,["ID","NAME","STATE","CREATED","ARTIFACT_TYPE","SPEC_STATE","SPEC_REPLACEMENT",],limit,)returntable [docs]deflist_jobs(self,limit:int|None=None)->pandas.DataFrame:"""Return the async deployment jobs in a table format. :param limit: limit number of fetched records :type limit: int, optional :return: pandas.DataFrame with listed deployment jobs :rtype: pandas.DataFrame .. note:: This method list only async deployment jobs created for WML deployment. **Example:** .. code-block:: python client.deployments.list_jobs() """details=self.get_job_details(limit=limit)resources=details["resources"]values=[]index=0forminresources:# Deployment service currently doesn't support limit querying# As a workaround, its filtered in python clientiflimitisnotNoneandindex==limit:breakif"scoring"inm["entity"]:state=m["entity"]["scoring"]["status"]["state"]else:state=m["entity"]["decision_optimization"]["status"]["state"]deploy_id=m["entity"]["deployment"]["id"]values.append((m["metadata"]["id"],state,m["metadata"]["created_at"],deploy_id))index=index+1table=self._list(values,["JOB-ID","STATE","CREATED","DEPLOYMENT-ID"],limit)returntable def_get_deployable_asset_type(self,details:dict)->str:url=details["entity"]["asset"]["id"]if"model"inurl:return"model"elif"function"inurl:return"function"else:return"unknown"[docs]defupdate(self,deployment_id:str|None=None,changes:dict|None=None,background_mode:bool=False,**kwargs:Any,)->dict|None:"""Updates existing deployment metadata. If ASSET is patched, then 'id' field is mandatory and it starts a deployment with the provided asset id/rev. Deployment ID remains the same. :param deployment_id: unique ID of deployment to be updated :type deployment_id: str :param changes: elements to be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of the updated deployment :rtype: dict or None :param background_mode: indicator whether the update() method will run in the background (async) or not (sync), defaults to False :type background_mode: bool, optional **Examples** .. code-block:: python metadata = {client.deployments.ConfigurationMetaNames.NAME:"updated_Deployment"} updated_deployment_details = client.deployments.update(deployment_id, changes=metadata) metadata = {client.deployments.ConfigurationMetaNames.ASSET: { "id": "ca0cd864-4582-4732-b365-3165598dc945", "rev":"2" }} deployment_details = client.deployments.update(deployment_id, changes=metadata) """deployment_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=deployment_id,resource_name="deployment")ifchangesisNone:raiseTypeError("update() missing 1 required positional argument: 'changes'")Deployments._validate_type(changes,"changes",dict,True)ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<4.8:self._client._check_if_space_is_set()Deployments._validate_type(deployment_id,"deployment_id",str,True)if("asset"inchangesandnotchanges["asset"])and("prompt_template"inchangesandnotchanges["prompt_template"]):msg="ASSET/PROMPT_TEMPLATE cannot be empty. 'id' and 'rev' (only ASSET) fields are supported. 'id' is mandatory"print(msg)raiseWMLClientError(msg)patch_job=(changes.get("asset")isnotNoneorself.ConfigurationMetaNames.PROMPT_TEMPLATEinchangesorself.ConfigurationMetaNames.SERVING_NAMEinchangesorself.ConfigurationMetaNames.OWNERinchanges)patch_job_field=Noneifpatch_job:ifchanges.get("asset")isnotNone:patch_job_field="ASSET"elifself.ConfigurationMetaNames.PROMPT_TEMPLATEinchanges:patch_job_field="PROMPT_TEMPLATE"elifself.ConfigurationMetaNames.SERVING_NAMEinchanges:patch_job_field="SERVING_NAME"elifself.ConfigurationMetaNames.OWNERinchanges:patch_job_field="OWNER"ifpatch_job_fieldisNone:raiseWMLClientError("Unexpected patch job element.")ifpatch_joband(len(changes)>1):msg=(f"When{patch_job_field} is being updated/patched, other fields cannot be updated. If other fields are to be "f"updated, try without{patch_job_field} update.{patch_job_field} update triggers deployment with the new asset retaining ""the same deployment_id")print(msg)raiseWMLClientError(msg)deployment_details=self.get_details(deployment_id,_silent=True)serving_name_change=Falsenew_serving_name=Noneifself.ConfigurationMetaNames.SERVING_NAMEinchanges:new_serving_name=changes.pop(self.ConfigurationMetaNames.SERVING_NAME)serving_name_change=Truepatch_payload=self.ConfigurationMetaNames._generate_patch_payload(deployment_details,changes,with_validation=True)ifserving_name_change:replace="serving_name"indeployment_details["entity"].get("online").get("parameters",[])patch_payload.append({"op":"replace"ifreplaceelse"add","path":"/online/parameters","value":{"serving_name":new_serving_name},})url=self._client._href_definitions.get_deployment_href(deployment_id)response=self._client.httpx_client.patch(url,json=patch_payload,params=self._client._params(),headers=self._client._get_headers(),)ifpatch_jobandresponse.status_code==202:deployment_details=self._handle_response(202,"deployment asset patch",response)print(f"Since{patch_job_field} is patched, deployment need to be restarted. ")ifbackground_mode:print("Monitor the status using deployments.get_details(deployment_id) api")elifresponse.status_code==202:deployment_details=self._handle_response(202,"deployment scaling",response)else:deployment_details=self._handle_response(200,"deployment patch",response)ifbackground_mode:returndeployment_detailsifresponse.status_codein(200,202):deployment_details=self.get_details(deployment_id,_silent=True)importtimeprint_text_header_h1("Deployment update for id: '{}' started".format(deployment_id))status=deployment_details["entity"]["status"]["state"]withStatusLogger(status)asstatus_logger:whileTrue:time.sleep(5)deployment_details=self.get_details(deployment_id,_silent=True)status=deployment_details["entity"]["status"]["state"]status_logger.log_state(status)ifstatus!="initializing"andstatus!="updating":breakif(status=="ready"and"failure"notindeployment_details["entity"]["status"]):# from apidocs: If any failures, deployment will be reverted back to the previous id/rev and the failure message will be captured in 'failure' field in the response.print("")print_text_header_h2("Successfully finished deployment update, deployment_id='{}'".format(deployment_id))returndeployment_detailselse:print_text_header_h2("Deployment update failed")ifdeployment_idisnotNone:self._deployment_status_errors_handling(deployment_details,"update",deployment_id)else:error_msg="Deployment update failed"reason=response.textprint(reason)print_text_header_h2(error_msg)raiseWMLClientError(error_msg+". Error: "+str(response.status_code)+". "+reason)returndeployment_details ## Below functions are for async scoring. They are just dummy functions.def_score_async(self,deployment_id:str,scoring_payload:dict,transaction_id:str|None=None,retention:int|None=None,)->str|dict:Deployments._validate_type(deployment_id,"deployment_id",str,True)Deployments._validate_type(scoring_payload,"scoring_payload",dict,True)headers=self._client._get_headers()iftransaction_idisnotNone:headers.update({"x-global-transaction-id":transaction_id})# making change - connection keep alivescoring_url=self._client._href_definitions.get_async_deployment_job_href()params=self._client._params()ifnotself._client.ICP_PLATFORM_SPACESandretentionisnotNone:ifnotisinstance(retention,int)orretention<-1:raiseTypeError("`retention` takes integer values greater or equal than -1.")params.update({"retention":retention})response_scoring=self._client.httpx_client.post(scoring_url,params=params,json=scoring_payload,headers=headers)returnself._handle_response(202,"scoring asynchronously",response_scoring)[docs]defcreate_job(self,deployment_id:str,meta_props:dict,retention:int|None=None,transaction_id:str|None=None,_asset_id:str|None=None,)->str|dict:"""Create an asynchronous deployment job. :param deployment_id: unique ID of the deployment :type deployment_id: str :param meta_props: metaprops. To see the available list of metanames, use ``client.deployments.ScoringMetaNames.get()`` or ``client.deployments.DecisionOptimizationmetaNames.get()`` :type meta_props: dict :param retention: how many job days job meta should be retained, takes integer values >= -1, supported only on Cloud :type retention: int, optional :param transaction_id: transaction ID to be passed with the payload :type transaction_id: str, optional :return: metadata of the created async deployment job :rtype: dict or str .. note:: * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example:** .. code-block:: python scoring_payload = {client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [['M',23,'Single','Student'], ['M',55,'Single','Executive']]}]} async_job = client.deployments.create_job(deployment_id, scoring_payload) """Deployments._validate_type(deployment_id,"deployment_id",str,True)Deployments._validate_type(meta_props,"meta_props",dict,True)if_asset_id:Deployments._validate_type(_asset_id,"_asset_id",str,True)# We assume that _asset_id is the id of the asset that was deployed# in the deployment with id deployment_id, and we save one REST callasset=_asset_idelse:deployment_details=self.get_details(deployment_id)asset=deployment_details["entity"]["asset"]["id"]do_model=Falseasset_details=self._client.data_assets.get_details(asset)if("wml_model"inasset_details["entity"]and"type"inasset_details["entity"]["wml_model"]):if"do"inasset_details["entity"]["wml_model"]["type"]:do_model=Trueflag=0## To see if it is async scoring or DecisionOptimization Jobifdo_model:payload=self.DecisionOptimizationMetaNames._generate_resource_metadata(meta_props,with_validation=True,client=self._client)flag=1else:payload=self.ScoringMetaNames._generate_resource_metadata(meta_props,with_validation=True,client=self._client)scoring_data=Noneif"scoring"inpayloadand"input_data"inpayload["scoring"]:scoring_data=payload["scoring"]["input_data"]if("decision_optimization"inpayloadand"input_data"inpayload["decision_optimization"]):scoring_data=payload["decision_optimization"]["input_data"]ifscoring_dataisnotNone:score_payload=[]foreach_score_requestinscoring_data:lib_checker.check_lib(lib_name="pandas")importpandasaspdif"values"ineach_score_request:scoring_values=each_score_request["values"]# Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrixifisinstance(scoring_values,pd.DataFrame):fields_names=scoring_values.columns.values.tolist()values=scoring_values.where(pd.notnull(scoring_values),None).values.tolist()# replace nan with Noneeach_score_request["values"]=valuesiffields_namesisnotNone:each_score_request["fields"]=fields_names## If payload is a numpy dataframeelifisinstance(scoring_values,np.ndarray):# replace nan with Nonevalues=np.where(pd.notnull(scoring_values),scoring_values,None).tolist()# type: ignore[call-overload]each_score_request["values"]=valuesscore_payload.append(each_score_request)##See if it is scoring or DecisionOptimizationJobifflag==0:payload["scoring"]["input_data"]=score_payloadifflag==1:payload["decision_optimization"]["input_data"]=score_payloadimportcopyif"input_data_references"inmeta_props:Deployments._validate_type(meta_props.get("input_data_references"),"input_data_references",list,True,)modified_input_data_references=Falseinput_data=copy.deepcopy(meta_props.get("input_data_references"))input_data=cast(Iterable[Any],input_data)fori,input_data_fieldsinenumerate(input_data):if"connection"notininput_data_fields:modified_input_data_references=Trueinput_data_fields.update({"connection":{}})ifmodified_input_data_references:if"scoring"inpayload:payload["scoring"].update({"input_data_references":input_data})else:payload["decision_optimization"].update({"input_data_references":input_data})if"output_data_reference"inmeta_props:Deployments._validate_type(meta_props.get("output_data_reference"),"output_data_reference",dict,True,)output_data=copy.deepcopy(meta_props.get("output_data_reference"))output_data=cast(dict,output_data)if("connection"notinoutput_data):# and output_data.get('connection', None) is not None:output_data.update({"connection":{}})payload["scoring"].update({"output_data_reference":output_data})if"output_data_references"inmeta_props:Deployments._validate_type(meta_props.get("output_data_references"),"output_data_references",list,True,)output_data=copy.deepcopy(meta_props.get("output_data_references"))modified_output_data_references=Falseoutput_data=cast(Iterable[Any],output_data)fori,output_data_fieldsinenumerate(output_data):if"connection"notinoutput_data_fields:modified_output_data_references=Trueoutput_data_fields.update({"connection":{}})ifmodified_output_data_referencesand"decision_optimization"inpayload:payload["decision_optimization"].update({"output_data_references":output_data})payload.update({"deployment":{"id":deployment_id}})if"hardware_spec"inmeta_props:payload.update({"hardware_spec":meta_props[self.ConfigurationMetaNames.HARDWARE_SPEC]})if"hybrid_pipeline_hardware_specs"inmeta_props:payload.update({"hybrid_pipeline_hardware_specs":meta_props[self.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS]})payload.update({"space_id":self._client.default_space_id})if"name"notinpayload:importuuidpayload.update({"name":"name_"+str(uuid.uuid4())})returnself._score_async(deployment_id,payload,transaction_id=transaction_id,retention=retention) [docs]defget_job_details(self,job_id:str|None=None,include:str|None=None,limit:int|None=None,**kwargs:Any,)->dict:"""Get information about deployment job(s). If deployment job_id is not passed, all deployment jobs details are returned. :param job_id: unique ID of the job :type job_id: str, optional :param include: fields to be retrieved from 'decision_optimization' and 'scoring' section mentioned as value(s) (comma separated) as output response fields :type include: str, optional :param limit: limit number of fetched records :type limit: int, optional :return: metadata of deployment job(s) :rtype: dict (if job_id is not None) or {"resources": [dict]} (if job_id is None) **Example:** .. code-block:: python deployment_details = client.deployments.get_job_details() deployments_details = client.deployments.get_job_details(job_id=job_id) """job_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=job_id,resource_name="job",can_be_none=True)ifjob_idisnotNone:Deployments._validate_type(job_id,"job_id",str,True)url=self._client._href_definitions.get_async_deployment_job_href()params=self._client._params()ifinclude:params["include"]=includereturnself._get_artifact_details(base_url=url,id=job_id,limit=limit,resource_name="async deployment job"ifjob_idelse"async deployment jobs",query_params=params,) [docs]defget_job_status(self,job_id:str)->dict:"""Get the status of a deployment job. :param job_id: unique ID of the deployment job :type job_id: str :return: status of the deployment job :rtype: dict **Example:** .. code-block:: python job_status = client.deployments.get_job_status(job_id) """job_details=self.get_job_details(job_id)if"scoring"notinjob_details["entity"]:returnjob_details["entity"]["decision_optimization"]["status"]returnjob_details["entity"]["scoring"]["status"] [docs]defget_job_id(self,job_details:dict)->str:"""Get the unique ID of a deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: unique ID of the deployment job :rtype: str **Example:** .. code-block:: python job_details = client.deployments.get_job_details(job_id=job_id) job_status = client.deployments.get_job_id(job_details) """returnjob_details["metadata"]["id"] [docs]defget_job_uid(self,job_details:dict)->str:"""Get the unique ID of a deployment job. *Deprecated:* Use ``get_job_id(job_details)`` instead. :param job_details: metadata of the deployment job :type job_details: dict :return: unique ID of the deployment job :rtype: str **Example:** .. code-block:: python job_details = client.deployments.get_job_details(job_uid=job_uid) job_status = client.deployments.get_job_uid(job_details) """get_job_uid_deprecated_warning=("`get_job_uid()` is deprecated and will be removed in future. ""Instead, please use `get_job_id()`.")warn(get_job_uid_deprecated_warning,category=DeprecationWarning)returnself.get_job_id(job_details) [docs]defget_job_href(self,job_details:dict)->str:"""Get the href of a deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: href of the deployment job :rtype: str **Example:** .. code-block:: python job_details = client.deployments.get_job_details(job_id=job_id) job_status = client.deployments.get_job_href(job_details) """return"/ml/v4/deployment_jobs/{}".format(job_details["metadata"]["id"]) [docs]defdelete_job(self,job_id:str|None=None,hard_delete:bool=False,**kwargs:Any)->str:"""Delete a deployment job that is running. This method can also delete metadata details of completed or canceled jobs when hard_delete parameter is set to True. :param job_id: unique ID of the deployment job to be deleted :type job_id: str :param hard_delete: specify `True` or `False`: `True` - To delete the completed or canceled job. `False` - To cancel the currently running deployment job. :type hard_delete: bool, optional :return: status ("SUCCESS" or "FAILED") :rtype: str **Example:** .. code-block:: python client.deployments.delete_job(job_id) """job_id=_get_id_from_deprecated_uid(kwargs=kwargs,resource_id=job_id,resource_name="job")Deployments._validate_type(job_id,"job_id",str,True)ifjob_idisnotNoneandnotis_id(job_id):raiseWMLClientError("'job_id' is not an id: '{}'".format(job_id))params=self._client._params()ifnotself._client.CLOUD_PLATFORM_SPACESandself._client.CPD_version<=5.1:# for CPD 5.1 and lower there is need to use the jobs api directly.# From CPD 5.2.x + and Cloud deployment service will cover the call in DELETE /ml/v4/deployment_jobs# issue: #48242try:job_details=self.get_job_details(job_id=job_id)run_id=job_details["entity"]["platform_job"]["run_id"]jobs_runs_url=self._client._href_definitions.get_jobs_runs_href(job_id=job_id,run_id=run_id)response_delete=self._client.httpx_client.delete(jobs_runs_url,headers=self._client._get_headers(),params=params)returnself._handle_response(204,"deployment async job deletion",response_delete,False)except:passurl=self._client._href_definitions.get_async_deployment_jobs_href(job_id)ifhard_deleteisTrue:params.update({"hard_delete":"true"})response_delete=self._client.httpx_client.delete(url,headers=self._client._get_headers(),params=params)returnself._handle_response(204,"deployment async job deletion",response_delete,False) def_get_filter_func_by_spec_state(self,spec_state:SpecStates)->Callable:deffilter_func(resources:list)->list[str]:asset_ids=[i["metadata"]["id"]forkey,valuein{"model":self._client._models.get_details(get_all=True,spec_state=spec_state),"function":self._client._functions.get_details(get_all=True,spec_state=spec_state),}.items()foriinvalue["resources"]]return[rforrinresourcesifr["entity"].get("asset",{}).get("id")inasset_ids]returnfilter_funcdef_get_model_inference(self,deployment_id:str,inference_type:Literal["text","text_stream","chat","chat_stream"],params:dict|None=None,)->"ModelInference":"""Based on provided deployment_id and params get ModelInference object. Verify that the deployment with the given deployment_id has generating methods. """# Import ModelInference here to avoid circular import errorfromibm_watsonx_ai.foundation_models.inferenceimportModelInferencematchinference_type:case"text":generated_url=(self._client._href_definitions.get_fm_deployment_generation_href(deployment_id=deployment_id,item="text"))case"text_stream":ifself._client._use_fm_ga_api:generated_url=self._client._href_definitions.get_fm_deployment_generation_stream_href(deployment_id=deployment_id)else:# Remove on CPD 5.0 releasegenerated_url=self._client._href_definitions.get_fm_deployment_generation_href(deployment_id=deployment_id,item="text_stream")case"chat":generated_url=(self._client._href_definitions.get_fm_deployment_chat_href(deployment_id=deployment_id))case"chat_stream":generated_url=(self._client._href_definitions.get_fm_deployment_chat_stream_href(deployment_id=deployment_id))case_:raiseInvalidValue(value_name="inference_type",reason=f"Available types: 'text', 'text_stream', 'chat', 'chat_stream', got:{inference_type}.",)inference_url_list=[url.get("url")forurlinself.get_details(deployment_id,_silent=True)["entity"].get("status",{}).get("inference",{})]ifnotinference_url_list:inference_url_list=(self.get_details(deployment_id,_silent=True)["entity"].get("status",{}).get("serving_urls",[]))if(inference_typein["text","text_stream"]andgenerated_urlnotininference_url_listandall("/text/generation"notininference_urlforinference_urlininference_url_list)):raiseWMLClientError(Messages.get_message(deployment_id,message_id="fm_deployment_has_not_inference_for_generation",))returnModelInference(deployment_id=deployment_id,params=params,api_client=self._client)[docs]defgenerate(self,deployment_id:str,prompt:str|None=None,params:dict|None=None,guardrails:bool=False,guardrails_hap_params:dict|None=None,guardrails_pii_params:dict|None=None,concurrency_limit:int=DEFAULT_CONCURRENCY_LIMIT,async_mode:bool=False,validate_prompt_variables:bool=True,guardrails_granite_guardian_params:dict|None=None,)->dict:"""Generate a raw response with `prompt` for given `deployment_id`. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: prompt needed for text generation. If deployment_id points to the Prompt Template asset, then the prompt argument must be None, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param concurrency_limit: number of requests to be sent in parallel, maximum is 10 :type concurrency_limit: int, optional :param async_mode: If True, then yield results asynchronously (using generator). In this case both the prompt and the generated text will be concatenated in the final response - under `generated_text`, defaults to False :type async_mode: bool, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :param guardrails_granite_guardian_params: parameters for Granite Guardian moderations :type guardrails_granite_guardian_params: dict, optional :return: scoring result containing generated content :rtype: dict """d_inference=self._get_model_inference(deployment_id,"text",params)returnd_inference.generate(prompt=prompt,guardrails=guardrails,guardrails_hap_params=guardrails_hap_params,guardrails_pii_params=guardrails_pii_params,concurrency_limit=concurrency_limit,params=params,async_mode=async_mode,validate_prompt_variables=validate_prompt_variables,guardrails_granite_guardian_params=guardrails_granite_guardian_params,) [docs]defgenerate_text(self,deployment_id:str,prompt:str|None=None,params:dict|None=None,raw_response:bool=False,guardrails:bool=False,guardrails_hap_params:dict|None=None,guardrails_pii_params:dict|None=None,concurrency_limit:int=DEFAULT_CONCURRENCY_LIMIT,validate_prompt_variables:bool=True,guardrails_granite_guardian_params:dict|None=None,)->str:"""Given the selected deployment (deployment_id), a text prompt as input, and the parameters and concurrency_limit, the selected inference will generate a completion text as generated_text response. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: the prompt string or list of strings. If the list of strings is passed, requests will be managed in parallel with the rate of concurency_limit, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param raw_response: returns the whole response object :type raw_response: bool, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param concurrency_limit: number of requests to be sent in parallel, maximum is 10 :type concurrency_limit: int, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :param guardrails_granite_guardian_params: parameters for Granite Guardian moderations :type guardrails_granite_guardian_params: dict, optional :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watsonx_ai.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """d_inference=self._get_model_inference(deployment_id,"text",params)returnd_inference.generate_text(prompt=prompt,raw_response=raw_response,guardrails=guardrails,guardrails_hap_params=guardrails_hap_params,guardrails_pii_params=guardrails_pii_params,concurrency_limit=concurrency_limit,params=params,validate_prompt_variables=validate_prompt_variables,guardrails_granite_guardian_params=guardrails_granite_guardian_params,) [docs]defgenerate_text_stream(self,deployment_id:str,prompt:str|None=None,params:dict|None=None,raw_response:bool=False,guardrails:bool=False,guardrails_hap_params:dict|None=None,guardrails_pii_params:dict|None=None,validate_prompt_variables:bool=True,guardrails_granite_guardian_params:dict|None=None,)->str:"""Given the selected deployment (deployment_id), a text prompt as input and parameters, the selected inference will generate a streamed text as generate_text_stream. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: the prompt string, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param raw_response: yields the whole response object :type raw_response: bool, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :param guardrails_granite_guardian_params: parameters for Granite Guardian moderations :type guardrails_granite_guardian_params: dict, optional :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watsonx_ai.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """d_inference=self._get_model_inference(deployment_id,"text_stream",params)returnd_inference.generate_text_stream(prompt=prompt,params=params,raw_response=raw_response,guardrails=guardrails,guardrails_hap_params=guardrails_hap_params,guardrails_pii_params=guardrails_pii_params,validate_prompt_variables=validate_prompt_variables,guardrails_granite_guardian_params=guardrails_granite_guardian_params,) defchat(self,deployment_id:str,messages:ListType[dict],context:str|None=None,tools:list|None=None,tool_choice:dict|None=None,tool_choice_option:Literal["none","auto"]|None=None,)->dict:d_inference=self._get_model_inference(deployment_id,"chat")returnd_inference.chat(messages=messages,context=context,tools=tools,tool_choice=tool_choice,tool_choice_option=tool_choice_option,)defchat_stream(self,deployment_id:str,messages:ListType[dict],context:str|None=None,tools:list|None=None,tool_choice:dict|None=None,tool_choice_option:Literal["none","auto"]|None=None,)->Generator:d_inference=self._get_model_inference(deployment_id,"chat_stream")returnd_inference.chat_stream(messages=messages,context=context,tools=tools,tool_choice=tool_choice,tool_choice_option=tool_choice_option,)asyncdefachat(self,deployment_id:str,messages:ListType[dict],context:str|None=None,tools:list|None=None,tool_choice:dict|None=None,tool_choice_option:Literal["none","auto"]|None=None,)->dict:d_inference=self._get_model_inference(deployment_id,"chat")returnawaitd_inference.achat(messages=messages,context=context,tools=tools,tool_choice=tool_choice,tool_choice_option=tool_choice_option,)asyncdefachat_stream(self,deployment_id:str,messages:ListType[dict],context:str|None=None,tools:list|None=None,tool_choice:dict|None=None,tool_choice_option:Literal["none","auto"]|None=None,)->AsyncGenerator:d_inference=self._get_model_inference(deployment_id,"chat_stream")returnawaitd_inference.achat_stream(messages=messages,context=context,tools=tools,tool_choice=tool_choice,tool_choice_option=tool_choice_option,)[docs]defrun_ai_service(self,deployment_id:str,ai_service_payload:dict,path_suffix:str|None=None,)->Any:"""Execute an AI service by providing a scoring payload. :param deployment_id: unique ID of the deployment :type deployment_id: str :param ai_service_payload: AI service payload to be passed to generate the method :type ai_service_payload: dict :param path_suffix: path suffix to be appended to the scoring url, defaults to None :type path_suffix: str, optional :return: response of the AI service :rtype: Any .. note:: * By executing this class method, a POST request is performed. * In case of `method not allowed` error, try sending requests directly to your deployed ai service. """Deployments._validate_type(deployment_id,"deployment_id",str,True)Deployments._validate_type(ai_service_payload,"ai_service_payload",dict,True)scoring_url=(self._client._href_definitions.get_deployment_href(deployment_id)+"/ai_service")ifpath_suffixisnotNone:scoring_url+="/"+path_suffixresponse_scoring=self._client.httpx_client.post(url=scoring_url,json=ai_service_payload,params=self._client._params(skip_for_create=True,skip_userfs=True),# version parameter is mandatoryheaders=self._client._get_headers(),)error_msg="POST is not supported using this method. Send requests directly to the deployed ai_service."reason=response_scoring.textifresponse_scoring.status_code==405:raiseWMLClientError(error_msg+" Error: "+str(response_scoring.status_code)+". "+reason)returnself._handle_response(200,"AI Service run",response_scoring) [docs]defrun_ai_service_stream(self,deployment_id:str,ai_service_payload:dict,)->Generator:"""Execute an AI service by providing a scoring payload. :param deployment_id: unique ID of the deployment :type deployment_id: str :param ai_service_payload: AI service payload to be passed to generate the method :type ai_service_payload: dict :return: stream of the response of the AI service :rtype: Generator """Deployments._validate_type(deployment_id,"deployment_id",str,True)Deployments._validate_type(ai_service_payload,"ai_service_payload",dict,True)scoring_url=(self._client._href_definitions.get_deployment_href(deployment_id)+"/ai_service_stream")withself._client.httpx_client.stream(url=scoring_url,json=ai_service_payload,headers=self._client._get_headers(),params=self._client._params(skip_for_create=True,skip_userfs=True),method="POST",)asresp:ifresp.status_code==200:forchunkinresp.iter_lines():field_name,_,response=chunk.partition(":")iffield_name=="data":yieldresponseelse:resp.read()raiseApiRequestFailure(f"Failure during AI Service run steam",resp) ### Definition of Runtime Context[docs]classRuntimeContext:""" Class included to keep the interface compatible with the Deployment's RuntimeContext used in AIServices implementation. :param api_client: initialized APIClient object with a set project ID or space ID. If passed, ``credentials`` and ``project_id``/``space_id`` are not required. :type api_client: APIClient :param request_payload_json: Request payload for testing of generate/ generate_stream call of AI Service. :type request_payload_json: dict, optional :param method: HTTP request method for testing of generate/ generate_stream call of AI Service. :type method: str, optional :param path: Request endpoint path for testing of generate/ generate_stream call of AI Service. :type path: str, optional `` RuntimeContext`` initialized for testing purposes before deployment: .. code-block:: python context = RuntimeContext(api_client=client, request_payload_json={"field": "value"}) Examples of ``RuntimeContext`` usage within AI Service source code: .. code-block:: python def deployable_ai_service(context, **custom): task_token = context.generate_token() def generate(context) -> dict: user_token = context.get_token() headers = context.get_headers() json_body = context.get_json() ... return {"body": json_body} return generate generate = deployable_ai_service(context) generate_output = generate(context) # returns {"body": {"field": "value"}} Change the JSON body in ``RuntimeContext``: .. code-block:: python context.request_payload_json = {"field2": "value2"} generate = deployable_ai_service(context) generate_output = generate(context) # returns {"body": {"field2": "value2"}} """def__init__(self,api_client:APIClient,request_payload_json:dict|None=None,method:str|None=None,path:str|None=None,):self._api_client=api_clientself.request_payload_json=request_payload_jsonself.method=methodself.path=path@propertydefrequest_payload_json(self)->dict|None:returnself._request_payload_json@request_payload_json.setterdefrequest_payload_json(self,value:dict)->None:try:json_value=json.loads(json.dumps(value))exceptTypeErrorase:raiseInvalidValue("request_payload_json",reason=str(e))self._request_payload_json=json_value[docs]defget_token(self)->str:"""Return user token."""returnself.generate_token() [docs]defgenerate_token(self)->str:"""Return refreshed token."""returnself._api_client._get_icptoken() [docs]defget_headers(self)->dict:"""Return headers with refreshed token."""returnself._api_client._get_headers() [docs]defget_json(self)->dict|None:"""Get payload JSON send in body of API request to the generate or generate_stream method in deployed AIService. For testing purposes the payload JSON need to be set in RuntimeContext initialization or later as request_payload_json property. """returnself.request_payload_json [docs]defget_space_id(self)->str:"""Return default space id."""returnself._api_client.default_space_id [docs]defget_method(self)->str:"""Return the HTTP request method: 'GET', 'POST', etc."""returnself.methodor"" [docs]defget_path_suffix(self)->str:"""Return the suffix of ai_service endpoint including the query parameters."""try:suffix=self.path.split("ai_service",1)[1]exceptIndexErrorase:raiseValueError("Couldn't find the path suffix since endpoint URL is incorrect.")fromeifsuffix:suffix=suffix.removeprefix("/")returnsuffix [docs]defget_query_parameters(self)->dict:"""Return the query parameters from the ai_service endpoint as a dict."""parsed_url=urlparse(self.path)query=parsed_url.queryparams=parse_qs(query)ifparams:flat_params={k:v[0]fork,vinparams.items()}returnflat_paramselse:return{} [docs]defget_bytes(self)->bytes:"""Return the request data as bytes."""payload_json=self.get_json()payload_str=json.dumps(payload_json)bytes_data=payload_str.encode("utf-8")returnbytes_data