فهرست بستن

پیاده‌سازی رگرسور خطی در پایتون

رگرسیون خطی تخمین یک پارامتر به‌صورت تابعی خطی از تعدادی داده دیگر می‌باشد. در عمل ما به دنبال به دست آوردن یک خط یا در حالت کلی اَبَر صفحه هستیم که با توجه به معیار خطای تعریف شده بهینه است. این روش کاربرد ویژه در علوم و مهندسی داشته و آشنایی با نحوه پیاده‌سازی آن در پایتون موضوع این نوشته می‌باشد.

بسم الله الرحمن الرحیم

تئوری این مبحث محل بحث در این مقاله نمی‌باشد و فرض می‌گردد خواننده با مباحث ریاضیاتی رگرسیون خطی آشنایی دارد. لذا مستقیم به پیاده‌سازی روش می‌پردازیم. ابتدای کار کتابخانه‌هایی که برای نوشتن و پیاده‌سازی رگرسیون خطی مورد نیاز از فراخوانی می‌نماییم. کتابخانه‌های مورد نیاز برای ما numpy برای محاسبات ماتریسی و ریاضی، matplotlib برای بازنمایی داده‌ها و خروجی رگرسیون و تابع make_regression از کتابخانه scikit-learn برای ایجاد مجموعه داده‌های مورد استفاده در این پروژه، می‌باشند:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_regression

برای ادامه کار ما یک کلاس تحت عنوان LinearRegression خواهیم ساخت که دارای توابع زیر می‌باشد:

  • fit: این تابع ضرایب مدل ما را از روی داده‌ها تعیین خواهد نمود.
  • predict: با استفاده از این تابع خواهیم توانست خروجی مدل خود را برای ورودی‌های دلخواه ارزیابی نماییم.
  • rmse: این تابع ریشه میانگین مربعات خطا را توسط یک سری از داده‌ها برای مدل ساخته شده محاسبه می‌نماید.

پس ساختار کلی کلاس رگرسور ما به شکل زیر خواهد بود:

class LinearRegression:
    def fit(self, X, y, method, learning_rate=0.01, iterations=500, batch_size=32):
        return
    
    def predict(self, X):
        return
    
    def rmse(self, X, y):
        return

برای ادامه کار از تابع fit شروع می کنیم. این تابع وظیفه تعیین ضرایب مدل بر اساس داده‌های آموزش را بر عهده دارد. ورودی‌های این تابع عبارت اند از:

  1. داده‌های ورودی
  2. خروجی‌های متناظر با داده‌های ورودی
  3. روش یافتن ضرایب
  4. نرخ یادگیری
  5. تعداد تکرار
  6. اندازه دسته‌ها برای انجام یادگیری

در نظر گرفته می شود که این تابع بتواند ضرایب را هم با استفاده از رابطه فرم بسته و هم با استفاده از روش گرادیان نزولی تصادفی تعیین نماید. زمانی که ورودی method تابع fit برابر sgd باشد روش مورد نظر گرادیان نزولی خواهد بود و زمانی که برابر solve باشد از رابطه به فرم بسته استفاده خواهد گردید.

class LinearRegression:
    def fit(self, X, y, method, learning_rate=0.01, iterations=500, batch_size=32):
        X = np.concatenate([X, np.ones_like(y)], axis=1)
        rows, cols = X.shape
        if method == 'solve':
            pass
        elif method == 'sgd':
            pass
        else:
            print(f'Unknown method: \'{method}\'')
        
        return self

حال ببینیم با استفاده از هر کدام از این روش‌ها چگونه می‌توانیم ضرایب را تعیین کنیم. زمانی که ما از فرم بسته استفاده می کنیم به این معنی است که می خواهیم معکوس مجازی (یا در شرایط مشخص همان معکوس) یک ماتریس را که بیانگر دسته معادلات خطی است، پیدا کنیم. اینجا ما دو حالت داریم زمانی که تعداد معادلات مستقل بیشتر از یا مساوی با متغیرها می باشد و زمانی که کمتر از متغیرها است. وقتی تعداد معادلات کمتر از متغیرها باشد ما بیشمار پاسخ می توانیم داشته باشیم و باید از روش گرادیان نزولی تصادفی استفاده کنیم. ضرایب مدل با استفاده از معکوس مجازی از رابطه زیر قابل محاسبه می‌باشد:

قسمت اول تابع fit به محاسبه ضرایب با رابطه بالا اختصاص دارد که در زیر قابل مشاهده است:

