Claudia - Anthropic ClaudeのC# SDKと現代的なC#によるウェブAPIクライアントの作り方

AI関連、競合は現れども、性能的にやはりOpenAI一強なのかなぁというところに現れたAnthropic Claude 3は、確かに明らかに性能がいい、GPT-4を凌駕している……!というわけで大いに気に入った(ついでに最近のOpenAIのムーブが気に入らない)ので、C#で使い倒していきたい!そこで、まずはSDKがないので非公式SDKを作りました。こないだまでプレビュー版を流していたのですが、今回v1.0.0として出します。ライブラリ名は、Claudeだから、Claudiaです!.NET全般で使えるのと、Unity(Runtime/Editor双方)でも動作確認をしているので、アイディア次第で色々活用できると思います。

今回のSDKを作るにあたっての設計指針の一番目は、公式のPython SDKTypeScript SDKと限りなく似せること、です。というのもドキュメント類の解説はこれら公式SDKベースになるし、世の中的にもブログなどには公式SDKベースの記事が多く出回るでしょう。公式の充実したプロンプトライブラリも、APIリクエストで叩き込みたくなるかもしれない。

そんな時に、APIのスタイルが違うと、変換の認知負荷がかかります。些細なことですが、そういうところがすごく大事で引っ掛かってしまうので、徹底的に取り除きます。そのうえで、無理に動的な要素を入れず、C#らしさを崩さないというバランス取りが設計において重要です。

C#クライアントの見た目はこうです。

// C#
using Claudia;

var anthropic = new Anthropic();

var message = await anthropic.Messages.CreateAsync(new()
{
    Model = "claude-3-opus-20240229",
    MaxTokens = 1024,
    Messages = [new() { Role = "user", Content = "Hello, Claude" }]
});

Console.WriteLine(message);

比較してTypeScriptの見た目はこうなっています。

// TypeScript
import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic();

const message = await anthropic.messages.create({
    model: 'claude-3-opus-20240229',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Hello, Claude' }],
});

console.log(message.content);

かなり近い!でしょう。そのうえで、C#版はdynamicDictionary<string, object>などは使わず、全て型付けされたものが指定されます。上記の例で使用しているC# 9.0で追加されたTarget-typed new expressionsや、C# 12で追加されたCollection expressionsの存在を前提として、うまくAPIを合わせています。

もともと、動的型付け言語のAPIのほうが(見た目は)簡潔で使いやすそう、という印象を抱くことは多いので、それと同レベルの簡潔さで、しっかりと型付けが効いて書けるというのは、現代のC#の大きな強みです。(そもそもTypeScriptの公式SDKに合わせようと思ったのは、私から見ても公式SDKのAPIスタイルはよくできていると思ったからです、仮にあまりにも酷かった場合は合わせようとはしなかったでしょう)

いかにも古典的なC#やJavaみたいな冗長な設計のAPIクライアントは、反省しましょう。現代のC#はここまでやれるのだから。

Streaming and Blazor

StreamingのAPIも用意されていて、Blazorと組み合わせれば簡単にリアルタイムに更新されるChat UIが作れます。コードは本当にたったのこれだけ、メソッド本体なんて10行ちょい!

[Inject]
public required Anthropic Anthropic { get; init; }

double temperature = 1.0;
string textInput = "";
string systemInput = SystemPrompts.Claude3;
List<Message> chatMessages = new();

async Task SendClick()
{
    chatMessages.Add(new() { Role = Roles.User, Content = textInput });

    var stream = Anthropic.Messages.CreateStreamAsync(new()
    {
        Model = Models.Claude3Opus,
        MaxTokens = 1024,
        Temperature = temperature,
        System = string.IsNullOrWhiteSpace(systemInput) ? null : systemInput,
        Messages = chatMessages.ToArray()
    });

    var currentMessage = new Message { Role = Roles.Assistant, Content = "" };
    chatMessages.Add(currentMessage);

    textInput = "";
    StateHasChanged();

    await foreach (var messageStreamEvent in stream)
    {
        if (messageStreamEvent is ContentBlockDelta content)
        {
            currentMessage.Content[0].Text += content.Delta.Text;
            StateHasChanged();
        }
    }
}

blazorclauderec

全てのリクエスト/レスポンス型はSystem.Text.Json.JsonSerializerでシリアライズ可能なため、このList<Message>をそのままシリアライズすれば保存、デシリアライズすれば読み込みになります。

Function Calling

ClaudiaはただのREST APIを叩くだけのSDK、ではありません。Source Generatorを活用して、Function Callingを簡単に定義するための仕組みを用意しました。

Function Callingができると何がいいか、というと、現状のLLMは単体だとできないことが幾つかあります。例えば計算は、それっぽい答えを返してくれる場合も多いし、Step-by-Stepで考えさせるなど、それっぽさの精度を上げることはできるけれど、正確な計算はできないという苦手分野だったりします(複雑な計算を投げると正しそうで間違ってる答えを出しやすい)。それなら計算が必要なら普通に計算機で計算して、その答えをもとに文章を作ればいいじゃん、と。あるいは現在日時を答えることもできません。ウェブページを指定して要約したり翻訳して欲しいとお願いしても、中身を見ることはできませんと言われます。それらを解決するのがFunction Callingです。

まずは一例ということで、指定したURLのウェブページをClaudeに返す関数を定義してみましょう。

public static partial class FunctionTools
{
    /// <summary>
    /// Retrieves the HTML from the specified URL.
    /// </summary>
    /// <param name="url">The URL to retrieve the HTML from.</param>
    [ClaudiaFunction]
    static async Task<string> GetHtmlFromWeb(string url)
    {
        using var client = new HttpClient();
        return await client.GetStringAsync(url);
    }
}

[ClaudiaFunction]で定義した関数がSource Generatorによって色々生成されます。これを利用する場合、以下のようになります。

var input = new Message
{
    Role = Roles.User,
    Content = """
        Could you summarize this page in three lines?
        https://docs.anthropic.com/claude/docs/intro-to-claude
"""
};

var message = await anthropic.Messages.CreateAsync(new()
{
    Model = Models.Claude3Haiku,
    MaxTokens = 1024,
    System = FunctionTools.SystemPrompt, // set generated prompt
    StopSequences = [StopSequnces.CloseFunctionCalls], // set </function_calls> as stop sequence
    Messages = [input],
});

var partialAssistantMessage = await FunctionTools.InvokeAsync(message);

var callResult = await anthropic.Messages.CreateAsync(new()
{
    Model = Models.Claude3Haiku,
    MaxTokens = 1024,
    System = FunctionTools.SystemPrompt,
    Messages = [
        input,
        new() { Role = Roles.Assistant, Content = partialAssistantMessage! } // set as Assistant
    ],
});

// The page can be summarized in three lines:
// 1. Claude is a family of large language models developed by Anthropic designed to revolutionize the way you interact with AI.
// 2. This documentation is designed to help you get the most out of Claude, with clear explanations, examples, best practices, and links to additional resources.
// 3. Claude excels at a wide variety of tasks involving language, reasoning, analysis, coding, and more, and the documentation covers key capabilities, getting started with prompting, and using the API.
Console.WriteLine(callResult);

Claudeへは二回のリクエストを行っています。まず、最初のClaudeへのリクエストでは、質問と共に利用可能な関数の一覧と説明を送り、関数を実行するのが最適だと判断されると、実行したい関数名とパラメーターが返されます。それを下に、手元で関数を実行し、結果をClaudeに渡すことで最終的に求める結果を得られます。

ではSource Generatorは何をやっているのかというと、まずはClaudeのシステム文に渡しているFunctionTools.SystemPromptを生成しているわけですが、その中身はこれです(一部省略)。

// ...前文は省略

<tools>
    <tool_description>
        <tool_name>GetHtmlFromWeb</tool_name>
        <description>Retrieves the HTML from the specified URL.</description>
        <parameters>
            <parameter>
                <name>url</name>
                <type>string</type>
                <description>The URL to retrieve the HTML from.</description>
            </parameter>
        </parameters>
    </tool_description>
</tools>

XMLです。ClaudeはXMLタグを認識するようになっていて、システム的に明確に情報を与えたい場合はXMLタグを活用することがベストプラクティスとなっています。そこで、C#の関数からClaudeに渡すためのXMLを自動生成しています。これを手書きは、したくないでしょう……?

そしてClaudeはそのリクエストに対して、以下のような結果を返します。

<function_calls>
    <invoke>
        <tool_name>GetHtmlFromWeb</tool_name>
        <parameters>
            <url>https://docs.anthropic.com/claude/docs/intro-to-claude</url>
        </parameters>
    </invoke>

やはりXMLです(閉じタグが欠けているのはStopSequencesで止めているため。関数を呼びたい場合はこれ以上の情報は不要なので打ち止めておく)。これをパースして、関数(GetHtmlFromWeb)を実行し、Claudeに渡すためのメソッド FunctionTools.InvokeAsync がSource Generatorによって生成されています。実際生成されているInvokeAsyncメソッドは以下のようなものです。

#pragma warning disable CS1998
    public static async ValueTask<string?> InvokeAsync(MessageResponse message)
    {
        var content = message.Content.FirstOrDefault(x => x.Text != null);
        if (content == null) return null;

        var text = content.Text;
        var tagStart = text .IndexOf("<function_calls>");
        if (tagStart == -1) return null;

        var functionCalls = text.Substring(tagStart) + "</function_calls>";
        var xmlResult = XElement.Parse(functionCalls);

        var sb = new StringBuilder();
        sb.AppendLine(functionCalls);
        sb.AppendLine("<function_results>");

        foreach (var item in xmlResult.Elements("invoke"))
        {
            var name = (string)item.Element("tool_name")!;
            switch (name)
            {
                case "GetHtmlFromWeb":
                    {
                        var parameters = item.Element("parameters")!;

                        var _0 = (string)parameters.Element("url")!;

                        BuildResult(sb, "GetHtmlFromWeb", await GetHtmlFromWeb(_0).ConfigureAwait(false));
                        break;
                    }

                default:
                    break;
            }
        }

        sb.Append("</function_results>"); // final assistant content cannot end with trailing whitespace

        return sb.ToString();

        static void BuildResult<T>(StringBuilder sb, string toolName, T result)
        {
            sb.AppendLine(@$"    <result>
        <tool_name>{toolName}</tool_name>
        <stdout>{result}</stdout>
    </result>");
        }
    }
#pragma warning restore CS1998
}

これを手書きは、あまりしたくはないでしょう。特に呼び出したい関数が増えれば増えるほど大変ですし。

これで呼び出し&生成したXMLを再度Claudeに、Assistantによる先頭の出力結果だと渡すことによって、望む答えを得ることができます。このテクニックはPrefill Claude's responseとして公式でもベストプラクティスの一つとして案内されているもので、Claudeによる返答を望む方向に導くのに有益です。例えば{をprefill responseとして返すと、Claudeが結果をJSONとして出力する確率が飛躍的に上昇します。

API vs LangChain, SemanticKernel

大規模言語モデルを触るなら、生で使うよりもLangChainや、特にC#だとSemantic Kernelを使うというのを入り口にするのも定説ではありますが、やや疑問はあります。最近でもLangChainを使わないLangChain は LLM アプリケーションの開発に採用すべきではないといった記事のようにLangChain不要論も出てきています。

そもそも、まぁこの記事はエンジニア向けに書いてるわけですが、一部の機能はあきらかに過剰でいらないんじゃないかと、保存用のプラグインとか。Semantic Kernelの大量にあるコネクターパッケージとかぞっとする感じで、コード書けないデータサイエンティストが継ぎ接ぎでやるならともかく、エンジニアは保存ぐらい自前でやったほうが絶対いいでしょ。TimePluginだのHttpPluginだのFileIOPluginだのも、正直馬鹿らしい、という感じしかないのでは。

どうせ最後に叩くのは生APIなら、真摯にAPIドキュメントを読め、と。ClaudeのAPIドキュメントのUser Guidesは分かりやすく素晴らしく、それもまたClaudeを支持したい理由の一つになります。しょうもない抽象化を通すぐらいならClaudeに特化して、特徴的なXMLによる指示の活かしかたを考えろ、と。

特にC#の人はSemantic Kernel至上主義になってると思われるので、いったんまずそっから離れて考えていくといいんじゃないです?

モダンウェブAPIクライアントの作り方

ここからはClaudiaの設計から見る現代的なAPIクライアントの設計方法の話をします。

まず、通信の基盤はHttpClientを使います。一択です。異論を挟む余地はない。Grpc.Net.ClientだってHTTP/2 gRPC通信にHttpClientを使っていますし、好むと好まざると全てのHTTP系の通信の基盤はHttpClientです。

ここでは、外からHttpMessageHandlerを受け取れるようにしておくといいでしょう。

public class Anthropic : IMessages, IDisposable
{
    readonly HttpClient httpClient;

    // DefaultRequestHeadersやBaseAddressを変更させてあげるためにpublicで公開しておく
    public HttpClient HttpClient => httpClient;

    public Anthropic()
        : this(new HttpClientHandler(), true)
    {
    }

    public Anthropic(HttpMessageHandler handler)
        : this(handler, true)
    {
    }

    public Anthropic(HttpMessageHandler handler, bool disposeHandler)
    {
        this.httpClient = new HttpClient(handler, disposeHandler);
    }

    public void Dispose()
    {
        httpClient.Dispose();
    }
}

HttpClientというのは実はガワでしかなくて、実体はHttpMessageHandlerです。HttpMessageHandlerにはやれることが色々あって、DelegatingHandlerを実装してリクエストの前後をフックするような機能を仕込んだりも出来るし、Cysharp/YetAnotherHttpHandlerはHttpMessageHandlerの実装という形で通信処理を丸ごとRust実装に差し替えています。Unityでは.NETランタイムの通信実装じゃなくてUnityWebRequestを使いたいんだよなあ、といったような場合にはUnityWebRequestHttpMessageHandler.csを使えば、やはり通信処理が全てUnityによるものに差し替わります。

インターフェイスの切り方も工夫していきましょう。

client.Messages.CreateAsync のように、MVCでいったら.Controller.Methodのように、2階層に整理された呼び出し方は直感的で使いやすい設計です。特に、入力補完に優しいのが嬉しい。そのためには、まずインターフェイスを切りますが、工夫として、それを明示的なインターフェイスの実装にして、インターフェイス自体はreturn this;で返してやりましょう。

public interface IMessages
{
    Task<MessageResponse> CreateAsync(MessageRequest request, RequestOptions? overrideOptions = null, CancellationToken cancellationToken = default);
    IAsyncEnumerable<IMessageStreamEvent> CreateStreamAsync(MessageRequest request, RequestOptions? overrideOptions = null, CancellationToken cancellationToken = default);
}

public class Anthropic : IMessages, IDisposable
{
    public IMessages Messages => this;

    async Task<MessageResponse> IMessages.CreateAsync(MessageRequest request, RequestOptions? overrideOptions, CancellationToken cancellationToken)
    {
        // ...
    }

    async IAsyncEnumerable<IMessageStreamEvent> IMessages.CreateStreamAsync(MessageRequest request, RequestOptions? overrideOptions, [EnumeratorCancellation] CancellationToken cancellationToken)    
    {
        // ...
    }
}

