Compare commits
2 Commits
d79a6be274
...
4b4dfa69fd
| Author | SHA1 | Date | |
|---|---|---|---|
| 4b4dfa69fd | |||
| 1d865e605c |
@@ -9,6 +9,7 @@ import wordlinator.db.pg
|
||||
import wordlinator.sheets
|
||||
import wordlinator.twitter
|
||||
import wordlinator.utils
|
||||
import wordlinator.utils.scores
|
||||
|
||||
|
||||
async def get_scores(
|
||||
@@ -70,8 +71,9 @@ def print_score_table(wordle_day, scores):
|
||||
|
||||
|
||||
def _save_db_scores(
|
||||
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores
|
||||
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores=None
|
||||
):
|
||||
twitter_scores = twitter_scores or {}
|
||||
db = wordlinator.db.pg.WordleDb()
|
||||
hole_data = wordle_day.golf_hole
|
||||
if not hole_data:
|
||||
@@ -79,52 +81,28 @@ def _save_db_scores(
|
||||
game_no = hole_data.game_no
|
||||
|
||||
db_users = db.get_users()
|
||||
db_holes = db.get_holes(game_no)
|
||||
db_holes = db.get_holes(game_no, ensure_all=True)
|
||||
db_scores = db.get_scores(game_no)
|
||||
|
||||
db_scores_by_user = wordlinator.utils.scores.ScoreMatrix(db_scores).by_user()
|
||||
|
||||
to_update = []
|
||||
to_create = []
|
||||
|
||||
for user, score_list in scores.items():
|
||||
db_user_scores = db_scores_by_user.get(user)
|
||||
if not db_user_scores:
|
||||
continue
|
||||
db_user_match = [u for u in db_users if u.username == user]
|
||||
db_user = db_user_match[0] if db_user_match else None
|
||||
|
||||
if not db_user:
|
||||
if not db_user_match:
|
||||
continue
|
||||
|
||||
for day, score_entry in enumerate(score_list, start=1):
|
||||
try:
|
||||
score_entry = int(score_entry)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
score_match = [
|
||||
s
|
||||
for s in db_scores
|
||||
if s.user_id.username == user and s.hole_id.hole == day
|
||||
]
|
||||
db_score = score_match[0] if score_match else None
|
||||
|
||||
if db_score:
|
||||
if db_score.score != score_entry:
|
||||
db_score.score = score_entry
|
||||
to_update.append(db_score)
|
||||
|
||||
else:
|
||||
hole_match = [h for h in db_holes if h.hole == day]
|
||||
if hole_match:
|
||||
hole = hole_match[0]
|
||||
else:
|
||||
hole = db.get_or_create_hole(game_no, day)
|
||||
db_holes.append(hole)
|
||||
to_create.append(
|
||||
{
|
||||
"score": score_entry,
|
||||
"user_id": db_user.user_id,
|
||||
"game_id": hole.game_id.game_id,
|
||||
"hole_id": hole.hole_id,
|
||||
}
|
||||
db_user = db_user_match[0]
|
||||
twitter_score = twitter_scores.get(user, None)
|
||||
changes = db_user_scores.get_changes(
|
||||
score_list, twitter_score, db_user, db_holes
|
||||
)
|
||||
to_update.extend(changes["update"])
|
||||
to_create.extend(changes["create"])
|
||||
|
||||
if to_update:
|
||||
db.bulk_update_scores(to_update)
|
||||
|
||||
@@ -91,8 +91,10 @@ class WordleDb:
|
||||
except peewee.DoesNotExist:
|
||||
return Hole.create(hole=hole_no, game_id=round.game_id)
|
||||
|
||||
def get_holes(self, round_no):
|
||||
def get_holes(self, round_no, ensure_all=False):
|
||||
round = self.get_or_create_round(round_no)
|
||||
if ensure_all:
|
||||
self.create_round_holes(round_no)
|
||||
return list(Hole.select().filter(game_id=round.game_id))
|
||||
|
||||
def create_round_holes(self, round_no):
|
||||
|
||||
183
wordlinator/utils/scores.py
Normal file
183
wordlinator/utils/scores.py
Normal file
@@ -0,0 +1,183 @@
|
||||
import collections
|
||||
import typing
|
||||
|
||||
import wordlinator.db.pg
|
||||
import wordlinator.twitter
|
||||
|
||||
############
|
||||
# Mappings #
|
||||
############
|
||||
|
||||
SCORE_NAME_MAP = {
|
||||
1: "Hole-in-1",
|
||||
2: "Eagle",
|
||||
3: "Birdie",
|
||||
4: "Par",
|
||||
5: "Bogey",
|
||||
6: "Double Bogey",
|
||||
7: "Fail",
|
||||
}
|
||||
|
||||
|
||||
###############
|
||||
# ScoreMatrix #
|
||||
###############
|
||||
|
||||
T = typing.TypeVar("T", bound="ScoreContainer")
|
||||
Score = wordlinator.db.pg.Score
|
||||
User = wordlinator.db.pg.User
|
||||
Hole = wordlinator.db.pg.Hole
|
||||
WordleTweet = wordlinator.twitter.WordleTweet
|
||||
|
||||
|
||||
class ScoreContainer:
|
||||
def __init__(self, scores: typing.List[Score]):
|
||||
self._scores = scores
|
||||
|
||||
@staticmethod
|
||||
def _get_attribute(score: Score, attribute_path: typing.List[str]):
|
||||
attribute = score
|
||||
for path_part in attribute_path:
|
||||
attribute = getattr(attribute, path_part)
|
||||
return attribute
|
||||
|
||||
def dict_by(
|
||||
self, attribute_path: str, container_class: typing.Type[T]
|
||||
) -> typing.Dict[typing.Any, T]:
|
||||
data_dict = collections.defaultdict(list)
|
||||
path_parts = attribute_path.split(".")
|
||||
|
||||
for score in self._scores:
|
||||
data_dict[self._get_attribute(score, path_parts)].append(score)
|
||||
|
||||
return {k: container_class(v) for k, v in data_dict.items()}
|
||||
|
||||
|
||||
class ScoreRow(ScoreContainer):
|
||||
@property
|
||||
def total(self) -> int:
|
||||
return sum(s.score for s in self._scores)
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
return len(self._scores)
|
||||
|
||||
@property
|
||||
def average(self) -> float:
|
||||
return round(self.total / len(self._scores), 2)
|
||||
|
||||
|
||||
class UserRow(ScoreRow):
|
||||
def __init__(self, scores, username=None):
|
||||
super().__init__(scores)
|
||||
self.username = username or scores[0].user_id.username
|
||||
|
||||
@property
|
||||
def golf_score(self) -> int:
|
||||
return self.total - (self.count * 4)
|
||||
|
||||
def sorted_scores(self):
|
||||
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
|
||||
|
||||
def raw_values(self):
|
||||
yield from (s.score for s in self.sorted_scores())
|
||||
|
||||
def _present_format(self, score):
|
||||
if score.tweet_id:
|
||||
return (
|
||||
f"[{score.score}]"
|
||||
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
|
||||
)
|
||||
return score.score
|
||||
|
||||
def presentation_values(self, hole_no=None):
|
||||
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
|
||||
if hole_no:
|
||||
for i in range(1, hole_no + 1):
|
||||
if i not in res:
|
||||
res[i] = ""
|
||||
return res
|
||||
|
||||
def user_row(self, hole_no=None):
|
||||
return {
|
||||
"Name": self.username,
|
||||
"Score": self.golf_score,
|
||||
**self.presentation_values(hole_no=hole_no),
|
||||
}
|
||||
|
||||
def get_changes(
|
||||
self,
|
||||
sheets_scores: typing.List[int],
|
||||
twitter_score: typing.Optional[WordleTweet],
|
||||
db_user: User,
|
||||
db_holes: typing.List[Hole],
|
||||
) -> typing.Dict[str, typing.List[Score]]:
|
||||
current_scores = list(self.sorted_scores())
|
||||
|
||||
results: typing.Dict[str, typing.List[typing.Any]] = {
|
||||
"update": [],
|
||||
"create": [],
|
||||
}
|
||||
|
||||
for day, score in enumerate(sheets_scores, start=1):
|
||||
try:
|
||||
score = int(score)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
hole = [h for h in db_holes if h.hole == day][0]
|
||||
score_match = [s for s in current_scores if s.hole_id.hole == day]
|
||||
|
||||
tweet_id = None
|
||||
if twitter_score and twitter_score.wordle_day.golf_hole.hole_no == day:
|
||||
tweet_id = twitter_score.tweet_id
|
||||
|
||||
if not score_match:
|
||||
results["create"].append(
|
||||
{
|
||||
"score": score,
|
||||
"user_id": db_user.user_id,
|
||||
"game_id": hole.game_id.game_id,
|
||||
"hole_id": hole.hole_id,
|
||||
"tweet_id": tweet_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
saved_score = score_match[0]
|
||||
if saved_score.score != score or saved_score.tweet_id != tweet_id:
|
||||
saved_score.score = score
|
||||
saved_score.tweet_id = tweet_id
|
||||
results["update"].append(saved_score)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
class ScoreMatrix(ScoreContainer):
|
||||
def by_user(self):
|
||||
return self.dict_by("user_id.username", UserRow)
|
||||
|
||||
def for_user(self, username):
|
||||
user_scores = [s for s in self._scores if s.user_id.username == username]
|
||||
return UserRow(scores=user_scores, username=username)
|
||||
|
||||
def by_hole(self):
|
||||
return self.dict_by("hole_id.hole", ScoreRow)
|
||||
|
||||
def for_hole(self, hole_no):
|
||||
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
|
||||
return ScoreRow(hole_scores)
|
||||
|
||||
def _level_counts(self, level_scores: "ScoreMatrix"):
|
||||
hole_dict = level_scores.by_hole()
|
||||
return {k: v.count for k, v in sorted(hole_dict.items())}
|
||||
|
||||
def score_breakdown(self):
|
||||
by_score_dict = self.dict_by("score", ScoreMatrix)
|
||||
return {
|
||||
SCORE_NAME_MAP[k]: self._level_counts(v)
|
||||
for k, v in sorted(by_score_dict.items())
|
||||
}
|
||||
|
||||
def user_rows(self, wordle_day):
|
||||
hole_no = wordle_day.golf_hole.hole_no
|
||||
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
|
||||
@@ -1,139 +1,3 @@
|
||||
import collections
|
||||
import typing
|
||||
|
||||
import wordlinator.db.pg
|
||||
|
||||
############
|
||||
# Mappings #
|
||||
############
|
||||
|
||||
SCORE_NAME_MAP = {
|
||||
1: "Hole-in-1",
|
||||
2: "Eagle",
|
||||
3: "Birdie",
|
||||
4: "Par",
|
||||
5: "Bogey",
|
||||
6: "Double Bogey",
|
||||
7: "Fail",
|
||||
}
|
||||
|
||||
|
||||
###############
|
||||
# ScoreMatrix #
|
||||
###############
|
||||
|
||||
T = typing.TypeVar("T", bound="ScoreContainer")
|
||||
|
||||
|
||||
class ScoreContainer:
|
||||
def __init__(self, scores: typing.List[wordlinator.db.pg.Score]):
|
||||
self._scores = scores
|
||||
|
||||
@staticmethod
|
||||
def _get_attribute(
|
||||
score: wordlinator.db.pg.Score, attribute_path: typing.List[str]
|
||||
):
|
||||
attribute = score
|
||||
for path_part in attribute_path:
|
||||
attribute = getattr(attribute, path_part)
|
||||
return attribute
|
||||
|
||||
def dict_by(
|
||||
self, attribute_path: str, container_class: typing.Type[T]
|
||||
) -> typing.Dict[typing.Any, T]:
|
||||
data_dict = collections.defaultdict(list)
|
||||
path_parts = attribute_path.split(".")
|
||||
|
||||
for score in self._scores:
|
||||
data_dict[self._get_attribute(score, path_parts)].append(score)
|
||||
|
||||
return {k: container_class(v) for k, v in data_dict.items()}
|
||||
|
||||
|
||||
class ScoreRow(ScoreContainer):
|
||||
@property
|
||||
def total(self) -> int:
|
||||
return sum(s.score for s in self._scores)
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
return len(self._scores)
|
||||
|
||||
@property
|
||||
def average(self) -> float:
|
||||
return round(self.total / len(self._scores), 2)
|
||||
|
||||
|
||||
class UserRow(ScoreRow):
|
||||
def __init__(self, scores, username=None):
|
||||
super().__init__(scores)
|
||||
self.username = username or scores[0].user_id.username
|
||||
|
||||
@property
|
||||
def golf_score(self) -> int:
|
||||
return self.total - (self.count * 4)
|
||||
|
||||
def sorted_scores(self):
|
||||
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
|
||||
|
||||
def raw_values(self):
|
||||
yield from (s.score for s in self.sorted_scores())
|
||||
|
||||
def _present_format(self, score):
|
||||
if score.tweet_id:
|
||||
return (
|
||||
f"[{score.score}]"
|
||||
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
|
||||
)
|
||||
return score.score
|
||||
|
||||
def presentation_values(self, hole_no=None):
|
||||
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
|
||||
if hole_no:
|
||||
for i in range(1, hole_no + 1):
|
||||
if i not in res:
|
||||
res[i] = ""
|
||||
return res
|
||||
|
||||
def user_row(self, hole_no=None):
|
||||
return {
|
||||
"Name": self.username,
|
||||
"Score": self.golf_score,
|
||||
**self.presentation_values(hole_no=hole_no),
|
||||
}
|
||||
|
||||
|
||||
class ScoreMatrix(ScoreContainer):
|
||||
def by_user(self):
|
||||
return self.dict_by("user_id.username", UserRow)
|
||||
|
||||
def for_user(self, username):
|
||||
user_scores = [s for s in self._scores if s.user_id.username == username]
|
||||
return UserRow(scores=user_scores, username=username)
|
||||
|
||||
def by_hole(self):
|
||||
return self.dict_by("hole_id.hole", ScoreRow)
|
||||
|
||||
def for_hole(self, hole_no):
|
||||
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
|
||||
return ScoreRow(hole_scores)
|
||||
|
||||
def _level_counts(self, level_scores: "ScoreMatrix"):
|
||||
hole_dict = level_scores.by_hole()
|
||||
return {k: v.count for k, v in sorted(hole_dict.items())}
|
||||
|
||||
def score_breakdown(self):
|
||||
by_score_dict = self.dict_by("score", ScoreMatrix)
|
||||
return {
|
||||
SCORE_NAME_MAP[k]: self._level_counts(v)
|
||||
for k, v in sorted(by_score_dict.items())
|
||||
}
|
||||
|
||||
def user_rows(self, wordle_day):
|
||||
hole_no = wordle_day.golf_hole.hole_no
|
||||
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
|
||||
|
||||
|
||||
######################
|
||||
# Formatting Helpers #
|
||||
######################
|
||||
@@ -151,6 +15,7 @@ def column_formats(col, pct):
|
||||
"width": f"{pct}%",
|
||||
"minWidth": f"{pct}%",
|
||||
},
|
||||
# Plain and markdown over-par
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
@@ -158,6 +23,28 @@ def column_formats(col, pct):
|
||||
},
|
||||
"backgroundColor": "red",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[5]"'),
|
||||
},
|
||||
"backgroundColor": "red",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[6]"'),
|
||||
},
|
||||
"backgroundColor": "red",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[7]"'),
|
||||
},
|
||||
"backgroundColor": "red",
|
||||
},
|
||||
# Plain and Markdown par
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
@@ -165,6 +52,14 @@ def column_formats(col, pct):
|
||||
},
|
||||
"backgroundColor": "orange",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[4]"'),
|
||||
},
|
||||
"backgroundColor": "orange",
|
||||
},
|
||||
# Plain and markdown under par
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
@@ -172,6 +67,28 @@ def column_formats(col, pct):
|
||||
},
|
||||
"backgroundColor": "green",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[3]"'),
|
||||
},
|
||||
"backgroundColor": "green",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[2]"'),
|
||||
},
|
||||
"backgroundColor": "green",
|
||||
},
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
"filter_query": format_string(col, 'contains "[1]"'),
|
||||
},
|
||||
"backgroundColor": "green",
|
||||
},
|
||||
# Plain no score
|
||||
{
|
||||
"if": {
|
||||
"column_id": col["id"],
|
||||
|
||||
@@ -13,6 +13,7 @@ import plotly.graph_objs
|
||||
import wordlinator.db.pg as db
|
||||
import wordlinator.twitter
|
||||
import wordlinator.utils
|
||||
import wordlinator.utils.scores
|
||||
import wordlinator.utils.web
|
||||
|
||||
###################
|
||||
@@ -59,7 +60,7 @@ def _scores_from_db(ttl_hash=None):
|
||||
|
||||
|
||||
def scores_from_db():
|
||||
return wordlinator.utils.web.ScoreMatrix(_scores_from_db(get_ttl_hash()))
|
||||
return wordlinator.utils.scores.ScoreMatrix(_scores_from_db(get_ttl_hash()))
|
||||
|
||||
|
||||
#################
|
||||
|
||||
Reference in New Issue
Block a user