Compare commits

...

14 Commits

7 changed files with 296 additions and 125 deletions

View File

@@ -39,6 +39,7 @@ db-load = "wordlinator.app:load_db_scores"
add-user = "wordlinator.app:sync_add_user"
create-round = "wordlinator.app:create_round"
copy-users = "wordlinator.app:copy_users"
gs-user-sync = "wordlinator.app:sync_gsheet_users"
[tool.mypy]
ignore_missing_imports = true

View File

@@ -270,9 +270,40 @@ def copy_users():
wordlinator.db.pg.WordleDb().copy_players_from_round(args.from_round, args.to_round)
async def pull_gsheets_users(round_no):
db = wordlinator.db.pg.WordleDb()
db_users = db.get_users_by_round(round_no=round_no)
round = db.get_or_create_round(round_no)
wordle_day = wordlinator.utils.WordleDay.from_date(round.end_date)
sheets = wordlinator.sheets.SheetsClient(wordle_day=wordle_day)
sheets_users = sheets.get_users()
for user in sheets_users:
db_match = [u for u in db_users if u.username == user]
if not db_match:
rich.print(f"[yellow]Adding {user} to Round {round_no}")
await add_user(user, games=[round_no])
for user in db_users:
if user.username not in sheets_users:
rich.print(f"[yellow]Removing {user.username} from Round {round_no}")
db.remove_user_from_round(user.username, round_no)
def sync_gsheet_users():
parser = argparse.ArgumentParser()
parser.add_argument("round_no", help="The round to sync.")
args = parser.parse_args()
asyncio.run(pull_gsheets_users(args.round_no))
def sync_add_user():
args = _add_user_args()
asyncio.run(add_user(args))
asyncio.run(
add_user(args.username, args.games, args.unenroll_games, args.check_twitter)
)
def sync_main():

View File

