ArangoDB in FS2
An ArangoDB client implemented with fs2
The ArangoDB client is written with fs2 I/O, and query results are streamed as fs2 streams
Installation
// add dependency to the arangodb client
libraryDependencies += "com.bicou" %% "avokka-arangodb-fs2" % "0.0.7"
Usage
Let’s use cats-effect IO and assume we have a ContextShift[_] and a Timer[_] in implicit scope (for example, provided by IOApp)
import cats.effect._
implicit def cs: ContextShift[IO] = ???
implicit def timer: Timer[IO] = ???
We also need to provide a log4cats Logger[_]
import org.typelevel.log4cats._
import org.typelevel.log4cats.slf4j.Slf4jLogger
implicit def unsafeLogger: Logger[IO] = Slf4jLogger.getLogger[IO]
And an ArangoConfiguration (we could also have used the cats-effect module for PureConfig)
import avokka.arangodb.ArangoConfiguration
val configuration = ArangoConfiguration.load()
Then we build a Resource
to connect to ArangoDB
import avokka.arangodb.fs2._
val arango = Arango[IO](configuration)
// arango: Resource[[A >: Nothing <: Any] => IO[A], Arango[[A >: Nothing <: Any] => IO[A]]] = Bind(
// source = Eval(
// fa = Pure(value = SocketAddress(host = localhost, port = ( = 8530)))
// ),
// ...
We .use
the resource to obtain an instance of ArangoClient[IO]
arango.use { client =>
client.server.version()
}.unsafeRunSync()
// res0: ArangoResponse[Version] = ArangoResponse(
// header = Header(
// version = 1,
// type = ResponseFinal,
// responseCode = 200,
// meta = Map("X-Arango-Queue-Time-Seconds" -> "0.000000")
// ),
// body = Version(
// server = "arango",
// license = "community",
// version = "3.10.6",
// details = Map()
// )
// )
Example in a for comprehension
import avokka.arangodb.types._
val r = arango.use { client =>
for {
info <- client.system.info()
_ <- IO { println(s"database '${client.system.name}' is system = ${info.body.isSystem}") }
cCount <- client.db.collection(CollectionName("countries")).documents.count()
} yield cCount.body.count
}
// r: IO[Long] = FlatMap(
// ioe = Pure(value = SocketAddress(host = localhost, port = ( = 8530))),
// f = cats.effect.kernel.Resource$$Lambda$16605/0x0000000104244840@7905e1be,
// event = cats.effect.tracing.TracingEvent$StackTrace
// )
r.unsafeRunSync()
// database 'DatabaseName(_system)' is system = true
// res1: Long = 250L
Query result streaming with FS2 streams
Call .stream[T] on an ArangoQuery[F] to transform it into an fs2.Stream[F, T]
arango.use { client =>
client.db
.query("FOR c IN countries RETURN c.name")
.batchSize(100)
.stream[String]
.compile
.toVector
}.unsafeRunSync()
// res2: Vector[String] = Vector(
// "Anguilla",
// "Afghanistan",
// "Argentina",
// "Angola",
// "Armenia",
// "Aruba",
// "American Samoa",
// "Andorra",
// "United Arab Emirates",
// "Albania",
// "Åland Islands",
// "French Southern and Antarctic Lands",
// "Antarctica",
// ...
IOApp example
import avokka.arangodb.ArangoConfiguration
import avokka.arangodb.fs2._
import cats.effect._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import pureconfig.module.catseffect.syntax._
/** Minimal IOApp that loads an ArangoConfiguration, opens the Arango resource
  * and prints the body of the server version response.
  */
object ArangoExample extends IOApp {

  // Logger instance made available implicitly for the Arango client.
  implicit def unsafeLogger: Logger[IO] = Slf4jLogger.getLogger[IO]

  /** Loads the configuration, queries the server version, prints it and exits successfully. */
  override def run(args: List[String]): IO[ExitCode] =
    Blocker[IO]
      .use(ArangoConfiguration.at().loadF[IO, ArangoConfiguration])
      .flatMap { config =>
        val arango = Arango(config)
        // The client is only valid inside `use`; the resource is released afterwards.
        arango.use { client =>
          client.server
            .version()
            .flatMap { version =>
              IO { println(version.body) }
            }
            .map(_ => ())
        }
      }
      .map(_ => ExitCode.Success)
}