こんにちわ alivelimb です。
pandasは言わずと知れた Python のデータ分析ライブラリですが、私が分析業務をする時も利用しています。本記事では自分自身が過去に詰まったところや、後輩のコードレビューをしていてバグの原因になることが多いと感じた部分を Tips として紹介します。
本記事では成績ダミーデータを生成して、データ操作を行います。
import randomimport pandasas pddefcreate_dummy_scores(n): scores=[dict( user_id=f"user{i+1}", subject=subject, score=random.randint(0,100))for iinrange(n)for subjectin["国語","数学","英語","理科","社会"]]return scoresscores= create_dummy_scores(5)score_df= pd.DataFrame(scores)
ダミーデータ例
user_id | subject | score | |
---|---|---|---|
0 | user1 | 国語 | 57 |
1 | user1 | 数学 | 78 |
2 | user1 | 英語 | 68 |
3 | user1 | 理科 | 87 |
4 | user1 | 社会 | 87 |
5 | user2 | 国語 | 41 |
6 | user2 | 数学 | 24 |
7 | user2 | 英語 | 13 |
8 | user2 | 理科 | 45 |
9 | user2 | 社会 | 28 |
なお、検証した バージョンは以下の通りです
3.8.7
1.4.2
1.22.3
1.6.1
3.1.0
0.10.1
pandas で Create(作成、追加), Read(抽出), U(更新), Delete(削除)の方法について紹介します。
io.StringIO
を用いると以下のように csv っぽい文字列からread_csv
できます。
from ioimport StringIOdata="""user_id,subject,score,pres_scoreuser1,国語,57,85user2,数学,78,16"""pd.read_csv(StringIO(data))
ダミーデータ生成のようにdict
からDataFrame
を作る方が書きやすいと思いますが、ちょっとした挙動確認や以下のような関数の単体テストに利用できます。この関数に関しては今後の章でも言及します。
defsafe_read_csv(path): df= pd.read_csv(path)# 型や値の検証
列・行の追加方法はいくつかありますが、私は以下のように書いています。
列の追加
# 全て同じ値で追加する場合score_df["new_col"]="new_value"# 個別に値を指定して追加する場合new_column=[seq_nofor seq_noinrange(len(score_df))]score_df["new_col"]= new_columns# 複数列追加する場合 -> DataFrameにしてDataFrame同士の結合new_columns=dict( new_col1=[seq_nofor seq_noinrange(len(score_df))], new_col2=[seq_nofor seq_noinrange(len(score_df))],)score_df= pd.concat([score_df, pd.DataFrame(new_columns)], axis=1)
行の追加
new_rows= create_dummy_scores(5)score_df= pd.concat([score_df, pd.DataFrame(new_rows)])
ちなみにappendという関数もありますが、Deprecated になっているため使わない方が良いでしょう。
こちらもいくつか書き方はありますが、私は以下のように書いています。
# user1の行を全て抽出score_df[score_df["user_id"]=="user1"]# user1でかつ教科が国語の行を全て抽出condition=(score_df["user_id"]=="user1")&(score_df["subject"]=="国語")score_df[condtion]
DataFrame
で複数条件を指定する時は以下の 2 点に注意が必要です。
and
,or
,not
ではなく&
,|
,~
を使う必要がある()
で囲む必要があるでは条件を満たす行を抽出(選択)した後に、列も抽出(射影)してみます。
例えば user1 の国語の点数を抽出してみましょう。
condition=(score_df["user_id"]=="user1")&(score_df["subject"]=="国語")score_df[condition]["score"]
これは公式ドキュメントでchained indexing
と呼ばれています。この書き方は値の確認をするだけなら問題ないのですが、抽出結果を更新しようとするとバグの原因となります。これについては次節で紹介します。
ちなみに
score_df[n:m]
試しに user1 の国語の点数を 100 点に書き換えてみましょう。前節で紹介した方法で書いてみると
condition=(score_df["user_id"]=="user1")&(score_df["subject"]=="国語")print(score_df[condition]["score"].to_numpy()[0])# 57score_df[condition]["score"]=100print(score_df[condition]["score"].to_numpy()[0])# 57(変わっていない!)
値が更新されず、以下のような Warning が表示されます。
SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead
前節で紹介ようにdf[...][...]
という形式はchained indexing
と呼ばれており([]
が 2 回続いているから)、加えてこれに代入することをchained assignment
と呼ばれています。これらについての詳しい解説は本記事では行わず、私が行なっている対応策と参考にさせて頂いた記事を紹介したいと思います。
対応策としては以下の通りです。
chained indexing
でも気にせず書く.loc
を使って代入するdf[]
のように行を条件抽出(選択)または列を抽出(射影)したものを以降の処理でも使う場合は.copy
を使ってコピーを作成する1 は前節で紹介した通りですが、2 は以下のように書けばchained indexing
にはならず、更新できます。
condition=(score_df["user_id"]=="user1")&(score_df["subject"]=="国語")print(score_df[condition]["score"].to_numpy()[0])# 57score_df.loc[condition,"score"]=100print(score_df[condition]["score"].to_numpy()[0])# 100
3 については[]
を 1 回つけた時点で DataFrame のコピーを作成することでchained indexing
を回避する方法です。新しく作成した DataFrame では 1 回も[]
をしていないため問題ないということです。
SettingWithCopyWarning
について参考にさせて頂いた記事は以下の通りです。
また上記と関係なくDataFrame
を引数に受け取る関数を定義する時も原則.copy
を使うようにしています。これは DataFrame が dict などと同様にミュータブルなオブジェクトだからです。以下の簡単な例で紹介してみます。
data="""col1,col210,2011,2112,22"""example_df= pd.read_csv(StringIO(data))defadd_col3(df): df["col3"]=[30,31,32]add_col3(example_df)example_df.head()# col3が追加されたDataFrame
add_col3
には example_df のコピーではなく実体が渡されてしまうのが原因です。大きなデータを扱っていて、コピーを作成するのはメモリ的に厳しいという理由があれば良いのですが、そうでない場合はこのような書き方は望ましくないと考えているので以下のように書いています。
defadd_col3(df): new_df= df.copy() new_df["col3"]=[30,31,32]return new_dfexample_df_col3_added= add_col3(example_df)example_df.head()# 元々のDataFrame
列の削除に関しては素直に.drop
を使います。ただし、元の DataFrame を直接編集するinplace=True
は原則使わないようにしています。デフォルトでinplace=False
なので特に気にしなくても良いでしょう。
# 列の削除(scoreを消して何がしたいんだ...)score_df_score_dropped= score_df.drop(["score"], axis=1)
行の削除は基本的に書きません。「行を削除する」ではなく「削除しない方を抽出する」という考えです。また、抽出した後は前節で紹介した通りコピーを生成しています。
# 行の削除(赤点じゃない生徒を除外)score_df_akaten= score_df[score_df["score"]<60].copy()
CRUD が終わったところでデータ集計にもよく用いる.groupby
です。私はpandas
を使い始める新人にはまず SQL の勉強を進めています。その理由として私自身が SQL をしっかり学ぶ前に pandas をコネコネしていたので.groupby
や.join
でよく分からんなーとなることが多かったからです。
基本的な例は以下のようなものだと思います。
# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定するscore_df.groupby("subject").size()# pd.Series# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定しないscore_df.groupby("subject", as_index=False).size()# pd.DataFrame# 教科(集約キー)ごとの点数の平均値を集計score_df.groupby("subject", as_index=False)["score"].sum()# ユーザID(集約キー)ごとに最初の1行を取得score_df.groupby("user_id", as_index=False).head(1)
# ※pandas1.4.3では実行できない書き方agg_dict=dict( score_max="max", score_mean="mean", score_std="std",)score_df.groupby("subject")["score"].agg(agg_dict)
のように書けていたのですが、pandas0.20.1で Deprecations になったようで、pandas1.4.3 では以下のようなエラーが発生しました。
SpecificationError: nested renamer is not supported
わかりやすく.rename
を使って書けば問題ないです。
( score_df.groupby("subject")["score"].agg(["max","mean","std"]).rename( columns=dict(max="score_max", mean="score_mean", std="score_std",)))
ちなみに、pandas でメソッドチェーンが長くなった時は()
で囲むと改行できて見やすいです
※2022.05.02 追記
nkay さんにコメントして頂いた通り、以下のように書けばSpecificationError
を回避できます。
nkay さんコメントありがとうございます!
agg_dict=dict( score_max="max", score_mean="mean", score_std="std",)score_df.groupby("subject")["score"].agg(**agg_dict)
成績データに偏差値列をつけたいとしましょう。
偏差値の計算式は
で計算できるので平均点と標準偏差を求める必要がありますが、行単位の操作で計算が完結するように各教科の平均点と標準偏差の列を追加したいです。こんな時には.transform
を使うと便利です。普通に groupby をすると行数が「集約キーの組み合わせ数」に集約されますが、.transform
では行数が変わらないので、列追加に向いています。
for agg_funcin["mean","std"]: score_df[f"score_{agg_func}"]= score_df.groupby("subject")["score"].transform(agg_func)
ちなみに、.transform
にもリストで集約関数を渡して
subject_agg_df=( score_df.groupby("subject")["score"].transform(["mean","std"]).rename( columns=dict( mean="subject_mean", std="subject_mean",)))score_df= pd.concat([score_df, subject_agg_df], axis=1)
と書けると思っていたのですが、エラーが吐かれました。
TypeError: unhashable type: 'list'
公式ドキュメントには
funcfunction, str, list-like or dict-like
list-like of functions and/or function names, e.g. [np.exp, 'sqrt']
とあるので書けそうなのですが、Series
では書けるけどGroupBy
オブジェクトでは出来ないということでしょうか...
さて次章ではここから偏差値を求めていきましょう。
Dataframe
でループを回す時に、.iterrows
などがありますが、私は原則 for 文は書かないようにしています。for 分を回さない方法としてはまず.apply
があるでしょう。
# 偏差値は英語でdeviation valueというらしい# 逆に分かりにくいと思ったのでローマ字でdefget_hensachi(score, mean, std):return50+(score- mean)/ std*10score_df["hensachi"]=( score_df[["score","score_mean","score_std"]].apply(lambda row: get_hensachi(row["score"], row["score_mean"], row["score_std"]), axis=1))
.apply
を使う上でよくある間違いに触れておきます。複数列を選択して apply を行う時に
score_df[["score","score_mean","score_std"]]
ではなく
score_df["score","score_mean","score_std"]
と書いてしまったり、1 列について.apply
をする時に
score_df["score"].apply(.., axis=1)
とaxis
を指定してしまったりします。
これはDataFrame
とSeries
の次元を意識しておくと間違えにくいかと思います。score_df
から 2 種類の方法でuser_id
を取り出して見ましょう。
print("score_df['user_id']:",type(score_df["user_id"]))print("score_df[['user_id']]:",type(score_df[["user_id"]]))
実行結果
score_df['user_id']: <class 'pandas.core.series.Series'>score_df[['user_id']]: <class 'pandas.core.frame.DataFrame'>
Series
DataFrame
になります。小難しくいうと「切り出した次元 = 結果の第 2 次元」になると思います。それぞれの次元数を出してみると分かりやすいです。
print("score_df['user_id']:", score_df["user_id"].shape)print("score_df[['user_id']]:", score_df[["user_id"]].shape)
実行結果
score_df['user_id']: (25,)score_df[['user_id']]: (25, 1)
axis=1
が列方向というのを理解している方は多いと思うのですが、意外と次元の意識をしておらずSeries
なのに axis を指定してしまうケースをよく見るので、意識してみてはどうでしょうか。
ところで「.apply
は遅いんじゃないの?」と思った方もいるのではないでしょうか。私も知識だけあったので、100 万行 x 2 列のデータを生成し、apply で和をとって実際に検証してみます。(本来ならnumpy
で完結させた方が圧倒的に早い処理です)
import timeimport numpyas np# numpyでデータ生成(100万行 x 2列)start_time= time.time()benchmark_df= pd.DataFrame(np.random.rand(10**6,2), columns=["a","b"])print(time.time()- start_time)# 0.02182292938232422# applyで100万行に対して和をとるstart_time= time.time()benchmark_df["sum"]= benchmark_df.apply(lambda row: row["a"]+ row["b"], axis=1)print(time.time()- start_time)# 11.313948154449463
確かに遅いですね。この解決策としてnp.vectrize
などを使ってベクトル化する方法(Do You Use Apply in Pandas? There is a 600x Faster Way)なども挙げられますが、私は並列処理を可能にしてくれるpandarallelを使っています。
from pandarallelimport pandarallelpandarallel.initialize()# INFO: Pandarallel will run on 4 workers.# INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.start_time= time.time()benchmark_df["sum"]= benchmark_df.parallel_apply(lambda row: row["a"]+ row["b"], axis=1)print(time.time()- start_time)# 3.1791868209838867
4workers で動かしているので処理時間は約 1/4 になっていますね。
pandarallel の利点は以下の 2 点だと思っています。
import
して.apply
を.parallel_apply
に変えるだけなので導入・ルール化が簡単一人でコードを書くのであれば問題ないと思うのですが、複数名でコードを管理していく場合はルール化や理解しやすいコードを書くことに重きを置きたいと考えているためpandarallel
は重宝しています。後述しますが、DataFrame
で速度面を気にする時は設計の改善で回避できるケースもあると思っています。
parallel_apply
のように導入が簡単なパッケージは積極的に使っていこうと思っています。pandas でオレオレ関数をたくさん定義していくのも楽しいのですが、車輪の再開発になったり、チーム開発の時に使えなかったり、テストをちゃんと書いておく必要があったりします。
parallel_apply
の他にはpandas-profilingがあります。pandas-profiling
はデータの概要を把握するのに非常に便利です(下の gif は公式ドキュメントから引用しました)。
本節で言いたかったことは、あくまで apply が遅いことは認識しておくことであり「.apply
を書くな!」ではありません。自分(or 組織)のルールとして.apply
を使わないというのは 1 つの方針であるとは思いますが、for 文を書く方法しか知らなかった方は.apply
という選択肢があるし、それほどデータ量が多くなければ十分使えるよという意図で書いています。
pandas
は様々なファイル IO(Input/Output)に対応していますが、一番よく使うのはやはり CSV ではないでしょうか?しかし CSV はParquetなどとは異なり、項目ごとの型定義がされているわけではありません。そのため、注意しないと以下のようなバグが生じます。
# 成績のダミーデータを生成するdefcreate_dummy_scores(n): scores=[dict( user_id=f"{i+1:>03}",# 001, 002, ... に変更 subject=subject, score=random.randint(0,100))for iinrange(n)for subjectin["国語","数学","英語","理科","社会"]]return scorespd.DataFrame(create_dummy_scores(3))
user_id | subject | score | |
---|---|---|---|
0 | 001 | 国語 | 89 |
# 書き込みf_write= StringIO()# 書き込むファイルの代わりpd.DataFrame(create_dummy_scores(3)).to_csv(f_write, index=False)# 読み込みf_read= StringIO(f_write.getvalue())# 読み込むファイルの代わりpd.read_csv(f_read)
user_id | subject | score | |
---|---|---|---|
0 | 1 | 国語 | 89 |
はい、user_id
が「001」から「1」になりました。これはpd.read_csv
の時に 001 をint64
と判断しているのが、原因です。これを回避するには
f_read= StringIO(f_write.getvalue())# 読み込むファイルの代わりdtype=dict( user_id="object", subject="object", score="int64")pd.read_csv(f_read, dtype=dtype)
のようにdtype
を指定してあげれば正しく読み込むことが出来ます。
panderaは列の型や値の検証に役立つバリデーションツールです。pandera についての詳しい内容は公式ドキュメントを参照して頂ければと思いますが、成績テーブルのスキーマを定義してみましょう。
import panderaas pafrom pandera.typingimport SeriesclassScoreSchema(pa.SchemaModel): user_id: Series[str]= pa.Field(nullable=False) subject: Series[str]= pa.Field(isin=["国語","数学","英語","理科","社会"], nullable=False) score: Series[int]= pa.Field(ge=0, le=100, nullable=False)
非常に直感的ですね。ge
(greater or equal)は「以上」、le
(less or equal)は「以下」になります。試しに 101 点を代入してみてましょう。
df.loc[0,"score"]=101ScoreSchema.validate(df)
SchemaModel
にはvalidate
メソッドがあり、これを実行することでスキーマ通りのDataFrame
になっているかの検証を行ってくれます。結果は以下のようなエラーを吐きます。
SchemaError: <Schema Column(name=score, type=DataType(int64))> failed element-wise validator 1: <Check less_than_or_equal_to: less_than_or_equal_to(100)>
df.loc[0, "subject"] = "プログラミング"
のようにしても同じように検証で弾かれます。
また、スキーマから pandas のdtype
を生成することも出来るため、read_csv
する時のdtype
指定にも役立ちます
from typingimport Dict, Typedefget_dtype_from_schema(schema: Type[pa.SchemaModel])-> Dict[str,str]:return{col:str(dtype)for col, dtypein schema.to_schema().dtypes.items()}dtype= get_dtype_from_schema(ScoreSchema)df= pd.read_csv(f_read, dtype=dtype)
pandera は DataFrame をカッチリ運用したいケースで威力を発揮するので、検証用にさらっと jupyter でコードを書くだけであれば不要、もしくは開発スピードを下げてしまうかもしれません。また、Python の静的解析ツールであるmypyとの親和性が良いこともあり、私は jupyter で分析する時にはほとんど用いないですが、PoC でちょっとした集計の自動化が必要な場合は VSCode で開発しつつ pandera を活用するケースもあります。
DataFrame
の非 null 制約や主キー制約が気になってくるのであれば、csv ではなく RDB から取得するというのも一つの手段です。あるのであれば検証用 DB を使えばよいですし、なれけば検証用のデータを csv ではなく、sqlite の db 形式で用意するといった方法もあるかと思います。sqlite とread_sqlを用いた例は以下の通りです。
sqlite の DB(ファイル)作成
import sqlite3DBNAME="score.db"conn= sqlite3.connect(DBNAME)cur= conn.cursor()# scoreテーブルの作成cur.execute("""CREATE TABLE scores( user_id TEXT, subject TEXT, score INTEGER, PRIMARY KEY(user_id, subject))""")# データ投入for scorein create_dummy_scores(5): insert_sql=""" INSERT INTO scores(user_id, subject, score) VALUES (?, ?, ?) """ cur.execute( insert_sql,(score["user_id"], score["subject"], score["score"]))# commit & closeconn.commit()cur.close()conn.close()
sqlite から SQL で取得したデータDataFrame
に変換
DBNAME="score.db"conn= sqlite3.connect(DBNAME)df= pd.read_sql("select * from scores", con=conn)conn.close()
ちなみに、SQLite の型定義でSTRING
があり、こちらを使っていたのですが、「001」ではなく「1」という形式で保存されてしまったのでTEXT
にしています。これは公式ドキュメントに記載がありました(知らなかった)。
And the declared type of "STRING" has an affinity of NUMERIC, not TEXT.
この章に関しては様々な意見があると思いますが、私の執筆時点での考えを紹介しておきます。
まず第一に pandas は非常に素晴らしいツールだと考えています。機械学習をやる方はもちろん、ちょっとした集計、可視化を行いたいのであればピッタリです。ただ一方で、分析や検証のために使っていたコードをそのまま大規模データに使おうと考えたり、プロダクトのコードに移行しようと考えるのは待ってほしいと思います。
データ量がある程度大きいのであればDataFrame
ではなく DB や BigQuery(BQ), Athena などのクラウドサービス, Tableau, Redash などの BI ツールを使うのが良いと思っています。SQL だけでは不十分であれば AWS Glue や Spark(PySpark)という選択肢もあります。Athena のクエリ実行料金は現時点で 5$/1TB なので、リーズナブルに利用できます。
私自身、大規模データを処理しなければならないケースには何度か遭遇したことはありますが、DataFrame
でやらなければならないケースはそれほどありません。(kaggle
などの機械学習コンペに現時点であまり参加したことがないので、コンペではDataFrame
で速度を気にしながら処理する必要があるのかもしれません。)大規模データの集計や分析は BigQuery や Athena で行い、サンプリングしたデータセットをDataFrame
で分析するという方法もあります。
高度な機械学習モデルなどに詳しいデータサイエンティストが、データ加工やデータ基盤に精通しているデータエンジニアの領域もカバーしようとしてプロジェクトが失敗するというケースもよくあるのではないでしょうか。確かにDataFrame
は便利ですが、使う場面を間違えないようにすることも意識して頂きたいと思います。私自身は小規模〜中規模データで利用するようにしています。
本記事で何度か紹介した通り、DataFrame
は RDB のように主キー制約や非 NULL 制約はないので、様々なことを気にしながらデータ処理を行わないとバグになる可能性があります。集計結果を出して見たけど、実は重複計算していた、実は抜け漏れがあったという経験がある方も多いのではないのでしょうか?
もちろんテストをしっかり書いて品質を担保するのは必須だとして、プロダクトレベルではDataFrame
ではなく BQ などのクラウドサービスを利用したり、dbtなどを利用するのも良いかと思います。この辺りは私自身勉強中なので、キャッチアップしていきたいと思います。
当初の想定よりも長文になってしまいました。機械学習や kaggle をやりたくて Python を始めた方は pandas を必須で使うかと思います(私自身は機械学習よりもデータ加工やデータ基盤に興味があります)。この記事を読んで pandas を使う上で少しでも役に立てば幸いです。
バッジを受け取った著者にはZennから現金やAmazonギフトカードが還元されます。