算法学习之股票预测

栏目: 数据库 · 发布时间: 6年前

内容简介:学了好长时间的机器学习一直找不到实践的机会,工作上的场景都被算法大拿们占据了,小菜鸟只能自谋出路。恰逢前些时中概股大跌,损失惨重,一怒之下干脆研究研究能否用机器学习来搞搞了。历史股价数据可以从雅虎金融上获取,而且已经有现成的python lib来做这个。跳跃一下先把周边的工具准备完毕,有了股票预测数据之后一般都要打印一个预测和真实数据的对比图,这个可以用matplotlib.pyplot搞定。

动机

学了好长时间的机器学习一直找不到实践的机会,工作上的场景都被算法大拿们占据了,小菜鸟只能自谋出路。恰逢前些时中概股大跌,损失惨重,一怒之下干脆研究研究能否用机器学习来搞搞了。  

数据获取

历史股价数据可以从雅虎金融上获取,而且已经有现成的python lib来做这个。

import pandas_datareader.data as reader
import fix_yahoo_finance as yf

def fetch_stock_list_from_web():
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)

    if _data_already_loaded():
        return

    yf.pdr_override()
    for cell in COMPANIES['Symbol']:
        try:
            data = reader.get_data_yahoo(cell)
            data.to_csv('./stock_data/' + cell + '.csv')
        except Exception as e:
            print(e)

其中SP500_LIST_PATH=‘ constituents.csv ’ 这个google一下就能找到。

打印预测结果

跳跃一下先把周边的 工具 准备完毕,有了股票预测数据之后一般都要打印一个预测和真实数据的对比图,这个可以用matplotlib.pyplot搞定。

mport matplotlib.pyplot as plt

def _print_comparison_gui(pred, real):
    truths = _flatten(real)[-200:]
    preds = _flatten(pred)[-200:]
    days = range(len(truths))[-200:]

    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    plt.ylim((min(preds), max(truths)))
    plt.grid(ls='--')
    plt.show()

naive版本

预测股价的模型应该是怎样的呢?初始想法是把股价表达成时间的函数,因为某些时间具有特殊性比如周五,年底等等,所以把时间拆成年,月,日而不是作为一个整体,其实我们预测股价的时候一般会看公司的基本面,大环境,一些相关消息等等,但搞定这些东西还远超出我的能力所以就先忽略了。最终x是(year, month, day, day_of_week), y就是price。

def preprocess_data(symbols):
    global PROCESSED_STOCK_DATA

    for i in range(len(symbols)):
        symbol = symbols[i]
        print('Loading ' + symbol)
        price = ORIGINAL_STOCK_DATA[symbol]['Open']
        price = list(map(lambda each: [each],price))
        date = list(map(lambda each: datetime.datetime.strptime(each, '%Y-%m-%d'), ORIGINAL_STOCK_DATA[symbol]['Date']))
        x = list(map(lambda each: (each.year, each.month, each.day, each.weekday()), date))

        train_size = int(len(x) * (1.0 - TEST_RATIO))
        train_X, test_X = x[:train_size], x[train_size:]
        train_y, test_y = price[:train_size], price[train_size:]

        PROCESSED_STOCK_DATA[symbol] = (train_X, train_y, test_X, test_y)

模型选择了一个简单的神经网络,

def build_model(x):
    L1 = 30
    w1 = tf.Variable(tf.random_uniform([INPUT_VARIABLE_COUNT, L1], 0, 1))
    b1 = tf.Variable(tf.zeros([1, L1]))
    wb1 = tf.matmul(x, w1) + b1
    layer1 = tf.nn.relu(wb1)

    L2 = 40
    w2 = tf.Variable(tf.random_uniform([L1, L2], 0, 1))
    b2 = tf.Variable(tf.zeros([1, L2]))
    wb2 = tf.matmul(layer1, w2) + b2
    layer2 = tf.nn.relu(wb2)

    w3 = tf.Variable(tf.random_uniform([L2, 1], 0, 1))
    b3 = tf.Variable(tf.zeros([1, 1]))
    wb3 = tf.matmul(layer2, w3) + b3

    return wb3

可惜一跑就杯具了,跑了一小会所有的预测值就都在12上下跳动了,感觉像是梯度消失只好搬出batchnorm来救场,

