[Rust] RedisのListをキューとして利用した外部コマンド起動の仕組みを構築する

2024年3月12日火曜日

Redis Rust

t f B! P L

enter image description here

はじめに

プログラミング言語Rustを用いて、RedisのListデータ型をキューとして扱い、そのキューに登録された外部コマンドを順に実行するシステムの構築方法を紹介します。
Redisは、高速なデータ格納とアクセスを可能にするキーバリューストアです。特に、そのListデータ型は、キューとしての使用も可能です。
今回紹介する方法は、プログラム間でのメッセージパッシングやタスクキューなど、多くの用途に活用できます。

スポンサーリンク

全体イメージ

今回、作成するキューシステムのイメージです。

enter image description here

Producerは、外部コマンドの実行が必要なタイミングでキューを登録します。
Consumerは、キューが登録されるまで待機します。キューが登録されると、指定されたコマンド名に対応する外部コマンドのパスを設定ファイルから取得します。そして、取得したパスの外部コマンドを実行し、完了後、再びキューの登録があるため待機します。

準備

Redisのクレートを設定

RustプロジェクトでRedisを使い始めるには、まずredisクレートをプロジェクトの依存関係に追加する必要があります。また、TOML形式の設定ファイルを読み込むため、tomlおよびserdeクレートも追加します。

プロジェクトのCargo.tomlファイルに以下の行を追加します。

[dependencies]
redis = "0.24.0"
serde = { version = "1.0", features = ["derive"] }
toml = "0.8.10"

設定ファイルを作成

コマンド名と実行する外部コマンドの具体的な情報を指定するため、TOML形式の設定ファイルを準備します。ファイル名は Config.tomlとして保存します。
このファイルには、各コマンドの名前と、それを起動するためのパスを記述します。

以下は、その設定例です。

[[commands]]
name = "cmd_A"
path = "path/to/dir/cmd_A.sh"

[[commands]]
name = "cmd_B"
path = "path/to/dir/cmd_B.sh"

[[commands]]
name = "cmd_C"
path = "path/to/dir/cmd_C.sh"

TOMLフォーマットでは、[[commands]]のようにセクション名を二重の角括弧で囲むことで、配列の要素としてコマンドの定義を追加できます。この構造を用いることで、複数のコマンドを簡潔にかつ明確に列挙することが可能です。各コマンドはnameキーによって一意に識別され、pathキーにはその実行ファイルのパスを指定します。
この設定ファイルを使うことで、呼び出し元は外部コマンドをnameキーによって指定できます。

次は、Rustの実装に入ります。

コードの概要

このRustプログラムは、設定ファイルからコマンドのリストを読み込み、外部コマンドをキューベースで実行するものです。

構造体の宣言

今回の仕組みで使用する構造体を定義します。

設定ファイルの構造

Config構造体は設定ファイルのルートを表し、commandsフィールドを通じて複数のQueueCommandを保持します。QueueCommand構造体は、実行すべきコマンドの名前とパスを定義します。これにより、設定ファイルはプログラムの実行動作を柔軟に指定できます。

/// 設定ファイルのルート
#[derive(Deserialize)]
pub struct Config {
    /// コマンドのリスト
    pub commands: Vec<Command>,
}

/// コマンド情報(実行すべきコマンドの名前とパスを定義)
#[derive(Deserialize)]
struct Command {
    name: String,
    path: String,
}

キューに登録するデータの定義

今回のサンプルでは、Redisのキューに次のTOML形式で起動コマンド名と引数を格納します。
引数は配列として格納し、複数指定できるようにします。

name = "cmd_A"
args = [ "red", "yellow", "green" ]

QueueData構造体は、このTOML形式の起動要求を格納するため構造体です。
キューに登録するコマンドの名前と、それを実行する際に使用される引数を定義します。

#[derive(Serialize, Deserialize)]
struct QueueData {
    // コマンド名
    name: String,
    // 起動パラメータ
    args: Vec<String>,
}

設定ファイルの読み込み

設定ファイルの内容を読み込む関数を作成します。
この関数は、Config.tomlを読み込み、Config構造体を返します。

/// 設定ファイルの読み込み
fn read_config() -> Result<Config, Box<dyn Error>> {
    let str: String = fs::read_to_string("Config.toml")?;
    let config: Config = toml::from_str(&str)?;
    Ok(config)
}

キューからデータを取り出す

Redisのキューからデータを取り出す関数です。FIFOの概念でキューは先頭から取り出します。

キューの取り出しにはblpopコマンドを使用します。blpopは、指定されたキュー("my_queue")から要素をブロッキングで取り出すコマンドです。ここで、第二引数の0.0はタイムアウト値を指定しており、この場合はタイムアウトなしを意味します。つまり、キューに何かがプッシュされるまで待機します。
最後に、TOML形式の文字列として格納されたキューを、QueueData構造体に変換します。

fn dequeue() -> redis::RedisResult<Option<QueueData>> {
    // Redisとの接続を確立
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // キューから1件取り出し(キューが登録されまで待機)
    let value: Vec<String> = con.blpop("my_queue", 0.0)?;
    println!("{:?}", value);

    // toml形式の文字列を構造体に変換
    let queue_data: Option<QueueData> = match value.get(1) {
        Some(str) => match toml::from_str(&str) {
            Ok(data) => Some(data),
            Err(_err) => None,
        },
        None => None,
    };

    // 取り出したキューを返却
    return Ok(queue_data);
}

キューの要求に基づく外部コマンドの実行

この関数は、キューに登録された要求に基づいて外部コマンドを実行します。具体的には、QueueCommand構造体とQueueData構造体から情報を取得し、その情報を用いて外部のコマンドを実行します。

fn execute_command(
    command: &QueueCommand,
    queue_data: &QueueData,
) -> std::io::Result<std::process::Output> {
    // 外部コマンドを実行
    Command::new(&command.path)
        .args(queue_data.args.clone())
        .output()
}

メインループの実装

無限ループで、キューの登録を待機および外部コマンドの実行する関数を実装します。

fn main_loop() -> Result<Config, Box<dyn Error>> {
    // 設定ファイルの読み込み
    let config: Config = read_config().expect("設定ファイルの読み込みに失敗");

    // 無限ループ
    loop {
        // キューから1件取り出し(キューが登録されまで待機)
        let dequeue_result = dequeue().unwrap_or(None);

        // キューの読み取りに失敗した場合、リトライ
        if dequeue_result.is_none() {
            continue;
        }

        let queue_data = dequeue_result.unwrap();

        // 設定ファイルから対応するコマンド情報を取得
        let find_command = config
            .commands
            .iter()
            .find(|cmd| cmd.name == queue_data.name);

        // 外部コマンドの実行
        if let Some(command) = find_command {
            match execute_command(command, &queue_data) {
                Ok(output) => println!("コマンドを実行しました"),
                Err(error) => println!("{:?}", error),
            }
        } else {
            println!(
                "設定ファイルにコマンドが見つかりません(name={})",
                queue_data.name
            );
        }
    }
}

メインループの処理は、大きく次の4つのステップに分けられます。

  1. 設定ファイルの読み込み
    最初に、read_config関数を呼び出して設定ファイルを読み込みます。

  2. 無限ループ
    loopキーワードを使用して無限ループを作成します。このループ内で、プログラムは継続的にキューからコマンドの実行リクエストを取り出し、それを処理します。

  3. キューからのデータ取り出し
    dequeue関数を呼び出してキューからデータを1件取り出します。この関数はOption<QueueData>を返すため、何も取り出せなかった場合はNoneを返します。

  4. コマンドの検索と実行
    取り出したデータ(queue_data)に基づいて、設定ファイル内から対応するコマンドを検索します。
    一致するコマンドが見つかった場合、そのコマンド(command)とキューのデータ(queue_data)をexecute_command関数に渡して外部コマンドを実行します。

ここまでで、キューを待機する側の実装は完了です。
次は、キューを登録する側の実装を見てきましょう。

スポンサーリンク

キューを登録する側の実装

最後に、キューを登録する側の関数を実装します。
この関数は、外部コマンド名と引数を QueueData構造体に格納し、最終的には構造体をTOML形式の文字列に変換したデータをRedisのキューに登録します。

pub fn enqueue(name: String, args: Vec<String>) -> Result<(), Box<dyn Error>> {
    // キューに登録するデータを作成
    let queue_data : QueueData = QueueData {
        name: name,
        args: args
    };

    // 構造体をTOML形式の文字列に変換
    let str = toml::to_string(&queue_data)?;

    // Redisに接続
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // Redisのキューへ登録
    con.rpush("my_queue", str)?;
    return Ok(());
}

まとめ

RustとRedisで外部コマンドを実行するキューシステムを実装する例を紹介しました。
キューを使ってFIFOでバッチを実行する仕組みは、特に業務システムのバッチ処理の業務要件として多くあります。こういう仕組みを業務システムごとに作りがちですが、1つ汎用的な仕組みの物を作っておいてコスト削減にもつながって便利です。

最後に、今回作成したコードの全文を載せておきます。

use std::{error::Error, fs, process::Command};

use redis::Commands;
use serde::{Deserialize, Serialize};

/// 設定ファイルの内容
#[derive(Deserialize, Debug)]
pub struct Config {
    /// コマンドのリスト
    commands: Vec<QueueCommand>,
}

/// コマンド定義
#[derive(Deserialize, Debug)]
struct QueueCommand {
    // コマンド名
    name: String,
    // 起動コマンドのパス
    path: String,
}

/// キューに登録する内容
#[derive(Serialize, Deserialize, Debug)]
pub struct QueueData {
    // コマンド名
    name: String,
    // 起動パラメータ
    args: Vec<String>
}

// メインループ
pub fn main_loop() -> Result<Config, Box<dyn Error>> {
    // 設定ファイルの読み込み
    let config: Config = read_config().expect("設定ファイルの読み込みに失敗");

    // 無限ループ
    loop {
        // キューから1件取り出し(キューが登録されまで待機)
        let dequeue_result = dequeue().unwrap_or(None);

        // キューの読み取りに失敗した場合、リトライ
        if dequeue_result.is_none() {
            continue;
        }

        let queue_data = dequeue_result.unwrap();

        // 設定ファイルから対応するコマンド情報を取得
        let find_command = config
            .commands
            .iter()
            .find(|cmd| cmd.name == queue_data.name);

        // 外部コマンドの実行
        if let Some(command) = find_command {
            match execute_command(command, &queue_data) {
                Ok(output) => println!("コマンドを実行しました"),
                Err(error) => println!("{:?}", error),
            }
        } else {
            println!(
                "設定ファイルにコマンドが見つかりません(name={})",
                queue_data.name
            );
        }
    }
}


/// 設定ファイルの読み込み
pub fn read_config() -> Result<Config, Box<dyn Error>> {
    let str: String = fs::read_to_string("Config.toml")?;
    let config: Config = toml::from_str(&str)?;
    println!("{:?}", config);
    Ok(config)
}

// 外部コマンドの実行
fn execute_command(
    command: &QueueCommand,
    queue_data: &QueueData,
) -> std::io::Result<std::process::Output> {
    // 外部コマンドを実行
    Command::new(&command.path)
        .args(queue_data.args.clone())
        .output()
}

// キューから1件取り出し
pub fn dequeue() -> redis::RedisResult<Option<QueueData>> {
    // Redisとの接続を確立
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // キューから1件取り出し(キューが登録されまで待機)
    let value: Vec<String> = con.blpop("my_queue", 0.0)?;
    println!("{:?}", value);

    // toml形式の文字列を構造体に変換
    let queue_data: Option<QueueData> = match value.get(1) {
        Some(str) => match toml::from_str(&str) {
            Ok(data) => Some(data),
            Err(_err) => None,
        },
        None => None,
    };

    // 取り出したキューを返却
    return Ok(queue_data);
}

// キューへ1件登録
pub fn enqueue(name: String, args: Vec<String>) -> Result<(), Box<dyn Error>> {

    // キューに登録するデータを作成
    let queue_data : QueueData = QueueData {
        name: name,
        args: args
    };

    // 構造体をTOML形式の文字列に変換
    let str = toml::to_string(&queue_data)?;

    // Redisに接続
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // Redisへキューを登録
    con.rpush("my_queue", str)?;
    return Ok(());
}
スポンサーリンク
スポンサーリンク

このブログを検索

Profile

自分の写真
Webアプリエンジニア。 日々新しい技術を追い求めてブログでアウトプットしています。
プロフィール画像は、猫村ゆゆこ様に書いてもらいました。

仕事募集もしていたり、していなかったり。

QooQ