@@ -4,8 +4,6 @@ import typing
import peewee
import wordlinator.utils
db = peewee.PostgresqlDatabase(
os.getenv("DB_NAME", "wordlegolf"),
user=os.getenv("DB_USER", "wordlegolf"),
@@ -117,10 +115,11 @@ class WordleDb:
try:
return Game.get(Game.game == round_no)
except peewee.DoesNotExist:
start_date = (
start_date
or wordlinator.utils.WORDLE_GOLF_ROUND_DATES[round_no - 1]
)
if not start_date:
raise ValueError(
f"Round {round_no} does not exist, "
"and no start_date provide to create it"
)
return Game.create(game=round_no, start_date=start_date)
def get_or_create_hole(self, round_no, hole_no):
@@ -217,12 +216,16 @@ class WordleDb:
Score.select(
Score,
Hole.hole,
Hole.hole_id,
Game.game_id,
User.username,
User.user_id,
Player.game_id,
)
.join(Player, on=(Score.user_id == Player.user_id))
.switch(Score)
.join(Hole, on=(Score.hole_id == Hole.hole_id))
.join(Game, on=(Hole.game_id == Game.game_id))
.switch(Score)
.join(User, on=(Score.user_id == User.user_id))
.filter(Player.game_id == round.game_id)
@@ -233,33 +236,41 @@ class WordleDb:
def bulk_insert_scores(self, scores: typing.List[typing.Dict]):
with db.atomic() as txn:
for batch in peewee.chunked(scores, 50):
Score.insert_many(batch).execute(txn)
Score.insert_many(batch).execute()
def bulk_update_scores(self, scores: typing.List[Score]):
with db.atomic():
for score in scores:
score.save()
query_str = """UPDATE score
SET score = {score}, tweet_id = {tweet_id}
WHERE user_id = {user_id} AND game_id = {game_id} AND hole_id = {hole_id}"""
for score in scores:
query = query_str.format(
score=score.score,
tweet_id=score.tweet_id or "NULL",
user_id=score.user_id.user_id,
game_id=score.game_id.game_id,
hole_id=score.hole_id.hole_id,
)
db.execute_sql(query)
def get_users_without_score(self, round_no, hole_no, tweetable=True):
with db.atomic() as txn:
hole = self.get_or_create_hole(round_no, hole_no)
# Find users who *have* played in this round,
# but have no score on the current hole
query_str = """SELECT u.username, player.game_id
FROM user_tbl u
JOIN player ON player.user_id = u.user_id
WHERE (
player.game_id = {}
) AND NOT EXISTS (
SELECT FROM score WHERE score.user_id = u.user_id AND score.hole_id = {}
)
""".format(
hole.game_id, hole.hole_id
)
hole = self.get_or_create_hole(round_no, hole_no)
# Find users who *have* played in this round,
# but have no score on the current hole
query_str = """SELECT u.username, player.game_id
FROM user_tbl u
JOIN player ON player.user_id = u.user_id
WHERE (
player.game_id = {}
) AND NOT EXISTS (
SELECT FROM score WHERE score.user_id = u.user_id AND score.hole_id = {}
)
""".format(
hole.game_id, hole.hole_id
)
if tweetable:
# Restrict to users who post scores on twitter
query_str += " AND u.check_twitter = true"
if tweetable:
# Restrict to users who post scores on twitter
query_str += " AND u.check_twitter = true"
res = txn.execute_sql(query_str)
return [r[0] for r in res]
res = db.execute_sql(query_str)
return [r[0] for r in res]

View File

@@ -17,7 +17,15 @@ import wordlinator.utils
BASE_URL = "https://api.twitter.com/2"
WORDLE_RE = re.compile(
r"Wordle(\w+)? (?P<number>\d+) (?P<score>[X\d])/6", re.IGNORECASE
r"""
Wordle # Line Leader
(\w+)? # etta factor (catches ...Golf, etc)
\s # single space
(?P<number>\d+) # game number
\s # single space
(?P<score>[X\d])/6 # score out of 6
""",
re.IGNORECASE | re.VERBOSE,
)
TOKEN = os.getenv("TWITTER_TOKEN")
@@ -148,6 +156,7 @@ class TwitterClient(httpx.AsyncClient):
user_id = await self.get_user_twitter_id(username)
if user_id:
self.db.add_user(username, user_id)
self.db.add_user_to_round(username, self.wordle_day.golf_hole.game_no)
return user_id
def _start_timestamp(self):

View File

@@ -3,9 +3,11 @@ import dataclasses
import datetime
import typing
import wordlinator.db.pg
WORDLE_DAY_ZERO = datetime.date(2021, 6, 19)
WORDLE_GOLF_ROUND_DATES = [datetime.date(2022, 5, 9), datetime.date(2022, 5, 31)]
WORDLE_GOLF_ROUNDS = wordlinator.db.pg.WordleDb().get_rounds()
def date_from_string(datestr: str):
@@ -23,10 +25,10 @@ class GolfHole:
@classmethod
def from_date(cls, date: datetime.date):
for game_no, start_date in enumerate(WORDLE_GOLF_ROUND_DATES, start=1):
hole_value = (date - start_date).days + 1
if 1 <= hole_value <= 18:
return cls(game_no=game_no, hole_no=hole_value)
for round in WORDLE_GOLF_ROUNDS:
if round.start_date <= date <= round.end_date:
hole_no = (date - round.start_date).days + 1
return cls(game_no=round.game, hole_no=hole_no)
return None
@@ -55,13 +57,17 @@ class WordleDay:
# Designed so that "today" will be the current date in CST
# Regardless of where the code is run
def get_wordle_today():
def get_today_central():
today = (
datetime.datetime.now(datetime.timezone.utc)
.astimezone(datetime.timezone(datetime.timedelta(hours=-5), name="US Central"))
.date()
)
return WordleDay.from_date(today)
return today
def get_wordle_today():
return WordleDay.from_date(get_today_central())
WORDLE_TODAY = get_wordle_today()

View File

@@ -1,4 +1,5 @@
import collections
import itertools
import typing
import wordlinator.db.pg
@@ -76,6 +77,15 @@ class UserRow(ScoreRow):
def golf_score(self) -> int:
return self.total - (self.count * 4)
@property
def progressive_score_list(self) -> typing.List[int]:
score_progress = list(
itertools.accumulate(
self.sorted_scores(), func=lambda t, e: t + (e.score - 4), initial=0
)
)[1:]
return score_progress
def sorted_scores(self):
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
@@ -144,9 +154,12 @@ class UserRow(ScoreRow):
)
else:
saved_score = score_match[0]
if saved_score.score != score or saved_score.tweet_id != tweet_id:
if saved_score.score != score or (
tweet_id and saved_score.tweet_id != tweet_id
):
saved_score.score = score
saved_score.tweet_id = tweet_id
if tweet_id:
saved_score.tweet_id = tweet_id
results["update"].append(saved_score)
return results
@@ -189,3 +202,19 @@ class ScoreMatrix(ScoreContainer):
def user_rows(self, wordle_day):
hole_no = wordle_day.golf_hole.hole_no
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
def top_by_day(self):
user_dict = {u: r.progressive_score_list for u, r in self.by_user().items()}
days = max(map(len, user_dict.values()))
rankings = collections.defaultdict(list)
for day_idx in range(days):
day_scores = {
u: v[day_idx] for u, v in user_dict.items() if len(v) >= day_idx + 1
}
tops = [(u, v) for u, v in sorted(day_scores.items(), key=lambda t: t[1])][
:20
]
for (user, score) in tops:
rankings[user].append((day_idx + 1, score))
return rankings

View File

@@ -1,6 +1,9 @@
import collections
import functools
import math
import os
import pathlib
import re
import time
import dash
@@ -16,8 +19,9 @@ import wordlinator.utils
import wordlinator.utils.scores
import wordlinator.utils.web
TTL_TIME = 30 if os.getenv("DEBUG") else 600
TTL_TIME = 30 if os.getenv("DEBUG") else 90
LEADERBOARD_COUNT = 20
VALUE_RE = re.compile(r"\[(?P<value>-?\d+)\]")
###################
# Setup Functions #
@@ -25,7 +29,10 @@ LEADERBOARD_COUNT = 20
assets_dir = pathlib.Path(__file__).parent / "assets"
app = dash.Dash(
name="WordleGolf", title="#WordleGolf", assets_folder=str(assets_dir.resolve())
name="WordleGolf",
title="#WordleGolf",
assets_folder=str(assets_dir.resolve()),
suppress_callback_exceptions=True,
)
@@ -64,6 +71,16 @@ def wordle_today():
return _wordle_today(get_ttl_hash())
def round_wordle_day(round_id):
wt = wordle_today()
rounds = games_from_db()
matching_round = [r for r in rounds if r.game_id == round_id][0]
if matching_round.game == wt.golf_hole.game_no:
return wt
return wordlinator.utils.WordleDay.from_date(matching_round.end_date)
@functools.lru_cache(maxsize=3)
def _scores_from_db(round_id, ttl_hash=None):
wordle_db = db.WordleDb()
@@ -74,7 +91,7 @@ def _scores_from_db(round_id, ttl_hash=None):
def scores_from_db(round_id):
return _scores_from_db(round_id)
return _scores_from_db(round_id, get_ttl_hash())
#######################
@@ -103,13 +120,20 @@ def get_leaderboard(round_id):
#################
def get_scores(round_id):
def _get_scores(round_id):
score_matrix = scores_from_db(round_id)
table_rows = score_matrix.user_rows(wordle_today())
round_day = round_wordle_day(round_id)
table_rows = score_matrix.user_rows(round_day)
return table_rows
def get_scores(round_id):
round_day = round_wordle_day(round_id)
table_rows = _get_scores(round_id)
hole_columns = [
{"name": f"{i}", "id": f"{i}", "type": "text", "presentation": "markdown"}
for i in range(1, wordle_today().golf_hole.hole_no + 1)
for i in range(1, round_day.golf_hole.hole_no + 1)
]
columns = [
{"name": "Name", "id": "Name", "type": "text"},
@@ -138,6 +162,7 @@ def get_scores(round_id):
return dash.dash_table.DataTable(
table_rows,
columns,
id="user-scores-table",
style_table={
"width": "80%",
"margin": "auto",
@@ -151,7 +176,9 @@ def get_scores(round_id):
style_data={"width": "10%"},
style_as_list_view=True,
style_data_conditional=formatting,
sort_action="native",
sort_action="custom",
sort_mode="single",
sort_by=[{"column_id": "Name", "direction": "asc"}],
)
@@ -190,7 +217,10 @@ def get_daily_stats(round_id):
{"name": n, "id": n}
for n in (
"Score",
*[f"{i}" for i in range(1, wordle_today().golf_hole.hole_no + 1)],
*[
f"{i}"
for i in range(1, round_wordle_day(round_id).golf_hole.hole_no + 1)
],
)
]
return dash.dash_table.DataTable(
@@ -251,6 +281,41 @@ def get_line_graph(round_id):
return dash.dcc.Graph(figure=figure)
#####################
# Line Race Helpers #
#####################
def line_race_graph(round_id):
score_matrix = scores_from_db(round_id)
tops_by_day = score_matrix.top_by_day()
round_day = round_wordle_day(round_id)
hole_no = round_day.golf_hole.hole_no
figure = plotly.graph_objs.Figure()
figure.update_yaxes(autorange="reversed")
figure.update_xaxes(tickmode="linear", tick0=1, dtick=1)
annotation_names = collections.defaultdict(list)
for name, entries in tops_by_day.items():
figure.add_trace(
plotly.graph_objs.Scatter(
name=name,
mode="lines+markers",
x=[e[0] for e in entries],
y=[e[1] for e in entries],
)
)
if entries[-1][0] == hole_no:
annotation_names[entries[-1]].append(name)
annotations = [
{"x": k[0], "y": k[1], "text": ", ".join(v)}
for k, v in annotation_names.items()
]
figure.update_layout(annotations=annotations)
return dash.dcc.Graph(figure=figure)
#############
# App Setup #
#############
@@ -265,95 +330,114 @@ app.layout = dash.html.Div(
id="round-selector",
style={"maxWidth": "300px"},
),
dash.html.Div(
[
dash.html.H2(
f"Leaderboard - Top {LEADERBOARD_COUNT}",
style={"textAlign": "center"},
),
dash.dcc.Loading(
id="leaderboard-loading",
children=dash.html.Div("Loading...", id="leaderboard"),
),
]
),
dash.html.Div(
[
dash.html.H2("User Scores", style={"textAlign": "center"}),
dash.dcc.Loading(
id="user-scores-loading",
children=dash.html.Div("Loading...", id="user-scores"),
),
]
),
dash.html.Div(
[
dash.html.H2("Score Graph", style={"textAlign": "center"}),
dash.dcc.Loading(
id="stats-graph-loading",
children=dash.html.Div("Loading...", id="stats-graph"),
),
]
),
dash.html.Div(
[
dash.html.H2("Daily Stats", style={"textAlign": "center"}),
dash.dcc.Loading(
id="daily-stats-loading",
children=dash.html.Div("Loading...", id="daily-stats"),
),
]
dash.dcc.Tabs(
id="main-tabs",
value="leaderboard",
children=[
dash.dcc.Tab(label="Leaderboard", value="leaderboard"),
dash.dcc.Tab(label="Statistics", value="statistics"),
dash.dcc.Tab(label="User Scores", value="user-scores"),
],
),
dash.dcc.Loading(dash.html.Div(id="tab-content"), id="tab-content-loading"),
]
)
@app.long_callback(
output=dash.dependencies.Output("leaderboard", "children"),
inputs=[
dash.dependencies.Input("title", "children"),
@app.callback(
dash.dependencies.Output("tab-content", "children"),
[
dash.dependencies.Input("main-tabs", "value"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
)
def get_leaderboard_table(_, round_id):
return get_leaderboard(round_id)
def render_tab(tab, round_id):
if tab == "leaderboard":
return [
dash.html.Div(
[
dash.html.H2(
f"Leaderboard - Top {LEADERBOARD_COUNT}",
style={"textAlign": "center"},
),
dash.dcc.Loading(
id="leaderboard-race-loading",
children=dash.html.Div(
line_race_graph(round_id), id="leaderboard-race"
),
),
dash.dcc.Loading(
id="leaderboard-loading",
children=dash.html.Div(
get_leaderboard(round_id), id="leaderboard"
),
),
]
),
]
elif tab == "user-scores":
return [
dash.html.Div(
[
dash.html.H2("User Scores", style={"textAlign": "center"}),
dash.dcc.Loading(
id="user-scores-loading",
children=dash.html.Div(get_scores(round_id), id="user-scores"),
),
]
),
]
elif tab == "statistics":
return [
dash.html.Div(
[
dash.html.H2("Score Graph", style={"textAlign": "center"}),
dash.dcc.Loading(
id="stats-graph-loading",
children=dash.html.Div(
get_line_graph(round_id), id="stats-graph"
),
),
]
),
dash.html.Div(
[
dash.html.H2("Daily Stats", style={"textAlign": "center"}),
dash.dcc.Loading(
id="daily-stats-loading",
children=dash.html.Div(
get_daily_stats(round_id), id="daily-stats"
),
),
]
),
]
@app.long_callback(
output=dash.dependencies.Output("user-scores", "children"),
inputs=[
dash.dependencies.Input("title", "children"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
@app.callback(
dash.dependencies.Output("user-scores-table", "data"),
dash.dependencies.Input("user-scores-table", "sort_by"),
dash.dependencies.State("user-scores-table", "data"),
)
def get_scores_chart(_, round_id):
return get_scores(round_id)
def sort_scores(sort_by, data):
if not sort_by:
return data
sort_by = sort_by[0]
def _sort_val(entry):
col_id = sort_by["column_id"]
raw_val = entry.get(col_id)
if raw_val is None or raw_val == "":
return math.inf
if col_id == "Name":
return raw_val
if isinstance(raw_val, int) or raw_val.isdigit():
return int(raw_val)
match = VALUE_RE.match(raw_val).groupdict()["value"]
return int(match)
@app.long_callback(
output=dash.dependencies.Output("daily-stats", "children"),
inputs=[
dash.dependencies.Input("title", "children"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
)
def get_stats_chart(_, round_id):
return get_daily_stats(round_id)
@app.long_callback(
output=dash.dependencies.Output("stats-graph", "children"),
inputs=[
dash.dependencies.Input("title", "children"),
dash.dependencies.Input("round-selector-dropdown", "value"),
],
manager=long_callback_manager,
)
def get_stats_graph(_, round_id):
return get_line_graph(round_id)
data = sorted(data, key=_sort_val, reverse=sort_by["direction"] == "desc")
return data
server = app.server