Python ビットコイン データ自動取得・指標算出 サンプル実装

今回は、Pythonでビットコインのデータを定期取得し、簡単なデータ指標を算出する実装のメモです。

内容は、無料で公開されているAPIを利用してビットコインの価格を定期的に保存し、
簡単な移動平均基準のトレンド判別処理等を実装しています。

※以下を事前にインストール・動作環境構築済みの前提の内容です。

  • Python
  • Docker

バージョン

  • Python:3.13.9
  • 各ライブラリバージョン(requirements.txt)
fastapi==0.115.0
uvicorn[standard]==0.32.0
requests==2.31.0
pymysql==1.1.0
cryptography==41.0.7
python-dotenv==1.0.1
apscheduler==3.10.4
numpy==1.26.2

Python環境準備

プロジェクト用に、Pythonを実行する環境を構築します。

仮想環境構築

python -m venv .venv

仮想環境有効化

source .venv/bin/activate

各種モジュールインストール

pip install --no-cache-dir -r requirements.txt

DB準備

今回は、DockerでMariaDBを用意します。
ビットコイン価格を保存するテーブルは以下とします。

・db/init.sql


-- 価格データテーブル
CREATE TABLE IF NOT EXISTS btc_prices (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    last_price DECIMAL(15, 2) NOT NULL,
    buy_price DECIMAL(15, 2) NOT NULL,
    sell_price DECIMAL(15, 2) NOT NULL,
    high_24h_price DECIMAL(15, 2) NOT NULL,
    low_24h_price DECIMAL(15, 2) NOT NULL,
    volume_24h DECIMAL(20, 8) NOT NULL,
    created_at DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

以下を用意して、docker-compose up -dします。

・docker-compose.yaml

version: '3.8'

services:
  mariadb:
    image: mariadb:11.4
    container_name: mariadb_container
    environment:
      MARIADB_ROOT_PASSWORD: root_password
      MARIADB_DATABASE: db_name
      MARIADB_USER: db_user
      MARIADB_PASSWORD: db_user_password
      TZ: Asia/Tokyo
    ports:
      - "5306:3306"
    volumes:
      - mariadb_data:/var/lib/mysql
      - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql # コンテナ新規作成時に初期化
    command: >
      --character-set-server=utf8mb4 
      --collation-server=utf8mb4_unicode_ci
      --default-time-zone='+09:00'
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "healthcheck.sh", "--connect", "--innodb_initialized"]
      interval: 10s
      timeout: 5s
      retries: 3

volumes:
  mariadb_data:

実装 (設定定義)

接続設定などをまとめておきます。
実業務では環境変数から読み込んだりしますが、今回は定数定義して利用します。

config.py


class Settings:
    """設定クラス"""
    
    def __init__(self):
        # Database Settings
        self.db_host = 'localhost'
        self.db_port = 5306
        self.db_root_password = 'root_password'
        self.db_name = 'testing_db'
        self.db_user = 'test_user'
        self.db_password = 'test_password'
        # API Settings
        self.coincheck_api_url_ticker = 'https://coincheck.com/api/ticker'

実装 (ビットコイン価格取得・保存)

今回は、CoinCheckの提供しているAPIを利用します。
https://coincheck.com/api/ticker

こちらのAPIは無料で公開されています。
・公式ドキュメント:https://coincheck.com/ja/documents/exchange/api
※無料ですが、短時間に連続で呼び出すなど負荷がかかる呼び出しは止めましょう。
詳細は公式情報をご確認ください。

サービスクラスに実行したいアクションごとの関数を用意しました。

app/service/bitcoin_data_service.py


import requests
from typing import Dict, Optional
from config import Settings
from repository.btc_price_repository import BtcPriceRepository


class BitcoinDataService:
    """ビットコインデータの取得・保存を担当するService"""
    
    def __init__(self, settings: Settings, repository: BtcPriceRepository):
        """
        コンストラクタ
        
        Args:
            settings: 設定オブジェクト
            repository: BtcPriceRepository
        """
        self.settings = settings
        self.repository = repository
    
    def fetch_bitcoin_data(self) -> Dict:
        """
        Coincheck APIからビットコインのティッカーデータを取得
        
        Returns:
            APIレスポンス {
                'last': 最終取引価格,
                'bid': 買い価格,
                'ask': 売り価格,
                'high': 24時間高値,
                'low': 24時間安値,
                'volume': 24時間取引量,
                'timestamp': タイムスタンプ
            }
        
        Raises:
            requests.exceptions.RequestException: API呼び出しに失敗した場合
        """
        try:
            response = requests.get(
                self.settings.coincheck_api_url_ticker,
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to fetch Bitcoin data from Coincheck API: {str(e)}")
    
    def save_bitcoin_data(self, api_data: Dict) -> int:
        """
        取得したAPIデータをDBに保存
        
        Args:
            api_data: Coincheck APIから取得したデータ
        
        Returns:
            挿入されたレコードのID
        """
        # APIレスポンスからDB用のデータを構築
        price_data = {
            'last_price': float(api_data.get('last', 0)),
            'buy_price': float(api_data.get('bid', 0)),
            'sell_price': float(api_data.get('ask', 0)),
            'high_24h_price': float(api_data.get('high', 0)),
            'low_24h_price': float(api_data.get('low', 0)),
            'volume_24h': float(api_data.get('volume', 0))
        }
        
        return self.repository.insert(price_data)
    
    def fetch_and_save(self) -> Dict:
        """
        APIからビットコインデータを取得して、DBに保存する一連の処理
        
        Returns:
            {
                'success': bool,
                'record_id': 保存されたレコードID,
                'data': 保存されたデータ,
                'error': エラーメッセージ(エラー時のみ)
            }
        """
        try:
            # APIからデータ取得
            api_data = self.fetch_bitcoin_data()
            
            # DBに保存
            record_id = self.save_bitcoin_data(api_data)
            
            return {
                'success': True,
                'record_id': record_id,
                'data': api_data
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    
    def get_latest_price(self) -> Optional[Dict]:
        """
        DBから最新の価格データを1件取得
        
        Returns:
            最新の価格データ、データがない場合None
        """
        latest_data = self.repository.find_latest(limit=1)
        return latest_data[0] if latest_data else None

DBに関する処理は、Repositoryにまとめています。


import pymysql
from typing import List, Dict
from datetime import datetime
from config import Settings


class BtcPriceRepository:
    """ビットコイン価格データのRepository"""
    
    def __init__(self, settings: Settings):
        """
        コンストラクタ
        
        Args:
            settings: 設定オブジェクト
        """
        self.settings = settings
    
    def _get_connection(self):
        """DB接続を取得"""
        return pymysql.connect(
            host=self.settings.db_host,
            port=self.settings.db_port,
            user=self.settings.db_user,
            password=self.settings.db_password,
            database=self.settings.db_name,
            cursorclass=pymysql.cursors.DictCursor,
            charset='utf8mb4'
        )
    
    def insert(self, price_data: Dict) -> int:
        """
        価格データを保存
        
        Args:
            price_data: {
                'last_price': float,
                'buy_price': float,
                'sell_price': float,
                'high_24h_price': float,
                'low_24h_price': float,
                'volume_24h': float
            }
        
        Returns:
            挿入されたレコードのID
        """
        connection = self._get_connection()
        try:
            with connection.cursor() as cursor:
                sql = """
                    INSERT INTO btc_prices 
                    (last_price, buy_price, sell_price, high_24h_price, low_24h_price, volume_24h)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    price_data['last_price'],
                    price_data['buy_price'],
                    price_data['sell_price'],
                    price_data['high_24h_price'],
                    price_data['low_24h_price'],
                    price_data['volume_24h']
                ))
                connection.commit()
                return cursor.lastrowid
        finally:
            connection.close()
    
    def find_latest(self, limit: int = 1) -> List[Dict]:
        """
        最新の価格データを取得
        
        Args:
            limit: 取得件数
        
        Returns:
            価格データのリスト
        """
        connection = self._get_connection()
        try:
            with connection.cursor() as cursor:
                sql = """
                    SELECT * FROM btc_prices 
                    ORDER BY created_at DESC 
                    LIMIT %s
                """
                cursor.execute(sql, (limit,))
                return cursor.fetchall()
        finally:
            connection.close()
    
    def find_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """
        期間を指定して価格データを取得
        
        Args:
            start_date: 開始日時
            end_date: 終了日時
        
        Returns:
            価格データのリスト(created_at昇順)
        """
        connection = self._get_connection()
        try:
            with connection.cursor() as cursor:
                sql = """
                    SELECT * FROM btc_prices 
                    WHERE created_at BETWEEN %s AND %s
                    ORDER BY created_at ASC
                """
                cursor.execute(sql, (start_date, end_date))
                return cursor.fetchall()
        finally:
            connection.close()
    
    def find_recent(self, limit: int) -> List[Dict]:
        """
        最新N件の価格データを取得(昇順)
        
        Args:
            limit: 取得件数
        
        Returns:
            価格データのリスト(created_at昇順)
        """
        connection = self._get_connection()
        try:
            with connection.cursor() as cursor:
                sql = """
                    SELECT * FROM (
                        SELECT * FROM btc_prices 
                        ORDER BY created_at DESC 
                        LIMIT %s
                    ) AS subquery
                    ORDER BY created_at ASC
                """
                cursor.execute(sql, (limit,))
                return cursor.fetchall()
        finally:
            connection.close()
    
    def count_all(self) -> int:
        """
        全レコード数を取得
        
        Returns:
            レコード数
        """
        connection = self._get_connection()
        try:
            with connection.cursor() as cursor:
                sql = "SELECT COUNT(*) as count FROM btc_prices"
                cursor.execute(sql)
                result = cursor.fetchone()
                return result['count'] if result else 0
        finally:
            connection.close()

実装 (ビットコイン移動平均基準のトレンド判定等)

ビットコインの価格データを元に、トレンド判定や移動平均等の指標を算出するサービスも実装してみます。

app/service/trend_analysis_service.py


from typing import List, Dict
from enum import Enum
import numpy as np
from repository.btc_price_repository import BtcPriceRepository


class TrendType(Enum):
    """トレンドタイプ"""
    UPTREND = "上昇トレンド"
    DOWNTREND = "下降トレンド"
    SIDEWAYS = "横ばい"
    UNKNOWN = "不明"


class CrossoverType(Enum):
    """クロスオーバータイプ"""
    GOLDEN_CROSS = "ゴールデンクロス"  # 短期MAが長期MAを上抜け
    DEAD_CROSS = "デッドクロス"  # 短期MAが長期MAを下抜け
    NONE = "なし"


class TrendAnalysisService:
    """移動平均を基準にトレンド判定を行うService"""
    
    def __init__(self, repository: BtcPriceRepository):
        """
        コンストラクタ
        
        Args:
            repository: BtcPriceRepository
        """
        self.repository = repository
    
    def calculate_moving_average(self, prices: List[float], period: int) -> List[float]:
        """
        単純移動平均(SMA)を計算
        
        Args:
            prices: 価格のリスト
            period: 移動平均の期間
        
        Returns:
            移動平均のリスト(期間未満のデータはNaNになる)
        """
        if len(prices) < period:
            return []
        
        prices_array = np.array(prices)
        moving_averages = []
        
        for i in range(len(prices_array)):
            if i < period - 1: moving_averages.append(np.nan) else: ma = np.mean(prices_array[i - period + 1:i + 1]) moving_averages.append(ma) return moving_averages def analyze_trend(self, short_period: int = 5, long_period: int = 25) -> Dict:
        """
        移動平均を基準にトレンドを判定
        
        Args:
            short_period: 短期移動平均の期間(デフォルト: 5)
            long_period: 長期移動平均の期間(デフォルト: 25)
        
        Returns:
            {
                'trend': TrendType,
                'short_ma': 短期移動平均の最新値,
                'long_ma': 長期移動平均の最新値,
                'current_price': 最新価格,
                'data_count': 使用したデータ数,
                'message': 判定メッセージ
            }
        """
        # 長期MAを計算するのに必要なデータを取得
        required_data = long_period + 10  # 余裕を持って取得
        price_records = self.repository.find_recent(limit=required_data)
        
        if len(price_records) < long_period: return { 'trend': TrendType.UNKNOWN, 'short_ma': None, 'long_ma': None, 'current_price': None, 'data_count': len(price_records), 'message': f'データ不足: 長期MA計算には{long_period}件以上必要' } # 価格データを抽出 prices = [float(record['last_price']) for record in price_records] # 移動平均を計算 short_ma_list = self.calculate_moving_average(prices, short_period) long_ma_list = self.calculate_moving_average(prices, long_period) # 最新値を取得 current_price = prices[-1] short_ma = short_ma_list[-1] long_ma = long_ma_list[-1] # トレンド判定 if short_ma > long_ma:
            trend = TrendType.UPTREND
            message = f"短期MA({short_ma:.2f}) > 長期MA({long_ma:.2f}) - 上昇トレンド"
        elif short_ma < long_ma:
            trend = TrendType.DOWNTREND
            message = f"短期MA({short_ma:.2f}) < 長期MA({long_ma:.2f}) - 下降トレンド" else: trend = TrendType.SIDEWAYS message = f"短期MA({short_ma:.2f}) = 長期MA({long_ma:.2f}) - 横ばい" return { 'trend': trend, 'short_ma': short_ma, 'long_ma': long_ma, 'current_price': current_price, 'data_count': len(price_records), 'message': message } def detect_crossover(self, short_period: int = 5, long_period: int = 25) -> Dict:
        """
        ゴールデンクロス・デッドクロスを検出
        
        Args:
            short_period: 短期移動平均の期間(デフォルト: 5)
            long_period: 長期移動平均の期間(デフォルト: 25)
        
        Returns:
            {
                'crossover_type': CrossoverType,
                'current_short_ma': 最新の短期MA,
                'current_long_ma': 最新の長期MA,
                'previous_short_ma': 1つ前の短期MA,
                'previous_long_ma': 1つ前の長期MA,
                'message': 判定メッセージ
            }
        """
        # クロスを検出するため、最低2つのMA値が必要
        required_data = long_period + 10
        price_records = self.repository.find_recent(limit=required_data)
        
        if len(price_records) < long_period + 1: return { 'crossover_type': CrossoverType.NONE, 'current_short_ma': None, 'current_long_ma': None, 'previous_short_ma': None, 'previous_long_ma': None, 'message': 'データ不足: クロス検出には十分なデータが必要' } # 価格データを抽出 prices = [float(record['last_price']) for record in price_records] # 移動平均を計算 short_ma_list = self.calculate_moving_average(prices, short_period) long_ma_list = self.calculate_moving_average(prices, long_period) # 最新と1つ前のMA値を取得 current_short_ma = short_ma_list[-1] current_long_ma = long_ma_list[-1] previous_short_ma = short_ma_list[-2] if len(short_ma_list) > 1 else None
        previous_long_ma = long_ma_list[-2] if len(long_ma_list) > 1 else None
        
        # クロスオーバー判定
        crossover_type = CrossoverType.NONE
        message = "クロスは発生していません"
        
        if previous_short_ma and previous_long_ma:
            # ゴールデンクロス: 前回は短期MA < 長期MA、今回は短期MA > 長期MA
            if previous_short_ma < previous_long_ma and current_short_ma > current_long_ma:
                crossover_type = CrossoverType.GOLDEN_CROSS
                message = f"ゴールデンクロス発生! 短期MA({current_short_ma:.2f})が長期MA({current_long_ma:.2f})を上抜け"
            # デッドクロス: 前回は短期MA > 長期MA、今回は短期MA < 長期MA elif previous_short_ma > previous_long_ma and current_short_ma < current_long_ma:
                crossover_type = CrossoverType.DEAD_CROSS
                message = f"デッドクロス発生! 短期MA({current_short_ma:.2f})が長期MA({current_long_ma:.2f})を下抜け"
        
        return {
            'crossover_type': crossover_type,
            'current_short_ma': current_short_ma,
            'current_long_ma': current_long_ma,
            'previous_short_ma': previous_short_ma,
            'previous_long_ma': previous_long_ma,
            'message': message
        }

実装 (ビットコインのRSI判定等)

ビットコインの価格データを元に、RSIに基づく判定を実装してみます。

app/service/rsi_calculation_service.py


from typing import List, Dict
from enum import Enum
import numpy as np
from repository.btc_price_repository import BtcPriceRepository


class RsiSignal(Enum):
    """RSIシグナル"""
    OVERBOUGHT = "買われすぎ"  # RSI > 70
    OVERSOLD = "売られすぎ"  # RSI < 30
    NEUTRAL = "中立"  # 30 <= RSI <= 70 class RsiCalculationService: """RSI(相対力指数)を計算するService""" def __init__(self, repository: BtcPriceRepository): """ コンストラクタ Args: repository: BtcPriceRepository """ self.repository = repository def calculate_rsi(self, prices: List[float], period: int = 14) -> List[float]:
        """
        RSIを計算
        
        Args:
            prices: 価格のリスト
            period: RSI計算期間(デフォルト: 14)
        
        Returns:
            RSI値のリスト(期間未満のデータはNaNになる)
        """
        if len(prices) < period + 1: return [] # 価格変化を計算 prices_array = np.array(prices) deltas = np.diff(prices_array) # 上昇幅と下落幅に分ける gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0) rsi_list = [] # 最初のRSIは単純平均で計算 avg_gain = np.mean(gains[:period]) avg_loss = np.mean(losses[:period]) # 最初のperiod日分はNaN for _ in range(period): rsi_list.append(np.nan) # 最初のRSI値 if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi_list.append(rsi) # 以降は移動平均(スムージング)で計算 for i in range(period, len(deltas)): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi_list.append(rsi) return rsi_list def get_latest_rsi(self, period: int = 14) -> Dict:
        """
        最新のRSI値を取得・計算
        
        Args:
            period: RSI計算期間(デフォルト: 14)
        
        Returns:
            {
                'rsi': RSI値,
                'current_price': 最新価格,
                'data_count': 使用したデータ数,
                'message': 計算メッセージ
            }
        """
        # RSI計算に必要なデータを取得(period + 1 + 余裕)
        required_data = period + 20
        price_records = self.repository.find_recent(limit=required_data)
        
        if len(price_records) < period + 1: return { 'rsi': None, 'current_price': None, 'data_count': len(price_records), 'message': f'データ不足: RSI計算には{period + 1}件以上必要' } # 価格データを抽出 prices = [float(record['last_price']) for record in price_records] # RSIを計算 rsi_list = self.calculate_rsi(prices, period) # 最新値を取得 current_price = prices[-1] latest_rsi = rsi_list[-1] if rsi_list else None if latest_rsi is None or np.isnan(latest_rsi): return { 'rsi': None, 'current_price': current_price, 'data_count': len(price_records), 'message': 'RSI計算エラー' } return { 'rsi': latest_rsi, 'current_price': current_price, 'data_count': len(price_records), 'message': f'RSI({period}): {latest_rsi:.2f}' } def analyze_rsi_signal(self, rsi: float, overbought_threshold: float = 70.0, oversold_threshold: float = 30.0) -> Dict:
        """
        RSIに基づく売買シグナルを判定
        
        Args:
            rsi: RSI値
            overbought_threshold: 買われすぎ閾値(デフォルト: 70)
            oversold_threshold: 売られすぎ閾値(デフォルト: 30)
        
        Returns:
            {
                'signal': RsiSignal,
                'rsi': RSI値,
                'message': シグナルメッセージ
            }
        """
        if rsi is None:
            return {
                'signal': RsiSignal.NEUTRAL,
                'rsi': None,
                'message': 'RSI値が取得できません'
            }
        
        if rsi > overbought_threshold:
            signal = RsiSignal.OVERBOUGHT
            message = f"買われすぎ: RSI {rsi:.2f} > {overbought_threshold} - 売りシグナル"
        elif rsi < oversold_threshold:
            signal = RsiSignal.OVERSOLD
            message = f"売られすぎ: RSI {rsi:.2f} < {oversold_threshold} - 買いシグナル" else: signal = RsiSignal.NEUTRAL message = f"中立: RSI {rsi:.2f} - シグナルなし" return { 'signal': signal, 'rsi': rsi, 'message': message } def get_rsi_with_signal(self, period: int = 14, overbought_threshold: float = 70.0, oversold_threshold: float = 30.0) -> Dict:
        """
        最新のRSI値を計算し、シグナルも含めて返す
        
        Args:
            period: RSI計算期間(デフォルト: 14)
            overbought_threshold: 買われすぎ閾値(デフォルト: 70)
            oversold_threshold: 売られすぎ閾値(デフォルト: 30)
        
        Returns:
            {
                'rsi': RSI値,
                'signal': RsiSignal,
                'current_price': 最新価格,
                'data_count': 使用したデータ数,
                'message': 総合メッセージ
            }
        """
        # RSI値を取得
        rsi_result = self.get_latest_rsi(period)
        
        if rsi_result['rsi'] is None:
            return {
                'rsi': None,
                'signal': RsiSignal.NEUTRAL,
                'current_price': rsi_result['current_price'],
                'data_count': rsi_result['data_count'],
                'message': rsi_result['message']
            }
        
        # シグナルを判定
        signal_result = self.analyze_rsi_signal(
            rsi_result['rsi'],
            overbought_threshold,
            oversold_threshold
        )
        
        return {
            'rsi': rsi_result['rsi'],
            'signal': signal_result['signal'],
            'current_price': rsi_result['current_price'],
            'data_count': rsi_result['data_count'],
            'message': signal_result['message']
        }


実装 ルーティング・定期実行処理

Fast APIで移動平均の処理や、RSI分析処理を実行するためのエンドポイントを提供します。

また、定期実行するためにAPSchedulerを利用して実装します。

app/main.py


"""
FastAPIで各処理のエンドポイント設置 + APScheduler でビットコインデータを定期取得
"""

from fastapi import FastAPI, Query
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from contextlib import asynccontextmanager

from config import Settings
from repository.btc_price_repository import BtcPriceRepository
from service.bitcoin_data_service import BitcoinDataService
from service.trend_analysis_service import TrendAnalysisService, TrendType
from service.rsi_calculation_service import RsiCalculationService, RsiSignal

# グローバル変数でスケジューラーと実行状態を管理
scheduler = BackgroundScheduler()
last_execution = {"time": None, "status": None, "message": None}


def scheduled_fetch_and_save():
    """スケジュール実行される関数"""
    try:
        print(f"[{datetime.now()}] データ取得開始...")
        
        # 設定とリポジトリの初期化
        settings = Settings()
        repository = BtcPriceRepository(settings)
        
        # BitcoinDataServiceの初期化
        bitcoin_service = BitcoinDataService(settings, repository)
        
        # APIからデータ取得 → DB保存
        result = bitcoin_service.fetch_and_save()
        
        if result['success']:
            message = f"✓ データ保存成功! レコードID: {result['record_id']}, 価格: {result['data']['last']} JPY"
            print(message)
            last_execution["time"] = datetime.now().isoformat()
            last_execution["status"] = "success"
            last_execution["message"] = message
            last_execution["data"] = result['data']
        else:
            message = f"✗ エラー: {result['error']}"
            print(message)
            last_execution["time"] = datetime.now().isoformat()
            last_execution["status"] = "error"
            last_execution["message"] = message
            
    except Exception as e:
        error_message = f"予期しないエラー: {str(e)}"
        print(f"[{datetime.now()}] {error_message}")
        last_execution["time"] = datetime.now().isoformat()
        last_execution["status"] = "error"
        last_execution["message"] = error_message


@asynccontextmanager
async def lifespan(app: FastAPI):
    """アプリケーションのライフサイクル管理"""
    # 起動時の処理
    print("=" * 60)
    print("FastAPI + APScheduler 起動")
    print("=" * 60)
    
    # スケジューラーにジョブを追加(毎分実行)
    scheduler.add_job(
        scheduled_fetch_and_save,
        trigger=CronTrigger(minute="*"),  # 毎分実行
        id="fetch_bitcoin_data",
        name="ビットコインデータ取得",
        replace_existing=True
    )
    
    # スケジューラー開始
    scheduler.start()
    print("✓ スケジューラー開始(毎分実行)")
    print("=" * 60)
    
    yield
    
    # 終了時の処理
    print("\nスケジューラーを停止しています...")
    scheduler.shutdown()
    print("✓ スケジューラー停止完了")


# FastAPIアプリケーション作成
app = FastAPI(
    title="Bitcoin Data Collector API",
    description="ビットコインデータを定期的に取得してDBに保存するAPI",
    version="1.0.0",
    lifespan=lifespan
)


@app.get("/")
async def root():
    """ルートエンドポイント"""
    return {
        "message": "Bitcoin Data Collector API",
        "status": "running",
        "scheduler_status": "active" if scheduler.running else "inactive"
    }


@app.get("/status")
async def get_status():
    """スケジューラーの状態と最終実行結果を取得"""
    return {
        "scheduler_running": scheduler.running,
        "last_execution": last_execution,
        "next_run": str(scheduler.get_jobs()[0].next_run_time) if scheduler.get_jobs() else None
    }


@app.get("/latest")
async def get_latest_price():
    """最新のビットコイン価格をDBから取得"""
    try:
        settings = Settings()
        repository = BtcPriceRepository(settings)
        bitcoin_service = BitcoinDataService(settings, repository)
        
        latest = bitcoin_service.get_latest_price()
        
        if latest:
            return {
                "success": True,
                "data": latest
            }
        else:
            return {
                "success": False,
                "message": "データが見つかりません"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


@app.post("/fetch-now")
async def fetch_now():
    """手動でデータ取得を実行"""
    try:
        scheduled_fetch_and_save()
        return {
            "success": True,
            "message": "データ取得を実行しました",
            "last_execution": last_execution
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


@app.get("/analysis/trend")
async def get_trend_analysis(
  short_period: int = Query(default=5, description="短期移動平均の期間(1分単位)"),
  long_period: int = Query(default=25, description="長期移動平均の期間(1分単位)")
):
    """移動平均によるトレンド分析"""
    try:
        settings = Settings()
        repository = BtcPriceRepository(settings)
        trend_service = TrendAnalysisService(repository)
        
        result = trend_service.analyze_trend(short_period=short_period, long_period=long_period)
        
        return {
            "success": True,
            "trend": result['trend'].value,
            "short_ma": result['short_ma'],
            "long_ma": result['long_ma'],
            "current_price": result['current_price'],
            "message": result['message']
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


@app.get("/analysis/crossover")
async def get_crossover_detection(
    short_period: int = Query(default=5, description="短期移動平均の期間"),
    long_period: int = Query(default=25, description="長期移動平均の期間")
):
    """ゴールデンクロス・デッドクロスの検出"""
    try:
        settings = Settings()
        repository = BtcPriceRepository(settings)
        trend_service = TrendAnalysisService(repository)
        
        result = trend_service.detect_crossover(short_period=short_period, long_period=long_period)
        
        return {
            "success": True,
            "crossover_type": result['crossover_type'].value,
            "current_short_ma": result['current_short_ma'],
            "current_long_ma": result['current_long_ma'],
            "previous_short_ma": result['previous_short_ma'],
            "previous_long_ma": result['previous_long_ma'],
            "message": result['message']
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


@app.get("/analysis/rsi")
async def get_rsi_analysis(
    period: int = Query(default=14, description="RSI計算期間")
):
    """RSI計算とシグナル判定"""
    try:
        settings = Settings()
        repository = BtcPriceRepository(settings)
        rsi_service = RsiCalculationService(repository)
        
        result = rsi_service.get_rsi_with_signal(period=period)
        
        return {
            "success": True,
            "rsi": result['rsi'],
            "signal": result['signal'].value if result['signal'] else None,
            "current_price": result['current_price'],
            "message": result['message']
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


@app.get("/analysis/comprehensive")
async def get_comprehensive_analysis(
    short_period: int = Query(default=5, description="短期移動平均の期間"),
    long_period: int = Query(default=25, description="長期移動平均の期間"),
    rsi_period: int = Query(default=14, description="RSI計算期間")
):
    """総合分析(トレンド + クロスオーバー + RSI)"""
    try:
        settings = Settings()
        repository = BtcPriceRepository(settings)
        
        bitcoin_service = BitcoinDataService(settings, repository)
        trend_service = TrendAnalysisService(repository)
        rsi_service = RsiCalculationService(repository)
        
        # 最新価格取得
        latest_price = bitcoin_service.get_latest_price()
        
        # トレンド分析
        trend_result = trend_service.analyze_trend(short_period=short_period, long_period=long_period)
        
        # クロスオーバー検出
        crossover_result = trend_service.detect_crossover(short_period=short_period, long_period=long_period)
        
        # RSI分析
        rsi_result = rsi_service.get_rsi_with_signal(period=rsi_period)
        
        # 総合判断
        recommendation = ""
        if trend_result['trend'] == TrendType.UPTREND and rsi_result['signal'] != RsiSignal.OVERBOUGHT:
            recommendation = "上昇トレンド継続中、買いポジション検討可能"
        elif trend_result['trend'] == TrendType.DOWNTREND and rsi_result['signal'] != RsiSignal.OVERSOLD:
            recommendation = "下降トレンド継続中、売りポジション検討可能"
        elif rsi_result['signal'] == RsiSignal.OVERBOUGHT:
            recommendation = "買われすぎ警戒、利確または様子見推奨"
        elif rsi_result['signal'] == RsiSignal.OVERSOLD:
            recommendation = "売られすぎ、買いエントリー検討可能"
        else:
            recommendation = "様子見、明確なシグナルなし"
        
        return {
            "success": True,
            "latest_price": latest_price,
            "trend_analysis": {
                "trend": trend_result['trend'].value,
                "short_ma": trend_result['short_ma'],
                "long_ma": trend_result['long_ma'],
                "message": trend_result['message']
            },
            "crossover_detection": {
                "crossover_type": crossover_result['crossover_type'].value,
                "message": crossover_result['message']
            },
            "rsi_analysis": {
                "rsi": rsi_result['rsi'],
                "signal": rsi_result['signal'].value if rsi_result['signal'] else None,
                "message": rsi_result['message']
            },
            "recommendation": recommendation
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )


実行確認

サービス起動

以下コマンド等で、サービスを起動します。

python app/main.py

起動するとスケジューラも毎分起動して、ビットコインの終値を定期的に取得してくれます。

Python bitcoin Price auto collect implement sample

Python bitcoin Price auto collect implement sample

特定の処理実行用のAPIも利用できるようになっています。

Python bitcoin Price auto collect implement sample

25分間以上のデータが溜まった段階で、トレンド判定APIも実行してみます。
http://localhost:8000/docsでSwaggerを開いて実行確認します。

python bitcoin analysis implement sample

python bitcoin analysis implement sample


今回のメモは以上となります。

データとして採用しやすいため、今回はビットコインのデータを収集して試してみました。
暗号通貨取引所・口座の各社が仮想通貨に関する便利なAPIを公開しているので、
自作の自動取引処理やデータ分析を実装する際に助かります。

今回は、numpyで計算して処理していますが、
指標計算用のライブラリなどもPythonは存在します。(TA-Libなど)

今回のレベルの内容であれば、ライブラリを利用すると簡易的に実装できると思いますのでおすすめです。

都内でエンジニアをやっています。 2017年に脱サラ(法人設立)しました。 仕事で調べたことや、気になったことをメモしています。
投稿を作成しました 189

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA


関連投稿

検索語を上に入力し、 Enter キーを押して検索します。キャンセルするには ESC を押してください。

トップに戻る