2022.10.11

InfluxDB及びGrafanaを利用して為替損益を自動計算し可視化してみた

はじめに

最近円安の進行で値動きが激しいですね。特に9月に円相場は、24年ぶりの円安水準となる1ドル=140円台前半まで下落しましたので、この状況で為替損益を常に監視したい方々が多くなるでしょう。今回はUSDJPYの為替レートを継続的にInfluxDBに格納し、Grafanaで為替損益を可視化するシステムの構築方法を紹介したいと思います。
ちなみに、為替レートの変動により生じた利益を為替差益、生じた損失を為替差損といいます。たとえば、米ドルを1ドル=105円のときに購入した後、為替レートが円安方向に動いて1ドル=120円となった場合、購入していたドルを円に交換すれば1ドルにつき15円の利益を得ることになり、これが為替差益となります。逆に、為替レートが円高に進んで1ドル=100円となった場合は1ドルにつき5円の損失をこうむることになり、これが為替差損となります。

1.やりたいこと

                    

  1. Docker Composeで最新版のInfluxDB、Grafana、Telegrafを構築する
  2. USDJPY為替レート及び為替差損データを継続的にInfluxDBに永続化
  3. Grafanaで為替レート及び為替差損・為替差益を可視化する

2.環境構築

2-1.InfluxDBサーバ構築

InfluxDBはオープンソースの時系列データベース(Time series database)です。時系列データベースとは、ログなどの時間を軸とした連続したデータを蓄積、検索することに特化して作られたデータベースです。高い書込スループット、
高い読込スループット、大量データの削除(期限切れデータなど)、データの更新と削除よりも登録と参照のパフォーマンスを優先しているのはInfluxDBの主な特徴です。
DockerでInfluxDBを構築したいので以下の内容でdocker-compose.ymlファイルを追加します。
InfluxDBの2.4.0(9/30現在の最新版)を使います。Containerを起動時に必要な環境変数を設定しておきます。

version: "3"
services:
  influxdb:
    image: influxdb:2.4.0
    ports:
      - 8086:8086
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=my-user
      - DOCKER_INFLUXDB_INIT_PASSWORD=my-password
      - DOCKER_INFLUXDB_INIT_ORG=my-org
      - DOCKER_INFLUXDB_INIT_BUCKET=my-bucket
      - DOCKER_INFLUXDB_INIT_RETENTION=1w
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - ./data/influxdb2:/var/lib/influxdb2
      - ./data/influxdb2-config:/etc/influxdb2
    container_name: influxdb

2-2.Grafanaサーバー構築

Grafana(グラファナ)は、分析およびインタラクティブな視覚化を可能にする、マルチプラットフォームで動作するオープンソースのWebアプリケーションである。9/30現在はGrafanaの最新版の9.0を使いたいと思います。GrafanaはDockerで構築しやすいので以下の内容でdocker-compose.ymlファイルに追加するとGrafanaサーバの構築できます。

  grafana:
    image: grafana/grafana
    container_name: grafana
    ports:
      - 3000:3000
    volumes:
      - ./data/grafana:/var/lib/grafana
    depends_on:
      - influxdb

2-3.Telegrafの設定

InfluxDBの収集・送信エージェントであるTelegrafは、データベースやシステム、IoTセンサーからメトリクスやイベントを収集し、InfluxDBへ送信するためのプラグイン駆動型のサーバーエージェントです。今回は定期的に為替レートを収集してInfluxDBに送信するため使います。
TelegrafのDocker Imageが提供されていますが、今回は外部サービスから為替レートを継続的に収集した後、Telegrafで為替損益を計算してからInfluxDBに送信します。この処理はPythonで実装するので、以下のDockerfileでTelegrafのDocker ImageをベースにPython実行できるDocker Imageを作ります。

FROM telegraf:latest

RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
    DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends python3 python3-pip python3-setuptools python3-wheel && \
    rm -rf /var/lib/apt/lists/*

RUN pip3 install requests pyyaml pandas influxdb-client

また、Telegraf設定のtelegraf.confファイルを作る必要があるので準備しておきます。内容は以下の通りで、5分毎に1回calc_profit_and_loss.pyスクリプトを実行して、InfluxDBのmy-bucketにデータを送信するための設定です。

[global_tags]

[agent]
  interval = "300s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false

[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]
  token = "my-super-secret-auth-token"
  organization = "my-org"
  bucket = "my-bucket"
  timeout = "5s"

[[inputs.ping]]

[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]

[[inputs.exec]]
  ## Commands array
  commands = [
    "python3 /etc/telegraf/calc_profit_and_loss.py",
  ]

  ## Timeout for each command to complete.
  timeout = "300s"

  ## measurement name suffix (for separating different commands)
  name_suffix = "_mycollector"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

[[inputs.diskio]]

[[inputs.kernel]]

[[inputs.mem]]

[[inputs.processes]]

[[inputs.swap]]

[[inputs.system]]

次は、docker-compose.ymlに以下の内容を追加するとTelegrafの設定が完了。
calc_profit_and_loss.py及びposition.jsonをTelegrafに展開しておきます。また、InfluxDBにデータ永続化するため接続情報が必要なので設定しておきます。

telegraf:
    image: telegraf
    container_name: telegraf
    build:
      context: .
      dockerfile: ./Dockerfile
    volumes:
            - ./src/position.json:/etc/telegraf/position.json:ro
      - ./src/calc_profit_and_loss.py:/etc/telegraf/calc_profit_and_loss.py:ro
      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - /sys:/rootfs/sys:ro
      - /proc:/rootfs/proc:ro
      - /etc:/rootfs/etc:ro
    environment:
      HOST_PROC: /rootfs/proc
      HOST_SYS: /rootfs/sys
      HOST_ETC: /rootfs/etc
      INFLUXDB_HOST: influxdb
      INFLUXDB_PORT: 8086
      INFLUXDB_TOKEN: my-super-secret-auth-token
      INFLUXDB_ORG: my-org
      INFLUXDB_BUCKET: my-bucket
            DOCKER_ALPHA_VANTAGE_API_KEY: MY_REGISTERED_KEY
    depends_on:
      - influxdb

最終的にdocker-compose.ymlファイルの内容はいかになりますね。

version: "3"
services:
  influxdb:
    image: influxdb:2.4.0
    ports:
      - 8086:8086
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=my-user
      - DOCKER_INFLUXDB_INIT_PASSWORD=my-password
      - DOCKER_INFLUXDB_INIT_ORG=my-org
      - DOCKER_INFLUXDB_INIT_BUCKET=my-bucket
      - DOCKER_INFLUXDB_INIT_RETENTION=1w
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - ./data/influxdb2:/var/lib/influxdb2
      - ./data/influxdb2-config:/etc/influxdb2
    container_name: influxdb
  grafana:
    image: grafana/grafana
    container_name: grafana
    ports:
      - 3000:3000
    volumes:
      - ./data/grafana:/var/lib/grafana
    depends_on:
      - influxdb
  telegraf:
    image: telegraf
    container_name: telegraf
    build:
      context: .
      dockerfile: ./Dockerfile
    volumes:
      - ./src/position.json:/etc/telegraf/position.json:ro
      - ./src/calc_profit_and_loss.py:/etc/telegraf/calc_profit_and_loss.py:ro
      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - /sys:/rootfs/sys:ro
      - /proc:/rootfs/proc:ro
      - /etc:/rootfs/etc:ro
    environment:
      HOST_PROC: /rootfs/proc
      HOST_SYS: /rootfs/sys
      HOST_ETC: /rootfs/etc
      INFLUXDB_HOST: influxdb
      INFLUXDB_PORT: 8086
      INFLUXDB_TOKEN: my-super-secret-auth-token
      INFLUXDB_ORG: my-org
      INFLUXDB_BUCKET: my-bucket
      DOCKER_ALPHA_VANTAGE_API_KEY: MY_REGISTERED_KEY
    depends_on:
      - influxdb

2-4.各サービスの連携設定

ここまでGrafana、InfluxDB、Telegrafの環境構築を準備できましたので各サービスを起動しましょう。
まずは、TelegrafのDocker Imageをビルドします。

docker-compose build

次に、各Containerを起動しましょう

docker-compose up

無事にDocker Containerが起動しました。

InfluxDBのWebUIにアクセスしてみます。

Grafanaにアクセスしてから初期PWを変更しましょう。変更後にトップページが表示されます。

早速、GrafanaのData source設定画面でInfluxDBがデータソースを設定しましょう。まずはGrafanaからInfluxDBへの通信設定を行います。

次に読み込む対象InfluxDB Bucketを設定しましょう。

ここでGrafana、InfluxDB、Telegrafの環境構築は一旦終わりました。

3.為替レートデータ取得

Webサイト(為替)のスクレイピングや「MetaTrader + デモ口座(無料)の登録」など無料で為替の時系列を取得する方法はいくつかありますが、今回は「為替API + 無料プランの登録」の方法でデータ取得したいと思います。

3-1.Alpha VantageからAPIキー取得

AlphaVantageは、多くの研究者、エンジニア、ビジネス専門家が使用する無料のストックAPIです。データは株価、為替(FX)、仮想通貨の他にテクニカル指標、経済指標もあります。メールアドレスを登録すればAPIが取得でき、無料でデータを取得できます。
Alpha Vantageが提供するRestAPIにて、リアルタイム情報の取得が可能です。フリープランでの制限事項は「1分あたり最大5つのAPIリクエスト、1日あたり500のリクエスト」となっていますが十分使えるので今回はこのサービスに会員登録してAPIキーを取得しました。

3-2.為替レートデータ確認

Alpha VantageにAPIキー取得できたので、為替レートデータ取得するため以下のAPIを使います。

https://www.alphavantage.co/query?function=FX_INTRADAY&from_symbol=USD&to_symbol=JPY&interval=5min&apikey=demo

PostmanでAPIから返却したデータを確認できました。良さそうですね。

4.損益計算の実装

4-1.損益計算の計算

購入時の数量及びレートデータと最新レートで為替損益を計算するロジックは以下のPythonスクリプトで実装しました。

import pandas as pd
import requests, json
import time, datetime
import os, sys, traceback
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class CalculateProfitAndLoss():

    def __init__(self):

      url = "http://" + os.getenv('INFLUXDB_HOST') + ":" + os.getenv('INFLUXDB_PORT')
      self.influxdbClient = InfluxDBClient(
                 url      = url,
                 token    = os.getenv('INFLUXDB_TOKEN'),
                 org      = os.getenv('INFLUXDB_ORG'))

    def get_alpha_vantage_api_url(self):
        # Using Alpha Vantage to get JPYVND prices every 5 minutes
        # Get your key here: https://www.alphavantage.co/support/#api-key
        apikey      = os.getenv("DOCKER_ALPHA_VANTAGE_API_KEY")
        url         = "https://www.alphavantage.co/query?"
        function    = "FX_INTRADAY"
        from_symbol = "USD"
        to_symbol   = "JPY"
        interval    = "5min"
        outputsize  = "compact"

        # Build target url
        target_url = url + "function=" + function + "&from_symbol=" + from_symbol + "&to_symbol=" \
                         + to_symbol + "&interval=" + interval + "&outputsize=" + outputsize \
                         + "&apikey=" + apikey

        return target_url

    def get_latest_quote(self):
        target_url = self.get_alpha_vantage_api_url()
        data = requests.get(target_url).json()
        #Data is returned in the following format:
        # https://www.alphavantage.co/query?function=FX_INTRADAY&from_symbol=USD&to_symbol=JPY&interval=5min&apikey=demo
        t = [t for t in data['Time Series FX (5min)']]
        # Convert human readable time to unix time and add 9 hour to match Japan Tokyo timezone
        last_tick = datetime.datetime.strptime(t[0], "%Y-%m-%d %H:%M:%S") + datetime.timedelta(hours=9)
        unix_ns = last_tick.strftime("%Y-%m-%d %H:%M:%S")

        fields = [v for k, v in data['Time Series FX (5min)'].items()]
        last_closed_price = float(fields[0]['4. close'])

        return unix_ns, last_closed_price

    def calculate_profit_and_loss_data_point(self):

        unix_ns, last_closed_price = self.get_latest_quote()

        with open('/etc/telegraf/position.json') as data_file:

            data = json.load(data_file)

            data_point = Point("profit_and_loss").tag("fx_pair", "USDJPY")\
                                        .field("price", last_closed_price)\
                                        .field("value", round(last_closed_price * data["owned_quantity"])) \
                                        .field("purchased_price", data["purchased_price"]) \
                                        .field("purchased_value", round(data["owned_quantity"] * data["purchased_price"])) \
                                        .field("owned_quantity", data["owned_quantity"]) \
                                        .field("profit_and_loss", round((last_closed_price - data["purchased_price"]) * data["owned_quantity"]))

            return data_point

    def regist_profit_and_loss_to_influxdb(self):
        data_point = self.calculate_profit_and_loss_data_point()
        write_api = self.influxdbClient.write_api(write_options=SYNCHRONOUS)
        bucket = os.getenv('INFLUXDB_BUCKET')
        write_api.write(bucket=bucket, record=data_point)


if __name__ == "__main__":
    try:
        calculator = CalculateProfitAndLoss()
        calculator.regist_profit_and_loss_to_influxdb()

    except Exception:
        traceback.print_exc()
        sys.exit(1)

position.json内容は以下の感じです。一番簡単なバージョンでドルの保持数量及び(一括)購入時のレート情報ですね。
このファイルは取引時に自動更新する必要がありますが今回のブログの範囲外にしました。

{
    "owned_quantity": 10000,
    "purchased_price": 138.45
}

4-2.TelegrafのExecプラグインに設定

Telegrafを構築時に必要な環境変数及びPythonスクリプト名を指定しましたので、Script更新やAPIキーを再設定する必要な時はdocker-composeのdown->upするだけで反映されると思います。

5.動作確認

Grafanaで為替損益グラフを確認できました。Dashboardが簡単に作れるのがGrafanaの特徴ですね。

まとめ

ここまでは一通り今回のやりたいことを実現できました。5分毎にAlphaVantageからUSDJPYの為替レートデータが自動的に格納されて、同時に為替損益も計算されましたので、Grafanaで為替レート変動だけではなく、損益の状況も随時に確認できますね。特にAlpha VantageのAPIが1分足データも無料で提供しているのでTelegrafの設定で頻度を上げると直近の1分で為替損益の状況を確認できます。
また、今回はドル円を絞って記事を書きましたが複数通貨ペアでも簡単に対応できると思いますので、確認する必要な方はぜひ試してください。

宣伝

次世代システム研究室では、最新のテクノロジーを調査・検証しながらインターネットのいろんなアプリケーションの開発を行うアーキテクトを募集しています。募集職種一覧 からご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事