Univocity CSV parsers, fs2 and Scala

Univocity makes fast CSV parsers. I needed to connect them to fs2 for stream processing in Scala.
First we need to create our univocity parser and begin parsing:
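    import java.io.FileReader
    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    // inputFile is a File (or a path String) defined elsewhere
    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true)  // treat the first row as headers
    val parser = new CsvParser(settings)
    parser.beginParsing(new FileReader(inputFile))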
Now we need to connect this source to fs2. Univocity is starting to implement Java iterators, which would make this easier, but we can build our own source. Any time we have an object that generates data, we should think of unfold, which unfolds "something" (a state) into a stream of values:
    // The emitted values are com.univocity.parsers.common.record.Record
    def toStream(parser: CsvParser) =
      Stream.unfold(parser) { p =>
        Option(p.parseNextRecord()).map((_, p))
      }
This works because unfold expects a function that returns an Option holding a tuple of the value to emit and the next "state". The initial state is the parser itself. parseNextRecord() returns null when there are no more CSV records, and Option(..) returns None when its argument is null, so wrapping the call in Option gives us something we can map over. If there is a record in the Option, we form the tuple (_, p), which is the record and the parser. If parseNextRecord() returns null, Option(null) evaluates to None, None is returned and unfolding stops.
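To see the null-to-None trick in isolation, here is a minimal sketch that needs neither Univocity nor fs2. It assumes Scala 2.13 or later, where the standard library provides Iterator.unfold with the same shape as the fs2 call. FakeParser and UnfoldDemo are hypothetical names invented for this illustration; FakeParser only imitates the "return null when exhausted" behaviour.

```scala
// Hypothetical stand-in for a CSV parser: hands out one row per call
// and returns null once the rows are exhausted.
final class FakeParser(rows: List[String]) {
  private var remaining = rows
  def parseNextRecord(): String = remaining match {
    case head :: tail => remaining = tail; head
    case Nil          => null
  }
}

object UnfoldDemo {
  // Same shape as toStream: the state is the parser itself, and
  // Option(null) == None is what ends the unfold.
  def records(parser: FakeParser): Iterator[String] =
    Iterator.unfold(parser) { p =>
      Option(p.parseNextRecord()).map((_, p))
    }

  def main(args: Array[String]): Unit =
    records(new FakeParser(List("a,1", "b,2"))).foreach(println)
}
```

Note that the state never really changes here: the parser mutates internally and is simply threaded through as the "next state" on every step.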
To define our stream, just use it as a source:
    val inputs = toStream(parser).covary[Task]
      .map { ... }
      .take(100)
      ...
If we wanted to be fancier, we could use Stream.bracket to ensure that parser.stopParsing is called.
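A rough sketch of the bracket version might look like the following. It assumes the fs2 0.9.x API (the Task-based one used above), where Stream.bracket takes the acquire action first and then the use and release functions; later fs2 versions changed this signature. The names records and inputFile are placeholders chosen for this illustration.

```scala
import java.io.{File, FileReader}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import fs2.{Stream, Task}

// Same unfold-based source as before.
def toStream(parser: CsvParser) =
  Stream.unfold(parser) { p =>
    Option(p.parseNextRecord()).map((_, p))
  }

// Assumption: fs2 0.9.x, i.e. Stream.bracket(acquire)(use, release).
def records(inputFile: File) =
  Stream.bracket(Task.delay {
    // Acquire: create the parser and start parsing inside the effect.
    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true)
    val parser = new CsvParser(settings)
    parser.beginParsing(new FileReader(inputFile))
    parser
  })(
    // Use: emit records until parseNextRecord() returns null.
    p => toStream(p).covary[Task],
    // Release: runs when the stream finishes, fails, or is cut short.
    p => Task.delay(p.stopParsing())
  )
```

With this shape the parser is only created when the stream is actually run, and a downstream .take(100) still triggers stopParsing().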
That's it!