Scalaz(37)- Free :实践-DB Transaction free style

简介:

 我一直在不断的提示大家:FP就是Monadic Programming,是一种特殊的编程风格。在我们熟悉的数据库编程领域能不能实现FP风格呢?我们先设计一些示范例子来分析一下惯用的数据库编程过程:


 1 import scalaz._
 2 import Scalaz._
 3 import scala.language.higherKinds
 4 import scala.language.implicitConversions
 5 import com.jolbox.bonecp.BoneCP
 6 import com.jolbox.bonecp.BoneCPConfig
 7 import java.sql.Connection
 8 import java.sql.ResultSet
 9 
10 object freedbtxns {
/** Looks up the tutor assigned to a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection the query runs on
  * @return the TUTOR column of the matching COURSES row
  * @throws java.sql.SQLException if no row matches or the query fails
  */
def getTutorId(courseId: Int, conn: Connection): Int = {
  val stmt = conn.createStatement()
  try {
    val rs = stmt.executeQuery(s"select TUTOR from COURSES where ID=$courseId")
    // A fresh ResultSet sits before the first row; next() must succeed before any getter.
    if (rs.next()) rs.getInt("TUTOR") // the query selects TUTOR, so "ID" is not in the result
    else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
  } finally stmt.close() // closing the statement also closes its ResultSet
}
/** Reads the amount payable to the tutor for a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection the query runs on
  * @return the PAYAMT column of the matching COURSES row
  * @throws java.sql.SQLException if no row matches or the query fails
  */
def getTutorPay(courseId: Int, conn: Connection): Double = {
  val stmt = conn.createStatement()
  try {
    val rs = stmt.executeQuery(s"select PAYAMT from COURSES where ID=$courseId")
    // The cursor starts before the first row; advance it before reading.
    if (rs.next()) rs.getDouble("PAYAMT")
    else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
  } finally stmt.close() // also releases the ResultSet
}
/** Reads the fee a student owes for a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection the query runs on
  * @return the FEEAMT column of the matching COURSES row
  * @throws java.sql.SQLException if no row matches or the query fails
  */
def getStudentFee(courseId: Int, conn: Connection): Double = {
  val stmt = conn.createStatement()
  try {
    val rs = stmt.executeQuery(s"select FEEAMT from COURSES where ID=$courseId")
    // The cursor starts before the first row; advance it before reading.
    if (rs.next()) rs.getDouble("FEEAMT")
    else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
  } finally stmt.close() // also releases the ResultSet
}
/** Adds `plusAmt` to the PAYABLE column of the TUTORS row whose ID is `tutorId`.
  *
  * @param tutorId value matched against TUTORS.ID
  * @param plusAmt amount added to PAYABLE
  * @param conn    open JDBC connection the statement runs on
  * @throws java.sql.SQLException if the update fails
  */
def updateTutorPay(tutorId: Int, plusAmt: Double, conn: Connection): Unit = {
  val stmt = conn.createStatement()
  try {
    stmt.executeUpdate(s"update TUTORS set PAYABLE = PAYABLE+$plusAmt where ID=$tutorId")
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // release the statement instead of leaving it open on the connection
}
/** Adds `plusAmt` to the DUEAMT column of the STUDENTS row whose ID is `studentId`.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param plusAmt   amount added to DUEAMT
  * @param conn      open JDBC connection the statement runs on
  * @throws java.sql.SQLException if the update fails
  */
def updateStudentFee(studentId: Int, plusAmt: Double, conn: Connection): Unit = {
  val stmt = conn.createStatement()
  try {
    stmt.executeUpdate(s"update STUDENTS set DUEAMT = DUEAMT+$plusAmt where ID=$studentId")
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // release the statement instead of leaving it open on the connection
}
/** Finds a seat that is not yet occupied.
  *
  * @param courseId value matched against SEATS.ID
  * @param conn     open JDBC connection the query runs on
  * @return the ID column of the first matching SEATS row
  * @throws java.sql.SQLException if no free seat matches or the query fails
  */
def findEmptySeat(courseId: Int, conn: Connection): Int = {
  val stmt = conn.createStatement()
  try {
    // updateSeatsStatus writes 'T' for a taken seat, so a free seat is the one marked 'F'.
    // NOTE(review): the filter compares SEATS.ID with the course id; confirm against the schema
    // whether a course column was intended.
    val rs = stmt.executeQuery(s"select ID from SEATS where OCCUPIED='F' AND ID=$courseId")
    // The cursor starts before the first row; advance it before reading.
    if (rs.next()) rs.getInt("ID")
    else throw new java.sql.SQLException(s"no empty seat for course $courseId")
  } finally stmt.close() // also releases the ResultSet
}
/** Marks a seat as taken ('T') or free ('F').
  *
  * @param seatId value matched against SEATS.ID
  * @param taken  true writes 'T' into OCCUPIED, false writes 'F'
  * @param conn   open JDBC connection the statement runs on
  * @throws java.sql.SQLException if the update fails
  */
def updateSeatsStatus(seatId: Int, taken: Boolean, conn: Connection): Unit = {
  val flag = if (taken) 'T' else 'F'
  val stmt = conn.createStatement()
  try {
    stmt.executeUpdate(s"update SEATS set OCCUPIED ='$flag' where ID=$seatId")
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // release the statement instead of leaving it open on the connection
}

我这里模拟了一个培训学校内的一些业务。上面设计的是一些基本函数,可以分别对学员、导师、座位进行查询和更新。如果我们需要把更新工作放入事务处理(transaction)内的话我们可以这样做:


/** Charges a student the course fee inside a single database transaction.
  *
  * Commits when both steps succeed and rolls back when either throws. As in the
  * original, an exception raised inside the transaction body is not propagated
  * to the caller; exceptions raised while acquiring or configuring the
  * connection are.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param courseId  value matched against COURSES.ID
  */
def updateStudent(studentId: Int, courseId: Int): Unit = {
  val pool = new BoneCP(new BoneCPConfig())
  try {
    val conn = pool.getConnection()
    try {
      conn.setReadOnly(false)
      conn.setAutoCommit(false)
      conn.rollback()
      try {
        val fee = getStudentFee(courseId, conn)
        updateStudentFee(studentId, fee, conn)
        conn.commit()
      } catch {
        case (e: Exception) => conn.rollback()
      }
    } finally conn.close() // now also runs when the connection setup above throws
  } finally pool.shutdown() // the pool is created per call, so it must be released per call
}
/** Charges a student the course fee and reserves a seat inside a single transaction.
  *
  * Commits when all steps succeed and rolls back when any of them throws. As in
  * the original, an exception raised inside the transaction body is not
  * propagated to the caller; exceptions raised while acquiring or configuring
  * the connection are.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param courseId  value matched against COURSES.ID and used to look up a seat
  */
def updateStudentAndSeat(studentId: Int, courseId: Int): Unit = {
  val pool = new BoneCP(new BoneCPConfig())
  try {
    val conn = pool.getConnection()
    try {
      conn.setReadOnly(false)
      conn.setAutoCommit(false)
      conn.rollback()
      try {
        val fee = getStudentFee(courseId, conn)
        updateStudentFee(studentId, fee, conn)
        val seatId = findEmptySeat(courseId, conn)
        updateSeatsStatus(seatId, true, conn)
        conn.commit()
      } catch {
        case (e: Exception) => conn.rollback()
      }
    } finally conn.close() // now also runs when the connection setup above throws
  } finally pool.shutdown() // the pool is created per call, so it must be released per call
}

马上可以发现:当我们在事务处理内组合使用这些函数时,每次都必须重新对事务处理进行设置,无法实现真正意义上的函数组合。如果我们认可FP风格的话,这里起码有两项弊端:一是源代码增加了大量的铺垫(boilerplate code),重复进行事务处理设置;二是每个更新函数都会产生副作用,换句话说就是副作用散布在程序各处,很难控制,这样就增加了程序的复杂程度,造成代码分析的困难。

我们希望达到的目标:


 1 /*
 2 def updateStudentAndSeat(studentId: Int): program {
 3  // findEmptySeat
 4  // updateStudentFee
 5  // updateSeatStatus 
 6 }
 7 
 8 def runDBTxn(prg: program) {
 9   //conn= getConnection
10   //try
11   // run(prg)
12   //commit
13   //catch
14   //rollback
15 }
16 runDBTxn(updateStudent)
17 runDBTxn(updateStudentAndSeat)
18 runDBTxn(updateSeatStatus)
19 */  

我们只在一个地方设置和运行事务处理。我们希望能把不同的program传入runDBTxn去运算。这不就是Free Monad的编程、运算关注分离模式嘛。那我们就试着用Free Monad来提供数据库事务处理支持。按上篇讨论的设计流程我们先设计ADT:


/** A single SQL instruction, modelled as a function of the connection it runs on.
  *
  * @param run computes the result `A` from an open connection
  * @tparam A result type of the instruction
  */
case class SqlOp[A](
  run: Connection => A
)

模拟sql指令很简单,两种情况:query或者update。两者都可以用函数run表示:传入Connection,返回结果A,A有可能是Unit。要成为Free Monad就必须先获取SqlOp的Functor实例:


/** A single SQL instruction, modelled as a function of the connection it runs on.
  *
  * @param run computes the result `A` from an open connection
  */
case class SqlOp[A](run: Connection => A)

/** Functor instance for [[SqlOp]]: `map` post-processes the instruction's result.
  *
  * The type is annotated explicitly so implicit resolution does not depend on
  * inferring the type of this definition.
  */
implicit val sqlOpFunctor: Functor[SqlOp] = new Functor[SqlOp] {
  def map[A, B](sa: SqlOp[A])(f: A => B): SqlOp[B] =
    SqlOp(sa.run andThen f) // run the instruction first, then apply f to its result
}

基本功能的sql操作函数及升格Free:


/** A program made of [[SqlOp]] instructions that yields an `A` when interpreted. */
type Sql[A] = Free[SqlOp, A]

/** Describes looking up the tutor assigned to a course.
  *
  * When interpreted, the instruction throws `java.sql.SQLException` if no
  * COURSES row matches or the query fails.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the TUTOR column of the matching row
  */
def getTutorId(courseId: Int): Sql[Int] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery(s"select TUTOR from COURSES where ID=$courseId")
      // A fresh ResultSet sits before the first row; next() must succeed before any getter.
      if (rs.next()) rs.getInt("TUTOR") // the query selects TUTOR, so "ID" is not in the result
      else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    } finally stmt.close() // closing the statement also closes its ResultSet
  })
 9   
/** Describes reading the amount payable to the tutor for a course.
  *
  * When interpreted, the instruction throws `java.sql.SQLException` if no
  * COURSES row matches or the query fails.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the PAYAMT column of the matching row
  */
def getTutorPay(courseId: Int): Sql[Double] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery(s"select PAYAMT from COURSES where ID=$courseId")
      // The cursor starts before the first row; advance it before reading.
      if (rs.next()) rs.getDouble("PAYAMT")
      else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    } finally stmt.close() // also releases the ResultSet
  })
