import gzip
import os
from typing import Any, Dict, Literal

import ijson
import simplejson as json
from simplejson import JSONDecodeError

from analysis import Analysis, UnterminatedLine, SizedCount

event_types = {
    -1: "Unknown",
    1: "Load",
    2: "FullSnapshot",
    3: "IncrementalSnapshot",
    4: "Meta",
    5: "Custom",
    6: "Plugin",
}

incremental_snapshot_event_source = {
    0: "Mutation",
    1: "MouseMove",
    2: "MouseInteraction",
    3: "Scroll",
    4: "ViewportResize",
    5: "Input",
    6: "TouchMove",
    7: "MediaInteraction",
    8: "StyleSheetRule",
    9: "CanvasMutation",
    10: "Font",
    11: "Log",
    12: "Drag",
    13: "StyleDeclaration",
    14: "Selection",
    15: "AdoptedStyleSheet",
}

node_types = {
    # rrweb source code https://github.com/rrweb-io/rrweb/blob/master/packages/rrdom/src/document.ts#L781C3-L781C14
    # their comment for zero says:
    # // This isn't a node type. Enum type value starts from zero but NodeType value starts from 1.
    # but we see mutations with 0 as node type 🤷️
    0: "PLACEHOLDER",
    1: "Element",
    2: "Attribute",
    3: "Text",
    4: "CDATA",
    5: "EntityReference",
    6: "Entity",
    7: "ProcessingInstruction",
    8: "Comment",
    9: "Document",
    10: "DocumentType",
    11: "DocumentFragment",
}


def analyse_exported_file(file_path: str) -> Analysis:
    """
    When operating on an "exported recording" there is a single file with a "snapshots" array
    under the top-level "data" key.
    Each item in that array has a window id but is otherwise an rrweb event.
    (A sketch of the expected file shape follows this function.)
    """
    analysis = Analysis.empty()
    with open(file_path, "r") as file:
        for list_of_snapshots in ijson.items(file, "data.snapshots"):
            analysis += analyse_snapshots(list_of_snapshots)
    return analysis
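

# A minimal sketch of the file shape analyse_exported_file expects, inferred from the
# "data.snapshots" path above; key names other than "data" and "snapshots" are assumptions:
# {
#     "data": {
#         "snapshots": [
#             {"window_id": "...", "type": 3, "timestamp": 1700000000000, "data": {...}},
#             ...
#         ]
#     }
# }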


def analyse_s3_file(file_path: str) -> Analysis:
    """
    If operating on an S3 bucket then you can have multiple files.
    Each file is JSONL (regardless of whether its extension is .json).
    Each line is a JSON object.
    Each has a window id and an array at the "data" key.
    Each item in that array is an rrweb event.
    (A sketch of the expected line shape follows this function.)
    """
    analysis = Analysis.empty()
    with open(file_path, "r") as file:
        for line_index, line in enumerate(file):
            if not line.strip():
                continue
            try:
                json_line = json.loads(line)
                line_analysis = analyse_snapshots(json_line.get("data", []))
                analysis += line_analysis
            except JSONDecodeError:
                analysis.unterminated_lines.append(
                    UnterminatedLine(file_path, line_index, line[-20:])
                )
    return analysis
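

# A minimal sketch of a single JSONL line analyse_s3_file expects, inferred from the docstring
# and the json_line.get("data", []) lookup above; the "window_id" key name is an assumption:
# {"window_id": "...", "data": [{"type": 3, "timestamp": 1700000000000, "data": {...}}, ...]}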


def ensure_all_mutation_types_are_handled(data: Dict) -> None:
    handled_mutations = [
        "removes",
        "adds",
        "attributes",
        "texts",
        "isAttachIframe",
        "updates",  # mobile
    ]
    ignored_keys = ["source"]
    ignore_list = handled_mutations + ignored_keys
    mutations_present = data.keys()
    unhandled_mutations = [
        mutation for mutation in mutations_present if mutation not in ignore_list
    ]
    if unhandled_mutations:
        print(f"Unhandled mutations {unhandled_mutations} in '{data}'")
        raise ValueError(f"Unhandled mutations: {unhandled_mutations}")


def maybe_decompress(x: str | dict | list | None) -> dict | list | None:
    if x is None:
        return None
    if isinstance(x, str):
        # gzip-decompress the latin-1 encoded payload, decode the bytes to a UTF-8 string,
        # and parse that JSON string
        decompressed_str = gzip.decompress(x.encode("latin-1")).decode("utf-8")
        return json.loads(decompressed_str)
    return x
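

# Illustrative (hypothetical data) behaviour of maybe_decompress:
#   maybe_decompress(None)            -> None
#   maybe_decompress([{"id": 1}])     -> [{"id": 1}]  (already-parsed values pass through)
#   maybe_decompress(compressed_str)  -> the parsed JSON, where compressed_str is a gzip
#                                        payload that was decoded to str with latin-1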


# TODO ijson returns any :'(
def analyse_snapshots(list_of_snapshots: Any) -> Analysis:
    analysis = Analysis.empty()
    for snapshot in list_of_snapshots:
        if analysis.first_timestamp is None:
            analysis.first_timestamp = snapshot["timestamp"]
        if analysis.last_timestamp is None:
            analysis.last_timestamp = snapshot["timestamp"]
        analysis.first_timestamp = min(analysis.first_timestamp, snapshot["timestamp"])
        analysis.last_timestamp = max(analysis.last_timestamp, snapshot["timestamp"])

        event_type = event_types[snapshot.get("type", -1)]

        if event_type == "Plugin":
            plugin_name = snapshot["data"]["plugin"]
            if plugin_name not in analysis.plugin_counts:
                analysis.plugin_counts[plugin_name] = SizedCount(0, 0)
            analysis.plugin_counts[plugin_name] += len(
                json.dumps(snapshot["data"], separators=(",", ":"))
            )
            if plugin_name == "rrweb/console@1":
                level = snapshot["data"]["payload"]["level"]
                console_log_line = snapshot["data"]["payload"]["payload"][0]
                fingerprint = level + "---" + console_log_line
                if fingerprint not in analysis.console_log_counts:
                    analysis.console_log_counts[fingerprint] = SizedCount(0, 0)
                analysis.console_log_counts[fingerprint] += len(
                    json.dumps(fingerprint, separators=(",", ":"))
                )

        if event_type == "FullSnapshot":
            analysis.full_snapshot_timestamps.append(snapshot["timestamp"])

        if event_type not in analysis.message_type_counts:
            analysis.message_type_counts[event_type] = 0
        analysis.message_type_counts[event_type] += 1

        if event_type == "IncrementalSnapshot":
            data_source_ = snapshot["data"].get("source", None)
            if data_source_ is None:
                print("Unexpected data shape: incremental snapshot with no source")
                continue
            try:
                source_ = incremental_snapshot_event_source[data_source_]
            except KeyError:
                print(f"Unknown source {data_source_}")
                continue
            if source_ not in analysis.incremental_snapshot_event_source_counts:
                analysis.incremental_snapshot_event_source_counts[source_] = SizedCount(
                    0, 0
                )
            analysis.incremental_snapshot_event_source_counts[source_] += len(
                json.dumps(snapshot["data"], separators=(",", ":"))
            )

            if source_ == "Mutation":
                # mutations we handle
                ensure_all_mutation_types_are_handled(snapshot["data"])
                if snapshot["data"].get("isAttachIframe", False):
                    # these have adds, removes, texts, and attributes like other mutations
                    # let's mostly ignore them right now
                    # TODO handle them
                    analysis.isAttachIFrameCount += 1

                for removal in maybe_decompress(snapshot["data"].get("removes", [])):
                    analysis.mutation_removal_count += len(
                        json.dumps(removal, separators=(",", ":"))
                    )

                for addition in maybe_decompress(snapshot["data"].get("adds", [])):
                    if "node" in addition:
                        node_type = node_types[addition["node"]["type"]]
                        if node_type not in analysis.mutation_addition_counts:
                            analysis.mutation_addition_counts[node_type] = SizedCount(
                                0, 0
                            )
                        addition_size = len(json.dumps(addition, separators=(",", ":")))
                        analysis.mutation_addition_counts[node_type] += addition_size
                        analysis.addition_sizes.append(addition_size)
                        # adds changes by value, so we can find particular additions that are adding to size
                        keyable_value = addition["node"].get(
                            "textContent", json.dumps(addition["node"])
                        )[:300]
                        if keyable_value not in analysis.mutation_addition_by_value:
                            analysis.mutation_addition_by_value[
                                keyable_value
                            ] = SizedCount(0, 0)
                        analysis.mutation_addition_by_value[
                            keyable_value
                        ] += addition_size
                    else:
                        # print("ooh a mobile recording")
                        # print(json.dumps(addition, separators=(",", ":")))
                        pass

                # attributes individually
                for altered_attribute in maybe_decompress(snapshot["data"].get("attributes", [])):
                    if "attributes" not in altered_attribute:
                        # print("ooh a mobile recording")
                        # print(json.dumps(altered_attribute, separators=(",", ":")))
                        pass
                    else:
                        # this is an array of dicts. each should have `attributes`
                        # and that is a dict whose key is the attribute
                        changeds = altered_attribute["attributes"].keys()
                        for changed in changeds:
                            if (
                                changed
                                not in analysis.individual_mutation_attributes_counts
                            ):
                                analysis.individual_mutation_attributes_counts[
                                    changed
                                ] = SizedCount(0, 0)
                            analysis.individual_mutation_attributes_counts[
                                changed
                            ] += len(
                                json.dumps(altered_attribute["attributes"][changed])
                            )

                # attributes grouped
                for mutated_attribute in maybe_decompress(snapshot["data"].get("attributes", [])):
                    # attribute mutations come together in a dict
                    # tracking them individually gives confusing counts
                    attribute_fingerprint = "---".join(
                        mutated_attribute["attributes"].keys()
                    )
                    if (
                        attribute_fingerprint
                        not in analysis.grouped_mutation_attributes_counts
                    ):
                        analysis.grouped_mutation_attributes_counts[
                            attribute_fingerprint
                        ] = SizedCount(0, 0)
                    analysis.grouped_mutation_attributes_counts[
                        attribute_fingerprint
                    ] += len(
                        json.dumps(
                            snapshot["data"]["attributes"], separators=(",", ":")
                        )
                    )

                for text in snapshot["data"].get("texts", []):
                    analysis.text_mutation_count += len(text)
    return analysis


def analyse_recording(file_path: str, source: Literal["s3", "export"]) -> None:
    analysis = Analysis.empty()
    if source == "export":
        analysis = analyse_exported_file(file_path)
    elif source == "s3":
        # open each file in the provided directory
        sorted_files = sorted(os.listdir(file_path))
        for file_name in sorted_files:
            print(f"processing file: {file_name}")
            analysis += analyse_s3_file(os.path.join(file_path, file_name))
    else:
        raise ValueError(f"Unknown source {source}")
    print(analysis)


if __name__ == "__main__":
    # TODO get the file path from the command line
    analyse_recording(
        "/Users/paul/Downloads/large-sessions/export-019498a4-1d21-7d1b-9b6f-0b704784ee0f.ph-recording.json",
        "export",
    )
    # analyse_recording("/Users/paul/Downloads/large-sessions/boom", "s3")
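
    # A possible sketch for the TODO above, using argparse; left commented out so the
    # hard-coded call above keeps working. The argument names are assumptions, not an existing CLI:
    #
    # import argparse
    #
    # parser = argparse.ArgumentParser(description="Analyse an rrweb recording")
    # parser.add_argument("file_path", help="exported recording file, or directory of S3 JSONL files")
    # parser.add_argument("--source", choices=["export", "s3"], default="export")
    # args = parser.parse_args()
    # analyse_recording(args.file_path, args.source)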