Compare commits
4 Commits
e94d0a7714
...
0dd78f1507
| Author | SHA1 | Date | |
|---|---|---|---|
| 0dd78f1507 | |||
| c689fcb4e7 | |||
| 2f7b0af20c | |||
| f5bf263635 |
@@ -36,6 +36,9 @@ show-user = "wordlinator.app:sync_show_user"
|
||||
show-missing = "wordlinator.app:sync_show_missing"
|
||||
tweet-missing = "wordlinator.app:sync_tweet_missing"
|
||||
db-load = "wordlinator.app:load_db_scores"
|
||||
add-user = "wordlinator.app:sync_add_user"
|
||||
create-round = "wordlinator.app:create_round"
|
||||
copy-users = "wordlinator.app:copy_users"
|
||||
|
||||
[tool.mypy]
|
||||
ignore_missing_imports = true
|
||||
|
||||
@@ -84,7 +84,9 @@ def _save_db_scores(
|
||||
db_holes = db.get_holes(game_no, ensure_all=True)
|
||||
db_scores = db.get_scores(game_no)
|
||||
|
||||
db_scores_by_user = wordlinator.utils.scores.ScoreMatrix(db_scores).by_user()
|
||||
db_scores_by_user = wordlinator.utils.scores.ScoreMatrix(db_scores).by_user(
|
||||
usernames=list(scores.keys())
|
||||
)
|
||||
|
||||
to_update = []
|
||||
to_create = []
|
||||
@@ -95,6 +97,7 @@ def _save_db_scores(
|
||||
continue
|
||||
db_user_match = [u for u in db_users if u.username == user]
|
||||
if not db_user_match:
|
||||
rich.print(f"[yellow]User {user} not in database, cannot add scores.")
|
||||
continue
|
||||
db_user = db_user_match[0]
|
||||
twitter_score = twitter_scores.get(user, None)
|
||||
@@ -169,6 +172,11 @@ def _get_day():
|
||||
days.add_argument(
|
||||
"--wordle-day", type=int, help="The wordle day number for the score report."
|
||||
)
|
||||
days.add_argument(
|
||||
"--date",
|
||||
type=wordlinator.utils.date_from_string,
|
||||
help="a YYYY-MM-DD format date to pull a score report.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
wordle_day = wordlinator.utils.WORDLE_TODAY
|
||||
if args.wordle_day:
|
||||
@@ -177,6 +185,8 @@ def _get_day():
|
||||
wordle_day = wordlinator.utils.WordleDay.from_wordle_no(
|
||||
wordle_day.wordle_no - args.days_ago
|
||||
)
|
||||
elif args.date:
|
||||
wordle_day = wordlinator.utils.WordleDay.from_date(args.date)
|
||||
return wordle_day
|
||||
|
||||
|
||||
@@ -187,6 +197,84 @@ def load_db_scores():
|
||||
_save_db_scores(wordle_day, scores)
|
||||
|
||||
|
||||
def _add_user_args():
|
||||
parser = argparse.ArgumentParser("add-user")
|
||||
parser.add_argument("username", help="The user twitter handle or name.")
|
||||
parser.add_argument(
|
||||
"--no-check-twitter",
|
||||
dest="check_twitter",
|
||||
action="store_false",
|
||||
default=True,
|
||||
help="don't check Twitter for user's scores",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-g", "--games", nargs="*", help="The game/round number(s) to enroll the user."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--unenroll-games",
|
||||
nargs="*",
|
||||
help="Game/round number(s) to unenroll the user.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
||||
async def add_user(username, games=None, unenroll_games=None, check_twitter=True):
|
||||
db = wordlinator.db.pg.WordleDb()
|
||||
|
||||
user = db.get_user(username)
|
||||
if not user:
|
||||
rich.print(f"[green]Creating user {username}")
|
||||
user_id = None
|
||||
if check_twitter:
|
||||
twitter = wordlinator.twitter.TwitterClient()
|
||||
user_id = await twitter.get_user_twitter_id(username)
|
||||
if not user_id:
|
||||
check_twitter = False
|
||||
rich.print(
|
||||
f"[yellow]No twitter ID found for {username}, "
|
||||
"disabling twitter check"
|
||||
)
|
||||
user_id = user_id or f"{username}-NA"
|
||||
user = db.add_user(username, user_id, check_twitter=check_twitter)
|
||||
for round in games or []:
|
||||
rich.print(f"[green]Adding {username} to round {round}")
|
||||
db.add_user_to_round(username, round)
|
||||
for round in unenroll_games or []:
|
||||
rich.print(f"[green]Removing {username} from round {round}")
|
||||
db.remove_user_from_round(username, round)
|
||||
|
||||
|
||||
def create_round():
|
||||
parser = argparse.ArgumentParser("create-round")
|
||||
parser.add_argument("round_no", type=int, help="The round number to create.")
|
||||
parser.add_argument(
|
||||
"start_date",
|
||||
type=wordlinator.utils.date_from_string,
|
||||
help="the YYYY-mm-DD format date the round starts.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
db = wordlinator.db.pg.WordleDb()
|
||||
db.get_or_create_round(args.round_no, args.start_date)
|
||||
db.create_round_holes(args.round_no)
|
||||
|
||||
|
||||
def copy_users():
|
||||
parser = argparse.ArgumentParser("copy-users")
|
||||
parser.add_argument("from_round", type=int, help="The source round number.")
|
||||
parser.add_argument("to_round", type=int, help="The destination round.")
|
||||
args = parser.parse_args()
|
||||
|
||||
wordlinator.db.pg.WordleDb().copy_players_from_round(args.from_round, args.to_round)
|
||||
|
||||
|
||||
def sync_add_user():
|
||||
args = _add_user_args()
|
||||
asyncio.run(add_user(args))
|
||||
|
||||
|
||||
def sync_main():
|
||||
wordle_day = _get_day()
|
||||
asyncio.run(main(wordle_day=wordle_day))
|
||||
|
||||
@@ -81,12 +81,23 @@ class WordleDb:
|
||||
def get_users(self):
|
||||
return list(User.select())
|
||||
|
||||
def get_users_by_round(self, round_no):
|
||||
res = (
|
||||
User.select(User, Player.user_id, Game.game)
|
||||
.join(Player, on=(Player.user_id == User.user_id))
|
||||
.join(Game, on=(Game.game_id == Player.game_id))
|
||||
.filter(Game.game == round_no)
|
||||
)
|
||||
return list(res)
|
||||
|
||||
def get_user_id(self, username):
|
||||
user = self.get_user(username)
|
||||
return user.twitter_id if user else None
|
||||
|
||||
def add_user(self, username, user_id):
|
||||
return User.create(username=username, twitter_id=user_id)
|
||||
def add_user(self, username, user_id, check_twitter=True):
|
||||
return User.create(
|
||||
username=username, twitter_id=user_id, check_twitter=check_twitter
|
||||
)
|
||||
|
||||
def get_rounds(self):
|
||||
return list(sorted(Game.select(), key=lambda d: d.start_date))
|
||||
@@ -118,6 +129,36 @@ class WordleDb:
|
||||
for hole_no in range(1, 19):
|
||||
self.get_or_create_hole(round_no, hole_no)
|
||||
|
||||
def get_or_create_player_round(self, user_id, game_id):
|
||||
try:
|
||||
return Player.get(user_id=user_id, game_id=game_id)
|
||||
except peewee.DoesNotExist:
|
||||
return Player.create(user_id=user_id, game_id=game_id)
|
||||
|
||||
def add_user_to_round(self, username, round_no):
|
||||
user = self.get_user(username)
|
||||
if not user:
|
||||
raise ValueError(f"No user found with username {username}")
|
||||
round = self.get_or_create_round(round_no)
|
||||
return self.get_or_create_player_round(user.user_id, round.game_id)
|
||||
|
||||
def remove_user_from_round(self, username, round_no):
|
||||
user = self.get_user(username)
|
||||
if not user:
|
||||
raise ValueError(f"No user found with username {username}")
|
||||
round = self.get_or_create_round(round_no)
|
||||
try:
|
||||
player = Player.get(user_id=user.user_id, game_id=round.game_id)
|
||||
player.delete_instance()
|
||||
except peewee.DoesNotExist:
|
||||
return
|
||||
|
||||
def copy_players_from_round(self, from_round_no, to_round_no):
|
||||
to_round = self.get_or_create_round(to_round_no)
|
||||
|
||||
for user in self.get_users_by_round(from_round_no):
|
||||
self.get_or_create_player_round(user.user_id, to_round.game_id)
|
||||
|
||||
def add_score(self, username, game, hole, score):
|
||||
if not score:
|
||||
return
|
||||
|
||||
@@ -133,15 +133,19 @@ class TwitterClient(httpx.AsyncClient):
|
||||
async def get_user_by(self, username: str):
|
||||
return await self.get(self.USER_PATH.format(username=username))
|
||||
|
||||
async def get_user_twitter_id(self, username: str):
|
||||
user_id = None
|
||||
twitter_user = await self.get_user_by(username)
|
||||
if twitter_user.is_success:
|
||||
user_id = twitter_user.json().get("data", {}).get("id", None)
|
||||
return user_id
|
||||
|
||||
async def get_user_id(self, username: str):
|
||||
db_user = self.db.get_user(username)
|
||||
if db_user:
|
||||
return db_user.twitter_id if db_user.check_twitter else False
|
||||
else:
|
||||
twitter_user = await self.get_user_by(username)
|
||||
user_id = None
|
||||
if twitter_user.is_success:
|
||||
user_id = twitter_user.json().get("data", {}).get("id", None)
|
||||
user_id = await self.get_user_twitter_id(username)
|
||||
if user_id:
|
||||
self.db.add_user(username, user_id)
|
||||
return user_id
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import argparse
|
||||
import dataclasses
|
||||
import datetime
|
||||
import typing
|
||||
@@ -7,6 +8,14 @@ WORDLE_DAY_ZERO = datetime.date(2021, 6, 19)
|
||||
WORDLE_GOLF_ROUND_DATES = [datetime.date(2022, 5, 9), datetime.date(2022, 5, 31)]
|
||||
|
||||
|
||||
def date_from_string(datestr: str):
|
||||
try:
|
||||
return datetime.date.fromisoformat(datestr)
|
||||
except ValueError:
|
||||
msg = "Invalid date string, expected format: YYYY-mm-DD"
|
||||
raise argparse.ArgumentTypeError(msg)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class GolfHole:
|
||||
game_no: int
|
||||
|
||||
@@ -153,8 +153,12 @@ class UserRow(ScoreRow):
|
||||
|
||||
|
||||
class ScoreMatrix(ScoreContainer):
|
||||
def by_user(self):
|
||||
return self.dict_by("user_id.username", UserRow)
|
||||
def by_user(self, usernames: typing.List[str] = []):
|
||||
res = self.dict_by("user_id.username", UserRow)
|
||||
for username in usernames:
|
||||
if username not in res:
|
||||
res[username] = UserRow([], username)
|
||||
return res
|
||||
|
||||
def for_user(self, username):
|
||||
user_scores = [s for s in self._scores if s.user_id.username == username]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import datetime
|
||||
import functools
|
||||
import os
|
||||
import pathlib
|
||||
import time
|
||||
|
||||
@@ -16,6 +16,9 @@ import wordlinator.utils
|
||||
import wordlinator.utils.scores
|
||||
import wordlinator.utils.web
|
||||
|
||||
TTL_TIME = 10 if os.getenv("DEBUG") else 600
|
||||
LEADERBOARD_COUNT = 20
|
||||
|
||||
###################
|
||||
# Setup Functions #
|
||||
###################
|
||||
@@ -26,7 +29,7 @@ app = dash.Dash(
|
||||
)
|
||||
|
||||
|
||||
def get_ttl_hash(seconds=600):
|
||||
def get_ttl_hash(seconds=TTL_TIME):
|
||||
return round(time.time() / seconds)
|
||||
|
||||
|
||||
@@ -72,6 +75,27 @@ def scores_from_db(round_id):
|
||||
)
|
||||
|
||||
|
||||
#######################
|
||||
# Leaderboard helpers #
|
||||
#######################
|
||||
|
||||
|
||||
def get_leaderboard(round_id):
|
||||
score_matrix = scores_from_db(round_id)
|
||||
user_scores = score_matrix.by_user()
|
||||
top_20 = dict(
|
||||
list(sorted(user_scores.items(), key=lambda u: u[1].golf_score))[
|
||||
:LEADERBOARD_COUNT
|
||||
]
|
||||
)
|
||||
return dash.dash_table.DataTable(
|
||||
[{"Name": k, "Score": v.golf_score} for k, v in top_20.items()],
|
||||
style_as_list_view=True,
|
||||
style_table={"width": "40%", "margin": "auto"},
|
||||
style_cell={"textAlign": "center"},
|
||||
)
|
||||
|
||||
|
||||
#################
|
||||
# Score Helpers #
|
||||
#################
|
||||
@@ -237,6 +261,15 @@ app.layout = dash.html.Div(
|
||||
id="round-selector",
|
||||
style={"maxWidth": "300px"},
|
||||
),
|
||||
dash.html.Div(
|
||||
[
|
||||
dash.html.H2(
|
||||
f"Leaderboard - Top {LEADERBOARD_COUNT}",
|
||||
style={"textAlign": "center"},
|
||||
),
|
||||
dash.html.Div("Loading...", id="leaderboard"),
|
||||
]
|
||||
),
|
||||
dash.html.Div(
|
||||
[
|
||||
dash.html.H2("User Scores", style={"textAlign": "center"}),
|
||||
@@ -259,6 +292,18 @@ app.layout = dash.html.Div(
|
||||
)
|
||||
|
||||
|
||||
@app.long_callback(
|
||||
output=dash.dependencies.Output("leaderboard", "children"),
|
||||
inputs=[
|
||||
dash.dependencies.Input("title", "children"),
|
||||
dash.dependencies.Input("round-selector-dropdown", "value"),
|
||||
],
|
||||
manager=long_callback_manager,
|
||||
)
|
||||
def get_leaderboard_table(_, round_id):
|
||||
return get_leaderboard(round_id)
|
||||
|
||||
|
||||
@app.long_callback(
|
||||
output=dash.dependencies.Output("user-scores", "children"),
|
||||
inputs=[
|
||||
|
||||
Reference in New Issue
Block a user