-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path10-producer.py
More file actions
80 lines (69 loc) · 3.36 KB
/
10-producer.py
File metadata and controls
80 lines (69 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Databricks notebook source
# MAGIC %run ./01-config
# COMMAND ----------
class Producer():
    """Simulates an upstream data producer for testing.

    Copies pre-staged test data files from the test-data directory into the
    landing zone ("raw") directory, where the ingestion pipeline picks them
    up, and validates the landed record counts with Spark.

    Relies on notebook-scoped globals: ``Config`` (from ./01-config),
    ``dbutils`` and ``spark`` (Databricks runtime).
    """

    def __init__(self):
        self.Conf = Config()
        # Landing zone the ingestion pipeline reads from.
        self.landing_zone = self.Conf.base_dir_data + "/raw"
        # Pre-generated test data files to be "produced".
        self.test_data_dir = self.Conf.base_dir_data + "/test_data"

    def _copy_file(self, source, target):
        """Copy one test file to the landing zone, announcing progress."""
        print(f"producing {source}.....")
        dbutils.fs.cp(source, target)
        print("Done.....")

    def user_registration(self, set_num):
        """Land the registered-users CSV for test data set ``set_num``."""
        source = f"{self.test_data_dir}/1-registered_users_{set_num}.csv"
        target = f"{self.landing_zone}/registered_users_bz/1-registered_users_{set_num}.csv"
        # NOTE: this feed's progress messages differ from the others' —
        # preserved as-is.
        print(f"Producing {source}")
        dbutils.fs.cp(source, target)
        print("Done...")

    def profile_cdc(self, set_num):
        """Land the user-info CDC JSON for test data set ``set_num``."""
        self._copy_file(
            f"{self.test_data_dir}/2-user_info_{set_num}.json",
            f"{self.landing_zone}/kafka_multiplex_bz/2-user_info_{set_num}.json",
        )

    def workout(self, set_num):
        """Land the workout JSON for test data set ``set_num``."""
        self._copy_file(
            f"{self.test_data_dir}/4-workout_{set_num}.json",
            f"{self.landing_zone}/kafka_multiplex_bz/4-workout_{set_num}.json",
        )

    def bpm(self, set_num):
        """Land the heart-rate (bpm) JSON for test data set ``set_num``."""
        self._copy_file(
            f"{self.test_data_dir}/3-bpm_{set_num}.json",
            f"{self.landing_zone}/kafka_multiplex_bz/3-bpm_{set_num}.json",
        )

    def gym_logins(self, set_num):
        """Land the gym-logins CSV for test data set ``set_num``."""
        self._copy_file(
            f"{self.test_data_dir}/5-gym_logins_{set_num}.csv",
            f"{self.landing_zone}/gym_logins_bz/5-gym_logins_{set_num}.csv",
        )

    def produce(self, set_num):
        """Produce one complete test data set and report elapsed time.

        Sets 1-2 carry all five feeds; sets up to 10 carry only bpm data.
        A ``set_num`` above 10 lands nothing.
        """
        import time
        start = int(time.time())
        print(f"Producing test data set {set_num}")
        if set_num <= 2:
            self.user_registration(set_num)
            self.profile_cdc(set_num)
            self.workout(set_num)
            self.gym_logins(set_num)
        if set_num <= 10:
            self.bpm(set_num)
        print(f"Test data set {set_num} produced in {int(time.time()) - start} seconds")

    def _validate_count(self, data_format, location, expected_count):
        """Assert the landed files at ``location`` hold ``expected_count`` rows.

        ``data_format`` is the Spark reader format ("csv" or "json");
        ``location`` is the landing-zone sub-path prefix (globbed with
        ``_*.{data_format}``). Raises AssertionError on a count mismatch.
        (Renamed from ``format``, which shadowed the builtin.)
        """
        print(f"Validating {location}....")
        target = f"{self.landing_zone}/{location}_*.{data_format}"
        actual_count = (spark.read
                        .format(data_format)
                        .option("header", "true")
                        .load(target)
                        .count())
        assert actual_count == expected_count, f"Expected {expected_count:,} records, found {actual_count:,} in {location}"
        # Fixed format string: original printed "Expected{n}" with no space.
        print(f"Found {actual_count:,}/ Expected {expected_count:,} records: Success")

    def validate(self, sets):
        """Validate record counts for the first ``sets`` test data sets.

        Expected counts are hard-coded to the known sizes of the bundled
        test files (sets == 1 vs sets == 2), except bpm which scales
        linearly at 253,801 records per set.
        """
        import time
        start = int(time.time())
        print(f"validating test data {sets} sets.....")
        self._validate_count("csv", "registered_users_bz/1-registered_users", 5 if sets == 1 else 10)
        self._validate_count("json", "kafka_multiplex_bz/2-user_info", 7 if sets == 1 else 13)
        self._validate_count("json", "kafka_multiplex_bz/3-bpm", sets * 253801)
        self._validate_count("json", "kafka_multiplex_bz/4-workout", 16 if sets == 1 else 32)
        self._validate_count("csv", "gym_logins_bz/5-gym_logins", 8 if sets == 1 else 16)
        print(f"Completed in this time {int(time.time()) - start} seconds")