def build_model(x, is_test, iteration):
    L1 = 30
    w1 = tf.Variable(tf.random_uniform([INPUT_VARIABLE_COUNT, L1], 0, 1))
    S1 = tf.Variable(tf.ones([L1]))
    b1 = tf.Variable(tf.zeros([1, L1]))
    wb1 = tf.matmul(x, w1) + b1
    Y1bn, update_ema1 = batchnorm(wb1, b1, S1, is_test, iteration)
    layer1 = tf.nn.relu(Y1bn)

    L2 = 40
    w2 = tf.Variable(tf.random_uniform([L1, L2], 0, 1))
    S2 = tf.Variable(tf.ones([L2]))
    b2 = tf.Variable(tf.zeros([1, L2]))
    wb2 = tf.matmul(layer1, w2) + b2
    Y2bn, update_ema2 = batchnorm(wb2, b2, S2, is_test, iteration)
    layer2 = tf.nn.relu(Y2bn)

    w3 = tf.Variable(tf.random_uniform([L2, 1], 0, 1))
    b3 = tf.Variable(tf.zeros([1, 1]))
    wb3 = tf.matmul(layer2, w3) + b3
    update_ema = tf.group(update_ema1, update_ema2)

    return wb3, update_ema

最终拿到的结果是这样的,

算法学习之股票预测

感觉相当的不靠谱,程序似乎学到了一点大的趋势,但在基准值和连续性上面都差距较大。连续性差感觉可能是变量不够,网络层次太浅,可惜加大层数后在我的macbook pro上面跑了一晚上也没有一点要收敛的样子,只能放弃了。

LSTM版本

回想一下股价预测的方法中有一个流派叫技术分析也就是看k线图,这个似乎比较符合我现在只有股价做输入的状况,而k线图不就是和LSTM模型的场景一样一样的么?不过从头写一个LSTM还是有点超出本菜鸟的能力了,所以就在别人的基础上改吧,借鉴了 使用rnn预测股票价格

rnn文章里面用的面向对象编程风格,而且把训练过程和测试过程放到一起,都让程序的理解难度加大了,所以我改写成了简单的过程式风格而且分开了训练和测试过程。 

一些要点

1. LSTM版本除了模型不一样之外比较重要的是对股价做了规范化处理,把绝对股价换算成股价变动百分比,这样就不会碰到预测不出训练时没训练过的股价的问题了。

data = [data[0] / data[0][0] - 1.0] + [
            curr / data[i][-1] - 1.0 for i, curr in enumerate(data[1:])]

2. 数据预处理的时候有INPUT_SIZE的概念,就是把连续几天的股价作为一个向量输入,这样更能体现股价变化的特征

data = [np.array(data[i * INPUT_SIZE: (i + 1) * INPUT_SIZE])
                for i in range(len(data) // INPUT_SIZE)]

3. 预测的模型实际上是用连续多天(比如30天)的股价预测后面一天的股价,所以x和y是下面的样子

X = np.array([data[i: i + NUMBER_OF_STEPS] for i in range(len(data) - NUMBER_OF_STEPS)])
        y = np.array([data[i + NUMBER_OF_STEPS] for i in range(len(data) - NUMBER_OF_STEPS)])

4. rnn文章对股票价格的input还做了一个向量转换( tf.nn.embedding_lookup ),但我实际测试的结果不理想,而且感觉做了数据规范化以后的股价已经是向量了,似乎没必要再做一层转换,所以我的模型是这样的

def build_model(symbols,inputs,keep_prob):
    def _create_one_cell():
        lstm_cell = tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True)
        lstm_cell = tf.contrib.rnn.DropoutWrapper(lstm_cell, output_keep_prob=keep_prob)
        return lstm_cell

    cell = tf.contrib.rnn.MultiRNNCell(
        [_create_one_cell() for _ in range(NUMBER_OF_LAYERS)],
        state_is_tuple=True
    ) if NUMBER_OF_LAYERS > 1 else _create_one_cell()

    # Run dynamic RNN
    val, state_ = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32, scope="dynamic_rnn")
    # Before transpose, val.get_shape() = (batch_size, num_steps, lstm_size)
    # After transpose, val.get_shape() = (num_steps, batch_size, lstm_size)
    val = tf.transpose(val, [1, 0, 2])
    # 这里取向量的最后一位也就是最后一个step对应的数据,也就是模型预测的股票数据,也解释了为啥要用transpose
    last = tf.gather(val, int(val.get_shape()[0]) - 1, name="lstm_state")
    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell

结果

算法学习之股票预测

从上面的图来看,预测结果还是有点靠谱的,有几个较大的波动都给预测出来了。当然如果真的拿这个去炒股我还是不敢的:)

代码

import pandas_datareader.data as reader
import fix_yahoo_finance as yf
import numpy as np
import os
import pandas as pd
import random
import tensorflow as tf
import matplotlib.pyplot as plt


SP500_LIST_PATH = './stock_data/constituents.csv'
DATA_PATH = './stock_data'
CHECK_POINTS_PATH = "./checkpoints/stock_check_points"
TRAIN_STOCK_LIST = ['AAPL','AMZN','GOOG','FB','MSFT','NFLX','NVDA','ORCL']
#TRAIN_STOCK_LIST = ['AAPL']


COMPANIES = None
ORIGINAL_STOCK_DATA = {}
PROCESSED_STOCK_DATA = {}

INPUT_SIZE = 5
# 可以理解成序列长度
NUMBER_OF_STEPS = 30
LSTM_SIZE = 128
EMBED_SIZE = 128
NUMBER_OF_LAYERS = 2
STOCK_COUNT = 10
MAX_EPOCH = 50000
INIT_EPOCH = 5

LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY_STEP = 1000
LEARNING_RATE_DECAY_RATE = 0.95

INIT_LEARNING_RATE = 0.001
LEARNING_RATE_DECAY = 0.99

TEST_RATIO = 0.1
BATCH_SIZE = 64
KEEP_PROB = 0.8
SAVE_STEP = 1000


def _data_already_loaded():
    dirs = os.listdir(DATA_PATH)
    if dirs and len(dirs) > 100:
        return True
    else:
        return False


def fetch_stock_list_from_web():
    global COMPANIES
    COMPANIES = pd.read_csv(SP500_LIST_PATH)

    if _data_already_loaded():
        return

    yf.pdr_override()
    for cell in COMPANIES['Symbol']:
        try:
            data = reader.get_data_yahoo(cell)
            data.to_csv('./stock_data/' + cell + '.csv')
        except Exception as e:
            print(e)


def load_stock_data():
    global ORIGINAL_STOCK_DATA
    for cell in COMPANIES['Symbol']:
        csv_file = './stock_data/' + cell + '.csv'
        if not (os.path.exists(csv_file)):
            continue
        csv_data = pd.read_csv(csv_file)
        ORIGINAL_STOCK_DATA[cell] = csv_data


