Integrating AWS Kinesis for Live Stock Market Data Streaming
Chapter 1: Introduction to Real-Time Stock Market Data
The ability to analyze stock market data in real time is crucial for making informed investment decisions. This article explores how to implement AWS Kinesis for real-time streaming of stock market data using Python, detailing each step from data collection to inference generation with a PyTorch model.
Section 1.1: Retrieving Live Stock Market Data
To begin retrieving data, note that no API keys are required: pandas_datareader's FRED reader and yfinance both pull public data without authentication. You can execute the following command:
python fetch_data.py
import pandas_datareader as pdr
import yfinance as yf

start, end = '2000-01-01', '2023-01-01'

# Features
yield_curve = pdr.get_data_fred('T10Y3M', start=start, end=end)  # 10Y minus 3M Treasury spread
recession_prob = pdr.get_data_fred('RECPROUSM156N', start=start, end=end)  # smoothed recession probability
sp500 = yf.download('^GSPC', start=start, end=end)  # S&P 500 index

# Target
usrec = pdr.get_data_fred('USREC', start=start, end=end)  # NBER recession indicator (0/1)
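Note that USREC and RECPROUSM156N are monthly series, while T10Y3M and the S&P 500 download are daily. The feature engineering later in the pipeline assumes month-aligned observations, so resampling the daily series to a common monthly frequency is a reasonable extra step (a sketch, assuming month-end alignment and a single-level 'Close' column):

sp500_monthly = sp500['Close'].resample('M').last()
yield_curve_monthly = yield_curve.resample('M').mean()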
Section 1.2: Streaming Data into Kinesis
Make sure that your AWS CLI is configured with permissions to write to Kinesis. If the target stream does not exist yet, create it first (a minimal sketch; the stream name and shard count are placeholders):
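aws kinesis create-stream --stream-name YOUR_STREAM_NAME --shard-count 1

With the stream in place, use the command below to stream your data: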
python stream_to_kinesis.py --stream_name YOUR_STREAM_NAME
import boto3
import json
import argparse

parser = argparse.ArgumentParser(description='Send data to Kinesis.')
parser.add_argument('--stream_name', type=str, required=True, help='Kinesis Stream Name')
args = parser.parse_args()

kinesis = boto3.client('kinesis')

# The series come from the fetch step (imported or loaded from disk). Sending
# them as plain lists keeps the payload JSON-serializable and matches the
# list indexing used by the downstream transformation.
data = {
    'usrec': usrec['USREC'].tolist(),
    'yield_curve': yield_curve['T10Y3M'].tolist(),
    'recession_prob': recession_prob['RECPROUSM156N'].tolist(),
    'sp500': sp500['Close'].tolist()
}

# put_record expects bytes, and the consumer parses JSON, so use json.dumps
# rather than str(), whose single-quoted repr is not valid JSON.
response = kinesis.put_record(
    StreamName=args.stream_name,
    Data=json.dumps(data).encode('utf-8'),
    PartitionKey='stock_data'
)
print(response['SequenceNumber'])
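To verify that records are arriving, here is a minimal consumer sketch (this assumes a single-shard stream; shardId-000000000000 is the default first shard):

import json
import boto3

kinesis = boto3.client('kinesis')
iterator = kinesis.get_shard_iterator(
    StreamName='YOUR_STREAM_NAME',
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']
for record in kinesis.get_records(ShardIterator=iterator, Limit=10)['Records']:
    print(json.loads(record['Data']))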
Chapter 2: Data Transformation and Storage
The video "How to Build a Real-Time Streaming Data Pipeline with Kinesis" illustrates the process of creating a real-time data pipeline utilizing AWS Kinesis. It provides insights into setting up a robust data streaming architecture.
Section 2.1: Data Processing with AWS Lambda
Ensure your AWS CLI is configured correctly and the necessary Lambda functions are deployed. The command for Lambda deployment is:
aws lambda create-function --function-name firehose_transformation --runtime python3.12 --role YOUR_LAMBDA_ROLE_ARN --handler lambda_function.firehose_transformation --zip-file fileb://path_to_your_lambda_function.zip
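To complete the storage leg of this chapter, the Lambda can be attached as a record transformer on a Kinesis Data Firehose delivery stream that reads from the Kinesis stream and lands the transformed records in S3. A minimal sketch (every name and ARN below is a placeholder):

aws firehose create-delivery-stream \
  --delivery-stream-name YOUR_DELIVERY_STREAM_NAME \
  --delivery-stream-type KinesisStreamAsSource \
  --kinesis-stream-source-configuration "KinesisStreamARN=YOUR_STREAM_ARN,RoleARN=YOUR_FIREHOSE_ROLE_ARN" \
  --extended-s3-destination-configuration '{
    "RoleARN": "YOUR_FIREHOSE_ROLE_ARN",
    "BucketARN": "arn:aws:s3:::YOUR_BUCKET",
    "ProcessingConfiguration": {
      "Enabled": true,
      "Processors": [{
        "Type": "Lambda",
        "Parameters": [{
          "ParameterName": "LambdaArn",
          "ParameterValue": "YOUR_LAMBDA_FUNCTION_ARN"
        }]
      }]
    }
  }'

The transformation function itself is as follows: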
import json
import base64

def firehose_transformation(event, context):
    output = []
    for record in event['records']:
        # Firehose delivers each record base64-encoded
        payload = base64.b64decode(record['data'])
        transformed_payload = json.loads(payload)

        # Transformation logic: extract relevant features for further analysis
        transformed_payload['usrec_monthly_change'] = transformed_payload['usrec'][-1] - transformed_payload['usrec'][-2]
        transformed_payload['yield_curve_trend'] = sum(transformed_payload['yield_curve'][-5:]) / 5  # 5-month average
        transformed_payload['sp500_monthly_return'] = (transformed_payload['sp500'][-1] - transformed_payload['sp500'][-2]) / transformed_payload['sp500'][-2]
        transformed_payload['recession_prob_trend'] = sum(transformed_payload['recession_prob'][-5:]) / 5  # 5-month average

        # Remove raw data to keep the payload concise
        for key in ('usrec', 'yield_curve', 'recession_prob', 'sp500'):
            del transformed_payload[key]

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(transformed_payload).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}
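You can exercise the handler locally before deploying it; here is a minimal sketch that builds a Firehose-shaped test event (the sample values are illustrative only):

import json
import base64

sample = {
    'usrec': [0, 0, 0, 0, 1],
    'yield_curve': [1.2, 1.1, 0.9, 0.7, 0.5],
    'recession_prob': [2.0, 3.5, 5.0, 8.0, 12.0],
    'sp500': [4200.0, 4150.0, 4100.0, 4000.0, 3900.0]
}
event = {'records': [{
    'recordId': '1',
    'data': base64.b64encode(json.dumps(sample).encode('utf-8')).decode('utf-8')
}]}
result = firehose_transformation(event, None)
print(json.loads(base64.b64decode(result['records'][0]['data'])))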
Chapter 3: Data Preprocessing and Model Training
The video titled "Analyzing Data Streams in Real Time with Amazon Kinesis: PNNL's Serverless Data Lake Ingestion" provides a detailed guide on how to efficiently analyze real-time data streams.
Section 3.1: Data Preparation Using Pandas and Scikit-Learn
To preprocess your data, run the following command:
python preprocess_data.py --input_file path_to_input_data.csv --output_file path_to_output_data.csv
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
import argparse
parser = argparse.ArgumentParser(description='Prepare stock market data.')
parser.add_argument('--input_file', type=str, required=True, help='Path to input data file')
parser.add_argument('--output_file', type=str, required=True, help='Path to output data file')
args = parser.parse_args()
data = pd.read_csv(args.input_file)
# Features and Target
features = ['yield_curve_trend', 'sp500_monthly_return', 'recession_prob_trend']
target = 'usrec_monthly_change'
X = data[features]
y = data[target]
# Scale features to [0, 1]
pipeline = Pipeline([('scaler', MinMaxScaler())])
normalized_data = pipeline.fit_transform(X)
normalized_data = pd.DataFrame(normalized_data, columns=features)
normalized_data[target] = y
normalized_data.to_csv(args.output_file, index=False)
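Inference must apply the same scaling as training, so it is worth persisting the fitted pipeline alongside the CSV (a sketch using joblib; the filename is a placeholder):

import joblib
joblib.dump(pipeline, 'scaler_pipeline.joblib')  # reload later with joblib.load(...)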
Section 3.2: Training the Recession Classification Model
To train your model, use the command:
python train_model.py --train_data path_to_train_data.csv --model_output path_to_model_output.pth
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split
import argparse
parser = argparse.ArgumentParser(description='Train recession classification model.')
parser.add_argument('--train_data', type=str, required=True, help='Path to training data file')
parser.add_argument('--model_output', type=str, required=True, help='Path to save trained model')
args = parser.parse_args()
data = pd.read_csv(args.train_data)

# The preprocessing step wrote the target as 'usrec_monthly_change'. BCELoss
# expects 0/1 labels, so map the change to a binary recession-onset flag
# (1 if the indicator rose this month, else 0).
features = ['yield_curve_trend', 'sp500_monthly_return', 'recession_prob_trend']
X = data[features].values
y = (data['usrec_monthly_change'] > 0).astype('float32').values
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
class RecessionNet(nn.Module):
    def __init__(self):
        super(RecessionNet, self).__init__()
        self.fc1 = nn.Linear(3, 10)
        self.fc2 = nn.Linear(10, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x
model = RecessionNet()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop (full batch)
epochs = 50
inputs = torch.tensor(X_train, dtype=torch.float32)
labels = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}/{epochs}, Loss: {loss.item()}")
# Save the trained model
torch.save(model.state_dict(), args.model_output)
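The split above reserves a validation set that the training loop never touches; a quick held-out accuracy check is a cheap sanity test (a minimal sketch):

model.eval()
with torch.no_grad():
    val_inputs = torch.tensor(X_val, dtype=torch.float32)
    val_preds = (model(val_inputs) > 0.5).float().view(-1)
    accuracy = (val_preds == torch.tensor(y_val, dtype=torch.float32)).float().mean().item()
print(f"Validation accuracy: {accuracy:.3f}")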
Section 3.3: Making Predictions
To use your trained model for making predictions, run:
python make_inferences.py --model_path path_to_trained_model.pth --input_data path_to_input_data.csv
import torch
import torch.nn as nn
import pandas as pd
import argparse
parser = argparse.ArgumentParser(description='Generate predictions using trained model.')
parser.add_argument('--model_path', type=str, required=True, help='Path to trained model')
parser.add_argument('--input_data', type=str, required=True, help='Path to input data for prediction')
args = parser.parse_args()
# The architecture must match the one used during training
class RecessionNet(nn.Module):
    def __init__(self):
        super(RecessionNet, self).__init__()
        self.fc1 = nn.Linear(3, 10)
        self.fc2 = nn.Linear(10, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x
model = RecessionNet()
model.load_state_dict(torch.load(args.model_path))
model.eval()
data = pd.read_csv(args.input_data)

# Select the same feature columns used in training
features = ['yield_curve_trend', 'sp500_monthly_return', 'recession_prob_trend']
X_test = data[features].values

with torch.no_grad():
    inputs = torch.tensor(X_test, dtype=torch.float32)
    predictions = model(inputs)
    predictions = (predictions > 0.5).int()  # classify as 0 or 1 based on a 0.5 threshold
print(predictions)
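To make the output easier to consume downstream, you can attach the predictions back to the input rows and write them out (a sketch; the output filename is a placeholder):

data['recession_pred'] = predictions.view(-1).numpy()
data.to_csv('predictions_with_labels.csv', index=False)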
Conclusion
By streaming stock market data through AWS Kinesis, we can combine AWS services and Python libraries for effective real-time processing and analysis. Ensure that your AWS configuration, IAM roles, and data files are prepared before executing the provided scripts and commands.