-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
253 lines (203 loc) · 8.89 KB
/
app.py
File metadata and controls
253 lines (203 loc) · 8.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
ADME-Fabric Control Plane
Streamlit application entry point
"""
import streamlit as st
from pathlib import Path
import sys
import os
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from config.logging_config import configure_logging
from database.db import init_database
from components.service_health import render_service_health_compact
# Import settings dynamically to reload from .env
def get_settings():
    """Construct a fresh Settings instance on every call.

    Instantiating Settings re-reads configuration from .env, so callers
    always observe the current on-disk values rather than a cached copy.
    """
    import config.settings as settings_module
    return settings_module.Settings()
# Load settings and wire up logging before any page rendering happens
settings = get_settings()
configure_logging(settings.LOG_LEVEL)
# Page configuration
st.set_page_config(
    page_title=settings.APP_NAME,
    page_icon="🔄",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        "Get Help": "https://github.com/mhadaily/adme-fabric-control-plane",
        "Report a bug": "https://github.com/mhadaily/adme-fabric-control-plane/issues",
        "About": f"# {settings.APP_NAME}\nCreate, publish, and consume ADME data products via Microsoft Fabric"
    }
)
# Initialize database on first run
@st.cache_resource
def setup_database():
    """Initialize database (runs once per session)"""
    init_database()
    # Return a concrete value so the cached resource is non-None
    return True
@st.cache_resource
def setup_scheduler():
    """Initialize background scheduler (runs once per session)"""
    # Imported lazily so scheduler dependencies load only when needed
    from services.scheduler_runner import start_scheduler
    start_scheduler()
    # Return a concrete value so the cached resource is non-None
    return True
# One-time initialization; both are cached via st.cache_resource, so
# Streamlit script reruns do not repeat the work.
setup_database()
setup_scheduler()
def main():
    """Render the ADME-Fabric Control Plane home page.

    Builds the sidebar (quick stats, active-schedule preview, app info,
    service health) and the main content area (data product lifecycle
    overview, quick-action buttons, getting-started guide, and recent
    activity). Runs on every Streamlit script rerun; takes no arguments
    and returns None.
    """
    # Sidebar navigation
    with st.sidebar:
        st.image("https://img.icons8.com/fluency/96/data-configuration.png", width=80)
        st.title(settings.APP_NAME)
        st.divider()

        # Navigation info
        st.caption("Navigate using the pages in the sidebar above ☝️")
        st.divider()

        # Quick stats — fall back to zeros when services are unavailable
        try:
            from services.connection_service import connection_service
            from services.job_service import job_service

            envs = connection_service.list_environments()
            jobs = job_service.list_jobs(limit=100)

            col1, col2 = st.columns(2)
            with col1:
                st.metric("Environments", len(envs))
            with col2:
                st.metric("Data Products", len(jobs))
        except Exception:
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Environments", "0")
            with col2:
                st.metric("Data Products", "0")

        # Scheduled jobs quick view
        try:
            from services.scheduler_service import scheduler_service

            active_schedules = scheduler_service.list_scheduled_jobs(status="active")
            if active_schedules:
                st.divider()
                st.caption(f"⏰ **{len(active_schedules)}** active schedule(s)")
                # BUGFIX: the previous min() key returned datetime for some
                # schedules and float('inf') for those with next_run_at=None;
                # comparing datetime with float raises TypeError. Filter out
                # schedules without a next run before taking the minimum.
                runnable = [s for s in active_schedules if s.next_run_at]
                next_job = min(runnable, key=lambda s: s.next_run_at) if runnable else None
                if next_job:
                    from datetime import datetime, timezone

                    now = datetime.now(timezone.utc)
                    next_run = next_job.next_run_at
                    # Normalize naive timestamps to UTC before arithmetic
                    # (mixing naive and aware datetimes raises TypeError).
                    if next_run.tzinfo is None:
                        next_run = next_run.replace(tzinfo=timezone.utc)
                    delta = next_run - now
                    if delta.total_seconds() > 0:
                        if delta.total_seconds() < 3600:
                            st.caption(f"Next: {int(delta.total_seconds() / 60)}m")
                        else:
                            st.caption(f"Next: {int(delta.total_seconds() / 3600)}h")
        except Exception:
            pass  # Silent fail if scheduler not ready

        st.divider()

        # App info
        current_settings = get_settings()  # Reload from .env
        st.caption("Version: 1.0.0")  # plain string: no placeholders needed
        st.caption(f"Mode: {'🧪 Mock' if current_settings.MOCK_MODE else '🔴 Live'}")
        st.divider()

        # Service health status
        render_service_health_compact()

    # Main content
    st.title("🏠 ADME-Fabric Control Plane")
    st.markdown("""
    ### The Control Plane demonstrates **Create, Publish & Consume** of Data Products

    Seamlessly move data between **Azure Data Manager for Energy (ADME)** instances
    through **Microsoft Fabric Lakehouses**.
    """)

    # Data Products overview - the three pillars
    st.subheader("📦 Data Product Lifecycle")
    col1, col2, col3 = st.columns(3)
    with col1:
        st.success("**📝 Create**\n\nDefine data product definitions by selecting kinds, partitions, and LUCENE queries from your ADME source")
    with col2:
        st.info("**🚀 Publish**\n\nExport data products to Fabric Lakehouse via notebooks, ready for sharing across organizations")
    with col3:
        st.warning("**📥 Consume**\n\nAccept shared data products and ingest them into your target ADME instance")

    st.divider()

    # Quick actions
    st.subheader("🚀 Quick Actions")
    col1, col2, col3, col4, col5, col6 = st.columns(6)
    with col1:
        if st.button("🔗 Connections", use_container_width=True):
            st.switch_page("pages/1_🔗_Connections.py")
    with col2:
        if st.button("📝 Create Definition", use_container_width=True, type="primary"):
            st.switch_page("pages/2_📝_Data_Product_Definitions.py")
    with col3:
        if st.button("🚀 Publish", use_container_width=True):
            st.switch_page("pages/3_🚀_Publish.py")
    with col4:
        if st.button("⏰ Schedules", use_container_width=True):
            st.switch_page("pages/8_⏰_Scheduled_Jobs.py")
    with col5:
        if st.button("📥 Consume", use_container_width=True):
            st.switch_page("pages/4_📥_Consume.py")
    with col6:
        if st.button("📊 Dashboard", use_container_width=True):
            st.switch_page("pages/5_📊_Dashboard.py")

    st.divider()

    # Getting started guide
    with st.expander("📖 Getting Started Guide", expanded=True):
        st.markdown("""
        ### Step 1: Configure Connections
        Set up your ADME-Fabric environment pairs in the **Connections** page.
        Each environment links an ADME instance with its associated Fabric workspace.

        ### Step 2: Create a Data Product Definition
        Use the **Data Product Definitions** page to define what data to bundle:
        - Select source environment and data partition
        - Choose record types (kinds) to include
        - Apply LUCENE query filters

        ### Step 3: Publish Your Data Product
        Use the **Publish** page to export your data to Fabric:
        - Data is exported via ADME Search & Storage APIs
        - Fabric notebooks process and store the data
        - Results are saved to Delta Lake tables

        ### Step 4: Consume Data Products
        Use the **Consume** page to ingest shared data:
        - Create external data shares for your published products
        - Accept shares from other organizations
        - Pull data into your target ADME instance

        ### Step 5: Monitor Progress
        Track your data products on the **Dashboard**:
        - View active publishing jobs
        - Monitor logs and API calls
        - Check for errors
        """)

    # Recent activity - show actual data products
    st.subheader("📈 Recent Activity")
    # Job status -> display icon (hoisted out of the loop; it is constant)
    status_icons = {
        "pending": "⏳",
        "running": "🔄",
        "completed": "✅",
        "failed": "❌",
        "partial": "⚠️",
        "cancelled": "🚫",
    }
    try:
        from services.job_service import job_service

        # Get recent jobs
        recent_jobs = job_service.list_jobs(limit=5)
        if recent_jobs:
            # Show data products first
            st.markdown("**Recent Data Products:**")
            for job in recent_jobs[:5]:
                icon = status_icons.get(str(job.status).lower(), "❓")
                # Truncate long kind names; only show "..." when the name
                # was actually cut (previously "..." was always appended,
                # even after "N/A" or a short kind).
                if job.kind:
                    kind_label = job.kind[:30] + "..." if len(job.kind) > 30 else job.kind
                else:
                    kind_label = "N/A"
                col1, col2, col3 = st.columns([3, 1, 1])
                with col1:
                    st.markdown(f"{icon} **{job.name}** - {kind_label}")
                with col2:
                    st.caption(f"{job.records_exported or 0} records")
                with col3:
                    created = job.created_at.strftime("%m/%d %H:%M") if job.created_at else "Unknown"
                    st.caption(created)
        else:
            st.info("No recent activity. Start by creating your first data product definition!")
    except Exception:
        # Best-effort panel: show the empty-state message on any failure
        st.info("No recent activity. Start by creating your first data product definition!")
if __name__ == "__main__":
main()