- Notifications
You must be signed in to change notification settings - Fork27
socrata data-pipeline python library
License
socrata/socrata-py
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Python SDK for the Socrata Data Management API. Use this library to call into publishing and ETL functionality offered when writing to Socrata datasets.
withopen('cool_dataset.csv','rb')asfile: (revision,output)=Socrata(auth).create(name="cool dataset",description="a description" ).csv(file)revision.apply(output_schema=output)
- Development
- Library Docs
This only supports python3.
Installation is available through pip. Using a virtualenv is advised. Installthe package by running
pip3 install socrata-py
The only hard dependency isrequests
which will be installed via pip. Pandas is not required, but creating a dataset from a Pandas dataframe is supported. See below.
Try the command line example with
python -m examples.create'Police Reports'~/Desktop/catalog.data.gov/Seattle_Real_Time_Fire_911_Calls.csv'pete-test.test-socrata.com' --username$SOCRATA_USERNAME --password$SOCRATA_PASSWORD
# Import some stufffromsocrata.authorizationimportAuthorizationfromsocrataimportSocrataimportos# Boilerplate...# Make an auth objectauth=Authorization("pete-test.test-socrata.com",os.environ['SOCRATA_USERNAME'],os.environ['SOCRATA_PASSWORD'])
To create a dataset, you can do this:
withopen('cool_dataset.csv','rb')asfile:# Upload + Transform step# revision is the *change* to the view in the catalog, which has not yet been applied.# output is the OutputSchema, which is a change to data which can be applied via the revision (revision,output)=Socrata(auth).create(name="cool dataset",description="a description" ).csv(file)# Transformation step# We want to add some metadata to our column, drop another column, and add a new column which will# be filled with values from another column and then transformedoutput=output\ .change_column_metadata('a_column','display_name').to('A Column!')\ .change_column_metadata('a_column','description').to('Here is a description of my A Column')\ .drop_column('b_column')\ .add_column('a_column_squared','A Column, but times itself','to_number(`a_column`) * to_number(`a_column`)','this is a column squared')\ .run()# Validation of the results stepoutput=output.wait_for_finish()# The data has been validated now, and we can access errors that happened during validation. For example, if one of the cells in `a_column` couldn't be converted to a number in the call to `to_number`, that error would be reflected in this error_countassertoutput.attributes['error_count']==0# If you want, you can get a csv stream of all the errorserrors=output.schema_errors_csv()forlineinerrors.iter_lines():print(line)# Update step# Apply the revision - this will make it public and available to make# visualizations fromjob=revision.apply(output_schema=output)# This opens a browser window to your revision, and you will see the progress# of the jobrevision.open_in_browser()# Application is async - this will block until all the data# is in place and readablejob.wait_for_finish()
Similar to thecsv
method are thexls
,xlsx
, andtsv
methods, which uploadthose files.
There is ablob
method as well, which uploads blobby data to the source. This means the data will not be parsed, and will be displayed under "Files and Documents" in the catalog once the revision is applied.
Datasets can also be created from Pandas DataFrames
importpandasaspddf=pd.read_csv('socrata-py/test/fixtures/simple.csv')# Do various Pandas-y changes and modifications, then...(revision,output)=Socrata(auth).create(name="Pandas Dataset",description="Dataset made from a Pandas Dataframe").df(df)# Same code as above to apply the revision.
A Socrataupdate
is actually an upsert. Rows are updated or created based on the row identifier. If the row-identifer doesn't exist, all updates are just appends to the dataset.
Areplace
truncates the whole dataset and then inserts the new data.
# This is how we create our view initiallywithopen('cool_dataset.csv','rb')asfile: (revision,output)=Socrata(auth).create(name="cool dataset",description="a description" ).csv(file)revision.apply(output_schema=output)# This will build a configuration using the same settings (file parsing and# data transformation rules) that we used to get our output. The action# that we will take will be "update", though it could also be "replace"config=output.build_config("cool-dataset-config","update")# Now we need to save our configuration name and view id somewhere so we# can update the view using our configconfiguration_name="cool-dataset-config"view_id=revision.view_id()# Now later, if we want to use that config to update our view, we just need the view and the configuration_namesocrata=Socrata(auth)view=socrata.views.lookup(view_id)# View will be the view we are updating with the new datawithopen('updated-cool-dataset.csv','rb')asmy_file: (revision,job)=socrata.using_config(configuration_name,view ).csv(my_file)print(job)# Our update job is now running
# This is our socrata object, using the auth variable from abovesocrata=Socrata(auth)# This will make our initial revision, on a view that doesn't yet existrevision=socrata.new({'name':'cool dataset'})# revision is a Revision object, we can print itprint(revision)Revision({'created_by': {'display_name':'rozap','email':'chris.duranti@socrata.com','user_id':'tugg-ikce'},'fourfour':'ij46-xpxe','id':346,'inserted_at':'2017-02-27T23:05:08.522796','metadata':None,'update_seq':285,'upsert_jobs': []})# We can also access the attributes of the revisionprint(revision.attributes['metadata']['name'])'cool dataset'
# Using that revision, we can create an uploadupload=revision.create_upload('foo.csv')# And print itprint(upload)Source({'content_type':None,'created_by': {'display_name':'rozap','email':'chris.duranti@socrata.com','user_id':'tugg-ikce'},'source_type': {'filename':'foo.csv','type':'upload' },'finished_at':None,'id':290,'inserted_at':'2017-02-27T23:07:18.309676','schemas': []})
# And using that upload we just created, we can put bytes into itwithopen('test/fixtures/simple.csv','rb')asf:source=upload.csv(f)
Transforming data consists of going from input data (data exactly as it appeared in the source)to output data (data as you want it to appear).
Transformation from input data to output data often has problems. You might, for example, have a columnfull of numbers, but one row in that column is actually the valuehehe!
which cannot be transformed intoa number. Rather than failing at each datum which is dirty or wrong, transforming your data allows you toreconcile these issues.
We might have a dataset calledtemps.csv
that looks like
date, celsius8-24-2017, 228-25-2017, 208-26-2017, 238-27-2017, hehe!8-28-2017,8-29-2017, 21
Suppose we uploaded it in our previous step, like this:
withopen('temps.csv','rb')asf:source=upload.csv(f)input_schema=source.get_latest_input_schema()
Ourinput_schema
is the input data exactly as it appeared in the CSV, with all values of typestring
.
Ouroutput_schema
is the output data as it wasguessed by Socrata. Guessing may not always be correct, which is why we have import configs to "lock in" a schema for automation. We can get theoutput_schema
like so:
output_schema=input_schema.get_latest_output_schema()
We can now make changes to the schema, like so
new_output_schema=output# Change the field_name of date to the_date .change_column_metadata('date','field_name').to('the_date')\# Change the description of the celsius column .change_column_metadata('celsius','description').to('the temperature in celsius')\# Change the display name of the celsius column .change_column_metadata('celsius','display_name').to('Degrees (Celsius)')\# Change the transform of the_date column to to_fixed_timestamp(`date`) .change_column_transform('the_date').to('to_fixed_timestamp(`date`)')\# Make the celsius column all numbers .change_column_transform('celsius').to('to_number(`celsius`)')\# Add a new column, which is computed from the `celsius` column .add_column('fahrenheit','Degrees (Fahrenheit)','(to_number(`celsius`) * (9 / 5)) + 32','the temperature in celsius')\ .run()
change_column_metadata(column_name, column_attribute)
takes the field name used toidentify the column and the column attribute to change (field_name
,display_name
,description
,position
)
add_column(field_name, display_name, transform_expression, description)
will create a new column
We can also calldrop_column(celsius)
which will drop the column.
.run()
will then make a request and return the new output_schema, or an error if something is invalid.
Transforms can be complex SoQL expressions. Available functions are listedhere. You can do lots of stuff with them;
For example, you could change allnull
values into errors (which won't be imported) by doingsomething like
new_output_schema=output .change_column_transform('celsius').to('coalesce(to_number(`celsius`), error("Celsius was null!"))') .run()
Or you could add a new column that says if the day was hot or not
new_output_schema=output .add_column('is_hot','Was the day hot?','to_number(`celsius`) >= 23') .run()
Or you could geocode a column, given the following CSV
address,city,zip,state10028 Ravenna Ave NE, Seattle, 98128, WA1600 Pennsylvania Avenue, Washington DC, 20500, DC6511 32nd Ave NW, Seattle, 98155, WA
We could transform our firstoutput_schema
into a single column dataset, where thatsingle column is aPoint
of the address
output=output\ .add_column('location','Incident Location','geocode(`address`, `city`, `state`, `zip`)')\ .drop_column('address')\ .drop_column('city')\ .drop_column('state')\ .drop_column('zip')\ .run()
Composing these SoQL functions into expressions will allow you to validate, shape, clean and extend your data to make it more useful to the consumer.
Transformations are async, so if you want to wait for it to finish, you can do so
Transformations may have had errors, like in the previous example, we can't converthehe!
to a number. We can see the count of them like this:
print(output_schema.attributes['error_count'])
We can view the detailed errors like this:
errors=output_schema.schema_errors()
We can get a CSV of the errors like this:
csv_stream=output_schema.schema_errors_csv()
We can look at the rows of our schema as well
rows=output_schema.rows(offset=0,limit=20)self.assertEqual(rows, [ {'b': {'ok':' bfoo'}}, {'b': {'ok':' bfoo'}}, {'b': {'ok':' bfoo'}}, {'b': {'ok':' bfoo'}}])
# Now we have transformed our data into the shape we want, let's do an upsertjob=revision.apply(output_schema=output_schema)# This will complete the upsert behind the scenes. If we want to# re-fetch the current state of the upsert job, we can do sojob=job.show()# To get the progressprint(job.attributes['log'])[ {'details': {'Errors':0,'Rows Created':0,'Rows Updated':0,'By RowIdentifier':0,'By SID':0,'Rows Deleted':0},'time':'2017-02-28T20:20:59','stage':'upsert_complete'}, {'details': {'created':1},'time':'2017-02-28T20:20:59','stage':'columns_created'}, {'details': {'created':1},'time':'2017-02-28T20:20:59','stage':'columns_created'}, {'details':None,'time':'2017-02-28T20:20:59','stage':'started'}]# So maybe we just want to wait here, printing the progress, until the job is donejob.wait_for_finish(progress=lambdajob:print(job.attributes['log']))# So now if we go look at our original four-four, our data will be there
When there is an existing Socrata view that you'd like to update the metadata of, you can do so by creating a Source which is the Socrata view.
view=socrata.views.lookup('abba-cafe')revision=view.revisions.create_replace_revision()source=revision.source_from_dataset()output_schema=source.get_latest_input_schema().get_latest_output_schema()new_output_schema=output_schema\ .change_column_metadata('a','description').to('meh')\ .change_column_metadata('b','display_name').to('bbbb')\ .change_column_metadata('c','field_name').to('ccc')\ .run()revision.apply(output_schema=new_output_schema)
Install test deps by runningpip install -r requirements.txt
. This will installpdoc
andpandas
which are required to run the tests.
Configuration is set intest/auth.py
for tests. It reads the domain, username, and password from environment variables. If you want to run the tests, set those environment variables to something that will work.
If I wanted to run the tests against my local instance, I would run:
SOCRATA_DOMAIN=localhost SOCRATA_USERNAME=$SOCRATA_LOCAL_USER SOCRATA_PASSWORD=$SOCRATA_LOCAL_PASS bin/test
make the docs by running
make docs
You will need to havetwine installed (pip3 install twine
), and a.pypirc
file in your home directory.For help, readthis
An example of a pypirc file looks like:
[distutils]index-servers = local pypi[local]repository=https://repo.socrata.com/artifactory/api/pypi/pypiusername=shared-engrpassword=<REDACTED>[pypi]repository=https://upload.pypi.org/legacy/username=socratapassword=<REDACTED>
Make sure the version in setup.py is new and makes sense for the change you're releasing. Then run:
python3 setup.py sdisttwine upload dist/<your distribution file>
ArgSpec Args: auth
Top level publishing object.
All functions making HTTP calls return a result tuple, where the first element in thetuple is whether or not the call succeeded, and the second element is the returnedobject if it was a success, or a dictionary containing the error response if the callfailed. 2xx responses are considered successes. 4xx and 5xx responses are considered failures.In the event of a socket hangup, an exception is raised.
Shortcut to create a dataset. Returns aCreate
object,which contains functions which will create a view, uploadyour file, and validate data quality in one step.
To actually place the validated data into a view, you can call .apply()on the revision
(revision, output_schema) Socrata(auth).create( name = "cool dataset", description = "a description").csv(file)job = revision.apply(output_schema = output_schema)
Args:
**kwargs: Arbitrary revision metadata values
Returns:
result (Revision, OutputSchema): Returns the revision that was created and the OutputSchema created from your uploaded file
Examples:
Socrata(auth).create(name="cool dataset",description="a description").csv(open('my-file.csv'))
ArgSpec Args: metadata, deleted_at
Create an empty revision, on a view that doesn't exist yet. Theview will be created for you, and the initial revision will be returned.
Args:
metadata (dict): Metadata to apply to the revision
Returns:
Revision
Examples:
rev=Socrata(auth).new({'name':'hi','description':'foo!','metadata': {'view':'metadata','anything':'is allowed here' } })
ArgSpec Args: config_name, view
Update a dataset, using the configuration that you previouslycreated, and saved the name of. Takes theconfig_name
parameterwhich uniquely identifies the config, and theView
object, which canbe obtained fromsocrata.views.lookup('view-id42')
Args:
config_name (str): The config name view (View): The view to update
Returns:
result (ConfiguredJob): Returns the ConfiguredJob
Note:Typical usage would be in a context manager block (as demonstrated in the examplebelow). In this case, theConfiguredJob
is created and immediately launched by way ofthe call to theConfiguredJob.csv
method.
Examples:
with open('my-file.csv', 'rb') as my_file: (rev, job) = p.using_config(name, view).csv(my_file)
ArgSpec Args: domain, username, password, request_id_prefix Defaults: domain=
Manages basic authorization for accessing the socrata API.This is passed into theSocrata
object once, which is the entrypoint for all operations.
auth = Authorization( "data.seattle.gov", os.environ['SOCRATA_USERNAME'], os.environ['SOCRATA_PASSWORD'])publishing = Socrata(auth)
Disable SSL checking. Note that this shouldonly be used while developingagainst a local Socrata instance.
ArgSpec Args: fourfour, auth
ArgSpec Args: metadata, permission Defaults: metadata={}, permission=public
Create a revision on the view, which when applied, will delete rows of data.
This is an upsert; a row id must be set.
Args:
metadata (dict): The metadata to change; these changes will be applied when the revision is applied permission (string): 'public' or 'private'
Returns:
Revision The new revision, or an error
Examples:
view.revisions.create_delete_revision(metadata= {'name':'new dataset name','description':'description' })
ArgSpec Args: metadata, permission Defaults: metadata={}, permission=public
Create a revision on the view, which when applied, will replace the data.
Args:
metadata (dict): The metadata to change; these changes will be applied when the revision is applied permission (string): 'public' or 'private'
Returns:
Revision The new revision, or an error
Examples:
>>> view.revisions.create_replace_revision(metadata = {'name': 'new dataset name', 'description': 'updated description'})
ArgSpec Args: metadata, permission Defaults: metadata={}, permission=public
Create a revision on the view, which when applied, will update the datarather than replacing it.
This is an upsert; if there is a rowId defined and you have duplicate ID values,those rows will be updated. Otherwise they will be appended.
Args:
metadata (dict): The metadata to change; these changes will be applied when the revision is applied permission (string): 'public' or 'private'
Returns:
Revision The new revision, or an error
Examples:
view.revisions.create_update_revision(metadata= {'name':'new dataset name','description':'updated description' })
ArgSpec Args: config
Create a revision for the given dataset.
List all the revisions on the view
Returns:
list[Revision]
ArgSpec Args: revision_seq
Lookup a revision within the view based on the sequence number
Args:
revision_seq (int): The sequence number of the revision to lookup
Returns:
Revision The Revision resulting from this API call, or an error
ArgSpec Args: auth, response, parent
A revision is a change to a dataset
ArgSpec Args: output_schema
Apply the Revision to the view that it was opened on
Args:
output_schema (OutputSchema): Optional output schema. If your revision includes data changes, this should be included. If it is a metadata only revision, then you will not have an output schema, and you do not need to pass anything here
Returns:
Job
Examples:
job = revision.apply(output_schema = my_output_schema)
ArgSpec Args: filename, parse_options Defaults: filename={}
Create an upload within this revision
Args:
filename (str): The name of the file to upload
Returns:
Source: Returns the new Source The Source created by this API call, or an error
Discard this open revision.
Returns:
Revision The closed Revision or an error
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
Open this revision in your browser, this will open a window
Return the list of operations this revision will make when it is applied
Returns:
dict
ArgSpec Args: notes
Set any notes on the revision. The notes is displayed in the datasetchangelog if the dataset is enrolled in archiving. If it is not enrolledin archiving, this has no effect.
Args:
notes (string): The change notes
Returns:
Revision The updated Revision as a result of this API call, or an error
Examples:
revision=revision.set_notes("Just updating my dataset")
ArgSpec Args: output_schema_id
Set the output schema id on the revision. This is what will get applied whenthe revision is applied if no ouput schema is explicitly supplied
Args:
output_schema_id (int): The output schema id
Returns:
Revision The updated Revision as a result of this API call, or an error
Examples:
revision=revision.set_output_schema(42)
ArgSpec Args: filename, parse_options Defaults: filename={}
Create a source from a file that should remain unparsed
ArgSpec Args: agent_uid, namespace, path, parse_options, parameters Defaults: agent_uid={}, namespace={}
Create a source from a connection agent in this revision
ArgSpec Args: parse_options Defaults: parse_options={}
Create a dataset source within this revision
ArgSpec Args: url, parse_options Defaults: url={}
Create a URL source
Args:
url (str): The URL to create the dataset from
Returns:
Source: Returns the new Source The Source created by this API call, or an error
This is the URL to the landing page in the UI for this revision
Returns:
url (str): URL you can paste into a browser to view the revision UI
ArgSpec Args: body
Set the metadata to be applied to the viewwhen this revision is applied
Args:
body (dict): The changes to make to this revision
Returns:
Revision The updated Revision as a result of this API call, or an error
Examples:
revision=revision.update({'metadata': {'name':'new name','description':'new description' } })
ArgSpec Args: auth
ArgSpec Args: filename
Create a new source. Takes abody
param, which must contain afilename
of the file.
Args:
filename (str): The name of the file you are uploading
Returns:
Source: Returns the new Source
Examples:
upload=revision.create_upload('foo.csv')
ArgSpec Args: source_id
Lookup a source
Args:
source_id (int): The id
Returns:
Source: Returns the new Source The Source resulting from this API call, or an error
ArgSpec Args: auth, response, parent
ArgSpec Args: revision
Associate this Source with the given revision.
ArgSpec Args: file_handle
Uploads a Blob dataset. A blob is a file that will not be parsed as a data file,ie: an image, video, etc.
Returns:
Source: Returns the new Source
Examples:
withopen('my-blob.jpg','rb')asf:upload=upload.blob(f)
ArgSpec Args: name
Change a parse option on the source.
If there are not yet bytes uploaded, these parse options will be usedin order to parse the file.
If there are already bytes uploaded, this will trigger a re-parsing ofthe file, and consequently a new InputSchema will be created. You can callsource.latest_input()
to get the newest one.
Parse options are:header_count (int): the number of rows considered a headercolumn_header (int): the one based index of row to use to generate the headerencoding (string): defaults to guessing the encoding, but it can be explicitly setcolumn_separator (string): For CSVs, this defaults to ",", and for TSVs " ", but you can use a custom separatorquote_char (string): Character used to quote values that should be escaped. Defaults to """
Args:
name (string): One of the options above, ie: "column_separator" or "header_count"
Returns:
change (ParseOptionChange): implements a `.to(value)` function which you call to set the value
For our example, assume we have this dataset
This is my cool datasetA, B, C1, 2, 34, 5, 6
We want to say that the first 2 rows are headers, and the second of those 2rows should be used to make the column header. We would do that like so:
Examples:
source=source .change_parse_option('header_count').to(2) .change_parse_option('column_header').to(2) .run()
ArgSpec Args: file_handle
Upload a CSV, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-file.csv','rb')asf:upload=upload.csv(f)
ArgSpec Args: dataframe
Upload a pandas DataFrame, returns the new source.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
importpandasdf=pandas.read_csv('test/fixtures/simple.csv')upload=upload.df(df)
ArgSpec Args: file_handle
Upload a geojson file, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-geojson-file.geojson','rb')asf:upload=upload.geojson(f)
ArgSpec Args: file_handle
Upload a KML file, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-kml-file.kml','rb')asf:upload=upload.kml(f)
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
Forces the source to load, if it's a view source.
Returns:
Source: Returns the new Source
Open this source in your browser, this will open a window
ArgSpec Args: file_handle
Upload a Shapefile, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-shapefile-archive.zip','rb')asf:upload=upload.shapefile(f)
ArgSpec Args: file_handle
Upload a TSV, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-file.tsv','rb')asf:upload=upload.tsv(f)
This is the URL to the landing page in the UI for the sources
Returns:
url (str): URL you can paste into a browser to view the source UI
ArgSpec Args: progress, timeout, sleeptime Defaults: progress=<function noop at 0x7fa34fb57040>, timeout=43200, sleeptime=1
Wait for this data source to finish transforming and validating. Accepts a progress functionand a timeout.
Default timeout is 12 hours
ArgSpec Args: progress, timeout, sleeptime Defaults: progress=<function noop at 0x7fa34fb57040>, timeout=43200, sleeptime=1
Wait for this data source to have at least one schema present. Accepts a progress functionand a timeout.
Default timeout is 12 hours
ArgSpec Args: file_handle
Upload an XLS, returns the new input schema
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-file.xls','rb')asf:upload=upload.xls(f)
ArgSpec Args: file_handle
Upload an XLSX, returns the new input schema.
Args:
file_handle: The file handle, as returned by the python function `open()` max_retries (integer): Optional retry limit per chunk in the upload. Defaults to 5. backoff_seconds (integer): Optional amount of time to backoff upon a chunk upload failure. Defaults to 2.
Returns:
Source: Returns the new Source
Examples:
withopen('my-file.xlsx','rb')asf:upload=upload.xlsx(f)
ArgSpec Args: auth
ArgSpec Args: name, data_action, parse_options, columns
Create a new ImportConfig. Seehttp://docs.socratapublishing.apiary.io/ImportConfig section for what is supported indata_action
,parse_options
,andcolumns
.
List all the ImportConfigs on this domain
ArgSpec Args: name
Obtain a single ImportConfig by name
ArgSpec Args: auth, response, parent
ArgSpec Args: name
Change a parse option on the source.
If there are not yet bytes uploaded, these parse options will be usedin order to parse the file.
If there are already bytes uploaded, this will trigger a re-parsing ofthe file, and consequently a new InputSchema will be created. You can callsource.latest_input()
to get the newest one.
Parse options are:header_count (int): the number of rows considered a headercolumn_header (int): the one based index of row to use to generate the headerencoding (string): defaults to guessing the encoding, but it can be explicitly setcolumn_separator (string): For CSVs, this defaults to ",", and for TSVs " ", but you can use a custom separatorquote_char (string): Character used to quote values that should be escaped. Defaults to """
Args:
name (string): One of the options above, ie: "column_separator" or "header_count"
Returns:
change (ParseOptionChange): implements a `.to(value)` function which you call to set the value
For our example, assume we have this dataset
This is my cool datasetA, B, C1, 2, 34, 5, 6
We want to say that the first 2 rows are headers, and the second of those 2rows should be used to make the column header. We would do that like so:
Examples:
source=source .change_parse_option('header_count').to(2) .change_parse_option('column_header').to(2) .run()
ArgSpec Args: fourfour
Create a new Revision in the context of this ImportConfig.Sources that happen in this Revision will take on the valuesin this Config.
Delete this ImportConfig. Note that this cannot be undone.
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
ArgSpec Args: body
Mutate this ImportConfig in place. Subsequent revisions opened against thisImportConfig will take on its new value.
ArgSpec Args: auth, response, parent
This represents a schema exactly as it appeared in the source
Note that this does not make an API request
Returns:output_schema (OutputSchema): Returns the latest output schema
Get the latest (most recently created) OutputSchemawhich descends from this InputSchema
Returns:OutputSchema
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
ArgSpec Args: body
Transform this InputSchema into an Output. Returns thenew OutputSchema. Note that this call is async - the datamay still be transforming even though the OutputSchema isreturned. See OutputSchema.wait_for_finish to block untilthe
ArgSpec Args: progress, timeout, sleeptime Defaults: progress=<function noop at 0x7fa34fb57040>, timeout=43200, sleeptime=1
Wait for this data source to have at least one schema present. Accepts a progress functionand a timeout.
Default timeout is 12 hours
This is data as transformed from an InputSchema
ArgSpec Args: field_name, display_name, transform_expr, description
Add a column
Args:
field_name (str): The column's field_name, must be unique display_name (str): The columns display name transform_expr (str): SoQL expression to evaluate and fill the column with data from description (str): Optional column description
Returns:
output_schema (OutputSchema): Returns self for easy chaining
Examples:
new_output_schema=output# Add a new column, which is computed from the `celsius` column .add_column('fahrenheit','Degrees (Fahrenheit)','(to_number(`celsius`) * (9 / 5)) + 32','the temperature in celsius')# Add a new column, which is computed from the `celsius` column .add_column('kelvin','Degrees (Kelvin)','(to_number(`celsius`) + 273.15') .run()
This is probably not the function you are looking for.
This returns whether or not any transform in this output schema has failed. "Failed" in thiscase means an internal error (which is unexpected), not a data error (which is expected). Thisfunction will wait for processing to complete if it hasn't yet.
For data errors:
Tell whether or not there are data errors output_schema.any_errors()Get the count of data errors output_schema.attributes['error_count']Get the errors themselves output_schema.schema_errors(offset = 0, limit = 20)
ArgSpec Args: name, data_action
Create a new ImportConfig from this OutputSchema. See the APIdocs for what an ImportConfig is and why they're useful
ArgSpec Args: field_name, attribute
Change the column metadata. This returns a ColumnChange,which implements a.to
function, which takes the new value to change to
Args:
field_name (str): The column to change attribute (str): The attribute of the column to change
Returns:
change (TransformChange): The transform change, which implements the `.to` function
Examples:
new_output_schema=output# Change the field_name of date to the_date .change_column_metadata('date','field_name').to('the_date')# Change the description of the celsius column .change_column_metadata('celsius','description').to('the temperature in celsius')# Change the display name of the celsius column .change_column_metadata('celsius','display_name').to('Degrees (Celsius)') .run()
ArgSpec Args: field_name
Change the column transform. This returns a TransformChange,which implements a.to
function, which takes a transform expression.
Args:
field_name (str): The column to change
Returns:
change (TransformChange): The transform change, which implements the `.to` function
Examples:
new_output_schema=output .change_column_transform('the_date').to('to_fixed_timestamp(`date`)')# Make the celsius column all numbers .change_column_transform('celsius').to('to_number(`celsius`)')# Add a new column, which is computed from the `celsius` column .add_column('fahrenheit','Degrees (Fahrenheit)','(to_number(`celsius`) * (9 / 5)) + 32','the temperature in celsius') .run()
ArgSpec Args: field_name
Drop the column
Args:
field_name (str): The column to drop
Returns:
output_schema (OutputSchema): Returns self for easy chaining
Examples:
new_output_schema=output .drop_column('foo') .run()
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
ArgSpec Args: offset, limit Defaults: offset=0, limit=500
Get the rows for this OutputSchema. Accepsoffset
andlimit
paramsfor paging through the data.
Run all adds, drops, and column changes.
Returns:
OutputSchema
Examples:
new_output_schema=output# Change the field_name of date to the_date .change_column_metadata('date','field_name').to('the_date')# Change the description of the celsius column .change_column_metadata('celsius','description').to('the temperature in celsius')# Change the display name of the celsius column .change_column_metadata('celsius','display_name').to('Degrees (Celsius)')# Change the transform of the_date column to to_fixed_timestamp(`date`) .change_column_transform('the_date').to('to_fixed_timestamp(`date`)')# Make the celsius column all numbers .change_column_transform('celsius').to('to_number(`celsius`)')# Add a new column, which is computed from the `celsius` column .add_column('fahrenheit','Degrees (Fahrenheit)','(to_number(`celsius`) * (9 / 5)) + 32','the temperature in celsius') .run()
ArgSpec Args: offset, limit Defaults: offset=0, limit=500
Get the errors that resulted in transforming into this output schema.Acceptsoffset
andlimit
params
Get the errors that results in transforming into this output schemaas a CSV stream.
Note that this returns a Reponse, where Reponseis a python requests Reponse object
ArgSpec Args: field_name
Set the row id. Note you must callvalidate_row_id
before doing this.
Args:
field_name (str): The column to set as the row id
Returns:
OutputSchema
Examples:
new_output_schema=output.set_row_id('the_id_column')
Replace the columns used to sort the dataset. This returns a SortChange,which implements a.on
function to add a sort and a.end_sort
functionto finish.
If you do not call this, the OutputSchema will try to preserve any existingsorts, which means it will remove sorts on deleted columns or on columnswhose transforms are changed.
Returns:
change (SortChange): The sort change, which implements the `.on` and `.end_sort` functions
Examples:
new_output_schema=output .set_sort_by() .on('column_one',ascending=True) .on('column_two',ascending=False) .on('column_three')# ascending = True is the default .end_sort() .run()
ArgSpec Args: field_name
Set the row id. Note you must callvalidate_row_id
before doing this.
Args:
field_name (str): The column to validate as the row id
Returns:
boolean
ArgSpec Args: progress, timeout, sleeptime Defaults: progress=<function noop at 0x7fa34fb57040>, timeout=10800, sleeptime=1
Wait for this dataset to finish transforming and validating. Accepts a progress functionand a timeout.
Default timeout is 3 hours
ArgSpec Args: auth, response, parent
Has this job finished or failed (or been submitted for approval)
Get a list of the operations that you can perform on thisobject. These map directly onto what's returned from the APIin thelinks
section of each resource
Has this job entered the approval queue (rather than finishing or failing)
ArgSpec Args: progress, timeout, sleeptime Defaults: progress=<function noop at 0x7fa34fb57040>, sleeptime=1
Wait for this dataset to finish transforming and validating. Accepts a progress functionand a timeout.
About
socrata data-pipeline python library