def preprocess_data(symbols):
    global PROCESSED_STOCK_DATA

    for symbol in symbols:
        print('Loading ' + symbol)
        data = ORIGINAL_STOCK_DATA[symbol]['Open']
        # 按input_size进行拆分
        data = [np.array(data[i * INPUT_SIZE: (i + 1) * INPUT_SIZE])
                for i in range(len(data) // INPUT_SIZE)]
        # 计算相对增量,注意后面的for里面i是从0开始的
        data = [data[0] / data[0][0] - 1.0] + [
            curr / data[i][-1] - 1.0 for i, curr in enumerate(data[1:])]

        X = np.array([data[i: i + NUMBER_OF_STEPS] for i in range(len(data) - NUMBER_OF_STEPS)])
        y = np.array([data[i + NUMBER_OF_STEPS] for i in range(len(data) - NUMBER_OF_STEPS)])

        train_size = int(len(X) * (1.0 - TEST_RATIO))
        train_X, test_X = X[:train_size], X[train_size:]
        train_y, test_y = y[:train_size], y[train_size:]

        PROCESSED_STOCK_DATA[symbol] = (train_X, train_y, test_X, test_y)


def _generate_batch(symbol):
    train_X, train_y, test_X, test_y = PROCESSED_STOCK_DATA[symbol]
    num_batches = int(len(train_X)) // BATCH_SIZE
    if BATCH_SIZE * num_batches < len(train_X):
        num_batches += 1

    batch_indices = list(range(num_batches))
    random.shuffle(batch_indices)
    for j in batch_indices:
        batch_X = train_X[j * BATCH_SIZE: (j + 1) * BATCH_SIZE]
        batch_y = train_y[j * BATCH_SIZE: (j + 1) * BATCH_SIZE]
        assert set(map(len, batch_X)) == {NUMBER_OF_STEPS}
        yield batch_X, batch_y


def build_model(symbols,inputs,keep_prob):
    def _create_one_cell():
        lstm_cell = tf.contrib.rnn.LSTMCell(LSTM_SIZE, state_is_tuple=True)
        lstm_cell = tf.contrib.rnn.DropoutWrapper(lstm_cell, output_keep_prob=keep_prob)
        return lstm_cell

    cell = tf.contrib.rnn.MultiRNNCell(
        [_create_one_cell() for _ in range(NUMBER_OF_LAYERS)],
        state_is_tuple=True
    ) if NUMBER_OF_LAYERS > 1 else _create_one_cell()


    # embed_matrix = tf.Variable(
    #     tf.random_uniform([STOCK_COUNT, EMBED_SIZE], -1.0, 1.0),
    #     name="embed_matrix"
    # )

    # 对embedding lookup的效果比较怀疑
    # stock_label_embeds.shape = (batch_size, embedding_size)
    # stacked_symbols = tf.tile(symbols, [1, NUMBER_OF_STEPS], name='stacked_stock_labels')
    # stacked_embeds = tf.nn.embedding_lookup(embed_matrix, stacked_symbols)

    # After concat, inputs.shape = (batch_size, num_steps, input_size + embed_size)
    #inputs_with_embed = tf.concat([inputs, stacked_embeds], axis=2, name="inputs_with_embed")

    inputs_with_embed = tf.identity(inputs)

    print("inputs.shape:", inputs.shape)
    print("inputs_with_embed.shape:", inputs_with_embed.shape)

    # Run dynamic RNN
    val, state_ = tf.nn.dynamic_rnn(cell, inputs_with_embed, dtype=tf.float32, scope="dynamic_rnn")
    # Before transpose, val.get_shape() = (batch_size, num_steps, lstm_size)
    # After transpose, val.get_shape() = (num_steps, batch_size, lstm_size)
    val = tf.transpose(val, [1, 0, 2])
    # 这里取向量的最后一位也就是最后一个step对应的数据,也就是模型预测的股票数据,也解释了为啥要用transpose
    last = tf.gather(val, int(val.get_shape()[0]) - 1, name="lstm_state")
    ws = tf.Variable(tf.truncated_normal([LSTM_SIZE, INPUT_SIZE]), name="w")
    bias = tf.get_variable("b", [INPUT_SIZE])
    pred = tf.matmul(last, ws) + bias
    return pred, cell


def train():
    print('start training ...')

    learning_rate = tf.placeholder(tf.float32, None, name="learning_rate")
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")

    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')

    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE], name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")

    pred, cell = build_model(symbols,inputs,keep_prob)

    # 方差损失
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")
    global_step = tf.Variable(0, trainable=False)
    add_global_step = global_step.assign_add(1)

    trainable_variables = tf.trainable_variables()
    grads, a = tf.clip_by_global_norm(tf.gradients(loss, trainable_variables), 5) # prevent loss divergence caused by gradient explosion
    learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step=global_step,
                                               decay_steps=LEARNING_RATE_DECAY_STEP, decay_rate=LEARNING_RATE_DECAY_RATE)
    optimizer = tf.train.AdamOptimizer(learning_rate)
    optim = optimizer.apply_gradients(zip(grads, trainable_variables))

    #optim = tf.train.RMSPropOptimizer(learning_rate).minimize(loss, name="rmsprop_optim")


    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        if not os.path.exists(CHECK_POINTS_PATH):
            os.makedirs(CHECK_POINTS_PATH)

        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        # if have checkPoint, restore checkPoint
        if check_point and check_point.model_checkpoint_path:
            saver.restore(sess, check_point.model_checkpoint_path)
            print("restored %s" % check_point.model_checkpoint_path)
        else:
            print("no checkpoint found!")

        g_step = 0
        for epoch in range(MAX_EPOCH):
            epoch_step = 0
            each_turn_learning_rate = INIT_LEARNING_RATE * (
                    LEARNING_RATE_DECAY ** max(float(epoch + 1 - INIT_EPOCH), 0.0)
            )

            for label_, d_ in PROCESSED_STOCK_DATA.items():
                label_pos = list(PROCESSED_STOCK_DATA.keys()).index(label_)
                for batch_x, batch_y in _generate_batch(label_):
                    g_step += 1
                    epoch_step += 1
                    batch_labels = np.array([[label_pos]] * len(batch_x))
                    train_data_feed = {
                        learning_rate: each_turn_learning_rate,
                        keep_prob: KEEP_PROB,
                        inputs: batch_x,
                        targets: batch_y,
                        symbols: batch_labels,
                    }
                    train_loss, train_pred, _, _ = sess.run(
                        [loss, pred, optim, add_global_step], train_data_feed)

                    print("epoch: %d, steps: %d, loss: %3f" % (epoch + 1, epoch_step, train_loss))
                    # save and test
                    if g_step % SAVE_STEP == SAVE_STEP - 1: # prevent save at the beginning
                        print("save model")
                        saver.save(sess, os.path.join(CHECK_POINTS_PATH, 'stock.model'), global_step=g_step)


