Examples#
Browser#
import requests as re
import ustore.ucset as ustore
import streamlit as st
collection_name_docs = 'Docs'
collection_name_network = 'Network'
collection_name_images = 'Images'
@st.experimental_singleton
def get_database_session(url=None):
    db = ustore.DataBase()
    col_docs = db[collection_name_docs].docs
    col_network = db[collection_name_network].graph
    col_images = db[collection_name_images]

    if True or not len(col_docs):
        col_docs[10] = {'a': 10, 'b': 20}

    if True or not len(col_images):
        links = [
            'https://upload.wikimedia.org/wikipedia/commons/thumb/8/87/Mount_Ararat_and_the_Yerevan_skyline_in_spring_from_the_Cascade.jpg/556px-Mount_Ararat_and_the_Yerevan_skyline_in_spring_from_the_Cascade.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Vartavar_2014_Yerevan_%285%29.jpg/202px-Vartavar_2014_Yerevan_%285%29.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/6/6e/Cascade_of_Yerevan.JPG/168px-Cascade_of_Yerevan.JPG',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/6/64/Yerevan_Tsitsernakaberd_Armenian_Genocide_Museum_Memorial_msu-2018-3009.jpg/170px-Yerevan_Tsitsernakaberd_Armenian_Genocide_Museum_Memorial_msu-2018-3009.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/%C3%93pera%2C_Erev%C3%A1n%2C_Armenia%2C_2016-10-03%2C_DD_12.jpg/230px-%C3%93pera%2C_Erev%C3%A1n%2C_Armenia%2C_2016-10-03%2C_DD_12.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/d/dc/Jerewan-Matenadaran-msu-wlm-2509.jpg/140px-Jerewan-Matenadaran-msu-wlm-2509.jpg'
        ]
        for idx, url in enumerate(links):
            col_images[50+idx] = re.get(url).content

    if True or not len(col_network):
        pass

    return db
option = st.selectbox(
    'Which collection would you like to view?',
    (collection_name_docs, collection_name_images, collection_name_network))
st.write('You selected:', option)

if option == collection_name_docs:
    for key, value in get_database_session()[collection_name_docs].docs.items:
        st.json(value)
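# The remaining branches can be rendered the same way. A minimal sketch, assuming
# the keys written in `get_database_session` above and that binary reads return
# the stored JPEG bytes (wrap them in `io.BytesIO` for Streamlit, which needs
# an extra `import io`):
#
#   elif option == collection_name_images:
#       col_images = get_database_session()[collection_name_images]
#       for key in range(50, 56):
#           st.image(io.BytesIO(col_images[key]))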
Consistent Dict#
"""
Compares the simplest usage of `dict` and `Collection`-s from UStore.
Benchmarks basic and batch consistent operations against native Python.
"""
from time import perf_counter
import ustore.ucset as ustore
import numpy as np
from random import sample
keys = np.random.randint(
    low=1, high=10_000_000,
    size=1_000_000, dtype=np.int64)
value = 'x'
max_batch_size = 10_000
values = [value] * max_batch_size
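# For reference, these are the operations timed below. The UStore `db.main`
# collection (named `acid_dict` further down) behaves like a `dict`, but also
# accepts NumPy arrays of keys for batched calls, the same API this script
# exercises:
#
#   acid_dict[key] = value                   # single write
#   acid_dict[keys_batch] = values_batch     # batched write
#   acid_dict.broadcast(keys_batch, value)   # one value broadcast to many keys
#   acid_dict[key]                           # single read
#   acid_dict[keys_batch]                    # batched gather
#   acid_dict.keys                           # full key scan
#   acid_dict.sample_keys(count)             # random key sampling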
# Comparing write performance on bulk imports
t1 = perf_counter()
native_dict = dict()
for key in keys:
    native_dict[key] = value

t2 = perf_counter()
db = ustore.DataBase()
acid_dict = db.main
for key in keys:
    acid_dict[key] = value

t3 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict[keys[slice_start:slice_start+max_batch_size]] = values

t4 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict.broadcast(keys[slice_start:slice_start+max_batch_size], value)

t5 = perf_counter()
print('Elapsed time for imports: {:.3f}s and {:.3f}s. {:.3f}s for batches, {:.3f}s for broadcast'.
      format(t2-t1, t3-t2, t4-t3, t5-t4))
# Comparing read performance on random gathers
t1 = perf_counter()
for key in keys:
    native_dict[key]

t2 = perf_counter()
for key in keys:
    acid_dict[key]

t3 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict[keys[slice_start:slice_start+max_batch_size]]

t4 = perf_counter()
print('Elapsed time for lookups: {:.3f}s and {:.3f}s. {:.3f}s for batches'.
      format(t2-t1, t3-t2, t4-t3))

# Comparing read performance on bulk scans
t1 = perf_counter()
keys_sum = 0
for key in native_dict.keys():
    keys_sum += key

t2 = perf_counter()
keys_sum = 0
for key in acid_dict.keys:
    keys_sum += key

t3 = perf_counter()
print('Elapsed time for scans: {:.3f}s and {:.3f}s'.format(t2-t1, t3-t2))

# Comparing random sampling
t1 = perf_counter()
for _ in range(10):
    sample(list(native_dict.keys()), max_batch_size)

t2 = perf_counter()
for _ in range(10):
    acid_dict.sample_keys(max_batch_size)

t3 = perf_counter()
print('Elapsed time for samples: {:.3f}s and {:.3f}s'.format(t2-t1, t3-t2))
Banks ATM OLAP#
"""
Example of building an application-specific OLAP DBMS on top of binary collections of UStore.
Imagine owning a big bank with thousands of ATMs all across the US.
You are aggregating the withdrawals and, once it happens, count the number of
people in front of the embedded camera. You also count those at other times.
## Usage
Use The provided `banks_atm_olap.yml` file for the Conda environment configuration.
It will bring all the needed NVidia libraries: `conda env create -f python/examples/banks_atm_olap.yml`.
## Links
Row to Columnar Conversion in Arrow:
https://arrow.apache.org/docs/cpp/examples/row_columnar_conversion.html
"""
import struct
from typing import Tuple
import requests
import geojson
import pyarrow as pa
import numpy as np
import pandas as pd
import cudf
import cuxfilter as cux
from cuxfilter import charts
from bokeh.server.server import Server
from bokeh.document.document import Document
# import ustore.ucset as ustore
MAPBOX_API_KEY = 'pk.eyJ1IjoiYXRob3J2ZSIsImEiOiJjazBmcmlhYzAwYXc2M25wMnE3bGttbzczIn0.JCDQliLc-XTU52iKa8L8-Q'
GEOJSON_URL = 'https://raw.githubusercontent.com/rapidsai/cuxfilter/GTC-2018-mortgage-visualization/javascript/demos/GTC%20demo/public/data/zip3-ms-rhs-lessprops.json'
Measurement = Tuple[
    int,    # ids - ignored when packaging
    int,    # timestamps
    float,  # latitudes
    float,  # longitudes
    float,  # amounts
    int,    # humans
    int,    # zips
]
measurement_format: str = 'Qfffii'
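# A quick sanity check on the row layout, added for illustration: 'Qfffii' is
# one uint64 timestamp, three float32 values and two int32 values, so a packed
# measurement takes 8 + 3 * 4 + 2 * 4 = 28 bytes (no padding is needed for
# this particular field order).
assert struct.calcsize(measurement_format) == 8 + 3 * 4 + 2 * 4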
zip_codes_geojson = geojson.loads(requests.get(GEOJSON_URL).text)
zip_codes: list[int] = [int(x['properties']['ZIP3'])
                        for x in zip_codes_geojson['features']]
def generate_rows(
        batch_size: int = 10_000,
        city_center=(40.177200, 44.503490),
        radius_degrees: float = 0.005,
        start_time: int = 1669996348,
) -> pd.DataFrame:
    seconds_between_events = 60
    return pd.DataFrame({
        'ids': np.random.randint(low=1, high=None, size=batch_size),
        'timestamps': np.arange(start_time, start_time + batch_size * seconds_between_events, seconds_between_events, dtype=int),
        'latitudes': np.random.uniform(city_center[0] - radius_degrees, city_center[0] + radius_degrees, [batch_size]).astype(np.single),
        'longitudes': np.random.uniform(city_center[1] - radius_degrees, city_center[1] + radius_degrees, [batch_size]).astype(np.single),
        'amounts': np.random.lognormal(3., 1., [batch_size]).astype(np.single) * 100.0,
        'humans': np.random.randint(low=1, high=4, size=batch_size),
        'zips': np.random.choice(zip_codes, size=batch_size),
    })
def dump_rows(measurements: pd.DataFrame) -> pa.FixedSizeBinaryArray:
    count_rows = len(measurements)
    bytes_per_row = struct.calcsize(measurement_format)
    bytes_for_rows = bytearray(bytes_per_row * count_rows)
    timestamps = measurements['timestamps']
    latitudes = measurements['latitudes']
    longitudes = measurements['longitudes']
    amounts = measurements['amounts']
    humans = measurements['humans']
    zips = measurements['zips']
    for row_idx in range(count_rows):
        struct.pack_into(
            measurement_format, bytes_for_rows, bytes_per_row * row_idx,
            timestamps[row_idx], latitudes[row_idx], longitudes[row_idx],
            amounts[row_idx], humans[row_idx], zips[row_idx],
        )

    # https://arrow.apache.org/docs/python/generated/pyarrow.FixedSizeBinaryType.html#pyarrow.FixedSizeBinaryType
    datatype = pa.binary(bytes_per_row)
    # https://arrow.apache.org/docs/python/generated/pyarrow.FixedSizeBinaryArray.html#pyarrow.FixedSizeBinaryArray.from_buffers
    buffers = [None, pa.py_buffer(bytes_for_rows)]
    return pa.FixedSizeBinaryArray.from_buffers(datatype, count_rows, buffers, null_count=0)
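# The `struct.pack_into` loop above is easy to read but runs row by row. A
# vectorized alternative (a sketch, assuming the same field order and 32/64-bit
# widths as `measurement_format`) goes through a NumPy structured array:
#
#   row_dtype = np.dtype([
#       ('timestamps', np.uint64), ('latitudes', np.float32),
#       ('longitudes', np.float32), ('amounts', np.float32),
#       ('humans', np.int32), ('zips', np.int32)])
#   packed = np.zeros(len(measurements), dtype=row_dtype)
#   for name in row_dtype.names:
#       packed[name] = measurements[name]
#   bytes_for_rows = bytearray(packed.tobytes())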
def parse_rows(bytes_for_rows: bytes) -> pd.DataFrame:
    if isinstance(bytes_for_rows, pa.FixedSizeBinaryArray):
        bytes_for_rows = bytes_for_rows.buffers()[1].to_pybytes()
    bytes_per_row = struct.calcsize(measurement_format)
    count_rows = len(bytes_for_rows) // bytes_per_row
    timestamps = np.zeros((count_rows), dtype=np.int64)
    latitudes = np.zeros((count_rows), dtype=np.single)
    longitudes = np.zeros((count_rows), dtype=np.single)
    amounts = np.zeros((count_rows), dtype=np.single)
    humans = np.zeros((count_rows), dtype=np.int32)
    zips = np.zeros((count_rows), dtype=np.int32)
    for row_idx in range(count_rows):
        row = struct.unpack_from(
            measurement_format, bytes_for_rows, bytes_per_row * row_idx)
        timestamps[row_idx] = row[0]
        latitudes[row_idx] = row[1]
        longitudes[row_idx] = row[2]
        amounts[row_idx] = row[3]
        humans[row_idx] = row[4]
        zips[row_idx] = row[5]

    return pd.DataFrame({
        'timestamps': timestamps,
        'latitudes': latitudes,
        'longitudes': longitudes,
        'amounts': amounts,
        'humans': humans,
        'zips': zips,
    })
def rolling_mean_temperatures(amounts, window_size=20):
    """
    Computes an array of local averages with a gliding window.
    https://stackoverflow.com/a/30141358
    """
    return pd.Series(amounts).rolling(
        window=window_size).mean().iloc[window_size-1:].values
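# For example, rolling_mean_temperatures([1, 2, 3, 4, 5], window_size=2)
# evaluates to [1.5, 2.5, 3.5, 4.5]: the first window_size-1 entries are
# dropped because their windows are incomplete.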
def make_dashboard(doc: Document):
    # Preprocess for visualizations
    df: pd.DataFrame = generate_rows(100)
    df['time'] = pd.to_datetime(df['timestamps'], unit='s')
    cudf_df = cudf.from_pandas(df)
    cux_df = cux.DataFrame.from_dataframe(cudf_df)

    chart_map = charts.choropleth(
        x='zips',
        elevation_column='humans',
        elevation_aggregate_fn='mean',
        color_column='amounts',
        color_aggregate_fn='mean',
        mapbox_api_key=MAPBOX_API_KEY,
        geoJSONSource=GEOJSON_URL,
        title='ATM Withdrawals Across the US',
    )
    chart_humans = charts.bokeh.bar('humans')
    slider_time = charts.date_range_slider(
        'time',
        title='Timeframe',
    )
    overall_amounts = charts.number(
        x='amounts',
        aggregate_fn='sum',
        format='${value:,.1f}',
        title='Total Withdrawals',
    )
    overall_humans = charts.number(
        x='humans',
        aggregate_fn='sum',
        format='{value:,.0f}',
        title='Humans Detected',
    )

    d = cux_df.dashboard(
        [chart_map],
        layout_array=[[1]],
        sidebar=[slider_time, overall_amounts, overall_humans],
        theme=cux.themes.rapids,
        title='Map of ATMs',
    )

    # Run the dashboard as a web app
    d._dashboard.generate_dashboard(
        d.title, d._charts, d._theme
    ).server_doc(doc)
if __name__ == '__main__':
    tmp = generate_rows(100)
    arrow_array = dump_rows(tmp)
    recovered = parse_rows(arrow_array)

    # Validate the serialization procedure
    assert tmp['timestamps'].equals(recovered['timestamps'])
    assert tmp['latitudes'].equals(recovered['latitudes'])
    assert tmp['longitudes'].equals(recovered['longitudes'])
    assert tmp['amounts'].equals(recovered['amounts'])

    # Using a persistent store would be identical to using this dict:
    # db = ustore.DataBase()
    # measurements = db.main
    measurements: dict[int, bytes] = {}
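    # For instance, a packed batch could be stored under its first timestamp
    # and read back through `parse_rows` (a sketch reusing the helpers above;
    # with `db.main` the same statements would work unchanged):
    #
    #   batch = generate_rows(1_000)
    #   batch_key = int(batch['timestamps'].iloc[0])
    #   measurements[batch_key] = dump_rows(batch).buffers()[1].to_pybytes()
    #   restored = parse_rows(measurements[batch_key])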
    server = Server(
        make_dashboard,
        num_procs=1,
        allow_websocket_origin=['*'],
        check_unused_sessions_milliseconds=500,
    )
    server.start()
    print(f'running server on port {server.port}')
    server.io_loop.start()
Movie Recommendations#
# Multi-Modal Model for Recommender Systems
from collections import Counter
import requests
import json
import networkx as nx
import numpy as np
import ustore.ucset as ustore
def flatten(l):
    return [item for sublist in l for item in sublist]
def fill_db(all_follows, all_views, all_posters, all_people, all_movies):
    jds = json.dumps

    def load(x): return requests.get(x).content

    alice, bob, charlie, david = 1, 2, 3, 4
    all_people.set((alice, bob, charlie, david), (
        jds({'name': 'Alice', 'lastname': 'Bobchinsky', 'stars': 1200}),
        jds({'name': 'Bob', 'lastname': 'Charleston', 'stars': 700}),
        jds({'name': 'Charlie', 'lastname': 'Allison', 'stars': 800}),
        jds({'name': 'David', 'lastname': 'Davidson', 'stars': 500})
    ))
    all_movies.set(range(101, 110), (
        jds({'title': 'The Fast and the Furious', 'rating': 6.8}),
        jds({'title': '2 Fast 2 Furious', 'rating': 5.9}),
        jds({'title': 'The Fast and the Furious: Tokyo Drift', 'rating': 6}),
        jds({'title': 'Fast & Furious', 'rating': 6.5}),
        jds({'title': 'Fast Five', 'rating': 7.3}),
        jds({'title': 'Fast & Furious 6', 'rating': 7}),
        jds({'title': 'Furious 7', 'rating': 7.1}),
        jds({'title': 'The Fate of the Furious', 'rating': 6.6}),
        jds({'title': 'F9', 'rating': 5.2})
    ))
    all_posters.set(range(101, 110), (
        load('https://upload.wikimedia.org/wikipedia/en/5/54/Fast_and_the_furious_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/9/9d/Two_fast_two_furious_ver5.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/4/4f/Poster_-_Fast_and_Furious_Tokyo_Drift.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/8/8f/Fast_and_Furious_Poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/0/0c/Fast_Five_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/3/30/Fast_%26_Furious_6_film_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/b/b8/Furious_7_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/2/2d/The_Fate_of_The_Furious_Theatrical_Poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/2/2b/F9_film_poster.jpg')
    ))

    # Alice only watches even movies
    all_views.add_edge(alice, 102)
    all_views.add_edge(alice, 104)
    all_views.add_edge(alice, 106)
    all_views.add_edge(alice, 108)
    # Bob has only watched the middle of the series
    all_views.add_edge(bob, 103)
    all_views.add_edge(bob, 104)
    all_views.add_edge(bob, 105)
    all_views.add_edge(bob, 107)
    # Charlie only watched the first 5
    all_views.add_edge(charlie, 101)
    all_views.add_edge(charlie, 102)
    all_views.add_edge(charlie, 103)
    all_views.add_edge(charlie, 104)
    all_views.add_edge(charlie, 105)

    all_follows.add_edge(alice, bob)
    all_follows.add_edge(david, alice)
    all_follows.add_edge(david, bob)
    all_follows.add_edge(david, charlie)
def test():
    db = ustore.DataBase()
    all_follows = db['people->people'].graph
    all_views = db['people->movies'].graph
    all_posters = db['movies.poster'].media
    all_people = db['people'].docs
    all_movies = db['movies'].docs
    fill_db(all_follows, all_views, all_posters, all_people, all_movies)

    # Let's recommend this user some movies.
    # For that:
    # 1. Find all the people our user follows
    # 2. Sample all the movies they have watched
    # 3. Remove the movies that our customer has watched
    # 4. Rank movies by appearance frequency
    user_id = 4  # David, who follows Alice, Bob and Charlie
    follows = all_follows.successors(user_id)
    potential_movies = all_views.successors(follows)
    movies_rank = Counter(flatten(potential_movies))
    for watched_movie in all_views.successors(user_id):
        movies_rank.pop(watched_movie, None)
    common_movies = [x[0] for x in movies_rank.most_common(10)]
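    # The same ranking written against plain NetworkX DiGraphs, for comparison
    # (a sketch assuming in-memory graphs rather than the UStore collections):
    #
    #   follows = list(g_follows.successors(user_id))
    #   candidates = Counter(m for f in follows for m in g_views.successors(f))
    #   for m in g_views.successors(user_id):
    #       candidates.pop(m, None)
    #   common_movies = [m for m, _ in candidates.most_common(10)]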
    # As we have picked the movies, let's draw the bipartite user-movie graph
    views_subgraph = all_views.subgraph()
    nx.draw_networkx(
        views_subgraph,
        pos=nx.drawing.layout.bipartite_layout(views_subgraph, follows),
        labels={})
    # Now pull the metadata for the selected people and movies
    people_sample = all_people[follows][['name', 'lastname', 'stars']]
    movies_sample = all_movies[common_movies][['title', 'rating']]
    posters_sample = all_posters[common_movies]
    # Let's assume the user watched the recommended movie,
    # didn't like it and wants to unfollow everyone who recommended it,
    # all in a single atomic transaction:
    bad_movie_id = common_movies[0]
    bad_influencers = follows
    with db.transact() as txn:
        txn['people->movies'].graph.add_edge(user_id, bad_movie_id)
        txn['people->people'].graph.remove_edges_from(
            [user_id]*len(follows),
            bad_influencers
        )
    # Let's assume a new movie is being uploaded
    new_movie_id = 100
    with db.transact() as txn:
        txn['movies'].docs[new_movie_id] = {
            'title': 'Fast & Extremely Furious 100',
            'text': 'Dominic is racing on rockets in space.',
            'rating': 5,
        }
        txn['movies.poster'].media[new_movie_id] = open('fast100.jpeg', 'rb').read()