Azure Batch AI Trainingを利用したツイートデータ分類モデルの分散学習

はじめまして、システムソリューション部の松本です。

今回は、ディープラーニングの分散学習を可能にするサービス「Azure Batch AI Training」を用いて、Twitterの投稿に対する分類モデルを分散学習させてみようと思います。

アウトライン

  1.  Azure Batch AI Trainingとは
  2.  Azure Batch AI Training APIを使用してディープラーニング分散処理を行う流れ(Python&Tensorflowを使用した例)
  3. Tensorflowによるリツイート数でのツイートCNN2値分類
  4. 結果
  5. まとめ
  6. Appendix:独自Dockerコンテナssh設定

1. Azure Batch AI Trainingとは

クラウドの複数のGPUVM上でディープラーニング分散処理を行えるサービスです。

VMの設定・起動からジョブの実行までブラウザ上でUI操作して行うのではなく、
全て提供されているAPIを用いてプログラムベースで進めるものとなっています。

APIはPython、Java、C#向けに提供されており、ディープラーニングのフレームワークはTensorflow、CNTK、Caffe、Chainerがサポートされています。
Azure-CLIを用いて利用することもできます。

VM数、NFS等のノード設定や、VM上で動かす環境構築済みコンテナ等を指定後、分散処理を行うスクリプトとデータをVM側へ渡してジョブを実行すれば、後はクラウドVM上で分散処理が行われ、アウトプットがAzureファイル共有等に出力されるといった仕組みです。

