commit 9767adb7debaa08a19c53285d316064650c0ba5d Author: Thomas Rijpstra Date: Wed Mar 5 19:37:27 2025 +0100 Initial commit diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..d0bf80a --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,39 @@ +name: Test + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} + cancel-in-progress: true + +on: + pull_request: + workflow_call: + +jobs: + test: + name: run tests + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.10" ] + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + + - name: Test with pytest + run: | + coverage run -m pytest -v -s + + - name: Generate Coverage Report + run: | + coverage report -m diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a19790 --- /dev/null +++ b/.gitignore @@ -0,0 +1,174 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. 
+#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc diff --git a/docs/LLM-Research-PySpark-VaR.md b/docs/LLM-Research-PySpark-VaR.md new file mode 100644 index 0000000..56dbea0 --- /dev/null +++ b/docs/LLM-Research-PySpark-VaR.md @@ -0,0 +1,1041 @@ +# Using an LLM for researching financial risk management using Python + +_Model: Claude 3.7 Sonnet_ + +> Prompt: +> +> Act as a virtual tutor specializing in [certain skill]. Your task is to engage in one-on-one sessions with me, focusing on practicing and improving my [certain skill]. During these sessions, provide immediate, constructive feedback to help refine my technique, enhance my understanding, and boost my confidence in using the skill effectively. Utilize a variety of instructional strategies to cater to my learning style, including demonstrations, discussions, and hands-on practice. Encourage a positive learning environment where questions are welcomed and mistakes are viewed as learning opportunities. The goal is to ensure measurable progress in my proficiency with [certain skill], making the sessions both productive and enjoyable. + +Created at 2025-03-05 10:53:27 UTC + +> @ 2025-03-05 10:53:27 UTC +> +> The skill is 'PySpark' + +# PySpark Virtual Tutoring Session + +Welcome to your personalized PySpark tutoring session! I'm excited to help you develop your skills in this powerful big data processing framework. Let's get started with understanding what PySpark is and how we can structure our learning sessions. + +## What is PySpark? + +PySpark is the Python API for Apache Spark, enabling you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides an interactive shell for analyzing your data. 【1】 + +PySpark was created to support the collaboration between Apache Spark and Python, essentially serving as a Python interface to the Spark framework. 
【2】 + +## Why PySpark is Important in 2025 + +PySpark remains a crucial skill for data professionals in 2025, particularly for: + +- Processing massive datasets across multiple computers simultaneously 【3】 +- Performing exploratory data analysis at scale 【4】 +- Building machine learning pipelines 【4】 +- Creating ETL (Extract, Transform, Load) processes for data platforms 【4】 +- Handling big data processing tasks efficiently 【5】 + +## Key PySpark Capabilities We'll Cover + +During our sessions, we'll focus on developing your skills in these essential PySpark areas: + +1. Distributed computing fundamentals +2. DataFrame operations and SQL queries 【6】 +3. Real-time analytics processing 【6】 +4. Machine learning implementation 【6】 +5. Graph processing techniques 【6】 + +## Our Learning Approach + +For our tutoring sessions, I'll use a combination of: + +- Conceptual explanations +- Hands-on coding examples +- Real-world problem-solving exercises +- Immediate feedback on your code and approach + +## Getting Started + +Let's begin by understanding your current experience level with PySpark. This will help me tailor our sessions to your needs. + +1. Have you worked with PySpark before? +2. What's your background with Python and data processing? +3. Do you have any specific PySpark applications or projects in mind? +4. What aspects of PySpark are you most interested in learning? + +Once I understand your background, we can start with practical exercises tailored to your skill level and interests. + + +_References_: +- [PySpark Overview — PySpark 3.5.4 documentation - Apache Spark](https://spark.apache.org/docs/latest/api/python/index.html#:~:text=PySpark%20is,your%20data.) +- [What is PySpark? - Databricks](https://www.databricks.com/glossary/pyspark#:~:text=PySpark%20has,for%20Spark.) +- [PySpark Made Simple: From Basics to Big Data Mastery - Medium](https://medium.com/@nomannayeem/pyspark-made-simple-from-basics-to-big-data-mastery-cb1d702968be#:~:text=It%20allows,less%20time.) +- [A Brief Introduction to PySpark - Medium](https://medium.com/towards-data-science/a-brief-introduction-to-pyspark-ff4284701873#:~:text=PySpark%20is,data%20platform.) +- [A Brief Introduction to PySpark - Medium](https://medium.com/towards-data-science/a-brief-introduction-to-pyspark-ff4284701873#:~:text=PySpark%20is,data%20platform.) +- [A Brief Introduction to PySpark - Medium](https://medium.com/towards-data-science/a-brief-introduction-to-pyspark-ff4284701873#:~:text=PySpark%20is,data%20platform.) +- [PySpark Overview: Introduction to Big Data Processing with Python](https://pratikbarjatya.medium.com/pyspark-overview-introduction-to-big-data-processing-with-python-59e87b7bd4bc#:~:text=PySpark%2C%20the,data%20processing.) +- [PySpark for Data Science - KDnuggets](https://www.kdnuggets.com/2023/02/pyspark-data-science.html#:~:text=You%20can,graph%20processing.) +- [PySpark for Data Science - KDnuggets](https://www.kdnuggets.com/2023/02/pyspark-data-science.html#:~:text=You%20can,graph%20processing.) +- [PySpark for Data Science - KDnuggets](https://www.kdnuggets.com/2023/02/pyspark-data-science.html#:~:text=You%20can,graph%20processing.) +- [PySpark for Data Science - KDnuggets](https://www.kdnuggets.com/2023/02/pyspark-data-science.html#:~:text=You%20can,graph%20processing.) + +> @ 2025-03-05 10:55:46 UTC +> +> I have not worked with pyspark before. +> I've used Python for work for many years, and used it with Pandas and numpy etc. 
However it's been a couple of years since, so my knowledge is definitely out of date and rusty. +> I am applying for a job in finance risk management where they they use pyspark. +> I want to be able to solve finance risk management problems using pyspark. + +# PySpark for Finance Risk Management - Beginner Session + +Thank you for sharing your background! With your Python experience and familiarity with Pandas and NumPy, you have a solid foundation to build PySpark skills for finance risk management. + +## PySpark Fundamentals for Finance + +PySpark is the Python API for Apache Spark, enabling distributed data processing across multiple computers simultaneously - perfect for the large datasets common in financial risk analysis. 【1】【2】 + +### Why PySpark is Valuable in Finance Risk Management + +Financial institutions use PySpark for: + +1. Calculating investment risk levels +2. Processing large-scale banking transactions for fraud detection +3. Credit risk analysis and evaluation +4. Value-at-Risk (VaR) calculations at scale +5. Portfolio risk management and diversification analysis + +## Getting Started with PySpark for Finance + +Let's begin with setting up PySpark and understanding its basic structure: + +```python +# Installing PySpark (run in your terminal) +pip install pyspark[connect]==3.5.5 +``` + +### Basic PySpark Session Setup + +```python +from pyspark.sql import SparkSession + +# Create a Spark session +spark = SparkSession.builder \ + .appName("Finance Risk Management") \ + .getOrCreate() +``` + +### Loading Financial Data + +```python +# Example: Loading financial data +# This could be transaction data, market prices, etc. +financial_data = spark.read.csv("financial_data.csv", header=True, inferSchema=True) + +# Display the first few rows +financial_data.show(5) + +# Get schema information +financial_data.printSchema() +``` + +## PySpark vs. Pandas: Key Differences + +Since you're familiar with Pandas, here are the key differences: + +1. **Scale**: PySpark handles terabytes of data across clusters; Pandas is limited to memory on a single machine +2. **Execution**: PySpark uses lazy evaluation (executes only when needed); Pandas executes immediately +3. **API**: PySpark's DataFrame API is similar to Pandas but with some syntax differences +4. **Performance**: PySpark excels with large datasets; Pandas is faster for smaller datasets + +## Simple Risk Analysis Example + +Let's implement a basic financial risk calculation using PySpark: + +```python +from pyspark.sql.functions import col, stddev, mean + +# Calculate portfolio volatility (a basic risk measure) +# Assuming we have daily returns in a column called 'returns' +risk_metrics = financial_data.select( + stddev("returns").alias("volatility"), + mean("returns").alias("average_return") +) + +risk_metrics.show() +``` + +## Value-at-Risk (VaR) Calculation + +VaR is a common risk metric in finance. Here's a simple implementation: + +```python +from pyspark.sql.functions import expr, percentile_approx + +# Calculate 95% VaR +confidence_level = 0.95 +var_calculation = financial_data.select( + percentile_approx("returns", 1-confidence_level, 10000).alias("VaR_95") +) + +var_calculation.show() +``` + +## Next Steps in Our Learning Path + +Based on your background and goals, here's what we'll cover in our upcoming sessions: + +1. **Data preprocessing** for financial datasets using PySpark +2. **Feature engineering** for risk models +3. **Machine learning pipelines** for credit risk assessment +4. 
**Monte Carlo simulations** for market risk +5. **Real-time risk monitoring** with streaming data + +## Practice Exercise + +Let's start with a simple exercise to get you comfortable with PySpark syntax: + +1. Create a PySpark DataFrame with sample financial data +2. Calculate basic statistics (mean, standard deviation, min, max) for key columns +3. Filter the data based on risk thresholds +4. Group data by categories and analyze risk metrics by group + +Would you like to try this exercise now, or would you prefer to focus on a specific aspect of PySpark for finance risk management first? + + +_References_: +- [(PDF) Credit Card Fraud Detection with Machine Learning and Big...](https://www.researchgate.net/publication/381931450_Credit_Card_Fraud_Detection_with_Machine_Learning_and_Big_Data_Analytics_A_PySpark_Framework_Implementation#:~:text=risk%20management.,financial%20reporting.) +- [PySpark: A Powerful Tool in the Finance Industry | by Ritesh Bakliwal](https://medium.com/@ritesh.bakliwal/pyspark-a-powerful-tool-for-the-finance-industry-9b5e8efa936d#:~:text=Risk%20Management%E2%80%8B%E2%80%8B,risk%20through) + +> @ 2025-03-01 05:14:19 UTC +> +> Let's dive into the VaR calculation first. +> Explain what it is, and how it works. Why is it relevant, and what are its implications. What are its pitfalls? + +# Value-at-Risk (VaR) in Financial Risk Management with PySpark + +## What is Value-at-Risk (VaR)? + +Value-at-Risk (VaR) is a statistical risk measure that quantifies the potential loss in value of a portfolio over a defined time period for a given confidence interval. In simpler terms, VaR answers the question: "How much could I lose with X% confidence over the next N days?" + +For example, a one-day 95% VaR of $1 million means there's a 95% probability that the portfolio won't lose more than $1 million in a single day. + +## How VaR Works + +VaR calculation typically follows one of three methodologies: + +1. **Historical Simulation Method**: Uses historical returns to estimate potential future losses + ```python + from pyspark.sql.functions import percentile_approx + + # Assuming 'returns' column contains daily portfolio returns + historical_var = financial_data.select( + percentile_approx("returns", 0.05, 10000).alias("VaR_95_percent") + ) + ``` + +2. **Parametric Method (Variance-Covariance)**: Assumes returns follow a normal distribution + ```python + from pyspark.sql.functions import mean, stddev, lit + + # Calculate mean and standard deviation of returns + stats = financial_data.select( + mean("returns").alias("mean_return"), + stddev("returns").alias("std_return") + ).collect()[0] + + # Calculate VaR assuming normal distribution (95% confidence = 1.645 std deviations) + confidence_factor = 1.645 + parametric_var = stats.mean_return - (confidence_factor * stats.std_return) + ``` + +3. **Monte Carlo Simulation**: Generates thousands of random scenarios based on statistical properties of the data + ```python + # This would be implemented as a more complex PySpark job + # involving random number generation and parallel simulations + ``` + +## Why VaR is Relevant in Finance Risk Management + +VaR has become a standard risk metric in finance because it: + +1. Provides a single, easy-to-understand number that quantifies risk +2. Allows comparison of risk across different asset classes and portfolios +3. Helps financial institutions meet regulatory requirements (Basel regulations) +4. 
Enables risk-based capital allocation and limit setting + +## Implementing VaR at Scale with PySpark + +Financial institutions use PySpark for VaR calculations because: + +1. It can process massive datasets across distributed systems +2. Monte Carlo simulations can be parallelized efficiently +3. Historical data analysis can be performed on years of market data +4. Real-time risk monitoring becomes feasible + +## Pitfalls and Limitations of VaR + +Despite its popularity, VaR has several significant limitations: + +1. **Tail Risk Blindness**: VaR says nothing about the severity of losses beyond the confidence threshold +2. **Model Risk**: Different VaR calculation methods can produce vastly different results +3. **Correlation Breakdown**: Correlations between assets often change during market stress +4. **False Sense of Security**: A single number can oversimplify complex risk profiles +5. **Backward-Looking**: Historical data may not predict future market behavior, especially during crises + +## Modern Approaches to Enhance VaR + +To address these limitations, modern risk management often supplements VaR with: + +1. **Conditional VaR (CVaR)**: Measures the expected loss beyond the VaR threshold + ```python + from pyspark.sql.functions import col, when, mean + + # Calculate CVaR (Expected Shortfall) + cvar_calculation = financial_data.withColumn( + "beyond_var", when(col("returns") < var_value, col("returns")).otherwise(None) + ).select(mean("beyond_var").alias("CVaR_95_percent")) + ``` + +2. **Stress Testing**: Simulating extreme but plausible scenarios +3. **Sensitivity Analysis**: Understanding how VaR changes with different assumptions +4. **Alternative Data Integration**: Incorporating non-traditional data sources for more robust risk assessment + +## PySpark Implementation Considerations + +When implementing VaR calculations in PySpark for finance risk management: + +1. Ensure data quality and completeness +2. Consider computational efficiency for large portfolios +3. Implement proper validation and backtesting procedures +4. Design for both batch processing and near-real-time updates +5. Incorporate model governance and documentation + +Would you like me to elaborate on any specific aspect of VaR calculation or show you a more detailed PySpark implementation example? + +> @ 2025-03-01 05:19:37 UTC +> +> Let's explore the use-case of where a risk calculation has been made in an excel sheet and we want to convert it to pyspark, and augment it by adding both unit- and end-to-end tests. Show me how, assuming standard, tutorial values. Explain the context and input variables of the excel sheet and if and where the implementations (excel vs pyspark) differ, both in technical value as in business value (i.e. how it can be used in financial risk management)> + +# Migrating Financial Risk VaR Calculation from Excel to PySpark + +## Context: Excel-Based VaR Calculation + +In financial risk management, Value-at-Risk (VaR) calculations are often initially developed in Excel due to its accessibility and familiarity. Let's assume we have an Excel model that calculates VaR for a portfolio of stocks using the historical simulation method. + +### Excel Implementation Context + +The Excel sheet typically contains: + +1. Historical daily returns for multiple stocks (e.g., 5 stocks over 2 years) +2. Portfolio weights for each stock +3. Calculation of portfolio returns based on weighted stock returns +4. 
VaR calculation at 95% confidence level using percentile function + +## Migrating to PySpark: Implementation + +Let's convert this Excel-based VaR calculation to PySpark with proper testing: + +### Step 1: Set Up PySpark Environment + +```python +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, expr, percentile_approx, sum, lit, array +from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType +import datetime as dt + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Financial Risk VaR Calculation") \ + .getOrCreate() +``` + +### Step 2: Create Sample Data (Mimicking Excel Input) + +```python +# Define schema for stock returns +schema = StructType([ + StructField("date", DateType(), False), + StructField("AAPL", DoubleType(), True), + StructField("MSFT", DoubleType(), True), + StructField("AMZN", DoubleType(), True), + StructField("GOOGL", DoubleType(), True), + StructField("META", DoubleType(), True) +]) + +# Sample data (daily returns as decimals) +# In real implementation, you would load this from a data source +data = [ + (dt.date(2023, 1, 3), 0.02, 0.01, -0.015, 0.005, 0.01), + (dt.date(2023, 1, 4), -0.01, 0.02, 0.01, -0.005, 0.02), + (dt.date(2023, 1, 5), 0.005, -0.01, 0.02, 0.01, -0.015), + # Add more historical data points here +] + +# Create DataFrame +returns_df = spark.createDataFrame(data, schema) + +# Portfolio weights (as would be defined in Excel) +weights = {"AAPL": 0.25, "MSFT": 0.25, "AMZN": 0.20, "GOOGL": 0.15, "META": 0.15} +``` + +### Step 3: Implement VaR Calculation Function + +```python +def calculate_var(returns_df, weights, confidence_level=0.95): + """ + Calculate Value-at-Risk using historical simulation method + + Args: + returns_df: DataFrame with date and daily returns by stock + weights: Dictionary of portfolio weights by stock + confidence_level: Confidence level for VaR (default 95%) + + Returns: + VaR value at specified confidence level + """ + # Create a weights DataFrame for joining + weights_data = [(k, v) for k, v in weights.items()] + weights_df = spark.createDataFrame(weights_data, ["stock", "weight"]) + + # Reshape returns from wide to long format + stocks = [c for c in returns_df.columns if c != "date"] + returns_long = returns_df.select( + "date", + *[expr(f"'{stock}' as stock, {stock} as return") + for stock in stocks] + ).selectExpr("date", "stack(" + str(len(stocks)) + ", " + + ", ".join([f"'{s}', {s}" for s in stocks]) + ") as (stock, return)") + + # Join returns with weights + weighted_returns = returns_long.join(weights_df, "stock") + + # Calculate weighted returns + weighted_returns = weighted_returns.withColumn( + "weighted_return", col("return") * col("weight") + ) + + # Aggregate to portfolio level by date + portfolio_returns = weighted_returns.groupBy("date").agg( + sum("weighted_return").alias("portfolio_return") + ) + + # Calculate VaR + var_value = portfolio_returns.select( + percentile_approx("portfolio_return", 1 - confidence_level, 10000).alias("VaR") + ).collect()[0]["VaR"] + + # Return the absolute value as VaR is typically reported as a positive number + return abs(var_value) +``` + +### Step 4: Implement Unit Tests + +```python +def test_calculate_var(): + """Unit test for VaR calculation function""" + # Create test data with known outcome + test_schema = StructType([ + StructField("date", DateType(), False), + StructField("Stock1", DoubleType(), True), + StructField("Stock2", DoubleType(), True) + ]) + + test_data = [ + (dt.date(2023, 1, 1), 
-0.05, -0.03), + (dt.date(2023, 1, 2), 0.02, 0.01), + (dt.date(2023, 1, 3), 0.01, -0.01), + (dt.date(2023, 1, 4), -0.02, 0.02), + (dt.date(2023, 1, 5), 0.03, 0.02) + ] + + test_df = spark.createDataFrame(test_data, test_schema) + test_weights = {"Stock1": 0.6, "Stock2": 0.4} + + # With these values, the portfolio returns are: + # [-0.042, 0.016, 0.002, -0.004, 0.026] + # The 5% VaR should be 0.042 + + var_result = calculate_var(test_df, test_weights, 0.95) + + # Assert with some tolerance for floating point + assert abs(var_result - 0.042) < 0.001, f"Expected VaR around 0.042, got {var_result}" + print("Unit test passed!") + +# Run the unit test +test_calculate_var() +``` + +### Step 5: End-to-End Test + +```python +def end_to_end_test(): + """End-to-end test for the VaR calculation pipeline""" + # Calculate VaR with our sample data + var_result = calculate_var(returns_df, weights, 0.95) + + print(f"95% VaR for the portfolio: {var_result:.4f} (or {var_result*100:.2f}%)") + + # Additional validation checks + # 1. Check that VaR is positive + assert var_result > 0, "VaR should be positive" + + # 2. Check that VaR is reasonable (typically between 0.5% and 10% for daily VaR) + assert 0.005 <= var_result <= 0.10, f"VaR value {var_result} outside reasonable range" + + # 3. Verify with different confidence levels + var_99 = calculate_var(returns_df, weights, 0.99) + assert var_99 > var_result, "99% VaR should be higher than 95% VaR" + + print("End-to-end test passed!") + +# Run the end-to-end test +end_to_end_test() +``` + +## Technical Differences: Excel vs. PySpark + +| Aspect | Excel Implementation | PySpark Implementation | +|--------|----------------------|------------------------| +| Data Volume | Limited by Excel's row limit (~1M rows) | Can handle billions of rows distributed across clusters | +| Computation Speed | Slow for large datasets | Fast parallel processing across multiple nodes | +| Data Integration | Manual import/export | Direct connection to various data sources | +| Reproducibility | Prone to manual errors | Code-based, version-controlled, reproducible | +| Scalability | Limited to single machine | Horizontally scalable across clusters | +| Automation | Limited scheduling options | Can be integrated into automated pipelines | + +## Business Value Differences + +### Excel Limitations in Risk Management + +1. **Limited Historical Data**: Excel's row limitations restrict the amount of historical data for VaR calculations +2. **Calculation Speed**: Slow processing for large portfolios or frequent recalculations +3. **Error Prone**: Manual formula adjustments can introduce errors +4. **Limited Scenario Analysis**: Difficult to run thousands of Monte Carlo simulations +5. **Collaboration Challenges**: Version control issues when multiple risk analysts work together + +### PySpark Advantages for Financial Risk Management + +1. **Comprehensive Risk Assessment**: Analyze entire trading history across all instruments +2. **Real-time Risk Monitoring**: Process streaming market data for near-real-time VaR updates +3. **Stress Testing at Scale**: Run thousands of scenarios simultaneously to identify vulnerabilities +4. **Multi-factor Risk Models**: Incorporate more risk factors than possible in Excel +5. **Regulatory Compliance**: Meet stringent regulatory requirements with comprehensive data analysis +6. 
**Alternative Data Integration**: Incorporate non-traditional data sources for more robust risk assessment + +## Practical Applications in Finance Risk Management + +With the PySpark implementation, financial institutions can: + +1. **Intraday Risk Monitoring**: Calculate VaR multiple times per day across the entire portfolio +2. **Cross-Asset Risk Analysis**: Analyze correlations between different asset classes at scale +3. **Tail Risk Assessment**: Supplement VaR with additional metrics like Expected Shortfall +4. **Scenario Analysis**: Test portfolio performance under various market conditions +5. **Regulatory Reporting**: Generate comprehensive risk reports for Basel regulations +6. **Portfolio Optimization**: Use risk metrics to optimize portfolio allocation + +## Conclusion + +Migrating VaR calculations from Excel to PySpark represents a significant advancement in financial risk management capabilities. While Excel provides a familiar starting point for developing risk models, PySpark offers the scalability, reproducibility, and computational power needed for enterprise-level risk management in modern financial institutions. + +The addition of proper testing frameworks ensures the reliability of these critical calculations, providing confidence in risk assessments that drive important business decisions. + +> @ 2025-03-01 07:17:34 UTC +> +> Let's explore those practical applications in finance risk management in detail. Let's go through them one-by-one and explain the problem that application solves and how it solves it. Show some example code and example outcomes and explain the benefits and common pitfalls for that specific application. + +# Practical Applications of PySpark in Finance Risk Management + +Let's explore key PySpark applications in financial risk management, examining how each solves specific problems with example code, outcomes, benefits, and pitfalls. + +## 1. Intraday Risk Monitoring + +### Problem +Financial institutions need to continuously monitor portfolio risk throughout trading hours as market conditions change rapidly. + +### Solution +PySpark enables near real-time VaR calculations by processing streaming market data and portfolio positions. 
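+
+The streaming sketch below parses each Kafka message with `from_json`, which comes from `pyspark.sql.functions` and needs to be imported alongside the other functions it lists. A quick way to sanity-check that parsing step is to run it against a static DataFrame first; the payload format used here is an assumption for illustration:
+
+```python
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import from_json, col
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
+
+spark = SparkSession.builder.appName("Market Data Payload Check").getOrCreate()
+
+# Same schema the streaming example uses for market data ticks
+schema = StructType([
+    StructField("timestamp", TimestampType(), False),
+    StructField("instrument_id", StringType(), False),
+    StructField("price", DoubleType(), False),
+    StructField("volume", DoubleType(), True)
+])
+
+# Stand-in for the Kafka 'value' column: one JSON document per message (assumed format)
+raw = spark.createDataFrame(
+    [('{"timestamp": "2025-03-05 10:00:00", "instrument_id": "AAPL", "price": 182.5, "volume": 1200}',)],
+    ["value"]
+)
+
+# Parse the JSON string into typed columns, exactly as the stream would
+parsed = raw.select(from_json(col("value"), schema).alias("data")).select("data.*")
+parsed.show(truncate=False)
+```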
+ +```python +from pyspark.sql import SparkSession +from pyspark.sql.functions import window, col, sum, expr, percentile_approx +from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Intraday Risk Monitoring") \ + .getOrCreate() + +# Define schema for streaming market data +schema = StructType([ + StructField("timestamp", TimestampType(), False), + StructField("instrument_id", StringType(), False), + StructField("price", DoubleType(), False), + StructField("volume", DoubleType(), True) +]) + +# Read streaming market data +market_data_stream = spark.readStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "broker:9092") \ + .option("subscribe", "market_data") \ + .load() \ + .select(from_json(col("value").cast("string"), schema).alias("data")) \ + .select("data.*") + +# Load portfolio positions (static for this example) +positions = spark.createDataFrame([ + ("AAPL", 1000), + ("MSFT", 1500), + ("AMZN", 200) +], ["instrument_id", "quantity"]) + +# Join streaming data with positions +position_values = market_data_stream \ + .join(positions, "instrument_id") \ + .withColumn("position_value", col("price") * col("quantity")) + +# Calculate portfolio value in 5-minute windows +portfolio_values = position_values \ + .groupBy(window(col("timestamp"), "5 minutes")) \ + .agg(sum("position_value").alias("portfolio_value")) + +# Calculate rolling VaR (simplified) +def calculate_var(df, window_size="30 minutes"): + return df \ + .withWatermark("window", window_size) \ + .groupBy() \ + .agg(percentile_approx("portfolio_value", 0.05, 10000).alias("VaR_95")) + +# Output results to dashboard +query = calculate_var(portfolio_values) \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .start() + +query.awaitTermination() +``` + +### Example Outcome +``` ++-------------------+ +| VaR_95 | ++-------------------+ +| 2450000 | ++-------------------+ +``` + +### Benefits +- Enables timely risk-based decisions during market volatility +- Allows traders to adjust positions before end-of-day risk limits are breached +- Provides early warning signals for potential market stress + +### Pitfalls +- Stream processing introduces complexity in error handling +- Requires careful handling of late-arriving data +- May generate false alarms during temporary market fluctuations + +## 2. Credit Risk Analysis and Scoring + +### Problem +Financial institutions need to assess borrower creditworthiness by analyzing large volumes of historical and alternative data. + +### Solution +PySpark enables processing of diverse data sources to build and deploy credit risk models at scale. 
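+
+One step the end-to-end pipeline sketch below glosses over is missing-value handling: `VectorAssembler` errors on nulls by default, so real credit data usually needs an imputation stage first. A minimal illustration with assumed column names matching the sketch below:
+
+```python
+from pyspark.sql import SparkSession
+from pyspark.ml.feature import Imputer
+
+spark = SparkSession.builder.appName("Credit Risk - Missing Values").getOrCreate()
+
+# Toy frame with gaps in the kind of numeric features the pipeline below assembles
+raw = spark.createDataFrame(
+    [(35, 52000.0, 0.95), (41, None, 0.80), (29, 61000.0, None)],
+    ["age", "annual_income", "payment_ratio"]
+)
+
+# Fill missing numeric values with the column median before feature assembly
+imputer = Imputer(
+    strategy="median",
+    inputCols=["annual_income", "payment_ratio"],
+    outputCols=["annual_income_imp", "payment_ratio_imp"]
+)
+clean = imputer.fit(raw).transform(raw)
+clean.show()
+```
+
+An alternative is `VectorAssembler(handleInvalid="skip")`, which simply drops incomplete rows, but that can bias the training set toward customers with complete records; with imputation, the `*_imp` columns would feed the assembler in place of the raw ones.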
+ +```python +from pyspark.sql import SparkSession +from pyspark.ml.feature import VectorAssembler, StandardScaler +from pyspark.ml.classification import LogisticRegression +from pyspark.ml.evaluation import BinaryClassificationEvaluator +from pyspark.ml import Pipeline + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Credit Risk Scoring") \ + .getOrCreate() + +# Load credit application data +credit_data = spark.read.parquet("s3://finance-data/credit_applications.parquet") + +# Load alternative data sources +payment_history = spark.read.parquet("s3://finance-data/payment_history.parquet") +account_data = spark.read.parquet("s3://finance-data/account_data.parquet") + +# Join datasets +joined_data = credit_data \ + .join(payment_history, "customer_id") \ + .join(account_data, "customer_id") + +# Feature engineering +joined_data = joined_data \ + .withColumn("payment_ratio", col("payments_made") / col("payments_due")) \ + .withColumn("utilization_ratio", col("balance") / col("credit_limit")) \ + .withColumn("debt_to_income", col("total_debt") / col("annual_income")) + +# Prepare features +feature_cols = ["age", "annual_income", "payment_ratio", "utilization_ratio", + "debt_to_income", "num_late_payments", "credit_history_length"] + +assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw") +scaler = StandardScaler(inputCol="features_raw", outputCol="features") +lr = LogisticRegression(featuresCol="features", labelCol="default_flag", + maxIter=10, regParam=0.01) + +# Create and train pipeline +pipeline = Pipeline(stages=[assembler, scaler, lr]) +train, test = joined_data.randomSplit([0.7, 0.3], seed=42) +model = pipeline.fit(train) + +# Evaluate model +predictions = model.transform(test) +evaluator = BinaryClassificationEvaluator(labelCol="default_flag") +auc = evaluator.evaluate(predictions) +print(f"AUC: {auc}") + +# Score new applications +new_applications = spark.read.parquet("s3://finance-data/new_applications.parquet") +scored_applications = model.transform(new_applications) \ + .select("customer_id", "probability", "prediction") \ + .withColumn("credit_score", (1 - col("probability")[1]) * 850) + +# Save results +scored_applications.write.mode("overwrite").parquet("s3://finance-data/credit_scores.parquet") +``` + +### Example Outcome +``` ++------------+--------------------+----------+------------+ +| customer_id| probability|prediction|credit_score| ++------------+--------------------+----------+------------+ +| C10045|[0.92, 0.08] | 0.0| 782.0| +| C10046|[0.75, 0.25] | 0.0| 637.5| +| C10047|[0.35, 0.65] | 1.0| 297.5| ++------------+--------------------+----------+------------+ +``` + +### Benefits +- Processes diverse data sources beyond traditional credit bureau data +- Scales to millions of applications with consistent scoring +- Enables rapid model updates as economic conditions change + +### Pitfalls +- Requires careful handling of missing data and outliers +- Model bias can lead to unfair lending practices +- Regulatory compliance requires model explainability + +## 3. Monte Carlo Simulations for Market Risk + +### Problem +Financial institutions need to understand potential portfolio outcomes under thousands of market scenarios. + +### Solution +PySpark enables parallel execution of Monte Carlo simulations to estimate VaR and other risk metrics. 
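+
+The core idea, distributing the scenario dimension across the cluster, can be seen in a few lines before looking at the fuller portfolio sketch below. This minimal version assumes a single position with normally distributed daily returns; the drift, volatility, and portfolio value are illustrative numbers:
+
+```python
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import randn, expr
+
+spark = SparkSession.builder.appName("Monte Carlo VaR - Minimal").getOrCreate()
+
+num_simulations = 10_000
+mu, sigma, portfolio_value = 0.0003, 0.015, 5_000_000.0  # assumed daily drift, daily vol, value
+
+# One row per scenario; randn() draws a standard normal per row, in parallel across partitions
+scenarios = spark.range(num_simulations).withColumn(
+    "pnl", (randn(seed=42) * sigma + mu) * portfolio_value
+)
+
+# The 99% one-day VaR is the (negated) 1st percentile of the simulated P&L distribution
+scenarios.select(
+    expr("-percentile_approx(pnl, 0.01, 10000)").alias("VaR_99")
+).show()
+```
+
+The fuller sketch that follows extends this to multiple instruments by generating return paths inside a UDF; in production the per-scenario values would typically carry an explicit simulation id so that portfolio-level aggregation stays aligned across instruments.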
+ +```python +from pyspark.sql import SparkSession +from pyspark.sql.functions import udf, col, lit, expr +from pyspark.sql.types import ArrayType, DoubleType +import numpy as np + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Monte Carlo Simulations") \ + .getOrCreate() + +# Load portfolio data +portfolio = spark.createDataFrame([ + ("Equity", "AAPL", 1000000, 0.08, 0.25), + ("Equity", "MSFT", 1500000, 0.07, 0.22), + ("Bond", "US10Y", 2000000, 0.03, 0.05), + ("FX", "EUR/USD", 800000, 0.01, 0.12) +], ["asset_class", "instrument", "position", "expected_return", "volatility"]) + +# Define simulation parameters +num_simulations = 10000 +time_horizon = 10 # days +confidence_level = 0.99 + +# Define Monte Carlo simulation function +@udf(returnType=ArrayType(DoubleType())) +def simulate_returns(expected_return, volatility, position): + daily_return = expected_return / 252 + daily_volatility = volatility / np.sqrt(252) + + # Generate random returns using geometric Brownian motion + random_returns = np.random.normal( + daily_return - 0.5 * daily_volatility**2, + daily_volatility, + (num_simulations, time_horizon) + ) + + # Calculate cumulative returns + cumulative_returns = np.cumprod(np.exp(random_returns), axis=1) + + # Calculate final portfolio values + final_values = position * cumulative_returns[:, -1] + + return final_values.tolist() + +# Run simulations +simulated_portfolio = portfolio.withColumn( + "simulated_values", + simulate_returns(col("expected_return"), col("volatility"), col("position")) +) + +# Create a DataFrame with one row per simulation +simulation_results = simulated_portfolio.select( + "asset_class", "instrument", "position", "simulated_values" +).selectExpr( + "asset_class", "instrument", "position", "explode(simulated_values) as simulated_value" +) + +# Aggregate results by simulation ID +portfolio_simulations = simulation_results.groupBy().pivot("instrument").sum("simulated_value") + +# Calculate portfolio-level statistics +portfolio_stats = portfolio_simulations.select( + expr("percentile_approx(AAPL + MSFT + US10Y + `EUR/USD`, 1-0.99, 10000)").alias("VaR_99"), + expr("percentile_approx(AAPL + MSFT + US10Y + `EUR/USD`, 1-0.95, 10000)").alias("VaR_95"), + expr("avg(AAPL + MSFT + US10Y + `EUR/USD`)").alias("Expected_Value") +) + +portfolio_stats.show() +``` + +### Example Outcome +``` ++-----------------+-----------------+------------------+ +| VaR_99| VaR_95| Expected_Value| ++-----------------+-----------------+------------------+ +| -452,367| -298,124| 5,512,846 | ++-----------------+-----------------+------------------+ +``` + +### Benefits +- Captures complex market dynamics and correlations +- Provides distribution of potential outcomes beyond point estimates +- Enables stress testing under extreme scenarios + +### Pitfalls +- Results highly sensitive to model assumptions +- Computational intensity requires careful resource management +- Correlation assumptions may break down during market stress + +## 4. Regulatory Reporting and Compliance + +### Problem +Financial institutions must generate comprehensive risk reports for regulatory compliance (Basel III, FRTB, etc.). + +### Solution +PySpark enables processing of enterprise-wide data to generate regulatory risk metrics and reports. 
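+
+Regulatory mappings such as rating-to-risk-weight tables change over time, so a common design choice is to keep them as data and join them in rather than hard-code them in a `when` chain (as the fuller sketch below does). A minimal illustration with assumed weights and hypothetical exposures:
+
+```python
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col
+
+spark = SparkSession.builder.appName("Credit RWA - Lookup Join").getOrCreate()
+
+# Illustrative standardised risk weights keyed by external rating (assumed values)
+risk_weights = spark.createDataFrame(
+    [("AAA", 0.2), ("AA", 0.5), ("A", 0.7), ("BBB", 1.0), ("BB", 1.5), ("B", 2.5)],
+    ["counterparty_rating", "rwa_factor"]
+)
+
+# Hypothetical exposures; in practice these would come from the trading book positions
+exposures = spark.createDataFrame(
+    [("C1", "AAA", 1000000.0), ("C2", "BBB", 250000.0), ("C3", "CCC", 50000.0)],
+    ["counterparty_id", "counterparty_rating", "market_value"]
+)
+
+credit_rwa = (
+    exposures.join(risk_weights, "counterparty_rating", "left")
+    # Unrated or unmapped counterparties fall back to a conservative 4.0 weight
+    .na.fill({"rwa_factor": 4.0})
+    .withColumn("credit_rwa", col("market_value") * col("rwa_factor"))
+)
+credit_rwa.show()
+```
+
+Versioning the weight table alongside the reporting code also gives an audit trail when regulators ask how a figure was produced.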
+ +```python +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, sum, when, lit, expr +from pyspark.sql.window import Window + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Regulatory Risk Reporting") \ + .getOrCreate() + +# Load trading book data +trading_book = spark.read.parquet("s3://finance-data/trading_positions.parquet") + +# Load market data +market_data = spark.read.parquet("s3://finance-data/market_data.parquet") + +# Load risk factor sensitivities +risk_sensitivities = spark.read.parquet("s3://finance-data/risk_sensitivities.parquet") + +# Join datasets +position_risk = trading_book \ + .join(market_data, "instrument_id") \ + .join(risk_sensitivities, "instrument_id") + +# Calculate market value +position_risk = position_risk \ + .withColumn("market_value", col("quantity") * col("price")) + +# Calculate risk-weighted assets (RWA) for credit risk +credit_rwa = position_risk \ + .withColumn("credit_rwa", + when(col("counterparty_rating") == "AAA", col("market_value") * 0.2) + .when(col("counterparty_rating") == "AA", col("market_value") * 0.5) + .when(col("counterparty_rating") == "A", col("market_value") * 0.7) + .when(col("counterparty_rating") == "BBB", col("market_value") * 1.0) + .when(col("counterparty_rating") == "BB", col("market_value") * 1.5) + .when(col("counterparty_rating") == "B", col("market_value") * 2.5) + .otherwise(col("market_value") * 4.0)) + +# Calculate market risk capital (simplified FRTB approach) +market_risk = position_risk \ + .withColumn("delta_risk", col("delta") * col("market_value") * col("risk_weight")) \ + .withColumn("vega_risk", col("vega") * col("market_value") * col("risk_weight") * 0.5) \ + .withColumn("curvature_risk", col("gamma") * col("market_value") * col("risk_weight") * 0.25) + +# Aggregate by risk class +risk_by_class = market_risk \ + .groupBy("risk_class") \ + .agg( + sum("delta_risk").alias("total_delta_risk"), + sum("vega_risk").alias("total_vega_risk"), + sum("curvature_risk").alias("total_curvature_risk") + ) \ + .withColumn("total_market_risk", + col("total_delta_risk") + col("total_vega_risk") + col("total_curvature_risk")) + +# Calculate liquidity coverage ratio (LCR) +liquidity_metrics = position_risk \ + .withColumn("hqla", + when(col("asset_class") == "Sovereign", col("market_value")) + .when(col("asset_class") == "Cash", col("market_value")) + .otherwise(0)) \ + .withColumn("outflow", + when(col("maturity") < 30, col("market_value")) + .otherwise(0)) \ + .agg( + sum("hqla").alias("total_hqla"), + sum("outflow").alias("total_outflow") + ) \ + .withColumn("lcr", col("total_hqla") / col("total_outflow")) + +# Generate final regulatory report +regulatory_report = spark.createDataFrame([ + ("Credit RWA", credit_rwa.agg(sum("credit_rwa")).collect()[0][0]), + ("Market Risk Capital", risk_by_class.agg(sum("total_market_risk")).collect()[0][0]), + ("Liquidity Coverage Ratio", liquidity_metrics.select("lcr").collect()[0][0]) +], ["metric", "value"]) + +regulatory_report.show() +``` + +### Example Outcome +``` ++------------------------+---------------+ +| metric| value| ++------------------------+---------------+ +| Credit RWA| 325450000 | +| Market Risk Capital| 42750000 | +|Liquidity Coverage Ratio| 1.24| ++------------------------+---------------+ +``` + +### Benefits +- Consolidates data from multiple systems for consistent reporting +- Enables drill-down capabilities for regulatory inquiries +- Automates complex regulatory calculations + +### Pitfalls +- Regulatory 
interpretations may vary across jurisdictions +- Data quality issues can lead to reporting errors +- Changing regulations require frequent model updates + +## 5. Fraud Detection and Anti-Money Laundering (AML) + +### Problem +Financial institutions must identify suspicious transactions among billions of legitimate ones. + +### Solution +PySpark enables real-time and batch processing of transaction data to detect potential fraud and money laundering. + +```python +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, window, avg, stddev, count, sum, when, expr +from pyspark.ml.feature import VectorAssembler +from pyspark.ml.clustering import KMeans + +# Initialize Spark session +spark = SparkSession.builder \ + .appName("Fraud Detection") \ + .getOrCreate() + +# Load transaction data +transactions = spark.read.parquet("s3://finance-data/transactions.parquet") + +# Feature engineering for fraud detection +customer_profiles = transactions \ + .groupBy("customer_id") \ + .agg( + avg("amount").alias("avg_amount"), + stddev("amount").alias("stddev_amount"), + count("transaction_id").alias("transaction_count"), + avg(when(col("transaction_type") == "online", col("amount")).otherwise(0)).alias("avg_online_amount"), + count(when(col("transaction_type") == "online", 1).otherwise(None)).alias("online_count"), + avg(when(col("country") != "home_country", col("amount")).otherwise(0)).alias("avg_foreign_amount"), + count(when(col("country") != "home_country", 1).otherwise(None)).alias("foreign_count") + ) + +# Calculate transaction velocity +transaction_velocity = transactions \ + .groupBy("customer_id", window("timestamp", "1 hour")) \ + .count() \ + .groupBy("customer_id") \ + .agg(avg("count").alias("avg_hourly_transactions"), + stddev("count").alias("stddev_hourly_transactions"), + max("count").alias("max_hourly_transactions")) + +# Join features +fraud_features = customer_profiles.join(transaction_velocity, "customer_id") + +# Prepare for anomaly detection +assembler = VectorAssembler( + inputCols=["avg_amount", "stddev_amount", "transaction_count", + "avg_online_amount", "online_count", "avg_foreign_amount", + "foreign_count", "avg_hourly_transactions", + "max_hourly_transactions"], + outputCol="features" +) + +fraud_vectors = assembler.transform(fraud_features) + +# Apply K-means clustering for anomaly detection +kmeans = KMeans(k=5, seed=42) +model = kmeans.fit(fraud_vectors.select("features")) +clustered = model.transform(fraud_vectors) + +# Identify anomalous clusters +cluster_sizes = clustered.groupBy("prediction").count().collect() +smallest_clusters = sorted([(row["prediction"], row["count"]) for row in cluster_sizes], + key=lambda x: x[1])[:2] +anomalous_clusters = [cluster[0] for cluster in smallest_clusters] + +# Flag suspicious customers +suspicious_customers = clustered \ + .withColumn("suspicious", + when(col("prediction").isin(anomalous_clusters), 1).otherwise(0)) \ + .filter(col("suspicious") == 1) + +# Generate alerts for recent transactions +alerts = transactions \ + .join(suspicious_customers.select("customer_id", "suspicious"), "customer_id") \ + .filter(col("timestamp") > expr("current_timestamp() - interval 1 day")) \ + .select("transaction_id", "customer_id", "timestamp", "amount", + "transaction_type", "country") + +alerts.show() +``` + +### Example Outcome +``` ++---------------+------------+-------------------+--------+----------------+---------+ +| transaction_id| customer_id| timestamp| amount|transaction_type| country| 
++---------------+------------+-------------------+--------+----------------+---------+ +| T98765432| C123456|2025-03-01 09:45:22| 5000.00| online| Nigeria| +| T98765433| C123456|2025-03-01 09:47:15| 4500.00| online| Nigeria| +| T98765450| C789012|2025-03-01 12:32:08|12500.00| atm| Russia| ++---------------+------------+-------------------+--------+----------------+---------+ +``` + +### Benefits +- Identifies complex fraud patterns across multiple transactions +- Reduces false positives compared to rule-based systems +- Adapts to evolving fraud techniques + +### Pitfalls +- Imbalanced datasets make model training challenging +- Legitimate unusual transactions may trigger false alerts +- Requires careful handling of privacy concerns + + +--- + +_Generated by [The Assistant](https://kagi.com/assistant)_ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..2661203 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[tool.ruff] +line-length = 100 \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..ebed230 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +pythonpath = . +testpaths = src +python_files = *_test.py \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b8cbc32 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +numpy==2.2.3 +pandas==2.2.3 +pytest==8.3.5 +scipy==1.15.2 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/assignment.md b/src/assignment.md new file mode 100644 index 0000000..5411f26 --- /dev/null +++ b/src/assignment.md @@ -0,0 +1,30 @@ +#%% md +## Answer 1 - Option + +* Implied Volatility (IV) provides a prediction of the future, while Historical Volatility (HV) provides an observation of the past. +* IV is derived from option pricing models while HV is calculated directly from historical price data. +* IV provides a subjective measure on future volatility, while HV provides an objective measure of baseline volatility. + +## Answer 2 - VaR + +Value-at-Risk (VaR) is a measurement of the maximum potential loss in value of a portfolio over a defined period (e.g. 1 day) and for a given confidence interval (e.g. 95%). + +There are various methods of calculation that mainly differ in the way they approximate portfolio volatility. + +The simplest method (parametric) assumes that portfolio volatility is constant and thus return variability can be approximated using a normal distribution. It requires very little data (only the current portfolio value and a measure of volatility, e.g. annual volatility of the portfolio) and is easily computed but has practical limitations due to its assumptions. + +A more complex method is the Peaks Over Threshold (POT) method. This method requires more data, specifically containing sufficient extreme events, as it uses historical excesses in portfolio returns to extrapolate risk beyond historical observations. +In other words, it estimates the chance of future excesses in returns. +This estimation is dependent on the method's parameters, so stress testing is needed make sure the parameters are aptly chosen. + +Because VaR is used in risk management, and specifically to reduce portfolio risk if it's deemed to large, a method that specifically models extreme events (i.e. "tail focus") is preferable. + +## Answer 3 - Option + +Refer to `options.py` for the implementation and `options_test.py` for both unit- and end-to-end tests. 
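+
+For reference, the spot-price form implemented in `options.py` is the standard zero-dividend Black-Scholes price
+$C = S\,N(d_1) - K e^{-rT} N(d_2)$ with $d_1 = \frac{\ln(S/K) + (r + \sigma^2/2)T}{\sigma\sqrt{T}}$ and $d_2 = d_1 - \sigma\sqrt{T}$;
+the forward form replaces $S$ with the discounted forward $F e^{-rT}$, where $F = S e^{rT}$.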
+ +## Answer 4 - VaR + +Refer to `var.py` for the implementation and `var_test.py` for both unit- and end-to-end tests. +#%% + diff --git a/src/data/var_fx_prices.csv b/src/data/var_fx_prices.csv new file mode 100644 index 0000000..bc3df32 --- /dev/null +++ b/src/data/var_fx_prices.csv @@ -0,0 +1,261 @@ +date,ccy-1,ccy-2 +14-11-2019,1.1684427,0.886564121 +13-11-2019,1.165976797,0.884329678 +12-11-2019,1.16603118,0.883470271 +11-11-2019,1.166901992,0.87753938 +8-11-2019,1.160725686,0.877658927 +7-11-2019,1.160712213,0.876270592 +6-11-2019,1.162466288,0.877654906 +5-11-2019,1.162020521,0.876693114 +4-11-2019,1.158050769,0.881600987 +1-11-2019,1.159084323,0.881911985 +31-10-2019,1.160995205,0.885700368 +30-10-2019,1.156992283,0.886839305 +29-10-2019,1.160671797,0.878271562 +28-10-2019,1.159164938,0.879816998 +25-10-2019,1.157420803,0.881290209 +24-10-2019,1.157126161,0.880902044 +23-10-2019,1.158761979,0.88163985 +22-10-2019,1.15928588,0.878464444 +21-10-2019,1.164903779,0.87788605 +18-10-2019,1.157889862,0.879198171 +17-10-2019,1.155081202,0.883197174 +16-10-2019,1.158426394,0.879700902 +15-10-2019,1.157836236,0.881329044 +14-10-2019,1.141630705,0.885151582 +11-10-2019,1.148791471,0.88210647 +10-10-2019,1.115299681,0.879971841 +9-10-2019,1.112743134,0.875465091 +8-10-2019,1.114876918,0.876001927 +7-10-2019,1.12083749,0.876270592 +4-10-2019,1.119870991,0.876193814 +3-10-2019,1.129318231,0.876616261 +2-10-2019,1.124062813,0.876923751 +1-10-2019,1.119733056,0.874431619 +30-9-2019,1.128935752,0.873667657 +27-9-2019,1.124492573,0.880824452 +26-9-2019,1.12731946,0.877654906 +25-9-2019,1.129496809,0.876424189 +24-9-2019,1.130867236,0.872600349 +23-9-2019,1.133758437,0.873896705 +20-9-2019,1.135151089,0.868809731 +19-9-2019,1.129484052,0.867829558 +18-9-2019,1.128579006,0.871269876 +17-9-2019,1.129905201,0.872105699 +16-9-2019,1.129292724,0.874087671 +13-9-2019,1.124151266,0.87753938 +12-9-2019,1.123458883,0.878734622 +11-9-2019,1.122258883,0.87966221 +10-9-2019,1.118918677,0.879894413 +9-9-2019,1.116220922,0.88051422 +6-9-2019,1.11477749,0.878811846 +5-9-2019,1.116245842,0.880630531 +4-9-2019,1.105534305,0.877308418 +3-9-2019,1.103046615,0.874469853 +2-9-2019,1.099601944,0.87507623 +30-8-2019,1.103022281,0.875273523 +29-8-2019,1.10282765,0.87001914 +28-8-2019,1.103387399,0.871991629 +27-8-2019,1.010123245,0.874928912 +26-8-2019,1.099747058,0.876501008 +23-8-2019,1.104899123,0.878889084 +22-8-2019,1.105546527,0.880630531 +21-8-2019,1.093469798,0.882768362 +20-8-2019,1.094235567,0.886839305 +19-8-2019,1.093637219,0.884955752 +16-8-2019,1.093744873,0.886014265 +15-8-2019,1.090655266,0.886839305 +14-8-2019,1.082508822,0.887941751 +13-8-2019,1.078865034,0.884564352 +12-8-2019,1.077156737,0.882339966 +9-8-2019,1.07820199,0.881057269 +8-8-2019,1.083247576,0.881290209 +7-8-2019,1.083505791,0.881911985 +6-8-2019,1.087961704,0.881367883 +5-8-2019,1.085422772,0.88028169 +2-8-2019,1.091381361,0.878618811 +1-8-2019,1.098490674,0.876232202 +31-7-2019,1.096234287,0.878194432 +30-7-2019,1.090905124,0.883119177 +29-7-2019,1.098719991,0.884603476 +26-7-2019,1.113337787,0.883587365 +25-7-2019,1.119670369,0.890194507 +24-7-2019,1.121642084,0.890234132 +23-7-2019,1.11564808,0.890947969 +22-7-2019,1.113647753,0.88691796 +19-7-2019,1.114243373,0.884173298 +18-7-2019,1.112978442,0.884642604 +17-7-2019,1.107162232,0.883002208 +16-7-2019,1.106304831,0.882028666 +15-7-2019,1.111296327,0.881406725 +12-7-2019,1.116196004,0.880902044 +11-7-2019,1.114293037,0.88028169 +10-7-2019,1.1105805,0.886367665 
+9-7-2019,1.112433671,0.883197174 +8-7-2019,1.115672974,0.886485528 +5-7-2019,1.114504157,0.888967908 +4-7-2019,1.114789918,0.88999644 +3-7-2019,1.113635351,0.890323 +2-7-2019,1.115038525,0.891067053 +1-7-2019,1.117006423,0.893535272 +28-6-2019,1.114050958,0.890511599 +27-6-2019,1.115050958,0.891583452 +26-6-2019,1.114541422,0.89098766 +25-6-2019,1.117355889,0.887981175 +24-6-2019,1.117218573,0.88711466 +21-6-2019,1.121453404,0.888809884 +20-6-2019,1.124985938,0.88711466 +19-6-2019,1.126164172,0.883821645 +18-6-2019,1.120925436,0.884407889 +17-6-2019,1.118480656,0.884955752 +14-6-2019,1.123204277,0.884760009 +13-6-2019,1.125365744,0.889363216 +12-6-2019,1.124163903,0.889086464 +11-6-2019,1.124075448,0.888533475 +10-6-2019,1.121239643,0.892299456 +7-6-2019,1.125163149,0.892538379 +6-6-2019,1.127116161,0.8942312 +5-6-2019,1.130467222,0.89512332 +4-6-2019,1.12847712,0.89561596 +3-6-2019,1.127471982,0.8952134 +31-5-2019,1.1281843,0.889600569 +30-5-2019,1.131528922,0.894134478 +29-5-2019,1.133542661,0.893894699 +28-5-2019,1.134404211,0.893255918 +27-5-2019,1.132964742,0.893854749 +24-5-2019,1.132618274,0.892538379 +23-5-2019,1.135976372,0.89047195 +22-5-2019,1.134803282,0.889263216 +21-5-2019,1.14310536,0.889363216 +20-5-2019,1.140706097,0.891901534 +17-5-2019,1.14087528,0.892259648 +16-5-2019,1.144348065,0.89465444 +15-5-2019,1.147802532,0.896137647 +14-5-2019,1.152113552,0.89561596 +13-5-2019,1.154921119,0.894374385 +10-5-2019,1.158842548,0.896619744 +9-5-2019,1.159393405,0.896901206 +8-5-2019,1.160294715,0.892458724 +7-5-2019,1.16638479,0.893575194 +6-5-2019,1.169590643,0.894854586 +3-5-2019,1.172525385,0.897504936 +2-5-2019,1.165324601,0.898714838 +1-5-2019,1.163995297,0.897021887 +30-4-2019,1.16132379,0.892618049 +29-4-2019,1.157407407,0.89031339 +26-4-2019,1.57284634,0.887862914 +25-4-2019,1.568324628,0.886406949 +24-4-2019,1.156590833,0.882067566 +23-4-2019,1.154227937,0.883704489 +22-4-2019,1.152963693,0.883743538 +19-4-2019,1.156122826,0.883431247 +18-4-2019,1.156925355,0.887232721 +17-4-2019,1.154521105,0.890868597 +16-4-2019,1.155214639,0.890789239 +15-4-2019,1.159729551,0.894054537 +12-4-2019,1.158158065,0.891583452 +11-4-2019,1.160739159,0.885778821 +10-4-2019,1.163277651,0.883314195 +9-4-2019,1.157420803,0.878040214 +8-4-2019,1.158721698,0.878425861 +5-4-2019,1.159218687,0.879043601 +4-4-2019,1.166412,0.879584836 +3-4-2019,1.171687347,0.878889084 +2-4-2019,1.164225674,0.883041194 +1-4-2019,1.170686022,0.885269122 +29-3-2019,1.168346532,0.885367665 +28-3-2019,1.164646006,0.886367665 +27-3-2019,1.172374175,0.891106755 +26-3-2019,1.171440286,0.891543708 +25-3-2019,1.165555504,0.892379083 +22-3-2019,1.171550078,0.888888889 +21-3-2019,1.150483203,0.888730892 +20-3-2019,1.161737029,0.888691402 +19-3-2019,1.169057389,0.887862914 +18-3-2019,1.165949608,0.891106755 +15-3-2019,1.17249789,0.890828916 +14-3-2019,1.174853437,0.891662951 +13-3-2019,1.169590643,0.891106755 +12-3-2019,1.161642563,0.891027355 +11-3-2019,1.166520852,0.896860987 +8-3-2019,1.158506916,0.897424392 +7-3-2019,1.16780138,0.896097495 +6-3-2019,1.162007019,0.898795614 +5-3-2019,1.159810255,0.898311175 +4-3-2019,1.163372384,0.897786955 +1-3-2019,1.16295297,0.90244334 +28-2-2019,1.16512094,0.90440445 +27-2-2019,1.171508903,0.900738606 +26-2-2019,1.165311022,0.894054537 +25-2-2019,1.151768541,0.893974611 +22-2-2019,1.151330939,0.891662951 +21-2-2019,1.151410478,0.89273758 +20-2-2019,1.151012891,0.892777431 +19-2-2019,1.149729239,0.89146423 +18-2-2019,1.143301397,0.893694982 +15-2-2019,1.140693085,0.896860987 
+14-2-2019,1.133118796,0.900252071 +13-2-2019,1.14153948,0.900576369 +12-2-2019,1.140133852,0.901225667 +11-2-2019,1.140914329,0.901794571 +8-2-2019,1.142739604,0.901388138 +7-2-2019,1.142400183,0.902445628 +6-2-2019,1.139211666,0.900009 +5-2-2019,1.134198349,0.899806542 +4-2-2019,1.144885225,0.901243 +1-2-2019,1.14187839,0.902934537 +31-1-2019,1.143379831,0.903628067 +30-1-2019,1.144112398,0.906043309 +29-1-2019,1.148349572,0.911701691 +28-1-2019,1.15036409,0.912325518 +25-1-2019,1.155374804,0.906618314 +24-1-2019,1.148567162,0.905387053 +23-1-2019,1.14825064,0.905223138 +22-1-2019,1.140797189,0.904486252 +21-1-2019,1.134751773,0.905715062 +18-1-2019,1.135383078,0.909338911 +17-1-2019,1.134134032,0.906700517 +16-1-2019,1.128515325,0.902282775 +15-1-2019,1.119482351,0.909256228 +14-1-2019,1.125707789,0.904977376 +11-1-2019,1.117143687,0.90424089 +10-1-2019,1.108696616,0.904568069 +9-1-2019,1.107125459,0.908265213 +8-1-2019,1.112644087,0.90962735 +7-1-2019,1.1141813,0.91124248 +4-1-2019,1.115026092,0.912575287 +3-1-2019,1.106060103,0.913993236 +2-1-2019,1.110592834,0.913408842 +31-12-2018,1.117156167,0.916674306 +28-12-2018,1.109299256,0.916086479 +27-12-2018,1.107910481,0.913575735 +26-12-2018,1.110975325,0.910332271 +24-12-2018,1.114305453,0.910829766 +21-12-2018,1.1105805,0.909545682 +20-12-2018,1.106831363,0.912825194 +19-12-2018,1.10702741,0.910705341 +18-12-2018,1.111716379,0.908182726 +17-12-2018,1.112334679,0.905879156 +14-12-2018,1.111790539,0.907029478 +13-12-2018,1.112099644,0.906084356 +12-12-2018,1.113833816,0.904813608 +11-12-2018,1.106427236,0.899523253 +10-12-2018,1.101770545,0.897585495 +7-12-2018,1.12027245,0.897182846 +6-12-2018,1.123048133,0.898391879 +5-12-2018,1.123078133,0.899523253 +4-12-2018,1.1212145,0.900414191 +3-12-2018,1.122498232,0.901631954 +30-11-2018,1.122384843,0.901550667 +29-11-2018,1.122120359,0.899604174 +28-11-2018,1.130671732,0.899118864 +27-11-2018,1.128387985,0.896539358 +26-11-2018,1.130965845,0.895255148 +23-11-2018,1.128795575,0.896458987 +22-11-2018,1.128349788,0.903097625 +21-11-2018,1.121730157,0.903056847 +20-11-2018,1.123898579,0.912825194 +19-11-2018,1.247645328,0.910705341 +16-11-2018,1.126722477,0.900414191 +15-11-2018,1.129598879,0.905879156 +14-11-2018,1.14998045,0.895255148 diff --git a/src/options.py b/src/options.py new file mode 100644 index 0000000..8ab0625 --- /dev/null +++ b/src/options.py @@ -0,0 +1,60 @@ +import pandas as pd +import numpy as np +from scipy.stats import norm + + +def as_df(option): + """Creates a dataframe holding the option, and calculates time to maturity""" + df = pd.DataFrame([option]) + df["T"] = (df["t1"] - df["t0"]) / np.timedelta64(365, "D") + return df + + +def black_scholes_option_price_spot(S, K, T, r, sigma, option_type="call"): + """Calculates option price using Black-Scholes formula with spot price assuming zero dividend""" + sigma_scaled = sigma * np.sqrt(T) + d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / sigma_scaled + d2 = d1 - sigma_scaled + + if option_type == "call": + return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2) + return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1) # put + + +def black_scholes_option_price_forward(F, K, T, r, sigma, option_type="call"): + """Calculates option price using Black-Scholes formula with forward price assuming zero dividend""" + sigma_scaled = sigma * np.sqrt(T) + d1 = (np.log(F / K) + (0.5 * sigma**2) * T) / sigma_scaled + d2 = d1 - sigma_scaled + + discount = np.exp(-r * T) + + if option_type == "call": + return discount * (F * 
norm.cdf(d1) - K * norm.cdf(d2))
+    return discount * (K * norm.cdf(-d2) - F * norm.cdf(-d1))  # put
+
+
+def forward_price(S, T, r):
+    """Calculates the forward price assuming zero dividend"""
+    return S * np.exp(r * T)
+
+
+def moneyness(strike_price, current_price, option_type="call", threshold=0.005):
+    """Classifies an option as ITM, ATM, or OTM relative to its strike"""
+    fuzzy_price = threshold * strike_price
+
+    if option_type == "put":
+        if current_price < strike_price - fuzzy_price:
+            return "ITM"
+        if current_price > strike_price + fuzzy_price:
+            return "OTM"
+        return "ATM"
+
+    if option_type == "call":
+        if current_price > strike_price + fuzzy_price:
+            return "ITM"
+        if current_price < strike_price - fuzzy_price:
+            return "OTM"
+        return "ATM"
+
+    return ""
diff --git a/src/options_test.py b/src/options_test.py
new file mode 100644
index 0000000..efb92be
--- /dev/null
+++ b/src/options_test.py
@@ -0,0 +1,114 @@
+import pytest
+import pandas as pd
+from pandas.testing import assert_series_equal
+
+from . import options
+
+
+# https://analystprep.com/study-notes/actuarial-exams/soa/ifm-investment-and-financial-markets/black-scholes-option-pricing-model/
+@pytest.mark.parametrize(
+    "S, K, T, r, sigma, option_type, expected", [(100, 90, 0.5, 0.10, 0.25, "call", 16.11)]
+)
+def test_black_scholes_option_price_spot(S, K, T, r, sigma, option_type, expected):
+    assert (
+        abs(options.black_scholes_option_price_spot(S, K, T, r, sigma, option_type) - expected)
+        < 0.05
+    )
+
+
+@pytest.mark.parametrize(  # F = 100 * exp(0.10 * 0.5), the forward of the spot example above
+    "F, K, T, r, sigma, option_type, expected", [(105.13, 90, 0.5, 0.10, 0.25, "call", 16.11)]
+)
+def test_black_scholes_option_price_forward(F, K, T, r, sigma, option_type, expected):
+    assert (
+        abs(options.black_scholes_option_price_forward(F, K, T, r, sigma, option_type) - expected)
+        < 0.05
+    )
+
+
+@pytest.mark.parametrize(
+    "strike_price, current_price, option_type, expected",
+    [
+        (1, 10, "call", "ITM"),
+        (10, 1, "call", "OTM"),
+        (10, 10, "call", "ATM"),
+        (1, 10, "put", "OTM"),
+        (10, 1, "put", "ITM"),
+        (10, 10, "put", "ATM"),
+    ],
+)
+def test_moneyness(strike_price, current_price, option_type, expected):
+    assert options.moneyness(strike_price, current_price, option_type) == expected
+
+
+def test_end_to_end():
+    option = {
+        "t0": pd.Timestamp("2024-11-23"),
+        "t1": pd.Timestamp("2025-05-10"),
+        "S": 19,
+        "K": 17,
+        "r": 0.005,
+        "sigma": 0.3,
+    }
+    options_df = options.as_df(option)
+
+    expected = {
+        "F": 19.04367,
+        "C_F": 2.70,
+        "C_S": 2.70,
+        "P_F": 0.66,
+        "P_PCP": 0.66,
+    }
+    expected_df = pd.DataFrame([expected])
+
+    assert_series_equal(
+        options.forward_price(options_df["S"], options_df["T"], options_df["r"]),
+        expected_df["F"],
+        check_names=False,
+        check_exact=False,
+    )
+
+    assert_series_equal(
+        options.black_scholes_option_price_spot(
+            options_df["S"],
+            options_df["K"],
+            options_df["T"],
+            options_df["r"],
+            options_df["sigma"],
+            "call",
+        ),
+        expected_df["C_S"],
+        rtol=0.01,
+        check_names=False,
+        check_exact=False,
+    )
+
+    assert_series_equal(
+        options.black_scholes_option_price_forward(
+            options.forward_price(options_df["S"], options_df["T"], options_df["r"]),
+            options_df["K"],
+            options_df["T"],
+            options_df["r"],
+            options_df["sigma"],
+            "call",
+        ),
+        expected_df["C_F"],
+        rtol=0.01,
+        check_names=False,
+        check_exact=False,
+    )
+
+    assert_series_equal(
+        options.black_scholes_option_price_forward(
+            options.forward_price(options_df["S"], options_df["T"], options_df["r"]),
+            options_df["K"],
+            options_df["T"],
+            options_df["r"],
+            options_df["sigma"],
+            "put",
+        ),
+        expected_df["P_F"],
+        rtol=0.01,
+        check_names=False,
+        check_exact=False,
+    )
diff --git a/src/var.py b/src/var.py
new file mode 100644
index 0000000..56d6909
--- /dev/null
+++ b/src/var.py
@@ -0,0 +1,20 @@
+import numpy as np
+
+
+def calculate_returns(prices, horizon=1, sort="ascending"):
+    """Calculate simple returns; for horizon > 1 the log return is scaled by sqrt(horizon)"""
+    step = 1 if sort == "ascending" else -1
+    if horizon == 1:
+        returns = prices / prices.shift(step) - 1
+    else:
+        returns = np.exp(np.log(prices / prices.shift(step)) * np.sqrt(horizon)) - 1
+    return returns.dropna()
+
+
+def var(portfolio, prices, sort="ascending"):
+    """Calculate the 1-day VaR for the portfolio from historical returns"""
+    returns = calculate_returns(prices, 1, sort)
+    pnl = portfolio.dot(returns.T)
+    daily_pnl = pnl.sum(axis=0)
+    daily_pnl_asc = daily_pnl.sort_values(ascending=True)
+    return 0.4 * daily_pnl_asc[1] + 0.6 * daily_pnl_asc[2]
diff --git a/src/var_test.py b/src/var_test.py
new file mode 100644
index 0000000..e339d88
--- /dev/null
+++ b/src/var_test.py
@@ -0,0 +1,45 @@
+import os
+import pandas as pd
+import pytest
+from pandas.testing import assert_series_equal
+
+from . import var
+
+
+@pytest.mark.parametrize(
+    "prices, horizon, sort, expected",
+    [
+        ([1, 2, 3, 4, 5], 1, "ascending", [1.0, 0.5, 0.333333, 0.25]),
+        ([5, 4, 3, 2, 1], 1, "descending", [0.25, 0.333333, 0.5, 1.0]),
+    ],
+)
+def test_calculate_returns(prices, horizon, sort, expected):
+    assert_series_equal(
+        pd.Series(var.calculate_returns(pd.DataFrame(prices), horizon, sort).squeeze()),
+        pd.Series(expected),
+        check_names=False,
+        check_exact=False,
+        check_index=False,
+    )
+
+@pytest.mark.parametrize(
+    "portfolio, prices, expected",
+    [
+        ({"FX-1": [100.00]}, {"FX-1": [1, 1.1, 0.9, 0.8]}, -6.909090),
+        ({"FX-1": [100.00]}, {"FX-1": [1, 0.9, 0.8, 0.8]}, -10.66666),
+    ],
+)
+def test_var(portfolio, prices, expected):
+    assert var.var(pd.DataFrame(portfolio), pd.DataFrame(prices)) == pytest.approx(expected)
+
+
+def test_end_to_end():
+    portfolio = pd.DataFrame([{"ccy-1": 153084.81, "ccy-2": 95891.51}])
+    fx_prices = pd.read_csv(
+        os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "var_fx_prices.csv"),
+        parse_dates=["date"],
+        index_col="date",
+        dtype=float,
+    )
+    fx_prices.sort_index(inplace=True, ascending=True)
+    assert var.var(portfolio, fx_prices) == pytest.approx(-13572.73)
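
A minimal usage sketch tying the two modules together, assuming the repository root is on the Python path so that src resolves as a package and that the FX price history above lives at src/data/var_fx_prices.csv; the option and portfolio inputs are taken from the end-to-end tests in src/options_test.py and src/var_test.py:

# Sketch: price the test option on spot and on the forward, then compute the 1-day VaR
# of the test portfolio. Import path and CSV location are assumptions based on the tests above.
import pandas as pd

from src import options, var

# Option inputs from src/options_test.py::test_end_to_end
T = (pd.Timestamp("2025-05-10") - pd.Timestamp("2024-11-23")) / pd.Timedelta(days=365)
S, K, r, sigma = 19, 17, 0.005, 0.3

call_on_spot = options.black_scholes_option_price_spot(S, K, T, r, sigma, "call")
F = options.forward_price(S, T, r)
call_on_forward = options.black_scholes_option_price_forward(F, K, T, r, sigma, "call")
print(options.moneyness(K, S, "call"), round(call_on_spot, 2), round(call_on_forward, 2))
# expected: ITM 2.7 2.7, per the expected values in the options end-to-end test

# Portfolio and FX price history from src/var_test.py::test_end_to_end
portfolio = pd.DataFrame([{"ccy-1": 153084.81, "ccy-2": 95891.51}])
fx_prices = pd.read_csv(
    "src/data/var_fx_prices.csv", parse_dates=["date"], index_col="date", dtype=float
)
fx_prices.sort_index(inplace=True, ascending=True)
print(round(var.var(portfolio, fx_prices), 2))  # roughly -13572.73 per the VaR end-to-end test

Note that moneyness takes its arguments in (strike, spot) order, and that the VaR figure depends on the 0.4/0.6 weighting of the sorted daily PnL hard-coded in src/var.py.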