【Java Day Tokyo 2015 3-‐5】 jBatch実践入門 NTTコムウェア株式会社 上妻 宜人 (あげつま のりと) はてなブログ : 見習いプログラミング日記 Copyright © NTT COMWARE 2015 上妻 宜人 あげつま のりと • Javaトラブル解析/技術サポート業務に従事 • Java EE について調べて伝えることが好き • 日本Javaユーザグループでの講演 • JJUG CCC 2014, ナイトセミナ, JavaOne報告会 など 『バッチジョブ』 どのように実装していますか? よくあるバッチアプリ構成を 思い浮かべてみます。 よくあるバッチ構成例 ジョブスケジューラ ジョブネットA Job1 Job2-‐1 Job3 Job2-‐2 マシンA Job1 マシンB Job2 Job2 Job3 mainメソッドから始まるJavaプログラム よくある困ったジョブ実装 ジョブスケジューラ • 試験期間の後半に問題が見つかる • 一度に大量ロードしてOutOfMemoryError ジョブネットA Job1 Job2-‐1 Job3 • JDBCのexecuteBatch未使用による性能遅延 • 似たようなジョブでも構造がばらばら Job2-‐2 • 独自フレームワークは難読化と隣り合わせ マシンA Job1 マシンB Job2 Job2 Job3 mainメソッドから始まるJavaプログラム ジョブ実装への様々な要求 • 耐エラー性 • 不正データが混在していても弾いて準正常終了 • リスタート可能 • エラーによる再実行時には途中から再開可 • 多様な入出力形式 • CSVファイルやRDBMSが入力とは限らない • Web API によりjson形式の入力を持ってくる 等 これら課題を解決するための、 7 1つの候補が Java EE より導入 (2013/5 release) JSR 352 Batch ApplicaSon for The Java PlaTorm 本日のコンテンツ • jBatchとは何か • jBatchをどうやって使うのか • メリット と デメリット • よくある疑問 • まとめ jBatchとは何か? • Java EE7 より導入されたバッチフレームワーク • 各ジョブの実装を楽にするのが目的 • Spring Batch の構造を引き継いで仕様化 jBatchの全体像 1. バッチ用語とアーキテクチャの定義 JobOperator Job Step1 Item Reader Step2 Step3 Item Processor Item Writer Job Repository 2. ジョブXML <job id=“monthly_billing”> <step id=“step1” next=“step2”> <step id=“step2”> </job> 3. ユーザが実装するAPI public interface ItemReader { Object readItem() throws ... } jBatch機能 #1 順序制御 ジョブフローはXML、ステップや分岐条件はJavaで実装 Cron or EJBタイマ ジョブスケジューラ Job1 job2 jBatch Job Step1 Java 0 4 * * * batch.sh Job XML Decision (条件分岐) Java </> Java Step2 Step3-‐1 Split Step3-‐2 Java (並行実行) Java Step4 jBatch機能#2 処理のフレームワーク化 • バッチ処理の多くは 『入力⇒処理⇒出力』 の流れ • 入出力と処理を分けて作成し、jBatchが呼び出す構造 • フレームワークによる設計の再利用 jBatch Job Step1 Step2 <<interface>> <<interface>> <<interface>> Item Item Item ReaderJava ProcessorJava Writer Java readItem() processItem() writeItems() jBatch RunSme jBatch機能#3 チェックポイント • 異常終了に備えてチェックポイント情報を収集 • デフォルト 10アイテム処理間隔 • リスタート時にItemReaderとItemWriterに渡す • 例えば、ファイルの途中から再処理が可能 start 10行完了 COMIT COMIT 10 20 20行完了 Job Repository × restart 20行目から再処理 30 ジョブ 完了 jBatch機能#4 エラーハンドリング • スキップ • 例外が投げられたレコードの処理をスキップ • 例: 入力チェックエラー行はロギングのちスキップ • 上限を設定して、ジョブ自体の異常終了も可能 • リトライ • 例外が投げられたら該当のステップを自動リトライ • デフォルトはリトライなし。ジョブ異常終了。 jBatchのアーキテクチャ • JobOperatorからジョブ起動 • 1つのジョブは複数のステップより構成 • 各ステップは Read – Process – Write に分けて実装 • 起動終了時刻、終了ステータスをJobRepositoryに保存 JobOperator Job Step1 Item Reader Job Repository Step2 Step3 Item Processor Item Writer ジョブ • トップレベル要素 • jBatchにジョブネットやジョブグループのような考え方はない • ジョブは必ず1つ以上のステップを含む • パラメータ、再実行可否 を設定可能 (デフォルト再実行可) • param : /var/file/売り上げデータ.csv 請求書発行ジョブ • エラー時は途中ステップから再実行可 (例: メールサーバ異常時には、Step3から再実行) 【Step1】 請求額確定 【Step2】 PDF生成 【Step3】 メール送付 ステップ • ユーザがJavaで実装する処理 • ステップ毎に2種類の実装方式が選べる • チャンク: Reader, Processor, Writer を分けて実装 • バッチレット: 単純に1回だけ呼ばれるタスク ステップ: チャンク方式 • ItemReader、ItemProcessor、ItemWriter の3つを実装 • 入力と処理を繰り返した後、一定数まとめて出力 • ファイルやDBなどのレコード単位の処理向け Step jBatch RunSme 『入力 ⇒ 処理』 1レコードずつ 10回繰り返す Item Reader Item Processor Item Writer read process 10レコード分まとめて書き出す write Commit ▶ ステップ: チャンク方式のメリット • バッチ処理の一定の型を規定する • 似たような処理を一つの設計にまとめる • Reader / Writer を汎用ライブラリ化しやすい Item Reader • CSVなどの FlatFileReader • XML / JSON Reader • JDBC / JPA Reader Item Processor • ビジネスロジック の実装 Item Writer • Readerと同様に ライブラリ提供あり ステップ: バッチレット方式 • ステップ毎に1回だけ呼び出す方式 • ファイル転送、ファイル圧縮、ジョブ完了メール通知 • Spring Batch では Tasklet と呼ばれている Step jBatch RunSme Batchlet process() ステップの終了コード ファイル転送、 ファイル圧縮処理など ステップ以外のジョブ構成要素 #1 • Flow: ステップをグループ化する • Decision: 条件分岐 • Split: 並列実行。Flow毎に可能 (Step毎は不可) ジョブ Flow1 Step1-‐1 Step1-‐2 Step2-‐1 Decision Split Flow2 Flow3 終了 ステップ以外のジョブ構成要素 #2 • Listener: 各ジョブ要素の前後に呼ばれる • インターセプタのようなイメージ • 特にスキップ時のSkipListenerはよく使う (ロギング等) Job Listener Step Listener ジョブ Step1 Item Reader Item Processor Skip Listener Item Writer Retry Listener ここまでのまとめ • jBatchはバッチフレームワークの標準仕様 • アーキテクチャとジョブXML、APIを規定 • XMLで順序定義し、ステップをAPIに沿って実装 • ジョブがトップレベル。1つ以上のステップを含む • ステップは Chunk方式 or Batchlet方式 で実装 • Chunk方式ではReader/Processor/Writerを実装 • Batchlet方式ではBatchletのみ実装 本日のコンテンツ • jBatchとは何か • jBatchをどうやって使うのか • メリット と デメリット • よくある疑問 • まとめ jBatch利用の流れ 1. ジョブ設計 • ジョブとステップを抽出する 2. ジョブ実装 • ジョブXMLの作成 • ステップの実装 (Reader/Processor/Writer) • ジョブの起動 ジョブ設計 • ジョブをステップに分割 • 各ステップの入力と出力を決める 請求書発行ジョブ 【Step1】 請求額確定 入力 CSV aa,bb 出力 【Step2】 請求書生成 入力 出力 DBMS 請求額TBL 他システムから 受領した売上データ 【Step3】 メール送付 PDF 請求書PDF 請求先へ ジョブ実装 #1 ジョブXMLの作成 チャンク方式でファイルを読み、DBに書くステップ定義 META-‐INF/batch-‐jobs/issueBill.xml <job id=“issueBill”> 次ステップは請求書生成 <!-‐-‐ Step1 請求額確定 -‐-‐> <step id=“importInvoice” next=“genPDFBill” > <chunk item-‐count=“100”> 読込⇒処理 x 100回後に <reader ref=“safesFileReader”/> 出力する(デフォルト10) <processor ref=“billProcessor”/> <writer ref=“billWriter”/> </chunk> </step> <step id=“genPDFBill”> ItemReader, ItemProcessor, ItemWriter定義 クラス名の頭を小文字 or FQCN で指定 ... ジョブ実装 #1-‐1 ジョブプロパティ定義 ItemReaderの入力ファイル名を定義する。 固定の場合と、可変の場合で定義方法は異なる。 <job id=“issueBill”> <!-‐-‐ Step1 請求額確定 -‐-‐> <step id=“importInvoice” next=“genPDFBill” > <chunk item-‐count=“100”> 固定的なプロパティは 値を直接ジョブXMLに定義する <reader ref=“safesFileReader”> <properSes> <property name=“fixedFileName” value=“input.csv”/> <property name=“fileName” value=“#{jobParameters[‘fileName’]}”/> </properSes> </reader> <processor ref=“billProcessor”/> <writer ref=“billWriter”/> </chunk> </step> ... 可変プロパティはプロパティ名を定義 後述のジョブ起動引数で値を指定する ジョブ実装 #1-‐2 スキップ定義を追加 エラー行をスキップしたい場合は、さらに設定追加 <job id=“issueBill”> <!-‐-‐ Step1 請求額確定 -‐-‐> <step id=“importInvoice” next=“genPDFBill” > <chunk item-‐count=“100” skip-‐limit=“10”> <reader ref=“safesFileReader”> <properSes> <property name=“fixedFileName” value=“input.csv”/> <property name=“fileName” value=“#{jobParameters[‘fileName’]}”/> </properSes> </reader> <processor ref=“billProcessor”/> <writer ref=“billWriter”/> 10行までスキップする それ以上はジョブ異常終了 スキップ対象例外 (FQCN) <skippable-‐excepSon-‐class> <include class=“sample.InvalidRecordExcepSon”/> </skippable-‐excepSon-‐class> </chunk> スキップ時リスナの定義 <listeners> <listener ref=“logErrorRecordListener” /> <listeners> </step> ... ジョブ実装 #2 ステップの実装 • Chunkを構成する各クラスを実装する • スキップリスナを実装する 【Step1】 請求額確定 SalesFile Reader 入力 CSV aa,bb LogErrorRecordListener Bill Processor Bill Writer 出力 DBMS 請求額TBL 【Step2】 請求書生成 【Step3】 メール送付 ジョブ実装 #2-‐1 ItemReaderの実装 ItemReaderには4つのAPIが定義されている @Named public class SalesFileReader implements ItemReader { public void open(Serializable checkpoint) { } public Object readItem() throws ExcepSon{ } public void close() { } public Serializable checkpointInfo() { } } ジョブ実装 #2-‐1-‐1 ItemReader @BatchPropertyによるパラメータ取得 @Named public class SalesFileReader implements ItemReader { @Inject @BatchProperty(name=“fileName”) private String fileName; ジョブXMLのプロパティ名を ... @BatchProperty引数に指定 <property name=“fileName” value=“#{jobParameters[‘fileName’]}”/> ジョブ実装 #2-‐1-‐2 ItemReader.Open @Named public class SalesFileReader implements ItemReader { ... private BufferedReader reader; private int rowNum; public void open(Serializable checkpoint) { reader = Files.newBufferedReader(Paths.get(fileName)); if (checkpoint != null) { // TODO skip reader } } public Serializable checkpointInfo() { return rowNum; } ジョブ実装 #2-‐1-‐2 ItemReader.Open @Named 初回スタート時はnull。 public class SalesFileReader implements ItemReader { リスタート時には異常終了前最後 ... private BufferedReader reader; のcheckpointInfo()の値が渡される private int rowNum; public void open(Serializable checkpoint) { reader = Files.newBufferedReader(Paths.get(fileName)); if (checkpoint != null) { // TODO skip reader } チェックポイント毎に呼ばれる } (item-‐count属性に設定した間隔) public Serializable checkpointInfo() { return rowNum; } ジョブ実装 #2-‐1-‐3 ItemReader.readItem @Named public class SalesFileReader implements ItemReader { ... 入力がなくなったらnullを返す public Object readItem() { String line = reader.readLine(); if (line == null) return null; // TODO1 line to Sales obj // TODO2 入力値チェック if (!isValid()) { throw new InvalidRecordExcepSon(rowNum, line); } rowNum++; 入力値チェックもItemReaderの役割。 return sales; エラー時はスキップ対象例外を返す。 } ジョブ実装 #2-‐1-‐4 ItemReader.close @Named public class SalesFileReader implements ItemReader { ... public void open(Serializable checkpoint) { // 済 } public Serializable checkpointInfo() { //済 } public Object readItem() throws ExcepSon { // 済 } public void close() { reader.close(); } ... リソースクローズ処理。 readItem()から null が返される or 異常終了時に呼ばれる。 AbstractItemReaderにより省略可 一部メソッドは実装不要時に使用。readItem()のみ必須。 @Named public class SalesFileReader extends AbstractItemReader { @Override public Object readItem() throws ExcepSon { ... } } ジョブ実装 #2-‐2 ItemProcessorの実装 • ItemReaderで読んだ値を引数に処理 • ビジネスロジック実装部分 【Step1】 請求額確定 SalesFile Reader 入力 CSV aa,bb 済 LogErrorRecordListener Bill Processor Bill Writer 出力 DBMS 請求額TBL 【Step2】 請求書生成 【Step3】 メール送付 ジョブ実装 #2-‐2-‐1 ItemProcessorの実装 ビジネスロジックをprocessItem()に実装する @Named public class BillProcessor implements ItemProcessor { @Override public Object processItem(Object item) throws ExcepSon { Sales sales = (Sales) item; // ビジネスロジックをここに実装 // TODO 売上データ ⇒ 請求額 算出 return bill; } } ジョブ実装 #2-‐3 ItemWriterの実装 • ItemProcessorで処理した値を引数に • データを書き出す部分 【Step1】 請求額確定 SalesFile Reader 入力 CSV aa,bb 済 LogErrorRecordListener Bill Processor 済 Bill Writer 出力 DBMS 請求額TBL 【Step2】 請求書生成 【Step3】 メール送付 ジョブ実装 #2-‐3-‐1 ItemWriterの実装 ItemReaderとの違いはwriteItemsのみ @Named public class BillWriter implements ItemWriter { public void open(Serializable checkpoint) { } public void writeItems(List<Object> items) throws ExcepSon { } public void close() { } public Serializable checkpointInfo() { } } ジョブ実装 #2-‐3-‐1 ItemWriterの実装 コミットはwriteItemメソッドの呼び出し毎に実行 @Named public class BillWriter implements ItemWriter { @Override public void writeItems(List<Object> items) throws ExcepSon { List<Bill> bills = (List<Bill>) items; bills.stream() 複数のRead => Process .forEach(this::insertToDB); 結果のリストが引数に設定 } private void insertToDB(Bill bill) { .. } ... } ジョブ実装 #2-‐4 スキップリスナ • スキップ対象例外の発生時に呼ばれる • エラーレコードをロギングしたい 【Step1】 請求額確定 SalesFile Reader 入力 CSV aa,bb 済 LogErrorRecordListener Bill Processor 済 BillAmount Writer 【Step2】 請求書生成 済 出力 DBMS 請求額TBL 【Step3】 メール送付 ジョブ実装 #2-‐4-‐1 スキップリスナの実装 @Named public class LogErrorRecordListener implements SkipReadListener { private staSc final Logger log = ...; @Override public void onSkipReadItem(ExcepSon e) { InvalidRecordExcepSon ex = (InvalidRecordExcepSon) e; log.warn(“Row ”+ ex.getNum() + “ is Invalid Record.”); } } スキップした行情報のロギング。 スキップした行全体を別ファイルに書出すのもあり。 ジョブ実装 #3 ジョブの起動 • JobOperator経由で起動する • EJBタイマ、JAX-RS/Servletなど で起動 EJBタイマ 請求書発行ジョブ JobOperator JAX-‐RS/Servlet 【Step1】 LoggingErrorRecordListener 済 請求額確定 SalesFile Reader 済 Bill Processor 済 BillAmount Writer 済 【Step2】 請求書生成 【Step3】 メール送付 ジョブ実装 #3-‐1 ジョブの起動 @ScheduleによるEJB タイマー @javax.ejb.Singleton public class ScheduleTimer { @Schedule(hour=“4”, minute=“2”) public void Smeout() { // start JobOperator jobOperator = BatchRunSme.getJobOperator(); Property props = new Property(); props.setProperty(“fileName”, “/var/xxx/input.csv”); jobOperator.start(“issueBill”, props); } } ジョブIDと、ジョブプロパティを引数に起動 ジョブ実装 #3-‐2 ジョブの起動 EEサーバ上なのでJAX-RS経由でも起動できる @Path(“/jobs/{jobId}”) public class BatchResource { @POST public void start(@PathParam String jobId) { // start JobOperator jobOperator = BatchRunSme.getJobOperator(); Property props = new Property(); props.setProperty(“fileName”, “/var/xxx/input.csv”); jobOperator.start(“issueBill”, props); } ... ジョブ実装 #3-‐3 ジョブの停止 startの返り値のジョブ実行IDを引数に停止 @Path(“/jobs/{jobId}”) public class BatchResource { @POST public void start(@PathParam String jobId) { // start JobOperator jobOperator = BatchRunSme.getJobOperator(); long execId = jobOperator.start(jobId, props); // stop jobOperator.stop(execId); } ... ジョブ実装 #3-‐4 ジョブの再実行 Reader/Writerのopen引数にチェックポイントが設定 @Path(“/jobs/{jobId}”) public class BatchResource { @POST public void start(@PathParam String jobId) { // start JobOperator jobOperator = BatchRunSme.getJobOperator(); long execId = jobOperator.start(jobId, props); // restart jobOperator.restart(execId, props); } ... まとめ : jBatch利用の流れ 1. ジョブ設計 • ジョブを抽出。ステップと入出力を定義する。 2. ジョブ実装 • ステップの流れ、プロパティをジョブXMLに定義 • ItemReader/ItemProcessor/ItemWriterのJava実装 • JobOperator経由で起動、停止、リスタート制御 • EJBタイマ、JAX-‐RS Web-‐API 等 から起動 本日のコンテンツ • jBatchとは何か • jBatchをどうやって使うのか • メリット と デメリット • よくある疑問 • まとめ jBatch適用のメリット • 一定の型に填めて開発できる • 1つ1つのジョブ構造のAP設計簡略化 • 第3者(保守・改造時) が構造を理解しやすい • チェックポイント等のバッチ共通機能の恩恵 • Java EE サーバの機能が使える • JAX-RS(REST入出力) / JMS連携 / CDIによるDI • リソース管理: DB接続プール、スレッドプール jBatch適用のデメリット • 型に填めにくい処理への適用が難しい • 複数入力(マスタ / トランザクション読込)、複数出力 • 読み込み方の工夫で解決できることもある CSV aa,bb 1マスタ/複数トランザクションデータ毎 に読み込み 【Step】 売上データ集計 商品マスタ CSV aa,bb 売上データ Reader Processor Writer public class Aggregate { private Master master; private List<TransacSon> trans; DBMS 集計済 売り上げ jBatch適用のデメリット • 処理によっては記述が冗長 • XMLによる処理の流れの定義 • java.nio.file.Files.linesメソッドで十分な場合もある // CSVファイルより、15歳以上のみ抽出して別ファイルへ int sum = Files.lines(Path.get(“csv.txt”)) .map(line -‐> line.split(“,”)) Reader .map(arr -‐> // TODO array to Student) .filter(student -‐> s.getAge() > 15) Processor .forEachOrdered(s -‐> // write to file); Writer 本日のコンテンツ • jBatchとは何か • jBatchをどうやって使うのか • メリット と デメリット • よくある疑問 • まとめ QuesSon #1 Spring Batch と何が違うのか? jBatch と SpringBatch の違い #1 • jBatchは仕様、SpringBatchは実装 • jBatch実装: RI (GlassFish), JBeret (WildFly) • SpringBatch3.0より jBatchに準拠 • ライブラリの豊富さは先行のSpringBatch有利 SpringBatchの機能範囲 アーキテクチャ ジョブXML API定義 jBatch • 豊富なライブラリ群 • CSV/TSV, XML, JDBCページング • かゆいところに手が届く • 同一形式の複数ファイル読込 (stock_1.csv, stock_2.csv ...) • 複数行を1アイテムにマッピング jBatch と SpringBatch の違い #2 • SpringBatchはジェネリクス対応 public class Reader implements ItemReader<Stock> { @Override public Stock read() { // read stock ジェネリクス対応により } キャストが不要 } public class Processor implements ItemProcessor<Stock, Result> { @Override public Result process(Stock s) throws ExcepSon { ... } } QuesSon #2 もうXMLはつらいです。 Spring Batchでは Java でジョブ定義可能 @Bean public Job job() { return jobsBuilderFactory.get("myJob") .start(step1()).next(step2()) .build(); } @Bean protected Step step1(...) { return stepBuilderFactory.get("step1") .<Person, Person> chunk(10) .reader(reader).processor(processor).writer(writer) .build(); } Spring Batch -‐ Reference DocumentaSon hwp://docs.spring.io/spring-‐batch/trunk/reference/html/configureJob.html#javaConfig NetBeans “jBatch Suite” • NetBeanプラグインによる jBatch サポート • ジョブXMLの生成 • ブランク ItemXXX/Batchlet 生成 ドラック & ドロップ で ステップの流れを作成 QuesSon #3 ジョブの流れ定義は ジョブスケジューラ or jBatch ? やりたいことに応じて使い分ける • システム全体のジョブ管理にはスケジューラ • APサーバだけで完結しないジョブへの対応 • 例: ログバックアップ、サーバ再起動 • ジョブAP内の順序制御には jBatch • Javaコード (Decision) による複雑な条件判定も可 • job.xml変更時は *.war 再デプロイが必要 jBatch実装のこれからに期待 • 管理コンソールが強化されると jBatch はより便利に • ジョブフロー状況の図示、ジョブ間の順序制御 GlassFish4はテーブル形式でジョブ履歴を表示 QuesSon #4 スケールアウトは可能か? 今のところ仕様上は未対応 • jBatchの仕様としては定義されていない • 同一ジョブを複数サーバにデプロイする等の対処 • 実装サーバでの対応に期待 • クラスタ機能と連携したジョブ分散 • ジョブ毎のリソース管理 (スレッド数、Javaヒープ等) 本日のコンテンツ • jBatchとは何か • jBatchをどうやって使うのか • メリット と デメリット • よくある疑問 • まとめ まとめ #1 jBatchはジョブを実装する為のフレームワーク。 JobOperator Job Step1 Item Reader Step2 Step3 Item Processor Item Writer Job Repository <job id=“monthly_billing”> <step id=“step1” next=“step2”> <step id=“step2”> </job> public interface ItemReader { Object readItem() throws ... } まとめ #2 jBatchの利用により実現できること。 • ジョブXMLによる順序制御 • APの構造を決める (Reader/Processor/Writer) • チェックポイント管理 • エラーハンドリング (スキップ/リトライ) • ジョブ実装で Java EE の各種機能が使える まとめ #3 【ここが嬉しい】 • Java EE でバッチ処理が注目され始めた • 一定の型に填めてジョブ設計を再利用できる • ジョブに必要な共有機能が享受できる 【もう少しな部分】 • 型に填めにくい / 塡まらない場合もある • 現状は先発のSpring Batchの方が多機能 最後に 既にjBatch対応製品がリリースされています。 OSS製品なので是非まずは試してみてください! GlassFish4.1 OracleとJavaは、Oracle CorporaSon及びその子会社、関連会社の米国及びその他の国における登録商標です。 文中の社名、商品名等は各社の商標または登録商標である場合があります。
© Copyright 2024