PR TIMESデザイナー&エンジニアブログ BREAK TIMES

PR TIMES Developer Blog(デザイナー&エンジニアによる開発者ブログ)

PR TIMES Developer Blog
デザイナー&エンジニアによる開発者ブログ

複数サーバーでバッチを平行起動した時に便利すぎたSQLトランザクション

こんにちは、PR TIMESエンジニアの落合です!

みなさんも、普段バッチを作成する事があると思います。
1台のサーバーで、10分に1回などの実行に関しては、処理内容の事だけを考えてコーディングすればいいですが、
負荷分散や、バッチサーバーを冗長化する場合、複数台のサーバーで同時刻に実行したい場合もあると思います。
ただ、何も考慮せずに、複数台で起動してしまうと、重複した処理を実施してしまいます。
今回は、そういった時に、MySQLトランザクション機能を使って、簡単に解消出来た便利な機能を紹介させていただければと思います!

前提

サンプル用のテーブルは、以下のようになっている前提です。
データは、適当に10万件追加しました。

CREATE TABLE IF NOT EXISTS `site` (
  `id` int(13) NOT NULL AUTO_INCREMENT,
  `site_name` varchar(256) NOT NULL,
  `status` int(4) NOT NULL DEFAULT '0'
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT='サイト情報';

プログラムに関しては、10万件のデータを1回のバッチで1000件ずつ処理を行い、5分置きに実行するようなバッチです。
今回は、サンプルなので、処理はせずに1件あたり1秒のスリープを実行しています。

$host = "localhost";
$user = "xxxxxxxx";
$pass = "xxxxxxxx";
$db = "develop";
$dsn = "mysql:dbname=$db;$host=$host;charset=utf8";

//まずは、DB接続
try{
    $pdo = new PDO ($dsn, $user, $pass);
} catch(PDOException $e) {
    var_dump("DB connected error!");
    exit();
}

// selectで1000件の情報を取得する。
$sql = "select * from site where status=0 Limit 2";
$res = $pdo->query($sql);

foreach($res as $v){
    // 取得したデータを、実行中のステータスに変更する。
    $sql = "UPDATE site SET status=1 where id=" . $v["id"];
    $res = $pdo->exec($sql);

    //実行処理内容が1秒かかるとします。
    sleep(1);

    // 取得したデータを、完了のステータスに変更する。
    $sql = "UPDATE site SET status=2 where id=" . $v["id"];
    $res = $pdo->exec($sql);
}
// 未実行件数を取得して、0件の場合は、ステータスを0に戻す。
$sql = "select count(*) as cnt from site where status=0";
$res = $pdo->query($sql);
$count_data = $res->fetch();
if($count_data["cnt"] == 0){
    $sql = "UPDATE site SET status=2 where id=" . $v["id"];
    $res = $pdo->exec($sql);
}

処理時間が同時刻に複数サーバーで実行した場合、ループ処理中に時間がかかると、最初に起動したバッチと、2台目に起動したバッチが、取得する内容が重複する為、このままだと分散処理が正常に行えない事象が発生してしまいます。

SQLトランザクションを使用して解消!

MySQLトランザクションには、4段階の分離レベルが存在しています。
ここで、全てを話してしまうと長くなってしまうので、割愛します。
以下のページに詳しく解説してありますので、ご参考にしてください!
MySQLのトランザクション分離レベル

さて、このトランザクションの挙動を応用して、解決したいと思います。
まず、アップデート自体を禁止する為の設定や、読み込み自体を拒否するような設定などがありますが、
今回はserializableに設定を行って、commitを実行しない限り、ロック中のレコードには、selectをしても返却されない状態にしたいと思います。

ロック用のテーブルを用意する

今回のSQLでは、該当するレコードをロックしてしまうので、処理実行中かを判断する為のテーブルを用意します。
そこに、レコードで実行プログラム名を追加していきます。

CREATE TABLE IF NOT EXISTS `cron_lock` (
  `id` int(13) NOT NULL AUTO_INCREMENT,
  `name` varchar(256) NOT NULL
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT='ロックテーブル';

ALTER TABLE `cron_lock` ADD PRIMARY KEY (`id`);

INSERT INTO `cron_lock` (`id`, `name`) VALUES(1, 'test.php');

それでは、ロックデーブルが完成したら、sshを2個立ち上げて、テストしてみます。

#プロセスAで実行
mysql> use develop
mysql> SET tx_isolation = SERIALIZABLE;
mysql> SET innodb_lock_wait_timeout=30;
mysql> START TRANSACTION;
mysql> select * from cron_lock where name='test.php' FOR UPDATE;
+----+----------+
| id | name     |
+----+----------+
|  1 | test.php |
+----+----------+
1 row in set (0.00 sec)

次に同じクエリを実行したら、レスポンスが返却されない事がわかると思います。

#プロセスBで実行
mysql> use develop
mysql> SET tx_isolation = SERIALIZABLE;
mysql> SET innodb_lock_wait_timeout=30;
mysql> START TRANSACTION;
mysql> select * from cron_lock where name='test.php' FOR UPDATE;
|
# ロックされているので返却を待っている状態です。

次に、プロセスAをcommitして、ロックを解除すると、プロセスBで順番待ちしているselectが実行されます。

#プロセスAで実行
mysql> COMMIT;

すると、プロセスBのSelectの結果が返却されると思います。

#プロセスBで実行
mysql> select * from cron_lock where name='test.php' FOR UPDATE;
+----+----------+
| id | name     |
+----+----------+
|  1 | test.php |
+----+----------+
1 row in set (20.91 sec)

結果は分かっていても、心の中では、オォォーって感動してしまいます。(笑)
さて、この挙動を応用して、複数同時起動しても実行待機してくれる仕組みをプログラムで作成していきたいと思います。

負荷分散対応バッチのプログラムについて

さて、先ほど作成したcron_logテーブルを用いて、プログラムにしていきたいと思います。

$db = "develop";
$dsn = "mysql:dbname=$db;$host=$host;charset=utf8";
//まずは、PDOでDB接続を行う。
try{
    $pdo = new PDO ($dsn, $user, $pass);
} catch(PDOException $e) {
    var_dump("DB connected error!");
    exit();
}

// cron_lockトランザクションを設定して、レコードをロックする。
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->query("SET innodb_lock_wait_timeout=30;");
$pdo->beginTransaction();
$sql = "select * from cron_lock where name='test.php' FOR UPDATE;";
$res = $pdo->query($sql);

// selectで1000件の情報を取得する。
$sql = "select * from site where status=0 Limit 2";
$res = $pdo->query($sql);
foreach($res as $v){
    // 取得したデータを、全件処理開始フラグを設定する。
    $sql = "UPDATE site SET status=1 where id=" . $v["id"];
    $array[] = $v;
}

// cron_lock解除
$pdo->commit();

foreach($array as $v){
    //実行処理内容が1秒かかるとします。
    sleep(1);

    // 取得したデータを、完了のステータスに変更する。
    $sql = "UPDATE site SET status=2 where id=" . $v["id"];
    $res = $pdo->exec($sql);
}

// 未実行件数を取得して、0件の場合は、ステータスを0に戻す。
$sql = "select count(*) as cnt from site where status=0";
$res = $pdo->query($sql);
$count_data = $res->fetch();
if($count_data["cnt"] == 0){
    $sql = "UPDATE site SET status=2 where id=" . $v["id"];
    $res = $pdo->exec($sql);
}

最後に

今回のコードは、あくまでも参考までのコードでしかありませんが、これらをクラス化しておき、
色んな多重起動するバッチに使用するのもいいかと思います。
設計時に、色々考え、考慮したのですが、毎分実行するようなプログラムで、処理が追いつかない時に、複数のプロセスで同時起動したい場合などにも使える機能だと思います。
実行時間短縮に、プロセスをForkして、並列処理を実行して、速度を担保したとしても、複数サーバーで実行出来ていれば、冗長化出来ているので、安心できます。

今回は、簡単だと思った事が、ちゃんと仕様として、考えたら、意外と難しかったというハマりについて向き合った結果、トランザクションの新たな使い方を発見したので、書かせていただきました。
これを機に、複数サーバーによる、バッチを同時起動する事で、サービスの事前処理などの速度改善に役立てたいと思っています。