class LinearRegression:
    def fit(self, X, y, method, learning_rate=0.01, iterations=500, batch_size=32):
        X = np.concatenate([X, np.ones_like(y)], axis=1)
        rows, cols = X.shape
        if method == 'solve':
            if rows >= cols == np.linalg.matrix_rank(X):
                self.weights = np.matmul(
                    np.matmul(
                        np.linalg.inv(
                            np.matmul(
                                X.transpose(),
                                X)),
                        X.transpose()),
                    y)
            else:
                print('X has not full column rank. method=\'solve\' cannot be used.')

قسمت بعدی که با استفاده از روش گرادیان نزولی تصادفی نوشته خواهد شد دارای شبه کدی مشابه تصویر زیر می‌باشد:

در این روش همان طور که مشاهده می شود به تعداد تکرارهای تعیین شده دسته‌هایی از داده ها انتخاب می‌شود و متناسب با خطای موجود با در نظر گرفتن ضرایب و خروجی مطلوب اقدام به بروز رسانی ضرایب می گردد. کد این قسمت نیز به شکل زیر به تابع fit اضافه می گردد:

class LinearRegression:
    def fit(self, X, y, method, learning_rate=0.01, iterations=500, batch_size=32):
        X = np.concatenate([X, np.ones_like(y)], axis=1)
        rows, cols = X.shape
        if method == 'solve':
            if rows >= cols == np.linalg.matrix_rank(X):
                self.weights = np.matmul(
                    np.matmul(
                        np.linalg.inv(
                            np.matmul(
                                X.transpose(),
                                X)),
                        X.transpose()),
                    y)
            else:
                print('X has not full column rank. method=\'solve\' cannot be used.')
        elif method == 'sgd':
            self.weights = np.random.normal(scale=1/cols, size=(cols, 1))
            for i in range(iterations):
                Xy = np.concatenate([X, y], axis=1)
                np.random.shuffle(Xy)
                X, y = Xy[:, :-1], Xy[:, -1:]
                for j in range(int(np.ceil(rows/batch_size))):
                    start, end = batch_size*j, np.min([batch_size*(j+1), rows])
                    Xb, yb = X[start:end], y[start:end]
                    gradient = 2*np.matmul(
                        Xb.transpose(),
                        (np.matmul(Xb,
                                   self.weights)
                         - yb))
                    self.weights -= learning_rate*gradient
        else:
            print(f'Unknown method: \'{method}\'')
        
        return self

در ادامه نوبت به نوشتن توابع predict و rmse می‌رسد. ابتدا سراغ تابع predict می‌رویم:

def predict(self, X):
    if not hasattr(self, 'weights'):
        print('Cannot predict. You should call the .fit() method first.')
        return
    
    X = np.concatenate([X, np.ones((X.shape[0], 1))], axis=1)
    
    if X.shape[1] != self.weights.shape[0]:
        print(f'Shapes do not match. {X.shape[1]} != {self.weights.shape[0]}')
        return
    
    return np.matmul(X, self.weights)

این تابع که تنها ورودی آن دسته‌ای از داده‌ها می‌باشد، ابتدا از مقداردهی شدن ضرایب وزنی مطمئن می‌شود. در صورتی که کلاس فاقد عنصر weights باشد به کاربر اعلام می‌کند که باید ابتدا با استفاده از تعدادی داده ورودی اقدام به فراخوانی تابع fit برای تعیین وزن‌های رگرسور نماید. بعد از آن با استفاده از X و بردار تماما یک اقدام به تشکیل ماتریس افزوده می‌نماید که مورد نیاز برای محاسبات است. سپس تابع، تطابق ابعاد بردار وزن‌ها را با ماتریس X ارزیابی می‌نماید تا در صورت عدم تطابق کاربر را مطلع کرده و ادامه کار را متوقف نماید. در نهایت حاصل ضرب ماتریس X و بردار وزن‌ها را که خروجی رگرسور می‌باشد، باز می‌گرداند. تنها تابع باقی‌مانده برای بررسی، تابع خطا یعنی rmse می‌باشد که به شکل زیر است:

def rmse(self, X, y):
    y_hat = self.predict(X)
    
    if y_hat is None:
        return
    
    return np.sqrt(((y_hat - y)**2).mean())

در این تابع ابتدا خروجی‌های رگرسور به ازای ورودی داده شده ارزیابی می‌گردند. سپس اگر خروجی با موفقیت بازگردانده شده باشد، ریشه میانگین مربعات خطای خروجی حاصل نسبت به خروجی مطلوب محاسبه شده و بازگردانده می‌شود. حال بیابید تمام کدهای نوشته شده را در یک مثال به کار ببریم. برای استفاده از کلاس رگرسور خطی نوشته شده ورودی‌ها و خروجی‌های آزمایشی با استفاده از تابع make_regression از کتابخانه sklearn ایجاد و رسم می‌گردند.

X, y = make_regression(n_features=1,
                       n_informative=1,
                       bias=1, noise=35)

plt.figure()
plt.scatter(X, y)

در ادامه بعد از اعمال تغییرات مناسب برای سازگار شدن ابعاد بردار پاسخ، شیی از نوع LinearRegression و یافتن ضرایب با روش به فرم بسته ساخته می‌شود:

y = y.reshape((-1, 1))

lr_solve = LinearRegression().fit(X, y, method='solve')

در ادامه خروجی رگرسور برای ورودی‌ها محاسبه و رسم می‌گردند و خطای رکرسیون در خروجی چاپ می‌گردد:

plt.figure()
plt.scatter(X, y)
plt.plot(X, lr_solve.predict(X), color='orange')

print(lr_solve.rmse(X, y))

همچنین بطور مشابه شی LinearRegression دوباره ایجاد می‌گردد، با این تفاوت که این بار روش یافتن ضرایب رگرسور گرادیان نزولی تصادفی می‌باشد:

lr_sgd = LinearRegression().fit(X, y, method='sgd')

باز بطور مشابه خروجی‌های حاصل از این رگرسور محاسبه و رسم می‌گردند و خطای رگرسیون نیز در خروجی چاپ می‌شود:

plt.figure()
plt.scatter(X, y)
plt.plot(X, lr_sgd.predict(X), color='orange')

print(lr_sgd.rmse(X, y))

در نهایت می‌توان تمام کدهای نوشته شده را به شکل زیر جمع بندی نمود:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep  2 20:35:05 2020

@author: arslan
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_regression

class LinearRegression:
    def fit(self, X, y, method, learning_rate=0.01, iterations=500, batch_size=32):
        X = np.concatenate([X, np.ones_like(y)], axis=1)
        rows, cols = X.shape
        if method == 'solve':
            if rows >= cols == np.linalg.matrix_rank(X):
                self.weights = np.matmul(
                    np.matmul(
                        np.linalg.inv(
                            np.matmul(
                                X.transpose(),
                                X)),
                        X.transpose()),
                    y)
            else:
                print('X has not full column rank. method=\'solve\' cannot be used.')
        elif method == 'sgd':
            self.weights = np.random.normal(scale=1/cols, size=(cols, 1))
            for i in range(iterations):
                Xy = np.concatenate([X, y], axis=1)
                np.random.shuffle(Xy)
                X, y = Xy[:, :-1], Xy[:, -1:]
                for j in range(int(np.ceil(rows/batch_size))):
                    start, end = batch_size*j, np.min([batch_size*(j+1), rows])
                    Xb, yb = X[start:end], y[start:end]
                    gradient = 2*np.matmul(
                        Xb.transpose(),
                        (np.matmul(Xb,
                                   self.weights)
                         - yb))
                    self.weights -= learning_rate*gradient
        else:
            print(f'Unknown method: \'{method}\'')
        
        return self
    
    def predict(self, X):
        if not hasattr(self, 'weights'):
            print('Cannot predict. You should call the .fit() method first.')
            return
        
        X = np.concatenate([X, np.ones((X.shape[0], 1))], axis=1)
        
        if X.shape[1] != self.weights.shape[0]:
            print(f'Shapes do not match. {X.shape[1]} != {self.weights.shape[0]}')
            return
        
        return np.matmul(X, self.weights)
    
    def rmse(self, X, y):
        y_hat = self.predict(X)
        
        if y_hat is None:
            return
        
        return np.sqrt(((y_hat - y)**2).mean())




X, y = make_regression(n_features=1,
                       n_informative=1,
                       bias=1, noise=35)

plt.figure()
plt.scatter(X, y)


y = y.reshape((-1, 1))

lr_solve = LinearRegression().fit(X, y, method='solve')

plt.figure()
plt.scatter(X, y)
plt.plot(X, lr_solve.predict(X), color='orange')

print(lr_solve.rmse(X, y))


lr_sgd = LinearRegression().fit(X, y, method='sgd')

plt.figure()
plt.scatter(X, y)
plt.plot(X, lr_sgd.predict(X), color='orange')

print(lr_sgd.rmse(X, y))


نمودارهای خروجی نیز به شکل زیر خواهند بود: