2022.10.11
InfluxDB及びGrafanaを利用して為替損益を自動計算し可視化してみた
はじめに
最近円安の進行で値動きが激しいですね。特に9月に円相場は、24年ぶりの円安水準となる1ドル=140円台前半まで下落しましたので、この状況で為替損益を常に監視したい方々が多くなるでしょう。今回はUSDJPYの為替レートを継続的にInfluxDBに格納し、Grafanaで為替損益を可視化するシステムの構築方法を紹介したいと思います。
ちなみに、為替レートの変動により生じた利益を為替差益、生じた損失を為替差損といいます。たとえば、米ドルを1ドル=105円のときに購入した後、為替レートが円安方向に動いて1ドル=120円となった場合、購入していたドルを円に交換すれば1ドルにつき15円の利益を得ることになり、これが為替差益となります。逆に、為替レートが円高に進んで1ドル=100円となった場合は1ドルにつき5円の損失をこうむることになり、これが為替差損となります。
1.やりたいこと
- Docker Composeで最新版のInfluxDB、Grafana、Telegrafを構築する
- USDJPY為替レート及び為替差損データを継続的にInfluxDBに永続化
- Grafanaで為替レート及び為替差損・為替差益を可視化する
2.環境構築
2-1.InfluxDBサーバ構築
InfluxDBはオープンソースの時系列データベース(Time series database)です。時系列データベースとは、ログなどの時間を軸とした連続したデータを蓄積、検索することに特化して作られたデータベースです。高い書込スループット、
高い読込スループット、大量データの削除(期限切れデータなど)、データの更新と削除よりも登録と参照のパフォーマンスを優先しているのはInfluxDBの主な特徴です。
DockerでInfluxDBを構築したいので以下の内容でdocker-compose.ymlファイルを追加します。
InfluxDBの2.4.0(9/30現在の最新版)を使います。Containerを起動時に必要な環境変数を設定しておきます。
version: "3" services: influxdb: image: influxdb:2.4.0 ports: - 8086:8086 environment: - DOCKER_INFLUXDB_INIT_MODE=setup - DOCKER_INFLUXDB_INIT_USERNAME=my-user - DOCKER_INFLUXDB_INIT_PASSWORD=my-password - DOCKER_INFLUXDB_INIT_ORG=my-org - DOCKER_INFLUXDB_INIT_BUCKET=my-bucket - DOCKER_INFLUXDB_INIT_RETENTION=1w - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token volumes: - ./data/influxdb2:/var/lib/influxdb2 - ./data/influxdb2-config:/etc/influxdb2 container_name: influxdb
2-2.Grafanaサーバー構築
Grafana(グラファナ)は、分析およびインタラクティブな視覚化を可能にする、マルチプラットフォームで動作するオープンソースのWebアプリケーションである。9/30現在はGrafanaの最新版の9.0を使いたいと思います。GrafanaはDockerで構築しやすいので以下の内容でdocker-compose.ymlファイルに追加するとGrafanaサーバの構築できます。
grafana: image: grafana/grafana container_name: grafana ports: - 3000:3000 volumes: - ./data/grafana:/var/lib/grafana depends_on: - influxdb
2-3.Telegrafの設定
InfluxDBの収集・送信エージェントであるTelegrafは、データベースやシステム、IoTセンサーからメトリクスやイベントを収集し、InfluxDBへ送信するためのプラグイン駆動型のサーバーエージェントです。今回は定期的に為替レートを収集してInfluxDBに送信するため使います。
TelegrafのDocker Imageが提供されていますが、今回は外部サービスから為替レートを継続的に収集した後、Telegrafで為替損益を計算してからInfluxDBに送信します。この処理はPythonで実装するので、以下のDockerfileでTelegrafのDocker ImageをベースにPython実行できるDocker Imageを作ります。
FROM telegraf:latest RUN DEBIAN_FRONTEND=noninteractive apt-get update && \ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends python3 python3-pip python3-setuptools python3-wheel && \ rm -rf /var/lib/apt/lists/* RUN pip3 install requests pyyaml pandas influxdb-client
また、Telegraf設定のtelegraf.confファイルを作る必要があるので準備しておきます。内容は以下の通りで、5分毎に1回calc_profit_and_loss.pyスクリプトを実行して、InfluxDBのmy-bucketにデータを送信するための設定です。
[global_tags] [agent] interval = "300s" round_interval = true metric_batch_size = 1000 metric_buffer_limit = 10000 collection_jitter = "0s" flush_interval = "10s" flush_jitter = "0s" precision = "" hostname = "" omit_hostname = false [[outputs.influxdb_v2]] urls = ["http://influxdb:8086"] token = "my-super-secret-auth-token" organization = "my-org" bucket = "my-bucket" timeout = "5s" [[inputs.ping]] [[inputs.cpu]] percpu = true totalcpu = true collect_cpu_time = false report_active = false [[inputs.disk]] ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"] [[inputs.exec]] ## Commands array commands = [ "python3 /etc/telegraf/calc_profit_and_loss.py", ] ## Timeout for each command to complete. timeout = "300s" ## measurement name suffix (for separating different commands) name_suffix = "_mycollector" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" [[inputs.diskio]] [[inputs.kernel]] [[inputs.mem]] [[inputs.processes]] [[inputs.swap]] [[inputs.system]]
次は、docker-compose.ymlに以下の内容を追加するとTelegrafの設定が完了。
calc_profit_and_loss.py及びposition.jsonをTelegrafに展開しておきます。また、InfluxDBにデータ永続化するため接続情報が必要なので設定しておきます。
telegraf: image: telegraf container_name: telegraf build: context: . dockerfile: ./Dockerfile volumes: - ./src/position.json:/etc/telegraf/position.json:ro - ./src/calc_profit_and_loss.py:/etc/telegraf/calc_profit_and_loss.py:ro - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro - /var/run/docker.sock:/var/run/docker.sock:ro - /sys:/rootfs/sys:ro - /proc:/rootfs/proc:ro - /etc:/rootfs/etc:ro environment: HOST_PROC: /rootfs/proc HOST_SYS: /rootfs/sys HOST_ETC: /rootfs/etc INFLUXDB_HOST: influxdb INFLUXDB_PORT: 8086 INFLUXDB_TOKEN: my-super-secret-auth-token INFLUXDB_ORG: my-org INFLUXDB_BUCKET: my-bucket DOCKER_ALPHA_VANTAGE_API_KEY: MY_REGISTERED_KEY depends_on: - influxdb
最終的にdocker-compose.ymlファイルの内容はいかになりますね。
version: "3" services: influxdb: image: influxdb:2.4.0 ports: - 8086:8086 environment: - DOCKER_INFLUXDB_INIT_MODE=setup - DOCKER_INFLUXDB_INIT_USERNAME=my-user - DOCKER_INFLUXDB_INIT_PASSWORD=my-password - DOCKER_INFLUXDB_INIT_ORG=my-org - DOCKER_INFLUXDB_INIT_BUCKET=my-bucket - DOCKER_INFLUXDB_INIT_RETENTION=1w - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token volumes: - ./data/influxdb2:/var/lib/influxdb2 - ./data/influxdb2-config:/etc/influxdb2 container_name: influxdb grafana: image: grafana/grafana container_name: grafana ports: - 3000:3000 volumes: - ./data/grafana:/var/lib/grafana depends_on: - influxdb telegraf: image: telegraf container_name: telegraf build: context: . dockerfile: ./Dockerfile volumes: - ./src/position.json:/etc/telegraf/position.json:ro - ./src/calc_profit_and_loss.py:/etc/telegraf/calc_profit_and_loss.py:ro - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro - /var/run/docker.sock:/var/run/docker.sock:ro - /sys:/rootfs/sys:ro - /proc:/rootfs/proc:ro - /etc:/rootfs/etc:ro environment: HOST_PROC: /rootfs/proc HOST_SYS: /rootfs/sys HOST_ETC: /rootfs/etc INFLUXDB_HOST: influxdb INFLUXDB_PORT: 8086 INFLUXDB_TOKEN: my-super-secret-auth-token INFLUXDB_ORG: my-org INFLUXDB_BUCKET: my-bucket DOCKER_ALPHA_VANTAGE_API_KEY: MY_REGISTERED_KEY depends_on: - influxdb
2-4.各サービスの連携設定
ここまでGrafana、InfluxDB、Telegrafの環境構築を準備できましたので各サービスを起動しましょう。
まずは、TelegrafのDocker Imageをビルドします。
docker-compose build
次に、各Containerを起動しましょう
docker-compose up
無事にDocker Containerが起動しました。
InfluxDBのWebUIにアクセスしてみます。
Grafanaにアクセスしてから初期PWを変更しましょう。変更後にトップページが表示されます。
早速、GrafanaのData source設定画面でInfluxDBがデータソースを設定しましょう。まずはGrafanaからInfluxDBへの通信設定を行います。
次に読み込む対象InfluxDB Bucketを設定しましょう。
ここでGrafana、InfluxDB、Telegrafの環境構築は一旦終わりました。
3.為替レートデータ取得
Webサイト(為替)のスクレイピングや「MetaTrader + デモ口座(無料)の登録」など無料で為替の時系列を取得する方法はいくつかありますが、今回は「為替API + 無料プランの登録」の方法でデータ取得したいと思います。
3-1.Alpha VantageからAPIキー取得
AlphaVantageは、多くの研究者、エンジニア、ビジネス専門家が使用する無料のストックAPIです。データは株価、為替(FX)、仮想通貨の他にテクニカル指標、経済指標もあります。メールアドレスを登録すればAPIが取得でき、無料でデータを取得できます。
Alpha Vantageが提供するRestAPIにて、リアルタイム情報の取得が可能です。フリープランでの制限事項は「1分あたり最大5つのAPIリクエスト、1日あたり500のリクエスト」となっていますが十分使えるので今回はこのサービスに会員登録してAPIキーを取得しました。
3-2.為替レートデータ確認
Alpha VantageにAPIキー取得できたので、為替レートデータ取得するため以下のAPIを使います。
https://www.alphavantage.co/query?function=FX_INTRADAY&from_symbol=USD&to_symbol=JPY&interval=5min&apikey=demo
PostmanでAPIから返却したデータを確認できました。良さそうですね。
4.損益計算の実装
4-1.損益計算の計算
購入時の数量及びレートデータと最新レートで為替損益を計算するロジックは以下のPythonスクリプトで実装しました。
import pandas as pd import requests, json import time, datetime import os, sys, traceback from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS class CalculateProfitAndLoss(): def __init__(self): url = "http://" + os.getenv('INFLUXDB_HOST') + ":" + os.getenv('INFLUXDB_PORT') self.influxdbClient = InfluxDBClient( url = url, token = os.getenv('INFLUXDB_TOKEN'), org = os.getenv('INFLUXDB_ORG')) def get_alpha_vantage_api_url(self): # Using Alpha Vantage to get JPYVND prices every 5 minutes # Get your key here: https://www.alphavantage.co/support/#api-key apikey = os.getenv("DOCKER_ALPHA_VANTAGE_API_KEY") url = "https://www.alphavantage.co/query?" function = "FX_INTRADAY" from_symbol = "USD" to_symbol = "JPY" interval = "5min" outputsize = "compact" # Build target url target_url = url + "function=" + function + "&from_symbol=" + from_symbol + "&to_symbol=" \ + to_symbol + "&interval=" + interval + "&outputsize=" + outputsize \ + "&apikey=" + apikey return target_url def get_latest_quote(self): target_url = self.get_alpha_vantage_api_url() data = requests.get(target_url).json() #Data is returned in the following format: # https://www.alphavantage.co/query?function=FX_INTRADAY&from_symbol=USD&to_symbol=JPY&interval=5min&apikey=demo t = [t for t in data['Time Series FX (5min)']] # Convert human readable time to unix time and add 9 hour to match Japan Tokyo timezone last_tick = datetime.datetime.strptime(t[0], "%Y-%m-%d %H:%M:%S") + datetime.timedelta(hours=9) unix_ns = last_tick.strftime("%Y-%m-%d %H:%M:%S") fields = [v for k, v in data['Time Series FX (5min)'].items()] last_closed_price = float(fields[0]['4. close']) return unix_ns, last_closed_price def calculate_profit_and_loss_data_point(self): unix_ns, last_closed_price = self.get_latest_quote() with open('/etc/telegraf/position.json') as data_file: data = json.load(data_file) data_point = Point("profit_and_loss").tag("fx_pair", "USDJPY")\ .field("price", last_closed_price)\ .field("value", round(last_closed_price * data["owned_quantity"])) \ .field("purchased_price", data["purchased_price"]) \ .field("purchased_value", round(data["owned_quantity"] * data["purchased_price"])) \ .field("owned_quantity", data["owned_quantity"]) \ .field("profit_and_loss", round((last_closed_price - data["purchased_price"]) * data["owned_quantity"])) return data_point def regist_profit_and_loss_to_influxdb(self): data_point = self.calculate_profit_and_loss_data_point() write_api = self.influxdbClient.write_api(write_options=SYNCHRONOUS) bucket = os.getenv('INFLUXDB_BUCKET') write_api.write(bucket=bucket, record=data_point) if __name__ == "__main__": try: calculator = CalculateProfitAndLoss() calculator.regist_profit_and_loss_to_influxdb() except Exception: traceback.print_exc() sys.exit(1)
position.json内容は以下の感じです。一番簡単なバージョンでドルの保持数量及び(一括)購入時のレート情報ですね。
このファイルは取引時に自動更新する必要がありますが今回のブログの範囲外にしました。
{ "owned_quantity": 10000, "purchased_price": 138.45 }
4-2.TelegrafのExecプラグインに設定
Telegrafを構築時に必要な環境変数及びPythonスクリプト名を指定しましたので、Script更新やAPIキーを再設定する必要な時はdocker-composeのdown->upするだけで反映されると思います。
5.動作確認
Grafanaで為替損益グラフを確認できました。Dashboardが簡単に作れるのがGrafanaの特徴ですね。
まとめ
ここまでは一通り今回のやりたいことを実現できました。5分毎にAlphaVantageからUSDJPYの為替レートデータが自動的に格納されて、同時に為替損益も計算されましたので、Grafanaで為替レート変動だけではなく、損益の状況も随時に確認できますね。特にAlpha VantageのAPIが1分足データも無料で提供しているのでTelegrafの設定で頻度を上げると直近の1分で為替損益の状況を確認できます。
また、今回はドル円を絞って記事を書きましたが複数通貨ペアでも簡単に対応できると思いますので、確認する必要な方はぜひ試してください。
宣伝
次世代システム研究室では、最新のテクノロジーを調査・検証しながらインターネットのいろんなアプリケーションの開発を行うアーキテクトを募集しています。募集職種一覧 からご応募をお待ちしています。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD