#TL;DR
FastAPIでscoped_sessionをデフォルト設定のまま使うと、同期エンドポイントがThreadPoolExecutorで実行されることでセッションがスレッドに永続的に紐づく。アイドル状態のスレッドが持つコネクションがMySQLのwait_timeoutで切られると、Broken Pipeが発生する。scopefuncをContextVarベースのものに置き換えれば、既存コードを一切変更せずにリクエスト単位のセッション分離を実現できる。
#散発的に発生するBroken Pipeエラー
(2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))")
このエラーは月に一、二度、たいてい夜明け前か、朝いちばんのリクエストで顔を出した。リトライすれば毎回消え、翌月の同じ時間帯にまた顔を出す。しばらく優先度を下げて寝かせていたが、ある火曜日、頻度が普段の十倍に跳ね上がった。さすがに腰を据えて追いかけることにした。追ってみるとパターンははっきりしていた。
- トラフィックがしばらく途絶えた後、再びリクエストが来たときに発生する
- MySQLの
wait_timeout(デフォルト8時間)とぴったり一致する - 特定のエンドポイントではなく、さまざまなAPIで散発的に出る
pool_pre_ping=Trueがすでに有効になっているのに消えない
#最初に疑ったもの
不思議だったのは、pool_pre_ping=Trueがすでに有効になっているという事実だった。SQLAlchemyの公式推奨オプションで、死んだコネクションをチェックアウトの時点で検証してくれる。productionのデフォルトとしてずっと前から有効にしていた。それでもエラーは毎月のように戻ってきた。
まず疑ったのはMySQLのwait_timeoutだった。デフォルトは28800秒、つまり8時間。トラフィックが8時間も途絶えることは滅多にないと思い込んでいたが、明け方4時から8時のあいだ、一部のendpointがちょうどその長さの沈黙を作っていた。wait_timeoutを16時間に伸ばした。次の週、同じことが起きた。
pool_recycle=3600も追加した。改善は微々たるもの。そこでようやく「チェックアウト」という単語が頭に引っかかった。pool_pre_pingはチェックアウトする瞬間にしか動かない。もし我々のケースでは、チェックアウト自体が起きていないとしたら? その問いが、本当の原因へとつながる最初の手がかりになった。
後から分かった。pool_pre_pingはずっと正しく動いていた、ただしチェックアウトが実際に発生するコネクションについては。普段捕まえていたエラーはそちらだった。捕まえ損ねていたほうは、そもそもチェックアウトが起きないコネクションだった。それが本当の問題だった。
#scoped_sessionの設計前提とFastAPIの衝突
#scoped_sessionはWSGI時代の「1リクエスト = 1スレッド」モデルを前提に設計された
scoped_sessionが作られた背景を理解しないと、問題は見えない。Flask、Django、Pyramidなど従来のPython Webフレームワークは、WSGI(Web Server Gateway Interface)をベースにしている。WSGIサーバ(Gunicorn、uWSGIなど)は各HTTPリクエストを一つのワーカースレッドに割り当て、リクエスト処理が完了するとそのスレッドを解放する。1リクエスト = 1スレッドが保証されるモデルだ。
SQLAlchemy公式ドキュメントはこの前提を明示している。
“the majority of Python web frameworks utilize threads in a simple way, such that a particular web request is received, processed and completed within the scope of a single worker thread.”
「ほとんどのPython Webフレームワークは単純な方法でスレッドを使います。特定のWebリクエストが単一のワーカースレッドの範囲内で受信・処理・完了されます。」
— SQLAlchemy Docs: Using Thread-Local Scope with Web Applications
この前提の下では「セッションをスレッドに紐づければ、結局はリクエストに紐づくことになる」という論理が成立する。だからscoped_sessionはデフォルトでthreading.local()ベースのThreadLocalRegistryを使い、スレッドごとに一つのセッションを保持する。カスタムscopefuncを渡すと辞書ベースのScopedRegistryに切り替わる。この記事の解決策はまさにそのメカニズムを利用する。WSGI時代にはデフォルト設定だけで十分だった。
#なぜモダンフレームワークで問題になるのか
FastAPI、Starletteなど、ASGI(Asynchronous Server Gateway Interface)ベースのフレームワークは並行性モデルが根本的に異なる。
- WSGI: リクエストごとにスレッド割り当て。スレッドとリクエストが1:1。
- ASGI: イベントループベース。
async defエンドポイントはイベントループで直接実行され、defエンドポイントはThreadPoolExecutorのスレッドプールで実行される。
決定的な違いは、スレッドプールのスレッドが再利用される点だ。WSGIではリクエストが終わるとスレッドが解放される。FastAPIは違う。anyioのCapacityLimiterが同時実行をデフォルトで40に制限し、同じスレッドたちが複数のリクエストを順に処理する。一つのスレッドがリクエストAを処理した後、しばらく休んでリクエストBを処理し、またリクエストCを処理する。一つのスレッドが複数のリクエストを跨いで生き続ける環境でthreading.local()でセッションを管理すると、セッションがスレッドに紐づいたまま残り続ける。
SQLAlchemy自身もこの限界を認めている。
“It is however strongly recommended that the integration tools provided with the web framework itself be used, if available, instead of scoped_session. In particular, while using a thread local can be convenient, it is preferable that the Session be associated directly with the request, rather than with the current thread.”
「可能であればscoped_sessionの代わりに、Webフレームワーク自体が提供する統合ツールを使うことを強く推奨します。thread localは便利ですが、Sessionは現在のスレッドではなくリクエストと直接結び付ける方が望ましいです。」
— SQLAlchemy Docs: Using Thread-Local Scope with Web Applications
graph TB
Client[Client Request] --> FastAPI[FastAPI<br/>async event loop]
FastAPI -->|"def endpoint<br/>(同期)"| TPE["ThreadPoolExecutor<br/>(スレッドプール)"]
FastAPI -->|"async endpoint<br/>(非同期)"| EL[Event Loopで直接実行]
TPE --> T1[Thread-1]
TPE --> T2[Thread-2]
TPE --> TN[Thread-N]
T1 -->|"threading.local() ベース"| S1["Session-A<br/>(永続バインディング)"]
T2 -->|"threading.local() ベース"| S2["Session-B<br/>(永続バインディング)"]
TN -->|"threading.local() ベース"| SN["Session-N<br/>(永続バインディング)"]
style S1 fill:#fee,stroke:#c00,color:#18181b
style S2 fill:#fee,stroke:#c00,color:#18181b
style SN fill:#fee,stroke:#c00,color:#18181b
スレッドプールのサイズが40なら、最大40個のセッションがそれぞれ一つずつスレッドに永続的に紐づく。
#「close()を呼んでいるのになぜBroken Pipeが出るのか?」
この問題に最初に出会うと不可解だ。明らかにsession.close()を呼んでいるのに、なぜコネクションが片付かないのか? 原因はclose()とremove()の違いにある。scoped_session環境ではこの二つは根本的に違う仕事をする。
SQLAlchemy公式ドキュメントによると:
“The
scoped_session.remove()method first callsSession.close()on the current Session, which has the effect of releasing any connection/transactional resources owned by the Session first, then discarding the Session itself.”「
scoped_session.remove()メソッドは、まず現在のSessionに対してSession.close()を呼び出してコネクション/トランザクションリソースを解放し、その後Session自体を破棄します。」
整理すると:
graph LR
subgraph "session.close()"
direction TB
C1[Session-A] -->|"コネクションのみプールに返却"| CP1[Connection Pool]
C1 -.->|"registryに残ったまま"| REG1["registry#91;thread_1#93; = Session-A ❌"]
end
subgraph "session_factory.remove()"
direction TB
C2[Session-A] -->|"コネクションをプールに返却"| CP2[Connection Pool]
C2 -->|"registryから削除"| REG2["registry#91;thread_1#93; 削除 ✅"]
end
style REG1 fill:#fee,stroke:#c00,color:#18181b
style REG2 fill:#efe,stroke:#0a0,color:#18181b
close()= コネクションを返却。セッションオブジェクトはregistryに残る。次の呼び出しでは同じセッションが返る。remove()= close()を実行した後、registryからセッションを削除。次の呼び出しでは新しいセッションが生成される。
公式ドキュメントの説明どおり、remove()はclose()を実行した後、セッションオブジェクト自体をregistryから取り除く。そのセッションに溜まったidentity mapやエラー状態まで完全に初期化される。
ところがscoped_sessionの便利さは、こうしたセッション管理を明示的にしなくていい点にある。close()やremove()を呼ばなくてもsession_factory()を呼ぶだけで勝手にセッションを返してくれる。この便利さがFastAPIでは毒になる。ほとんどのコードでclose()を明示的に呼ばないので、Sessionがコネクションを保持したままスレッドに残り続ける。
#Broken Pipeまでの全体の流れ
sequenceDiagram
participant C as Client
participant T as Thread-3
participant S as Session-A<br/>(scoped)
participant M as MySQL
Note over C,M: 22:00 — 最後のリクエスト
C->>T: GET /api/data
T->>S: session_factory() → Session-Aを返す
S->>M: Connection-Xでクエリ
M-->>C: Responseを返す
Note over S: リクエスト完了。close()未呼び出し<br/>Session-AがConnection-Xを保持し続ける
Note over C,M: ⏳ 8時間経過 (Thread-3にリクエストなし)
Note over M: 06:00 — MySQL wait_timeout 満了
M-xS: Connection-Xをサーバ側で終了<br/>(クライアントは知らない)
Note over C,M: 09:00 — 新しいリクエストがThread-3に振られる
C->>T: GET /api/users
T->>S: session_factory() → 同じSession-Aを返す
S->>M: 既に保持しているConnection-Xでクエリ試行
M--xS: ❌ Broken Pipe!
Note over T: (2006, "MySQL server has gone away")
Sessionがコネクションを直接保持しているので、コネクションプールでチェックアウトが発生しない。pool_pre_pingはチェックアウト時点でしか動かないので、このシナリオでは無力化される。
#同じ問題に当たっているなら — 関連issue集
この問題を追跡する中で、FastAPI + SQLAlchemyの組み合わせで同じ現象が繰り返し報告されていることを確認した。似た症状を見ているなら、以下のスレッドが参考になる。
FastAPI GitHubの関連discussion:
- Discussion #8017 — scoped_session vs Dependency vs Middlewareの比較、ContextVar + scopefuncによる解決事例
- Discussion #6628 — DI方式セッションのデッドロック問題。スレッドプールがコネクション待ちで埋まるとアプリ全体が止まる
SQLAlchemy側でも、FastAPI関連の並行性issueが急増しているというコメント:
“we seem to be getting a flurry of concurrency issues involving FastAPI very suddenly. These errors, particularly the ‘lost connection’ error, are often the side effect of conditions that were ultimately caused by concurrency issues.”
「FastAPIに関する並行性の問題が急に押し寄せている。これらのエラー、特に『lost connection』エラーは、究極的には並行性の問題によって引き起こされた状況の副作用であることが多い。」
— zzzeek, sqlalchemy/sqlalchemy Discussion #8891
この問題が発生する典型的な条件:
scoped_sessionをscopefunc指定なしで使う(デフォルトthreading.local()ベース)- 同期エンドポイント(
def)としてThreadPoolExecutorで実行 - MySQL/MariaDBなど
wait_timeoutでアイドルコネクションを切るDBを使用 - トラフィックが一定でなく、一部のスレッドが長時間アイドル状態になる
#scopefuncをContextVarに置き換えて解決
#核となるアイデア
カスタムscopefuncを渡すと、scoped_sessionはThreadLocalRegistryの代わりにScopedRegistryを使う。この構造はただの辞書だ。
# SQLAlchemy ScopedRegistry の内部 (簡略化)
class ScopedRegistry:
def __init__(self, createfunc, scopefunc):
self.registry = {}
def __call__(self):
key = self.scopefunc() # このキーが何になるか — それが全て
if key not in self.registry:
self.registry[key] = self.createfunc()
return self.registry[key]
キーをリクエストごとに固有の値に設定すれば、同じスレッドで実行されてもリクエストごとに別のセッションを使うようになる。
graph LR
subgraph "Before: スレッドIDがキー"
direction TB
SF1["scopefunc()"] -->|"Thread-1 ID"| R1["registry#91;thread_1#93; = Session-A"]
SF1 -->|"Thread-2 ID"| R2["registry#91;thread_2#93; = Session-B"]
end
subgraph "After: リクエストIDがキー"
direction TB
SF2["scopefunc()"] -->|"req-a1b2"| RR1["registry#91;req-a1b2#93; = Session-X"]
SF2 -->|"req-e5f6"| RR2["registry#91;req-e5f6#93; = Session-Y"]
end
style R1 fill:#fee,stroke:#c00,color:#18181b
style R2 fill:#fee,stroke:#c00,color:#18181b
style RR1 fill:#efe,stroke:#0a0,color:#18181b
style RR2 fill:#efe,stroke:#0a0,color:#18181b
#実装: 3ファイル、変更点を最小化
#ContextVarモジュール (新規)
# context_session.py
from contextvars import ContextVar, Token
_session_context: ContextVar[str] = ContextVar("session_context")
def set_session_context(context_id: str) -> Token:
return _session_context.set(context_id)
def get_session_context() -> str:
return _session_context.get()
def reset_session_context(token: Token) -> None:
_session_context.reset(token)
#Database クラス (1行変更)
class Database:
def __init__(self, engine: Engine) -> None:
self.session_factory = orm.scoped_session(
orm.sessionmaker(autocommit=False, autoflush=False, bind=engine),
scopefunc=get_session_context # ← この一行が核心
)
#ミドルウェア (新規)
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from uuid import uuid4
class ContextSessionMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
context_id = str(uuid4())[:8]
token = set_session_context(context_id)
try:
response = await call_next(request)
return response
finally:
# remove() → reset の順序が重要:
# remove()は内部でscopefunc()を呼び、現在のコンテキストのセッションを探す。
# resetが先に走るとscopefunc()が別の値を返し、見当違いのセッションが消される。
database.session_factory.remove() # 現在のコンテキストのセッションを探して削除
reset_session_context(token) # その後にコンテキストをリセット
app.add_middleware(ContextSessionMiddleware)
UseCase、Repository、エンドポイントなど既存コードは変更しない。
#適用後のリクエストライフサイクル
sequenceDiagram
participant C as Client
participant MW as ContextSession<br/>Middleware
participant T as Thread-3
participant S as Session<br/>(リクエストごとに生成)
participant M as MySQL
C->>MW: GET /api/data
MW->>MW: set_session_context(uuid)
MW->>T: call_next(request)
T->>S: session_factory()
Note over S: scopefunc() → uuid<br/>registryになし → 新しいSession生成
S->>M: query 実行
M-->>S: 結果
S-->>T: 結果
T-->>MW: Response
MW->>MW: session_factory.remove()
Note over S: registryから削除 ✅
MW->>MW: reset_session_context(token)
MW-->>C: Response
同じスレッドに別のリクエストが振られてもcontext_idが違うので、別のセッションが生成され、リクエストが終わるとremove()で完全に片付けられる。
このパターンが動く核となる前提: FastAPI/Starletteは同期エンドポイントをanyioのrun_in_threadpoolで実行する。anyioは内部でPython標準のContext.run()を使うので、呼び出し時点のContextVarがワーカースレッドにそのままコピーされる。つまり、ミドルウェアでsetしたcontext_idが、エンドポイント内で呼ばれるsession_factory()まで同じ値で届く。
デプロイの翌日からMySQL server has gone awayエラーは0になった。一ヶ月経った今も0だ。メモリ使用量に変化なし(remove()が毎リクエスト片付ける)。レスポンスタイムP99も変化なし(ContextVarのオーバーヘッドは無視できる程度)。
#DIパターンに切り替えなかった理由
FastAPI公式ドキュメントはDepends(get_db)パターンを推奨している。
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
return db.query(User).all()
セッションのライフサイクルが明示的で、分離も完璧だ。だが既存のコードベースがDIコンテナを通じてscoped_sessionをグローバルに注入する構造なら、移行コストが大きい。すべてのUseCaseとRepositoryのセッション受け渡し方を変更する必要があり、テストコードも全面的に書き直さなければならない。
DI方式自体にもFastAPI上の問題がある。Discussion #6628で報告されているように、スレッドプールがコネクション待ちで埋まると、dependencyのfinallyブロックを実行するスレッドを確保できず、デッドロックが発生し得る。
scopefuncの置き換えは既存構造を維持したまま核心的な問題だけを解決する。SQLAlchemyが推奨する「Sessionをリクエストと直接結び付ける」方向にも合致する。
#他のフレームワークも同じ仕組みで解決している
主要なフレームワークは結局、同じメカニズム — scopefuncの戻り値をキーとして使うregistry — に行き着く。違いはキーを何にするかだけだ。
| 実装 | scopefunc | キーの単位 |
|---|---|---|
scoped_session デフォルト | threading.local() | スレッド |
| Flask-SQLAlchemy | Flask request context | Flask リクエスト |
async_scoped_session | ContextVar.get() / current_task() | 非同期タスク |
| Galaxy Project | ContextVar (フォールバック: threading.get_ident) | リクエスト |
| この記事の解決策 | ContextVar.get() | リクエスト |
FastAPI Discussion #8017でも「リクエストごとにUUIDを生成してContextVarに保存し、それをscopefuncとして使えば完全に動く」という検証事例が共有されている。Galaxy Project(大規模バイオインフォマティクスプラットフォーム)もFastAPIベースで同じパターンをproductionに適用している。
SQLAlchemy作者のMike Bayer(zzzeek)もGitHub discussionで、asyncio環境ではthreading.local()の代わりにPython標準ライブラリのcontextvarsを使うことを推奨している。
#適用時の注意点
#1. threading.local()はスレッド終了時にのみ安全だ
SQLAlchemyドキュメントによれば、threading.local()オブジェクトはスレッドが終了するとそのストレージがガベージコレクションされる。スレッドを生成・破棄する環境ではremove()なしでも安全だ。だがFastAPIのThreadPoolExecutorはスレッドを終了せずに再利用する。この環境ではremove()の呼び出しが必須で、それを保証できないならscopefunc自体を置き換える方が確実だ。
#2. pool_pre_pingとpool_recycleは根本解決ではない
engine = create_engine(
"mysql+pymysql://...",
pool_pre_ping=True,
pool_recycle=3600,
)
これらの設定はコネクションプール層で死んだコネクションを検出する。だが核となる限界がある。pool_pre_pingはコネクションがプールからチェックアウトされる時点でしか動かない。
scoped_sessionパターンではclose()を明示的に呼ばないことが多い。このときSessionがコネクションを直接保持しているので、プールからのチェックアウトが発生せず、pool_pre_pingが動く機会がない。close()を呼んだとしてもSession自体はregistryに残って同じSessionが返されるので、根本問題(セッションがスレッドに紐づくこと)は解決されない。
根本解決はscopefuncの置き換えで、pool_pre_pingは防御的な補助設定として併用するのが良い。
#3. remove() の漏れはメモリリークを起こす
ミドルウェアのfinallyブロックでremove()を必ず呼ぶこと。スレッドベースではスレッド数(数十個)分しかセッションが生まれなかったが、ContextVarベースではリクエスト数だけ生まれ得る。remove()が抜けるとScopedRegistryの辞書が無限に膨らむ。
#4. BaseHTTPMiddleware の制約を知って使う
この記事の例はBaseHTTPMiddlewareを使う。実装は簡潔だが、Starletteには既知の制約がある。downstreamのアプリが別タスクで実行されるため、streaming responseが正常に動かないことがある。ContextVarをミドルウェアでsetしてdownstreamでreadするこのパターンは問題なく動くが、productionでstreamingが必要ならpure ASGI middlewareへの移行を検討するとよい。
#5. セッションの生成と片付けはすべてミドルウェア内で処理する
FastAPIはミドルウェアとdependencyを別のコンテキストで実行することがある。ContextVarのset/resetをミドルウェアで行い、remove()をdependencyで行うと、コンテキストが既にリセットされた後でremove()が呼ばれることがあり得る。この問題はdependency-injector Discussion #493でも報告されている。セッションの全ライフサイクルをミドルウェアの中で管理するのが安全だ。
#まとめ
| 項目 | デフォルト scoped_session | scopefunc + ContextVar |
|---|---|---|
| セッションの分離単位 | スレッド | リクエスト |
| セッション管理 | 暗黙的 (threading.local) | 暗黙的 (ContextVar) |
| Broken Pipe リスク | あり (アイドルスレッド) | なし (リクエスト終了時に片付け) |
| 既存コードの変更 | なし | scopefunc 1行 + ミドルウェア |
| UseCase/Repository の修正 | 不要 | 不要 |
| デバッグの難易度 | 高い (8時間遅れて発現) | 低い (即時発見) |
| greenlet/async 互換性 | 限定的 | 互換 |
結局のところ、間違った場所に縛りつけていたのはコードではなくセッションが暮らす空間のほうだった。「スレッド = リクエスト」というWSGI時代のレガシーを、そのままASGI環境に持ち込んでいたのだ。修正は一行で済む。ただ、その一行をどこに置くかを見つけるまでに数日かかった。
FastAPIでscoped_sessionを使っていて、明け方にたまにerrno 32が出るなら、pool_pre_pingに手を伸ばす前に、まず一度こう問いかけてみてほしい — 自分のセッションは、いったいどこに縛られているのか?
#参考資料
- SQLAlchemy 2.0 Docs — Contextual/Thread-local Sessions — scoped_sessionの設計前提、close() vs remove()、カスタムscopefunc
- fastapi/fastapi Discussion #8017 — scoped_session vs Dependency vs Middlewareの比較、ContextVar + scopefuncの解決事例
- fastapi/fastapi Discussion #6628 — DI方式セッションのデッドロック報告
- sqlalchemy/sqlalchemy Discussion #8891 — FastAPI関連の並行性issue急増、zzzeekコメント