System.Text.Json을 사용하여 목록을 비 직렬화 큰 json 파일을

많은 객체 목록을 포함하는 큰 json 파일을 요청한다고 가정 해 봅시다. 나는 그들이 한 번에 메모리에 들어가기를 원하지 않지만 오히려 하나씩 읽고 처리하려고합니다. 따라서 비동기 System.IO.Stream스트림을로 변환해야합니다 IAsyncEnumerable<T>. 이를 위해 새로운 System.Text.JsonAPI를 어떻게 사용 합니까?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}


답변

그렇습니다. 진정한 스트리밍 JSON (serializer) 직렬 변환기는 여러 곳에서 성능을 크게 향상시킵니다.

불행히도 System.Text.Json현재는이 작업을 수행하지 않습니다. 앞으로 일어날 지 잘 모르겠습니다. 그렇게되기를 바랍니다! JSON의 진정한 스트리밍 역 직렬화는 다소 어려운 것으로 판명되었습니다.

매우 빠른 Utf8Json이 지원 하는지 확인할 수 있습니다 .

그러나 요구 사항이 어려움을 제한하는 것처럼 보이기 때문에 특정 상황에 맞는 사용자 지정 솔루션이있을 수 있습니다.

아이디어는 한 번에 한 항목을 배열에서 수동으로 읽는 것입니다. 우리는 목록의 각 항목 자체가 유효한 JSON 객체라는 사실을 이용하고 있습니다.

[(첫 번째 항목) 또는 ,(다음 각 항목)을 지나서 수동으로 건너 뛸 수 있습니다 . 그런 다음 가장 좋은 방법은 .NET Core를 사용 Utf8JsonReader하여 현재 객체가 끝나는 위치를 결정하고 스캔 한 바이트를에 공급하는 것 JsonDeserializer입니다.

이 방법으로 한 번에 하나의 오브젝트를 약간만 버퍼링합니다.

그리고 우리는 성능을 이야기하고 있기 때문에에있는 PipeReader동안 입력을 얻을 수 있습니다. 🙂


답변

TL; DR 사소하지 않습니다.


누군가가 이미 스트림에서 버퍼를 읽고 Utf8JsonRreader에 공급 하는 구조체에 대한 전체 코드게시 한 것처럼 보입니다 . 코드도 사소하지 않습니다. 관련 질문은 여기에 있으며 대답은 여기에 있습니다 .Utf8JsonStreamReaderJsonSerializer.Deserialize<T>(ref newJsonReader, options);

그것은 충분하지 않습니다- HttpClient.GetAsync전체 응답이 수신 된 후에 만 ​​반환되며 본질적으로 메모리의 모든 것을 버퍼링합니다.

이를 피하려면 HttpClient.GetAsync (string, HttpCompletionOption)을와 함께 사용해야합니다 HttpCompletionOption.ResponseHeadersRead.

역 직렬화 루프는 취소 토큰도 확인하고 신호가 있으면 종료하거나 던집니다. 그렇지 않으면 전체 스트림이 수신되고 처리 될 때까지 루프가 진행됩니다.

이 코드는 관련 답변의 예를 기반으로 HttpCompletionOption.ResponseHeadersRead하며 취소 토큰을 사용 하고 확인합니다. 적절한 항목 배열이 포함 된 JSON 문자열을 구문 분석 할 수 있습니다. 예 :

[{"prop1":123},{"prop1":234}]

첫 번째 호출 jsonStreamReader.Read()은 배열의 시작 으로 이동하고 두 번째 호출 은 첫 번째 객체의 시작으로 이동합니다. 배열 끝 ( ])이 감지되면 루프 자체가 종료됩니다 .

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,
                                                       HttpCompletionOption.ResponseHeadersRead,
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

JSON 조각, AKA 스트리밍 JSON 일명 … *

이벤트 스트리밍 또는 로깅 시나리오에서 개별 JSON 객체를 파일에 추가하는 것이 일반적입니다 (예 : 한 줄에 하나씩).

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

유효한 JSON 문서는 아니지만 개별 조각은 유효합니다. 이는 빅 데이터 / 고 동시 시나리오에 몇 가지 장점이 있습니다. 새로운 이벤트를 추가하려면 전체 파일을 구문 분석하고 다시 작성하지 않고 파일에 새 줄을 추가하기 만하면됩니다. 처리 , 특히 병렬 처리는 다음 두 가지 이유로 더 쉽습니다.

  • 스트림에서 한 줄을 읽기만하면 개별 요소를 한 번에 하나씩 검색 할 수 있습니다.
  • 입력 파일은 라인 경계에 걸쳐 쉽게 분할 및 분할되어 각 부품을 별도의 작업자 프로세스 (예 : Hadoop 클러스터) 또는 애플리케이션의 다른 스레드에 공급할 수 있습니다. 그런 다음 첫 번째 줄 바꿈을 찾으십시오. 그 시점까지 모든 것을 별도의 작업자에게 공급하십시오.