/** Describes reading the fee a student owes for a course.
  *
  * When interpreted, the instruction throws `java.sql.SQLException` if no
  * COURSES row matches or the query fails.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the FEEAMT column of the matching row
  */
def getStudentFee(courseId: Int): Sql[Double] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery(s"select FEEAMT from COURSES where ID=$courseId")
      // The cursor starts before the first row; advance it before reading.
      if (rs.next()) rs.getDouble("FEEAMT")
      else throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    } finally stmt.close() // also releases the ResultSet
  })
/** Describes adding `plusAmt` to the PAYABLE column of the TUTORS row whose ID is `tutorId`.
  *
  * @param tutorId value matched against TUTORS.ID
  * @param plusAmt amount added to PAYABLE
  * @return a program that performs the update and yields `Unit`
  */
def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      stmt.executeUpdate(s"update TUTORS set PAYABLE = PAYABLE+$plusAmt where ID=$tutorId")
      () // the affected-row count is intentionally discarded
    } finally stmt.close() // release the statement instead of leaving it open on the connection
  })
/** Describes adding `plusAmt` to the DUEAMT column of the STUDENTS row whose ID is `studentId`.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param plusAmt   amount added to DUEAMT
  * @return a program that performs the update and yields `Unit`
  */
def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      stmt.executeUpdate(s"update STUDENTS set DUEAMT = DUEAMT+$plusAmt where ID=$studentId")
      () // the affected-row count is intentionally discarded
    } finally stmt.close() // release the statement instead of leaving it open on the connection
  })
