2025.01.13

GeminiのStreamingは通常のレスポンスもエラーハンドリングしよう 〜 Failed to parse final chunk of streamでプロセスが死なないために

Google Cloud Verrtex AI GeminiのSDKを利用して,GeminiのStreamingレスポンスを得るような処理を行っている方はお気付きかもしれません。おそらく,2024年12月中旬から,GeminiのAPIレスポンスに小さな変更が行われた関係で,レートリミットに当たった場合にこれまでと異なるエラーレスポンスが返却されるようになりました。
また,それに伴い,SDKでこれまでとは異なるタイミングで例外が発生し,Node.jsの場合は実装によってはUnhandled Promise Rejectionでプロセスが予期せず終了する可能性があります。
この記事では,その問題の原因と対応について考察します。

1.現象と原因,対策

結論から言うと,予期せぬタイミングでNode.jsのStreamのerrorが発生し,Node.jsのプロセスが終了します。

対策は,以下のようにStreamではないResponseのPromiseのrejectもハンドリングすることです。

const streamingResp = await generativeModel.generateContentStream({
  // 必要なパラメータ
});
streamingResp.response.catch((e) => {
  // これを実装しないと落ちる
});
for await (const chunk of streamingResp.stream) {
  // 正常系処理
}

では,次にエラーの原因を詳しくみてみましょう。

Google GeminiのStreamingレスポンスをVertex AIを使って受信する場合,以下のような実装を行います。

const generativeModel = vertexAI.getGenerativeModel({
  ...options // オプション(詳細はSDK
});

const streamingResp = await generativeModel.generateContentStream({
  // 必要なパラメータ
});

for await (const chunk of streamingResp.stream) {
  ...chunkに対する処理
}

この実装では,受信したレスポンスをJSのStreamで捌くことが可能です。

正常系では,この実装で何も問題がありません。また,API実行時に判定されるレートリミットや,不適切表現でのブロックは,generateContentStreamの段階で例外が上がるため,適切にエラーハンドリング可能です。

しかし,2024年12月ごろから,Streamの処理の途中でエラーが発生する,つまり,SDK内部で使用しているstreamにてerrorが発生するようになりました。代表的な要因としては,レートリミットがあるようです。この場合,Failed to parse final chunk of streamというエラーメッセージが出力されます。

このような事象が発生すると,アプリケーションでstreamのエラーハンドリングを適切に行なっていても,Node.jsのプロセスが終了してしまいます。

※Streamのerrorを適切にハンドリングしない場合,Node.jsのプロセスが終了するのは正常な挙動です。しかし,今回はSDKの利用者に隠蔽された形でわかりづらく,原因の特定のためにSDKのコードを参照する必要があります。

GeminiのSDKは,Streamのレスポンスを得るために,generateContentStreamを呼び出します。この関数の戻り値は,Streamを処理するためのAsyncIteratorと,レスポンス全体を参照するためのPromiseです。これらは,このようにAPIのレスポンスのStreamをteeで分割し,streamとレスポンス全体を表すプロミスに分割されます。

これらは,後続の処理を読むとわかるように,Streamをteeで分割し,StreamをAsyncIteratorとPromiseに分割しています。

Streamを使った実装する上では,generateContentStreamの戻り値のうち,Streamのみ処理すれば良いように見えます。しかし,SDKの内部でStreamをteeで分割し,個別にReaderで処理を行っている都合上,Streamでerrorが発生すると双方が影響を受けます。そのため,使用していない.responseの戻り値でも例外が発生し,ハンドリングされていない例外となってプロセスが終了してしまうのです。

2.小さな再現コード

GeminiのSDKで発生する問題を,通信を伴わない小さなコードで再現しました。

import { setTimeout } from 'node:timers/promises';

const stream = new ReadableStream({
  start(controller) {
    return a();
    function a() {
    setTimeout(1000).then(() => {
      controller.enqueue('a');
      return setTimeout(3000);
    }).then(() => {
       controller.enqueue('b');
       return setTimeout(0);
     }).then(() => controller.error('error'));
    }
  }
});

async function *itr(strr) {
  const reader = strr.getReader();
  while (true) {
    const chunk = await reader.read();
    yield chunk;
    if (chunk.done) break;
  }
}

async function itr_err(strr) {
  const reader = strr.getReader();
  const res = [];
  while (true) {
    const chunk = await reader.read();
    res.push(chunk);
    if (chunk.done) break;
  }
  return res.join('');
}

function getStream() {
  const [s1, s2] = stream.tee();
  return [Promise.resolve(itr(s1)), Promise.resolve(itr_err(s2))];
}

try {
  const testStream = await getStream()[0];
  for await (const s of testStream) {
    console.log(s);
    await setTimeout(5000);
  }
} catch(e) {
  console.log(e);
}

このコードは,GeminiのStream API通信をsetTimeoutで擬似的に再現し,途中で必ずエラーが生じるようにしたものです。また,受信側も内部の処理での遅延を大袈裟に再現するためにウェイトを入れています。

このコードは末尾のStream受信処理を見てもわかるように,エラーハンドリングを正しく行っているように見えます。しかし,getStream関数で大元のstreamをteeで分割しており,s2を使って生成したPromiseのエラーが処理されていないことがわかるでしょう。

このコードは非常にシンプルなものですが,実はGoogle Vertex AIのSDKも類似の実装になっており,streamとresponseという2つの変数を返す関数があり,それぞれstreamをteeで分割する実装になっているため,s2を処理するresponse Promiseの処理を忘れるとプロセスが死んでしまう,ということでした。

3.まとめ

Node.jsでGoogle Cloud Verte AI GeminiのSDKを利用して,Streamingレスポンスを用いた実装を行う場合,streamだけでなくresponseのエラーもcacheする実装を行わないと,予期せぬ例外によりプロセスが終了する可能性があることを解説しました。

このように,SDKが返すPromiseを使用しない場合も,エラーハンドリングを実装しておかないとプロセスが終了する(=アーキテクチャによってはサービス障害につながる)可能性もあります。

次世代システム研究室では,グループ全体のインテグレーションを支援してくれるアーキテクトを募集しています。アプリケーション開発者の方,次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら,ぜひ募集職種一覧からご応募をお願いします。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事