StreamReader 사용

이를 수행하는 할당 방법은 TextReader를 사용하고 한 번에 한 줄을 읽고 JsonSerializer.Deserialize로 구문 분석하는 것입니다 .

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

적절한 배열을 deserialize하는 코드보다 훨씬 간단합니다. 두 가지 문제가 있습니다.

  • ReadLineAsync 취소 토큰을받지 않습니다
  • 각 반복은 System.Text.Json을 사용하여 피하고 싶었던 것 중 하나 인 새 문자열을 할당 합니다.

ReadOnlySpan<Byte>JsonSerializer에 필요한 버퍼 를 생성하는 것으로 충분할 수도 있지만 , Deserialize는 쉽지 않습니다.

파이프 라인 및 SequenceReader

모든 위치를 피하려면 ReadOnlySpan<byte>스트림에서 를 가져와야 합니다. 이를 위해서는 System.IO.Pipeline 파이프와 SequenceReader 구조체를 사용해야합니다 . Steve Gordon의 SequenceReader 소개 에서는이 클래스를 사용하여 구분자를 사용하여 스트림에서 데이터를 읽는 방법에 대해 설명합니다.

불행히도 SequenceReaderref 구조체는 비동기 또는 로컬 메서드에서 사용할 수 없음을 의미합니다. 그래서 Steve Gordon은 자신의 기사에서

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

항목을 읽는 방법은 ReadOnlySequence를 구성하고 종료 위치를 리턴하므로 PipeReader는 항목을 다시 시작할 수 있습니다. 불행히도 우리는 IEnumerable 또는 IAsyncEnumerable을 반환하고 싶습니다. 반복자 메서드는 좋아하지 in않거나 out매개 변수입니다.

우리는 역 직렬화 된 항목을 List 또는 Queue에서 수집하여 단일 결과로 반환 할 수 있지만 여전히 목록, 버퍼 또는 노드를 할당하고 반환하기 전에 버퍼의 모든 항목이 역 직렬화 될 때까지 기다려야합니다.

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

우리 는 반복자 메소드를 요구하지 않고 열거 가능한 것처럼 작동하고 비동기식으로 작동하며 모든 것을 버퍼링하지 않는 무언가 가 필요합니다.

IAsyncEnumerable을 생성하기 위해 채널 추가

ChannelReader.ReadAllAsyncIAsyncEnumerable을 반환합니다. 반복자로 작동 할 수없는 메서드에서 ChannelReader를 반환하고 캐싱없이 요소 스트림을 생성 할 수 있습니다.

Steve Gordon의 코드를 사용하여 채널을 사용하면 ReadItems (ChannelWriter …) 및 ReadLastItem메서드가 제공됩니다. 첫 번째 항목은을 사용하여 한 번에 하나의 항목을 개행까지 읽습니다 ReadOnlySpan<byte> itemBytes. 에서 사용할 수 있습니다 JsonSerializer.Deserialize. 경우 ReadItems구분 기호를 찾을 수 없습니다 PipelineReader 스트림에서 다음 청크를 당길 수 있도록, 그것의 위치를 반환합니다.

마지막 청크에 도달하고 다른 구분자가 없으면 ReadLastItem`은 나머지 바이트를 읽고 역 직렬화합니다.

코드는 Steve Gordon의 코드와 거의 동일합니다. 콘솔에 쓰는 대신 ChannelWriter에 씁니다.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }
}

DeserializeToChannel<T>메소드는 스트림 위에 파이프 라인 리더를 작성하고 채널을 작성한 후 청크를 구문 분석하고이를 채널로 푸시하는 작업자 태스크를 시작합니다.

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted)
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete();
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()를 통해 모든 항목을 소비하는 데 사용할 수 있습니다 IAsyncEnumerable<T>.

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    

답변

자신의 스트림 리더에 능숙해야 할 것 같습니다. 바이트를 하나씩 읽고 오브젝트 정의가 완료되는 즉시 중지해야합니다. 실제로 꽤 저수준입니다. 따라서 전체 파일을 RAM에로드하지 않고 처리하는 부분을 수행하십시오. 답인 것 같습니까?


답변

아마도 Newtonsoft.Jsonserializer를 사용할 수 있습니까?
https://www.newtonsoft.com/json/help/html/Performance.htm

특히 다음 섹션을 참조하십시오.

메모리 사용량 최적화

편집하다

JsonTextReader에서 값을 역 직렬화 할 수 있습니다.

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}