题解 | #设备故障预测程序# | Python

设备故障预测程序

http://www.nowcoder.com/questionTerminal/ef4bd23c0f7c4f2ca04dfdf3d4e66337

"""
implement logistic regression from scratch
no data normalization as its not required in the question and will not pass some of the test cases if added
"""

import sys
import math
import statistics

# num of rows for training data
N = int(sys.stdin.readline())

# print('process training data')
# IDs_train = []
writes_train = []
reads_train = []
avg_write_ms_train = []
avg_read_ms_train = []
years_train = []
status_train = []
train_cnt = 0
for i in range(N):
    # add strip to remove '\n' in the end of each row
    device_id,w,r,w_ms,r_ms,y,s = sys.stdin.readline().strip().split(',')
    if s == 'NaN': continue
    
    train_cnt += 1
    # IDs_train.append(device_id)
    writes_train.append(int(w) if w != 'NaN' else w)
    reads_train.append(int(r) if r != 'NaN' else r)
    avg_write_ms_train.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_train.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_train.append(float(y) if y != 'NaN' else y)
    status_train.append(int(s) if s != 'NaN' else s)  

# print(f'\t{train_cnt}/{N} valid rows in training data')



def clean_data(tag, data, lb, ub, valid_med=None, valid_avg=None):
    # print(f'process {tag} data')
    if valid_med is None or valid_avg is None:
        valid = []
        for d in data:
            if d == 'NaN' or d < lb or d > ub: continue
            valid.append(d)
        # print(f'\t{len(valid)}/{len(data)} valid rows of data')
        if len(valid) > 0:
            valid_med = statistics.median(valid)
            valid_avg = statistics.mean(valid)
        else:
            valid_med = valid_avg = 0
    

    data_p = []
    for d in data:
        if d == 'NaN':
            data_p.append(valid_avg)
        elif d < lb or d > ub:
            data_p.append(valid_med)
        else:
            data_p.append(d)
    
    return data_p, valid_med, valid_avg


writes_train, writes_med, writes_avg = clean_data('writes', writes_train, 0, float('infinity'))
reads_train, reads_med, reads_avg = clean_data('reads', reads_train, 0, float('infinity'))
avg_write_ms_train, avg_write_ms_med, avg_write_ms_avg = clean_data('avg_write_ms', avg_write_ms_train, 0, 1000)
avg_read_ms_train, avg_read_ms_med, avg_read_ms_avg = clean_data('avg_read_ms', avg_read_ms_train, 0, 1000)
years_train, years_med, years_avg = clean_data('years', years_train, 0, 20)



# num of rows for testing data
M = int(sys.stdin.readline())
# print('process testing data')
# IDs_test = []
writes_test = []
reads_test = []
avg_write_ms_test = []
avg_read_ms_test = []
years_test = []
test_cnt = 0
for i in range(M):
    device_id,w,r,w_ms,r_ms,y = sys.stdin.readline().strip().split(',')
    test_cnt += 1
    # IDs_test.append(device_id)

    writes_test.append(int(w) if w != 'NaN' else w)
    reads_test.append(int(r) if r != 'NaN' else r)
    avg_write_ms_test.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_test.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_test.append(float(y) if y != 'NaN' else y)
        
# print(f'\t{test_cnt}/{M} valid rows in testing data')

writes_test, _, _ = clean_data('writes', writes_test, 0, float('infinity'), writes_med, writes_avg)
reads_test, _, _ = clean_data('reads', reads_test, 0, float('infinity'), reads_med, reads_avg)
avg_write_ms_test, _, _ = clean_data('avg_write_ms', avg_write_ms_test, 0, 1000, avg_write_ms_med, avg_write_ms_avg)
avg_read_ms_test, _, _ = clean_data('avg_read_ms', avg_read_ms_test, 0, 1000, avg_read_ms_med, avg_read_ms_avg)
years_test, _, _ = clean_data('years', years_test, 0, 20, years_med, years_avg)



class LogisticRegression(object):
    def __init__(self, learning_rate=0.01, n_iteration=100):
        self.learning_rate = learning_rate
        self.n_iteration = n_iteration
        self.weights = None
        self.bias = None
        self.loss_history = []

    
    def sigmoid(self, z):
        z = [1. / (1 + math.exp(-v)) for v in z]
        return z

    
    def initialize_params(self, n_feature):
        self.weights = [0 for _ in range(n_feature)]
        self.bias = 0

    
    def compute_loss(self, y_true, y_pred):
        N = len(y_true)
        loss = 0
        for i in range(N):
          loss += -1.0 * (y_true[i]*math.log(y_pred[i]) + (1-y_true[i])* math.log(1-y_pred[i] + 1e-6))
            
        loss = loss / N
        return loss

    
    def compute_gradient(self, X, y_true, y_pred):
        N = len(X)
        n_feature = len(X[0])
        
        dL_dz = [y_p - y_t for y_p, y_t in zip(y_pred, y_true)]

        dL_db = statistics.mean(dL_dz)

        dL_dw = [0. for _ in range(n_feature)]
        for i in range(N):
            for k in range(n_feature):
                dL_dw[k] += dL_dz[i] * X[i][k]

        dL_dw = [v / N for v in dL_dw]
        return dL_dw, dL_db


    def predict_proba(self, X):
        out = []
        N =  len(X)
        n_feature = len(X[0])
        for i in range(N):
            z = self.bias
            for k in range(n_feature):
                z += X[i][k] * self.weights[k]
            out.append(z)
        return self.sigmoid(out)
    
    
    def fit(self, X, y):
        N = len(X)
        n_feature = len(X[0])
        self.initialize_params(n_feature)
        
        d_w = [0 for _ in range(n_feature)]
        d_b = 0
        
        for it in range(1, self.n_iteration+1):
            
            y_proba = self.predict_proba(X)
            loss = self.compute_loss(y, y_proba)
            self.loss_history.append(loss)
            dw, db = self.compute_gradient(X, y, y_proba)

            # update params
            self.weights = [w - self.learning_rate * d_w for w, d_w in zip(self.weights, dw)]
            self.bias = self.bias - self.learning_rate * db
			"""
            if it % 5 == 0:
                print(f'{it}/{self.n_iteration} iteration, loss: {loss:.3f}')
			"""

    def predict(self, X, threshold=0.5):
        y_proba = self.predict_proba(X)
        y_pred = [1 if y > threshold else 0 for y in y_proba]
        return y_pred


        
model = LogisticRegression()

# prepare training data X,y
X = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
    writes_train, 
    reads_train,
    avg_write_ms_train,
    avg_read_ms_train,
    years_train)
]
y = status_train

# training
model.fit(X, y)

# test
X_test = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
    writes_test, 
    reads_test,
    avg_write_ms_test,
    avg_read_ms_test,
    years_test)
]

# X_test = scaler.fit_transform(X_test)
pred = model.predict(X_test)
print(f'predictions:\n{pred}')
            
            
            

    
        
        


全部评论

相关推荐

Lorn的意义:我的前辈都劝我年后再投,说那时候会好一点
点赞 评论 收藏
分享
ddd7_:跟我一模一样,加微信的hr都同一个,扫码了白年书人查看图片
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务