これによって一個階層を下がる際のアロケーションがない(thisを返すため)ですし、明示的な実装になっているのでトップ階層では入力補完には現れないので、使いやすさと性能、ついでにいえば実装のしやすさ(全てのクライアントのフィールドにそのままアクセスできるため)の全てが満たされます。

ユーザーフレンドリーなリクエスト型生成

Anthropicのリクエスト型はかなり整理されて、型有り言語に優しい仕様になっているのですが、一部、single string or an array of content blocksというものがあります。どっちか、とかそういうの微妙に困るわけですが、しかし、じゃあOption<Either<List<>>>かなー、とか、そういうことではありません。そんな定義にしたらAPIクライアントの手触りは最悪になるでしょう。よく考えてみると、Anthropic APIのこの場合のstringは、長さ1のstring contentと同一です。

// こうじゃなくて
Content = [ new() { Type = "text", Text = "Hello, Claude" }]

// こう書きたい
Content = "Hello, Claude"

これは、良い仕様だと思います。杓子定規に Type = "text", Text = "..." と書かせるのはダルいでしょう。利用時の95%ぐらいはsingle string contentでしょうし(Typeはimageの場合もある、その場合はSourceにバイナリのbase64文字列を設定する。arrayなのは、画像とテキストを両方渡したりするため)。

その仕様をC#で実現しましょう。今回の場合、正規化するようなイメージでいいので、暗黙的変換で実装しました。

public record class Message
{
    /// <summary>
    /// user or assistant.
    /// </summary>
    [JsonPropertyName("role")]
    public required string Role { get; set; }

    /// <summary>
    /// single string or an array of content blocks.
    /// </summary>
    [JsonPropertyName("content")]
    public required Contents Content { get; set; }
}

public class Contents : Collection<Content>
{
    public static implicit operator Contents(string text)
    {
        var content = new Content
        {
            Type = ContentTypes.Text,
            Text = text
        };
        return new Contents { content };
    }
}

Content[]ではなくて独自のコレクションにして、それの文字列からの暗黙的変換でsingle string contentを生成する形にしました。別に最新のC#仕様でもなんでもなく昔からある手法ですし、闇雲な利用は厳禁ですが、こうしたところに利用するのはAPIクライアントの手触り向上に効果的です。

タイムアウト

タイムアウトは定番の処理なので、APIクライアントで簡単にユーザーが設定できるようにしておいたほうがいいでしょう。といっても、HttpClientがTimeoutプロパティを持っているので、通常はそれにセットしてあげるだけで十分です。しかし、Claudiaではあえて無効にしています。

public class Anthropic : IMessages, IDisposable
{
    public TimeSpan Timeout { get; init; } = TimeSpan.FromMinutes(10);

    public Anthropic(HttpMessageHandler handler, bool disposeHandler)
    {
        this.httpClient = new HttpClient(handler, disposeHandler);
        this.httpClient.Timeout = System.Threading.Timeout.InfiniteTimeSpan;
    }
}

Anthropicの公式クライアントがメソッド呼び出し毎にTimeout設定をオーバーライドできるという仕様を持っているため、それにならってオーバーライド可能に必要があったためです。HttpClientやそれに準ずるもの呼び出しはスレッドセーフであるべき(実際APIクライアントはSingletonで登録されたりする場合がある)なので、SendAsyncでHttpCleintのプロパティの値を弄るのはよくない。ので、HttpClientが持つTimeoutは無効にして、手動で処理するようにしています。

実装方法は、LinkedTokenSourceを生成し、CancelAfterによってタイムアウト時間後にキャンセルされるCancellationTokenを作り、HttpClient.SendAsyncに渡すだけです。なお、これはHttpClient.Timeoutがタイムアウト時間を持つ場合の内部実装と同じです。

// 実際のコードはリトライ処理と混ざっているため、若干異なります
async Task<TResult> RequestWithAsync<TResult>(HttpRequestMessage message, CancellationToken cancellationToken, RequestOptions? overrideOptions)
{
    var timeout = overrideOptions?.Timeout ?? Timeout;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        cts.CancelAfter(timeout);

        try
        {
            var result = await httpClient.SendAsync(message, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(ConfigureAwait);
            return result;
        }
        catch (OperationCanceledException ex) when (ex.CancellationToken == cts.Token)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                throw new OperationCanceledException(ex.Message, ex, cancellationToken);
            }
            else
            {
                throw new TimeoutException($"The request was canceled due to the configured Timeout of {Timeout.TotalSeconds} seconds elapsing.", ex);
            }

            throw;
        }
    }
}

実際にキャンセルされた場合(OperationCanceledExceptionが投げられる)のエラーハンドリングには注意しましょう。まず、LinkedTokenを剥がす必要があります。素通しだとOperationCanceledExceptionのTokenがLinkedTokenのままですが、これだと上流側でキャンセル原因の判定に使うことができません。キャンセル原因が渡されているCancellationTokenのキャンセルだった場合は、OperationCanceledExceptionを作り直してキャンセル理由のTokenを変更します。

タイムアウトだった場合はOperationCanceledExceptionではなく、TimeoutExceptionを投げてあげるのが良いでしょう。なお、HttpClientのタイムアウト実装を使った場合は歴史的事情でTaskCanceledExceptionを投げてくるようになっています(互換性のため変更したくても、もう変更できない、とのこと。あまり良い設計ではないと言えるので、そこは見習わなくていいでしょう)

