ものづくりのブログ

うちのネコを題材にしたものづくりができたらいいなと思っていろいろ奮闘してます。

【raspberrypi】MPU6050 で移動した履歴を描画してみる

ラズペリーパイで、mpu6050 で取得したデータをmatplotlibを使って XY 軸のマップに描画する処理を作ってみました。

処理イメージ


準備

ライブラリインストール

$ pip install flask flask-socketio smbus matplotlib numpy

コード

MPU6050 データ取得

mpu6050.py というファイルを作成し、以下のコードを追加します。

import smbus

class MPU6050:
    def __init__(self, bus_num=1, device_address=0x68):
        self.bus = smbus.SMBus(bus_num)
        self.device_address = device_address

        # MPU6050 初期化
        self.bus.write_byte_data(self.device_address, 0x6B, 0)

    def read_raw_data(self, addr):
        # 2バイトのデータを読み取る
        high = self.bus.read_byte_data(self.device_address, addr)
        low = self.bus.read_byte_data(self.device_address, addr + 1)

        value = ((high << 8) | low)
        if value > 32768:
            value = value - 65536
        return value

    def get_acceleration(self):
        # 加速度データ取得
        acc_x = self.read_raw_data(0x3B)
        acc_y = self.read_raw_data(0x3D)
        return acc_x, acc_y

Flask-SocketIO と Matplotlib を使った処理

Flask と Flask-SocketIO を使ってリアルタイムでデータを送信し、履歴を残しつつデータをスムージングして Matplotlib で描画するサーバースクリプトを作成します。app.py というファイルを作成し、以下のコードを追加します。

from flask import Flask, render_template, send_file
from flask_socketio import SocketIO
from mpu6050 import MPU6050
import threading
import time
import matplotlib.pyplot as plt
import numpy as np
import io

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)

mpu = MPU6050()
data_points = {'x': [], 'y': []}

def smooth_data(data, window_size=5):
    """データを移動平均フィルタでスムージング"""
    if len(data) < window_size:
        return data
    return np.convolve(data, np.ones(window_size)/window_size, mode='valid')

def read_sensor_data():
    while True:
        # MPU6050 から加速度データを取得
        acc_x, acc_y = mpu.get_acceleration()
        # データを保存
        data_points['x'].append(acc_x)
        data_points['y'].append(acc_y)

        # 履歴が1000点を超えたら古いデータを削除
        if len(data_points['x']) > 1000:
            data_points['x'].pop(0)
            data_points['y'].pop(0)

        # クライアントにプロットの更新を通知
        socketio.emit('update_plot')
        time.sleep(0.1)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/plot.png')
def plot_png():

    smoothed_x = smooth_data(data_points['x'])
    smoothed_y = smooth_data(data_points['y'])

    plt.figure(figsize=(6, 6))
    plt.plot(smoothed_x, smoothed_y, 'bo-', markersize=3)
    plt.title('MPU6050 XY Axis Data (Smoothed)')
    plt.xlabel('X Acceleration')
    plt.ylabel('Y Acceleration')
    plt.grid()

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plt.close()
    return send_file(buf, mimetype='image/png')

# 別スレッドでセンサーのデータ取得を開始
sensor_thread = threading.Thread(target=read_sensor_data)
sensor_thread.daemon = True
sensor_thread.start()

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)

フロントエンド

HTML 側で 1 秒間隔でプロット画像を自動更新するために、フロントエンドを作成します。templates/index.html ファイルを作成し、以下のコードを追加します。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MPU6050 XY Axis Visualization</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
    <script>
        var socket = io();

        function updatePlot() {
            var plot = document.getElementById('plot');
            plot.src = '/plot.png?' + new Date().getTime(); // キャッシュを回避するためにタイムスタンプを追加
        }

        socket.on('update_plot', function() {
            setTimeout(updatePlot, 1000); // 1秒ごとに更新
        });
    </script>
</head>
<body>
    <h1>MPU6050 XY Axis Visualization (Smoothed)</h1>
    <img id="plot" src="/plot.png" width="500" height="500" alt="XY Plot">
</body>
</html>

起動

ブラウザで http://:5000 にアクセスすると、スムージングされたデータと履歴付きで表示されます。

$ python app.py

その他

最新のプロット色が濃く、古くなるにつれて色が薄くなるようにする処理を入れたところ更新が遅くなり見にくくなってしまいました。