Compare commits
2 Commits
d79a6be274
...
4b4dfa69fd
| Author | SHA1 | Date | |
|---|---|---|---|
| 4b4dfa69fd | |||
| 1d865e605c |
@@ -9,6 +9,7 @@ import wordlinator.db.pg
|
|||||||
import wordlinator.sheets
|
import wordlinator.sheets
|
||||||
import wordlinator.twitter
|
import wordlinator.twitter
|
||||||
import wordlinator.utils
|
import wordlinator.utils
|
||||||
|
import wordlinator.utils.scores
|
||||||
|
|
||||||
|
|
||||||
async def get_scores(
|
async def get_scores(
|
||||||
@@ -70,8 +71,9 @@ def print_score_table(wordle_day, scores):
|
|||||||
|
|
||||||
|
|
||||||
def _save_db_scores(
|
def _save_db_scores(
|
||||||
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores
|
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores=None
|
||||||
):
|
):
|
||||||
|
twitter_scores = twitter_scores or {}
|
||||||
db = wordlinator.db.pg.WordleDb()
|
db = wordlinator.db.pg.WordleDb()
|
||||||
hole_data = wordle_day.golf_hole
|
hole_data = wordle_day.golf_hole
|
||||||
if not hole_data:
|
if not hole_data:
|
||||||
@@ -79,52 +81,28 @@ def _save_db_scores(
|
|||||||
game_no = hole_data.game_no
|
game_no = hole_data.game_no
|
||||||
|
|
||||||
db_users = db.get_users()
|
db_users = db.get_users()
|
||||||
db_holes = db.get_holes(game_no)
|
db_holes = db.get_holes(game_no, ensure_all=True)
|
||||||
db_scores = db.get_scores(game_no)
|
db_scores = db.get_scores(game_no)
|
||||||
|
|
||||||
|
db_scores_by_user = wordlinator.utils.scores.ScoreMatrix(db_scores).by_user()
|
||||||
|
|
||||||
to_update = []
|
to_update = []
|
||||||
to_create = []
|
to_create = []
|
||||||
|
|
||||||
for user, score_list in scores.items():
|
for user, score_list in scores.items():
|
||||||
db_user_match = [u for u in db_users if u.username == user]
|
db_user_scores = db_scores_by_user.get(user)
|
||||||
db_user = db_user_match[0] if db_user_match else None
|
if not db_user_scores:
|
||||||
|
|
||||||
if not db_user:
|
|
||||||
continue
|
continue
|
||||||
|
db_user_match = [u for u in db_users if u.username == user]
|
||||||
for day, score_entry in enumerate(score_list, start=1):
|
if not db_user_match:
|
||||||
try:
|
continue
|
||||||
score_entry = int(score_entry)
|
db_user = db_user_match[0]
|
||||||
except ValueError:
|
twitter_score = twitter_scores.get(user, None)
|
||||||
continue
|
changes = db_user_scores.get_changes(
|
||||||
|
score_list, twitter_score, db_user, db_holes
|
||||||
score_match = [
|
)
|
||||||
s
|
to_update.extend(changes["update"])
|
||||||
for s in db_scores
|
to_create.extend(changes["create"])
|
||||||
if s.user_id.username == user and s.hole_id.hole == day
|
|
||||||
]
|
|
||||||
db_score = score_match[0] if score_match else None
|
|
||||||
|
|
||||||
if db_score:
|
|
||||||
if db_score.score != score_entry:
|
|
||||||
db_score.score = score_entry
|
|
||||||
to_update.append(db_score)
|
|
||||||
|
|
||||||
else:
|
|
||||||
hole_match = [h for h in db_holes if h.hole == day]
|
|
||||||
if hole_match:
|
|
||||||
hole = hole_match[0]
|
|
||||||
else:
|
|
||||||
hole = db.get_or_create_hole(game_no, day)
|
|
||||||
db_holes.append(hole)
|
|
||||||
to_create.append(
|
|
||||||
{
|
|
||||||
"score": score_entry,
|
|
||||||
"user_id": db_user.user_id,
|
|
||||||
"game_id": hole.game_id.game_id,
|
|
||||||
"hole_id": hole.hole_id,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if to_update:
|
if to_update:
|
||||||
db.bulk_update_scores(to_update)
|
db.bulk_update_scores(to_update)
|
||||||
|
|||||||
@@ -91,8 +91,10 @@ class WordleDb:
|
|||||||
except peewee.DoesNotExist:
|
except peewee.DoesNotExist:
|
||||||
return Hole.create(hole=hole_no, game_id=round.game_id)
|
return Hole.create(hole=hole_no, game_id=round.game_id)
|
||||||
|
|
||||||
def get_holes(self, round_no):
|
def get_holes(self, round_no, ensure_all=False):
|
||||||
round = self.get_or_create_round(round_no)
|
round = self.get_or_create_round(round_no)
|
||||||
|
if ensure_all:
|
||||||
|
self.create_round_holes(round_no)
|
||||||
return list(Hole.select().filter(game_id=round.game_id))
|
return list(Hole.select().filter(game_id=round.game_id))
|
||||||
|
|
||||||
def create_round_holes(self, round_no):
|
def create_round_holes(self, round_no):
|
||||||
|
|||||||
183
wordlinator/utils/scores.py
Normal file
183
wordlinator/utils/scores.py
Normal file
@@ -0,0 +1,183 @@
|
|||||||
|
import collections
|
||||||
|
import typing
|
||||||
|
|
||||||
|
import wordlinator.db.pg
|
||||||
|
import wordlinator.twitter
|
||||||
|
|
||||||
|
############
|
||||||
|
# Mappings #
|
||||||
|
############
|
||||||
|
|
||||||
|
SCORE_NAME_MAP = {
|
||||||
|
1: "Hole-in-1",
|
||||||
|
2: "Eagle",
|
||||||
|
3: "Birdie",
|
||||||
|
4: "Par",
|
||||||
|
5: "Bogey",
|
||||||
|
6: "Double Bogey",
|
||||||
|
7: "Fail",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
###############
|
||||||
|
# ScoreMatrix #
|
||||||
|
###############
|
||||||
|
|
||||||
|
T = typing.TypeVar("T", bound="ScoreContainer")
|
||||||
|
Score = wordlinator.db.pg.Score
|
||||||
|
User = wordlinator.db.pg.User
|
||||||
|
Hole = wordlinator.db.pg.Hole
|
||||||
|
WordleTweet = wordlinator.twitter.WordleTweet
|
||||||
|
|
||||||
|
|
||||||
|
class ScoreContainer:
|
||||||
|
def __init__(self, scores: typing.List[Score]):
|
||||||
|
self._scores = scores
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_attribute(score: Score, attribute_path: typing.List[str]):
|
||||||
|
attribute = score
|
||||||
|
for path_part in attribute_path:
|
||||||
|
attribute = getattr(attribute, path_part)
|
||||||
|
return attribute
|
||||||
|
|
||||||
|
def dict_by(
|
||||||
|
self, attribute_path: str, container_class: typing.Type[T]
|
||||||
|
) -> typing.Dict[typing.Any, T]:
|
||||||
|
data_dict = collections.defaultdict(list)
|
||||||
|
path_parts = attribute_path.split(".")
|
||||||
|
|
||||||
|
for score in self._scores:
|
||||||
|
data_dict[self._get_attribute(score, path_parts)].append(score)
|
||||||
|
|
||||||
|
return {k: container_class(v) for k, v in data_dict.items()}
|
||||||
|
|
||||||
|
|
||||||
|
class ScoreRow(ScoreContainer):
|
||||||
|
@property
|
||||||
|
def total(self) -> int:
|
||||||
|
return sum(s.score for s in self._scores)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def count(self) -> int:
|
||||||
|
return len(self._scores)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def average(self) -> float:
|
||||||
|
return round(self.total / len(self._scores), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class UserRow(ScoreRow):
|
||||||
|
def __init__(self, scores, username=None):
|
||||||
|
super().__init__(scores)
|
||||||
|
self.username = username or scores[0].user_id.username
|
||||||
|
|
||||||
|
@property
|
||||||
|
def golf_score(self) -> int:
|
||||||
|
return self.total - (self.count * 4)
|
||||||
|
|
||||||
|
def sorted_scores(self):
|
||||||
|
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
|
||||||
|
|
||||||
|
def raw_values(self):
|
||||||
|
yield from (s.score for s in self.sorted_scores())
|
||||||
|
|
||||||
|
def _present_format(self, score):
|
||||||
|
if score.tweet_id:
|
||||||
|
return (
|
||||||
|
f"[{score.score}]"
|
||||||
|
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
|
||||||
|
)
|
||||||
|
return score.score
|
||||||
|
|
||||||
|
def presentation_values(self, hole_no=None):
|
||||||
|
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
|
||||||
|
if hole_no:
|
||||||
|
for i in range(1, hole_no + 1):
|
||||||
|
if i not in res:
|
||||||
|
res[i] = ""
|
||||||
|
return res
|
||||||
|
|
||||||
|
def user_row(self, hole_no=None):
|
||||||
|
return {
|
||||||
|
"Name": self.username,
|
||||||
|
"Score": self.golf_score,
|
||||||
|
**self.presentation_values(hole_no=hole_no),
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_changes(
|
||||||
|
self,
|
||||||
|
sheets_scores: typing.List[int],
|
||||||
|
twitter_score: typing.Optional[WordleTweet],
|
||||||
|
db_user: User,
|
||||||
|
db_holes: typing.List[Hole],
|
||||||
|
) -> typing.Dict[str, typing.List[Score]]:
|
||||||
|
current_scores = list(self.sorted_scores())
|
||||||
|
|
||||||
|
results: typing.Dict[str, typing.List[typing.Any]] = {
|
||||||
|
"update": [],
|
||||||
|
"create": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
for day, score in enumerate(sheets_scores, start=1):
|
||||||
|
try:
|
||||||
|
score = int(score)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
hole = [h for h in db_holes if h.hole == day][0]
|
||||||
|
score_match = [s for s in current_scores if s.hole_id.hole == day]
|
||||||
|
|
||||||
|
tweet_id = None
|
||||||
|
if twitter_score and twitter_score.wordle_day.golf_hole.hole_no == day:
|
||||||
|
tweet_id = twitter_score.tweet_id
|
||||||
|
|
||||||
|
if not score_match:
|
||||||
|
results["create"].append(
|
||||||
|
{
|
||||||
|
"score": score,
|
||||||
|
"user_id": db_user.user_id,
|
||||||
|
"game_id": hole.game_id.game_id,
|
||||||
|
"hole_id": hole.hole_id,
|
||||||
|
"tweet_id": tweet_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
saved_score = score_match[0]
|
||||||
|
if saved_score.score != score or saved_score.tweet_id != tweet_id:
|
||||||
|
saved_score.score = score
|
||||||
|
saved_score.tweet_id = tweet_id
|
||||||
|
results["update"].append(saved_score)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
class ScoreMatrix(ScoreContainer):
|
||||||
|
def by_user(self):
|
||||||
|
return self.dict_by("user_id.username", UserRow)
|
||||||
|
|
||||||
|
def for_user(self, username):
|
||||||
|
user_scores = [s for s in self._scores if s.user_id.username == username]
|
||||||
|
return UserRow(scores=user_scores, username=username)
|
||||||
|
|
||||||
|
def by_hole(self):
|
||||||
|
return self.dict_by("hole_id.hole", ScoreRow)
|
||||||
|
|
||||||
|
def for_hole(self, hole_no):
|
||||||
|
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
|
||||||
|
return ScoreRow(hole_scores)
|
||||||
|
|
||||||
|
def _level_counts(self, level_scores: "ScoreMatrix"):
|
||||||
|
hole_dict = level_scores.by_hole()
|
||||||
|
return {k: v.count for k, v in sorted(hole_dict.items())}
|
||||||
|
|
||||||
|
def score_breakdown(self):
|
||||||
|
by_score_dict = self.dict_by("score", ScoreMatrix)
|
||||||
|
return {
|
||||||
|
SCORE_NAME_MAP[k]: self._level_counts(v)
|
||||||
|
for k, v in sorted(by_score_dict.items())
|
||||||
|
}
|
||||||
|
|
||||||
|
def user_rows(self, wordle_day):
|
||||||
|
hole_no = wordle_day.golf_hole.hole_no
|
||||||
|
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
|
||||||
@@ -1,139 +1,3 @@
|
|||||||
import collections
|
|
||||||
import typing
|
|
||||||
|
|
||||||
import wordlinator.db.pg
|
|
||||||
|
|
||||||
############
|
|
||||||
# Mappings #
|
|
||||||
############
|
|
||||||
|
|
||||||
SCORE_NAME_MAP = {
|
|
||||||
1: "Hole-in-1",
|
|
||||||
2: "Eagle",
|
|
||||||
3: "Birdie",
|
|
||||||
4: "Par",
|
|
||||||
5: "Bogey",
|
|
||||||
6: "Double Bogey",
|
|
||||||
7: "Fail",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
###############
|
|
||||||
# ScoreMatrix #
|
|
||||||
###############
|
|
||||||
|
|
||||||
T = typing.TypeVar("T", bound="ScoreContainer")
|
|
||||||
|
|
||||||
|
|
||||||
class ScoreContainer:
|
|
||||||
def __init__(self, scores: typing.List[wordlinator.db.pg.Score]):
|
|
||||||
self._scores = scores
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_attribute(
|
|
||||||
score: wordlinator.db.pg.Score, attribute_path: typing.List[str]
|
|
||||||
):
|
|
||||||
attribute = score
|
|
||||||
for path_part in attribute_path:
|
|
||||||
attribute = getattr(attribute, path_part)
|
|
||||||
return attribute
|
|
||||||
|
|
||||||
def dict_by(
|
|
||||||
self, attribute_path: str, container_class: typing.Type[T]
|
|
||||||
) -> typing.Dict[typing.Any, T]:
|
|
||||||
data_dict = collections.defaultdict(list)
|
|
||||||
path_parts = attribute_path.split(".")
|
|
||||||
|
|
||||||
for score in self._scores:
|
|
||||||
data_dict[self._get_attribute(score, path_parts)].append(score)
|
|
||||||
|
|
||||||
return {k: container_class(v) for k, v in data_dict.items()}
|
|
||||||
|
|
||||||
|
|
||||||
class ScoreRow(ScoreContainer):
|
|
||||||
@property
|
|
||||||
def total(self) -> int:
|
|
||||||
return sum(s.score for s in self._scores)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def count(self) -> int:
|
|
||||||
return len(self._scores)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def average(self) -> float:
|
|
||||||
return round(self.total / len(self._scores), 2)
|
|
||||||
|
|
||||||
|
|
||||||
class UserRow(ScoreRow):
|
|
||||||
def __init__(self, scores, username=None):
|
|
||||||
super().__init__(scores)
|
|
||||||
self.username = username or scores[0].user_id.username
|
|
||||||
|
|
||||||
@property
|
|
||||||
def golf_score(self) -> int:
|
|
||||||
return self.total - (self.count * 4)
|
|
||||||
|
|
||||||
def sorted_scores(self):
|
|
||||||
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
|
|
||||||
|
|
||||||
def raw_values(self):
|
|
||||||
yield from (s.score for s in self.sorted_scores())
|
|
||||||
|
|
||||||
def _present_format(self, score):
|
|
||||||
if score.tweet_id:
|
|
||||||
return (
|
|
||||||
f"[{score.score}]"
|
|
||||||
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
|
|
||||||
)
|
|
||||||
return score.score
|
|
||||||
|
|
||||||
def presentation_values(self, hole_no=None):
|
|
||||||
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
|
|
||||||
if hole_no:
|
|
||||||
for i in range(1, hole_no + 1):
|
|
||||||
if i not in res:
|
|
||||||
res[i] = ""
|
|
||||||
return res
|
|
||||||
|
|
||||||
def user_row(self, hole_no=None):
|
|
||||||
return {
|
|
||||||
"Name": self.username,
|
|
||||||
"Score": self.golf_score,
|
|
||||||
**self.presentation_values(hole_no=hole_no),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ScoreMatrix(ScoreContainer):
|
|
||||||
def by_user(self):
|
|
||||||
return self.dict_by("user_id.username", UserRow)
|
|
||||||
|
|
||||||
def for_user(self, username):
|
|
||||||
user_scores = [s for s in self._scores if s.user_id.username == username]
|
|
||||||
return UserRow(scores=user_scores, username=username)
|
|
||||||
|
|
||||||
def by_hole(self):
|
|
||||||
return self.dict_by("hole_id.hole", ScoreRow)
|
|
||||||
|
|
||||||
def for_hole(self, hole_no):
|
|
||||||
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
|
|
||||||
return ScoreRow(hole_scores)
|
|
||||||
|
|
||||||
def _level_counts(self, level_scores: "ScoreMatrix"):
|
|
||||||
hole_dict = level_scores.by_hole()
|
|
||||||
return {k: v.count for k, v in sorted(hole_dict.items())}
|
|
||||||
|
|
||||||
def score_breakdown(self):
|
|
||||||
by_score_dict = self.dict_by("score", ScoreMatrix)
|
|
||||||
return {
|
|
||||||
SCORE_NAME_MAP[k]: self._level_counts(v)
|
|
||||||
for k, v in sorted(by_score_dict.items())
|
|
||||||
}
|
|
||||||
|
|
||||||
def user_rows(self, wordle_day):
|
|
||||||
hole_no = wordle_day.golf_hole.hole_no
|
|
||||||
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
|
|
||||||
|
|
||||||
|
|
||||||
######################
|
######################
|
||||||
# Formatting Helpers #
|
# Formatting Helpers #
|
||||||
######################
|
######################
|
||||||
@@ -151,6 +15,7 @@ def column_formats(col, pct):
|
|||||||
"width": f"{pct}%",
|
"width": f"{pct}%",
|
||||||
"minWidth": f"{pct}%",
|
"minWidth": f"{pct}%",
|
||||||
},
|
},
|
||||||
|
# Plain and markdown over-par
|
||||||
{
|
{
|
||||||
"if": {
|
"if": {
|
||||||
"column_id": col["id"],
|
"column_id": col["id"],
|
||||||
@@ -158,6 +23,28 @@ def column_formats(col, pct):
|
|||||||
},
|
},
|
||||||
"backgroundColor": "red",
|
"backgroundColor": "red",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[5]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "red",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[6]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "red",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[7]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "red",
|
||||||
|
},
|
||||||
|
# Plain and Markdown par
|
||||||
{
|
{
|
||||||
"if": {
|
"if": {
|
||||||
"column_id": col["id"],
|
"column_id": col["id"],
|
||||||
@@ -165,6 +52,14 @@ def column_formats(col, pct):
|
|||||||
},
|
},
|
||||||
"backgroundColor": "orange",
|
"backgroundColor": "orange",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[4]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "orange",
|
||||||
|
},
|
||||||
|
# Plain and markdown under par
|
||||||
{
|
{
|
||||||
"if": {
|
"if": {
|
||||||
"column_id": col["id"],
|
"column_id": col["id"],
|
||||||
@@ -172,6 +67,28 @@ def column_formats(col, pct):
|
|||||||
},
|
},
|
||||||
"backgroundColor": "green",
|
"backgroundColor": "green",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[3]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "green",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[2]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "green",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"if": {
|
||||||
|
"column_id": col["id"],
|
||||||
|
"filter_query": format_string(col, 'contains "[1]"'),
|
||||||
|
},
|
||||||
|
"backgroundColor": "green",
|
||||||
|
},
|
||||||
|
# Plain no score
|
||||||
{
|
{
|
||||||
"if": {
|
"if": {
|
||||||
"column_id": col["id"],
|
"column_id": col["id"],
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import plotly.graph_objs
|
|||||||
import wordlinator.db.pg as db
|
import wordlinator.db.pg as db
|
||||||
import wordlinator.twitter
|
import wordlinator.twitter
|
||||||
import wordlinator.utils
|
import wordlinator.utils
|
||||||
|
import wordlinator.utils.scores
|
||||||
import wordlinator.utils.web
|
import wordlinator.utils.web
|
||||||
|
|
||||||
###################
|
###################
|
||||||
@@ -59,7 +60,7 @@ def _scores_from_db(ttl_hash=None):
|
|||||||
|
|
||||||
|
|
||||||
def scores_from_db():
|
def scores_from_db():
|
||||||
return wordlinator.utils.web.ScoreMatrix(_scores_from_db(get_ttl_hash()))
|
return wordlinator.utils.scores.ScoreMatrix(_scores_from_db(get_ttl_hash()))
|
||||||
|
|
||||||
|
|
||||||
#################
|
#################
|
||||||
|
|||||||
Reference in New Issue
Block a user