リトライ

リトライをAPIクライアント自身が持つべきかどうかに関しては、少し議論があるかもしれません。しかし、単純に例外が出たらcatchしてリトライかければいいというものではなく、リトライ可なものと不可のものの判別がまず必要です。例えば認証に失敗しているとか、リクエストに投げるJSONが腐ってるといった場合は何度リトライしても無駄なのでリトライすべきものではないのですが、そうした細かい条件は、APIクライアント自身しか知り得ないので、リトライ処理を内蔵してしまうのは良いと思います。

Claudiaでは公式クライアントに準拠する形で、具体的には408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errorsをリトライ対象にしています。認証失敗のPermissionError(403)やリクエスト内容が不正(InvalidRequestError(400))はリトライされません。たまによくあるOverloadedError(過負荷状態なので結果返せまんでしたエラー)は529で、これは何度か叩き直せば解消されるやつなのでリトライして欲しい、といったものはリトライされます。

リトライロジックも公式クライアントに準拠していて、レスポンスヘッダにretry-after-msやretry-afterがあればそれに従いつつ、ない場合(やretry-afterが規定よりも大きい場合)はジッター付きのExponential Backoffで間隔を制御しています。

キャンセル

クライアント側に.Cancel()メソッドなどは持たせません。というのも、HttpClientと準拠させるとクライアントそのものは、ほぼシングルトンで使えて、各呼び出しに対して共有されることになります(場合によってはDIでシングルトンでインジェクトするかもしれませんし)。なので、全てに影響を与える.Cancel()ではなくて、各呼び出しそれぞれにCancellationTokenを渡してね、という形を取ります。

Server Sent Eventsの超高速パース

Streamingでレスポンスを取得するAPIは、server-sent eventsという仕様で、ストリーミングで送信されてきます。具体的には以下のようなテキストメッセージが届きます。

event: message_start
data: {"type":"message_start","message":...}

event: content_block_start
data: {"type":"content_block_start","index":...}

event: イベント名, data: JSON, ...。といったことの繰り返しです。さて、改行区切りのテキストメッセージといったらStreamReaderでReadLine、というのは正解、ではあるのですがモダンC#的には不正解です。

ReadLineは文字列を生成します。イベント名の判定のために、あるいは最終的にdataのJSONはデシリアライズしてオブジェクトに変換するのですが、UTF8のデータから直接変換できるはずです。というわけで、ここは(ユーザーに渡すオブジェクトの生成以外は)ゼロアロケーションが狙えます。文字列を通しさえしなければ。というわけでStreamReaderの出番はありません。

具体的なコードを見ていきましょう。前半部(下準備)と後半部(パース部分)で分けます。

internal class StreamMessageReader
{
    readonly PipeReader reader;
    readonly bool configureAwait;
    MessageStreamEventKind currentEvent;

    public StreamMessageReader(Stream stream, bool configureAwait)
    {
        this.reader = PipeReader.Create(stream);
        this.configureAwait = configureAwait;
    }

