You can store and retrieve data from Data Stores in Python without connecting to a 3rd-party database. Add a data store as an input to a Python step, then access it in your Python handler with `pd.inputs["data_store"]`.
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Store a value under a key
    data_store["key"] = "Hello World"
    # Retrieve the value and print it to the step's Logs
    print(data_store["key"])
```

Adding a Data Store

Click **Add Data Store** near the top of a Python step:
This will add the selected data store to your Python code step.

Saving data

Data stores are key-value stores. Saving data within a data store is just like setting a key on a dictionary:
```python
from datetime import datetime

def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Store a timestamp
    data_store["last_ran_at"] = datetime.now().isoformat()
```

Setting expiration (TTL) for records

You can set an expiration time for a record by passing a TTL (time-to-live) option as the third argument to the `set` method. The TTL value is specified in seconds:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Store a temporary value that will expire after 1 hour (3600 seconds)
    data_store.set("temporaryToken", "abc123", ttl=3600)
    # Store a value that will expire after 1 day
    data_store.set("dailyMetric", 42, ttl=86400)
```
When the TTL period elapses, the record will be automatically deleted from the data store.
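Pipedream handles expiry server-side, but the behavior can be sketched locally. The `MiniTTLStore` class below is a hypothetical stand-in for illustration only (not Pipedream's implementation); injecting a fake clock lets us fast-forward time deterministically:

```python
import time

class MiniTTLStore:
    """Toy in-memory key-value store emulating TTL expiry (illustration only)."""
    def __init__(self, clock=time.monotonic):
        self._data = {}    # key -> (value, expires_at or None)
        self._clock = clock

    def set(self, key, value, ttl=None):
        # ttl is in seconds, matching the data store's set() signature
        expires_at = self._clock() + ttl if ttl is not None else None
        self._data[key] = (value, expires_at)

    def get(self, key, default=None):
        entry = self._data.get(key)
        if entry is None:
            return default
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            # The record has expired: drop it and fall back to the default
            del self._data[key]
            return default
        return value

# Drive the store with a controllable clock instead of real time
now = [0.0]
store = MiniTTLStore(clock=lambda: now[0])
store.set("temporaryToken", "abc123", ttl=3600)
assert store.get("temporaryToken") == "abc123"   # still inside the window
now[0] = 3601                                    # one hour and a second later
assert store.get("temporaryToken") is None       # expired and removed
```

The injected clock is just a testing convenience; the real data store expires records on the server regardless of your workflow's execution.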

Updating TTL for existing records

You can update the TTL for an existing record using the `set_ttl` method:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Update an existing record to expire after 30 minutes
    data_store.set_ttl("temporaryToken", ttl=1800)
    # Remove expiration from a record
    data_store.set_ttl("temporaryToken", ttl=None)
```
This is useful for extending the lifetime of temporary data or removing expiration from records that should now be permanent.

Retrieving keys

Fetch all the keys in a given data store using the `keys` method:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Retrieve all keys in the data store
    keys = data_store.keys()
    # Print a comma-separated string of all keys
    print(*keys, sep=",")
```
The `data_store.keys()` method does not return a list; instead it returns a `Keys` iterable object. You cannot export a `data_store` or `data_store.keys()` from a Python code step at this time. Instead, build a dictionary or list when using the `data_store.keys()` method.
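Since the iterable itself can't be exported, a common pattern is to materialize it into a plain list first. In this sketch an ordinary dict stands in for the data store, since both expose the same `keys()` iterable semantics:

```python
# A plain dict standing in for pd.inputs["data_store"] in this sketch
data_store = {"last_ran_at": "2024-01-01T00:00:00", "counter": 3}

# keys() returns an iterable view; wrap it in list() to get an exportable list
key_list = list(data_store.keys())
print(key_list)
```

The resulting `key_list` is a regular Python list, which a code step can export or return without issue.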

Checking for the existence of specific keys

If you need to check whether a specific `key` exists in a data store, use `if` and `in` as a conditional:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Search for a key in a conditional
    if "last_ran_at" in data_store:
        print(f"Last ran at {data_store['last_ran_at']}")
```

Retrieving data

Data stores are very fast at retrieving single records by key. You can also iterate over keys to retrieve all records within a data store.
Data stores are intended to be a fast and convenient storage option for adding data persistence to your workflows without another database dependency. However, if you need more advanced capabilities, such as querying records with nested dictionaries or filtering based on a record's value, consider a full-fledged database. Pipedream can integrate with MySQL, Postgres, DynamoDB, MongoDB, and more.

Get a single record

You can retrieve single records from a data store by key:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Retrieve the timestamp value by the key name
    last_ran_at = data_store["last_ran_at"]
    # Print the timestamp
    print(f"Last ran at {last_ran_at}")
```
Alternatively, use the `data_store.get()` method to retrieve a specific key's contents:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Retrieve the timestamp value by the key name
    last_ran_at = data_store.get("last_ran_at")
    # Print the timestamp
    print(f"Last ran at {last_ran_at}")
```
What’s the difference between `data_store["key"]` and `data_store.get("key")`?
  • `data_store["key"]` will throw a `TypeError` if the key doesn’t exist in the data store.
  • `data_store.get("key")` will instead return `None` if the key doesn’t exist in the data store.
  • `data_store.get("key", "default_value")` will return `"default_value"` if the key doesn’t exist in the data store.
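The fallback behavior of `.get()` follows Python's built-in mapping semantics, so you can try it with an ordinary dict. One caveat in this sketch: a plain dict raises `KeyError` on a missing bracket access, whereas the data store raises `TypeError` as noted above:

```python
data_store = {"existing": "value"}  # plain dict standing in for the data store

# .get() never raises for a missing key
assert data_store.get("missing") is None
assert data_store.get("missing", "default_value") == "default_value"
assert data_store.get("existing") == "value"

# Bracket access raises for a missing key (KeyError on a dict,
# TypeError on the data store object)
try:
    data_store["missing"]
except KeyError:
    print("missing key raised an exception")
```

Prefer `.get()` with a default whenever a key may legitimately be absent, so your step doesn't have to wrap reads in try/except.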

Retrieving all records

You can retrieve all records within a data store by iterating over its items:
```python
def handler(pd: "pipedream"):
    data_store = pd.inputs["data_store"]
    records = {}
    for k, v in data_store.items():
        records[k] = v
    return records
```
This code step example exports all records within the data store as a dictionary.

Deleting or updating values within a record

To delete or update the value of an individual record, assign the key a new value, or assign `''` to remove the value while retaining the key:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Assign a new value to the key
    data_store["myKey"] = "newValue"
    # Remove the value but retain the key
    data_store["myKey"] = ""
```

Working with nested dictionaries

You can store dictionaries within a record, which lets you create complex records. However, to update specific attributes within a nested dictionary, you’ll need to replace the record entirely. For example, the code below will not update the `name` attribute on the dictionary stored under the key `pokemon`:
```python
def handler(pd: "pipedream"):
    # The current record looks like this:
    # pokemon: {
    #   "name": "Charmander",
    #   "type": "fire"
    # }
    # You'll see "Charmander" in the logs
    print(pd.inputs['data_store']['pokemon']['name'])
    # Attempting to overwrite the pokemon's name will not apply
    pd.inputs['data_store']['pokemon']['name'] = 'Bulbasaur'
    # Exports "Charmander"
    return pd.inputs['data_store']['pokemon']['name']
```
Instead, overwrite the entire record to modify attributes:
```python
def handler(pd: "pipedream"):
    # Retrieve the record by its key first
    pokemon = pd.inputs['data_store']['pokemon']
    # Update the record's attribute
    pokemon['name'] = 'Bulbasaur'
    # Then replace the record outright with the modified dictionary
    pd.inputs['data_store']['pokemon'] = pokemon
    # Now we'll see "Bulbasaur" exported
    return pd.inputs['data_store']['pokemon']['name']
```
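The reason in-place mutation is lost is that each read hands you a fresh, deserialized copy of the record rather than a reference into the store. The `CopyOnReadStore` below is a hypothetical stand-in (not Pipedream's code) that reproduces this behavior with `copy.deepcopy`:

```python
import copy

class CopyOnReadStore:
    """Toy store that returns a copy on every read, like a serialized backend."""
    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        # Every access returns a fresh copy, so mutating it won't persist
        return copy.deepcopy(self._data[key])

    def __setitem__(self, key, value):
        self._data[key] = copy.deepcopy(value)

store = CopyOnReadStore()
store["pokemon"] = {"name": "Charmander", "type": "fire"}

# In-place mutation is silently lost: the edit lands on a throwaway copy
store["pokemon"]["name"] = "Bulbasaur"
assert store["pokemon"]["name"] == "Charmander"

# Read-modify-write persists, because the whole record is reassigned
pokemon = store["pokemon"]
pokemon["name"] = "Bulbasaur"
store["pokemon"] = pokemon
assert store["pokemon"]["name"] == "Bulbasaur"
```

This is why the read-modify-write pattern above is the reliable way to change a nested attribute.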

Deleting specific records

To delete individual records in a data store, use the `del` operation with a specific `key`:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Delete the last_ran_at timestamp key
    del data_store["last_ran_at"]
```

Deleting all records from a specific data store

If you need to delete all records in a given data store, you can use the `clear` method.
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # Delete the entire contents of the data store
    data_store.clear()
```
`data_store.clear()` is an irreversible change, even when testing code in the workflow builder.

Viewing store data

You can view the contents of your data stores in your Pipedream dashboard. From there, you can also manually edit your data store’s data, rename stores, delete stores, or create new stores.

Workflow counter example

You can use a data store as a counter. For example, this code counts the number of times the workflow runs:
```python
def handler(pd: "pipedream"):
    # Access the data store under pd.inputs
    data_store = pd.inputs["data_store"]
    # If the counter doesn't exist yet, start it at one
    if data_store.get("counter") is None:
        data_store["counter"] = 1
    # Otherwise, increment it by one
    else:
        count = data_store["counter"]
        data_store["counter"] = count + 1
```
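The branch above can be collapsed into one line using `.get()` with a default, per the retrieval section. A sketch, with a plain dict standing in for the data store and a hypothetical `increment` helper:

```python
def increment(store, key="counter"):
    # Start from 0 when the key doesn't exist yet, then add one
    store[key] = store.get(key, 0) + 1
    return store[key]

data_store = {}  # plain dict standing in for pd.inputs["data_store"]
assert increment(data_store) == 1   # first run creates the counter
assert increment(data_store) == 2   # later runs increment it
```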

Dedupe data example

Data stores are also useful for storing data from prior runs to prevent acting on duplicate data, or data that’s been seen before. For example, this workflow’s trigger contains an email address from a potential new customer, and we want to track all emails collected so we don’t send a welcome email twice:
```python
def handler(pd: "pipedream"):
    # Access the data store
    data_store = pd.inputs["data_store"]
    # Reference the incoming email from the HTTP request
    new_email = pd.steps["trigger"]["event"]["body"]["new_customer_email"]
    # Retrieve the emails stored in our data store
    emails = data_store.get('emails', [])
    # If this email has been seen before, exit early
    if new_email in emails:
        print(f"Already seen {new_email}, exiting")
        return False
    # This email is new, append it to our list
    else:
        print(f"Adding new email to data store: {new_email}")
        emails.append(new_email)
        data_store["emails"] = emails
        return new_email
```
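The dedupe logic itself can be factored into a small pure function that's easy to test outside a workflow. This is a sketch; `seen_before` is a hypothetical helper name, and a plain dict stands in for the data store:

```python
def seen_before(store, new_email):
    """Return True if the email was already recorded; otherwise record it."""
    emails = store.get("emails", [])
    if new_email in emails:
        return True
    emails.append(new_email)
    store["emails"] = emails  # write the updated list back to the store
    return False

store = {}
assert seen_before(store, "a@example.com") is False  # first time: recorded
assert seen_before(store, "a@example.com") is True   # second time: duplicate
```

Note the explicit write-back of the list: as the nested-dictionaries section explains, mutating a retrieved record in place would not persist in a real data store.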

TTL use case: temporary caching and rate limiting

TTL functionality is particularly useful for implementing temporary caching and rate limiting. Here’s an example of a simple rate limiter that prevents a user from making more than 5 requests per hour:
```python
def handler(pd: "pipedream"):
    # Access the data store
    data_store = pd.inputs["data_store"]
    user_id = pd.steps["trigger"]["event"]["user_id"]
    rate_key = f"ratelimit:{user_id}"
    # Try to get the current rate limit counter
    requests_num = data_store.get(rate_key)
    if not requests_num:
        # First request from this user in the time window
        data_store.set(rate_key, 1, ttl=3600)  # Expire after 1 hour
        return {"allowed": True, "remaining": 4}
    if requests_num >= 5:
        # Rate limit exceeded
        return {"allowed": False, "error": "Rate limit exceeded", "retry_after": "1 hour"}
    # Increment the counter
    data_store[rate_key] = requests_num + 1
    return {"allowed": True, "remaining": 4 - requests_num}
```
This pattern can be extended for various temporary caching scenarios like:
  • Session tokens with automatic expiration
  • Short-lived feature flags
  • Temporary access grants
  • Time-based promotional codes

Supported data types

Data stores can hold any JSON-serializable data within the storage limits, including:
  • Strings
  • Dictionaries
  • Lists
  • Integers
  • Floats
But you cannot serialize modules, functions, classes, or other more complex objects.
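A quick local check for whether a value is storable is to try `json.dumps` on it. This is a rough sketch with a hypothetical `is_storable` helper; the data store's actual serializer may differ in details:

```python
import json

def is_storable(value):
    """Rough check: can this value round-trip through JSON?"""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

assert is_storable("text")                         # strings
assert is_storable({"nested": {"list": [1, 2.5]}}) # dicts, lists, ints, floats
assert not is_storable(json)                       # a module is not serializable
assert not is_storable(lambda x: x)                # neither is a function
```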
