90 changes: 90 additions & 0 deletions bson/stream.go
@@ -0,0 +1,90 @@
package bson

import (
"bytes"
"encoding/binary"
"fmt"
"io"
)

const (
	// MinDocumentSize is the size of the smallest possible valid BSON document:
	// an int32 size header + 0x00 (end of document).
	MinDocumentSize = 5

	// MaxDocumentSize is the largest possible size for a BSON document allowed by MongoDB,
	// that is, 16 MiB (see https://docs.mongodb.com/manual/reference/limits/).
	MaxDocumentSize = 16777216
)

// ErrInvalidDocumentSize is an error returned when a BSON document's header
// contains a size smaller than MinDocumentSize or greater than MaxDocumentSize.
type ErrInvalidDocumentSize struct {
	DocumentSize int32
}

func (e ErrInvalidDocumentSize) Error() string {
	return fmt.Sprintf("invalid document size %d", e.DocumentSize)
}

// A Decoder reads and decodes BSON values from an input stream.
type Decoder struct {
	source io.Reader
}

// NewDecoder returns a new Decoder that reads from source.
// It does not add any extra buffering, and may not read data from source beyond the BSON values requested.
func NewDecoder(source io.Reader) *Decoder {
	return &Decoder{source: source}
}

// Decode reads the next BSON-encoded value from its input and stores it in the value pointed to by v.
// See the documentation for Unmarshal for details about the conversion of BSON into a Go value.
func (dec *Decoder) Decode(v interface{}) (err error) {
	// BSON documents start with their size as a *signed* int32.
	var docSize int32
	if err = binary.Read(dec.source, binary.LittleEndian, &docSize); err != nil {
		return
	}

	if docSize < MinDocumentSize || docSize > MaxDocumentSize {
		return ErrInvalidDocumentSize{DocumentSize: docSize}
	}

	docBuffer := bytes.NewBuffer(make([]byte, 0, docSize))
	if err = binary.Write(docBuffer, binary.LittleEndian, docSize); err != nil {
		return
	}

	// docSize is the *full* document's size (including the 4-byte size header,
	// which has already been read).
	if _, err = io.CopyN(docBuffer, dec.source, int64(docSize-4)); err != nil {
		return
	}

	// Let Unmarshal handle the rest.
	defer handleErr(&err)
	return Unmarshal(docBuffer.Bytes(), v)
}

// An Encoder encodes and writes BSON values to an output stream.
type Encoder struct {
	target io.Writer
}

// NewEncoder returns a new Encoder that writes to target.
func NewEncoder(target io.Writer) *Encoder {
	return &Encoder{target: target}
}

// Encode encodes v to BSON, and if successful writes it to the Encoder's output stream.
// See the documentation for Marshal for details about the conversion of Go values to BSON.
func (enc *Encoder) Encode(v interface{}) error {
	data, err := Marshal(v)
	if err != nil {
		return err
	}

	_, err = enc.target.Write(data)
	return err
}
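
For context, a minimal usage sketch of the new streaming API (not part of the diff, and assuming the github.com/globalsign/mgo/bson import path used by the tests below): several documents are written back to back with Encoder, then read in the same order with Decoder until the stream is exhausted. Since Decode reads the size header with binary.Read, an empty stream surfaces as io.EOF.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/globalsign/mgo/bson"
)

func main() {
	var buf bytes.Buffer

	// Write two BSON documents into the same stream.
	enc := bson.NewEncoder(&buf)
	for _, doc := range []bson.M{{"name": "a", "n": 1}, {"name": "b", "n": 2}} {
		if err := enc.Encode(doc); err != nil {
			panic(err)
		}
	}

	// Read documents back until the stream is exhausted.
	dec := bson.NewDecoder(&buf)
	for {
		value := bson.M{}
		if err := dec.Decode(&value); err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		fmt.Println(value)
	}
}
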
77 changes: 77 additions & 0 deletions bson/stream_test.go
@@ -0,0 +1,77 @@
package bson_test

import (
"bytes"

"github.com/globalsign/mgo/bson"
. "gopkg.in/check.v1"
)

var invalidSizeDocuments = [][]byte{
	// Empty document
	[]byte{},
	// Incomplete header
	[]byte{0x04},
	// Negative size
	[]byte{0xff, 0xff, 0xff, 0xff},
	// Full, valid size header but too small (less than 5 bytes)
	[]byte{0x04, 0x00, 0x00, 0x00},
	// Valid header, valid size but incomplete document
	[]byte{0xff, 0x00, 0x00, 0x00, 0x00},
	// Too big
	[]byte{0xff, 0xff, 0xff, 0x7f},
}

// Reusing sampleItems from bson_test

func (s *S) TestEncodeSampleItems(c *C) {
	for i, item := range sampleItems {
		buf := bytes.NewBuffer(nil)
		enc := bson.NewEncoder(buf)

		err := enc.Encode(item.obj)
		c.Assert(err, IsNil)
		c.Assert(string(buf.Bytes()), Equals, item.data, Commentf("Failed on item %d", i))
	}
}

func (s *S) TestDecodeSampleItems(c *C) {
	for i, item := range sampleItems {
		buf := bytes.NewBuffer([]byte(item.data))
		dec := bson.NewDecoder(buf)

		value := bson.M{}
		err := dec.Decode(&value)
		c.Assert(err, IsNil)
		c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
	}
}

func (s *S) TestStreamRoundTrip(c *C) {
	buf := bytes.NewBuffer(nil)
	enc := bson.NewEncoder(buf)

	for _, item := range sampleItems {
		err := enc.Encode(item.obj)
		c.Assert(err, IsNil)
	}

	// Ensure that everything that was encoded is decodable in the same order.
	dec := bson.NewDecoder(buf)
	for i, item := range sampleItems {
		value := bson.M{}
		err := dec.Decode(&value)
		c.Assert(err, IsNil)
		c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
	}
}

func (s *S) TestDecodeDocumentTooSmall(c *C) {
	for i, item := range invalidSizeDocuments {
		buf := bytes.NewBuffer(item)
		dec := bson.NewDecoder(buf)
		value := bson.M{}
		err := dec.Decode(&value)
		c.Assert(err, NotNil, Commentf("Failed on invalid size item %d", i))
	}
}
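
As a side note on the fixtures above (a sketch, not part of the change): each four-byte prefix is the size that Decode reads as a signed little-endian int32, which shows why the size check rejects them; the five-byte fixture instead passes the check and fails later at io.CopyN. A small helper to print those values:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Size headers from the invalid fixtures, decoded the same way Decode reads them.
	headers := [][]byte{
		{0xff, 0xff, 0xff, 0xff}, // -1: negative
		{0x04, 0x00, 0x00, 0x00}, // 4: smaller than MinDocumentSize (5)
		{0xff, 0xff, 0xff, 0x7f}, // 2147483647: larger than MaxDocumentSize (16777216)
	}
	for _, h := range headers {
		fmt.Println(int32(binary.LittleEndian.Uint32(h)))
	}
}
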