|
| 1 | +# Import necessary libraries |
| 2 | +importasyncio# For handling asynchronous operations |
| 3 | +importos# For environment variable access |
| 4 | +importsys# For system-specific parameters and functions |
| 5 | +importjson# For handling JSON data (used when printing function declarations) |
| 6 | + |
| 7 | +# Import MCP client components |
| 8 | +fromtypingimportOptional# For type hinting optional values |
| 9 | +fromcontextlibimportAsyncExitStack# For managing multiple async tasks |
| 10 | +frommcpimportClientSession,StdioServerParameters# MCP session management |
| 11 | +frommcp.client.stdioimportstdio_client# MCP client for standard I/O communication |
| 12 | + |
| 13 | +# Import Google's Gen AI SDK |
| 14 | +fromgoogleimportgenai |
| 15 | +fromgoogle.genaiimporttypes |
| 16 | +fromgoogle.genai.typesimportTool,FunctionDeclaration |
| 17 | +fromgoogle.genai.typesimportGenerateContentConfig |
| 18 | + |
| 19 | +fromdotenvimportload_dotenv# For loading API keys from a .env file |
| 20 | + |
| 21 | +# Load environment variables from .env file |
| 22 | +load_dotenv() |
| 23 | + |
| 24 | +classMCPClient: |
| 25 | +def__init__(self): |
| 26 | +"""Initialize the MCP client and configure the Gemini API.""" |
| 27 | +self.session:Optional[ClientSession]=None# MCP session for communication |
| 28 | +self.exit_stack=AsyncExitStack()# Manages async resource cleanup |
| 29 | + |
| 30 | +# Retrieve the Gemini API key from environment variables |
| 31 | +gemini_api_key=os.getenv("GEMINI_API_KEY") |
| 32 | +ifnotgemini_api_key: |
| 33 | +raiseValueError("GEMINI_API_KEY not found. Please add it to your .env file.") |
| 34 | + |
| 35 | +# Configure the Gemini AI client |
| 36 | +self.genai_client=genai.Client(api_key=gemini_api_key) |
| 37 | + |
| 38 | +asyncdefconnect_to_server(self,server_script_path:str): |
| 39 | +"""Connect to the MCP server and list available tools.""" |
| 40 | + |
| 41 | +# Determine whether the server script is written in Python or JavaScript |
| 42 | +# This allows us to execute the correct command to start the MCP server |
| 43 | +command="python"ifserver_script_path.endswith('.py')else"node" |
| 44 | + |
| 45 | +# Define the parameters for connecting to the MCP server |
| 46 | +server_params=StdioServerParameters(command=command,args=[server_script_path]) |
| 47 | + |
| 48 | +# Establish communication with the MCP server using standard input/output (stdio) |
| 49 | +stdio_transport=awaitself.exit_stack.enter_async_context(stdio_client(server_params)) |
| 50 | + |
| 51 | +# Extract the read/write streams from the transport object |
| 52 | +self.stdio,self.write=stdio_transport |
| 53 | + |
| 54 | +# Initialize the MCP client session, which allows interaction with the server |
| 55 | +self.session=awaitself.exit_stack.enter_async_context(ClientSession(self.stdio,self.write)) |
| 56 | + |
| 57 | +# Send an initialization request to the MCP server |
| 58 | +awaitself.session.initialize() |
| 59 | + |
| 60 | +# Request the list of available tools from the MCP server |
| 61 | +response=awaitself.session.list_tools() |
| 62 | +tools=response.tools# Extract the tool list from the response |
| 63 | + |
| 64 | +# Print a message showing the names of the tools available on the server |
| 65 | +print("\nConnected to server with tools:", [tool.namefortoolintools]) |
| 66 | + |
| 67 | +# Convert MCP tools to Gemini format |
| 68 | +self.function_declarations=convert_mcp_tools_to_gemini(tools) |
| 69 | + |
| 70 | + |
| 71 | +asyncdefprocess_query(self,query:str)->str: |
| 72 | +""" |
| 73 | + Process a user query using the Gemini API and execute tool calls if needed. |
| 74 | +
|
| 75 | + Args: |
| 76 | + query (str): The user's input query. |
| 77 | +
|
| 78 | + Returns: |
| 79 | + str: The response generated by the Gemini model. |
| 80 | + """ |
| 81 | + |
| 82 | +# Format user input as a structured Content object for Gemini |
| 83 | +user_prompt_content=types.Content( |
| 84 | +role='user',# Indicates that this is a user message |
| 85 | +parts=[types.Part.from_text(text=query)]# Convert the text query into a Gemini-compatible format |
| 86 | + ) |
| 87 | + |
| 88 | +# Send user input to Gemini AI and include available tools for function calling |
| 89 | +response=self.genai_client.models.generate_content( |
| 90 | +model='gemini-2.0-flash-001',# Specifies which Gemini model to use |
| 91 | +contents=[user_prompt_content],# Send user input to Gemini |
| 92 | +config=types.GenerateContentConfig( |
| 93 | +tools=self.function_declarations,# Pass the list of available MCP tools for Gemini to use |
| 94 | + ), |
| 95 | + ) |
| 96 | + |
| 97 | +# Initialize variables to store final response text and assistant messages |
| 98 | +final_text= []# Stores the final formatted response |
| 99 | +assistant_message_content= []# Stores assistant responses |
| 100 | + |
| 101 | +# Process the response received from Gemini |
| 102 | +forcandidateinresponse.candidates: |
| 103 | +ifcandidate.content.parts:# Ensure response has content |
| 104 | +forpartincandidate.content.parts: |
| 105 | +ifisinstance(part,types.Part):# Check if part is a valid Gemini response unit |
| 106 | +ifpart.function_call:# If Gemini suggests a function call, process it |
| 107 | +# Extract function call details |
| 108 | +function_call_part=part# Store the function call response |
| 109 | +tool_name=function_call_part.function_call.name# Name of the MCP tool Gemini wants to call |
| 110 | +tool_args=function_call_part.function_call.args# Arguments required for the tool execution |
| 111 | + |
| 112 | +# Print debug info: Which tool is being called and with what arguments |
| 113 | +print(f"\n[Gemini requested tool call:{tool_name} with args{tool_args}]") |
| 114 | + |
| 115 | +# Execute the tool using the MCP server |
| 116 | +try: |
| 117 | +result=awaitself.session.call_tool(tool_name,tool_args)# Call MCP tool with arguments |
| 118 | +function_response= {"result":result.content}# Store the tool's output |
| 119 | +exceptExceptionase: |
| 120 | +function_response= {"error":str(e)}# Handle errors if tool execution fails |
| 121 | + |
| 122 | +# Format the tool response for Gemini in a way it understands |
| 123 | +function_response_part=types.Part.from_function_response( |
| 124 | +name=tool_name,# Name of the function/tool executed |
| 125 | +response=function_response# The result of the function execution |
| 126 | + ) |
| 127 | + |
| 128 | +# Structure the tool response as a Content object for Gemini |
| 129 | +function_response_content=types.Content( |
| 130 | +role='tool',# Specifies that this response comes from a tool |
| 131 | +parts=[function_response_part]# Attach the formatted response part |
| 132 | + ) |
| 133 | + |
| 134 | +# Send tool execution results back to Gemini for processing |
| 135 | +response=self.genai_client.models.generate_content( |
| 136 | +model='gemini-2.0-flash-001',# Use the same model |
| 137 | +contents=[ |
| 138 | +user_prompt_content,# Include original user query |
| 139 | +function_call_part,# Include Gemini's function call request |
| 140 | +function_response_content,# Include tool execution result |
| 141 | + ], |
| 142 | +config=types.GenerateContentConfig( |
| 143 | +tools=self.function_declarations,# Provide the available tools for continued use |
| 144 | + ), |
| 145 | + ) |
| 146 | + |
| 147 | +# Extract final response text from Gemini after processing the tool call |
| 148 | +final_text.append(response.candidates[0].content.parts[0].text) |
| 149 | +else: |
| 150 | +# If no function call was requested, simply add Gemini's text response |
| 151 | +final_text.append(part.text) |
| 152 | + |
| 153 | +# Return the combined response as a single formatted string |
| 154 | +return"\n".join(final_text) |
| 155 | + |
| 156 | + |
| 157 | +asyncdefchat_loop(self): |
| 158 | +"""Run an interactive chat session with the user.""" |
| 159 | +print("\nMCP Client Started! Type 'quit' to exit.") |
| 160 | + |
| 161 | +whileTrue: |
| 162 | +query=input("\nQuery: ").strip() |
| 163 | +ifquery.lower()=='quit': |
| 164 | +break |
| 165 | + |
| 166 | +# Process the user's query and display the response |
| 167 | +response=awaitself.process_query(query) |
| 168 | +print("\n"+response) |
| 169 | + |
| 170 | +asyncdefcleanup(self): |
| 171 | +"""Clean up resources before exiting.""" |
| 172 | +awaitself.exit_stack.aclose() |
| 173 | + |
| 174 | +defclean_schema(schema): |
| 175 | +""" |
| 176 | + Recursively removes 'title' fields from the JSON schema. |
| 177 | +
|
| 178 | + Args: |
| 179 | + schema (dict): The schema dictionary. |
| 180 | +
|
| 181 | + Returns: |
| 182 | + dict: Cleaned schema without 'title' fields. |
| 183 | + """ |
| 184 | +ifisinstance(schema,dict): |
| 185 | +schema.pop("title",None)# Remove title if present |
| 186 | + |
| 187 | +# Recursively clean nested properties |
| 188 | +if"properties"inschemaandisinstance(schema["properties"],dict): |
| 189 | +forkeyinschema["properties"]: |
| 190 | +schema["properties"][key]=clean_schema(schema["properties"][key]) |
| 191 | + |
| 192 | +returnschema |
| 193 | + |
| 194 | +defconvert_mcp_tools_to_gemini(mcp_tools): |
| 195 | +""" |
| 196 | + Converts MCP tool definitions to the correct format for Gemini API function calling. |
| 197 | +
|
| 198 | + Args: |
| 199 | + mcp_tools (list): List of MCP tool objects with 'name', 'description', and 'inputSchema'. |
| 200 | +
|
| 201 | + Returns: |
| 202 | + list: List of Gemini Tool objects with properly formatted function declarations. |
| 203 | + """ |
| 204 | +gemini_tools= [] |
| 205 | + |
| 206 | +fortoolinmcp_tools: |
| 207 | +# Ensure inputSchema is a valid JSON schema and clean it |
| 208 | +parameters=clean_schema(tool.inputSchema) |
| 209 | + |
| 210 | +# Construct the function declaration |
| 211 | +function_declaration=FunctionDeclaration( |
| 212 | +name=tool.name, |
| 213 | +description=tool.description, |
| 214 | +parameters=parameters# Now correctly formatted |
| 215 | + ) |
| 216 | + |
| 217 | +# Wrap in a Tool object |
| 218 | +gemini_tool=Tool(function_declarations=[function_declaration]) |
| 219 | +gemini_tools.append(gemini_tool) |
| 220 | + |
| 221 | +returngemini_tools |
| 222 | + |
| 223 | + |
| 224 | + |
| 225 | +asyncdefmain(): |
| 226 | +"""Main function to start the MCP client.""" |
| 227 | +iflen(sys.argv)<2: |
| 228 | +print("Usage: python client.py <path_to_server_script>") |
| 229 | +sys.exit(1) |
| 230 | + |
| 231 | +client=MCPClient() |
| 232 | +try: |
| 233 | +# Connect to the MCP server and start the chat loop |
| 234 | +awaitclient.connect_to_server(sys.argv[1]) |
| 235 | +awaitclient.chat_loop() |
| 236 | +finally: |
| 237 | +# Ensure resources are cleaned up |
| 238 | +awaitclient.cleanup() |
| 239 | + |
| 240 | +if__name__=="__main__": |
| 241 | +# Run the main function within the asyncio event loop |
| 242 | +asyncio.run(main()) |
| 243 | + |
| 244 | + |