Skip to content

Commit 6407789

Browse files
committed
feat: Add live streaming example
1 parent f0ed9ab commit 6407789

File tree

1 file changed

+28
-5
lines changed

1 file changed

+28
-5
lines changed

notebooks/realtime-stream.ipynb

+28-5
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@
5757
" .interval(period=timedelta(seconds=auto_refresh)) \\\n",
5858
" .pipe(ops.map(lambda start: f'from(bucket: \"my-bucket\") '\n",
5959
" f'|> range(start: -{auto_refresh}s, stop: now()) '\n",
60-
" f'|> filter(fn: (r) => r._measurement == \"cpu\") '\n",
61-
" f'|> filter(fn: (r) => r._field == \"usage_user\") '\n",
62-
" f'|> filter(fn: (r) => r.cpu == \"cpu-total\")')) \\\n",
60+
" f'|> filter(fn: (r) => (r._measurement == \"cpu\") or (r._measurement == \"mem\")) ')) \\\n",
6361
" .pipe(ops.map(lambda query: client.query_api().query_stream(query))) \\\n",
6462
" .pipe(ops.flat_map(lambda records: rx.from_iterable(records))) \\\n",
6563
" .subscribe(observer=lambda record: sink.emit(record), on_error=lambda error: print(error))\n",
@@ -84,12 +82,28 @@
8482
"metadata": {},
8583
"outputs": [],
8684
"source": [
87-
"example = pd.DataFrame({'value': []}, columns=['value'])\n",
85+
"cpu_example = pd.DataFrame({'value': []}, columns=['value'])\n",
8886
"\n",
8987
"cpu_sink = sink\\\n",
9088
" .filter(lambda record: (record[\"_measurement\"] == \"cpu\") & (record[\"_field\"] == \"usage_user\"))\\\n",
9189
" .map(lambda record: pd.DataFrame({'value': [record[\"_value\"]]}, columns=['value'], index=[record[\"_time\"]]))\n",
92-
"cpu = DataFrame(cpu_sink, example=example)"
90+
"cpu = DataFrame(cpu_sink, example=cpu_example)"
91+
]
92+
},
93+
{
94+
"cell_type": "code",
95+
"execution_count": null,
96+
"metadata": {},
97+
"outputs": [],
98+
"source": [
99+
"mem_example = pd.DataFrame({'field': [], 'value': []}, columns=['field', 'value'])\n",
100+
"\n",
101+
"mem_sink = sink \\\n",
102+
" .filter(lambda record: record[\"_measurement\"] == \"mem\") \\\n",
103+
" .filter(lambda record: record[\"_field\"] in [\"total\", \"used\", \"free\", \"available\"]) \\\n",
104+
" .map(lambda record: pd.DataFrame({'field': record[\"_field\"], 'value': record[\"_value\"]},\n",
105+
" columns=['field', 'value'], index=[record[\"_time\"], record[\"_field\"]]))\n",
106+
"mem = DataFrame(mem_sink, example=mem_example)"
93107
]
94108
},
95109
{
@@ -119,6 +133,15 @@
119133
"cpu.hvplot(width=700, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)"
120134
]
121135
},
136+
{
137+
"cell_type": "code",
138+
"execution_count": null,
139+
"metadata": {},
140+
"outputs": [],
141+
"source": [
142+
" mem.groupby('field').sum().hvplot.bar()"
143+
]
144+
},
122145
{
123146
"cell_type": "code",
124147
"execution_count": null,

0 commit comments

Comments
 (0)