def test():
    print('start test ...')

    learning_rate = tf.placeholder(tf.float32, None, name="learning_rate")
    keep_prob = tf.placeholder(tf.float32, None, name="keep_prob")

    # Stock symbols are mapped to integers.
    symbols = tf.placeholder(tf.int32, [None, 1], name='stock_labels')

    inputs = tf.placeholder(tf.float32, [None, NUMBER_OF_STEPS, INPUT_SIZE], name="inputs")
    targets = tf.placeholder(tf.float32, [None, INPUT_SIZE], name="targets")

    pred, cell = build_model(symbols,inputs,keep_prob)

    # 方差损失
    loss = tf.reduce_mean(tf.square(pred - targets), name="loss_mse_train")

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        check_point = tf.train.get_checkpoint_state(CHECK_POINTS_PATH)
        # if have checkPoint, restore checkPoint
        if check_point and check_point.model_checkpoint_path:
            saver.restore(sess, check_point.model_checkpoint_path)
            print("restored %s" % check_point.model_checkpoint_path)
        else:
            print("no checkpoint found!")
            exit(1)

        for label_, d_ in PROCESSED_STOCK_DATA.items():
            label_pos = list(PROCESSED_STOCK_DATA.keys()).index(label_)
            a, b, test_x, test_y = PROCESSED_STOCK_DATA[label_]

            batch_labels = np.array([[label_pos]] * len(test_x))
            test_data_feed = {
                learning_rate: 0.0,
                keep_prob: 1.0,
                inputs: test_x,
                targets: test_y,
                symbols: batch_labels,
            }

            test_loss, test_pred = sess.run(
                [loss,pred], test_data_feed)

            print("stock: %s, loss: %3f" % (label_, test_loss))
            _print_comparison_gui(test_pred, test_y, label_)
            _print_comparison_text(test_pred, test_y)

        # writer=tf.summary.FileWriter('./logs',sess.graph)
        # writer.close()

# 把数据打印出来试试
def _print_comparison_gui(pred, real, stock_sym):
    truths = _flatten(real)[-200:]
    preds = 1 * (_flatten(pred)[-200:])
    days = range(len(truths))[-200:]

    plt.figure(figsize=(12, 6))
    plt.plot(days, truths, label='truth')
    plt.plot(days, preds, label='pred')
    plt.legend(loc='upper left', frameon=False)
    plt.xlabel("day")
    plt.ylabel("normalized price")
    plt.ylim((min(truths), max(truths)))
    plt.grid(ls='--')
    plt.title(stock_sym + " | Last %d days in test" % len(truths))
    plt.show()


def _print_comparison_text(pred, real):
    pred = _flatten(pred)
    real = _flatten(real)
    for each in pred:
        print("%5f" % each, end=' ')
    print('')
    print('')
    for each in real:
        print("%5f" % each, end=' ')
    print('')
    print('')

def _flatten(seq):
    return np.array([x for y in seq for x in y])

fetch_stock_list_from_web()
load_stock_data()
preprocess_data(TRAIN_STOCK_LIST)
#train()
test()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Effective Engineer

The Effective Engineer

Edmond Lau / The Effective Bookshelf, Palo Alto, CA. / 2015-3-19 / USD 39.00

Introducing The Effective Engineer — the only book designed specifically for today's software engineers, based on extensive interviews with engineering leaders at top tech companies, and packed with h......一起来看看 《The Effective Engineer》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

MD5 加密
MD5 加密

MD5 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具