
Design Den: Fundamentals of Data Streaming Frameworks

The key benefit here is uniformity. With consistent interfaces, it's much easier to be productive and faster to ramp up.

How to make complex IO simple with 3 core components.


Hey friend,

If you've seen other programming languages, you know there are a few things missing in Go. Most of them intentionally: Go is super opinionated. πŸ€·β€β™‚οΈ

Despite being designed to attract C++ programmers with its simplicity, far more people come to Go from Python, Ruby, and Java (see this blog post by its authors).

I went through a similar path, and even though I like Go, there are definitely a few things I wish it did better. Mainly, functional-style programming for data manipulation (which is most of what we do in our applications anyway).

I like how in Java I can do stream manipulation which is easy to comprehend and looks neat and tidy:

collection.stream()
    .filter(x -> choose(x))
    .map(x -> doSomething(x))
    .collect(Collectors.toList())

or Python:

map(lambda x: do_something(x),
    filter(lambda x: choose(x), collection))

However, I'd like to take this even further and use this interface for all record-based IO, not just in-memory collections.

In fact, a similar framework already exists in the dark and well-forgotten language of the ancient πŸ™ – Perl.
It's called Flux (here's a blog post about it).

In this post, I will re-implement its key concepts and show how you can do something similar in Go:

type FooBar struct {
    value int
    // ...
}

func main() {
    // Data source (file) to read records from
    in := datapump.FileIn("source.json.log")
    // Transform them into objects with a typed JSON filter
    in = datapump.PipeIn(in, datapump.filters.FromJSON[FooBar]())
    // Do custom data processing
    in = datapump.PipeIn(in, datapump.filters.Lambda(
        func(record FooBar) FooBar {
            record.value *= 10
            return record
        },
    ))
    // Destination store (file) to write records to
    out := datapump.FileOut("destination.json.log")
    // Transform objects back to JSON strings
    out = datapump.PipeOut(out, datapump.filters.ToJSON[FooBar]())
    
    // Pump all records from the "in" source into the "out" destination.
    datapump.Pump(in, out)
}

It's not ideal, but I think this looks pretty neat. You can imagine how this could be extended to various types of sources and destinations (a database, an API endpoint), all bound by this simple interface.

For me, the key benefit here is uniformity. When all your data endpoints have consistent expectations, it's much easier to be productive with your code and to onboard new engineers. And as with any framework, you only need to solve a problem once, and all your various data processing nodes benefit from it.

In order to see how this all ties together, I'll "reinvent the bicycle" and design the core of this framework 😎

Requirements

  1. It has to be able to work with heterogeneous data stores.
  2. It has to provide a simple high level interface abstracted from the store-specific implementation details.
  3. It's always a unidirectional data flow: reading from a source, writing results into a destination store.
  4. It may produce duplicate records, but must not lose data.

Core Interface

What does the simplest data processing look like?

  1. You read some data from a "source"
  2. You do some alterations
  3. You write this new data back into some "destination"

Let's craft interfaces that allow us to do exactly these things, no more, no less:

In – the data source we're reading from:

type In[T any] interface {
	// read at most `count` data records from the source
	// and return a slice of them or an error.
	Read(count int) ([]T, error)
	// tell the data source that we've finished processing
	// everything it's given us until now.
	Commit() error
}
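
To make the contract concrete, here's a minimal sketch of what an In could look like, backed by a plain in-memory slice. The SliceIn name is just for illustration (it's not part of the actual library); the important bit is that a nil batch signals there's nothing left to read.

// SliceIn is a toy In[T] backed by an in-memory slice.
type SliceIn[T any] struct {
	records []T
	pos     int
}

func (s *SliceIn[T]) Read(count int) ([]T, error) {
	if s.pos >= len(s.records) {
		return nil, nil // nil batch signals EOF
	}
	end := s.pos + count
	if end > len(s.records) {
		end = len(s.records)
	}
	batch := s.records[s.pos:end]
	s.pos = end
	return batch, nil
}

func (s *SliceIn[T]) Commit() error {
	return nil // nothing to persist for an in-memory source
}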

Filter – do some actions on the records and spit out the resulting ones:

type Filter[T any, S any] interface {
	// given a batch of records of type `T`
	// do some transformations on them and
	// return a batch of resulting records
	// of type `S` or an error.
	Write(records []T) ([]S, error)
}
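
A lambda-style filter, similar in spirit to the filters.Lambda call from the opening example, could be as simple as wrapping a plain function (again, the LambdaFilter name here is just a sketch, not the real library API):

// LambdaFilter applies fn to every record in the batch.
type LambdaFilter[T any, S any] struct {
	fn func(T) S
}

func (l *LambdaFilter[T, S]) Write(records []T) ([]S, error) {
	out := make([]S, 0, len(records))
	for _, r := range records {
		out = append(out, l.fn(r))
	}
	return out, nil
}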

Out – the destination for storing results of our data processing:

type Out[T any] interface {
	// write given records to the underlying data store
	// and return an error if this failed.
	Write(records []T) error
	// tell the destination store to persist written records,
	// return an error if records were not saved.
	Commit() error
}
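
And a toy Out that simply collects records in memory (SliceOut, another made-up name for illustration) could look like this:

// SliceOut is a toy Out[T] that collects records in memory.
type SliceOut[T any] struct {
	records []T
}

func (s *SliceOut[T]) Write(records []T) error {
	s.records = append(s.records, records...)
	return nil
}

func (s *SliceOut[T]) Commit() error {
	return nil // nothing to flush for an in-memory destination
}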

Filter Composition

In order to make Filter a little more convenient, I can attach it to an existing In or an Out. As a result, we get a new "filtered" In or Out which performs an extra data transformation (or filtering) on top of existing logic.

// FilteredIn wraps an In[T] and conforms to
// the In[S] interface by applying a T->S
// filter transformation
type FilteredIn[T any, S any] struct {
	in     In[T]
	filter Filter[T, S]
}

// FilteredOut wraps an Out[S] and conforms to
// the Out[T] interface by applying a T->S
// filter transformation
type FilteredOut[T any, S any] struct {
	out    Out[S]
	filter Filter[T, S]
}

Composition and chaining are made possible because our filtered In and Out types conform to the defined In[S] and Out[T] interfaces. This way we can stack more filters on top of each other and gradually craft our data transformation pipeline.

In interface conformance:

func (fin *FilteredIn[T, S]) Read(count int) ([]S, error) {
	batch, err := fin.in.Read(count)
	if err != nil {
		return nil, err
	}
	if batch == nil {
		return nil, nil
	}

	return fin.filter.Write(batch)
}

func (fin *FilteredIn[T, S]) Commit() error {
	return fin.in.Commit()
}

Out interface conformance:

func (fout *FilteredOut[T, S]) Write(records []T) error {
	batch, err := fout.filter.Write(records)
	if err != nil {
		return err
	}
	if batch == nil {
		return nil
	}
	return fout.out.Write(batch)
}

func (fout *FilteredOut[T, S]) Commit() error {
	return fout.out.Commit()
}
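
Tying this back to the opening example: the PipeIn and PipeOut calls there could be little more than constructors around these two types. Here's how I'd sketch them (the actual signatures in the go-datapump repo may differ):

// PipeIn attaches a T->S filter to an In[T], producing an In[S].
func PipeIn[T any, S any](in In[T], filter Filter[T, S]) In[S] {
	return &FilteredIn[T, S]{in: in, filter: filter}
}

// PipeOut puts a T->S filter in front of an Out[S], producing an Out[T].
func PipeOut[T any, S any](out Out[S], filter Filter[T, S]) Out[T] {
	return &FilteredOut[T, S]{out: out, filter: filter}
}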

Getting pumped! πŸš€

Now that we have a (potentially filtered) In and an Out, the only step left is to pump the records from the In into the Out.

I'll call this type a Pumper:

// Pumper pumps records from the In into the Out
// in batches of `batchSize`, committing at regular
// `commitCount` intervals.
type Pumper[T any] struct {
	in          In[T]
	out         Out[T]
	batchSize   int
	commitCount int
}

It's pretty simple and will have a single method only:

func (p *Pumper[T]) Pump() error {
	processed := 0
	for {
		batch, err := p.in.Read(p.batchSize)
		if err != nil { // stop if error
			return err
		}

		// a nil batch represents the EOF state
		// when nothing else can be read at
		// this point.
		if batch == nil { // stop because we've read everything
			break
		}

		err = p.out.Write(batch)
		if err != nil { // stop if error
			return err
		}

		// count processed and commit our
		// progress at regular intervals
		processed += len(batch)
		if processed > p.commitCount {
			processed = processed % p.commitCount
			err = p.out.Commit()
			if err != nil {
				return err
			}
			err = p.in.Commit()
			if err != nil {
				return err
			}
		}
	}

	// commit progress at the end
	// of processing
	err := p.out.Commit()
	if err != nil {
		return err
	}
	err = p.in.Commit()
	if err != nil {
		return err
	}

	return nil
}
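
To see all the pieces working together, here's a self-contained sketch that wires the toy slice-backed endpoints and the lambda filter from above into a Pumper. It assumes everything lives in one package, and the names are the illustrative ones I introduced here, not the real library API:

package main

import (
	"fmt"
	"log"
)

func main() {
	// toy source with five records
	src := &SliceIn[int]{records: []int{1, 2, 3, 4, 5}}

	// multiply every record by 10 on the way in
	in := PipeIn[int, int](src, &LambdaFilter[int, int]{
		fn: func(x int) int { return x * 10 },
	})

	// toy destination collecting results in memory
	dst := &SliceOut[int]{}

	// pump in batches of 2, committing roughly every 4 processed records
	p := &Pumper[int]{in: in, out: dst, batchSize: 2, commitCount: 4}
	if err := p.Pump(); err != nil {
		log.Fatal(err)
	}

	fmt.Println(dst.records) // [10 20 30 40 50]
}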

What we've learnt

  1. The 3-component framework we've built here is both super simple and easily extensible with new data source and destination components.
  2. It provides a very concise way of expressing complex data manipulations.
  3. It gives you a way to build reusable endpoint and filter components that can be easily plugged into different data pipelines.

Hope you enjoyed this design exercise πŸ˜‰
Thank you for reading, you've been awesome!

Apropos of Conclusion

You can find all of this code and a few more examples on my GitHub:
https://github.com/sombr/go-datapump.git

I really enjoy making educational material and your feedback means a lot! If you like my style, please give the post a clap and subscribe ☺️