    public async IAsyncEnumerable<IMessageStreamEvent> ReadMessagesAsync([EnumeratorCancellation] CancellationToken cancellationToken)
    {
    READ_AGAIN:
        var readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(configureAwait);

        if (!(readResult.IsCompleted | readResult.IsCanceled))
        {
            var buffer = readResult.Buffer;

            while (TryReadData(ref buffer, out var streamEvent))
            {
                yield return streamEvent;
                if (streamEvent.TypeKind == MessageStreamEventKind.MessageStop)
                {
                    yield break;
                }
            }

            reader.AdvanceTo(buffer.Start, buffer.End);
            goto READ_AGAIN;
        }
    }

まず、Streamは、System.IO.Pipelines.PipeReaderに渡しておきます。今回のStreamはネットワークからサーバー側がストリーミングで返してくる不安定なStreamなので、バッファ管理が大変です。PipeReader/PipeWriterは、若干癖がありますが、その辺の管理をよしなにやってくれるもので、現代のC#ではかなり重要なライブラリです。

基本の流れはバッファを読み込み(ReadAsync)、そのバッファでパース可能(行の末尾までないとパースできないので、改行コードが含まれているかどうか)な状態なら、1行毎にパース(TryReadData)してyield returnでオブジェクトを返す。バッファが足りなかったらAdvanceToで読み取った部分までマークしてから、再度ReadAsync、といった流れになります。

利用側はBlazorのサンプルで出していたのですが、await foreachで列挙するのが基本になります。

await foreach (var messageStreamEvent in Anthropic.Messages.CreateStreamAsync())
{
}

こういったネットワークの絡む処理のストリーミング処理にはIAsyncEnumerableが非常に向いていますし、データソース側も、非同期シーケンスをyield returnで返せるというのは、とても楽になりました。これがない時代には、もう戻るのは無理でしょう……。

次に後半部、PipeReaderによって分解されたバッファからパースする処理になります。

[SkipLocalsInit]
bool TryReadData(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)] out IMessageStreamEvent? streamEvent)
{
    var reader = new SequenceReader<byte>(buffer);
    Span<byte> tempBytes = stackalloc byte[64]; // alloc temp
    
    while (reader.TryReadTo(out ReadOnlySequence<byte> line, (byte)'\n', advancePastDelimiter: true))
    {
        if (line.Length == 0)
        {
            continue; // next.
        }
        else if (line.FirstSpan[0] == 'e') // event
        {
            // Parse Event.
            if (!line.IsSingleSegment)
            {
                line.CopyTo(tempBytes);
            }
            var span = line.IsSingleSegment ? line.FirstSpan : tempBytes.Slice(0, (int)line.Length);

            var first = span[7]; // "event: [c|m|p|e]"

            if (first == 'c') // content_block_start/delta/stop
            {
                switch (span[23]) // event: content_block_..[]
                {
                    case (byte)'a': // st[a]rt
                        currentEvent = MessageStreamEventKind.ContentBlockStart;
                        break;
                    case (byte)'o': // st[o]p
                        currentEvent = MessageStreamEventKind.ContentBlockStop;
                        break;
                    case (byte)'l': // de[l]ta
                        currentEvent = MessageStreamEventKind.ContentBlockDelta;
                        break;
                    default:
                        break;
                }
            }
            else if (first == 'm') // message_start/delta/stop
            {
                switch (span[17]) // event: message_..[]
                {
                    case (byte)'a': // st[a]rt
                        currentEvent = MessageStreamEventKind.MessageStart;
                        break;
                    case (byte)'o': // st[o]p
                        currentEvent = MessageStreamEventKind.MessageStop;
                        break;
                    case (byte)'l': // de[l]ta
                        currentEvent = MessageStreamEventKind.MessageDelta;
                        break;
                    default:
                        break;
                }
            }
            else if (first == 'p')
            {
                currentEvent = MessageStreamEventKind.Ping;
            }
            else if (first == 'e')
            {
                currentEvent = (MessageStreamEventKind)(-1);
            }
            else
            {
                // Unknown Event, Skip.
                // throw new InvalidOperationException("Unknown Event. Line:" + Encoding.UTF8.GetString(line.ToArray()));
                currentEvent = (MessageStreamEventKind)(-2);
            }

            continue;
        }
        else if (line.FirstSpan[0] == 'd') // data
        {
            // Parse Data.
            Utf8JsonReader jsonReader;
            if (line.IsSingleSegment)
            {
                jsonReader = new Utf8JsonReader(line.FirstSpan.Slice(6)); // skip data: 
            }
            else
            {
                jsonReader = new Utf8JsonReader(line.Slice(6)); // ReadOnlySequence.Slice is slightly slow
            }

            switch (currentEvent)
            {
                case MessageStreamEventKind.Ping:
                    streamEvent = JsonSerializer.Deserialize<Ping>(ref jsonReader, AnthropicJsonSerialzierContext.Default.Options)!;
                    break;
                case MessageStreamEventKind.MessageStart:
                    streamEvent = JsonSerializer.Deserialize<MessageStart>(ref jsonReader, AnthropicJsonSerialzierContext.Default.Options)!;
                    break;
                // 中略(MessageDela, MessageStop, ContentBlockStart, ContentBlockDelta, ContentBlockStop, errorに対して同じようなDeserialize<T>
                default:
                    // unknown event, skip
                    goto END;
            }

            buffer = buffer.Slice(reader.Consumed);
            return true;
        }
    }
END:
    streamEvent = default;
    buffer = buffer.Slice(reader.Consumed);
    return false;
}

event, dataの二行から、dataのJSONをデシリアライズしてオブジェクトを返したい。というのが処理のやりたいことです。bufferには必ずしも都合よくevent, dataの二行が入っているわけでもなくeventだけかもしれない、dataだけかもしれない、あるいはdataも途中で切れてる(そのままだと不完全なJSON)かもしれない。といったことを考慮して、中断・再開できる構造にしておく必要があります。

といっても、基本的には改行コードが存在してれば一行分のバッファは十分あるだろうということで、 while (reader.TryReadTo(out ReadOnlySequence<byte> line, (byte)'\n', advancePastDelimiter: true)) といったループを回して、これをStreamReader.ReadLineの代わりにしています。このreaderはSequenceReaderというReadOnlySequenceからの読み取りをサポートするユーティリティで、ref structのため、それ自体のアロケーションはありません。ReadOnlySequenceは性能良く正しく使うには、かなり落とし穴の多いクラスなので、こうしたユーティリティベースに実装したほうがお手軽かつ安全です。

まずeventのパースで、ここからdataがどの種類化を読み取っています。正攻法でやると if (span.SequenceEqual("content_block_start")) といったように判定していくことになります。Span<byte>へのSequenceEqualは高速な実装になっているので、まぁ悪くないといえば悪くないのですが、とはいえifの連打は如何なものか……。そこで、Claudiaでは実際には以下のような判定に簡略化しています。

var first = span[7]; // "event: [c|m|p|e]"

