6-2.DynamoDB へのデータ出力

ラボの概要

このラボでは、Kinesis Streams を使用して、Amazon Connect の問い合わせ履歴である CTR(Contact Trace Record) およびエージェントイベントストリームを Amazon DynamoDB にエクスポートします。

このラボで使用する IAM ポリシーの json、Lambda 関数のコードはこちら からダウンロード可能です。

CTR を Kinesis から DynamoDB に連携する

  1. Amazon Connect から CTR をストリーミングするために、Kinesis ストリームを作成
    1. CTR は、コンタクトセンターにおけるコンタクトに関連するイベントを記録するために作成されます。例えば、通話時間、開始/終了時間などです。顧客が Amazon Connect に接続すると CTR が作成され、コンタクトフローまたはエージェントとの対話が終了したとき(つまり、エージェントが ACW を完了したとき)に保存されます。
    2. AWS コンソールから Amazon Kinesis に移動します。
    3. 左側のサイドバーの データストリーム を選択し、データストリームの作成をクリックします。
    4. データストリーム名を付与します(例:kinesisCtrStream)。この名前をメモしておきます。
    5. データストリームの作成をクリックします。 102
    6. 作成されたデータストリームの ARN をメモしておきます。 103
  2. Amazon Connect インスタンスで Kinesis ストリームを有効化
    1. AWS コンソールで Amazon Connect に移動します。
    2. 作成済みのインスタンスエイリアスを選択します。
    3. インスタンス管理画面でデータストリーミングを選択し、データストリーミングの有効化チェックボックスをオンにします。
    4. 問い合わせ追跡レコードにおいて、Kinesis ストリームを選択します。
    5. 前のステップで作成した Kinesis ストリームをドロップダウンメニューから選択し、保存をクリックします。 104
  3. CTR を格納する DynamoDB テーブルを作成
    1. AWS コンソールで DynamoDB サービスに移動します。
    2. 左側のサイドバーのテーブルを選択し、テーブルの作成をクリックします。
    3. DynamoDB テーブルを作成します。
      • テーブル名:Connect-ContactTraceRecords (文字列)
      • パーティションキー:ContactId (文字列)
    4. その他はデフォルト設定のままテーブルの作成をクリックします。テーブルの作成には数分かかります。
    5. サイドバーのテーブルをクリックし、Connect-contractTraceRecordsを選択して、作成したテーブルの ARN をメモします。 105
  4. Kinesis ストリームデータを処理するための Lambda の作成
    1. AWS コンソールで Lambda サービスに移動します。
    2. 関数の作成をクリックします。関数の作成ページが開きます。
    3. 関数名を付与します(例:ctrToDynamo)。
    4. ランタイムは Node.js 16.x を選択します。
    5. 残りのセクションはデフォルト値のまま、関数の作成をクリックします。
    6. 関数が作成されたら、設定-アクセス権限を開き、実行ロールにあるロール名をメモします。 106
  5. DynamoDB に書き込みを許可する IAM ポリシーを作成し、Lambda にアタッチ

    1. AWS コンソールで IAM サービスに移動します。
    2. 左側のサイドバーの ポリシーに移動し、ポリシーを作成を選択します。
    3. JSON タブを開き、以下に示す IAM ポリシーのサンプルをペーストします。このとき Resource 欄の kinesisARN と、dynamodbARN を、メモしておいた値に書き換えます。

      {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "dynamodb:PutItem",
                  "dynamodb:DescribeTable",
                  "kinesis:GetShardIterator",
                  "dynamodb:GetItem",
                  "kinesis:GetRecords",
                  "kinesis:DescribeStream",
                  "dynamodb:Query",
                  "dynamodb:UpdateItem"
              ],
              "Resource": [
                  "arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/kinesisCtrStream",
                  "arn:aws:dynamodb:us-east-1:xxxxxxxxxxx:table/Connect-ContactTraceRecords"
              ]
          },
          {
              "Sid": "VisualEditor1",
              "Effect": "Allow",
              "Action": "kinesis:ListStreams",
              "Resource": "*"
          }
      ]
      }
      
    4. 次のステップ:タグをクリックます。このページでは何も設定しません。

    5. 次のステップ:確認をクリックします。

    6. ポリシーの名前を入力し(例:ctrKinesisDynamoPolicy)、ポリシーの作成をクリックします。

    7. 作成したポリシーを既存のロールにアタッチします。サイドバーのロールに移動します。

    8. Lambda 作成時にメモしておいたロール名を検索し、ヒットしたロール名をクリックします。

    9. ポリシーをアタッチしますをクリックします。

    10. 検索を使用して、上記で作成したポリシー名を検索します。ヒットしたポリシーのチェックボックスをオンにして、ポリシーのアタッチをクリックします。

  6. Lambda のトリガーに Kinesis ストリームを設定

    1. AWS コンソールで Lambda サービスに移動します。
    2. 関数のリストに移動し、上記で作成したLambda関数を選択します。
    3. +トリガーの追加をクリックして、ドロップダウンからKinesisトリガーを追加します。
    4. 上記で作成したKinesisストリーム名を選択します。 107
    5. 残りの設定はデフォルトのままにしておき、追加をクリックします。
    6. コードタブに移動し、コードソースセクションに、以下のサンプルコードをコピーして貼り付け、Deploy をクリックします。

      var AWS = require("aws-sdk");
      var docClient = new AWS.DynamoDB.DocumentClient();
      
      exports.handler = (event, context, callback) => {
      console.log(JSON.stringify(event, null, 2));
      event.Records.forEach(function(record) {
          // Kinesis data is base64 encoded so decode here
          let payload = new Buffer.from(record.kinesis.data, 'base64');
          console.log('Decoded payload:', payload.toString('utf8'));
          // Create a JSON object from the core payload
          let dpObject = JSON.parse(payload.toString('utf8'));
          console.log('Converting to JSON:', dpObject.toString());
          // Extract the fields that will be inserted into the DynamoDB table
          if (dpObject){
              let ContactId = dpObject.ContactId;
              let AgentUsername = "";
              if (dpObject.Agent){
                  AgentUsername = dpObject.Agent.Username;
              }
              let CallerAni = dpObject.CustomerEndpoint.Address;
              let Timestamp = dpObject.DisconnectTimestamp;
              //Create the item to be inserted into the table
              let item = {
                  "ContactId": ContactId,
                  "Agent": AgentUsername,
                  "CallerAni": CallerAni,
                  "Timestamp": Timestamp
              };
              const tableName = "Connect-ContactTraceRecords";
              let params = {
                  TableName: tableName,
                  Item: item
              };
              //Insert the item into the database
              docClient.put(params, function(err, data) {
                  if (err) {
                      console.log("Error", err);
                  } else {
                      console.log("Success", data);
                  }
              });
          }   
      });
      };
      
    • Kinesis データは Base64 でエンコードされているため、各 Kinesis レコードについて Base64 エンコーディングからデコードします。
    • コアペイロードから JSON オブジェクトを作成します。
    • 各オブジェクトをループし、DynamoDB テーブルに必要なフィールドを検索して抽出します。
    • 対応する DynamoDB 項目を作成し、データを書き込みます。
  7. Amazon Connect インスタンスで取得した番号に電話から発信を行い、DynamoDB テーブルに値が入力されることを確認します。

    1. DynamoDB テーブルを確認するには、DynamoDB に移動し、テーブル[作成したテーブル]テーブルアイテムの探索 をクリックします。
    2. Lambda で集計ログを表示するには、モニタリングタブに移動し、ロードするセクションでログを表示します。
    3. Lambda の詳細なログを確認したり、トラブルシューティングを行う場合は、Amazon CloudWatch ログ内のロググループで、/aws/lambda/[作成した Lambda 名] を選択します。

エージェントイベントストリームを Kinesis から DynamoDB に連携する

  1. エージェントイベント用の Kinesis ストリームを作成
    1. Amazon Connect からエージェントイベントデータをストリーミングするためのストリームを作成します。エージェントイベントストリームは、Amazon Connect インスタンス内のエージェントアクティビティをほぼリアルタイムでレポートします。ストリームに含まれるイベントには、エージェントのログイン、ログアウト、通話しているエージェント、エージェントの状態変更(Available、Offline)などのCCPイベントが含まれます。
    2. AWS コンソールから Amazon Kinesis に移動します。
    3. 左側のサイドバーのデータストリーム を選択し、データストリームの作成をクリックします。
    4. データストリーム名を付与します(例:kinesisAgentStream)。この名前をメモしておきます。
    5. シャード設定は1のままとし、データストリームの作成をクリックします。
    6. 作成されたデータストリームのARNをメモしておきます。 111
  2. Amazon Connect インスタンスで Kinesis ストリームを有効化
    1. AWS コンソールで Amazon Connect に移動します。
    2. 作成済みのインスタンスを選択します。
    3. インスタンス管理画面で データストリーミングを選択します。
    4. エージェントイベントにおいて、前のステップで作成した Kinesis ストリームをドロップダウンメニューから選択し、保存をクリックします。 作成したエージェントイベントが表示されない場合、ページをリロードしてください。 112
  3. エージェントイベントを格納する DynamoDB テーブルを作成
    1. AWS コンソールで DynamoDB サービスに移動します。
    2. 左側のサイドバーのテーブルを選択し、テーブルの作成をクリックします。
    3. DynamoDB テーブルを作成します。
      • テーブル名:Connect-AgentEvents (文字列)
      • パーティションキー:agentId (文字列)
    4. その他はデフォルト設定のままテーブルの作成をクリックします。テーブルの作成には数分かかります。
    5. サイドバーのテーブルをクリックし、Connect-AgentEventsを選択して、作成したテーブルの ARN をメモします。 113
  4. Kinesis ストリームデータを処理するための Lambda の作成
    1. AWS コンソールで Lambda サービスに移動します。
    2. 関数の作成をクリックします。 関数の作成ページが開きます。
    3. 関数名を付与します(例:agentToDynamo)。
    4. ランタイムは Node.js 16.x を選択します。
    5. 残りのセクションはデフォルト値のまま、関数の作成をクリックします。
    6. 関数が作成されたら、設定-アクセス権限を開き、実行ロールにあるロール名をメモします。 114
  5. DynamoDB に書き込みを許可する IAM ポリシーを作成し、Lambda にアタッチ

    1. AWS コンソールで IAM サービスに移動します。
    2. 左側のサイドバーの[ポリシー]に移動し、ポリシーを作成を選択します。
    3. JSON タブを開き、以下に示すIAMポリシーのサンプルをペーストします。このとき Resource 欄の kinesisARN と、dynamodbARN を、メモしておいた値に書き換えます。

      {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "dynamodb:PutItem",
                  "dynamodb:DescribeTable",
                  "kinesis:GetShardIterator",
                  "dynamodb:GetItem",
                  "kinesis:GetRecords",
                  "kinesis:DescribeStream",
                  "dynamodb:Query",
                  "dynamodb:UpdateItem"
              ],
              "Resource": [
                  "arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/kinesisAgentStream",
                  "arn:aws:dynamodb:us-east-1:xxxxxxxxxxx:table/Connect-AgentEvents"
              ]
          },
          {
              "Sid": "VisualEditor1",
              "Effect": "Allow",
              "Action": "kinesis:ListStreams",
              "Resource": "*"
          }
      ]
      }
      
    4. 次のステップ:タグをクリックます。このページでは何も設定しません。

    5. 次のステップ:確認をクリックします。

    6. ポリシーの名前を入力し(例:agentKinesisDynamoPolicy)、ポリシーの作成をクリックします。

    7. 作成したポリシーを既存のロールにアタッチします。サイドバーのロールに移動します。

    8. Lambda 作成時にメモしておいたロール名を検索し、ヒットしたロール名をクリックします。

    9. ポリシーをアタッチしますをクリックします。

    10. 検索を使用して、上記で作成したポリシー名を検索します。ヒットしたポリシーのチェックボックスをオンにして、ポリシーのアタッチをクリックします。

  6. Lambda のトリガーに Kinesis ストリームを設定

    1. AWS コンソールで Lambda サービスに移動します。
    2. 関数のリストに移動し、上記で作成した Lambda 関数を選択します。
    3. +トリガーの追加をクリックして、ドロップダウンから Kinesis トリガーを追加します。
    4. 上記で作成した Kinesis ストリーム名を選択します。 115
    5. 残りの設定はデフォルトのままにしておき、追加をクリックします。
    6. コードタブに移動し、コードソースセクションに、以下のサンプルコードをコピーして貼り付け、Deployをクリックします。

      var AWS = require("aws-sdk");
      var docClient = new AWS.DynamoDB.DocumentClient();
      
      exports.handler = (event, context, callback) => {
      if (event.Records) {
          for (var record of event.Records) {  
              // Kinesis data is base64 encoded so decode here
              var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
              if (payload) {
                  var eventData = JSON.parse(payload);
                  logState(eventData);
              }
          }
      }
      };
      
      function logState(record) {
      console.log('entering logState', record);
      if (record && record.EventType && record.EventType != 'HEART_BEAT')
      {
          console.log('I have an event type', record.EventType);
          if (
              record.CurrentAgentSnapshot && 
              record.CurrentAgentSnapshot.Configuration && 
              record.CurrentAgentSnapshot.Configuration.Username) 
          {
              console.log('processing record', record);
      
              var hierarchy = getHierarchy(record);
              var contact = getContact(record.CurrentAgentSnapshot.Contacts);
              var paramsPut = {
                  TableName: 'Connect-AgentEvents',
                  Item:{
                      "agentId": record.CurrentAgentSnapshot.Configuration.Username,
                      "name": (record.CurrentAgentSnapshot.Configuration.FirstName + " " + 
                               record.CurrentAgentSnapshot.Configuration.LastName),
                      "hierarchy": hierarchy,
                      "state": record.CurrentAgentSnapshot.AgentStatus.Name,
                      "stateStartTime": record.CurrentAgentSnapshot.AgentStatus.StartTimestamp,
                      "contactId":  (contact ? contact.contactId : null),
                      "initialContactId": (contact ? contact.initialContactId : null),
                      "contactChannel": (contact ? contact.contactChannel : null),
                      "contactQueue": (contact ? contact.contactQueue : null),
                      "contactState": (contact ? contact.contactState : null),
                      "prevState": record.PreviousAgentSnapshot.AgentStatus.Name,
                      "prevStateStartTime": record.PreviousAgentSnapshot.AgentStatus.StartTimestamp
                  }
              };
                      
              console.log("updating state for ", paramsPut.Item.agentid);
      
              docClient.put(paramsPut, function(err, data) {
                  if (err) {
                      console.log(err);
                  }
              });
          }
      }
      }
      
      function getHierarchy(record)
      {
      var hierarchy = "";
      if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups) 
      {
          if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level1){
              hierarchy = record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level1.Name;
          }
          hierarchy += "-";
          if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level2){
              hierarchy += record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level2.Name;
          }
          hierarchy += "-";
          if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level3){
              hierarchy += record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level3.Name;
          }
          hierarchy += "-";
          if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level4){
              hierarchy += record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level4.Name;
          }
          hierarchy += "-";
          if (record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level5){
              hierarchy += record.CurrentAgentSnapshot.Configuration.AgentHierarchyGroups.Level5.Name;
          }
      }
      
      console.log('hierarchy', hierarchy);
      return hierarchy;
      }
      
      function getContact(contacts){
      if (contacts.length > 0){
          console.log('contacts', contacts);
          var contact = {
              "contactId":  contacts[0].ContactId,
              "initialContactId": contacts[0].InitialContactId,
              "contactChannel": contacts[0].Channel,
              "contactQueue": contacts[0].Queue ? contacts[0].Queue.Name : "",
              "contactState": contacts[0].State
          };
          console.log('contact',contact);
          return contact;
      }
      console.log('no contacts');
      return null;
      }
      
    • Kinesis データは Base64 でエンコードされているため、各 Kinesis レコードについて Base64 エンコーディングからデコードします。
    • コアペイロードから JSON オブジェクトを作成します。
    • 各オブジェクトをループし、DynamoDB テーブルに必要なフィールドを検索して抽出します。
    • 対応する DynamoDB 項目を作成し、データを書き込みます。
    • エージェントの状態がいつ変化したかをチェックし(Available、Offlineなど)、DynamoDB に値を常に更新します。
    • 通話状態も同様に処理します(着信中、保留中、接続中など)
  7. Connect インスタンスで取得した番号に電話から発信してエージェントで受けたり、エージェントステータスを変更して、DynamoDB テーブルに値が入力されることを確認します。

    1. DynamoDB テーブルを確認するには、DynamoDB に移動し、テーブル[作成したテーブル]テーブルアイテムの探索 をクリックします。エージェントイベントはエージェント毎に作成されます。
    2. Lambda で集計ログを表示するには、モニタリングタブに移動し、ロードするセクションでログを表示します。
    3. Lambda の詳細なログを確認したり、トラブルシューティングを行う場合は、Amazon CloudWatch ログ内のロググループで、/aws/lambda/[作成したLambda名] を選択します。

まとめ

このラボでは、Amazon Connect の CTR、エージェントイベントをストリーミング出力する方法を確認しました。Amazon DynamoDB に記録された CTR やエージェントイベントを出力・整形することで、任意の形式にレポートを生成することが可能になり、より柔軟にコンタクトセンターの稼働状況を可視化することが可能になります。