import time import numpy as np import pandas as pd import holoviews as hv from holoviews.streams import Pipe, Buffer import streamz import streamz.dataframe hv.extension('bokeh') pipe = Pipe(data=[]) vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe]) vector_dmap.redim.range(x=(-1, 1), y=(-1, 1)) x,y = np.mgrid[-10:11,-10:11] * 0.1 sine_rings = np.sin(x**2+y**2)*np.pi+np.pi exp_falloff = 1/np.exp((x**2+y**2)/8) for i in np.linspace(0, 1, 25): time.sleep(0.1) pipe.send([x,y,sine_rings*i, exp_falloff]) example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count']) dfstream = Buffer(example, length=100, index=False) curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream]) point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream]) %%opts Points [color_index='count', xaxis=None, yaxis=None] (line_color='black', size=5) %%opts Curve (line_width=1, color='black') curve_dmap * point_dmap def gen_brownian(): x, y, count = 0, 0, 0 while True: x += np.random.randn() y += np.random.randn() count += 1 yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count']) brownian = gen_brownian() for i in range(200): dfstream.send(next(brownian)) dfstream.clear() point_source = streamz.Stream() pipe = Pipe(data=[]) point_source.sliding_window(20).map(pd.concat).sink(pipe.send) # Connect streamz to the Pipe scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe]) %%opts Scatter [color_index='count', bgcolor='black'] scatter_dmap.redim.range(y=(-4, 4)) for i in range(100): df = pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i}, columns=['x', 'y', 'count']) point_source.emit(df) simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms') print(simple_sdf.index) simple_sdf.example.dtypes %%opts Curve [width=500 show_grid=True] sdf = (simple_sdf-0.5).cumsum() hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)]) simple_sdf.stop() %%opts Curve [width=500 show_grid=True] source_df = streamz.dataframe.Random(freq='5ms', interval='100ms') sdf = (source_df-0.5).cumsum() raw_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)]) smooth_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x.rolling('500ms').mean())]) raw_dmap.relabel('raw') * smooth_dmap.relabel('smooth') source_df.stop() from functools import partial multi_source = streamz.dataframe.Random(freq='5ms', interval='100ms') sdf = (multi_source-0.5).cumsum() hv.DynamicMap(hv.Table, streams=[Buffer(sdf.x, length=10)]) +\ hv.DynamicMap(partial(hv.BoxWhisker, kdims=[], vdims='x'), streams=[Buffer(sdf.x, length=100)]) hv.DynamicMap(hv.Scatter, streams=[Buffer(sdf.x)]).redim.label(x='value', index='time') multi_source.stop() hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms') sdf = (hist_source-0.5).cumsum() dmap = hv.DynamicMap(hv.Dataset, streams=[Buffer(sdf.x, length=500)]) hv.operation.histogram(dmap, dimension='x') hist_source.stop() %%opts RGB [width=600] from holoviews.operation.datashader import datashade from bokeh.palettes import Blues8 large_source = streamz.dataframe.Random(freq='100us', interval='200ms') sdf = (large_source-0.5).cumsum() dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x, length=1000000)]) datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8) large_source.stop() %%opts Curve [width=600] from tornado.ioloop import PeriodicCallback from tornado import gen count = 0 buffer = Buffer(np.zeros((0, 2)), length=50) @gen.coroutine def f(): global count count += 1 buffer.send(np.array([[count, np.random.rand()]])) cb = PeriodicCallback(f, 100) cb.start() hv.DynamicMap(hv.Curve, streams=[buffer]).redim.range(y=(0, 1)) cb.stop()