Compare commits

...

17 Commits

8 changed files with 397 additions and 202 deletions

View File

@@ -2,9 +2,10 @@
docker build -t wordlinator:latest . docker build -t wordlinator:latest .
if [ "$1" = "--debug" ]; then if [ "$1" == "--debug" ]; then
shift shift
docker run -d --rm -p 8050:8050 -e DEBUG=true -e DB_PORT -e DB_HOST -e DB_PASS "$@" --name wordlinator wordlinator:latest echo "Running debug mode..."
docker run -d --rm -p 8050:8050 -e DASH_DEBUG=true -e DB_PORT -e DB_HOST -e DB_PASS "$@" --name wordlinator wordlinator:latest
else else
docker run -d --rm -p 8050:8050 -e DB_PORT -e DB_HOST -e DB_PASS "$@" --name wordlinator wordlinator:latest docker run -d --rm -p 8050:8050 -e DB_PORT -e DB_HOST -e DB_PASS "$@" --name wordlinator wordlinator:latest
fi fi

View File

@@ -39,6 +39,7 @@ db-load = "wordlinator.app:load_db_scores"
add-user = "wordlinator.app:sync_add_user" add-user = "wordlinator.app:sync_add_user"
create-round = "wordlinator.app:create_round" create-round = "wordlinator.app:create_round"
copy-users = "wordlinator.app:copy_users" copy-users = "wordlinator.app:copy_users"
gs-user-sync = "wordlinator.app:sync_gsheet_users"
[tool.mypy] [tool.mypy]
ignore_missing_imports = true ignore_missing_imports = true

View File

@@ -270,9 +270,40 @@ def copy_users():
wordlinator.db.pg.WordleDb().copy_players_from_round(args.from_round, args.to_round) wordlinator.db.pg.WordleDb().copy_players_from_round(args.from_round, args.to_round)
async def pull_gsheets_users(round_no):
db = wordlinator.db.pg.WordleDb()
db_users = db.get_users_by_round(round_no=round_no)
round = db.get_or_create_round(round_no)
wordle_day = wordlinator.utils.WordleDay.from_date(round.end_date)
sheets = wordlinator.sheets.SheetsClient(wordle_day=wordle_day)
sheets_users = sheets.get_users()
for user in sheets_users:
db_match = [u for u in db_users if u.username == user]
if not db_match:
rich.print(f"[yellow]Adding {user} to Round {round_no}")
await add_user(user, games=[round_no])
for user in db_users:
if user.username not in sheets_users:
rich.print(f"[yellow]Removing {user.username} from Round {round_no}")
db.remove_user_from_round(user.username, round_no)
def sync_gsheet_users():
parser = argparse.ArgumentParser()
parser.add_argument("round_no", help="The round to sync.")
args = parser.parse_args()
asyncio.run(pull_gsheets_users(args.round_no))
def sync_add_user(): def sync_add_user():
args = _add_user_args() args = _add_user_args()
asyncio.run(add_user(args)) asyncio.run(
add_user(args.username, args.games, args.unenroll_games, args.check_twitter)
)
def sync_main(): def sync_main():

View File

