️ DEPRECATED GITLAB INSTANCE ️ This GitLab is now read-only for reference. Please use https://gitlab.iauro.co for all new work.

Migration completed on September 17, 2025

Commit 28edbbe4 authored by Onkar Naik's avatar Onkar Naik

Initial commit

parent ebecdecf
import streamlit as st
import yfinance as yf
from datetime import date
from transformers import pipeline
st.set_page_config(page_title="Stock Dashboard", layout="wide")
st.title("Stock Price Dashboard + Summarizer")
st.sidebar.header("Stock Price Inputs")
ticker = st.sidebar.text_input("Enter Stock Ticker (e.g., AAPL)", value="AAPL")
start_date = st.sidebar.date_input("Start Date", date(2023, 1, 1))
end_date = st.sidebar.date_input("End Date", date.today())
if st.sidebar.button("Show Stock Chart"):
with st.spinner("Fetching stock data..."):
try:
data = yf.download(ticker, start=start_date, end=end_date)
st.subheader(f"{ticker} Closing Price from {start_date} to {end_date}")
st.line_chart(data['Close'])
except Exception as e:
st.error(f"Failed to fetch stock data: {e}")
st.sidebar.header("Text Summarizer")
text_input = st.sidebar.text_area("Paste stock-related news or article here:")
summarize_button = st.sidebar.button("Summarize Text")
if summarize_button:
if not text_input.strip():
st.warning("Please enter some text to summarize.")
else:
with st.spinner("Summarizing..."):
summarizer = pipeline("summarization")
summary = summarizer(text_input, max_length=120, min_length=30, do_sample=False)
st.subheader("📝 Summary:")
st.success(summary[0]['summary_text'])
streamlit
yfinance
transformers
class Calculator:
def __init__(self):
self.result = 0
def add(self, a, b):
try:
self.result = a + b
return self.result
except TypeError:
print("Error: Invalid input. Please enter numbers.")
return None
def subtract(self, a, b):
try:
self.result = a - b
return self.result
except TypeError:
print("Error: Invalid input. Please enter numbers.")
return None
def multiply(self, a, b):
try:
self.result = a * b
return self.result
except TypeError:
print("Error: Invalid input. Please enter numbers.")
return None
def divide(self, a, b):
try:
if b == 0:
raise ZeroDivisionError("Cannot divide by zero.")
self.result = a / b
return self.result
except ZeroDivisionError as zde:
print(f"Error: {zde}")
return None
except TypeError:
print("Error: Invalid input. Please enter numbers.")
return None
def power(self, a, b):
try:
self.result = a ** b
return self.result
except TypeError:
print("Error: Invalid input. Please enter numbers.")
return None
def clear(self):
self.result = 0
def get_number(prompt):
while True:
try:
return float(input(prompt))
except ValueError:
print("Invalid input. Please enter a valid number.")
def main():
calc = Calculator()
operations = {
'1': ('Add', calc.add),
'2': ('Subtract', calc.subtract),
'3': ('Multiply', calc.multiply),
'4': ('Divide', calc.divide),
'5': ('Power', calc.power),
'6': ('Clear', calc.clear),
'7': ('Exit', None)
}
while True:
print("\n--- Calculator Menu ---")
for key, (name, _) in operations.items():
print(f"{key}. {name}")
choice = input("Select operation: ")
if choice == '7':
print("Exiting calculator. Goodbye!")
break
elif choice == '6':
calc.clear()
print("Calculator cleared.")
continue
elif choice in operations and choice != '6':
a = get_number("Enter first number: ")
b = get_number("Enter second number: ")
func = operations[choice][1]
result = func(a, b)
if result is not None:
print(f"Result: {result}")
else:
print("Invalid choice. Please select a valid operation.")
if __name__ == "__main__":
main()
\ No newline at end of file
import argparse
import csv
import json
import os
import re
import sys
import xml.etree.ElementTree as ET
import yaml
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
VERSION = "1.0.0"
class FileParser:
"""Main file parser class to handle different file types and operations."""
def __init__(self):
self.data = None
self.file_path = None
self.file_type = None
self.output_format = None
def is_data_empty(self):
"""Check if data is empty based on its type."""
if self.data is None:
return True
if PANDAS_AVAILABLE and self.file_type == "pandas":
return self.data.empty
if isinstance(self.data, list) or isinstance(self.data, dict):
return len(self.data) == 0
if isinstance(self.data, str):
return len(self.data) == 0
return False
def load_file(self, file_path):
"""Load file based on extension."""
self.file_path = file_path
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if not os.path.exists(file_path):
print(f"Error: File '{file_path}' not found.")
sys.exit(1)
try:
if ext in ['.csv', '.tsv']:
delimiter = ',' if ext == '.csv' else '\t'
if PANDAS_AVAILABLE:
self.data = pd.read_csv(file_path, delimiter=delimiter)
self.file_type = "pandas"
else:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter=delimiter)
self.data = list(reader)
self.file_type = "csv"
elif ext == '.json':
with open(file_path, 'r', encoding='utf-8') as f:
self.data = json.load(f)
self.file_type = "json"
elif ext == '.xml':
self.data = ET.parse(file_path)
self.file_type = "xml"
elif ext in ['.yaml', '.yml']:
with open(file_path, 'r', encoding='utf-8') as f:
self.data = yaml.safe_load(f)
self.file_type = "yaml"
elif ext in ['.txt', '.log', '.md', '.py', '.js', '.html', '.css']:
with open(file_path, 'r', encoding='utf-8') as f:
self.data = f.read()
self.file_type = "text"
else:
# Binary or unknown format
with open(file_path, 'rb') as f:
self.data = f.read()
self.file_type = "binary"
print(f"Successfully loaded {file_path}")
return True
except Exception as e:
print(f"Error loading file: {str(e)}")
return False
def analyze(self, analyze_type='general'):
"""Analyze file content based on file type."""
if self.is_data_empty():
print("No data loaded. Please load a file first.")
return None
result = {}
if analyze_type == 'general':
# General file information
result["file_name"] = os.path.basename(self.file_path)
result["file_size"] = os.path.getsize(self.file_path)
result["file_type"] = self.file_type
if self.file_type == "text":
result["line_count"] = len(self.data.splitlines())
result["word_count"] = len(self.data.split())
result["char_count"] = len(self.data)
elif self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
result["row_count"] = len(self.data)
result["column_count"] = len(self.data.columns)
result["columns"] = list(self.data.columns)
else:
result["row_count"] = len(self.data)
if self.data and len(self.data) > 0:
result["column_count"] = len(self.data[0])
result["columns"] = list(self.data[0].keys())
elif self.file_type == "json":
if isinstance(self.data, list):
result["item_count"] = len(self.data)
elif isinstance(self.data, dict):
result["key_count"] = len(self.data)
result["keys"] = list(self.data.keys())
elif self.file_type == "xml":
root = self.data.getroot()
result["root_tag"] = root.tag
result["child_count"] = len(root)
elif analyze_type == 'structure':
# Analyze data structure
if self.file_type == "json" or self.file_type == "yaml":
def get_structure(data, max_depth=3, current_depth=0):
if current_depth >= max_depth:
return "..."
if isinstance(data, dict):
return {k: get_structure(v, max_depth, current_depth + 1) for k, v in data.items()}
elif isinstance(data, list) and data:
if len(data) > 1:
return [get_structure(data[0], max_depth, current_depth + 1), "..."]
elif len(data) == 1:
return [get_structure(data[0], max_depth, current_depth + 1)]
else:
return []
elif isinstance(data, (int, float, bool, str)) or data is None:
return type(data).__name__
else:
return str(type(data).__name__)
result["structure"] = get_structure(self.data)
elif self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
result["column_types"] = {col: str(dtype) for col, dtype in self.data.dtypes.items()}
result["sample_data"] = self.data.head(3).to_dict(orient='records')
else:
if self.data:
result["column_names"] = list(self.data[0].keys())
result["sample_data"] = self.data[:3]
elif self.file_type == "xml":
root = self.data.getroot()
def xml_to_dict(element, max_depth=3, current_depth=0):
if current_depth >= max_depth:
return "..."
result = {}
for child in element:
if len(child):
result[child.tag] = xml_to_dict(child, max_depth, current_depth + 1)
else:
result[child.tag] = "text" if child.text and child.text.strip() else "empty"
if element.attrib:
result["@attributes"] = {k: "value" for k in element.attrib.keys()}
return result
result["structure"] = {root.tag: xml_to_dict(root)}
elif analyze_type == 'stats' and (self.file_type == "pandas" or self.file_type == "csv"):
# Statistical analysis for data files
if PANDAS_AVAILABLE and self.file_type == "pandas":
numeric_cols = self.data.select_dtypes(include=['number']).columns
if not numeric_cols.empty:
stats = self.data[numeric_cols].describe().to_dict()
result["statistics"] = stats
else:
result["statistics"] = "No numeric columns available for statistics"
else:
print("Advanced statistics require pandas library. Install with 'pip install pandas'")
result["statistics"] = "Requires pandas library"
return result
def search(self, pattern, case_sensitive=False):
"""Search for a pattern in the file."""
if self.is_data_empty():
print("No data loaded. Please load a file first.")
return None
result = {"matches": [], "match_count": 0}
if self.file_type == "text":
flags = 0 if case_sensitive else re.IGNORECASE
matches = list(re.finditer(pattern, self.data, flags))
result["match_count"] = len(matches)
for m in matches[:20]: # Limit results to avoid overwhelming output
start, end = max(0, m.start() - 20), min(len(self.data), m.end() + 20)
context = f"...{self.data[start:end]}..."
result["matches"].append({
"match": m.group(),
"position": m.start(),
"context": context
})
elif self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
# Search across all columns in pandas DataFrame
pattern_func = lambda x: bool(re.search(pattern, str(x), 0 if case_sensitive else re.IGNORECASE))
mask = self.data.applymap(pattern_func).any(axis=1)
matches = self.data[mask]
result["match_count"] = len(matches)
if not matches.empty:
result["matches"] = matches.head(20).to_dict(orient='records')
else:
# Search across all fields in CSV
flags = 0 if case_sensitive else re.IGNORECASE
matches = []
for row in self.data:
for key, value in row.items():
if re.search(pattern, str(value), flags):
matches.append(row)
break
result["match_count"] = len(matches)
result["matches"] = matches[:20]
elif self.file_type == "json" or self.file_type == "yaml":
# Convert to string for simplicity
data_str = json.dumps(self.data)
flags = 0 if case_sensitive else re.IGNORECASE
matches = list(re.finditer(pattern, data_str, flags))
result["match_count"] = len(matches)
result["matches"] = [m.group() for m in matches[:20]]
return result
def extract(self, query):
"""Extract specific data based on the query and file type."""
if self.is_data_empty():
print("No data loaded. Please load a file first.")
return None
result = {"extracted_data": None}
try:
if self.file_type == "json" or self.file_type == "yaml":
# Simple JSON/YAML path extraction (dot notation)
paths = query.split('.')
data = self.data
for path in paths:
# Handle array indices
if '[' in path and path.endswith(']'):
key, idx_str = path.split('[', 1)
idx = int(idx_str[:-1])
if key:
data = data[key]
data = data[idx]
else:
data = data[path]
result["extracted_data"] = data
elif self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
# Allow SQL-like queries with pandas
if query.lower().startswith("where "):
# Convert simple where clauses to pandas query
query_str = query[6:] # Remove 'where '
df_result = self.data.query(query_str, engine='python')
result["extracted_data"] = df_result.to_dict(orient='records')
elif query.lower().startswith("select "):
# Handle simple column selection
cols = query[7:].split(',') # Remove 'select '
cols = [c.strip() for c in cols]
result["extracted_data"] = self.data[cols].to_dict(orient='records')
else:
# Assume it's a column name
result["extracted_data"] = self.data[query].to_list()
else:
# For non-pandas CSV processing
if query.startswith("column:"):
column = query[7:]
result["extracted_data"] = [row.get(column) for row in self.data if column in row]
else:
print("Complex queries require pandas library. Install with 'pip install pandas'")
result["extracted_data"] = None
elif self.file_type == "xml":
# Simple XPath-like query
root = self.data.getroot()
elements = root.findall(query)
result["extracted_data"] = []
for elem in elements:
if elem.text and elem.text.strip():
result["extracted_data"].append(elem.text)
else:
# Get element attributes
elem_data = {"tag": elem.tag}
if elem.attrib:
elem_data["attributes"] = elem.attrib
result["extracted_data"].append(elem_data)
elif self.file_type == "text":
# For text files, extract via regex pattern
flags = re.MULTILINE
matches = re.findall(query, self.data, flags)
result["extracted_data"] = matches
except Exception as e:
print(f"Error in extraction: {str(e)}")
result["error"] = str(e)
return result
def transform(self, transform_type, output_file=None):
"""Transform file into a different format."""
if self.is_data_empty():
print("No data loaded. Please load a file first.")
return False
output_path = output_file or f"{os.path.splitext(self.file_path)[0]}.{transform_type}"
try:
if transform_type == "json":
# Convert to JSON
if self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.data.to_dict(orient='records'), f, indent=2)
else:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.data, f, indent=2)
elif self.file_type == "yaml":
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.data, f, indent=2)
elif self.file_type == "xml":
# Simple XML to JSON conversion
root = self.data.getroot()
def xml_to_dict(element):
result = {}
for child in element:
if len(child):
result[child.tag] = xml_to_dict(child)
else:
result[child.tag] = child.text or ""
if element.attrib:
result["@attributes"] = element.attrib
return result
with open(output_path, 'w', encoding='utf-8') as f:
json.dump({root.tag: xml_to_dict(root)}, f, indent=2)
else:
print(f"Cannot convert {self.file_type} to JSON")
return False
elif transform_type == "csv":
# Convert to CSV
if self.file_type == "json" or self.file_type == "yaml":
if PANDAS_AVAILABLE:
pd.DataFrame(self.data).to_csv(output_path, index=False)
else:
if isinstance(self.data, list):
keys = set()
for item in self.data:
if isinstance(item, dict):
keys.update(item.keys())
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=list(keys))
writer.writeheader()
for item in self.data:
if isinstance(item, dict):
writer.writerow(item)
else:
print("JSON/YAML must contain a list of objects to convert to CSV")
return False
elif self.file_type == "xml":
# Simple XML to CSV conversion
root = self.data.getroot()
rows = []
for child in root:
row = {}
for elem in child:
row[elem.tag] = elem.text or ""
rows.append(row)
if rows:
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
else:
print("No suitable data structure found in XML for CSV conversion")
return False
else:
print(f"Cannot convert {self.file_type} to CSV")
return False
elif transform_type == "yaml":
# Convert to YAML
if self.file_type == "json":
if yaml:
with open(output_path, 'w', encoding='utf-8') as f:
yaml.dump(self.data, f, default_flow_style=False)
else:
print("YAML conversion requires PyYAML. Install with 'pip install pyyaml'")
return False
elif self.file_type == "csv" or self.file_type == "pandas":
if yaml:
if PANDAS_AVAILABLE and self.file_type == "pandas":
with open(output_path, 'w', encoding='utf-8') as f:
yaml.dump(self.data.to_dict(orient='records'), f, default_flow_style=False)
else:
with open(output_path, 'w', encoding='utf-8') as f:
yaml.dump(self.data, f, default_flow_style=False)
else:
print("YAML conversion requires PyYAML. Install with 'pip install pyyaml'")
return False
else:
print(f"Cannot convert {self.file_type} to YAML")
return False
elif transform_type == "text":
# Convert to plain text
with open(output_path, 'w', encoding='utf-8') as f:
if self.file_type == "json" or self.file_type == "yaml":
json.dump(self.data, f, indent=2)
elif self.file_type == "csv" or self.file_type == "pandas":
if PANDAS_AVAILABLE and self.file_type == "pandas":
f.write(self.data.to_string())
else:
writer = csv.writer(f)
if self.data:
writer.writerow(self.data[0].keys())
for row in self.data:
writer.writerow(row.values())
elif self.file_type == "xml":
f.write(ET.tostring(self.data.getroot(), encoding='unicode'))
else:
f.write(str(self.data))
else:
print(f"Unsupported output format: {transform_type}")
return False
print(f"Successfully transformed to {output_path}")
return True
except Exception as e:
print(f"Error in transformation: {str(e)}")
return False
def output_results(self, data, format_type="terminal"):
"""Output results in specified format."""
if format_type == "terminal":
if isinstance(data, dict):
print(json.dumps(data, indent=2))
else:
print(data)
elif format_type == "json":
print(json.dumps(data, indent=2))
elif format_type == "csv":
if isinstance(data, list) and all(isinstance(item, dict) for item in data):
writer = csv.DictWriter(sys.stdout, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
elif isinstance(data, dict):
for key, value in data.items():
print(f"{key},{value}")
else:
print("Data format not suitable for CSV output")
def main():
parser = argparse.ArgumentParser(description="File Parser CLI Tool")
# Main arguments
parser.add_argument("file", nargs="?", help="Path to the file to parse")
parser.add_argument("--version", action="store_true", help="Show version information")
# Operations
operation_group = parser.add_argument_group("Operations")
operation_group.add_argument("--analyze", action="store_true", help="Analyze file content")
operation_group.add_argument("--analyze-type", choices=["general", "structure", "stats"],
default="general", help="Type of analysis to perform")
operation_group.add_argument("--search", metavar="PATTERN", help="Search for a pattern in the file")
operation_group.add_argument("--extract", metavar="QUERY",
help="Extract specific data from the file using a query")
operation_group.add_argument("--transform", metavar="FORMAT",
choices=["json", "csv", "yaml", "text"],
help="Transform file to a different format")
# Options
options_group = parser.add_argument_group("Options")
options_group.add_argument("--output", metavar="FILE", help="Output file for transformed data")
options_group.add_argument("--format", choices=["terminal", "json", "csv"],
default="terminal", help="Output format for results")
options_group.add_argument("--case-sensitive", action="store_true",
help="Make search case-sensitive")
args = parser.parse_args()
if args.version:
print(f"File Parser CLI Tool v{VERSION}")
sys.exit(0)
if not args.file:
parser.print_help()
sys.exit(1)
file_parser = FileParser()
if not file_parser.load_file(args.file):
sys.exit(1)
# Perform operations
if args.analyze:
result = file_parser.analyze(args.analyze_type)
file_parser.output_results(result, args.format)
elif args.search:
result = file_parser.search(args.search, args.case_sensitive)
file_parser.output_results(result, args.format)
elif args.extract:
result = file_parser.extract(args.extract)
file_parser.output_results(result, args.format)
elif args.transform:
file_parser.transform(args.transform, args.output)
else:
# Default to general analysis
result = file_parser.analyze("general")
file_parser.output_results(result, args.format)
if __name__ == "__main__":
main()
####
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv --analyze --analyze-type structure
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv --search "besan"
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv --extract "select customer_id"
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv --extract "where customer_id > 100"
# python3 file_parser_cli_tool.py /home/iauro/sales_data.csv --transform json --output sales_data.json
{
"students": {
"S40E463EC": {
"id": "43fd0154-dc0b-44ee-87f8-90c65fbfab7f",
"first_name": "Alice",
"last_name": "Johnson",
"date_of_birth": "2000-05-15",
"contact_email": "alice@example.com",
"student_id": "S40E463EC",
"major": "Computer Science",
"enrollment_date": "2025-05-21",
"enrolled_courses": [
"MATH201",
"CS101"
],
"grades": [
{
"student_id": "S40E463EC",
"course_code": "CS101",
"grade": "A",
"semester": "Fall",
"year": 2023,
"date_recorded": "2025-05-21"
}
],
"status": "Active"
},
"S1803D86D": {
"id": "1ebaa60c-6f64-495b-8c52-1a1acc72198d",
"first_name": "Bob",
"last_name": "Smith",
"date_of_birth": "2001-02-20",
"contact_email": "bob@example.com",
"student_id": "S1803D86D",
"major": "Mathematics",
"enrollment_date": "2025-05-21",
"enrolled_courses": [
"CS102",
"CS101"
],
"grades": [
{
"student_id": "S1803D86D",
"course_code": "CS101",
"grade": "B+",
"semester": "Fall",
"year": 2023,
"date_recorded": "2025-05-21"
}
],
"status": "Active"
}
},
"courses": {
"CS101": {
"code": "CS101",
"name": "Introduction to Computer Science",
"credits": 3,
"description": "Basic programming concepts",
"students": [
"S1803D86D",
"S40E463EC"
]
},
"CS102": {
"code": "CS102",
"name": "Data Structures",
"credits": 4,
"description": "Fundamental data structures and algorithms",
"students": [
"S1803D86D"
]
},
"MATH201": {
"code": "MATH201",
"name": "Calculus I",
"credits": 4,
"description": "Limits, derivatives, and integrals",
"students": [
"S40E463EC"
]
}
},
"instructors": {
"E72B04555": {
"id": "f8e201fc-45a2-476e-bf6a-cbc7f8336309",
"first_name": "David",
"last_name": "Brown",
"date_of_birth": "1975-08-10",
"contact_email": "brown@example.edu",
"employee_id": "E72B04555",
"department": "Computer Science",
"position": "Professor",
"hire_date": "2025-05-21",
"courses_taught": [
"CS102",
"CS101"
]
}
}
}
\ No newline at end of file
import datetime
import uuid
import json
import os
from typing import Dict, List, Optional, Set, Tuple, Any, Union
class Person:
"""Base class for all people in the system."""
def __init__(self, first_name: str, last_name: str, date_of_birth: str, contact_email: str):
self.id = str(uuid.uuid4())
self.first_name = first_name
self.last_name = last_name
self.full_name = f"{first_name} {last_name}"
# Validate and set date of birth
try:
self.date_of_birth = datetime.datetime.strptime(date_of_birth, "%Y-%m-%d").date()
except ValueError:
raise ValueError("Date of birth must be in the format YYYY-MM-DD")
# Validate email format (basic check)
if "@" not in contact_email or "." not in contact_email:
raise ValueError("Invalid email format")
self.contact_email = contact_email
@property
def age(self) -> int:
"""Calculate age based on date of birth."""
today = datetime.date.today()
age = today.year - self.date_of_birth.year
if (today.month, today.day) < (self.date_of_birth.month, self.date_of_birth.day):
age -= 1
return age
def to_dict(self) -> Dict[str, Any]:
"""Convert person to dictionary for serialization."""
return {
"id": self.id,
"first_name": self.first_name,
"last_name": self.last_name,
"date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
"contact_email": self.contact_email
}
def __str__(self) -> str:
return f"{self.full_name} (ID: {self.id})"
class Course:
"""Represents an academic course."""
def __init__(self, code: str, name: str, credits: int, description: str = ""):
self.code = code.upper() # Course code, e.g., "CS101"
self.name = name
# Validate credits
if not isinstance(credits, int) or credits <= 0:
raise ValueError("Credits must be a positive integer")
self.credits = credits
self.description = description
self.students: Set[str] = set() # Set of student IDs enrolled in the course
def enroll_student(self, student_id: str) -> None:
"""Enroll a student in this course."""
self.students.add(student_id)
def drop_student(self, student_id: str) -> bool:
"""Drop a student from this course."""
if student_id in self.students:
self.students.remove(student_id)
return True
return False
def get_enrollment_count(self) -> int:
"""Get the number of students enrolled in this course."""
return len(self.students)
def to_dict(self) -> Dict[str, Any]:
"""Convert course to dictionary for serialization."""
return {
"code": self.code,
"name": self.name,
"credits": self.credits,
"description": self.description,
"students": list(self.students)
}
def __str__(self) -> str:
return f"{self.code}: {self.name} ({self.credits} credits)"
class Grade:
"""Represents a grade for a student in a specific course."""
VALID_GRADES = {"A+", "A", "A-", "B+", "B", "B-", "C+", "C", "C-", "D+", "D", "D-", "F", "I", "W"}
GRADE_POINTS = {
"A+": 4.0, "A": 4.0, "A-": 3.7,
"B+": 3.3, "B": 3.0, "B-": 2.7,
"C+": 2.3, "C": 2.0, "C-": 1.7,
"D+": 1.3, "D": 1.0, "D-": 0.7,
"F": 0.0, "I": None, "W": None # I = Incomplete, W = Withdrawn
}
def __init__(self, student_id: str, course_code: str, grade: str, semester: str, year: int):
self.student_id = student_id
self.course_code = course_code.upper()
# Validate grade
if grade not in self.VALID_GRADES:
raise ValueError(f"Invalid grade. Must be one of {self.VALID_GRADES}")
self.grade = grade
# Validate semester (Fall, Spring, Summer)
valid_semesters = {"Fall", "Spring", "Summer"}
if semester not in valid_semesters:
raise ValueError(f"Invalid semester. Must be one of {valid_semesters}")
self.semester = semester
# Validate year
current_year = datetime.date.today().year
if not isinstance(year, int) or year < 1900 or year > current_year + 1:
raise ValueError(f"Invalid year. Must be between 1900 and {current_year + 1}")
self.year = year
# Record when the grade was added
self.date_recorded = datetime.date.today()
@property
def grade_points(self) -> Optional[float]:
"""Get grade points associated with this grade."""
return self.GRADE_POINTS[self.grade]
@property
def term(self) -> str:
"""Get formatted term string."""
return f"{self.semester} {self.year}"
def to_dict(self) -> Dict[str, Any]:
"""Convert grade to dictionary for serialization."""
return {
"student_id": self.student_id,
"course_code": self.course_code,
"grade": self.grade,
"semester": self.semester,
"year": self.year,
"date_recorded": self.date_recorded.strftime("%Y-%m-%d")
}
def __str__(self) -> str:
return f"{self.course_code}: {self.grade} ({self.term})"
class Student(Person):
"""Represents a student in the system."""
def __init__(
self,
first_name: str,
last_name: str,
date_of_birth: str,
contact_email: str,
student_id: Optional[str] = None,
major: str = "Undeclared",
enrollment_date: Optional[str] = None
):
super().__init__(first_name, last_name, date_of_birth, contact_email)
# Use provided ID or generate a new one
self.student_id = student_id if student_id else f"S{str(uuid.uuid4())[:8].upper()}"
self.major = major
# Set enrollment date to today if not provided
if enrollment_date:
try:
self.enrollment_date = datetime.datetime.strptime(enrollment_date, "%Y-%m-%d").date()
except ValueError:
raise ValueError("Enrollment date must be in the format YYYY-MM-DD")
else:
self.enrollment_date = datetime.date.today()
self.enrolled_courses: Set[str] = set() # Set of course codes
self.grades: List[Grade] = []
self.status = "Active" # Active, On Leave, Graduated, Withdrawn, etc.
def enroll_in_course(self, course_code: str) -> None:
"""Enroll student in a course."""
self.enrolled_courses.add(course_code.upper())
def drop_course(self, course_code: str) -> bool:
"""Drop a course."""
course_code = course_code.upper()
if course_code in self.enrolled_courses:
self.enrolled_courses.remove(course_code)
return True
return False
def add_grade(self, grade: Grade) -> None:
"""Add a grade for a course."""
if grade.student_id != self.student_id:
raise ValueError("Grade does not belong to this student")
self.grades.append(grade)
def get_gpa(self) -> Optional[float]:
"""Calculate student's GPA."""
valid_grades = [g for g in self.grades if g.grade_points is not None]
if not valid_grades:
return None
total_points = sum(g.grade_points * self.get_course_credits(g.course_code) for g in valid_grades)
total_credits = sum(self.get_course_credits(g.course_code) for g in valid_grades)
if total_credits == 0:
return None
return round(total_points / total_credits, 2)
def get_course_credits(self, course_code: str) -> int:
"""Helper method to get course credits (would normally look up in a course catalog)."""
# This would ideally query the SMS system's course catalog
# For now, we'll return a default value of 3
return 3
def get_transcript(self) -> List[Dict[str, Any]]:
"""Generate student transcript."""
transcript = []
for grade in sorted(self.grades, key=lambda g: (g.year, g.semester, g.course_code)):
transcript.append({
"course_code": grade.course_code,
"grade": grade.grade,
"term": grade.term,
"credits": self.get_course_credits(grade.course_code),
"grade_points": grade.grade_points
})
return transcript
def to_dict(self) -> Dict[str, Any]:
"""Convert student to dictionary for serialization."""
base_dict = super().to_dict()
student_dict = {
"student_id": self.student_id,
"major": self.major,
"enrollment_date": self.enrollment_date.strftime("%Y-%m-%d"),
"enrolled_courses": list(self.enrolled_courses),
"grades": [g.to_dict() for g in self.grades],
"status": self.status
}
return {**base_dict, **student_dict}
def __str__(self) -> str:
return f"{self.full_name} (Student ID: {self.student_id}, Major: {self.major})"
class Instructor(Person):
"""Represents an instructor in the system."""
def __init__(
self,
first_name: str,
last_name: str,
date_of_birth: str,
contact_email: str,
employee_id: Optional[str] = None,
department: str = "",
position: str = "Instructor",
hire_date: Optional[str] = None
):
super().__init__(first_name, last_name, date_of_birth, contact_email)
# Use provided ID or generate a new one
self.employee_id = employee_id if employee_id else f"E{str(uuid.uuid4())[:8].upper()}"
self.department = department
self.position = position
# Set hire date to today if not provided
if hire_date:
try:
self.hire_date = datetime.datetime.strptime(hire_date, "%Y-%m-%d").date()
except ValueError:
raise ValueError("Hire date must be in the format YYYY-MM-DD")
else:
self.hire_date = datetime.date.today()
self.courses_taught: Set[str] = set() # Set of course codes
def assign_course(self, course_code: str) -> None:
"""Assign instructor to teach a course."""
self.courses_taught.add(course_code.upper())
def remove_course(self, course_code: str) -> bool:
"""Remove a course from instructor's teaching load."""
course_code = course_code.upper()
if course_code in self.courses_taught:
self.courses_taught.remove(course_code)
return True
return False
def to_dict(self) -> Dict[str, Any]:
"""Convert instructor to dictionary for serialization."""
base_dict = super().to_dict()
instructor_dict = {
"employee_id": self.employee_id,
"department": self.department,
"position": self.position,
"hire_date": self.hire_date.strftime("%Y-%m-%d"),
"courses_taught": list(self.courses_taught)
}
return {**base_dict, **instructor_dict}
def __str__(self) -> str:
return f"{self.full_name} ({self.position}, {self.department})"
class StudentManagementSystem:
"""Main class for managing students, courses, and grades."""
def __init__(self, data_file: str = "sms_data.json"):
self.students: Dict[str, Student] = {} # Map student_id to Student object
self.courses: Dict[str, Course] = {} # Map course_code to Course object
self.instructors: Dict[str, Instructor] = {} # Map employee_id to Instructor object
self.data_file = data_file
# Load data if file exists
if os.path.exists(data_file):
self.load_data()
def add_student(self, student: Student) -> None:
"""Add a student to the system."""
if student.student_id in self.students:
raise ValueError(f"Student with ID {student.student_id} already exists")
self.students[student.student_id] = student
def get_student(self, student_id: str) -> Optional[Student]:
"""Get a student by ID."""
return self.students.get(student_id)
def add_course(self, course: Course) -> None:
"""Add a course to the system."""
if course.code in self.courses:
raise ValueError(f"Course with code {course.code} already exists")
self.courses[course.code] = course
def get_course(self, course_code: str) -> Optional[Course]:
"""Get a course by code."""
return self.courses.get(course_code.upper())
def add_instructor(self, instructor: Instructor) -> None:
"""Add an instructor to the system."""
if instructor.employee_id in self.instructors:
raise ValueError(f"Instructor with ID {instructor.employee_id} already exists")
self.instructors[instructor.employee_id] = instructor
def get_instructor(self, employee_id: str) -> Optional[Instructor]:
"""Get an instructor by ID."""
return self.instructors.get(employee_id)
def enroll_student_in_course(self, student_id: str, course_code: str) -> bool:
"""Enroll a student in a course."""
student = self.get_student(student_id)
course = self.get_course(course_code.upper())
if not student:
raise ValueError(f"Student with ID {student_id} not found")
if not course:
raise ValueError(f"Course with code {course_code} not found")
student.enroll_in_course(course.code)
course.enroll_student(student_id)
return True
def drop_student_from_course(self, student_id: str, course_code: str) -> bool:
"""Drop a student from a course."""
student = self.get_student(student_id)
course = self.get_course(course_code.upper())
if not student or not course:
return False
return student.drop_course(course.code) and course.drop_student(student_id)
def assign_grade(
self,
student_id: str,
course_code: str,
grade: str,
semester: str,
year: int
) -> bool:
"""Assign a grade to a student for a course."""
student = self.get_student(student_id)
course = self.get_course(course_code.upper())
if not student:
raise ValueError(f"Student with ID {student_id} not found")
if not course:
raise ValueError(f"Course with code {course_code} not found")
# Create and add grade
grade_obj = Grade(student_id, course_code, grade, semester, year)
student.add_grade(grade_obj)
return True
def get_student_gpa(self, student_id: str) -> Optional[float]:
"""Get a student's GPA."""
student = self.get_student(student_id)
if not student:
raise ValueError(f"Student with ID {student_id} not found")
return student.get_gpa()
def get_student_transcript(self, student_id: str) -> List[Dict[str, Any]]:
"""Get a student's transcript."""
student = self.get_student(student_id)
if not student:
raise ValueError(f"Student with ID {student_id} not found")
return student.get_transcript()
def assign_instructor_to_course(self, employee_id: str, course_code: str) -> bool:
"""Assign an instructor to teach a course."""
instructor = self.get_instructor(employee_id)
course = self.get_course(course_code.upper())
if not instructor:
raise ValueError(f"Instructor with ID {employee_id} not found")
if not course:
raise ValueError(f"Course with code {course_code} not found")
instructor.assign_course(course.code)
return True
def generate_course_roster(self, course_code: str) -> List[Dict[str, Any]]:
"""Generate a roster of students for a course."""
course = self.get_course(course_code.upper())
if not course:
raise ValueError(f"Course with code {course_code} not found")
roster = []
for student_id in course.students:
student = self.get_student(student_id)
if student:
roster.append({
"student_id": student.student_id,
"name": student.full_name,
"email": student.contact_email,
"major": student.major
})
return roster
def find_students_by_name(self, name: str) -> List[Student]:
"""Find students by name (case-insensitive partial match)."""
name = name.lower()
return [
student for student in self.students.values()
if name in student.full_name.lower()
]
def find_courses_by_name(self, name: str) -> List[Course]:
"""Find courses by name (case-insensitive partial match)."""
name = name.lower()
return [
course for course in self.courses.values()
if name in course.name.lower()
]
def get_students_by_major(self, major: str) -> List[Student]:
"""Get all students in a specific major."""
return [
student for student in self.students.values()
if student.major.lower() == major.lower()
]
def get_instructors_by_department(self, department: str) -> List[Instructor]:
"""Get all instructors in a specific department."""
return [
instructor for instructor in self.instructors.values()
if instructor.department.lower() == department.lower()
]
def save_data(self) -> None:
"""Save all data to file."""
data = {
"students": {sid: student.to_dict() for sid, student in self.students.items()},
"courses": {code: course.to_dict() for code, course in self.courses.items()},
"instructors": {eid: instructor.to_dict() for eid, instructor in self.instructors.items()}
}
with open(self.data_file, 'w') as f:
json.dump(data, f, indent=2)
def load_data(self) -> None:
"""Load data from file."""
try:
with open(self.data_file, 'r') as f:
data = json.load(f)
# Load courses first
for code, course_data in data.get("courses", {}).items():
course = Course(
code=course_data["code"],
name=course_data["name"],
credits=course_data["credits"],
description=course_data.get("description", "")
)
# Add student IDs to the course
for student_id in course_data.get("students", []):
course.enroll_student(student_id)
self.courses[code] = course
# Load students
for sid, student_data in data.get("students", {}).items():
# Create student object
student = Student(
first_name=student_data["first_name"],
last_name=student_data["last_name"],
date_of_birth=student_data["date_of_birth"],
contact_email=student_data["contact_email"],
student_id=student_data["student_id"],
major=student_data.get("major", "Undeclared"),
enrollment_date=student_data.get("enrollment_date")
)
# Add enrolled courses
for course_code in student_data.get("enrolled_courses", []):
student.enroll_in_course(course_code)
# Add grades
for grade_data in student_data.get("grades", []):
grade = Grade(
student_id=grade_data["student_id"],
course_code=grade_data["course_code"],
grade=grade_data["grade"],
semester=grade_data["semester"],
year=grade_data["year"]
)
student.add_grade(grade)
student.status = student_data.get("status", "Active")
self.students[sid] = student
# Load instructors
for eid, instructor_data in data.get("instructors", {}).items():
instructor = Instructor(
first_name=instructor_data["first_name"],
last_name=instructor_data["last_name"],
date_of_birth=instructor_data["date_of_birth"],
contact_email=instructor_data["contact_email"],
employee_id=instructor_data["employee_id"],
department=instructor_data.get("department", ""),
position=instructor_data.get("position", "Instructor"),
hire_date=instructor_data.get("hire_date")
)
# Add courses taught
for course_code in instructor_data.get("courses_taught", []):
instructor.assign_course(course_code)
self.instructors[eid] = instructor
except Exception as e:
print(f"Error loading data: {e}")
# Initialize empty data structures
self.students = {}
self.courses = {}
self.instructors = {}
# Example usage
if __name__ == "__main__":
# Create SMS instance
sms = StudentManagementSystem()
# Add some courses
cs101 = Course("CS101", "Introduction to Computer Science", 3, "Basic programming concepts")
cs102 = Course("CS102", "Data Structures", 4, "Fundamental data structures and algorithms")
math201 = Course("MATH201", "Calculus I", 4, "Limits, derivatives, and integrals")
sms.add_course(cs101)
sms.add_course(cs102)
sms.add_course(math201)
# Add some students
alice = Student(
first_name="Alice",
last_name="Johnson",
date_of_birth="2000-05-15",
contact_email="alice@example.com",
major="Computer Science"
)
bob = Student(
first_name="Bob",
last_name="Smith",
date_of_birth="2001-02-20",
contact_email="bob@example.com",
major="Mathematics"
)
sms.add_student(alice)
sms.add_student(bob)
# Add an instructor
prof_brown = Instructor(
first_name="David",
last_name="Brown",
date_of_birth="1975-08-10",
contact_email="brown@example.edu",
department="Computer Science",
position="Professor"
)
sms.add_instructor(prof_brown)
# Assign instructor to courses
sms.assign_instructor_to_course(prof_brown.employee_id, "CS101")
sms.assign_instructor_to_course(prof_brown.employee_id, "CS102")
# Enroll students in courses
sms.enroll_student_in_course(alice.student_id, "CS101")
sms.enroll_student_in_course(alice.student_id, "MATH201")
sms.enroll_student_in_course(bob.student_id, "CS101")
sms.enroll_student_in_course(bob.student_id, "CS102")
# Assign grades
sms.assign_grade(alice.student_id, "CS101", "A", "Fall", 2023)
sms.assign_grade(bob.student_id, "CS101", "B+", "Fall", 2023)
# Generate a course roster
roster = sms.generate_course_roster("CS101")
print("\nCS101 Course Roster:")
for student in roster:
print(f"- {student['name']} ({student['student_id']}), Major: {student['major']}")
# Get student GPA
alice_gpa = sms.get_student_gpa(alice.student_id)
print(f"\nAlice's GPA: {alice_gpa}")
# Get student transcript
alice_transcript = sms.get_student_transcript(alice.student_id)
print("\nAlice's Transcript:")
for entry in alice_transcript:
print(f"- {entry['course_code']}: {entry['grade']} ({entry['term']})")
# Save data
sms.save_data()
print("\nData saved to sms_data.json")
class SMSInterface:
"""Command-line interface for the Student Management System."""
def __init__(self, sms: StudentManagementSystem):
self.sms = sms
self.running = True
def display_menu(self) -> None:
"""Display the main menu."""
print("\n===== Student Management System =====")
print("1. Student Management")
print("2. Course Management")
print("3. Instructor Management")
print("4. Enrollment Management")
print("5. Grade Management")
print("6. Reports")
print("7. Save Data")
print("8. Exit")
print("=====================================")
def run(self) -> None:
"""Run the interface."""
while self.running:
self.display_menu()
choice = input("Enter your choice (1-8): ")
if choice == "1":
self.student_menu()
elif choice == "2":
self.course_menu()
elif choice == "3":
self.instructor_menu()
elif choice == "4":
self.enrollment_menu()
elif choice == "5":
self.grade_menu()
elif choice == "6":
self.reports_menu()
elif choice == "7":
self.sms.save_data()
print("Data saved successfully!")
elif choice == "8":
self.running = False
print("Exiting Student Management System. Goodbye!")
else:
print("Invalid choice. Please try again.")
def student_menu(self) -> None:
"""Display student management menu."""
while True:
print("\n----- Student Management -----")
print("1. Add Student")
print("2. View Student Details")
print("3. Update Student Information")
print("4. Search Students by Name")
print("5. List Students by Major")
print("6. Back to Main Menu")
print("-----------------------------")
choice = input("Enter your choice (1-6): ")
if choice == "1":
self.add_student()
elif choice == "2":
self.view_student()
elif choice == "3":
self.update_student()
elif choice == "4":
self.search_students_by_name()
elif choice == "5":
self.list_students_by_major()
elif choice == "6":
break
else:
print("Invalid choice. Please try again.")
def add_student(self) -> None:
"""Add a new student."""
print("\nEnter student information:")
first_name = input("First Name: ")
last_name = input("Last Name: ")
# Validate date of birth
while True:
date_of_birth = input("Date of Birth (YYYY-MM-DD): ")
try:
datetime.datetime.strptime(date_of_birth, "%Y-%m-%d")
break
except ValueError:
print("Invalid date format. Please use YYYY-MM-DD.")
contact_email = input("Email: ")
major = input("Major (or leave blank for Undeclared): ") or "Undeclared"
try:
student = Student(
first_name=first_name,
last_name=last_name,
date_of_birth=date_of_birth,
contact_email=contact_email,
major=major
)
self.sms.add_student(student)
print(f"Student added successfully! Student ID: {student.student_id}")
except ValueError as e:
print(f"Error: {e}")
def view_student(self) -> None:
"""View student details."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if student:
print("\nStudent Information:")
print(f"Name: {student.full_name}")
print(f"Student ID: {student.student_id}")
print(f"Major: {student.major}")
print(f"Date of Birth: {student.date_of_birth}")
print(f"Age: {student.age}")
print(f"Email: {student.contact_email}")
print(f"Enrollment Date: {student.enrollment_date}")
print(f"Status: {student.status}")
# Show enrolled courses
print("\nEnrolled Courses:")
if not student.enrolled_courses:
print("None")
else:
for course_code in student.enrolled_courses:
course = self.sms.get_course(course_code)
if course:
print(f"- {course.code}: {course.name}")
# Show GPA
gpa = student.get_gpa()
print(f"\nCurrent GPA: {gpa if gpa is not None else 'N/A'}")
else:
print(f"Student with ID {student_id} not found.")
def update_student(self) -> None:
"""Update student information."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if student:
print("\nUpdate Student Information (leave blank to keep current value):")
# Get new values with current values as defaults
first_name = input(f"First Name [{student.first_name}]: ") or student.first_name
last_name = input(f"Last Name [{student.last_name}]: ") or student.last_name
email = input(f"Email [{student.contact_email}]: ") or student.contact_email
major = input(f"Major [{student.major}]: ") or student.major
status = input(f"Status [{student.status}]: ") or student.status
# Update student information
student.first_name = first_name
student.last_name = last_name
student.full_name = f"{first_name} {last_name}"
student.contact_email = email
student.major = major
student.status = status
print("Student information updated successfully!")
else:
print(f"Student with ID {student_id} not found.")
def search_students_by_name(self) -> None:
"""Search for students by name."""
name = input("Enter name to search: ")
students = self.sms.find_students_by_name(name)
if students:
print(f"\nFound {len(students)} student(s):")
for student in students:
print(f"- {student.full_name} (ID: {student.student_id}, Major: {student.major})")
else:
print(f"No students found matching '{name}'.")
def list_students_by_major(self) -> None:
"""List students by major."""
major = input("Enter major: ")
students = self.sms.get_students_by_major(major)
if students:
print(f"\nStudents majoring in {major}:")
for student in students:
print(f"- {student.full_name} (ID: {student.student_id})")
else:
print(f"No students found majoring in {major}.")
def course_menu(self) -> None:
"""Display course management menu."""
while True:
print("\n----- Course Management -----")
print("1. Add Course")
print("2. View Course Details")
print("3. Search Courses")
print("4. List All Courses")
print("5. Back to Main Menu")
print("---------------------------")
choice = input("Enter your choice (1-5): ")
if choice == "1":
self.add_course()
elif choice == "2":
self.view_course()
elif choice == "3":
self.search_courses()
elif choice == "4":
self.list_all_courses()
elif choice == "5":
break
else:
print("Invalid choice. Please try again.")
def add_course(self) -> None:
"""Add a new course."""
print("\nEnter course information:")
code = input("Course Code (e.g., CS101): ")
name = input("Course Name: ")
# Validate credits
while True:
try:
credits = int(input("Credits: "))
if credits <= 0:
print("Credits must be a positive integer.")
continue
break
except ValueError:
print("Please enter a valid number.")
description = input("Description (optional): ")
try:
course = Course(code, name, credits, description)
self.sms.add_course(course)
print(f"Course {course.code} added successfully!")
except ValueError as e:
print(f"Error: {e}")
def view_course(self) -> None:
"""View course details."""
code = input("Enter Course Code: ")
course = self.sms.get_course(code)
if course:
print("\nCourse Information:")
print(f"Code: {course.code}")
print(f"Name: {course.name}")
print(f"Credits: {course.credits}")
print(f"Description: {course.description}")
print(f"Enrollment Count: {course.get_enrollment_count()}")
# Show enrolled students
if course.students:
print("\nEnrolled Students:")
for student_id in course.students:
student = self.sms.get_student(student_id)
if student:
print(f"- {student.full_name} (ID: {student.student_id})")
else:
print("\nNo students enrolled in this course.")
else:
print(f"Course with code {code} not found.")
def search_courses(self) -> None:
"""Search for courses by name."""
name = input("Enter course name to search: ")
courses = self.sms.find_courses_by_name(name)
if courses:
print(f"\nFound {len(courses)} course(s):")
for course in courses:
print(f"- {course.code}: {course.name} ({course.credits} credits)")
else:
print(f"No courses found matching '{name}'.")
def list_all_courses(self) -> None:
"""List all courses."""
if self.sms.courses:
print("\nAll Courses:")
for course in self.sms.courses.values():
print(f"- {course.code}: {course.name} ({course.credits} credits)")
else:
print("No courses available.")
def instructor_menu(self) -> None:
"""Display instructor management menu."""
while True:
print("\n----- Instructor Management -----")
print("1. Add Instructor")
print("2. View Instructor Details")
print("3. List Instructors by Department")
print("4. Assign Course to Instructor")
print("5. Back to Main Menu")
print("-------------------------------")
choice = input("Enter your choice (1-5): ")
if choice == "1":
self.add_instructor()
elif choice == "2":
self.view_instructor()
elif choice == "3":
self.list_instructors_by_department()
elif choice == "4":
self.assign_course_to_instructor()
elif choice == "5":
break
else:
print("Invalid choice. Please try again.")
def add_instructor(self) -> None:
"""Add a new instructor."""
print("\nEnter instructor information:")
first_name = input("First Name: ")
last_name = input("Last Name: ")
# Validate date of birth
while True:
date_of_birth = input("Date of Birth (YYYY-MM-DD): ")
try:
datetime.datetime.strptime(date_of_birth, "%Y-%m-%d")
break
except ValueError:
print("Invalid date format. Please use YYYY-MM-DD.")
contact_email = input("Email: ")
department = input("Department: ")
position = input("Position (default: Instructor): ") or "Instructor"
try:
instructor = Instructor(
first_name=first_name,
last_name=last_name,
date_of_birth=date_of_birth,
contact_email=contact_email,
department=department,
position=position
)
self.sms.add_instructor(instructor)
print(f"Instructor added successfully! Employee ID: {instructor.employee_id}")
except ValueError as e:
print(f"Error: {e}")
def view_instructor(self) -> None:
"""View instructor details."""
employee_id = input("Enter Employee ID: ")
instructor = self.sms.get_instructor(employee_id)
if instructor:
print("\nInstructor Information:")
print(f"Name: {instructor.full_name}")
print(f"Employee ID: {instructor.employee_id}")
print(f"Department: {instructor.department}")
print(f"Position: {instructor.position}")
print(f"Email: {instructor.contact_email}")
print(f"Hire Date: {instructor.hire_date}")
# Show courses taught
print("\nCourses Taught:")
if not instructor.courses_taught:
print("None")
else:
for course_code in instructor.courses_taught:
course = self.sms.get_course(course_code)
if course:
print(f"- {course.code}: {course.name}")
else:
print(f"Instructor with ID {employee_id} not found.")
def list_instructors_by_department(self) -> None:
"""List instructors by department."""
department = input("Enter department: ")
instructors = self.sms.get_instructors_by_department(department)
if instructors:
print(f"\nInstructors in {department} department:")
for instructor in instructors:
print(f"- {instructor.full_name} ({instructor.position}, ID: {instructor.employee_id})")
else:
print(f"No instructors found in {department} department.")
def assign_course_to_instructor(self) -> None:
"""Assign a course to an instructor."""
employee_id = input("Enter Instructor Employee ID: ")
instructor = self.sms.get_instructor(employee_id)
if not instructor:
print(f"Instructor with ID {employee_id} not found.")
return
course_code = input("Enter Course Code: ")
course = self.sms.get_course(course_code)
if not course:
print(f"Course with code {course_code} not found.")
return
if self.sms.assign_instructor_to_course(employee_id, course_code):
print(f"Course {course.code} assigned to {instructor.full_name} successfully!")
else:
print("Failed to assign course to instructor.")
def enrollment_menu(self) -> None:
"""Display enrollment management menu."""
while True:
print("\n----- Enrollment Management -----")
print("1. Enroll Student in Course")
print("2. Drop Student from Course")
print("3. View Course Roster")
print("4. Back to Main Menu")
print("--------------------------------")
choice = input("Enter your choice (1-4): ")
if choice == "1":
self.enroll_student_in_course()
elif choice == "2":
self.drop_student_from_course()
elif choice == "3":
self.view_course_roster()
elif choice == "4":
break
else:
print("Invalid choice. Please try again.")
def enroll_student_in_course(self) -> None:
"""Enroll a student in a course."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if not student:
print(f"Student with ID {student_id} not found.")
return
course_code = input("Enter Course Code: ")
course = self.sms.get_course(course_code)
if not course:
print(f"Course with code {course_code} not found.")
return
try:
if self.sms.enroll_student_in_course(student_id, course_code):
print(f"{student.full_name} enrolled in {course.code} successfully!")
except ValueError as e:
print(f"Error: {e}")
def drop_student_from_course(self) -> None:
"""Drop a student from a course."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if not student:
print(f"Student with ID {student_id} not found.")
return
course_code = input("Enter Course Code: ")
course = self.sms.get_course(course_code)
if not course:
print(f"Course with code {course_code} not found.")
return
if self.sms.drop_student_from_course(student_id, course_code):
print(f"{student.full_name} dropped from {course.code} successfully!")
else:
print("Failed to drop student from course.")
def view_course_roster(self) -> None:
"""View a course roster."""
course_code = input("Enter Course Code: ")
try:
roster = self.sms.generate_course_roster(course_code)
if roster:
print(f"\nRoster for course {course_code}:")
print(f"Total students: {len(roster)}")
for i, student in enumerate(roster, 1):
print(f"{i}. {student['name']} (ID: {student['student_id']}, Major: {student['major']})")
else:
print(f"No students enrolled in course {course_code}.")
except ValueError as e:
print(f"Error: {e}")
def grade_menu(self) -> None:
"""Display grade management menu."""
while True:
print("\n----- Grade Management -----")
print("1. Assign Grade")
print("2. View Student Grades")
print("3. Calculate Student GPA")
print("4. Back to Main Menu")
print("---------------------------")
choice = input("Enter your choice (1-4): ")
if choice == "1":
self.assign_grade()
elif choice == "2":
self.view_student_grades()
elif choice == "3":
self.calculate_student_gpa()
elif choice == "4":
break
else:
print("Invalid choice. Please try again.")
def assign_grade(self) -> None:
"""Assign a grade to a student for a course."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if not student:
print(f"Student with ID {student_id} not found.")
return
course_code = input("Enter Course Code: ")
course = self.sms.get_course(course_code)
if not course:
print(f"Course with code {course_code} not found.")
return
# Check if student is enrolled in the course
if course_code.upper() not in student.enrolled_courses:
enroll = input(f"{student.full_name} is not enrolled in {course_code}. Enroll now? (y/n): ")
if enroll.lower() == 'y':
self.sms.enroll_student_in_course(student_id, course_code)
else:
return
# Show valid grades
print("\nValid grades: A+, A, A-, B+, B, B-, C+, C, C-, D+, D, D-, F, I, W")
grade = input("Enter Grade: ").upper()
if grade not in Grade.VALID_GRADES:
print(f"Invalid grade. Must be one of {Grade.VALID_GRADES}")
return
# Get semester and year
print("\nSelect Semester:")
print("1. Fall")
print("2. Spring")
print("3. Summer")
semester_choice = input("Enter choice (1-3): ")
if semester_choice == "1":
semester = "Fall"
elif semester_choice == "2":
semester = "Spring"
elif semester_choice == "3":
semester = "Summer"
else:
print("Invalid choice.")
return
# Get year
current_year = datetime.date.today().year
while True:
try:
year = int(input(f"Enter Year ({current_year-5}-{current_year+1}): "))
if year < current_year - 5 or year > current_year + 1:
print(f"Year must be between {current_year-5} and {current_year+1}.")
continue
break
except ValueError:
print("Please enter a valid year.")
try:
if self.sms.assign_grade(student_id, course_code, grade, semester, year):
print(f"Grade {grade} assigned to {student.full_name} for {course_code} successfully!")
except ValueError as e:
print(f"Error: {e}")
def view_student_grades(self) -> None:
"""View a student's grades."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if not student:
print(f"Student with ID {student_id} not found.")
return
transcript = self.sms.get_student_transcript(student_id)
if transcript:
print(f"\nGrades for {student.full_name}:")
for entry in transcript:
print(f"- {entry['course_code']}: {entry['grade']} ({entry['term']}, {entry['credits']} credits)")
else:
print(f"No grades recorded for {student.full_name}.")
def calculate_student_gpa(self) -> None:
"""Calculate a student's GPA."""
student_id = input("Enter Student ID: ")
try:
gpa = self.sms.get_student_gpa(student_id)
student = self.sms.get_student(student_id)
if gpa is not None:
print(f"\n{student.full_name}'s GPA: {gpa}")
else:
print(f"No GPA available for {student.full_name} (no graded courses).")
except ValueError as e:
print(f"Error: {e}")
def reports_menu(self) -> None:
"""Display reports menu."""
while True:
print("\n----- Reports -----")
print("1. Student Transcript")
print("2. Course Enrollment Summary")
print("3. Department Faculty List")
print("4. GPA Summary by Major")
print("5. Back to Main Menu")
print("-------------------")
choice = input("Enter your choice (1-5): ")
if choice == "1":
self.generate_student_transcript()
elif choice == "2":
self.generate_course_enrollment_summary()
elif choice == "3":
self.generate_department_faculty_list()
elif choice == "4":
self.generate_gpa_summary_by_major()
elif choice == "5":
break
else:
print("Invalid choice. Please try again.")
def generate_student_transcript(self) -> None:
"""Generate a student transcript."""
student_id = input("Enter Student ID: ")
student = self.sms.get_student(student_id)
if not student:
print(f"Student with ID {student_id} not found.")
return
transcript = self.sms.get_student_transcript(student_id)
gpa = self.sms.get_student_gpa(student_id)
print("\n" + "=" * 40)
print(f"OFFICIAL TRANSCRIPT - {student.full_name}")
print("=" * 40)
print(f"Student ID: {student.student_id}")
print(f"Major: {student.major}")
print(f"Enrollment Date: {student.enrollment_date}")
print("-" * 40)
if transcript:
# Group by term
terms = {}
for entry in transcript:
if entry['term'] not in terms:
terms[entry['term']] = []
terms[entry['term']].append(entry)
# Print each term
for term in sorted(terms.keys()):
print(f"\n{term}:")
print("-" * 40)
term_credits = 0
term_points = 0
for entry in terms[term]:
print(f"{entry['course_code']:<10} {entry['credits']} credits Grade: {entry['grade']}")
if entry['grade_points'] is not None:
term_credits += entry['credits']
term_points += entry['credits'] * entry['grade_points']
# Calculate term GPA
if term_credits > 0:
term_gpa = round(term_points / term_credits, 2)
print(f"Term GPA: {term_gpa}")
print("\n" + "-" * 40)
print(f"Cumulative GPA: {gpa if gpa is not None else 'N/A'}")
else:
print("No courses completed.")
print("=" * 40)
def generate_course_enrollment_summary(self) -> None:
"""Generate a summary of course enrollments."""
if not self.sms.courses:
print("No courses available.")
return
print("\n" + "=" * 50)
print("COURSE ENROLLMENT SUMMARY")
print("=" * 50)
print(f"{'Course Code':<12} {'Course Name':<30} {'Enrollment':<10} {'Credits':<8}")
print("-" * 50)
total_students = 0
for course in sorted(self.sms.courses.values(), key=lambda c: c.code):
enrollment = course.get_enrollment_count()
total_students += enrollment
print(f"{course.code:<12} {course.name[:30]:<30} {enrollment:<10} {course.credits:<8}")
print("-" * 50)
print(f"Total Courses: {len(self.sms.courses)}")
print(f"Total Enrollments: {total_students}")
print("=" * 50)
def generate_department_faculty_list(self) -> None:
"""Generate a list of faculty by department."""
if not self.sms.instructors:
print("No instructors available.")
return
# Group instructors by department
departments = {}
for instructor in self.sms.instructors.values():
dept = instructor.department
if not dept:
dept = "No Department"
if dept not in departments:
departments[dept] = []
departments[dept].append(instructor)
print("\n" + "=" * 60)
print("FACULTY BY DEPARTMENT")
print("=" * 60)
for dept, instructors in sorted(departments.items()):
print(f"\n{dept}:")
print("-" * 30)
for instructor in sorted(instructors, key=lambda i: i.last_name):
courses = len(instructor.courses_taught)
print(f"- {instructor.full_name:<25} {instructor.position:<15} Courses: {courses}")
print("\n" + "=" * 60)
def generate_gpa_summary_by_major(self) -> None:
"""Generate a GPA summary by major."""
if not self.sms.students:
print("No students available.")
return
# Group students by major
majors = {}
for student in self.sms.students.values():
major = student.major
if major not in majors:
majors[major] = []
majors[major].append(student)
print("\n" + "=" * 60)
print("GPA SUMMARY BY MAJOR")
print("=" * 60)
print(f"{'Major':<20} {'Students':<10} {'Avg GPA':<10} {'Min GPA':<10} {'Max GPA':<10}")
print("-" * 60)
for major, students in sorted(majors.items()):
# Calculate GPA statistics
gpas = [s.get_gpa() for s in students if s.get_gpa() is not None]
if gpas:
avg_gpa = round(sum(gpas) / len(gpas), 2)
min_gpa = round(min(gpas), 2)
max_gpa = round(max(gpas), 2)
print(f"{major[:20]:<20} {len(students):<10} {avg_gpa:<10} {min_gpa:<10} {max_gpa:<10}")
else:
print(f"{major[:20]:<20} {len(students):<10} {'N/A':<10} {'N/A':<10} {'N/A':<10}")
print("=" * 60)
# Demo function to run the SMS interface
def run_demo():
"""Run a demonstration of the Student Management System."""
sms = StudentManagementSystem()
# Add some sample data
print("Initializing Student Management System with sample data...")
# Add courses
courses = [
Course("CS101", "Introduction to Programming", 3, "Basic programming concepts using Python"),
Course("CS102", "Data Structures", 4, "Fundamental data structures and algorithms"),
Course("MATH101", "College Algebra", 3, "Basic algebraic operations"),
Course("MATH201", "Calculus I", 4, "Limits, derivatives, and integrals"),
Course("ENGL101", "Composition", 3, "Academic writing and research")
]
for course in courses:
try:
sms.add_course(course)
except ValueError:
pass # Skip duplicates
# Add students
students = [
Student("John", "Smith", "2000-05-15", "john@example.com", major="Computer Science"),
Student("Emma", "Johnson", "2001-03-22", "emma@example.com", major="Mathematics"),
Student("Michael", "Davis", "1999-11-08", "michael@example.com", major="Computer Science"),
Student("Sophia", "Brown", "2002-01-30", "sophia@example.com", major="English")
]
for student in students:
try:
sms.add_student(student)
except ValueError:
pass # Skip duplicates
# Add instructors
instructors = [
Instructor("Robert", "Wilson", "1975-07-12", "rwilson@example.com", department="Computer Science", position="Professor"),
Instructor("Jennifer", "Lee", "1980-09-18", "jlee@example.com", department="Mathematics", position="Associate Professor"),
Instructor("William", "Anderson", "1968-04-25", "wanderson@example.com", department="English", position="Professor")
]
for instructor in instructors:
try:
sms.add_instructor(instructor)
except ValueError:
pass # Skip duplicates
# Assign courses to instructors
for instructor in instructors:
if instructor.department == "Computer Science":
sms.assign_instructor_to_course(instructor.employee_id, "CS101")
sms.assign_instructor_to_course(instructor.employee_id, "CS102")
elif instructor.department == "Mathematics":
sms.assign_instructor_to_course(instructor.employee_id, "MATH101")
sms.assign_instructor_to_course(instructor.employee_id, "MATH201")
elif instructor.department == "English":
sms.assign_instructor_to_course(instructor.employee_id, "ENGL101")
# Enroll students in courses
for student in students:
if student.major == "Computer Science":
sms.enroll_student_in_course(student.student_id, "CS101")
sms.enroll_student_in_course(student.student_id, "CS102")
sms.enroll_student_in_course(student.student_id, "MATH101")
sms.enroll_student_in_course(student.student_id, "ENGL101")
elif student.major == "Mathematics":
sms.enroll_student_in_course(student.student_id, "MATH101")
sms.enroll_student_in_course(student.student_id, "MATH201")
sms.enroll_student_in_course(student.student_id, "CS101")
sms.enroll_student_in_course(student.student_id, "ENGL101")
else:
sms.enroll_student_in_course(student.student_id, "ENGL101")
sms.enroll_student_in_course(student.student_id, "MATH101")
# Assign some grades
for student in students:
if "CS101" in student.enrolled_courses:
sms.assign_grade(student.student_id, "CS101", "B+", "Fall", 2024)
if "MATH101" in student.enrolled_courses:
sms.assign_grade(student.student_id, "MATH101", "A-", "Fall", 2024)
if "ENGL101" in student.enrolled_courses:
sms.assign_grade(student.student_id, "ENGL101", "B", "Fall", 2024)
print("Sample data loaded successfully!")
# Run the interface
interface = SMSInterface(sms)
interface.run()
if __name__ == "__main__":
run_demo()
\ No newline at end of file
FROM python:3.13.1-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from app.models.user_model import User
from app.database import get_db
from sqlalchemy.future import select
from uuid import UUID
# Constants
SECRET_KEY = "X3oSxzUEKJcHZAnMM-EuEMi6cBu5DpSoqIoA5vwBnyU"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# Hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
# Hash password
def hash_password(password: str) -> str:
return pwd_context.hash(password)
# Verify password
def verify_password(plain_password, hashed_password) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# Create access token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Get current user from token
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = db.execute(select(User).where(User.id == UUID(user_id)))
user = result.scalars().first()
if user is None:
raise credentials_exception
return user
from fastapi import FastAPI , Request , status
from app.database import Database
from app.routes import (
simple_model_routes,
user_routes,
todo_routes
)
from app.auth import AuthRoutes
import logging
from fastapi.exceptions import RequestValidationError
from app.global_constants import GlobalConstants
from app.auth_routes import auth_router
# Custom exception handler for validation errors
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = exc.errors()
error_messages = []
for error in errors:
field = error.get("loc")[-1] # Extract the field name
msg = error.get("msg") # Extract the error message
error_messages.append(field)
return GlobalConstants.return_api_response(
message=f"{GlobalConstants.api_response_messages.missing_required_parameters}: {', '.join(error_messages)}",
result=None,
status_code=status.HTTP_400_BAD_REQUEST
)
def create_app():
app = FastAPI(title="FastAPI Application", version="1.0.0")
# Register the custom exception handler for validation errors
app.add_exception_handler(RequestValidationError, validation_exception_handler)
# Create database tables
print("Creating database tables...")
logging.info("Creating database tables...")
Database.Base.metadata.create_all(bind=Database.engine)
logging.info("Database tables created.")
print("Database tables created.")
# Include the user routes
# app.include_router(AuthRoutes.router,tags=["Authentication"])
# app.include_router(simple_model_routes.SimpleModelRoutes.router,tags=["Simple Model"])
app.include_router(auth_router, tags=["Authentication"])
app.include_router(user_routes.UserRoutes.router,tags=["User"])
app.include_router(todo_routes.ToDoRoutes.router,tags=["ToDo"])
return app
from fastapi import APIRouter, HTTPException, Request , status
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import jwt
from typing import Dict
import httpx
import requests
from app.global_constants import GlobalConstants
from app.config import Config
import requests
from app.models.simple_model import RefreshTokenSchema
from app.utils.exception import OAuthException
import json
from app.config import Config
from cryptography.hazmat.primitives import serialization
from functools import wraps
import jwt
# Azure AD Configurations
TENANT_ID = Config.TENANT_ID
CLIENT_ID = Config.CLIENT_ID
CLIENT_SECRET = Config.CLIENT_SECRET
REDIRECT_URI = Config.REDIRECT_URI
AUTH_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/authorize"
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
KEYS_URL=f"https://login.microsoftonline.com/{TENANT_ID}/discovery/keys"
SCOPE=f"api://{CLIENT_ID}/access_as_user offline_access"
POST_LOGOUT_REDIRECT_URI = Config.POST_LOGOUT_REDIRECT_URI
# Validate Token using Azure AD Public Keys
def validate_azure_ad_token(token: str):
try:
if not token:
raise OAuthException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization token missing or invalid")
response1 = requests.get(KEYS_URL)
response1.raise_for_status()
keys = response1.json().get('keys', [])
token_headers = jwt.get_unverified_header(token)
token_kid = token_headers.get('kid')
public_key = next((key for key in keys if key.get('kid') == token_kid), None)
if not public_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Public key not found")
rsa_pem_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(public_key))
rsa_pem_key_bytes = rsa_pem_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
decoded_token = jwt.decode(
token,
key=rsa_pem_key_bytes,
algorithms=['RS256'],
audience=f"api://{CLIENT_ID}",
issuer=f"https://sts.windows.net/{TENANT_ID}/"
)
return json.dumps(decoded_token, indent=2)
except requests.RequestException as e:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Request error: {str(e)}")
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired")
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error: {str(e)}")
def validate_and_authorize():
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
decoded_token = validate_azure_ad_token(kwargs.get("token", None))
request: Request = kwargs.get("request")
# Convert user info into dictionary (assuming it's in string format)
user_info = json.loads(decoded_token)
name = user_info.get('name')
unique_name = user_info.get('unique_name')
# Check if the user has access to the requested API
requested_url = request.url.path
requested_method = request.method
allowed_paths = [
{"endpoint": "/simple-models", "method": "GET"},
]
has_access = any(
requested_url.startswith(path["endpoint"]) and path["method"] == requested_method
for path in allowed_paths
)
if not has_access:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=GlobalConstants.api_response_messages.forbidden)
# Define the allowed keys
allowed_keys = {"name", "user_name", "roles"}
# Retain only the allowed keys in user_info
user_info = {key: user_info[key] for key in allowed_keys if key in user_info}
# Update user_info with new values
user_info.update({
"name": name,
"user_name": unique_name
})
# Store in request state
request.state.user_info = user_info
# Proceed to the actual route function
return func(*args, **kwargs)
except HTTPException as e:
return GlobalConstants.return_api_response(
message=e.detail,
result=None,
status_code=e.status_code
)
except OAuthException as e:
return GlobalConstants.return_api_response(
message=e.detail,
result=None,
status_code=e.status_code
)
except requests.RequestException as e:
return GlobalConstants.return_api_response(
message=e.detail,
result=None,
status_code=e.status_code
)
except jwt.ExpiredSignatureError:
return GlobalConstants.return_api_response(
message=e.detail,
result=None,
status_code=e.status_code
)
except Exception as e:
return GlobalConstants.return_api_response(
message=e.detail,
result=None,
status_code=e.status_code
)
return wrapper
return decorator
class AuthRoutes:
router = APIRouter()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl=AUTH_URL,
tokenUrl=TOKEN_URL,
)
tokens: Dict[str, str] = {}
"""
Initiates the Microsoft login process.
"""
@staticmethod
@router.get("/login")
def login():
auth_endpoint = (
f"{AUTH_URL}?"
f"client_id={CLIENT_ID}&response_type=code&redirect_uri={REDIRECT_URI}"
f"&response_mode=query&scope={SCOPE}"
)
return RedirectResponse(url=auth_endpoint)
"""
Callback route to exchange authorization code for tokens.
"""
@staticmethod
@router.get("/authenticate")
async def callback(request: Request):
code = request.query_params.get("code")
if not code:
error = request.query_params.get("error")
error_description = request.query_params.get("error_description")
return GlobalConstants.return_api_response(
message=f"Authentication failed: {error} - {error_description}",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
token_data = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI
}
try:
response = requests.post(TOKEN_URL, data=token_data)
response.raise_for_status() # Raises an error for non-200 responses
token_json = response.json()
access_token = token_json.get("access_token")
refresh_token = token_json.get("refresh_token")
if not access_token:
return GlobalConstants.return_api_response(
message="Access token not found in the response",
result=[],
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[{
"access_token": access_token,
"refresh_token": refresh_token,
}],
status_code=status.HTTP_200_OK
)
except requests.exceptions.Timeout:
print("Error in Timeout : ", str(e))
return GlobalConstants.return_api_response(
message="Token exchange service timeout",
result=None,
status_code=status.HTTP_504_GATEWAY_TIMEOUT
)
except requests.exceptions.ConnectionError:
print("Error in ConnectionError : ", str(e))
return GlobalConstants.return_api_response(
message="Failed to connect to token exchange service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except requests.RequestException as e:
print("Error in RequestException : ", str(e))
return GlobalConstants.return_api_response(
message="Error communicating with token exchange service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except Exception as e:
print("Error in Exception : ", str(e))
return GlobalConstants.return_api_response(
message="An unexpected error occurred",
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Refreshes the access token using the refresh token.
"""
@staticmethod
@router.post("/refresh")
async def refresh_token(refresh_token: RefreshTokenSchema):
refresh_token = refresh_token.refresh_token
if not refresh_token:
return GlobalConstants.return_api_response(
message="Refresh token not available",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
refresh_data = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope":f"api://{CLIENT_ID}/access_as_user offline_access"
}
try:
async with httpx.AsyncClient(timeout=30) as client:
refresh_response = await client.post(TOKEN_URL, data=refresh_data)
refresh_response.raise_for_status() # Raises an error for non-200 responses
token_json = refresh_response.json()
access_token = token_json.get("access_token")
new_refresh_token = token_json.get("refresh_token", refresh_token) # Use new if provided
if not access_token:
return GlobalConstants.return_api_response(
message="Access token not found in the response",
result=[],
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[{"access_token": access_token, "refresh_token": new_refresh_token}],
status_code=status.HTTP_200_OK
)
except httpx.TimeoutException:
print("Error in Timeout : ", str(e))
return GlobalConstants.return_api_response(
message="Token refresh service timeout",
result=None,
status_code=status.HTTP_504_GATEWAY_TIMEOUT
)
except httpx.ConnectError as e:
print("Error in ConnectError : ", str(e))
return GlobalConstants.return_api_response(
message="Failed to connect to token refresh service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except httpx.RequestError as e:
print("Error in RequestError : ", str(e))
return GlobalConstants.return_api_response(
message="Error communicating with token refresh service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except Exception as e:
print("Error in Exception : ", str(e))
return GlobalConstants.return_api_response(
message="An unexpected error occurred",
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Logs out the user by redirecting to Microsoft's logout endpoint.
"""
@staticmethod
@router.get("/logout")
async def logout_user():
logout_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/logout?post_logout_redirect_uri={POST_LOGOUT_REDIRECT_URI}"
try:
async with httpx.AsyncClient(timeout=30) as client:
logout_response = await client.get(logout_url)
logout_response.raise_for_status() # Raises an error for non-200 responses
if logout_url:
return RedirectResponse(url=logout_url)
except httpx.TimeoutException:
print("Error in Timeout : ", str(e))
return GlobalConstants.return_api_response(
message="Logout service timeout",
result=None,
status_code=status.HTTP_504_GATEWAY_TIMEOUT
)
except httpx.ConnectError as e:
print("Error in ConnectError : ", str(e))
return GlobalConstants.return_api_response(
message="Failed to connect to logout service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except httpx.RequestError as e:
print("Error in RequestError : ", str(e))
return GlobalConstants.return_api_response(
message="Error communicating with logout service",
result=None,
status_code=status.HTTP_502_BAD_GATEWAY
)
except Exception as e:
print("Error in Exception : ", str(e))
return GlobalConstants.return_api_response(
message="An unexpected error occurred",
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.Oauth import verify_password, create_access_token
from app.database import get_db
from app.models.user_model import User
from sqlalchemy.future import select
from datetime import timedelta
auth_router = APIRouter()
@auth_router.post("/login")
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
result = db.execute(select(User).where(User.email == form_data.username))
user = result.scalars().first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": str(user.id)},
expires_delta=timedelta(minutes=60)
)
return {"access_token": access_token, "token_type": "bearer"}
import os
from dotenv import load_dotenv
def load_env(env):
env_path = os.path.join(os.path.dirname(__file__), 'env_files', env, '.env')
if os.path.exists(env_path):
load_dotenv(env_path)
print(f"Loaded {env} environment configuration")
else:
raise FileNotFoundError(f".env file for {env} not found.")
# Load environment (default to 'dev' if not provided)
current_env = os.getenv('ENV', 'dev')
load_env(current_env)
class Config:
SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Microsoft Azure AD Configurations
CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
TENANT_ID = os.getenv('TENANT_ID')
REDIRECT_URI = os.getenv('REDIRECT_URI')
POST_LOGOUT_REDIRECT_URI = os.getenv('POST_LOGOUT_REDIRECT_URI')
from sqlalchemy import create_engine, exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import Config
import logging
class Database:
# Base class for declarative class definitions (main database)
Base = declarative_base()
# Configure logging for this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration: get the connection string from config
DATABASE_URL = Config.SQLALCHEMY_DATABASE_URI
# Create SQLAlchemy engine with connection pool settings
engine = create_engine(DATABASE_URL)
# Create a configured "Session" class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Redefine Base for ORM models (main database)
Base = declarative_base()
# Create all tables in the database (with error handling)
try:
Base.metadata.create_all(bind=engine)
logger.info("Database tables created successfully.")
except exc.SQLAlchemyError as e:
logger.error(f"Failed to create database tables: {e}")
# Dependency to get a database session (for use in FastAPI or similar frameworks)
def get_db():
db = Database.SessionLocal()
try:
yield db # Provide the session to the caller
except exc.SQLAlchemyError as e:
Database.logger.error(f"Database error occurred: {e}")
db.rollback() # Rollback in case of error
raise # Re-raise the exception for further handling
finally:
db.close() # Always close the session
# Development Environment Variables
DATABASE_URL=postgresql://postgres:password@localhost/to_dos
CLIENT_ID="63c339ba-876c-4724-b813-f7cbc77e7e3b"
TENANT_ID="6d084a41-2a0b-4141-a5fa-1eb080f65327"
CLIENT_SECRET="Hdd8Q~bSgayDmsaEA6L9DWqEg-U8zV.uZSohPbFc"
REDIRECT_URI="http://localhost:8000/authenticate"
POST_LOGOUT_REDIRECT_URI="http://localhost:8000/login"
# Production Environment Variables
DATABASE_URL=postgresql://postgres:password@localhost/rbac_db_may_2
CLIENT_ID="63c339ba-876c-4724-b813-f7cbc77e7e3b"
TENANT_ID="6d084a41-2a0b-4141-a5fa-1eb080f65327"
CLIENT_SECRET="Hdd8Q~bSgayDmsaEA6L9DWqEg-U8zV.uZSohPbFc"
REDIRECT_URI="http://localhost:8000/authenticate"
POST_LOGOUT_REDIRECT_URI="http://localhost:8000/login"
\ No newline at end of file
# Testing Environment Variables
DATABASE_URL=postgresql://postgres:password@localhost/rbac_db_may_2
CLIENT_ID="63c339ba-876c-4724-b813-f7cbc77e7e3b"
TENANT_ID="6d084a41-2a0b-4141-a5fa-1eb080f65327"
CLIENT_SECRET="Hdd8Q~bSgayDmsaEA6L9DWqEg-U8zV.uZSohPbFc"
REDIRECT_URI="http://localhost:8000/authenticate"
POST_LOGOUT_REDIRECT_URI="http://localhost:8000/login"
\ No newline at end of file
from fastapi import status
from fastapi.responses import JSONResponse
from typing import Any
from fastapi.encoders import jsonable_encoder
class DotNotationDict:
def __init__(self, dictionary):
self._data = dictionary
def __getattr__(self, name):
if name in self._data:
return self._data[name]
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
class GlobalConstants:
api_response_messages = DotNotationDict({
"success": "successfully",
"accepted": "Accepted",
"invalid_request_data": "Invalid request data",
"unauthorized": "Unauthorized access",
"forbidden": "Forbidden access",
"not_found": "Resource not found",
"method_not_allowed": "Method not allowed for the requested resource",
"internal_server_error": "Internal server error occurred",
"missing_required_parameters": "Missing required parameters",
"error_in": "Error in",
"db_error" : "An error occurred while accessing the database.",
})
# api_request_messages = DotNotationDict({
# "page_no": 1,
# "page_size": 10,
# "filter_data": "Filter data for the request",
# "page_no_description": "Page number, starting from 1",
# "page_size_description": "Number of records per page",
# })
"""
Custom function to generate a standardized JSON response.
:param message: Message string for the response.
:param result: The result data to include in the response.
:return: A FastAPI JSONResponse object.
"""
@staticmethod
def return_api_response(message: str, result: Any, status_code: int = status.HTTP_200_OK) -> JSONResponse:
if result:
result = jsonable_encoder(result)
return JSONResponse(
status_code=status_code,
content={
"message": message,
"result": result
}
)
# This file is intentionally left blank
\ No newline at end of file
from sqlalchemy import Column, Integer, String, DateTime
from app.database import Database
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from typing import Optional
class SimpleModel(Database.Base):
__tablename__ = "simple_model"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
name = Column(String, nullable=False)
created_at = Column(DateTime, default=datetime.now(timezone.utc))
updated_at = Column(DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
created_by = Column(String, nullable=True)
updated_by = Column(String, nullable=True)
class SimpleModelCreateSchema(BaseModel):
name: str = Field(..., example="Test Name")
created_by: Optional[str] = Field(None, example="admin")
class Config:
orm_mode = True
class SimpleModelUpdateSchema(BaseModel):
simple_model_id: int = Field(..., example=1)
name: Optional[str] = Field(None, example="Updated Name")
updated_by: Optional[str] = Field(None, example="admin")
class Config:
orm_mode = True
class SimpleModelResponseSchema(BaseModel):
id: int = Field(..., example=1)
name: str = Field(..., example="Test Name")
created_at: Optional[datetime] = Field(None, example="2021-08-01T00:00:00Z")
updated_at: Optional[datetime] = Field(None, example="2021-08-02T00:00:00Z")
created_by: Optional[str] = Field(None, example="admin")
updated_by: Optional[str] = Field(None, example="admin")
class Config:
orm_mode = True
class RefreshTokenSchema(BaseModel):
refresh_token: str = Field(..., example="1.AXEAQUoIbQsqQUGl-h6wgPZTJ7o5w2NshyRHuBP3y8d-fjsvAbNxAA.AgABAwEAAABVrSpeuWamRam2jAF1XRQEAwDs_")
class Config:
orm_mode = True
\ No newline at end of file
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import UUID
from app.database import Database
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from uuid import UUID as UUIDType
import uuid
class ToDo(Database.Base):
__tablename__ = "todos"
id = Column(UUID(as_uuid=True), primary_key=True, default=lambda: uuid.uuid4(), index=True)
title = Column(String, nullable=False)
description = Column(String, nullable=True)
is_completed = Column(Boolean, default=False)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
created_by = Column(UUID(as_uuid=True), nullable=True)
updated_by = Column(UUID(as_uuid=True), nullable=True)
class ToDoCreateSchema(BaseModel):
title: str = Field(..., example="Buy groceries")
description: Optional[str] = Field(None, example="Milk, eggs, bread")
created_by: Optional[UUIDType] = Field(None, example="123e4567-e89b-12d3-a456-426614174000")
class Config:
orm_mode = True
class ToDoUpdateSchema(BaseModel):
todo_id: UUIDType = Field(..., example="123e4567-e89b-12d3-a456-426614174000")
title: Optional[str] = Field(None, example="Buy groceries and fruits")
description: Optional[str] = Field(None, example="Milk, eggs, bread, apples")
is_completed: Optional[bool] = Field(None, example=True)
updated_by: Optional[UUIDType] = Field(None, example="123e4567-e89b-12d3-a456-426614174000")
class Config:
orm_mode = True
class ToDoResponseSchema(BaseModel):
id: UUIDType = Field(..., example="123e4567-e89b-12d3-a456-426614174000")
title: str = Field(..., example="Buy groceries")
description: Optional[str] = Field(None, example="Milk, eggs, bread")
is_completed: bool = Field(..., example=False)
created_at: Optional[datetime] = Field(None, example="2025-05-23T10:00:00Z")
updated_at: Optional[datetime] = Field(None, example="2025-05-23T11:00:00Z")
created_by: Optional[UUIDType] = Field(None, example="123e4567-e89b-12d3-a456-426614174000")
updated_by: Optional[UUIDType] = Field(None, example="123e4567-e89b-12d3-a456-426614174000")
class Config:
orm_mode = True
import uuid
from sqlalchemy import Column, String, DateTime
from sqlalchemy.dialects.postgresql import UUID
from app.database import Database
from datetime import datetime, timezone
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from uuid import UUID as UUID_TYPE
# SQLAlchemy User Model
class User(Database.Base):
__tablename__ = "users"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
username = Column(String, unique=True, nullable=False)
email = Column(String, unique=True, nullable=False)
full_name = Column(String, nullable=True)
hashed_password = Column(String, nullable=False)
created_at = Column(DateTime, default=datetime.now(timezone.utc))
updated_at = Column(DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
created_by = Column(String, nullable=True)
updated_by = Column(String, nullable=True)
class UserCreateSchema(BaseModel):
username: str = Field(..., example="john_doe")
email: EmailStr = Field(..., example="john@example.com")
full_name: Optional[str] = Field(..., example="John Doe")
password: str = Field(..., example="securepassword")
created_by: Optional[str] = Field(..., example="admin")
class Config:
orm_mode = True
class UserUpdateSchema(BaseModel):
user_id: UUID_TYPE = Field(..., example="8c5d62dc-7b87-4e5b-9d8f-8d54757c8a2e")
email: Optional[EmailStr] = Field(None, example="john_updated@example.com")
full_name: Optional[str] = Field(None, example="John Doe Updated")
password: Optional[str] = Field(None, example="newsecurepassword")
updated_by: Optional[str] = Field(..., example="admin")
class Config:
orm_mode = True
class UserResponseSchema(BaseModel):
id: UUID_TYPE = Field(..., example="8c5d62dc-7b87-4e5b-9d8f-8d54757c8a2e")
username: str = Field(..., example="john_doe")
email: EmailStr = Field(..., example="john@example.com")
full_name: Optional[str] = Field(None, example="John Doe")
created_at: Optional[datetime] = Field(None, example="2021-08-01T00:00:00Z")
updated_at: Optional[datetime] = Field(None, example="2021-08-02T00:00:00Z")
created_by: Optional[str] = Field(None, example="admin")
updated_by: Optional[str] = Field(None, example="admin")
class Config:
orm_mode = True
# This file is intentionally left blank.
# It is used to mark the `routes` directory as a Python package.
\ No newline at end of file
from fastapi import APIRouter, HTTPException, Depends, status , Request
from sqlalchemy.orm import Session
from typing import List
from app.database import get_db
from app.models.simple_model import (
SimpleModelCreateSchema,
SimpleModelResponseSchema,
SimpleModelUpdateSchema,
)
from app.services.simple_model_service import SimpleModelService
from app.global_constants import GlobalConstants
from app.auth import validate_and_authorize , AuthRoutes
class SimpleModelRoutes:
router = APIRouter()
"""
Creates a new SimpleModel entry.
"""
@staticmethod
@router.post("/simple-models", response_model=SimpleModelResponseSchema)
def create_simple_model(simple_model: SimpleModelCreateSchema, db: Session = Depends(get_db)):
try:
service = SimpleModelService(db)
return service.create_simple_model(simple_model)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} create_simple_model route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Retrieves all SimpleModel entries.
"""
@staticmethod
@router.get("/simple-models", response_model=List[SimpleModelResponseSchema])
@validate_and_authorize()
def get_all_simple_models(request : Request, db: Session = Depends(get_db),token: dict = Depends(AuthRoutes.oauth2_scheme)):
try:
service = SimpleModelService(db)
user_info = request.state.user_info
print("User Info: ", user_info)
return service.get_all_simple_models()
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_simple_models route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Retrieves a specific SimpleModel by ID.
"""
@staticmethod
@router.get("/simple-model/{simple_model_id}", response_model=SimpleModelResponseSchema)
def get_simple_model_by_id(simple_model_id: int, db: Session = Depends(get_db)):
try:
service = SimpleModelService(db)
return service.get_simple_model_by_id(simple_model_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_simple_model_by_id route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Updates a SimpleModel entry by ID.
"""
@staticmethod
@router.patch("/simple-model", response_model=SimpleModelResponseSchema)
def update_simple_model(payload: SimpleModelUpdateSchema, db: Session = Depends(get_db)):
try:
service = SimpleModelService(db)
return service.update_simple_model(payload)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} update_simple_model route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
"""
Deletes a SimpleModel entry by ID.
"""
@staticmethod
@router.delete("/simple-model/{simple_model_id}")
def delete_simple_model(simple_model_id: int, db: Session = Depends(get_db)):
try:
service = SimpleModelService(db)
return service.delete_simple_model(simple_model_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} delete_simple_model route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
from fastapi import APIRouter, HTTPException, Depends, status, Request
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.database import get_db
from app.models.todo_model import (
ToDoCreateSchema,
ToDoResponseSchema,
ToDoUpdateSchema,
)
from app.services.todo_service import ToDoService
from app.global_constants import GlobalConstants
class ToDoRoutes:
router = APIRouter()
"""
Creates a new ToDo entry.
"""
@staticmethod
@router.post("/todos", response_model=ToDoResponseSchema)
def create_todo(todo: ToDoCreateSchema, db: Session = Depends(get_db)):
try:
service = ToDoService(db)
return service.create_todo(todo)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} create_todo route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
"""
Retrieves all ToDo entries.
"""
@staticmethod
@router.get("/todos", response_model=List[ToDoResponseSchema])
def get_all_todos(
request: Request,
db: Session = Depends(get_db)
):
try:
service = ToDoService(db)
return service.get_all_todos()
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_todos route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
"""
Retrieves a specific ToDo by ID.
"""
@staticmethod
@router.get("/todo/{todo_id}", response_model=ToDoResponseSchema)
def get_todo_by_id(todo_id: UUID, db: Session = Depends(get_db)):
try:
service = ToDoService(db)
return service.get_todo_by_id(todo_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_todo_by_id route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
"""
Updates a ToDo entry by ID.
"""
@staticmethod
@router.patch("/todo", response_model=ToDoResponseSchema)
def update_todo(payload: ToDoUpdateSchema, db: Session = Depends(get_db)):
try:
service = ToDoService(db)
return service.update_todo(payload)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} update_todo route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
"""
Deletes a ToDo entry by ID.
"""
@staticmethod
@router.delete("/todo/{todo_id}")
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
try:
service = ToDoService(db)
return service.delete_todo(todo_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} delete_todo route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
"""
Retrieves all ToDo entries created by a specific user.
"""
@staticmethod
@router.get("/todos/created_by/{created_by_id}", response_model=List[ToDoResponseSchema])
def get_todos_by_created_by_id(created_by_id: UUID, db: Session = Depends(get_db)):
try:
service = ToDoService(db)
return service.get_todos_by_created_by_id(created_by_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_todos_by_created_by_id route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
\ No newline at end of file
from fastapi import APIRouter, HTTPException, Depends, status, Request
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.database import get_db
from app.models.user_model import (
User,
UserCreateSchema,
UserResponseSchema,
UserUpdateSchema,
)
from app.services.user_service import UserService
from app.global_constants import GlobalConstants
from app.auth import validate_and_authorize, AuthRoutes
from app.Oauth import get_current_user
class UserRoutes:
router = APIRouter()
@staticmethod
@router.post("/users", response_model=UserResponseSchema)
def create_user(user: UserCreateSchema, db: Session = Depends(get_db)):
try:
service = UserService(db)
return service.create_user(user)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} create_user route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@staticmethod
@router.get("/users", response_model=List[UserResponseSchema])
def get_all_users(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
try:
service = UserService(db)
return service.get_all_users()
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_users route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@staticmethod
@router.get("/user/{user_id}", response_model=UserResponseSchema)
def get_user_by_id(user_id: UUID, db: Session = Depends(get_db)):
try:
service = UserService(db)
return service.get_user_by_id(user_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_user_by_id route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@staticmethod
@router.patch("/user", response_model=UserResponseSchema)
def update_user(payload: UserUpdateSchema, db: Session = Depends(get_db)):
try:
service = UserService(db)
return service.update_user(payload)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} update_user route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@staticmethod
@router.delete("/user/{user_id}")
def delete_user(user_id: UUID, db: Session = Depends(get_db)):
try:
service = UserService(db)
return service.delete_user(user_id)
except Exception as e:
print(f"{GlobalConstants.api_response_messages.error_in} delete_user route: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.internal_server_error,
result=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# This is the initialization file for the `services` package.
# Add any package-level imports or initialization code here.
\ No newline at end of file
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from app.models.simple_model import (
SimpleModel,
SimpleModelCreateSchema,
SimpleModelUpdateSchema,
SimpleModelResponseSchema,
)
from fastapi import status
from datetime import datetime, timezone
from app.global_constants import GlobalConstants
from sqlalchemy.future import select
class SimpleModelService:
def __init__(self, db_session: Session):
self.db_session = db_session
def create_simple_model(self, simple_model_data: SimpleModelCreateSchema):
try:
new_simple_model = SimpleModel(
name=simple_model_data.name,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_by=simple_model_data.created_by,
updated_by=simple_model_data.created_by,
)
self.db_session.add(new_simple_model)
self.db_session.commit()
self.db_session.refresh(new_simple_model)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.accepted,
result=[SimpleModelResponseSchema(
id=new_simple_model.id,
name=new_simple_model.name,
created_at=new_simple_model.created_at.isoformat() if new_simple_model.created_at else None,
updated_at=new_simple_model.updated_at.isoformat() if new_simple_model.updated_at else None,
created_by=new_simple_model.created_by,
updated_by=new_simple_model.updated_by,
)],
status_code=status.HTTP_201_CREATED
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} create_simple_model service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_all_simple_models(self):
try:
result = self.db_session.execute(select(SimpleModel))
simple_models = result.scalars().all()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[SimpleModelResponseSchema(
id=model.id,
name=model.name,
created_at=model.created_at.isoformat() if model.created_at else None,
updated_at=model.updated_at.isoformat() if model.updated_at else None,
created_by=model.created_by,
updated_by=model.updated_by,
) for model in simple_models],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_simple_models service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_simple_model_by_id(self, model_id: int):
try:
result = self.db_session.execute(
select(SimpleModel).where(SimpleModel.id == model_id)
)
model = result.scalars().first()
if not model:
return GlobalConstants.return_api_response(
message=f"SimpleModel with ID {model_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[SimpleModelResponseSchema(
id=model.id,
name=model.name,
created_at=model.created_at.isoformat() if model.created_at else None,
updated_at=model.updated_at.isoformat() if model.updated_at else None,
created_by=model.created_by,
updated_by=model.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_simple_model_by_id service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def update_simple_model(self, simple_model_update: SimpleModelUpdateSchema):
try:
result = self.db_session.execute(
select(SimpleModel).where(SimpleModel.id == simple_model_update.simple_model_id)
)
model = result.scalars().first()
if not model:
return GlobalConstants.return_api_response(
message=f"SimpleModel with ID {simple_model_update.simple_model_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
if simple_model_update.name is not None:
model.name = simple_model_update.name
if simple_model_update.updated_by is not None:
model.updated_by = simple_model_update.updated_by
model.updated_at = datetime.now(timezone.utc)
self.db_session.commit()
self.db_session.refresh(model)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[SimpleModelResponseSchema(
id=model.id,
name=model.name,
created_at=model.created_at.isoformat() if model.created_at else None,
updated_at=model.updated_at.isoformat() if model.updated_at else None,
created_by=model.created_by,
updated_by=model.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} update_simple_model service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def delete_simple_model(self, model_id: int):
try:
result = self.db_session.execute(
select(SimpleModel).where(SimpleModel.id == model_id)
)
model = result.scalars().first()
if not model:
return GlobalConstants.return_api_response(
message=f"SimpleModel with ID {model_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
self.db_session.delete(model)
self.db_session.commit()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=None,
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} delete_simple_model service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
\ No newline at end of file
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from fastapi import status
from datetime import datetime, timezone
from app.global_constants import GlobalConstants
from app.models.user_model import User
from app.models.todo_model import (
ToDo,
ToDoCreateSchema,
ToDoUpdateSchema,
ToDoResponseSchema,
)
class ToDoService:
def __init__(self, db_session: Session):
self.db_session = db_session
def create_todo(self, todo_data: ToDoCreateSchema):
try:
user_exists = self.db_session.execute(
select(User).where(User.id == todo_data.created_by)
).scalars().first()
if not user_exists:
return GlobalConstants.return_api_response(
message=f"User with ID {todo_data.created_by} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
new_todo = ToDo(
title=todo_data.title,
description=todo_data.description,
is_completed=False,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_by=todo_data.created_by,
updated_by=todo_data.created_by,
)
self.db_session.add(new_todo)
self.db_session.commit()
self.db_session.refresh(new_todo)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.accepted,
result=[ToDoResponseSchema(
id=new_todo.id,
title=new_todo.title,
description=new_todo.description,
is_completed=new_todo.is_completed,
created_at=new_todo.created_at.isoformat() if new_todo.created_at else None,
updated_at=new_todo.updated_at.isoformat() if new_todo.updated_at else None,
created_by=new_todo.created_by,
updated_by=new_todo.updated_by,
)],
status_code=status.HTTP_201_CREATED
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} create_todo service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_all_todos(self):
try:
result = self.db_session.execute(select(ToDo))
todos = result.scalars().all()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[ToDoResponseSchema(
id=todo.id,
title=todo.title,
description=todo.description,
is_completed=todo.is_completed,
created_at=todo.created_at.isoformat() if todo.created_at else None,
updated_at=todo.updated_at.isoformat() if todo.updated_at else None,
created_by=todo.created_by,
updated_by=todo.updated_by,
) for todo in todos],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_todos service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_todo_by_id(self, todo_id: str):
try:
result = self.db_session.execute(
select(ToDo).where(ToDo.id == todo_id)
)
todo = result.scalars().first()
if not todo:
return GlobalConstants.return_api_response(
message=f"ToDo with ID {todo_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[ToDoResponseSchema(
id=todo.id,
title=todo.title,
description=todo.description,
is_completed=todo.is_completed,
created_at=todo.created_at.isoformat() if todo.created_at else None,
updated_at=todo.updated_at.isoformat() if todo.updated_at else None,
created_by=todo.created_by,
updated_by=todo.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_todo_by_id service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def update_todo(self, todo_update: ToDoUpdateSchema):
try:
result = self.db_session.execute(
select(ToDo).where(ToDo.id == todo_update.todo_id)
)
todo = result.scalars().first()
if not todo:
return GlobalConstants.return_api_response(
message=f"ToDo with ID {todo_update.todo_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
if todo_update.title is not None:
todo.title = todo_update.title
if todo_update.description is not None:
todo.description = todo_update.description
if todo_update.is_completed is not None:
todo.is_completed = todo_update.is_completed
if todo_update.updated_by is not None:
todo.updated_by = todo_update.updated_by
todo.updated_at = datetime.now(timezone.utc)
self.db_session.commit()
self.db_session.refresh(todo)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[ToDoResponseSchema(
id=todo.id,
title=todo.title,
description=todo.description,
is_completed=todo.is_completed,
created_at=todo.created_at.isoformat() if todo.created_at else None,
updated_at=todo.updated_at.isoformat() if todo.updated_at else None,
created_by=todo.created_by,
updated_by=todo.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} update_todo service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def delete_todo(self, todo_id: str):
try:
result = self.db_session.execute(
select(ToDo).where(ToDo.id == todo_id)
)
todo = result.scalars().first()
if not todo:
return GlobalConstants.return_api_response(
message=f"ToDo with ID {todo_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
self.db_session.delete(todo)
self.db_session.commit()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=None,
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} delete_todo service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_todos_by_created_by_id(self, created_by_id: str):
try:
user_exists = self.db_session.execute(
select(User).where(User.id == created_by_id)
).scalars().first()
if not user_exists:
return GlobalConstants.return_api_response(
message=f"User with ID {created_by_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
result = self.db_session.execute(
select(ToDo).where(ToDo.created_by == created_by_id)
)
todos = result.scalars().all()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[ToDoResponseSchema(
id=todo.id,
title=todo.title,
description=todo.description,
is_completed=todo.is_completed,
created_at=todo.created_at.isoformat() if todo.created_at else None,
updated_at=todo.updated_at.isoformat() if todo.updated_at else None,
created_by=todo.created_by,
updated_by=todo.updated_by,
) for todo in todos],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_todos_by_created_by_id service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
\ No newline at end of file
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from app.models.user_model import (
User,
UserCreateSchema,
UserUpdateSchema,
UserResponseSchema,
)
from fastapi import status
from datetime import datetime, timezone
from app.global_constants import GlobalConstants
from sqlalchemy.future import select
from uuid import UUID
import hashlib
from app.Oauth import hash_password
class UserService:
def __init__(self, db_session: Session):
self.db_session = db_session
def create_user(self, user_data: UserCreateSchema):
try:
# Check if email already exists
existing_user = self.db_session.execute(
select(User).where(User.email == user_data.email)
).scalars().first()
if existing_user:
return GlobalConstants.return_api_response(
message="Email already exists.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
# Use bcrypt to hash the password
hashed_password = hash_password(user_data.password)
new_user = User(
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
hashed_password=hashed_password,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_by=user_data.created_by,
updated_by=user_data.created_by,
)
self.db_session.add(new_user)
self.db_session.commit()
self.db_session.refresh(new_user)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.accepted,
result=[UserResponseSchema(
id=new_user.id,
username=new_user.username,
email=new_user.email,
full_name=new_user.full_name,
created_at=new_user.created_at.isoformat() if new_user.created_at else None,
updated_at=new_user.updated_at.isoformat() if new_user.updated_at else None,
created_by=new_user.created_by,
updated_by=new_user.updated_by,
)],
status_code=status.HTTP_201_CREATED
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} create_user service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_all_users(self):
try:
result = self.db_session.execute(select(User))
users = result.scalars().all()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[UserResponseSchema(
id=user.id,
username=user.username,
email=user.email,
full_name=user.full_name,
created_at=user.created_at.isoformat() if user.created_at else None,
updated_at=user.updated_at.isoformat() if user.updated_at else None,
created_by=user.created_by,
updated_by=user.updated_by,
) for user in users],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_all_users service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def get_user_by_id(self, user_id: UUID):
try:
result = self.db_session.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if not user:
return GlobalConstants.return_api_response(
message=f"User with ID {user_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[UserResponseSchema(
id=user.id,
username=user.username,
email=user.email,
full_name=user.full_name,
created_at=user.created_at.isoformat() if user.created_at else None,
updated_at=user.updated_at.isoformat() if user.updated_at else None,
created_by=user.created_by,
updated_by=user.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
print(f"{GlobalConstants.api_response_messages.error_in} get_user_by_id service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def update_user(self, user_update: UserUpdateSchema):
try:
result = self.db_session.execute(select(User).where(User.id == user_update.user_id))
user = result.scalars().first()
if not user:
return GlobalConstants.return_api_response(
message=f"User with ID {user_update.user_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
if user_update.email is not None:
user.email = user_update.email
if user_update.full_name is not None:
user.full_name = user_update.full_name
if user_update.password is not None:
user.hashed_password = hash_password(user_update.password)
if user_update.updated_by is not None:
user.updated_by = user_update.updated_by
user.updated_at = datetime.now(timezone.utc)
self.db_session.commit()
self.db_session.refresh(user)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=[UserResponseSchema(
id=user.id,
username=user.username,
email=user.email,
full_name=user.full_name,
created_at=user.created_at.isoformat() if user.created_at else None,
updated_at=user.updated_at.isoformat() if user.updated_at else None,
created_by=user.created_by,
updated_by=user.updated_by,
)],
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} update_user service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def delete_user(self, user_id: UUID):
try:
result = self.db_session.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if not user:
return GlobalConstants.return_api_response(
message=f"User with ID {user_id} not found.",
result=[],
status_code=status.HTTP_400_BAD_REQUEST
)
self.db_session.delete(user)
self.db_session.commit()
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.success,
result=None,
status_code=status.HTTP_200_OK
)
except SQLAlchemyError as e:
self.db_session.rollback()
print(f"{GlobalConstants.api_response_messages.error_in} delete_user service: ", e)
return GlobalConstants.return_api_response(
message=GlobalConstants.api_response_messages.db_error,
result=None,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class OAuthException(Exception):
"""
Custom exception class to handle OAuth-related errors.
"""
def __init__(self,detail=None, status_code=None):
"""
Initialize the OAuthException with details.
:param error_description: Optional detailed description of the error.
:param status_code: Optional HTTP status code associated with the error.
"""
self.detail = detail
self.status_code = status_code
super().__init__(self.__str__())
from app import create_app
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
app = create_app()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
fastapi==0.115.4
uvicorn==0.32.0
sqlalchemy==2.0.31
databases==0.9.0
asyncpg==0.30.0
pydantic==2.9.2
psycopg2-binary==2.9.6
pydantic[email]==2.9.2
python-dotenv==1.0.0
python-jose==3.3.0
pytest==8.3.5
httpx==0.27.2
requests==2.32.3
cryptography==43.0.1
pandas==2.2.2
jwt
python-jose[cryptography]
passlib[bcrypt]
\ No newline at end of file
sonar.projectKey=fastapi_application
sonar.projectName=FastAPI Application
sonar.projectVersion=1.0
sonar.sources=app
sonar.tests=tests
sonar.python.coverage.reportPaths=coverage.xml
sonar.python.xunit.reportPaths=xunit.xml
sonar.host.url=http://localhost:9000
sonar.login=admin
sonar.password=admin
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment