Tuesday, May 1, 2018

Streaming data from PostgreSQL using Akka Streams and Slick in Play Framework

In this blog post I’ll explain how you can stream data directly from a PostgreSQL database using Scala Slick (Scala’s database access/query library) and Akka Streams (an implementation of the Reactive Streams specification on top of the Akka toolkit) in Play Framework. The implementation is fairly straightforward: data is read from a table in your SQL database as a stream, and then streamed out through a REST endpoint configured to download it.



For a better understanding, let’s take the example of an application or service used for administering a large customer base of an organisation/company. The person administering the customer base wants to export the entire customer data-set, say for auditing purposes. Depending on the requirements, it can make sense to stream this data directly into a downloadable file, which is what we are going to do in this blog post.
(For this blog post you should have a basic working knowledge of Play Framework and the Slick library.)

The example uses the following dependencies:
  1. Play Framework 2.6.10 ("com.typesafe.play" % "sbt-plugin" % "2.6.10")
  2. Play-Slick 3.0.1 ("com.typesafe.play" %% "play-slick" % "3.0.1")
  3. Akka Streams 2.5.8 ("com.typesafe.akka" %% "akka-stream" % "2.5.8")
  4. PostgreSQL JDBC driver 42.1.4 ("org.postgresql" % "postgresql" % "42.1.4")
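For reference, these can be wired into the build like this (a minimal sketch; the sbt-plugin line belongs in project/plugins.sbt and the rest in build.sbt):

// project/plugins.sbt
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.10")

// build.sbt
libraryDependencies ++= Seq(
  "com.typesafe.play" %% "play-slick"  % "3.0.1",
  "com.typesafe.akka" %% "akka-stream" % "2.5.8",
  "org.postgresql"    %  "postgresql"  % "42.1.4"
)

Play-Slick also needs the database connection configured in conf/application.conf; a sketch with a placeholder database URL and credentials:

slick.dbs.default.profile = "slick.jdbc.PostgresProfile$"
slick.dbs.default.db.driver = "org.postgresql.Driver"
slick.dbs.default.db.url = "jdbc:postgresql://localhost:5432/customersdb"
slick.dbs.default.db.user = "postgres"
slick.dbs.default.db.password = "postgres"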
Let’s start by assuming we have a customers table in our PostgreSQL database with the following structure:

CREATE TABLE customers (
  id        BIGSERIAL PRIMARY KEY,
  firstname VARCHAR(255) NOT NULL,
  lastname  VARCHAR(255) NOT NULL,
  email     VARCHAR(255) NOT NULL
);
Slick’s functional relational mapping corresponding to this table structure looks like this (note that all columns are NOT NULL, matching the non-optional fields of the case class):
case class Customer(id: Long,
                    firstName: String,
                    lastName: String,
                    email: String)

import play.api.db.slick.HasDatabaseConfigProvider

trait CustomerTable extends HasDatabaseConfigProvider[slick.jdbc.JdbcProfile] {
  
  import profile.api._

  val customerQuery: TableQuery[CustomerMapping] = TableQuery[CustomerMapping]

  private[models] class CustomerMapping(tag: Tag) extends Table[Customer](tag, "customers") {

    def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)

    def firstName: Rep[String] = column[String]("firstname")

    def lastName: Rep[String] = column[String]("lastname")

    def email: Rep[String] = column[String]("email")

    def * : ProvenShape[Customer] = (id, firstName, lastName, email) <>(Customer.tupled, Customer.unapply)

  }

}
Now let’s use the customerQuery to get data from the customers table as a DatabasePublisher of type Customer, i.e. DatabasePublisher[Customer], which is Slick’s implementation of Reactive Streams’ Publisher: a (potentially unbounded) sequence of elements that are published according to the demand signalled by the Subscriber. We will define this inside CustomerRepository.

def customers: DatabasePublisher[Customer] =
  db.stream(
    customerQuery
      .result
      .withStatementParameters(
         rsType = ResultSetType.ForwardOnly,
         rsConcurrency = ResultSetConcurrency.ReadOnly,
         fetchSize = 10000)
      .transactionally)
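For completeness, here is a sketch of the enclosing repository, with the imports and dependency injection it needs (the class name and wiring are assumptions, not from the original code):

import javax.inject.{Inject, Singleton}

import play.api.db.slick.DatabaseConfigProvider
import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

@Singleton
class CustomerRepository @Inject()(protected val dbConfigProvider: DatabaseConfigProvider)
  extends CustomerTable {

  import profile.api._

  // Streams customers from the database instead of materialising them all in memory
  def customers: DatabasePublisher[Customer] =
    db.stream(
      customerQuery
        .result
        .withStatementParameters(
          rsType = ResultSetType.ForwardOnly,
          rsConcurrency = ResultSetConcurrency.ReadOnly,
          fetchSize = 10000)
        .transactionally)
}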
A few things need to be taken care of when streaming data/records from PostgreSQL, as noted in Slick’s official documentation:
  1. The use of transactionally, which forces the code to run on a single Connection with auto commit set to false (setAutoCommit(false)); by default Slick runs in auto commit mode.
  2. The use of fetchSize, so that the JDBC driver does not fetch all rows into memory (i.e. on the client side) at once, but instead fetches the specified number of rows at a time.
  3. ResultSetType.ForwardOnly, which allows results to be read only sequentially, so the cursor will only move forward.
  4. ResultSetConcurrency.ReadOnly, which ensures that the ResultSet may not be updated.
Only if all of the above are in place will streaming work properly with PostgreSQL; otherwise the query will fetch the entire result set into memory.
So, the database repository is now sorted out. Let’s focus on the controller and how it will stream this data into a downloadable file.
We create a new Play controller for managing all customer-related APIs; this controller has access to the CustomerRepository we created earlier, in which the customers method is defined and implemented.
We’ll use Play’s simple Result to stream the data to our client, i.e. to the person administering the customers, via the /customers endpoint (added to the Play routes file), by passing the customer stream to the HttpEntity.Streamed case class like this:
Result(
  header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
  body = HttpEntity.Streamed(csvSource, None, None))
The entire controller method would look something like this:
def customers: Action[AnyContent] = Action { implicit request =>
  val customerDatabasePublisher = customerRepository.customers
  val customerSource = Source.fromPublisher(customerDatabasePublisher)

  val headerCSVSource = Source.single(ByteString(""""First Name","Last Name","Email"""" + "\n"))
  val customerCSVSource =
    customerSource.map(data => ByteString(s""""${data.firstName}","${data.lastName}","${data.email}"""" + "\n"))
  
  val csvSource = Source.combine(headerCSVSource, customerCSVSource)(Concat[ByteString])

  Result(
    header = ResponseHeader(OK, Map(CONTENT_DISPOSITION -> "attachment; filename=customers.csv")),
    body = HttpEntity.Streamed(csvSource, None, None))
}
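For this method to compile, the controller needs the usual imports and wiring; a minimal sketch, assuming the class name and route below (they are illustrative, not from the original post):

// conf/routes:
// GET  /customers  controllers.CustomerController.customers

import javax.inject.{Inject, Singleton}

import akka.stream.scaladsl.{Concat, Source}
import akka.util.ByteString
import play.api.http.HttpEntity
import play.api.mvc._

@Singleton
class CustomerController @Inject()(customerRepository: CustomerRepository,
                                   cc: ControllerComponents)
  extends AbstractController(cc) {

  // def customers: Action[AnyContent] = ... (as defined above)
}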
Note that the DatabasePublisher[Customer] is converted to a Source of Customer using the Source.fromPublisher helper method, which creates a Source from any Reactive Streams Publisher.
The rest of the transformations are done on the Source to convert the data into a readable CSV format.
Also, note the use of the Source.combine method, which combines several sources using a fan-in strategy, in our case Concat.
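Incidentally, the same concatenation can be written more concisely with the ++ operator, which is an alias for concat on Source:

val csvSource = headerCSVSource ++ customerCSVSource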
Hope this helps :)
The entire code base is available in the playakkastreams repository.
This article was first published on the Knoldus blog.