@@ -4,8 +4,6 @@ import typing
import peewee import peewee
import wordlinator.utils
db = peewee.PostgresqlDatabase( db = peewee.PostgresqlDatabase(
os.getenv("DB_NAME", "wordlegolf"), os.getenv("DB_NAME", "wordlegolf"),
user=os.getenv("DB_USER", "wordlegolf"), user=os.getenv("DB_USER", "wordlegolf"),
@@ -85,145 +83,174 @@ class WordleDb:
return list(User.select()) return list(User.select())
def get_users_by_round(self, round_no=None, round_id=None): def get_users_by_round(self, round_no=None, round_id=None):
query = ( with db.atomic():
User.select(User, Player.user_id, Game.game) query = (
.join(Player, on=(Player.user_id == User.user_id)) User.select(User, Player.user_id, Game.game)
.join(Game, on=(Game.game_id == Player.game_id)) .join(Player, on=(Player.user_id == User.user_id))
) .join(Game, on=(Game.game_id == Player.game_id))
if round_no: )
query = query.filter(Game.game == round_no) if round_no:
elif round_id: query = query.filter(Game.game == round_no)
query = query.filter(Game.game_id == round_id) elif round_id:
return list(query) query = query.filter(Game.game_id == round_id)
return list(query)
def get_user_id(self, username): def get_user_id(self, username):
user = self.get_user(username) with db.atomic():
return user.twitter_id if user else None user = self.get_user(username)
return user.twitter_id if user else None
def add_user(self, username, user_id, check_twitter=True): def add_user(self, username, user_id, check_twitter=True):
return User.create( with db.atomic():
username=username, twitter_id=user_id, check_twitter=check_twitter return User.create(
) username=username, twitter_id=user_id, check_twitter=check_twitter
)
def get_rounds(self): def get_rounds(self):
return list(sorted(Game.select(), key=lambda d: d.start_date)) with db.atomic():
return list(sorted(Game.select(), key=lambda d: d.start_date))
def get_or_create_round(self, round_no, start_date=None): def get_or_create_round(self, round_no, start_date=None):
try: with db.atomic():
return Game.get(Game.game == round_no) try:
except peewee.DoesNotExist: return Game.get(Game.game == round_no)
start_date = ( except peewee.DoesNotExist:
start_date or wordlinator.utils.WORDLE_GOLF_ROUND_DATES[round_no - 1] if not start_date:
) raise ValueError(
return Game.create(game=round_no, start_date=start_date) f"Round {round_no} does not exist, "
"and no start_date provide to create it"
)
return Game.create(game=round_no, start_date=start_date)
def get_or_create_hole(self, round_no, hole_no): def get_or_create_hole(self, round_no, hole_no):
round = self.get_or_create_round(round_no) with db.atomic():
round = self.get_or_create_round(round_no)
try: try:
return Hole.get(Hole.hole == hole_no, Hole.game_id == round.game_id) return Hole.get(Hole.hole == hole_no, Hole.game_id == round.game_id)
except peewee.DoesNotExist: except peewee.DoesNotExist:
return Hole.create(hole=hole_no, game_id=round.game_id) return Hole.create(hole=hole_no, game_id=round.game_id)
def get_holes(self, round_no, ensure_all=False): def get_holes(self, round_no, ensure_all=False):
round = self.get_or_create_round(round_no) with db.atomic():
if ensure_all: round = self.get_or_create_round(round_no)
self.create_round_holes(round_no) if ensure_all:
return list(Hole.select().filter(game_id=round.game_id)) self.create_round_holes(round_no)
return list(Hole.select().filter(game_id=round.game_id))
def create_round_holes(self, round_no): def create_round_holes(self, round_no):
for hole_no in range(1, 19): with db.atomic():
self.get_or_create_hole(round_no, hole_no) for hole_no in range(1, 19):
self.get_or_create_hole(round_no, hole_no)
def get_or_create_player_round(self, user_id, game_id): def get_or_create_player_round(self, user_id, game_id):
try: with db.atomic():
return Player.get(Player.user_id == user_id, Player.game_id == game_id) try:
except peewee.DoesNotExist: return Player.get(Player.user_id == user_id, Player.game_id == game_id)
return Player.create(user_id=user_id, game_id=game_id) except peewee.DoesNotExist:
return Player.create(user_id=user_id, game_id=game_id)
def add_user_to_round(self, username, round_no): def add_user_to_round(self, username, round_no):
user = self.get_user(username) with db.atomic():
if not user: user = self.get_user(username)
raise ValueError(f"No user found with username {username}") if not user:
round = self.get_or_create_round(round_no) raise ValueError(f"No user found with username {username}")
return self.get_or_create_player_round(user.user_id, round.game_id) round = self.get_or_create_round(round_no)
return self.get_or_create_player_round(user.user_id, round.game_id)
def remove_user_from_round(self, username, round_no): def remove_user_from_round(self, username, round_no):
user = self.get_user(username) with db.atomic():
if not user: user = self.get_user(username)
raise ValueError(f"No user found with username {username}") if not user:
round = self.get_or_create_round(round_no) raise ValueError(f"No user found with username {username}")
try: round = self.get_or_create_round(round_no)
player = Player.get(user_id=user.user_id, game_id=round.game_id) try:
player.delete_instance() player = Player.get(user_id=user.user_id, game_id=round.game_id)
except peewee.DoesNotExist: player.delete_instance()
return except peewee.DoesNotExist:
return
def copy_players_from_round(self, from_round_no, to_round_no): def copy_players_from_round(self, from_round_no, to_round_no):
to_round = self.get_or_create_round(to_round_no) with db.atomic():
to_round = self.get_or_create_round(to_round_no)
for user in self.get_users_by_round(from_round_no): for user in self.get_users_by_round(from_round_no):
self.get_or_create_player_round(user.user_id, to_round.game_id) self.get_or_create_player_round(user.user_id, to_round.game_id)
def add_score(self, username, game, hole, score): def add_score(self, username, game, hole, score):
if not score: with db.atomic():
return if not score:
return
user = self.get_user(username) user = self.get_user(username)
if not user: if not user:
raise ValueError(f'No Such User "{username}"') raise ValueError(f'No Such User "{username}"')
hole = self.get_or_create_hole(game, hole) hole = self.get_or_create_hole(game, hole)
try: try:
score_obj = Score.get( score_obj = Score.get(
Score.user_id == user.user_id, Score.user_id == user.user_id,
Score.game_id == hole.game_id, Score.game_id == hole.game_id,
Score.hole_id == hole.hole_id, Score.hole_id == hole.hole_id,
) )
score_obj.score = score score_obj.score = score
score_obj.save() score_obj.save()
return score_obj return score_obj
except peewee.DoesNotExist: except peewee.DoesNotExist:
return Score.create( return Score.create(
score=score, score=score,
user_id=user.user_id, user_id=user.user_id,
game_id=hole.game_id, game_id=hole.game_id,
hole_id=hole.hole_id, hole_id=hole.hole_id,
) )
def get_scores(self, round_no=None, round_id=None): def get_scores(self, round_no=None, round_id=None):
if round_no: with db.atomic():
round = self.get_or_create_round(round_no) if round_no:
elif round_id: round = self.get_or_create_round(round_no)
round = Game.get_by_id(round_id) elif round_id:
else: round = Game.get_by_id(round_id)
raise ValueError("Must provide Round Number or Round ID") else:
res = ( raise ValueError("Must provide Round Number or Round ID")
Score.select( res = (
Score, Score.select(
Hole.hole, Score,
User.username, Hole.hole,
Player.game_id, Hole.hole_id,
Game.game_id,
User.username,
User.user_id,
Player.game_id,
)
.join(Player, on=(Score.user_id == Player.user_id))
.switch(Score)
.join(Hole, on=(Score.hole_id == Hole.hole_id))
.join(Game, on=(Hole.game_id == Game.game_id))
.switch(Score)
.join(User, on=(Score.user_id == User.user_id))
.filter(Player.game_id == round.game_id)
.filter(Score.game_id == round.game_id)
) )
.join(Player, on=(Score.user_id == Player.user_id)) return list(res) if res else []
.switch(Score)
.join(Hole, on=(Score.hole_id == Hole.hole_id))
.switch(Score)
.join(User, on=(Score.user_id == User.user_id))
.filter(Player.game_id == round.game_id)
.filter(Score.game_id == round.game_id)
)
return list(res)
def bulk_insert_scores(self, scores: typing.List[typing.Dict]): def bulk_insert_scores(self, scores: typing.List[typing.Dict]):
with db.atomic(): with db.atomic() as txn:
for batch in peewee.chunked(scores, 50): for batch in peewee.chunked(scores, 50):
Score.insert_many(batch).execute() Score.insert_many(batch).execute()
def bulk_update_scores(self, scores: typing.List[Score]): def bulk_update_scores(self, scores: typing.List[Score]):
with db.atomic(): query_str = """UPDATE score
for score in scores: SET score = {score}, tweet_id = {tweet_id}
score.save() WHERE user_id = {user_id} AND game_id = {game_id} AND hole_id = {hole_id}"""
for score in scores:
query = query_str.format(
score=score.score,
tweet_id=score.tweet_id or "NULL",
user_id=score.user_id.user_id,
game_id=score.game_id.game_id,
hole_id=score.hole_id.hole_id,
)
db.execute_sql(query)
def get_users_without_score(self, round_no, hole_no, tweetable=True): def get_users_without_score(self, round_no, hole_no, tweetable=True):
hole = self.get_or_create_hole(round_no, hole_no) hole = self.get_or_create_hole(round_no, hole_no)

View File

@@ -17,7 +17,15 @@ import wordlinator.utils
BASE_URL = "https://api.twitter.com/2" BASE_URL = "https://api.twitter.com/2"
WORDLE_RE = re.compile( WORDLE_RE = re.compile(
r"Wordle(\w+)? (?P<number>\d+) (?P<score>[X\d])/6", re.IGNORECASE r"""
Wordle # Line Leader
(\w+)? # etta factor (catches ...Golf, etc)
\s # single space
(?P<number>\d+) # game number
\s # single space
(?P<score>[X\d])/6 # score out of 6
""",
re.IGNORECASE | re.VERBOSE,
) )
TOKEN = os.getenv("TWITTER_TOKEN") TOKEN = os.getenv("TWITTER_TOKEN")
@@ -148,6 +156,7 @@ class TwitterClient(httpx.AsyncClient):
user_id = await self.get_user_twitter_id(username) user_id = await self.get_user_twitter_id(username)
if user_id: if user_id:
self.db.add_user(username, user_id) self.db.add_user(username, user_id)
self.db.add_user_to_round(username, self.wordle_day.golf_hole.game_no)
return user_id return user_id
def _start_timestamp(self): def _start_timestamp(self):

View File

@@ -3,9 +3,11 @@ import dataclasses
import datetime import datetime
import typing import typing
import wordlinator.db.pg
WORDLE_DAY_ZERO = datetime.date(2021, 6, 19) WORDLE_DAY_ZERO = datetime.date(2021, 6, 19)
WORDLE_GOLF_ROUND_DATES = [datetime.date(2022, 5, 9), datetime.date(2022, 5, 31)] WORDLE_GOLF_ROUNDS = wordlinator.db.pg.WordleDb().get_rounds()
def date_from_string(datestr: str): def date_from_string(datestr: str):
@@ -23,10 +25,10 @@ class GolfHole:
@classmethod @classmethod
def from_date(cls, date: datetime.date): def from_date(cls, date: datetime.date):
for game_no, start_date in enumerate(WORDLE_GOLF_ROUND_DATES, start=1): for round in WORDLE_GOLF_ROUNDS:
hole_value = (date - start_date).days + 1 if round.start_date <= date <= round.end_date:
if 1 <= hole_value <= 18: hole_no = (date - round.start_date).days + 1
return cls(game_no=game_no, hole_no=hole_value) return cls(game_no=round.game, hole_no=hole_no)
return None return None
@@ -55,13 +57,17 @@ class WordleDay:
# Designed so that "today" will be the current date in CST # Designed so that "today" will be the current date in CST
# Regardless of where the code is run # Regardless of where the code is run
def get_wordle_today(): def get_today_central():
today = ( today = (
datetime.datetime.now(datetime.timezone.utc) datetime.datetime.now(datetime.timezone.utc)
.astimezone(datetime.timezone(datetime.timedelta(hours=-5), name="US Central")) .astimezone(datetime.timezone(datetime.timedelta(hours=-5), name="US Central"))
.date() .date()
) )
return WordleDay.from_date(today) return today
def get_wordle_today():
return WordleDay.from_date(get_today_central())
WORDLE_TODAY = get_wordle_today() WORDLE_TODAY = get_wordle_today()

View File

@@ -1,4 +1,5 @@
import collections import collections
import itertools
import typing import typing
import wordlinator.db.pg import wordlinator.db.pg
@@ -76,6 +77,15 @@ class UserRow(ScoreRow):
def golf_score(self) -> int: def golf_score(self) -> int:
return self.total - (self.count * 4) return self.total - (self.count * 4)
@property
def progressive_score_list(self) -> typing.List[int]:
score_progress = list(
itertools.accumulate(
self.sorted_scores(), func=lambda t, e: t + (e.score - 4), initial=0
)
)[1:]
return score_progress
def sorted_scores(self): def sorted_scores(self):
yield from sorted(self._scores, key=lambda s: s.hole_id.hole) yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
@@ -144,9 +154,12 @@ class UserRow(ScoreRow):
) )
else: else:
saved_score = score_match[0] saved_score = score_match[0]
if saved_score.score != score or saved_score.tweet_id != tweet_id: if saved_score.score != score or (
tweet_id and saved_score.tweet_id != tweet_id
):
saved_score.score = score saved_score.score = score
saved_score.tweet_id = tweet_id if tweet_id:
saved_score.tweet_id = tweet_id
results["update"].append(saved_score) results["update"].append(saved_score)
return results return results
@@ -189,3 +202,19 @@ class ScoreMatrix(ScoreContainer):
def user_rows(self, wordle_day): def user_rows(self, wordle_day):
hole_no = wordle_day.golf_hole.hole_no hole_no = wordle_day.golf_hole.hole_no
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()] return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
def top_by_day(self):
user_dict = {u: r.progressive_score_list for u, r in self.by_user().items()}
days = max(map(len, user_dict.values()))
rankings = collections.defaultdict(list)
for day_idx in range(days):
day_scores = {
u: v[day_idx] for u, v in user_dict.items() if len(v) >= day_idx + 1
}
tops = [(u, v) for u, v in sorted(day_scores.items(), key=lambda t: t[1])][
:20
]
for (user, score) in tops:
rankings[user].append((day_idx + 1, score))
return rankings

View File

@@ -1,6 +1,9 @@
import collections
import functools import functools
import math
import os import os
import pathlib import pathlib
import re
import time import time
import dash import dash
@@ -16,8 +19,9 @@ import wordlinator.utils
import wordlinator.utils.scores import wordlinator.utils.scores
import wordlinator.utils.web import wordlinator.utils.web
TTL_TIME = 10 if os.getenv("DEBUG") else 600 TTL_TIME = 30 if os.getenv("DEBUG") else 90
LEADERBOARD_COUNT = 20 LEADERBOARD_COUNT = 20
VALUE_RE = re.compile(r"\[(?P<value>-?\d+)\]")
################### ###################
# Setup Functions # # Setup Functions #
@@ -25,7 +29,10 @@ LEADERBOARD_COUNT = 20
assets_dir = pathlib.Path(__file__).parent / "assets" assets_dir = pathlib.Path(__file__).parent / "assets"
app = dash.Dash( app = dash.Dash(
name="WordleGolf", title="#WordleGolf", assets_folder=str(assets_dir.resolve()) name="WordleGolf",
title="#WordleGolf",
assets_folder=str(assets_dir.resolve()),
suppress_callback_exceptions=True,
) )
@@ -39,7 +46,7 @@ long_callback_manager = dash.long_callback.DiskcacheLongCallbackManager(
) )
@functools.lru_cache() @functools.lru_cache(maxsize=1)
def _games_from_db(ttl_hash=None): def _games_from_db(ttl_hash=None):
return db.WordleDb().get_rounds() return db.WordleDb().get_rounds()
@@ -48,7 +55,7 @@ def games_from_db():
return _games_from_db() return _games_from_db()
@functools.lru_cache() @functools.lru_cache(maxsize=1)
def _wordle_today(ttl_hash=None): def _wordle_today(ttl_hash=None):
today = wordlinator.utils.get_wordle_today() today = wordlinator.utils.get_wordle_today()
if today.golf_hole: if today.golf_hole:
@@ -64,22 +71,27 @@ def wordle_today():
return _wordle_today(get_ttl_hash()) return _wordle_today(get_ttl_hash())
@functools.lru_cache() def round_wordle_day(round_id):
wt = wordle_today()
rounds = games_from_db()
matching_round = [r for r in rounds if r.game_id == round_id][0]
if matching_round.game == wt.golf_hole.game_no:
return wt
return wordlinator.utils.WordleDay.from_date(matching_round.end_date)
@functools.lru_cache(maxsize=3)
def _scores_from_db(round_id, ttl_hash=None): def _scores_from_db(round_id, ttl_hash=None):
return db.WordleDb().get_scores(round_id=round_id) wordle_db = db.WordleDb()
scores = wordle_db.get_scores(round_id=round_id)
users = wordle_db.get_users_by_round(round_id=round_id)
@functools.lru_cache() usernames = [u.username for u in users]
def _players_from_db(round_id, ttl_hash=None): return wordlinator.utils.scores.ScoreMatrix(scores, usernames=usernames)
return db.WordleDb().get_users_by_round(round_id=round_id)
def scores_from_db(round_id): def scores_from_db(round_id):
users = _players_from_db(round_id) return _scores_from_db(round_id, get_ttl_hash())
usernames = [u.username for u in users]
return wordlinator.utils.scores.ScoreMatrix(
_scores_from_db(round_id, get_ttl_hash()), usernames=usernames
)
####################### #######################
@@ -108,13 +120,20 @@ def get_leaderboard(round_id):
################# #################
def get_scores(round_id): def _get_scores(round_id):
score_matrix = scores_from_db(round_id) score_matrix = scores_from_db(round_id)
table_rows = score_matrix.user_rows(wordle_today()) round_day = round_wordle_day(round_id)
table_rows = score_matrix.user_rows(round_day)
return table_rows
def get_scores(round_id):
round_day = round_wordle_day(round_id)
table_rows = _get_scores(round_id)
hole_columns = [ hole_columns = [
{"name": f"{i}", "id": f"{i}", "type": "text", "presentation": "markdown"} {"name": f"{i}", "id": f"{i}", "type": "text", "presentation": "markdown"}
for i in range(1, wordle_today().golf_hole.hole_no + 1) for i in range(1, round_day.golf_hole.hole_no + 1)
] ]
columns = [ columns = [
{"name": "Name", "id": "Name", "type": "text"}, {"name": "Name", "id": "Name", "type": "text"},
@@ -143,6 +162,7 @@ def get_scores(round_id):
return dash.dash_table.DataTable( return dash.dash_table.DataTable(
table_rows, table_rows,
columns, columns,
id="user-scores-table",
style_table={ style_table={
"width": "80%", "width": "80%",
"margin": "auto", "margin": "auto",
@@ -156,7 +176,9 @@ def get_scores(round_id):
style_data={"width": "10%"}, style_data={"width": "10%"},
style_as_list_view=True, style_as_list_view=True,
style_data_conditional=formatting, style_data_conditional=formatting,
sort_action="native", sort_action="custom",
sort_mode="single",
sort_by=[{"column_id": "Name", "direction": "asc"}],
) )
@@ -195,7 +217,10 @@ def get_daily_stats(round_id):
{"name": n, "id": n} {"name": n, "id": n}
for n in ( for n in (
"Score", "Score",
*[f"{i}" for i in range(1, wordle_today().golf_hole.hole_no + 1)], *[
f"{i}"
for i in range(1, round_wordle_day(round_id).golf_hole.hole_no + 1)
],
) )
] ]
return dash.dash_table.DataTable( return dash.dash_table.DataTable(
@@ -256,6 +281,41 @@ def get_line_graph(round_id):
return dash.dcc.Graph(figure=figure) return dash.dcc.Graph(figure=figure)
#####################
# Line Race Helpers #
#####################
def line_race_graph(round_id):
score_matrix = scores_from_db(round_id)
tops_by_day = score_matrix.top_by_day()
round_day = round_wordle_day(round_id)
hole_no = round_day.golf_hole.hole_no
figure = plotly.graph_objs.Figure()
figure.update_yaxes(autorange="reversed")
figure.update_xaxes(tickmode="linear", tick0=1, dtick=1)
annotation_names = collections.defaultdict(list)
for name, entries in tops_by_day.items():
figure.add_trace(
plotly.graph_objs.Scatter(
name=name,
mode="lines+markers",
x=[e[0] for e in entries],
y=[e[1] for e in entries],
)
)
if entries[-1][0] == hole_no:
annotation_names[entries[-1]].append(name)
annotations = [
{"x": k[0], "y": k[1], "text": ", ".join(v)}
for k, v in annotation_names.items()
]
figure.update_layout(annotations=annotations)
return dash.dcc.Graph(figure=figure)
############# #############
# App Setup # # App Setup #
############# #############
@@ -270,83 +330,114 @@ app.layout = dash.html.Div(
id="round-selector", id="round-selector",
style={"maxWidth": "300px"}, style={"maxWidth": "300px"},
), ),
dash.html.Div( dash.dcc.Tabs(
[ id="main-tabs",
dash.html.H2( value="leaderboard",
f"Leaderboard - Top {LEADERBOARD_COUNT}", children=[
style={"textAlign": "center"}, dash.dcc.Tab(label="Leaderboard", value="leaderboard"),
), dash.dcc.Tab(label="Statistics", value="statistics"),
dash.html.Div("Loading...", id="leaderboard"), dash.dcc.Tab(label="User Scores", value="user-scores"),
] ],
),
dash.html.Div(
[
dash.html.H2("User Scores", style={"textAlign": "center"}),
dash.html.Div("Loading...", id="user-scores"),
]
),
dash.html.Div(
[
dash.html.H2("Score Graph", style={"textAlign": "center"}),
dash.html.Div("Loading...", id="stats-graph"),
]
),
dash.html.Div(
[
dash.html.H2("Daily Stats", style={"textAlign": "center"}),
dash.html.Div("Loading...", id="daily-stats"),
]
), ),
dash.dcc.Loading(dash.html.Div(id="tab-content"), id="tab-content-loading"),
] ]
) )
@app.long_callback( @app.callback(
output=dash.dependencies.Output("leaderboard", "children"), dash.dependencies.Output("tab-content", "children"),
inputs=[ [
dash.dependencies.Input("title", "children"), dash.dependencies.Input("main-tabs", "value"),
dash.dependencies.Input("round-selector-dropdown", "value"), dash.dependencies.Input("round-selector-dropdown", "value"),
], ],
manager=long_callback_manager,
) )
def get_leaderboard_table(_, round_id): def render_tab(tab, round_id):
return get_leaderboard(round_id) if tab == "leaderboard":
return [
dash.html.Div(
[
dash.html.H2(
f"Leaderboard - Top {LEADERBOARD_COUNT}",
style={"textAlign": "center"},
),
dash.dcc.Loading(
id="leaderboard-race-loading",
children=dash.html.Div(
line_race_graph(round_id), id="leaderboard-race"
),
),
dash.dcc.Loading(
id="leaderboard-loading",
children=dash.html.Div(
get_leaderboard(round_id), id="leaderboard"
),
),
]
),
]
elif tab == "user-scores":
return [
dash.html.Div(
[
dash.html.H2("User Scores", style={"textAlign": "center"}),
dash.dcc.Loading(
id="user-scores-loading",
children=dash.html.Div(get_scores(round_id), id="user-scores"),
),
]
),
]
elif tab == "statistics":
return [
dash.html.Div(
[
dash.html.H2("Score Graph", style={"textAlign": "center"}),
dash.dcc.Loading(
id="stats-graph-loading",
children=dash.html.Div(
get_line_graph(round_id), id="stats-graph"
),
),
]
),
dash.html.Div(
[
dash.html.H2("Daily Stats", style={"textAlign": "center"}),
dash.dcc.Loading(
id="daily-stats-loading",
children=dash.html.Div(
get_daily_stats(round_id), id="daily-stats"
),
),
]
),
]
@app.long_callback( @app.callback(
output=dash.dependencies.Output("user-scores", "children"), dash.dependencies.Output("user-scores-table", "data"),
inputs=[ dash.dependencies.Input("user-scores-table", "sort_by"),
dash.dependencies.Input("title", "children"), dash.dependencies.State("user-scores-table", "data"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
) )
def get_scores_chart(_, round_id): def sort_scores(sort_by, data):
return get_scores(round_id) if not sort_by:
return data
sort_by = sort_by[0]
def _sort_val(entry):
col_id = sort_by["column_id"]
raw_val = entry.get(col_id)
if raw_val is None or raw_val == "":
return math.inf
if col_id == "Name":
return raw_val
if isinstance(raw_val, int) or raw_val.isdigit():
return int(raw_val)
match = VALUE_RE.match(raw_val).groupdict()["value"]
return int(match)
@app.long_callback( data = sorted(data, key=_sort_val, reverse=sort_by["direction"] == "desc")
output=dash.dependencies.Output("daily-stats", "children"), return data
inputs=[
dash.dependencies.Input("title", "children"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
)
def get_stats_chart(_, round_id):
return get_daily_stats(round_id)
@app.long_callback(
output=dash.dependencies.Output("stats-graph", "children"),
inputs=[
dash.dependencies.Input("title", "children"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
)
def get_stats_graph(_, round_id):
return get_line_graph(round_id)
server = app.server server = app.server