-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·146 lines (112 loc) · 3.63 KB
/
Copy pathmain.py
File metadata and controls
executable file
·146 lines (112 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import threading
import time
from datetime import datetime
import uvicorn
from sensors.BH1750 import BH1750
from sensors.SE101020635 import SE101020635
from sensors.MQ135 import MQ135
from sensors.DHT22 import DHT22
from sensors.CT0016MS import CT10016MS
from utils.api import LaravelAPIClient
from utils.localApi import app
# light sensor
def read_light():
light_sensor = BH1750()
lux = light_sensor.read_lux()
print("Lux: ", lux)
return round(lux, 2)
# water level
def read_water_level():
sensor = SE101020635()
low, high = sensor.read_sections()
level = sensor.compute_level(low, high, threshold=100)
print("Water Level: ", level, "%")
return round(level if level is not None else 0, 1)
# temperature / humidity
def read_temperature_humidity():
sensor = DHT22()
if sensor.temperature is None or sensor.humidity is None:
print("Failed to read from DHT22 sensor.")
return None, None
print("Temperature:", sensor.temperature, "°C")
print("Humidity:", sensor.humidity, "%")
return sensor.temperature, sensor.humidity
# ph
def voltage_to_ph(voltage, offset=0.0):
ph = 7.0 + ((2.5 - voltage) / 0.18) + offset
print("PH: ", ph)
return round(ph, 2)
# air quality
def air_quality():
sensor = MQ135()
try:
data = sensor.read_all()
print(f"Voltage: {data['voltage']:.3f} V \nRs: {data['rs']:.1f} Ω \nppm: {data['ppm']:.2f}")
time.sleep(1)
except KeyboardInterrupt:
print("\nMeasurement ended.")
finally:
sensor.close()
# automatic watering
def pump():
controller = CT10016MS()
try:
while True:
if not controller.run_check():
time.sleep(LaravelAPIClient(env_path=".env").getEnvValue("UPDATE_INTERVALL"))
else:
time.sleep(15)
except KeyboardInterrupt:
print("\n Stops manually.")
finally:
controller.shutdown()
# read from api
def api_get():
try:
client = LaravelAPIClient(env_path=".env")
response = client.get("plants")
if response and isinstance(response, list) and len(response) > 0:
print(response)
except Exception as e:
print(f"Error while reaching the API: {e}")
# send to api
def api_post_sensor_data():
try:
# get date from sensors
temperature, humidity = read_temperature_humidity()
light = read_light()
water = read_water_level()
ph = voltage_to_ph(voltage=3)
# API-Client initialising
client = LaravelAPIClient(env_path=".env")
# Data Object
sensor_data = {
"plant_id" : client.getEnvValue("PLANT_ID"),
"humidity" : humidity,
"temperature" : temperature,
"light" : light,
"water" : water,
"ph" : ph,
"air_quality" : air_quality()
}
# send data
response = client.post("measures", sensor_data)
print("Send Data...: ", response)
except Exception as e:
print("Error while sending data: ", e)
# start main
def run_server():
uvicorn.run(app, host="0.0.0.0", port=8000)
def data_loop():
while True:
print("Start upload:", datetime.now().isoformat())
api_post_sensor_data()
api_get()
intervall = int(LaravelAPIClient(env_path=".env").getEnvValue("UPDATE_INTERVALL"))
print(f"Wait {int(intervall / 60)} Minutes before sending data again...")
time.sleep(intervall)
if __name__ == '__main__':
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
# Main thread handles the data loop
data_loop()