(Azure AI Batch : https://batchaitraining.azure.com/)

2. Azure Batch AI Training APIを使用してディープラーニング分散処理を行う流れ (Python&Tensorflowを使用した例)

マイクロソフト様が作成された、NFSとAzureファイル共有をマウントした2つの
GPUVM上で分散処理を行うAzure Batch AI Private Previewサンプルプログラムを引用させていただき流れの説明を行います。
※現在Private Preview中で、今後Public PreviewのタイミングでいくつかのAPIが変更にるため、サンプルコードも修正が必要となります。
https://github.com/azure/Azbait-privatePreview

大まかな流れとしては、以下のようになります。

  1. Azureサブスクリプション準備、ストレージアカウントとファイル共有を作成
    == 以下Azure Batch AI Training APIを使用 ==
  2. 認証
  3. NFS作成&作成したNFSとAzureファイル共有をマウントしたGPUVMクラスタ作成
  4. NFSへ分散処理Tensorflowスクリプトと各種データをコピー
  5. ジョブを作成&実行
  6. Azureファイル共有に保存されたアウトプットファイルをローカルへコピー
  7. ジョブとクラスタの削除

2.1 Azureサブスクリプションの準備、ストレージアカウントとファイル共有作成

VM上で処理されたアウトプットファイルを保存するため、Azureストレージアカウントとファイル共有を作成します。

ストレージアカウントを作成するにはAzureサブスクリプションが必要となります。
Azureサブスクリプション登録は、Azureのサイトから行えます。

登録したアカウントでポータルサイトにログインすると、以下のような画面が開きます。

https://portal.azure.com

左のメニューにある「ストレージアカウント」を選択します。

左上の「+追加」を選択します。

各種設定を上記のように入力した後、下部の「作成」を選択します。
名前とリソースグループ名は任意です。
ここで指定した名前とリソースグループ名は後程APIを使用する際に使用します。
「※1:リソースグループ 、※2: 名前」

これでストレージアカウントは作成されました。

左のメニューからストレージアカウントのアイコンを選択後、先ほど作成した
ストレージアカウントを選択します。

上記キーは、後程APIを使用してストレージアカウントにアクセスする際に使用します。※3

次にファイル共有を作成します。

左のメニューからストレージアカウントアイコンを選択し、先ほど作成した
ストレージアカウントを選択後、画面真ん中のファイルを選択します。

左上の「ファイル共有」を選択し、任意の名前を入力後「OK」を押します。

作成したファイル共有の右側にあるメニューからプロパティを選択します。

右側にあるURLは後でAPIから使用します。※4

2.2 認証

進める前に、必要な各種コンフィグ値を準備しておきます。

subscription_id = [サブスクリプションID]
url = "https://eastus.batchaitraining-test.azure.com"
api_version =  "2017-05-01"
resource_group_name = [リソースグループ名※1]
authentication_thumbprint = [サブスクリプションアクセスキー]
region = "eastus"

storage_account_name = [上記ストレージアカウント名※2]
storage_account_key = [上記ストレージアカウントキー※3]
storage_file_share_url = [上記ファイル共有のURL※4]

※urlとapi_versionはPreview版用に用意されたものを使用していますが、正式版では適宜修正が必要かもしれません。

認証はbatchaitrainingclient.BatchAITrainingClientで行います。

import os
import subprocess
import time
from datetime import datetime
import requests

import batchaitrainingclient as training
import batchaitrainingclient.models as trainingModels

def GetBatchTrainingClient():
client = training.BatchAITrainingClient(
subscription_id = subscription_id,
api_version = api_version,
base_url= url
)
client._client.add_header("x-ms-auth-cert", authentication_thumbprint)
client.config.generate_client_request_id = True
return client

client = GetBatchTrainingClient()

2.3 NFS作成&作成したNFSをマウントしたGPUVMクラスタ作成

リソースグループ名、クラスタ名、GPUVM数、ネットワークファイルシステム名を
指定して、NFSとAzureファイル共有をマウントしたクラスタを作成します。

今回は2つのNC6 GPUVMを用いるため、第3引数を2としています。

try:
    clusterName = "clusterwithnfs"
    nfsName = "demonfs"
    clusterId = CreateClusterWithNFS(resource_group_name , clusterName, 2, nfsName)
except Exception as e:
    print(e)
    raise

少し長いですが、CreateClusterWithNFSの実装は以下に記載しています。

ClusterCreateParametersでクラスタの各種設定を作成しています。
vm_sizeはVMのサイズ、target_number_of_vmsは使用するVM数、
node_setupはノード設定です。mount_volumesで、VMにマウントする
NFSとAzureファイル共有を指定しています。

NFSはCreateSingleNodeNFSで作成しています。

クラスタ作成後、WaitForClusterStateでクラスタの状態がsteadyになるのを待っています。

# NFSとAzureファイル共有をマウントしたクラスタ作成
def CreateClusterWithNFS(resourcegroupName, clustername, numberofGPUVMs, nfsName):
    try:
        print("ClusterName: {0}".format(clustername))
        client = GetBatchTrainingClient()
        nfsId = GetNFSId(resourcegroupName, nfsName)
        if(nfsId == None):
            nfsId = CreateSingleNodeNFS(resourcegroupName, nfsName)
        
        cluster = client.cluster.create(
            resource_group_name = resourcegroupName, 
            cluster_name = clustername, 
            # クラスタの設定パラメーター
            parameters = trainingModels.ClusterCreateParameters(
                location = region, 
                vm_size = "STANDARD_NC6", 
                user_account_settings = trainingModels.UserAccountSettings(
                    group_id = 1,
                    admin_user_name = 'your_user_name',
                    admin_user_ssh_public_key = get_ssh_key(),
                    admin_user_password = 'your_password'),
                target_number_of_vms = numberofGPUVMs, 
                node_setup = trainingModels.NodeSetup(
                    mount_volumes = trainingModels.MountVolumes(
                        # VMにマウントするAzureファイル共有
                        azure_file_share_references = [
                            trainingModels.AzureFileShareReference(
                                account_name = storage_account_name,
                                azure_file_url= storage_file_share_url, 
                                relative_mount_path = "myazfileshare",
                                credentials_info = trainingModels.AzureStorageCredentialsInfo(
                                    account_key = storage_account_key))],
                        # VMにマウントするNFS
                        file_server_references = [
                            trainingModels.FileServerReference(
                                file_server = trainingModels.ResourceId(nfsId),
                                relative_mount_path = "mynfs",
                                mount_options = "rw"
                                )]
                    )
                )
            )
        )           
        
        # クラスタ状態がsteadyになるのを待つ
        cluster = WaitForClusterState(resourcegroupName, clustername, trainingModels.AllocationState.steady)
        return cluster.id    
        
    except BaseException as e:
        print(e)    
        raise

CreateSingleNodeNFS、WaitForClusterStateの実装は以下です。

def GetCluster(resourceGroupName, clusterName):
    """Get the Cluster
    """
    client = GetBatchTrainingClient()
    cluster = client.cluster.get(resourceGroupName, clusterName)
    return cluster
    
def PrintClusterStatus(resourceGroupName, clusterName):
    cluster = GetCluster(resourceGroupName, clusterName)
    print(
        "Cluster state = {0}. Target= {1} Allocated = {2}, NumIdle = {3}, NumUnusable = {4}, NumRunning = {5}, NumPreparing = {6}".format(
            cluster.allocation_state,
            cluster.target_number_of_vms,
            cluster.current_number_of_vms,
            cluster.node_state_counts.idle_node_count,
            cluster.node_state_counts.unusable_node_count,
            cluster.node_state_counts.running_node_count,
            cluster.node_state_counts.preparing_node_count))
    if (cluster.errors != None):
        for error in cluster.errors:
            print("Cluster error = {0}, ErrorMessage = {1}".format(error.code, error.message))
            print("Details:")
            errorDetails = error.details
            for errorDetail in errorDetails:
                print("{0}:{1}".format(errorDetail.name, errorDetail.value))
    return cluster
    
def WaitForClusterState(resourceGroupName, clusterName, targetState):
    while True:
        cluster = PrintClusterStatus(resourceGroupName, clusterName)
        if cluster.allocation_state == targetState:
            return cluster
            break
        else:
            time.sleep(5)
    
def get_ssh_key():
    """Reads default ssh public key."""
    if os.name == 'nt':
        return None
    try:
        key = subprocess.check_output(['ssh-keygen -y -f ~/.ssh/id_rsa'],
                                      shell=True)
        return key.decode().split()[1]
    except:
        raise EnvironmentError('Please generate default ssh key using '
                               'ssh-keygen')

# NFSのIDを取得
def GetNFSId(resourceGroupName, fsName):
    try:
        client = GetBatchTrainingClient()    
        nfs = client.file_server.get(resourceGroupName, fsName)
        print(nfs.id)
        return nfs.id
    except Exception as e:
        print (e)
        return None
        
# シングルノードのNFS作成
def CreateSingleNodeNFS(resourceGroupName, fsName):    
    try:
        print("SingleNodeNFSName: {0}".format(fsName))        
        client = GetBatchTrainingClient() 
        client.file_server.create(
            resource_group_name = resourceGroupName,
            file_server_name = fsName,
            parameters = trainingModels.FileServerCreateParameters( 
                location = region,
                new_file_server = trainingModels.NewFileServer(
                    vm_size = 'Standard_D1_V2',
                    ssh_configuration = trainingModels.SshConfiguration( 
                        user_account_settings = trainingModels.UserAccountSettings(
                            group_id = 1, 
                            admin_user_name = 'your_user_name',
                            admin_user_ssh_public_key=get_ssh_key(),
                            admin_user_password = 'your_password')),
                    data_disks = trainingModels.DataDisks(
                        disk_size_in_gb = '10',
                        number_of_disks = '2',
                        storage_account_type = 'Standard_LRS'))))


        while (True):
            fs = client.file_server.get(resource_group_name = resourceGroupName, file_server_name = fsName)
            if (fs.provisioning_state == trainingModels.FileServerProvisioningState.succeeded):
                print ("file server created")
                return fs.id
            print ("FileServer provisioning state = {0}".format(fs.provisioning_state.name))
            time.sleep(5)
    except Exception as e:
        print(e)
        raise

2.4 NFSへTensorflowスクリプトと各種データをコピー

VMにマウントされたNFSへ、分散処理を行うTensorflowスクリプトと
各種データファイルをコピーします。

def GetFileServerSSHEndPoint(resourceGroupName, fsName):
    fsr = trainingModels.MountSettings()
    client = GetBatchTrainingClient()
    fs = client.file_server.get(resourceGroupName, fsName)
    return fs.new_file_server.mount_settings.file_server_public_ip

sshEndPoint = GetFileServerSSHEndPoint(resource_group_name, nfsName)
print(sshEndPoint)

上記を実行するとNFSのIPアドレスを取得できますので、今回はscpで
ローカルからNFSへコピーすることにします。

・スクリプト

$ scp -P 22 distributed_cnn_twitter_classifier.py your_user_name@[NFS_IP]:/mnt/data/TWITTER/

・ツイートデータ

scp -P 22 tweet_data/* your_user_name@[NFS_IP]:/mnt/data/TWITTER/tweet_data/

・Word2Vec学習済みデータ

$ scp -P 22 word2vec.gensim.model your_user_name@[NFS_IP]:/mnt/data/TWITTER/
$ scp -P 22 word2vec.gensim.model.syn1neg.npy your_user_name@[NFS_IP]:/mnt/data/TWITTER/
$ scp -P 22 word2vec.gensim.model.wv.syn0.npy your_user_name@[NFS_IP]:/mnt/data/TWITTER/

2.5 ジョブを作成&実行

JobCreateParametersでジョブの設定を決めます。

clusterには先ほど作成したクラスタを指定します。

number_of_vmsにはVM数。今回は2つ使用するため2を指定しています。

tool_typeにはcntk、tensorflow、caffe、chaienr、customのいずれかを指定します。

std_out_err_path_prefixでは、VMの標準出力・エラーログファイルの出力先を設定します。
出力先としてAzureファイル共有を指定しています。

container_settingsで、VM上で動かすコンテナを指定できます。
ここでは、tensorflow+自然言語処理環境構築済みの独自Dockerコンテナを指定しています。
tensorflowの環境構築済みのdockerコンテナbatchaitraining/tensorflow:1.1.0-gpuをベースにカスタマイズしたものです。
※独自Dockerコンテナを用いる場合は、sshの設定を行わないと正常に分散処理が行われないようです。(詳細はAppendixに記載しています。)

output_directoriesでは、アウトプットファイル出力先を設定します。
モデルファイルの出力先をAzureファイル共有に指定しています。

tensor_flow_settingsでは、先ほどNFSにコピーしたTensorflowスクリプトのパス、
スクリプトに渡すコマンドライン引数、パラメータサーバー数、ワーカー数を指定しています。

client.job.createで、作成したパラメータを渡してジョブを作成しています。

jobName = "nfs_"+ datetime.utcnow().strftime("%d_%H-%M-%S-%Y") 
print("JobName: {0}".format(jobName))
try:
    client = GetBatchTrainingClient()
    StdOutErrPathPrefix = "$AZ_LEARNING_MOUNT_ROOT/myazfileshare"
    
    jobCreateParams = trainingModels.job_create_parameters.JobCreateParameters(
        location= region,
        cluster = trainingModels.ResourceId(clusterId),                
        number_of_vms = 2, 
        tool_type = trainingModels.ToolType.tensorflow, 
        std_out_err_path_prefix = StdOutErrPathPrefix, 

        container_settings =  trainingModels.ContainerSettings(
                                image_source_registry = trainingModels.ImageSourceRegistry(
                                    image_name = "albeym/batchai_tensorflow_nlp:ssh")), # Dockerコンテナ指定

        output_directories = [trainingModels.OutputDirectory (
            id = "modelOutput", path_prefix="$AZ_LEARNING_MOUNT_ROOT/myazfileshare", path_suffix="Models", type=trainingModels.OutputType.custom)],

        tensor_flow_settings = trainingModels.TensorFlowSettings(
            master_script_settings = trainingModels.ScriptSettings( 
                script_file_path = "$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/distributed_cnn_twitter_classifier.py", # スクリプトパス
                command_line_args = "--ps_hosts=$AZ_LEARNING_PS_HOSTS --worker_hosts=$AZ_LEARNING_WORKER_HOSTS --job_name=worker --task_index=$AZ_LEARNING_TASK_INDEX --data_dir=$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/ --output_dir=$AZ_LEARNING_OUTPUT_modelOutput --num_gpus=2"), # スクリプトに渡すコマンドライン引数
            worker_script_settings = trainingModels.ScriptSettings(
                script_file_path = "$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/distributed_cnn_twitter_classifier.py", # スクリプトパス
                command_line_args = "--ps_hosts=$AZ_LEARNING_PS_HOSTS --worker_hosts=$AZ_LEARNING_WORKER_HOSTS --job_name=worker --task_index=$AZ_LEARNING_TASK_INDEX --data_dir=$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/ --output_dir=$AZ_LEARNING_OUTPUT_modelOutput --num_gpus=2"), # スクリプトに渡すコマンドライン引数
            parameter_server_script_settings = trainingModels.ScriptSettings(
                script_file_path = "$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/distributed_cnn_twitter_classifier.py", # スクリプトパス
                command_line_args = "--ps_hosts=$AZ_LEARNING_PS_HOSTS --worker_hosts=$AZ_LEARNING_WORKER_HOSTS --job_name=ps --task_index=$AZ_LEARNING_TASK_INDEX --data_dir=$AZ_LEARNING_MOUNT_ROOT/mynfs/TWITTER/ --output_dir=$AZ_LEARNING_OUTPUT_modelOutput --num_gpus=0"), # スクリプトに渡すコマンドライン引数
            number_of_parameter_servers = 1,
            number_of_workers = 2
        ))

    job = client.job.create(resource_group_name = resource_group_name, job_name = jobName, parameters = jobCreateParams)        

    print ("created job - {0}".format(job.id))
except Exception as e:
    print(e)

WaitForJobStateでジョブ状態を監視しrunningになるのを待ちます。

running状態になった後は、whileループに入りcompletedになるまで待ちます。

# Wait for job to start running
try:     
     WaitForJobState(resource_group_name, jobName, clusterName, trainingModels.ExecutionState.running)
except Exception as e:
    print(e)

print("Waiting for job output to become available...")

# Wait for job to complete and tail the stderr.txt
streamer = OutputStreamer(client, resource_group_name, jobName, 'stdOuterr', 'stdout-0.txt')
while (True) :
    streamer.tail()
    submittedJob = client.job.get(resource_group_name, jobName)            
    if(submittedJob.execution_state.name == 'completed'):
        print('Job {} complete'.format(submittedJob.execution_state.name))
        break
    time.sleep(1)

WaitForJobStateとOutputStreamerの実装は下記に記載しています。

def GetJob(resourceGroupName, jobName):
    client = GetBatchTrainingClient()
    job = client.job.get(resourceGroupName, jobName)
    return job

def PrintJobStatus(resourceGroupName, jobName):
    job = GetJob(resourceGroupName, jobName)

    failureMessage = "none"
    exitCode = "none"
    if (job.execution_state == trainingModels.ExecutionState.completed):
        exitCode = job.execution_info.exit_code
        if (job.execution_info.failure_info == None):
            failureMessage = "pass"
        else:
            failureMessage = "\nErrorCode:{0}\nErrorCategory:{1}\nErrorMessage:{2}\n".format(
                job.execution_info.failure_info.code, job.execution_info.failure_info.category,
                job.execution_info.failure_info.message)
            if (job.execution_info.failure_info.details != None):
                failureMessage += "Details:\n"
                for error in job.execution_info.failure_info.details:
                    failureMessage += "{0}:{1}\n".format(error.name, error.value)
    print("job State = {0}. ExitCode = {1} \nFailureDetails:{2}".format(job.execution_state.name, exitCode,
                                                                        failureMessage))
    return job


def PrintClusterStatus(resourceGroupName, clusterName):
    cluster = GetCluster(resourceGroupName, clusterName)
    print(
        "Cluster state = {0}. Target= {1} Allocated = {2}, NumIdle = {3}, NumUnusable = {4}, NumRunning = {5}, NumPreparing = {6}".format(
            cluster.allocation_state,
            cluster.target_number_of_vms,
            cluster.current_number_of_vms,
            cluster.node_state_counts.idle_node_count,
            cluster.node_state_counts.unusable_node_count,
            cluster.node_state_counts.running_node_count,
            cluster.node_state_counts.preparing_node_count))
    if (cluster.errors != None):
        for error in cluster.errors:
            print("Cluster error = {0}, ErrorMessage = {1}".format(error.code, error.message))
            print("Details:")
            errorDetails = error.details
            for errorDetail in errorDetails:
                print("{0}:{1}".format(errorDetail.name, errorDetail.value))
    return cluster


def WaitForJobState(resourceGroupName, jobName, clusterName, targetState):
    while True:
        job = PrintJobStatus(resourceGroupName, jobName)

        if job.execution_state in {targetState, trainingModels.ExecutionState.completed}:
            break
        elif job.execution_state == trainingModels.ExecutionState.queued:
            PrintClusterStatus(resourceGroupName, clusterName)
            time.sleep(5)
        else:
            time.sleep(5)


def WaitForClusterState(resourceGroupName, clusterName, targetState):
    while True:
        cluster = PrintClusterStatus(resourceGroupName, clusterName)
        if cluster.allocation_state == targetState:
            return cluster
            break
        else:
            time.sleep(5)
            
class OutputStreamer:
    def __init__(self, client, resource_group, job_name, output_directory_id, file_name):
        self.client = client
        self.resource_group = resource_group
        self.job_name = job_name
        self.output_directory_id = output_directory_id
        self.file_name = file_name
        self.url = None
        self.downloaded = 0

    def tail(self):
        if not self.url:
            files = self.client.file.list(
                self.resource_group, self.job_name,
                file_list_options=trainingModels.file_list_options.FileListOptions(
                    outputdirectoryid=self.output_directory_id))
            if not files:
                return
            else:
                for f in files.value:
                    if f.name == self.file_name:
                        self.url = f.download_url
        if self.url:
            r = requests.get(self.url, headers={'Range': 'bytes={0}-'.format(self.downloaded)})
            if int(r.status_code / 100) == 2:
                self.downloaded += len(r.content)
                print(r.content.decode(), end='')

2.6 Azureファイル共有に保存されたアウトプットファイルをローカルへダウンロード

完了すると、パラメーターサーバー、ワーカーからの標準出力・エラーログファイル、モデルファイルがマウントされたAzureファイル共有にアウトプットされているので、ブラウザ上から確認できます。

標準出力ログファイルをローカルへダウンロードして内容をみてみます。

■Worker 0
stdout-0.txt

…
1502346711.6833575 : 2017/08/10 06:31:51 : local_step/global_step : 1492/2980
1502346724.9924903 : 2017/08/10 06:32:04 : local_step/global_step : 1493/2982
1502346738.1236217 : 2017/08/10 06:32:18 : local_step/global_step : 1494/2984
1502346751.0443964 : 2017/08/10 06:32:31 : local_step/global_step : 1495/2986
1502346764.2894645 : 2017/08/10 06:32:44 : local_step/global_step : 1496/2988
1502346777.1980283 : 2017/08/10 06:32:57 : local_step/global_step : 1497/2990
1502346790.3024867 : 2017/08/10 06:33:10 : local_step/global_step : 1498/2992
1502346803.422791 : 2017/08/10 06:33:23 : local_step/global_step : 1499/2994
1502346816.6486468 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996
1502346816.6486702 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996 - train data accuracy : 0.8585116863250732
1502346817.8208303 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996 - test data accuracy 3295/4462 = 0.7384580905423577
1502346817.8799393 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996 - loss : 0.34474071860313416
1502346818.4383993 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996 - label 0 accuracy 1595/2225 = 0.7168539325842697
1502346819.060343 : 2017/08/10 06:33:36 : local_step/global_step : 1500/2996 - label 1 accuracy 1653/2237 = 0.7389360751005811
1502346832.3352547 : 2017/08/10 06:33:52 : local_step/global_step : 1501/2998
1502346845.4994643 : 2017/08/10 06:34:05 : local_step/global_step : 1502/3000
Training ends @ 1502346846.502469
Training elapsed time: 20172.574143 s

■Worker 1
stdout-1.txt

…
1502346749.297772 : 2017/08/10 06:32:29 : local_step/global_step : 1493/2986
1502346762.465857 : 2017/08/10 06:32:42 : local_step/global_step : 1494/2988
1502346775.6255963 : 2017/08/10 06:32:55 : local_step/global_step : 1495/2990
1502346788.9104798 : 2017/08/10 06:33:08 : local_step/global_step : 1496/2992
1502346802.1854687 : 2017/08/10 06:33:22 : local_step/global_step : 1497/2994
1502346815.1583965 : 2017/08/10 06:33:35 : local_step/global_step : 1498/2996
1502346828.2493777 : 2017/08/10 06:33:48 : local_step/global_step : 1499/2998
1502346841.2159996 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999
1502346841.2160242 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999 - train data accuracy : 0.8623220920562744
1502346842.3832383 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999 - test data accuracy 3300/4462 = 0.7395786642761094
1502346842.440593 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999 - loss : 0.34384649991989136
1502346843.0026593 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999 - label 0 accuracy 1594/2225 = 0.7164044943820225
1502346843.6091473 : 2017/08/10 06:34:01 : local_step/global_step : 1500/2999 - label 1 accuracy 1680/2237 = 0.7510058113544926
1502346856.9699488 : 2017/08/10 06:34:16 : local_step/global_step : 1501/3002
Training ends @ 1502346856.970022
Training elapsed time: 20182.624171 s

local_stepが各ワーカーでの処理回数、global_stepが全体での処理回数を表していますが、
Worker0とWorker1で分散処理されているのが確認できます。

2.7 ジョブとクラスタの削除

最後に、作成したジョブとクラスタを削除します。

def DeleteJob(resourceGroupName, jobName):
    """Delete the Job
    """
    client = GetBatchTrainingClient()
    cluster = client.job.delete(resourceGroupName, jobName)

def DeleteCluster(resourceGroupName, clusterName):
    """Delete the Cluster
    """
    client = GetBatchTrainingClient()
    cluster = client.cluster.delete(resourceGroupName, clusterName)

# Delete Job
DeleteJob(resource_group_name, jobName)

# Delete Cluster
DeleteCluster(resource_group_name, clusterName)

3. Tensorflowによるリツイート数でのツイートCNN2値分類

上記分散処理検証では、Twitterのツイートを、今後リツイートされるかどうかを予測・分類するモデルのスクリプトを使用しました。

スクリプトのソースコードは下記に記載しています。

ツイートデータは、TwitterAPIを用いて、キーワード「ドル円」で取得した2ヶ月間(6/3~8/3)のツイートデータから、結果リツイートされた(リツイート数1以上)ツイート及びリツイートされなかった(リツイート数0)ツイートをそれぞれ約1万件を抽出しました。
そこから内8割を学習データ、残り2割を検証データとして使用しています。

分類モデル部分は
https://arxiv.org/pdf/1408.5882.pdf
上記論文の実装
https://github.com/dennybritz/cnn-text-classification-tf
を参考にさせていただき、embeddingの部分を学習済みWord2Vecデータを
用いるように少しカスタマイズして使用させていただきました。

ツイートを形態素解析しトークン化後、各単語を学習済みWord2Vecの単語ベクトル
で置換しツイート単位で行列を作成、トークン数最大長まで0でパッディングして行列サイズをトークン数最大長×単語ベクトル次元に固定化後畳み込みニューラルネットワーク(CNN)へ流しています。
学習済みWord2Vecは以下を使用させていただきました。
http://aial.shiroyagi.co.jp/2017/02/japanese-word2vec-model-builder/
http://public.shiroyagi.s3.amazonaws.com/latest-ja-word2vec-gensim-model.zip

CNNでのツイート文処理は以下のようなイメージです。

引用:https://arxiv.org/pdf/1510.03820.pdf

Tensorflowでの分散処理周りは
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py
を参考にしています。

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import sys
import tempfile
import time
import os

import tensorflow as tf

import numpy as np
import pandas as pd
from gensim.models.word2vec import Word2Vec
from janome.tokenizer import Tokenizer
import re
from datetime import datetime

flags = tf.app.flags
flags.DEFINE_string("data_dir", "/tmp/twitter-data",
                    "Directory for storing mnist data")
flags.DEFINE_string("output_dir", "/tmp/twitter-data-output",
                    "Directory for storing output data")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task the performs the variable "
                     "initialization ")
flags.DEFINE_integer("num_gpus", 1,
                     "Total number of gpus for each machine."
                     "If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update"
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_integer("train_steps", 3000,
                     "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.0001, "Learning rate")
flags.DEFINE_boolean("sync_replicas", False,
                     "Use the sync_replicas (synchronized replicas) mode, "
                     "wherein the parameter updates from workers are aggregated "
                     "before applied to avoid stale gradients")
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exists. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")
flags.DEFINE_string("ps_hosts","localhost:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None,"job name: worker or ps")

FLAGS = flags.FLAGS

class TextCNN(object):
    """
    CNN文章分類モデル
    """
    def __init__(
      self, num_classes, vocab_size,
      embedding_size, filter_sizes, num_filters, l2_reg_lambda=0.0):

        self.x = tf.placeholder(tf.float32, [None, vocab_size, embedding_size], name="input_x")
        self.input_x = tf.reshape(self.x, [-1, vocab_size, embedding_size, 1])
        self.input_y = tf.placeholder(tf.float32, [None, num_classes], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")

        # Keeping track of l2 regularization loss (optional)
        l2_loss = tf.constant(0.0)
            
        # 各フィルターサイズに対して畳み込み層とMaxPooling層を作成
        pooled_outputs = []
        for i, filter_size in enumerate(filter_sizes):
            with tf.name_scope("conv-maxpool-%s" % filter_size):
                # 畳み込み層
                filter_shape = [filter_size, embedding_size, 1, num_filters]
                W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1), name="W")
                b = tf.Variable(tf.constant(0.1, shape=[num_filters]), name="b")
                conv = tf.nn.conv2d(
                    self.input_x,
                    W,
                    strides=[1, 1, 1, 1],
                    padding="VALID",
                    name="conv")
                # ReLU活性化関数適用
                h = tf.nn.relu(tf.nn.bias_add(conv, b), name="relu")
                # Maxpooling
                pooled = tf.nn.max_pool(
                    h,
                    ksize=[1, vocab_size - filter_size + 1, 1, 1],
                    strides=[1, 1, 1, 1],
                    padding='VALID',
                    name="pool")
                pooled_outputs.append(pooled)

        # MaxPoolした各アウトプットを結合して1次元配列に変換
        num_filters_total = num_filters * len(filter_sizes)
        self.h_pool = tf.concat(3, pooled_outputs)
        self.h_pool_flat = tf.reshape(self.h_pool, [-1, num_filters_total])

        # Dropout
        with tf.name_scope("dropout"):
            self.h_drop = tf.nn.dropout(self.h_pool_flat, self.dropout_keep_prob)

        # スコアと予測値
        with tf.name_scope("output"):
            W = tf.get_variable(
                "W",
                shape=[num_filters_total, num_classes],
                initializer=tf.contrib.layers.xavier_initializer())
            b = tf.Variable(tf.constant(0.1, shape=[num_classes]), name="b")
            l2_loss += tf.nn.l2_loss(W)
            l2_loss += tf.nn.l2_loss(b)
            self.scores = tf.nn.xw_plus_b(self.h_drop, W, b, name="scores")
            self.predictions = tf.argmax(self.scores, 1, name="predictions")

        # 交差エントロピー損失関数
        with tf.name_scope("loss"):
            losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y)
            self.loss = tf.reduce_mean(losses) + l2_reg_lambda * l2_loss

        # 精度
        with tf.name_scope("accuracy"):
            correct_predictions = tf.equal(self.predictions, tf.argmax(self.input_y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")

class TweetProcessor:
    def filter_sentence(self, sentence):
        sentence = re.sub(r"(?:https?|ftp)://[A-Za-z0-9.-/]*", "", sentence) # URL削除
        return sentence

    # トークンリスト形式からWord2Vec行列形式へ変換
    def convert_to_sentence_matrix(self, tokens, w2v_model, word_vec_dim):
        sentence_matrix = []
        for token in tokens:
            if token in w2v_model:
                # 単語がWord2Vec学習済み辞書に存在するならば、その単語ベクトルで置き換え
                sentence_matrix.append(np.array(w2v_model[token]))
            else:
                # 学習済み辞書に存在しないならば、0ベクトルで置き換え
                sentence_matrix.append(np.zeros(word_vec_dim))
        return np.array(sentence_matrix)

    # max_length行まで0ベクトルで埋める
    def padding(self, sentence_matrix, max_length, word_vec_dim):
        while sentence_matrix.shape[0] < max_length: sentence_matrix = np.vstack((sentence_matrix, np.zeros(word_vec_dim))) return sentence_matrix # ツイートデータを読み込み加工 def prepare_data(self, file_path_list): # TSVファイルからツイートデータ読み込み print("reading tweet data..") tweet_dataframes = [pd.read_csv(FLAGS.data_dir + path, delimiter="\t") for path in file_path_list] tweet_dataframe = pd.concat(tweet_dataframes) # 重複ツイート削除 tweet_dataframe.drop_duplicates(['TweetID']) # リツイートしているツイート削除 tweet_dataframe = tweet_dataframe[[tt.startswith("RT @") == False for tt in tweet_dataframe['TweetText']]] # リツイート閾値より小(ラベル0)とリツイート閾値以上(ラベル1)のデータを1:1比率で抽出 label1_dataframe = tweet_dataframe[tweet_dataframe['RetweetCount'] >= RETWEET_COUNT_THRESHOLD]
        label0_dataframe = tweet_dataframe[tweet_dataframe['RetweetCount'] < RETWEET_COUNT_THRESHOLD][::int(len(tweet_dataframe[tweet_dataframe['RetweetCount'] < RETWEET_COUNT_THRESHOLD])/len(label1_dataframe))][:len(label1_dataframe)]
        tweet_dataframe = pd.concat([label0_dataframe, label1_dataframe])
        
        # [[ツイート , ラベル([0, 1] or [1, 0])]]のリスト作成
        tweet_label = [[self.filter_sentence(tt), [1, 0] if int(rc) &amp;lt; RETWEET_COUNT_THRESHOLD else [0, 1]] for tt, rc in zip(tweet_dataframe['TweetText'], tweet_dataframe['RetweetCount'])] # 各ツイートを形態素解析してトークン化 print(&quot;tokenizing tweets...&quot;) t = Tokenizer() for tl in tweet_label: tokens = t.tokenize(tl[0]) tl[0] = [token.surface for token in tokens] tweet_label = np.array(tweet_label) # トークン数がMIN_TOKEN_LENGTH以上かつMAX_TOKEN_LENGTH以下のツイートのみ使用 tweet_label = np.array(tweet_label[[len(tokens) &amp;gt;= MIN_TOKEN_LENGTH and len(tokens) &amp;lt;= MAX_TOKEN_LENGTH for tokens in tweet_label[:, 0]]]) # 学習済みWord2Vecモデルデータ読み込み print(&quot;reading pre-learned word2vec model..&quot;) model_path = FLAGS.data_dir + &quot;word2vec.gensim.model&quot; model = Word2Vec.load(model_path) # トークンリスト形式からWord2Vec行列形式へ変換 print(&quot;converting from token list format to verd2vec sentence matrix format..&quot;) for tl in tweet_label: tl[0] = self.convert_to_sentence_matrix(tl[0], model, WORD_VEC_DIM) # MAX_TOKEN_LENGTH行まで0ベクトルで埋める print(&quot;padding..&quot;) for tl in tweet_label: tl[0] = self.padding(tl[0], MAX_TOKEN_LENGTH, WORD_VEC_DIM) # ラベル0とラベル1のデータに分割 tweet_label0 = tweet_label[[tl[1][0] == 1 for tl in tweet_label]] tweet_label1 = tweet_label[[tl[1][1] == 1 for tl in tweet_label]] # ラベル0・ラベル1各データの前8割を学習データとして使用 train_data = np.array([tl for tl in tweet_label0[:int(tweet_label0.shape[0]*0.8), 0]] + [tl for tl in tweet_label1[:int(tweet_label1.shape[0]*0.8), 0]]) train_label = np.array([tl for tl in tweet_label0[:int(tweet_label0.shape[0]*0.8), 1]] + [tl for tl in tweet_label1[:int(tweet_label1.shape[0]*0.8), 1]]) # ラベル0・ラベル1各データ残り2割を検証データとして使用 test_data = np.array([tl for tl in tweet_label0[int(tweet_label0.shape[0]*0.8):, 0]] + [tl for tl in tweet_label1[int(tweet_label1.shape[0]*0.8):, 0]]) test_label = np.array([tl for tl in tweet_label0[int(tweet_label0.shape[0]*0.8):, 1]] + [tl for tl in tweet_label1[int(tweet_label1.shape[0]*0.8):, 1]]) return (train_data, train_label, test_data, test_label) RETWEET_COUNT_THRESHOLD = 1 MAX_TOKEN_LENGTH = 100 MIN_TOKEN_LENGTH = 6 NUM_FILTERS = 100 FILTER_SIZES = [3, 4, 5] WORD_VEC_DIM = 50 TWEETDATA_FILEPATH = [&quot;tweet_data/20170604-20170613_JPYUSD.tsv&quot;, &quot;tweet_data/20170618-20170627_JPYUSD.tsv&quot;, &quot;tweet_data/20170629-20170708_JPYUSD.tsv&quot;, &quot;tweet_data/20170706-20170715_JPYUSD.tsv&quot;, &quot;tweet_data/20170714-20170722_JPYUSD.tsv&quot;, &quot;tweet_data/20170724-20170801_JPYUSD.tsv&quot;] def main(unused_argv): # データの準備 tweet_processor = TweetProcessor() train_data, train_label, test_data, test_label = tweet_processor.prepare_data(file_path_list = TWEETDATA_FILEPATH) if FLAGS.job_name is None or FLAGS.job_name == &quot;&quot;: raise ValueError(&quot;Must specify an explicit <code>job_name</code>&quot;) if FLAGS.task_index is None or FLAGS.task_index ==&quot;&quot;: raise ValueError(&quot;Must specify an explicit <code>task_index</code>&quot;) print(&quot;job name = %s&quot; % FLAGS.job_name) print(&quot;task index = %d&quot; % FLAGS.task_index) #Construct the cluster and start the server ps_spec = FLAGS.ps_hosts.split(&quot;,&quot;) worker_spec = FLAGS.worker_hosts.split(&quot;,&quot;) # Get the number of workers. num_workers = len(worker_spec) cluster = tf.train.ClusterSpec({ &quot;ps&quot;: ps_spec, &quot;worker&quot;: worker_spec}) if not FLAGS.existing_servers: # Not using existing servers. Create an in-process server. server = tf.train.Server( cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == &quot;ps&quot;: server.join() is_chief = (FLAGS.task_index == 0) if FLAGS.num_gpus &amp;gt; 0:
      #  if FLAGS.num_gpus &amp;lt; num_workers: # raise ValueError(&quot;number of gpus is less than number of workers&quot;) # Avoid gpu allocation conflict: now allocate task_num -&amp;gt; #gpu
        # for each worker in the corresponding machine
        #gpu = (FLAGS.task_index % FLAGS.num_gpus)
        gpu = 0
        worker_device = &quot;/job:worker/task:%d/gpu:%d&quot; % (FLAGS.task_index, gpu)
    elif FLAGS.num_gpus == 0:
        # Just allocate the CPU to worker server
        cpu = 0
        worker_device = &quot;/job:worker/task:%d/cpu:%d&quot; % (FLAGS.task_index, cpu)
    # The device setter will automatically place Variables ops on separate
    # parameter servers (ps). The non-Variable ops will be placed on the workers.
    # The ps use CPU and workers use corresponding GPU
    ps_device=&quot;/job:ps/task:%d/cpu:0&quot; % (FLAGS.task_index)
    with tf.device(
        tf.train.replica_device_setter(
              worker_device=worker_device,
              ps_device=ps_device,
              cluster=cluster)):
        global_step = tf.Variable(0, name=&quot;global_step&quot;, trainable=False)

        # init saver
        saver = tf.train.Saver()
        
        # モデル準備
        cnn_model = TextCNN(num_classes = 2, 
                    vocab_size = MAX_TOKEN_LENGTH, 
                    embedding_size = WORD_VEC_DIM, 
                    filter_sizes = FILTER_SIZES,
                    num_filters = NUM_FILTERS)
        
        global_step = tf.Variable(0, name=&quot;global_step&quot;, trainable=False)
        opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
        
        dropout_keep_prob = 0.5

        if FLAGS.sync_replicas:
            if FLAGS.replicas_to_aggregate is None:
                replicas_to_aggregate = num_workers
            else:
                replicas_to_aggregate = FLAGS.replicas_to_aggregate

            opt = tf.train.SyncReplicasOptimizer(
                opt,
                replicas_to_aggregate=replicas_to_aggregate,
                total_num_replicas=num_workers,
                name=&quot;twitter_sync_replicas&quot;)

        train_op = opt.minimize(cnn_model.loss, global_step=global_step)
                
        if FLAGS.sync_replicas:
            local_init_op = opt.local_step_init_op
            if is_chief:
                local_init_op = opt.chief_init_op

            ready_for_local_init_op = opt.ready_for_local_init_op

            # Initial token and chief queue runners required by the sync_replicas mode
            chief_queue_runner = opt.get_chief_queue_runner()
            sync_init_op = opt.get_init_tokens_op()

        init_op = tf.global_variables_initializer()
        train_dir = tempfile.mkdtemp()

        if FLAGS.sync_replicas:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                local_init_op=local_init_op,
                ready_for_local_init_op=ready_for_local_init_op,
                recovery_wait_secs=1,
                global_step=global_step)
        else:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                recovery_wait_secs=1,
                global_step=global_step)

        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=[&quot;/job:ps&quot;, &quot;/job:worker/task:%d&quot; % FLAGS.task_index])

        # The chief worker (task_index==0) session will prepare the session,
        # while the remaining workers will wait for the preparation to complete.
        if is_chief:
            print(&quot;Worker %d: Initializing session...&quot; % FLAGS.task_index)
        else:
            print(&quot;Worker %d: Waiting for session to be initialized...&quot; %
                FLAGS.task_index)

        if FLAGS.existing_servers:
            server_grpc_url = &quot;grpc://&quot; + worker_spec[FLAGS.task_index]
            print(&quot;Using existing server at: %s&quot; % server_grpc_url)

            sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                                config=sess_config)
        else:
            sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

        print(&quot;Worker %d: Session initialization complete.&quot; % FLAGS.task_index)

        if FLAGS.sync_replicas and is_chief:
            # Chief worker will start the chief queue runner and call the init op.
            sess.run(sync_init_op)
            sv.start_queue_runners(sess, [chief_queue_runner])
          
        # 学習開始
        time_begin = time.time()
        print(&quot;Training begins @ %f&quot; % time_begin)

        model_prefix = os.path.join(FLAGS.output_dir, 'trained_model_re')
        local_step = 0
        while True:
            local_step += 1
            # 入力データの準備
            feed_dict = {
              cnn_model.x: train_data,
              cnn_model.input_y: train_label,
              cnn_model.dropout_keep_prob: dropout_keep_prob
            }
            # 学習
            _, step, loss, accuracy = sess.run(
                [train_op, global_step, cnn_model.loss, cnn_model.accuracy],
                feed_dict)
            now = datetime.now().strftime(&quot;%Y/%m/%d %H:%M:%S&quot;)
            print(&quot;{} : local_step/global_step : {}/{}&quot;.format(now, local_step, step))
            # 10ステップごとに検証データに対して予測
            if local_step % 10 == 0:
                print(&quot;{} : local_step/global_step : {}/{} - train data accuracy : {}&quot;.format(now, local_step, step, accuracy))
                # 全検証データに対して予測
                predicted = sess.run(cnn_model.predictions, feed_dict={cnn_model.x: test_data, cnn_model.dropout_keep_prob: dropout_keep_prob})
                print(&quot;{} : local_step/global_step : {}/{} - test data accuracy {}/{} = {}&quot;.format(now, local_step, step, len(test_data[predicted == np.argmax(test_label, axis=1)]), len(test_data), len(test_data[predicted == np.argmax(test_label, axis=1)])/len(test_data)))
                print(&quot;{} : local_step/global_step : {}/{} - loss : {}&quot;.format(now, local_step, step, loss))
                # ラベル0の検証データに対して予測
                predicted0 = sess.run(cnn_model.predictions, feed_dict={cnn_model.x: test_data[np.argmax(test_label, axis=1) == 0], cnn_model.dropout_keep_prob: dropout_keep_prob})
                print(&quot;{} : local_step/global_step : {}/{} - label 0 accuracy {}/{} = {}&quot;.format(now, local_step, step, len(predicted0[predicted0 == 0]), len(test_data[np.argmax(test_label, axis=1) == 0]), len(predicted0[predicted0 == 0])/len(test_data[np.argmax(test_label, axis=1) == 0])))
                # ラベル1の検証データに対して予測
                predicted1 = sess.run(cnn_model.predictions, feed_dict={cnn_model.x: test_data[np.argmax(test_label, axis=1) == 1], cnn_model.dropout_keep_prob: dropout_keep_prob})
                print(&quot;{} : local_step/global_step : {}/{} - label 1 accuracy {}/{} = {}&quot;.format(now, local_step, step, len(predicted1[predicted1 == 1]), len(test_data[np.argmax(test_label, axis=1) == 1]), len(predicted1[predicted1 == 1])/len(test_data[np.argmax(test_label, axis=1) == 1])))
          
            if step % 1000 == 0 or step == FLAGS.train_steps:
                # モデルの保存
                saver.save(sess, model_prefix, global_step=step)
          
            if step &amp;gt;= FLAGS.train_steps:
                break

        time_end = time.time()
        print(&quot;Training ends @ %f&quot; % time_end)
        training_time = time_end - time_begin
        print(&quot;Training elapsed time: %f s&quot; % training_time)

if __name__ == &quot;__main__&quot;:
  tf.app.run()

4. 結果

非分散・分散で実行した結果を以下にまとめています。
今回は、分散処理による効果を顕著に確認したかったため、学習率を低めに設定しました。

■ 設定1(非分散)

VM数 Epoch フィルターサイズ フィルター数 Pooling ドロップアウト率 Optimizer 学習率 リツイート閾値
1 3000 3, 4, 5 100 Max Pooling 0.5 Adam 0.0001 1
かかった時間 検証用データ正解率
40517秒 (約11時間半) 0.7284 (3250/4462)

■ 設定2(分散)

VM数 Epoch フィルターサイズ フィルター数 Pooling ドロップアウト率 Optimizer 学習率 リツイート閾値
2 3000 3, 4, 5 100 Max Pooling 0.5 Adam 0.0001 1
かかった時間 検証用データ正解率
20172秒 (約6時間) 0.7385 (3295/4462)

全検証データに対する正解率/損失のエポック/時間変化のグラフを以下に記載しています。

・正解率

・損失

全検証データに対して、分散・非分散時共に約73%程度の分類精度で収束しています。
2VMの分散では、非分散に比べて1/2程度の時間で完了しました。
また、約2倍の速度で学習できていることが確認できます。

上記では、Embeddingの部分をWord2Vec学習済みデータを使用するように変更しましたが、Embedding層あり(https://github.com/dennybritz/cnn-text-classification-tf のモデルそのまま)で学習済みデータを使用しないパターン(単語ベースと文字ベース)でも検証を行ったところ、正解率は76%(文字ベース)、73%(単語ベース)程度となりました。
異なる点としては、学習済みデータを使用したものでは単語ベクトルリストの行列を入力値としていましたが、Embedding層あり&学習済みデータを使用しないものでは、ツイートを単語/文字辞書IDで羅列したベクトルに変換し入力している点です。

ソースは下記にあげていますので、興味のある方は参照いたければと思います。

■単語ベース
https://github.com/y-matsumoto-albert/TwitterRetweetCNNClassification/blob/master/TwitterRetweetCNNClassificationCharEmbedding.py

■文字ベース
https://github.com/y-matsumoto-albert/TwitterRetweetCNNClassification/blob/master/TwitterRetweetCNNClassificationCharEmbedding.py

 

5. まとめ

Azure Batch AI Trainingを用いて、2つのクラウドVM上でツイートのCNN2値分類
分散学習を行いました。

Batch AI Trainingで提供されているAPIを用いてクラスタやジョブの作成、実行、監視
あたりの処理は自分で行わなければいけませんが、テンプレート処理を作成しておけば、あとは用途に合わせて
・クラスタのスペック
・環境構築済みのコンテナ
・分散処理を行うスクリプト
をパラメータで指定してジョブを実行すれば複数のクラウドVM上で分散処理が行われ、分散環境準備に手間をかけず比較的簡単にディープラーニング分散学習が行えるのでないかと思います。

6. Appendix : 独自Dockerコンテナssh設定

Tensorflow環境構築済みのコンテナはbatchaitraining/tensorflow:1.1.0-gpuが提供されてますが、独自コンテナを使用する場合、sshでパスワード無でログインできるようにコンテナをセットアップしておかないと分散処理が正常に行われないようでした。
batchaitraining/tensorflow:1.1.0-gpu+ssh設定のDockerファイル例を以下に記載しています。

■Dockerfile

FROM batchaitraining/tensorflow:1.1.0-gpu

COPY ssh_config /root/.ssh/config
RUN apt-get update &amp;&amp; apt-get install -y --no-install-recommends \
        openssh-client \
        openssh-server \
        iproute2 \
    &amp;&amp; apt-get clean \
    &amp;&amp; rm -rf /var/lib/apt/lists/* \
    # configure ssh server and keys
    &amp;&amp; ssh-keygen -A \
    &amp;&amp; sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config \
    &amp;&amp; sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd \
    &amp;&amp; chmod 600 /root/.ssh/config \
    &amp;&amp; chmod 700 /root/.ssh \
    &amp;&amp; cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys

EXPOSE 23
CMD ["/usr/sbin/sshd", "-D", "-p", "23"]

■ssh_config

Host 10.*
  Port 23
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null