# pydruid

A Python connector for Druid.

pydruid exposes a simple API to create, execute, and analyze Druid queries. pydruid can parse query results into Pandas DataFrame objects for subsequent data analysis, offering tight integration between Druid, the SciPy stack (for scientific computing), and scikit-learn (for machine learning). pydruid can also export query results to TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, or Excel. It provides both synchronous and asynchronous clients.

Additionally, pydruid implements the Python DB API 2.0, a SQLAlchemy dialect, and a command line interface to interact with Druid.

To install:

```
pip install pydruid
# or, if you intend to use asynchronous client
pip install pydruid[async]
# or, if you intend to export query results into pandas
pip install pydruid[pandas]
# or, if you intend to do both
pip install pydruid[async,pandas]
# or, if you want to use the SQLAlchemy engine
pip install pydruid[sqlalchemy]
# or, if you want to use the CLI
pip install pydruid[cli]
```

Documentation: https://pythonhosted.org/pydruid/.

## examples

The following examples show how to execute and analyze the results of three types of queries: timeseries, topN, and groupby. We will use these queries to ask simple questions about Twitter's public data set.

### timeseries

What was the average tweet length, per day, surrounding the 2014 Sochi Olympics?

```python
from pydruid.client import *
import matplotlib.pyplot as plt

query = PyDruid(druid_url_goes_here, 'druid/v2')

ts = query.timeseries(
    datasource='twitterstream',
    granularity='day',
    intervals='2014-02-02/p4w',
    aggregations={'length': doublesum('tweet_length'),
                  'count': doublesum('count')},
    post_aggregations={'avg_tweet_length': (Field('length') / Field('count'))},
    filter=Dimension('first_hashtag') == 'sochi2014')

df = query.export_pandas()
df['timestamp'] = df['timestamp'].map(lambda x: x.split('T')[0])
df.plot(x='timestamp', y='avg_tweet_length', ylim=(80, 140), rot=20,
        title='Sochi 2014')
plt.ylabel('avg tweet length (chars)')
plt.show()
```
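The `avg_tweet_length` post-aggregation above is just a ratio of two summed metrics, computed per time bucket. As a plain-Python sketch of the same arithmetic (the daily totals here are made up for illustration, not real query results):

```python
# hypothetical per-day totals, as the doublesum aggregations would return them
total_length = {'2014-02-02': 121000.0, '2014-02-03': 99000.0}
total_count = {'2014-02-02': 1000.0, '2014-02-03': 900.0}

# the post-aggregation Field('length') / Field('count'), computed by hand
avg_tweet_length = {day: total_length[day] / total_count[day]
                    for day in total_length}
print(avg_tweet_length)  # {'2014-02-02': 121.0, '2014-02-03': 110.0}
```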

*(plot: average tweet length per day, Sochi 2014)*

### topN

Who were the top ten mentions (@user_name) during the 2014 Oscars?

```python
top = query.topn(
    datasource='twitterstream',
    granularity='all',
    intervals='2014-03-03/p1d',  # utc time of 2014 oscars
    aggregations={'count': doublesum('count')},
    dimension='user_mention_name',
    filter=(Dimension('user_lang') == 'en') &
           (Dimension('first_hashtag') == 'oscars') &
           (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
           ~(Dimension('user_mention_name') == 'No Mention'),
    metric='count',
    threshold=10)

df = query.export_pandas()
print(df)

#    count                 timestamp user_mention_name
# 0   1303  2014-03-03T00:00:00.000Z      TheEllenShow
# 1     44  2014-03-03T00:00:00.000Z        TheAcademy
# 2     21  2014-03-03T00:00:00.000Z               MTV
# 3     21  2014-03-03T00:00:00.000Z         peoplemag
# 4     17  2014-03-03T00:00:00.000Z               THR
# 5     16  2014-03-03T00:00:00.000Z      ItsQueenElsa
# 6     16  2014-03-03T00:00:00.000Z           eonline
# 7     15  2014-03-03T00:00:00.000Z       PerezHilton
# 8     14  2014-03-03T00:00:00.000Z     realjohngreen
# 9     12  2014-03-03T00:00:00.000Z       KevinSpacey
```

### groupby

What does the social network of users replying to other users look like?

```python
from igraph import *
from cairo import *
from pandas import concat

group = query.groupby(
    datasource='twitterstream',
    granularity='hour',
    intervals='2013-10-04/pt12h',
    dimensions=["user_name", "reply_to_name"],
    filter=(~(Dimension("reply_to_name") == "Not A Reply")) &
           (Dimension("user_location") == "California"),
    aggregations={"count": doublesum("count")})

df = query.export_pandas()

# map names to categorical variables with a lookup table
names = concat([df['user_name'], df['reply_to_name']]).unique()
nameLookup = dict([pair[::-1] for pair in enumerate(names)])
df['user_name_lookup'] = df['user_name'].map(nameLookup.get)
df['reply_to_name_lookup'] = df['reply_to_name'].map(nameLookup.get)

# create the graph with igraph
g = Graph(len(names), directed=False)
vertices = zip(df['user_name_lookup'], df['reply_to_name_lookup'])
g.vs["name"] = names
g.add_edges(vertices)
layout = g.layout_fruchterman_reingold()
plot(g, "tweets.png", layout=layout, vertex_size=2, bbox=(400, 400),
     margin=25, edge_width=1, vertex_color="blue")
```
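The lookup-table idiom above (`dict([pair[::-1] for pair in enumerate(names)])`) simply inverts `enumerate` so each name maps to an integer vertex id. In isolation, with toy names:

```python
names = ['alice', 'bob', 'carol']

# enumerate yields (0, 'alice'), (1, 'bob'), ...; pair[::-1] flips each
# tuple so the resulting dict maps name -> index
nameLookup = dict([pair[::-1] for pair in enumerate(names)])
print(nameLookup)  # {'alice': 0, 'bob': 1, 'carol': 2}
```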

*(plot: tweets.png, reply network of California users)*

## asynchronous client

`pydruid.async_client.AsyncPyDruid` implements an asynchronous client. To achieve that, it utilizes an asynchronous HTTP client from the Tornado framework. The asynchronous client is suitable for use with async frameworks such as Tornado and provides much better performance at scale. It lets you serve multiple requests at the same time, without blocking on Druid executing your queries.

### example

```python
from tornado import gen
from pydruid.async_client import AsyncPyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

client = AsyncPyDruid(url_to_druid_broker, 'druid/v2')

@gen.coroutine
def your_asynchronous_method_serving_top10_mentions_for_day(day):
    top_mentions = yield client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals="%s/p1d" % (day, ),
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') &
               (Dimension('first_hashtag') == 'oscars') &
               (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
               ~(Dimension('user_mention_name') == 'No Mention'),
        metric='count',
        threshold=10)

    # asynchronously return results
    # can be simply ``return top_mentions`` in python 3.x
    raise gen.Return(top_mentions)
```
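The benefit of the non-blocking client is that several queries can be in flight at once. The same fan-out pattern can be sketched with stdlib `asyncio` and a stubbed query coroutine (the `fetch_top_mentions` stand-in below is hypothetical, not a pydruid API):

```python
import asyncio

async def fetch_top_mentions(day):
    # stand-in for an AsyncPyDruid.topn call; sleeps instead of querying Druid
    await asyncio.sleep(0.01)
    return {'day': day, 'top': ['TheEllenShow', 'TheAcademy']}

async def main():
    # issue several "queries" concurrently instead of one at a time
    days = ['2014-03-01', '2014-03-02', '2014-03-03']
    return await asyncio.gather(*(fetch_top_mentions(d) for d in days))

results = asyncio.run(main())
print([r['day'] for r in results])
```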

## thetaSketches

Theta sketch post-aggregators are built slightly differently from normal post-aggregators, as they have different operators. Note: you must have the druid-datasketches extension loaded into your Druid cluster in order to use these. See the Druid datasketches documentation for details.

```python
from pydruid.client import *
from pydruid.utils import aggregators
from pydruid.utils import filters
from pydruid.utils import postaggregator

query = PyDruid(url_to_druid_broker, 'druid/v2')
ts = query.groupby(
    datasource='test_datasource',
    granularity='all',
    intervals='2016-09-01/P1M',
    filter=(filters.Dimension('product').in_(['product_A', 'product_B'])),
    aggregations={
        'product_A_users': aggregators.filtered(
            filters.Dimension('product') == 'product_A',
            aggregators.thetasketch('user_id')
        ),
        'product_B_users': aggregators.filtered(
            filters.Dimension('product') == 'product_B',
            aggregators.thetasketch('user_id')
        )
    },
    post_aggregations={
        'both_A_and_B': postaggregator.ThetaSketchEstimate(
            postaggregator.ThetaSketch('product_A_users') &
            postaggregator.ThetaSketch('product_B_users')
        )
    })
```
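The `ThetaSketchEstimate` over the `&` (intersection) of the two sketches approximates the number of users who appear in both filtered sets. With exact Python sets (toy data, no sketch approximation), the quantity being estimated is:

```python
# toy user-id sets standing in for the two filtered thetasketch aggregations
product_A_users = {1, 2, 3, 4, 5}
product_B_users = {4, 5, 6, 7}

# both_A_and_B corresponds to the size of the set intersection
both_A_and_B = len(product_A_users & product_B_users)
print(both_A_and_B)  # 2
```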

## DB API

```python
from pydruid.db import connect

conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http')
curs = conn.cursor()
curs.execute("""
    SELECT place,
           CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,
           CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon
      FROM places
     LIMIT 10
""")
for row in curs:
    print(row)
```

## SQLAlchemy

```python
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *

engine = create_engine('druid://localhost:8082/druid/v2/sql/')  # uses HTTP by default :(
# engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')
# engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

places = Table('places', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=places).scalar())
```

### Column headers

In version 0.13.0, Druid SQL added support for including the column names in the response, which can be requested via the "header" field in the request. This helps to ensure that the cursor description is defined (which is a requirement for SQLAlchemy query statements) regardless of whether the result set contains any rows. Historically this was problematic for result sets that contained no rows, as one could not infer the expected column names.

Enabling the header can be configured via the SQLAlchemy URI by using the query parameter, i.e.,

```python
engine = create_engine('druid://localhost:8082/druid/v2/sql?header=true')
```

Note that the current default is `false` to ensure backwards compatibility, but it should be set to `true` for Druid versions >= 0.13.0.
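The underlying requirement is generic to the Python DB API 2.0: SQLAlchemy reads column names from `cursor.description`, which must be populated even when a query matches zero rows. The behaviour can be illustrated with stdlib `sqlite3` standing in for Druid:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
curs = conn.cursor()
curs.execute('CREATE TABLE places (place TEXT, lat REAL, lon REAL)')

# a query with zero matching rows still populates cursor.description,
# so the column names are known without any result data
curs.execute('SELECT place, lat FROM places WHERE lat > 90')
columns = [col[0] for col in curs.description]
print(columns)  # ['place', 'lat']
```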

## Command line

```
$ pydruid http://localhost:8082/druid/v2/sql/
> SELECT COUNT(*) AS cnt FROM places
  cnt
-----
12345
> SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES;
TABLE_NAME
----------
test_table
COLUMNS
SCHEMATA
TABLES
> BYE;
GoodBye!
```

## Contributing

Contributions are welcome, of course. We like to use `black` and `flake8`.

```
pip install -r requirements-dev.txt  # installs useful dev deps
pre-commit install  # installs useful commit hooks
```