if (first == 'c') // content_block_start/delta/stop
{
    switch (span[23]) // event: content_block_..[]
    {
        case (byte)'a': // st[a]rt
            currentEvent = MessageStreamEventKind.ContentBlockStart;
            break;
        case (byte)'o': // st[o]p
            currentEvent = MessageStreamEventKind.ContentBlockStop;
            break;
        case (byte)'l': // de[l]ta
            currentEvent = MessageStreamEventKind.ContentBlockDelta;
            break;
        default:
            break;
    }
}
else if (first == 'm') // message_start/delta/stop
{
    switch (span[17]) // event: message_..[]
    {
        case (byte)'a': // st[a]rt
            currentEvent = MessageStreamEventKind.MessageStart;
            break;
        case (byte)'o': // st[o]p
            currentEvent = MessageStreamEventKind.MessageStop;
            break;
        case (byte)'l': // de[l]ta
            currentEvent = MessageStreamEventKind.MessageDelta;
            break;
        default:
            break;
    }
}

メッセージの種類はcontent_block_start/delta/stop, message_start/delta/stop, ping, errorの8種類。まず、先頭1文字でcontent系かmessage系かその他か判定できる。start/delta/stopに関しては3文字目を見ると判定できる。というわけで、1byteのチェックを2回行うだけで分類可能です。明らかに高速!なお、今後のメッセージ種類の追加でチェックが壊れる可能性がゼロではない(例えばcontent_block_ffowardとかが来るとcontent_block_stopと誤判定される)、という問題があることは留意する必要があります。Claudiaではいうて大丈夫だろ、という楽観視してますが。

なお、これは以前に発表したモダンハイパフォーマンスC# 2023でのコードのバリエーションと言えるでしょうか。

テキストプロトコルを見るとなんとかして判定をちょろまかしたいという欲求に抗うのは難しい……。なお、もし厳密な判定をしつつもif連打を避けたい場合は、まず長さチェックをいれます。長さで大雑把な分岐をかけてからSequenceEqualで正確なチェックをします。ようするところ、C#のstringへのswtichの最適化(コンパイラがそういう処理に変換している!)と同じことをやろうという話なだけですが。分岐数が多い場合はハッシュコードを取って分岐かけるとか、ようするにインラインDictionaryのようなものを実装するのもアリでしょう。

最後に、data行はJSON Deserializeです。ReadOnlySpan<byte>またはReadOnlySequence<byte>のままデシリアライズするにはUtf8JsonReaderを通す必要があります。なお、Utf8JsonReaderもref structなのでアロケーションには含めません。

これで、Stringを一切通さない処理ができました!StreamReaderを使えば超単純になるのに!という気はしなくもないですが、文字列化したら負けだと思っている病に罹患しているのでしょーがない……。

Source Generator vs Reflection

Function Callingの実装に、ClaudiaではSource Generatorを採用しました。リフレクションベースで作成することも可能では有りましたが、今回に関してはSource Generatorのほうが望ましい結果が得られました。まず、仮にリフレクションで実装したらどんな関数定義を要求されるだろうか、というところを、Semantic Kernel実装の場合との比較で見てください。

public static partial class FunctionTools
{
    // Claudia Source Generator

    /// <summary>
    /// Retrieve the current time of day in Hour-Minute-Second format for a specified time zone. Time zones should be written in standard formats such as UTC, US/Pacific, Europe/London.
    /// </summary>
    /// <param name="timeZone">The time zone to get the current time for, such as UTC, US/Pacific, Europe/London.</param>
    [ClaudiaFunction]
    public static string TimeOfDay(string timeZone)
    {
        var time = TimeZoneInfo.ConvertTimeBySystemTimeZoneId(DateTime.UtcNow, timeZone);
        return time.ToString("HH:mm:ss");
    }

    // Semantic Kernel

    [KernelFunction]
    [Description("Retrieve the current time of day in Hour-Minute-Second format for a specified time zone. Time zones should be written in standard formats such as UTC, US/Pacific, Europe/London.")]
    public static string TimeOfDay([Description("The time zone to get the current time for, such as UTC, US/Pacific, Europe/London.")]string timeZone)
    {
        var time = TimeZoneInfo.ConvertTimeBySystemTimeZoneId(DateTime.UtcNow, timeZone);
        return time.ToString("HH:mm:ss");
    }
}

Function Callingでは、Claudeに関数の情報を与えなければならないので、メソッド・パラメーター共に説明が必須です。ClaudiaのSource Generator実装ではそれをドキュメントコメントから取得するようにしました。Semantic KernelではDescription属性から取ってきています。これはドキュメントコメントのほうが自然で書きやすいはずです。特にパラメーターへの属性は、書きやすさだけじゃなく、複数パラメーターがある場合にかなり読みづらくなります。

また、Source Generatorではアナライザーとして不足がある際にはコンパイルエラーにできます。

image

全てのパラメーターにドキュメントコメントが書かれていなければならない・対応していない型を利用している、などのチェックが全てコンパイル時どころかエディット時にリアルタイムに分かります。

難点は実装難易度がSource Generatorのほうが高いことと、ドキュメントコメントの利用にはかなり注意が必要です。

Roslyn上でドキュメントコメントを取得するには、ISymbol.GetDocumentationCommtentXml()が最もお手軽なのですが、これが取得できるかどうかは<GenerateDocumentaionFile>に左右されます。falseの場合は常にnullを返します。それだと使いにくすぎるので、ClaudiaではSyntaxNodeから取得しようとしたのですが、それも同じく<GenerateDocumentaionFile>の影響を受けていました。