/** Describes finding a seat that is not yet occupied.
  *
  * When interpreted, the instruction throws `java.sql.SQLException` if no free
  * seat matches or the query fails.
  *
  * @param courseId value matched against SEATS.ID
  * @return a program yielding the ID column of the first matching SEATS row
  */
def findEmptySeat(courseId: Int): Sql[Int] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val stmt = conn.createStatement()
    try {
      // updateSeatsStatus writes 'T' for a taken seat, so a free seat is the one marked 'F'.
      // NOTE(review): the filter compares SEATS.ID with the course id; confirm against the
      // schema whether a course column was intended.
      val rs = stmt.executeQuery(s"select ID from SEATS where OCCUPIED='F' AND ID=$courseId")
      // The cursor starts before the first row; advance it before reading.
      if (rs.next()) rs.getInt("ID")
      else throw new java.sql.SQLException(s"no empty seat for course $courseId")
    } finally stmt.close() // also releases the ResultSet
  })
/** Describes marking a seat as taken ('T') or free ('F').
  *
  * @param seatId value matched against SEATS.ID
  * @param taken  true writes 'T' into OCCUPIED, false writes 'F'
  * @return a program that performs the update and yields `Unit`
  */
def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] =
  Free.liftF(SqlOp { (conn: Connection) =>
    val flag = if (taken) 'T' else 'F'
    val stmt = conn.createStatement()
    try {
      stmt.executeUpdate(s"update SEATS set OCCUPIED ='$flag' where ID=$seatId")
      () // the affected-row count is intentionally discarded
    } finally stmt.close() // release the statement instead of leaving it open on the connection
  })

我们现在可以用这些升格成Free的函数来建设AST示范例子:


1   def takeSeat(courseId: Int): Sql[Unit] = for {
 2     emptySeat <- findEmptySeat(courseId)
 3     _ <- updateSeatsStatus(emptySeat, true)
 4   } yield()
 5   def addCourse(studentId: Int, courseId: Int): Sql[Unit] = for {
 6     fee <- getStudentFee(courseId)
 7     pay <- getTutorPay(courseId)
 8     tutorId <- getTutorId(courseId)
 9     _ <- updateStudentFee(studentId, fee)
10     _ <- updateTutorPay(tutorId, pay)
11     _ <- takeSeat(courseId)
12   } yield()

addCourse对基本函数进行了组合,又调用了已经组合过一次的takeSeat,证明AST可以实现高度的函数组合。

下面示范实现相关的Interpreter:


1   def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A = 
2     ast.resume.fold ({
3       case x: SqlOp[Sql[A]] => runTransactionImpl(conn, x.run(conn))
4     },
5     (a: A) => a 
6    )

我们需要一个通用的事务处理方法:


 1   def runTransaction[A](ast: Sql[A]): Exception \/ A = {
 2     val config = new BoneCPConfig()
 3     val bonecp = new BoneCP(config)
 4     val conn = bonecp.getConnection()
 5     conn.setReadOnly(false)
 6     conn.setAutoCommit(false)
 7     conn.rollback()
 8     try {
 9       val result: A = runTransactionImpl(conn, ast)
10       result.right[Exception]
11     } catch {
12       case e: Exception => e.left[A]
13     } finally {
14       conn.close
15     } 
16   }

这样,我们可以在一个地方使用事务处理来运算任何事先设计的AST。

我们可以用不同的方法来实现Interpreter。下面就是用Free.foldMap来运算AST的示范。由于我们需要注入Connection,所以采用了从SqlOp到State的自然转换(natural transformation,即SqlOp ~> SqlState):


 1   type SqlState[A] = State[Connection, A]
 2   object SqlToState extends (SqlOp ~> SqlState) {
 3     def apply[A](sa: SqlOp[A]): SqlState[A] = sa match {
 4       case SqlOp(f) => State {
 5         conn => (conn,f(conn))
 6       }
 7     }
 8   }
 9   def runTransactionImplState[A](conn: Connection, ast: Sql[A]) =
10     ast.foldMap(SqlToState).run(conn)

下面是这个用Free来实现FP风格数据库事务处理的完整示范代码:


 1 import scalaz._
  2 import Scalaz._
  3 import scala.language.higherKinds
  4 import scala.language.implicitConversions
  5 import com.jolbox.bonecp.BoneCP
  6 import com.jolbox.bonecp.BoneCPConfig
  7 import java.sql.Connection
  8 import java.sql.ResultSet
  9 
 10 object freedbtxns {
 11 
 12 case class SqlOp[A](run: Connection => A)
 13 implicit val sqlOpFunctor = new Functor[SqlOp] {
 14   def map[A,B](sa: SqlOp[A])(f: A => B): SqlOp[B] = 
 15     SqlOp{ (conn: Connection) => f(sa.run(conn)) }
 16 }
 17 type Sql[A] = Free[SqlOp,A]
 18 def getTutorId(courseId: Int): Sql[Int] = 
 19   Free.liftF(SqlOp{
 20     (conn: Connection) => {
 21       val sqlString = "select TUTOR from COURSES where ID=" + courseId
 22       conn.createStatement().executeQuery(sqlString).getInt("ID")
 23     }  
 24   })
 25   
 26 def getTutorPay(courseId: Int): Sql[Double] = 
 27   Free.liftF(SqlOp{
 28     (conn: Connection) => {
 29       val sqlString = "select PAYAMT from COURSES where ID=" + courseId
 30       conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
 31     }  
 32   })
 33 def getStudentFee(courseId: Int): Sql[Double] = 
 34   Free.liftF(SqlOp{
 35     (conn: Connection) => {
 36      val sqlString = "select FEEAMT from COURSES where ID=" + courseId
 37      conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
 38     }  
 39   })
 40 def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] = 
 41   Free.liftF(SqlOp{
 42     (conn: Connection) => {
 43       val sqlString = "update TUTORS set PAYABLE = PAYABLE+"+plusAmt.toString + " where ID=" + tutorId
 44       conn.createStatement().executeUpdate(sqlString)
 45     }  
 46   })
 47 def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] = 
 48   Free.liftF(SqlOp{
 49     (conn: Connection) => {
 50       val sqlString = "update STUDENTS set DUEAMT = DUEAMT+"+plusAmt.toString + " where ID=" + studentId
 51       conn.createStatement().executeUpdate(sqlString)
 52     }  
 53   })
 54 def findEmptySeat(courseId: Int): Sql[Int] = 
 55   Free.liftF(SqlOp{
 56     (conn: Connection) => {
 57       val sqlString = "select ID from SEATS where OCCUPIED='T' AND ID=" + courseId
 58       conn.createStatement().executeQuery(sqlString).getInt("ID")
 59     }  
 60   })
 61 def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] = 
 62   Free.liftF(SqlOp{
 63     (conn: Connection) => {
 64       val sqlString = "update SEATS set OCCUPIED ='"+taken.toString.toUpperCase.head + "' where ID=" + seatId
 65       conn.createStatement().executeUpdate(sqlString)
 66     }  
 67   })
 68 
 69   def takeSeat(courseId: Int): Sql[Unit] = for {
 70     emptySeat <- findEmptySeat(courseId)
 71     _ <- updateSeatsStatus(emptySeat, true)
 72   } yield()
 73   def addCourse(studentId: Int, courseId: Int): Sql[Unit] = for {
 74     fee <- getStudentFee(courseId)
 75     pay <- getTutorPay(courseId)
 76     tutorId <- getTutorId(courseId)
 77     _ <- updateStudentFee(studentId, fee)
 78     _ <- updateTutorPay(tutorId, pay)
 79     _ <- takeSeat(courseId)
 80   } yield()
 81 
 82   def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A = 
 83     ast.resume.fold ({
 84       case x: SqlOp[Sql[A]] => runTransactionImpl(conn, x.run(conn))
 85     },
 86     (a: A) => a 
 87    )
 88   def runTransaction[A](ast: Sql[A]): Exception \/ A = {
 89     val config = new BoneCPConfig()
 90     val bonecp = new BoneCP(config)
 91     val conn = bonecp.getConnection()
 92     conn.setReadOnly(false)
 93     conn.setAutoCommit(false)
 94     conn.rollback()
 95     try {
 96       val result: A = runTransactionImpl(conn, ast)
 97       result.right[Exception]
 98     } catch {
 99       case e: Exception => e.left[A]
100     } finally {
101       conn.close
102     } 
103   }
104 }

相关文章
|
7月前
|
Docker 容器
devmapper: Thin Pool has 162394 free data blocks which is less than minimum required 163840 free dat
devmapper: Thin Pool has 162394 free data blocks which is less than minimum required 163840 free dat
49 0
|
SQL 关系型数据库 Oracle
ORA-01466: unable to read data - table definition has changed
1. Oracle建议我们等待大约5分钟之后再进行flashback query新创建的表,否则可能会碰到这个错误ORA-01466: unable to read data - table definition has changed.
1799 0
|
SQL Oracle 关系型数据库
|
JavaScript Oracle 关系型数据库