univocity csv parsers, fs2 and scala
Univocity makes fast CSV parsers. I needed to marry them up to fs2 for stream processing in Scala.
First, we need to create our Univocity parser and begin parsing:

import java.io.FileReader
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
settings.setHeaderExtractionEnabled(true) // treat the first row as the header
val parser = new CsvParser(settings)
parser.beginParsing(new FileReader(inputFile))
Now we need to connect this source up to fs2. Univocity is starting to implement Java iterators, which would make this easier, but we can create our own adapter. Any time we have an object that generates data, we should think of unfold, which unfolds "something" into a stream of values:

import fs2.Stream

def toStream(parser: CsvParser) =
  Stream.unfold(parser) { p =>
    Option(p.parseNextRecord()).map((_, p))
  }
This works because unfold needs an Option holding a tuple of the value to emit and the next "state". The initial state is the parser itself. Since Option(...) returns None when its argument is null, and parseNextRecord() returns null when there are no more CSV records, we can wrap each record in an Option and map over it. If there is a record in the Option, we form the tuple (_, p), which is the value and the parser. If parseNextRecord returns null, then Option(null) evaluates to None, None is returned, and the unfolding stops.
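To see the termination condition on its own, here is a tiny sketch; the string values below are just stand-ins for a real record and the parser state:

// Stand-ins for what unfold sees on each step:
Option(null).map(r => (r, "parser"))   // None -> nothing to emit, the stream ends
Option("row").map(r => (r, "parser"))  // Some(("row", "parser")) -> emit "row", keep going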
To define our stream, just use it as a source:
val inputs = toStream(parser).covary[Task]
  .map { ... }
  .take(100)
  ...
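As a concrete (hypothetical) end-to-end run, assuming the fs2 0.9-era API (fs2.Task, runLog, unsafeRun) and a CSV that happens to have a "name" column:

// A sketch only: "name" is a hypothetical column, and parser is the one
// created above with header extraction enabled.
val firstHundredNames: Vector[String] =
  toStream(parser)
    .covary[Task]
    .map(record => record.getString("name")) // look a field up by its header
    .take(100)
    .runLog      // Task[Vector[String]]
    .unsafeRun() // run the effect and collect the values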
If we wanted to be fancier, we could use Stream.bracket to ensure that parser.stopParsing is called when the stream terminates.
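A minimal sketch of what that could look like, assuming the fs2 0.9-style bracket signature (acquire, use, release) and Task as the effect type; the parser is acquired inside the stream so stopParsing always runs when the stream finishes or fails:

import java.io.{File, FileReader}
import com.univocity.parsers.common.record.Record
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import fs2.{Stream, Task}

def records(inputFile: File): Stream[Task, Record] =
  Stream.bracket(Task.delay {
    // acquire: build the parser and start parsing
    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true)
    val parser = new CsvParser(settings)
    parser.beginParsing(new FileReader(inputFile))
    parser
  })(
    p => toStream(p).covary[Task],    // use: unfold the records as before
    p => Task.delay(p.stopParsing())  // release: always stop the parser
  )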
That's it!