そこでしょうがなく、以下のような拡張メソッドを用意することで全ての状況でドキュメントコメントを取得することに成功しました(Triviaベースなので少し扱いづらいですが、取れないよりも遥かにマシ)

public static DocumentationCommentTriviaSyntax? GetDocumentationCommentTriviaSyntax(this SyntaxNode node)
{
    if (node.SyntaxTree.Options.DocumentationMode == DocumentationMode.None)
    {
        var withDocumentationComment = node.SyntaxTree.Options.WithDocumentationMode(DocumentationMode.Parse);
        var code = node.ToFullString();
        var newTree = CSharpSyntaxTree.ParseText(code, (CSharpParseOptions)withDocumentationComment);
        node = newTree.GetRoot();
    }

    foreach (var leadingTrivia in node.GetLeadingTrivia())
    {
        if (leadingTrivia.GetStructure() is DocumentationCommentTriviaSyntax structure)
        {
            return structure;
        }
    }

    return null;
}

DocumentationModeの状態によってDocumentationCommentTriviaSyntaxが取れるかどうかが変わる(GenerateDocumentaionFile=falseの場合はNoneになる)ので、Noneの場合はDocumentationMode.Parseをつけたうえでパースし直すことで取得できました。SyntaxNodeのままオプションを渡してCSharpSyntaxTreeを生成しても、パースし直してくれないのかDocumentationModeを変更しても無駄だったので、文字列化してからParseTextするようにしています。

JSON Serializer

リクエストもレスポンスもJSONです、今の世の中。そして、使うライブラリはSystem.Text.Json.JsonSerializer一択です。異論を挟む余地は、ありますが、ない。好むと好まざると、もはや使わなければならないわけです。

System.Text.Jsonの特徴としてはUTF8ベースで処理ができることなので、極力文字列を通さないようにしてあげると高い性能が見込めます。ReadOnlySpan<byte>またはReadOnlySequence<byte>をデシリアライズするには Utf8JsonReaderを通す必要があります。これはref structだからアロケーションがないので、そのままnewして使っていきましょう。ではWriterは?というと、Utf8JsonWriterはclassです。どうして……?なので、Writerに関してはアプリケーションの作りによりますが、フィールドに持って使い回せるのならフィールドに持っての使いまわし(Resetがあります)、持てない場合は[ThreadStatic]から引っ張ってくるようにしましょう。

ライブラリで用意する場合は、利用する型が全て決まっているのでソース生成してあげると、パフォーマンスもよく、AOTセーフ度も上がるので望ましいはずです。Claudiaでも生成しています。

[JsonSourceGenerationOptions(
    GenerationMode = JsonSourceGenerationMode.Default,
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    WriteIndented = false)]
[JsonSerializable(typeof(MessageRequest))]
[JsonSerializable(typeof(Message))]
[JsonSerializable(typeof(Contents))]
[JsonSerializable(typeof(Content))]
[JsonSerializable(typeof(Metadata))]
[JsonSerializable(typeof(Source))]
[JsonSerializable(typeof(MessageResponse))]
[JsonSerializable(typeof(Usage))]
[JsonSerializable(typeof(ErrorResponseShape))]
[JsonSerializable(typeof(ErrorResponse))]
[JsonSerializable(typeof(Ping))]
[JsonSerializable(typeof(MessageStart))]
[JsonSerializable(typeof(MessageDelta))]
[JsonSerializable(typeof(MessageStop))]
[JsonSerializable(typeof(ContentBlockStart))]
[JsonSerializable(typeof(ContentBlockDelta))]
[JsonSerializable(typeof(ContentBlockStop))]
[JsonSerializable(typeof(MessageStartBody))]
[JsonSerializable(typeof(MessageDeltaBody))]
public partial class AnthropicJsonSerialzierContext : JsonSerializerContext
{
}
// 内部での利用時は全てこのJsonSerializerContextを指定している
JsonSerializer.SerializeToUtf8Bytes(request, AnthropicJsonSerialzierContext.Default.Options)

一つ引っ掛かったのが、JsonIgnoreCondition.WhenWritingNullが、通常(リフレクションベース)だとNullable<T>にも効いていたのですが、Source Generatorだと効かなくなってnullの時に無視してくれなくなったという挙動の差異がありました。しょうがないので、全ての対象の型のNullable<T>プロパティに直接[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]を付与することで回避しました。

public record class MessageRequest
{
    // ...

    [JsonPropertyName("temperature")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public double? Temperature { get; set; }
}

正直Source Generator版の実装漏れの気がするんですが、まぁ回避できたので、とりあえずはいっか。。。

まとめ

OpenAI APIに対するAzure OpenAI Serviceのように、AWS環境の人はAmazon Bedrock経由のほうが使いやすい、というのがあるかもしれません。というわけで本日の先ほどのリリース(v1.0.1)でBedrock対応もしました!より一層利用しやすくなったはずです。

Anthorpic APIを使うにあたって、このClaudiaが、公式SDKや各言語の非公式SDKも含めて、最も使いやすいSDKになっているんじゃないかと自負します。ということは、C#が最もClaudeをAPI経由で使うのに捗る言語ということです!これはC#やるしかない!あるいはClaudeやるしかない!ということで、やっていきましょう……!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive