2012年9月16日日曜日

playframework1.2のJobでハマったところ

playframework1系は、リクエスト・ジョブ単位でトランザクションを管理している。 シングルトランザクションで業務処理を行う場合は、分かりやすいのだが、マルチトランザクションを用いないと実現できない場合は、かなり面倒なことになる。 
個人的には、playframework2.0系であれば、プログラマが自由にトランザクション範囲を定義することができるため、今後は2.0を使っていきたいが、1系のアプリケーションをメンテする必要がある場合は、マルチトランザクションを実装しないといけない場合があるだろう。 playframework1系でハマったところを備忘録としてメモっておく。 

今回、playframework1系で実現したい機能は以下である。 playframeworkのJob機能を使って、複数のバッチジョブを順に実行したり並行して実行するために、ジョブ制御テーブルをDBにもち、各バッチジョブの開始、終了時にジョブ制御テーブルを更新する。


まずは、playframeworkのJobクラスを拡張する。

トランザクションがネストしていないので、トランザクション境界にはJPAPlugin.startTx(false)とJPAPlugin.closeTx(false)を用いることがポイントだ。

業務ロジック用のJobはコンストラクタに設定してあげればよい。

public class JobInvoker() extends Job{

    /**
     * 業務処理をするJobクラス
     */
    private Job<JobResult> job = null;
    
    /**
     * コンストラクタ、業務処理するJobクラスを設定する。
     * @param job
     */
    public JobInvoker(Job<JobResult> job){
        this.job = job; 
    }    

    /**
     * Jobの実行状況を制御するJobStatusManagerを用いてステータスを変更する。 
     * @see play.jobs.Job#doJob()
     */
    @Override
    public void doJob() throws Exception {
        //この段階で既にトランザクションが開始されている。
        JobStatusManager manager = new JobStatusManager("JobA");
        try {
            
            if (manager.isExecutable()) {
                            //DBのテーブルにあるステータスの値を「実行前」から「実行中」に更新する。
                     manager.startJobStatus();
                     manager.commit();
                            //ここでcloseTxするか、DB.getConnection().commit()しないとDBに値が更新されない。
                     JPAPlugin.closeTx(false);

                            //ここから業務処理用のジョブの開始
                            //業務処理用に新たにトランザクションを開始する。
                            //doTask()メソッドがコンストラクタに設定したJobを呼び出す。
                     { 
                     JPAPlugin.startTx(false);
                            doTask();
                     JPAPlugin.closeTx(false);
                     }

                            //ステータス更新のためのトランザクションを開始する。
                     JPAPlugin.startTx(false);
                            //DBのテーブルにあるステータスを「実行中」から「完了」に更新する。
                     manager.completeJobStatus();
                     manager.commit();
            }

        } catch (Exception e) {
       //異常時はDBのテーブルにあるステータスを「実行中」から「異常終了」に更新する。
            manager.failJobStatus();
            manager.commit();
            Logger.error(e, "%s failed !", this.getClass().getName());

        } finally { 
         manager.close();
      if(JPA.isInsideTransaction()){
             JPAPlugin.closeTx(false);
      }
        }

    }

    /**
     * コンストラクタに設定したジョブを呼び出して結果を取得する。
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    protected void doTask() throws Exception {
        
        Promise<jobresult> promise = job.now();
        
        if(!promise.get().isSuccess()){
            Logger.error(promise.get().getException(), "Error", null);
            throw promise.get().getException();
        }
    }
}

また、上記のように実装した場合、設定ファイルのapplication.confもいくつか追記しないといけない。


  1. db.pool.maxIdleTimeExcessConnectionsを記述しないとDBのコネクションが解放されずに増え続け、一定時間後にPersistanceExceptionが発生する。
  2. db.pool.minSizeは、play.jobs.poolよりも多く設定しておく。そうしないとコネクションがタイムアウトしましたというPersistanceExceptionが発生する。


#Jobに割り当てる最大スレッド数の設定
play.jobs.pool=10

#DBの設定
db.pool.timeout=10000
db.pool.maxSize=40
db.pool.minSize=20
db.pool.maxIdleTimeExcessConnections=10

やはり仕事で使う場合は、マルチトランザクションは必要だ。また、常駐ジョブは一定期間の安定テストを行う必要がある。playframeworkでマルチトランザクションでバッチを起動するなら上記設定内容だけは覚えておきたい。