Examples#
Browser#
import requests as re
import ustore.ucset as ustore
import streamlit as st
collection_name_docs = 'Docs'
collection_name_network = 'Network'
collection_name_images = 'Images'
@st.experimental_singleton
def get_database_session(url=None):
    db = ustore.DataBase()
    col_docs = db[collection_name_docs].docs
    col_network = db[collection_name_network].graph
    col_images = db[collection_name_images]

    if True or not len(col_docs):
        col_docs[10] = {'a': 10, 'b': 20}

    if True or not len(col_images):
        links = [
            'https://upload.wikimedia.org/wikipedia/commons/thumb/8/87/Mount_Ararat_and_the_Yerevan_skyline_in_spring_from_the_Cascade.jpg/556px-Mount_Ararat_and_the_Yerevan_skyline_in_spring_from_the_Cascade.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Vartavar_2014_Yerevan_%285%29.jpg/202px-Vartavar_2014_Yerevan_%285%29.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/6/6e/Cascade_of_Yerevan.JPG/168px-Cascade_of_Yerevan.JPG',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/6/64/Yerevan_Tsitsernakaberd_Armenian_Genocide_Museum_Memorial_msu-2018-3009.jpg/170px-Yerevan_Tsitsernakaberd_Armenian_Genocide_Museum_Memorial_msu-2018-3009.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/%C3%93pera%2C_Erev%C3%A1n%2C_Armenia%2C_2016-10-03%2C_DD_12.jpg/230px-%C3%93pera%2C_Erev%C3%A1n%2C_Armenia%2C_2016-10-03%2C_DD_12.jpg',
            'https://upload.wikimedia.org/wikipedia/commons/thumb/d/dc/Jerewan-Matenadaran-msu-wlm-2509.jpg/140px-Jerewan-Matenadaran-msu-wlm-2509.jpg'
        ]
        for idx, url in enumerate(links):
            col_images[50+idx] = re.get(url).content

    if True or not len(col_network):
        pass

    return db
option = st.selectbox(
    'Which collection would you like to view?',
    (collection_name_docs, collection_name_images, collection_name_network))
st.write('You selected:', option)

if option == collection_name_docs:
    for key, value in get_database_session()[collection_name_docs].docs.items:
        st.json(value)
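# The remaining branches can be rendered the same way. A minimal sketch, assuming
# the keys written in `get_database_session` above and that binary reads return
# the stored JPEG bytes (wrap them in `io.BytesIO` for Streamlit, which needs
# an extra `import io`):
#
#   elif option == collection_name_images:
#       col_images = get_database_session()[collection_name_images]
#       for key in range(50, 56):
#           st.image(io.BytesIO(col_images[key]))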
Consistent Dict#
"""
Compares the simplest usage of `dict` and `Collection`-s from UStore.
Benchmarks basic and batch consistent operations against native Python.
"""
from time import perf_counter
import ustore.ucset as ustore
import numpy as np
from random import sample
keys = np.random.randint(
    low=1, high=10_000_000,
    size=1_000_000, dtype=np.int64)
value = 'x'
max_batch_size = 10_000
values = [value] * max_batch_size
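# For reference, these are the operations timed below. The UStore `db.main`
# collection (named `acid_dict` further down) behaves like a `dict`, but also
# accepts NumPy arrays of keys for batched calls, the same API this script
# exercises:
#
#   acid_dict[key] = value                   # single write
#   acid_dict[keys_batch] = values_batch     # batched write
#   acid_dict.broadcast(keys_batch, value)   # one value broadcast to many keys
#   acid_dict[key]                           # single read
#   acid_dict[keys_batch]                    # batched gather
#   acid_dict.keys                           # full key scan
#   acid_dict.sample_keys(count)             # random key sampling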
# Comparing write performance on bulk imports
t1 = perf_counter()
native_dict = dict()
for key in keys:
    native_dict[key] = value

t2 = perf_counter()
db = ustore.DataBase()
acid_dict = db.main
for key in keys:
    acid_dict[key] = value

t3 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict[keys[slice_start:slice_start+max_batch_size]] = values

t4 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict.broadcast(keys[slice_start:slice_start+max_batch_size], value)

t5 = perf_counter()
print('Elapsed time for imports: {:.3f}s and {:.3f}s. {:.3f}s for batches, {:.3f}s for broadcast'.
      format(t2-t1, t3-t2, t4-t3, t5-t4))
# Comparing read performance on random gathers
t1 = perf_counter()
for key in keys:
    native_dict[key]

t2 = perf_counter()
for key in keys:
    acid_dict[key]

t3 = perf_counter()
for slice_start in range(0, len(keys), max_batch_size):
    acid_dict[keys[slice_start:slice_start+max_batch_size]]

t4 = perf_counter()
print('Elapsed time for lookups: {:.3f}s and {:.3f}s. {:.3f}s for batches'.
      format(t2-t1, t3-t2, t4-t3))

# Comparing read performance on bulk scans
t1 = perf_counter()
keys_sum = 0
for key in native_dict.keys():
    keys_sum += key

t2 = perf_counter()
keys_sum = 0
for key in acid_dict.keys:
    keys_sum += key

t3 = perf_counter()
print('Elapsed time for scans: {:.3f}s and {:.3f}s'.format(t2-t1, t3-t2))

# Comparing random sampling
t1 = perf_counter()
for _ in range(10):
    sample(list(native_dict.keys()), max_batch_size)

t2 = perf_counter()
for _ in range(10):
    acid_dict.sample_keys(max_batch_size)

t3 = perf_counter()
print('Elapsed time for samples: {:.3f}s and {:.3f}s'.format(t2-t1, t3-t2))
Banks ATM OLAP#
"""
Example of building an application-specific OLAP DBMS on top of binary collections of UStore.
Imagine owning a big bank with thousands of ATMs all across the US.
You are aggregating the withdrawals and, once it happens, count the number of
people in front of the embedded camera. You also count those at other times.
## Usage
Use The provided `banks_atm_olap.yml` file for the Conda environment configuration.
It will bring all the needed NVidia libraries: `conda env create -f python/examples/banks_atm_olap.yml`.
## Links
Row to Columnar Conversion in Arrow:
https://arrow.apache.org/docs/cpp/examples/row_columnar_conversion.html
"""
import struct
from typing import Tuple
import requests
import geojson
import pyarrow as pa
import numpy as np
import pandas as pd
import cudf
import cuxfilter as cux
from cuxfilter import charts
from bokeh.server.server import Server
from bokeh.document.document import Document
# import ustore.ucset as ustore
MAPBOX_API_KEY = 'pk.eyJ1IjoiYXRob3J2ZSIsImEiOiJjazBmcmlhYzAwYXc2M25wMnE3bGttbzczIn0.JCDQliLc-XTU52iKa8L8-Q'
GEOJSON_URL = 'https://raw.githubusercontent.com/rapidsai/cuxfilter/GTC-2018-mortgage-visualization/javascript/demos/GTC%20demo/public/data/zip3-ms-rhs-lessprops.json'
Measurement = Tuple[
    int,    # ids - ignored when packaging
    int,    # timestamps
    float,  # latitudes
    float,  # longitudes
    float,  # amounts
    int,    # humans
    int,    # zips
]
measurement_format: str = 'Qfffii'
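# A quick sanity check on the row layout, added for illustration: 'Qfffii' is
# one uint64 timestamp, three float32 values and two int32 values, so a packed
# measurement takes 8 + 3 * 4 + 2 * 4 = 28 bytes (no padding is needed for
# this particular field order).
assert struct.calcsize(measurement_format) == 8 + 3 * 4 + 2 * 4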
zip_codes_geojson = geojson.loads(requests.get(GEOJSON_URL).text)
zip_codes: list[int] = [int(x['properties']['ZIP3'])
                        for x in zip_codes_geojson['features']]
def generate_rows(
        batch_size: int = 10_000,
        city_center=(40.177200, 44.503490),
        radius_degrees: float = 0.005,
        start_time: int = 1669996348,
) -> pd.DataFrame:
    seconds_between_events = 60
    return pd.DataFrame({
        'ids': np.random.randint(low=1, high=None, size=batch_size),
        'timestamps': np.arange(start_time, start_time + batch_size * seconds_between_events, seconds_between_events, dtype=int),
        'latitudes': np.random.uniform(city_center[0] - radius_degrees, city_center[0] + radius_degrees, [batch_size]).astype(np.single),
        'longitudes': np.random.uniform(city_center[1] - radius_degrees, city_center[1] + radius_degrees, [batch_size]).astype(np.single),
        'amounts': np.random.lognormal(3., 1., [batch_size]).astype(np.single) * 100.0,
        'humans': np.random.randint(low=1, high=4, size=batch_size),
        'zips': np.random.choice(zip_codes, size=batch_size),
    })
def dump_rows(measurements: pd.DataFrame) -> pa.FixedSizeBinaryArray:
    count_rows = len(measurements)
    bytes_per_row = struct.calcsize(measurement_format)
    bytes_for_rows = bytearray(bytes_per_row * count_rows)
    timestamps = measurements['timestamps']
    latitudes = measurements['latitudes']
    longitudes = measurements['longitudes']
    amounts = measurements['amounts']
    humans = measurements['humans']
    zips = measurements['zips']
    for row_idx in range(count_rows):
        struct.pack_into(
            measurement_format, bytes_for_rows, bytes_per_row * row_idx,
            timestamps[row_idx], latitudes[row_idx], longitudes[row_idx],
            amounts[row_idx], humans[row_idx], zips[row_idx],
        )

    # https://arrow.apache.org/docs/python/generated/pyarrow.FixedSizeBinaryType.html#pyarrow.FixedSizeBinaryType
    datatype = pa.binary(bytes_per_row)
    # https://arrow.apache.org/docs/python/generated/pyarrow.FixedSizeBinaryArray.html#pyarrow.FixedSizeBinaryArray.from_buffers
    buffers = [None, pa.py_buffer(bytes_for_rows)]
    return pa.FixedSizeBinaryArray.from_buffers(datatype, count_rows, buffers, null_count=0)
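# The `struct.pack_into` loop above is easy to read but runs row by row. A
# vectorized alternative (a sketch, assuming the same field order and 32/64-bit
# widths as `measurement_format`) goes through a NumPy structured array:
#
#   row_dtype = np.dtype([
#       ('timestamps', np.uint64), ('latitudes', np.float32),
#       ('longitudes', np.float32), ('amounts', np.float32),
#       ('humans', np.int32), ('zips', np.int32)])
#   packed = np.zeros(len(measurements), dtype=row_dtype)
#   for name in row_dtype.names:
#       packed[name] = measurements[name]
#   bytes_for_rows = bytearray(packed.tobytes())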
def parse_rows(bytes_for_rows: bytes) -> pd.DataFrame:
    if isinstance(bytes_for_rows, pa.FixedSizeBinaryArray):
        bytes_for_rows = bytes_for_rows.buffers()[1].to_pybytes()
    bytes_per_row = struct.calcsize(measurement_format)
    count_rows = len(bytes_for_rows) // bytes_per_row
    timestamps = np.zeros((count_rows), dtype=np.int64)
    latitudes = np.zeros((count_rows), dtype=np.single)
    longitudes = np.zeros((count_rows), dtype=np.single)
    amounts = np.zeros((count_rows), dtype=np.single)
    humans = np.zeros((count_rows), dtype=np.int32)
    zips = np.zeros((count_rows), dtype=np.int32)
    for row_idx in range(count_rows):
        row = struct.unpack_from(
            measurement_format, bytes_for_rows, bytes_per_row * row_idx)
        timestamps[row_idx] = row[0]
        latitudes[row_idx] = row[1]
        longitudes[row_idx] = row[2]
        amounts[row_idx] = row[3]
        humans[row_idx] = row[4]
        zips[row_idx] = row[5]

    return pd.DataFrame({
        'timestamps': timestamps,
        'latitudes': latitudes,
        'longitudes': longitudes,
        'amounts': amounts,
        'humans': humans,
        'zips': zips,
    })
def rolling_mean_temperatures(amounts, window_size=20):
    """
    Computes an array of local averages with a gliding window.
    https://stackoverflow.com/a/30141358
    """
    return pd.Series(amounts).rolling(
        window=window_size).mean().iloc[window_size-1:].values
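# For example, rolling_mean_temperatures([1, 2, 3, 4, 5], window_size=2)
# evaluates to [1.5, 2.5, 3.5, 4.5]: the first window_size-1 entries are
# dropped because their windows are incomplete.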
def make_dashboard(doc: Document):
    # Preprocess for visualizations
    df: pd.DataFrame = generate_rows(100)
    df['time'] = pd.to_datetime(df['timestamps'], unit='s')
    cudf_df = cudf.from_pandas(df)
    cux_df = cux.DataFrame.from_dataframe(cudf_df)

    chart_map = charts.choropleth(
        x='zips',
        elevation_column='humans',
        elevation_aggregate_fn='mean',
        color_column='amounts',
        color_aggregate_fn='mean',
        mapbox_api_key=MAPBOX_API_KEY,
        geoJSONSource=GEOJSON_URL,
        title='ATM Withdrawals Across the US',
    )
    chart_humans = charts.bokeh.bar('humans')
    slider_time = charts.date_range_slider(
        'time',
        title='Timeframe',
    )
    overall_amounts = charts.number(
        x='amounts',
        aggregate_fn='sum',
        format='${value:,.1f}',
        title='Total Withdrawals',
    )
    overall_humans = charts.number(
        x='humans',
        aggregate_fn='sum',
        format='{value:,.0f}',
        title='Humans Detected',
    )

    d = cux_df.dashboard(
        [chart_map],
        layout_array=[[1]],
        sidebar=[slider_time, overall_amounts, overall_humans],
        theme=cux.themes.rapids,
        title='Map of ATMs',
    )

    # Run the dashboard as a web app
    d._dashboard.generate_dashboard(
        d.title, d._charts, d._theme
    ).server_doc(doc)
if __name__ == '__main__':
    tmp = generate_rows(100)
    arrow_array = dump_rows(tmp)
    recovered = parse_rows(arrow_array)

    # Validate the serialization procedure
    assert tmp['timestamps'].equals(recovered['timestamps'])
    assert tmp['latitudes'].equals(recovered['latitudes'])
    assert tmp['longitudes'].equals(recovered['longitudes'])
    assert tmp['amounts'].equals(recovered['amounts'])

    # Using a persistent store would be identical to using this dict:
    # db = ustore.DataBase()
    # measurements = db.main
    measurements: dict[int, bytes] = {}
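    # For instance, a packed batch could be stored under its first timestamp
    # and read back through `parse_rows` (a sketch reusing the helpers above;
    # with `db.main` the same statements would work unchanged):
    #
    #   batch = generate_rows(1_000)
    #   batch_key = int(batch['timestamps'].iloc[0])
    #   measurements[batch_key] = dump_rows(batch).buffers()[1].to_pybytes()
    #   restored = parse_rows(measurements[batch_key])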
    server = Server(
        make_dashboard,
        num_procs=1,
        allow_websocket_origin=['*'],
        check_unused_sessions_milliseconds=500,
    )
    server.start()
    print(f'running server on port {server.port}')
    server.io_loop.start()
Movie Recommendations#
# Multi-Modal Model for Recommender Systems
from collections import Counter
import requests
import json
import networkx as nx
import numpy as np
import ustore.ucset as ustore
def flatten(l):
    return [item for sublist in l for item in sublist]
def fill_db(all_follows, all_views, all_posters, all_people, all_movies):
    jds = json.dumps

    def load(x): return requests.get(x).content

    alice, bob, charlie, david = 1, 2, 3, 4
    all_people.set((alice, bob, charlie, david), (
        jds({'name': 'Alice', 'lastname': 'Bobchinsky', 'stars': 1200}),
        jds({'name': 'Bob', 'lastname': 'Charleston', 'stars': 700}),
        jds({'name': 'Charlie', 'lastname': 'Allison', 'stars': 800}),
        jds({'name': 'David', 'lastname': 'Davidson', 'stars': 500})
    ))
    all_movies.set(range(101, 110), (
        jds({'title': 'The Fast and the Furious', 'rating': 6.8}),
        jds({'title': '2 Fast 2 Furious', 'rating': 5.9}),
        jds({'title': 'The Fast and the Furious: Tokyo Drift', 'rating': 6}),
        jds({'title': 'Fast & Furious', 'rating': 6.5}),
        jds({'title': 'Fast Five', 'rating': 7.3}),
        jds({'title': 'Fast & Furious 6', 'rating': 7}),
        jds({'title': 'Furious 7', 'rating': 7.1}),
        jds({'title': 'The Fate of the Furious', 'rating': 6.6}),
        jds({'title': 'F9', 'rating': 5.2})
    ))
    all_posters.set(range(101, 110), (
        load('https://upload.wikimedia.org/wikipedia/en/5/54/Fast_and_the_furious_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/9/9d/Two_fast_two_furious_ver5.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/4/4f/Poster_-_Fast_and_Furious_Tokyo_Drift.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/8/8f/Fast_and_Furious_Poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/0/0c/Fast_Five_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/3/30/Fast_%26_Furious_6_film_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/b/b8/Furious_7_poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/2/2d/The_Fate_of_The_Furious_Theatrical_Poster.jpg'),
        load('https://upload.wikimedia.org/wikipedia/en/2/2b/F9_film_poster.jpg')
    ))

    # Alice only watches even movies
    all_views.add_edge(alice, 102)
    all_views.add_edge(alice, 104)
    all_views.add_edge(alice, 106)
    all_views.add_edge(alice, 108)
    # Bob has only watched the middle of the series
    all_views.add_edge(bob, 103)
    all_views.add_edge(bob, 104)
    all_views.add_edge(bob, 105)
    all_views.add_edge(bob, 107)
    # Charlie only watched the first 5
    all_views.add_edge(charlie, 101)
    all_views.add_edge(charlie, 102)
    all_views.add_edge(charlie, 103)
    all_views.add_edge(charlie, 104)
    all_views.add_edge(charlie, 105)

    all_follows.add_edge(alice, bob)
    all_follows.add_edge(david, alice)
    all_follows.add_edge(david, bob)
    all_follows.add_edge(david, charlie)
def test():
    db = ustore.DataBase()
    all_follows = db['people->people'].graph
    all_views = db['people->movies'].graph
    all_posters = db['movies.poster'].media
    all_people = db['people'].docs
    all_movies = db['movies'].docs
    fill_db(all_follows, all_views, all_posters, all_people, all_movies)

    # Let's recommend this user some movies.
    # For that:
    # 1. Find all the people our user follows
    # 2. Sample all the movies they have watched
    # 3. Remove the movies that our customer has watched
    # 4. Rank movies by appearance frequency
    user_id = 4  # David, who follows Alice, Bob and Charlie
    follows = all_follows.successors(user_id)
    potential_movies = all_views.successors(follows)
    movies_rank = Counter(flatten(potential_movies))
    for watched_movie in all_views.successors(user_id):
        movies_rank.pop(watched_movie, None)
    common_movies = [x[0] for x in movies_rank.most_common(10)]
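    # The same ranking written against plain NetworkX DiGraphs, for comparison
    # (a sketch assuming in-memory graphs rather than the UStore collections):
    #
    #   follows = list(g_follows.successors(user_id))
    #   candidates = Counter(m for f in follows for m in g_views.successors(f))
    #   for m in g_views.successors(user_id):
    #       candidates.pop(m, None)
    #   common_movies = [m for m, _ in candidates.most_common(10)]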
    # As we have picked the movies, let's draw the bipartite user-movie graph
    views_subgraph = all_views.subgraph()
    nx.draw_networkx(
        views_subgraph,
        pos=nx.drawing.layout.bipartite_layout(views_subgraph, follows),
        labels={})
    # Now pull the metadata for the selected people and movies
    people_sample = all_people[follows][['name', 'lastname', 'stars']]
    movies_sample = all_movies[common_movies][['title', 'rating']]
    posters_sample = all_posters[common_movies]
    # Let's assume the user watched the recommended movie,
    # didn't like it and wants to unfollow everyone who recommended it,
    # all in a single atomic transaction:
    bad_movie_id = common_movies[0]
    bad_influencers = follows
    with db.transact() as txn:
        txn['people->movies'].graph.add_edge(user_id, bad_movie_id)
        txn['people->people'].graph.remove_edges_from(
            [user_id]*len(follows),
            bad_influencers
        )
    # Let's assume a new movie is being uploaded
    new_movie_id = 100
    with db.transact() as txn:
        txn['movies'].docs[new_movie_id] = {
            'title': 'Fast & Extremely Furious 100',
            'text': 'Dominic is racing on rockets in space.',
            'rating': 5,
        }
        txn['movies.poster'].media[new_movie_id] = open('fast100.jpeg', 'rb').read()