ArangoDB in FS2

Implementation of ArangoDB with fs2

Arango client database is written with fs2 I/O and query streaming with fs2 stream

Installation

// add dependency to the arangodb client
libraryDependencies += "com.bicou" %% "avokka-arangodb-fs2" % "0.0.7"

Usage

Let’s use cats effect IO and assume we have a ContextShift[_] and a Timer[_] in implicit scope (maybe from IOApp)

import cats.effect._

implicit def cs: ContextShift[IO] = ???
implicit def timer: Timer[IO] = ???

We also need to provide a log4cats Logger[_]

import org.typelevel.log4cats._
import org.typelevel.log4cats.slf4j.Slf4jLogger

implicit def unsafeLogger: Logger[IO] = Slf4jLogger.getLogger[IO]

And an ArangoConfiguration (we could also have use Cats-effect module for PureConfig)

import avokka.arangodb.ArangoConfiguration

val configuration = ArangoConfiguration.load()

Then we build a Resource to connect to ArangoDB

import avokka.arangodb.fs2._

val arango = Arango[IO](configuration)
// arango: Resource[[A >: Nothing <: Any] => IO[A], Arango[[A >: Nothing <: Any] => IO[A]]] = Bind(
//   source = Eval(
//     fa = Pure(value = SocketAddress(host = localhost, port = ( = 8530)))
//   ),
// ...

We .use the resource to obtain an instance of ArangoClient[IO]

arango.use { client =>
  client.server.version()
}.unsafeRunSync()
// res0: ArangoResponse[Version] = ArangoResponse(
//   header = Header(
//     version = 1,
//     type = ResponseFinal,
//     responseCode = 200,
//     meta = Map("X-Arango-Queue-Time-Seconds" -> "0.000000")
//   ),
//   body = Version(
//     server = "arango",
//     license = "community",
//     version = "3.10.6",
//     details = Map()
//   )
// )

Example in a for comprehension

import avokka.arangodb.types._

val r = arango.use { client =>
  for {
    info    <- client.system.info()
    _       <- IO { println(s"database '${client.system.name}' is system = ${info.body.isSystem}") }
    cCount  <- client.db.collection(CollectionName("countries")).documents.count() 
  } yield cCount.body.count 
}
// r: IO[Long] = FlatMap(
//   ioe = Pure(value = SocketAddress(host = localhost, port = ( = 8530))),
//   f = cats.effect.kernel.Resource$$Lambda$16605/0x0000000104244840@7905e1be,
//   event = cats.effect.tracing.TracingEvent$StackTrace
// )

r.unsafeRunSync()
// database 'DatabaseName(_system)' is system = true
// res1: Long = 250L

Query result streaming with FS2 streams

Call .stream[T] on a ArangoQuery[F] to transform it to a fs2.Stream[F, T]

arango.use { client =>
  client.db
    .query("FOR c IN countries RETURN c.name")
    .batchSize(100)
    .stream[String]
    .compile
    .toVector
}.unsafeRunSync()
// res2: Vector[String] = Vector(
//   "Anguilla",
//   "Afghanistan",
//   "Argentina",
//   "Angola",
//   "Armenia",
//   "Aruba",
//   "American Samoa",
//   "Andorra",
//   "United Arab Emirates",
//   "Albania",
//   "Åland Islands",
//   "French Southern and Antarctic Lands",
//   "Antarctica",
// ...

IOApp example

import avokka.arangodb.ArangoConfiguration
import avokka.arangodb.fs2._
import cats.effect._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import pureconfig.module.catseffect.syntax._

object ArangoExample extends IOApp {
  implicit def unsafeLogger: Logger[IO] = Slf4jLogger.getLogger[IO]

  override def run(args: List[String]): IO[ExitCode] = for {
    config <- Blocker[IO].use(ArangoConfiguration.at().loadF[IO, ArangoConfiguration])
    arango = Arango(config)
    _ <- arango.use { client =>
      for {
        version <- client.server.version()
        _ <- IO { println(version.body) }
      } yield ()
    }
  } yield ExitCode.Success
}