SWFを動かしてみる

なんかまわりがきちんとブログ書いているのを見て、たまには書いてやろうと思った次第です。訳あって、AWS Simple Work Flowを動かして色々検証しています。SWFを一言でいうなら、非同期かつノンブロッキングなやりとりが含まれる複数コンポーネント間でのワークフローを比較的楽に書くためのサービス。それなりに奥が深いのと、実際に日本語の情報が皆無なので、ひとまず動かすところまででも晒してみようと思います(先に続くかわからない・・・)。

動かす言語はJavaにするので、他言語の人はすまん。ちなみにフレームワークも利用します。AWSが出しているFlow Frameworkで、こいつはSWFを使うのを(多分)楽にしてくれる。SWF自体はワークフローサービス(というかワークフローに伴う状態管理サービスが実態に近い)なので、HTTPコールさえできればどの言語からも利用可能ではあるはず。ただし多分大変。誰かRubyとかPythonでやってる人がいたらぜひコードとかみたいです。

Flowのベーシックな仕組み

Flow FrameworkはJavaAPTAOPライブラリであるAspectJを使って、コードの自動生成とコードのバイトコードエンハンスを行います。なので、まずAPTおよびAspectJが動くようにEclipseの設定を行う必要があります。詳細はこちらを見てくださいな。

コンセプト図、これだけだと何かわからなそうだけど・・・w


環境構築

さて、まずは環境構築をします。必要なものは、


まずは


のように、APTのコンパイラが動く設定をします。その後、AspectJのウィービングをロードタイムにする設定を施します。Installed JREJava Agent指定でAspectJのaspectjweaver.jarへのパスを設定しておきます。





自分はこんな感じ


ここまでで環境はEclipseを使ったAPTとAspectJの環境は整ったので、今度はダウンロードしたAWS SDKのサンプル(AwsFlowFrameworkの下)を持ってきます。大事な点としては、

  • AWSのマネージメントコンソールからSWFのページに行き、Manage Domains→Register Newでドメインを登録
  • access.properties
    • AWS.Access.IDに自分のアクセスIDを設定
    • AWS.Secret.Keyに自分のシークレットキーを設定
    • AWS.Account.IDに自分のアカウントIDを設定
    • S3.Access.ID, S3.Secret.Key, S3.Account.IDも同様に設定
    • domainはに登録したドメイン名を設定
    • Activity.Worker.LocalFolderには適当なTmpフォルダを設定
    • S3のバケットを2つ用意する。Workflow.Input.SourceBucketNameをWorkflow.Input.TargetBucketNameにそれぞれ設定
  • 普通のコンソールから確認したかったので、開発はEclipseでしてAntを使ってビルドする。build.xmlを自分の環境用に適切になおしておく(なんかAntにうるさく言われた部分はincludeantruntime="false"などした)
  • log4jの設定も適切に。
  • 余談だけど、AOPの設定の実体はsrc/META-INF/aop.xml。ここに利用しているAspectとウィービング指定の先があるので必要であれば足す

サンプルコード

ここまでやれば、動くはず。ひとまずシンプルなコードは下記の通りです。大事な部分だけひとまずわかればよいと思います。

  • 大事な部分
    • HelloWorkflow
      • ワークフローの定義
    • HelloActivity
      • アクティビティの定義
    • HelloWorkflowImpl
      • ワークフローの実装クラス
    • HelloActivityImpl
      • アクティビティの実装クラス
  • ブートストラップコード(あまり本質的ではない)
    • ActivityHost
      • アクティビティを起動する
    • WorkflowHost
      • ワークフローを起動する
    • WorkflowExecutor
      • ワークフローをキックして最初のコールを行う


ワークフローの定義はインターフェースとアノテーション(@Workflow, @WorkflowRegistrationOptions, @Execute)で行います。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;

@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 600)
public interface HelloWorkflow {

	@Execute(version = "0.1")
	void helloWorld(String name);
}

次のアクティビティの定義。アクティビティもアノテーションで定義します(@Activities、@Activity(Activitiesあればいらないみたいだけど))

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.Activity;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;

@Activities(version = "0.1")
public interface HelloActivity {

	@Activity
	@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 30, defaultTaskStartToCloseTimeoutSeconds = 10)
	void hello(String name);
}

これでコンパイルすると、APTによる自動生成でコードが生成されます。これを使って実装を書きます。ワークフローの実装は下記のような感じです。HelloActivityClientは生成されたコードで、これを使います。直接Workflowの実装はアクティビティの実装をコールしません。SWFでは同期的に見えて、原則すべて非同期で動くので、その非同期な状態のコントロールをWorkflowとActivityの間に入って、SWFとFlowが行ってくれます。こことても重要です。最もシンプルな下記の実装ではあまり特段難しく見えないですが、この非同期かつノンブロッキングでやりとりしあっている事を意識しておくとよいです。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import org.apache.log4j.Logger;

public class HelloWorkflowImpl implements HelloWorkflow {

	HelloActivityClient client = new HelloActivityClientImpl();

	static Logger log = Logger.getLogger(HelloWorkflowImpl.class);

	@Override
	public void helloWorld(String name) {
		client.hello(name);
		log.debug("end HelloWorkflowImpl");
	}

}

アクティビティ側は単なるJavaのコードも普通の実装クラスです。こちらは完全にロジックだけを記述します。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import org.apache.log4j.Logger;

public class HelloActivityImpl implements HelloActivity {

	static Logger LOG = Logger.getLogger(HelloActivityImpl.class);

	@Override
	public void hello(String name) {
		String s = "hello " + name + "!";
		LOG.debug("HelloActivityImpl: " + s);
		System.out.println("HelloActivityImpl: " + s);
	}

}


ワークフローは全体のフローの定義とコントロールを行って、ビジネスロジックはアクティビティの方で書きます。さらに、これらをブートストラップするコードが必要です。ワークフロー、アクティビティがそれぞれ別個に起動され、SWFへポーリングしつつコントロールを待ちます。これらを便宜上、WorkflowHostとActivityHostとします。また最初にワークフローで定義したメソッドを呼ぶブートストラップコードが必要なので、それも作ります(WorkflowExecutor)。どれもほぼ決まりきった形なので、そういうものだとして最初は割り切って書いておくのが良さそうです。


ActivityHostでは、アクティビティの実装クラスを渡します。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;

public class ActivityHost {

	public static final String ACTIVITIES_TASK_LIST = "hello";

	static Logger log = Logger.getLogger(ActivityHost.class);

	public static void main(String[] args) throws IllegalArgumentException,
			IOException, InstantiationException, IllegalAccessException,
			SecurityException, NoSuchMethodException {
		ConfigHelper helper = ConfigHelper.createConfig();
		AmazonSimpleWorkflow client = helper.createSWFClient();
		final ActivityWorker worker = new ActivityWorker(client,
				helper.getDomain(), ACTIVITIES_TASK_LIST);
		worker.addActivitiesImplementation(new HelloActivityImpl());

		worker.start();

		log.debug("Activity Worker Started for Task List: "
				+ worker.getTaskListToPoll());

		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				try {
					worker.shutdownAndAwaitTermination(1, TimeUnit.MINUTES);
					log.debug("Activity Worker Exited.");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		log.debug("Please press any key to terminate service.");

		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.exit(0);

	}
}

WorkflowHostでは、ワークフロークラスの実装をClassで渡します。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import java.io.IOException;

import org.apache.log4j.Logger;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;
import com.amazonaws.services.simpleworkflow.flow.examples.common.Util;

public class WorkflowHost {

	public static final String DECISION_TASK_LIST = "HelloWorkflow";

	static Logger log = Logger.getLogger(WorkflowHost.class);

	public static void main(String[] args) throws IllegalArgumentException,
			IOException, InstantiationException, IllegalAccessException {
		ConfigHelper helper = ConfigHelper.createConfig();
		AmazonSimpleWorkflow client = helper.createSWFClient();
		final WorkflowWorker worker = new WorkflowWorker(client,
				helper.getDomain(), DECISION_TASK_LIST);
		worker.addWorkflowImplementationType(HelloWorkflowImpl.class);
		worker.start();
		log.debug("workflow starting...");
		Util.addShutdownHook(worker, log, "workflow host is terminated...");

		log.debug("Please press any key to terminate service.");

		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}

		System.exit(0);

	}
}


最後にWorkflowExecutorで実際にワークフローを開始します。1個だけ注意点があって、Workflowを生成するときには、これもAPTで自動生成したコードを使います。workflow.helloWorld("shinpei")の前のコード2行がその部分に当たります。

package com.amazonaws.services.simpleworkflow.flow.examples.hello;

import java.io.IOException;

import org.apache.log4j.Logger;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;
import com.amazonaws.services.simpleworkflow.model.WorkflowExecution;

public class WorkflowExecutor {

	static Logger log = Logger.getLogger(WorkflowExecutor.class);

	public static void main(String[] args) throws IllegalArgumentException,
			IOException {
		ConfigHelper helper = ConfigHelper.createConfig();
		AmazonSimpleWorkflow swf = helper.createSWFClient();

		log.debug("domain : " + helper.getDomain());

		HelloWorkflowClientExternalFactory factory = new HelloWorkflowClientExternalFactoryImpl(
				swf, helper.getDomain());
		HelloWorkflowClientExternal workflow = factory.getClient();
		workflow.helloWorld("shinpei");
		WorkflowExecution executions = workflow.getWorkflowExecution();
		log.debug("Started hello workflow with workflowId=\""
				+ executions.getWorkflowId() + "\" and runId=\""
				+ executions.getRunId() + "\"");
	}
}


ここまで出来れば、あとはEclipse上で実行するか、Antなどでコマンドラインから実行するかをすればよいです。ちなみにAntで実行する場合は、3つウインドウを起動しておいて、下記のように1個づつRunすればOKです。

ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.hello.ActivityHost" run
ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.hello.WorkflowHost" run
ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.hello.WorkflowExecutor" run

ひとまずこんなところでしょーか。次回あたりに書く余裕があれば、SWFで分岐・逐次・発散・リトライあたりを書こうかなと思います。特にリトライはそれなりに考えてあって、色々手立てがあるのでひとまず結構やれるんじゃないかと思います。ちなみに次書く保証はあまりないです・・・