Compare commits

...

2 Commits

Author SHA1 Message Date
4b4dfa69fd add uncommitted file 2022-06-14 14:56:17 -05:00
1d865e605c store twitter links 2022-06-14 14:04:02 -05:00
5 changed files with 259 additions and 178 deletions

View File

@@ -9,6 +9,7 @@ import wordlinator.db.pg
import wordlinator.sheets
import wordlinator.twitter
import wordlinator.utils
import wordlinator.utils.scores
async def get_scores(
@@ -70,8 +71,9 @@ def print_score_table(wordle_day, scores):
def _save_db_scores(
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores
wordle_day: wordlinator.utils.WordleDay, scores: dict, twitter_scores=None
):
twitter_scores = twitter_scores or {}
db = wordlinator.db.pg.WordleDb()
hole_data = wordle_day.golf_hole
if not hole_data:
@@ -79,52 +81,28 @@ def _save_db_scores(
game_no = hole_data.game_no
db_users = db.get_users()
db_holes = db.get_holes(game_no)
db_holes = db.get_holes(game_no, ensure_all=True)
db_scores = db.get_scores(game_no)
db_scores_by_user = wordlinator.utils.scores.ScoreMatrix(db_scores).by_user()
to_update = []
to_create = []
for user, score_list in scores.items():
db_user_scores = db_scores_by_user.get(user)
if not db_user_scores:
continue
db_user_match = [u for u in db_users if u.username == user]
db_user = db_user_match[0] if db_user_match else None
if not db_user:
if not db_user_match:
continue
for day, score_entry in enumerate(score_list, start=1):
try:
score_entry = int(score_entry)
except ValueError:
continue
score_match = [
s
for s in db_scores
if s.user_id.username == user and s.hole_id.hole == day
]
db_score = score_match[0] if score_match else None
if db_score:
if db_score.score != score_entry:
db_score.score = score_entry
to_update.append(db_score)
else:
hole_match = [h for h in db_holes if h.hole == day]
if hole_match:
hole = hole_match[0]
else:
hole = db.get_or_create_hole(game_no, day)
db_holes.append(hole)
to_create.append(
{
"score": score_entry,
"user_id": db_user.user_id,
"game_id": hole.game_id.game_id,
"hole_id": hole.hole_id,
}
db_user = db_user_match[0]
twitter_score = twitter_scores.get(user, None)
changes = db_user_scores.get_changes(
score_list, twitter_score, db_user, db_holes
)
to_update.extend(changes["update"])
to_create.extend(changes["create"])
if to_update:
db.bulk_update_scores(to_update)

View File

@@ -91,8 +91,10 @@ class WordleDb:
except peewee.DoesNotExist:
return Hole.create(hole=hole_no, game_id=round.game_id)
def get_holes(self, round_no):
def get_holes(self, round_no, ensure_all=False):
round = self.get_or_create_round(round_no)
if ensure_all:
self.create_round_holes(round_no)
return list(Hole.select().filter(game_id=round.game_id))
def create_round_holes(self, round_no):

183
wordlinator/utils/scores.py Normal file
View File

@@ -0,0 +1,183 @@
import collections
import typing
import wordlinator.db.pg
import wordlinator.twitter
############
# Mappings #
############
SCORE_NAME_MAP = {
1: "Hole-in-1",
2: "Eagle",
3: "Birdie",
4: "Par",
5: "Bogey",
6: "Double Bogey",
7: "Fail",
}
###############
# ScoreMatrix #
###############
T = typing.TypeVar("T", bound="ScoreContainer")
Score = wordlinator.db.pg.Score
User = wordlinator.db.pg.User
Hole = wordlinator.db.pg.Hole
WordleTweet = wordlinator.twitter.WordleTweet
class ScoreContainer:
def __init__(self, scores: typing.List[Score]):
self._scores = scores
@staticmethod
def _get_attribute(score: Score, attribute_path: typing.List[str]):
attribute = score
for path_part in attribute_path:
attribute = getattr(attribute, path_part)
return attribute
def dict_by(
self, attribute_path: str, container_class: typing.Type[T]
) -> typing.Dict[typing.Any, T]:
data_dict = collections.defaultdict(list)
path_parts = attribute_path.split(".")
for score in self._scores:
data_dict[self._get_attribute(score, path_parts)].append(score)
return {k: container_class(v) for k, v in data_dict.items()}
class ScoreRow(ScoreContainer):
@property
def total(self) -> int:
return sum(s.score for s in self._scores)
@property
def count(self) -> int:
return len(self._scores)
@property
def average(self) -> float:
return round(self.total / len(self._scores), 2)
class UserRow(ScoreRow):
def __init__(self, scores, username=None):
super().__init__(scores)
self.username = username or scores[0].user_id.username
@property
def golf_score(self) -> int:
return self.total - (self.count * 4)
def sorted_scores(self):
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
def raw_values(self):
yield from (s.score for s in self.sorted_scores())
def _present_format(self, score):
if score.tweet_id:
return (
f"[{score.score}]"
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
)
return score.score
def presentation_values(self, hole_no=None):
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
if hole_no:
for i in range(1, hole_no + 1):
if i not in res:
res[i] = ""
return res
def user_row(self, hole_no=None):
return {
"Name": self.username,
"Score": self.golf_score,
**self.presentation_values(hole_no=hole_no),
}
def get_changes(
self,
sheets_scores: typing.List[int],
twitter_score: typing.Optional[WordleTweet],
db_user: User,
db_holes: typing.List[Hole],
) -> typing.Dict[str, typing.List[Score]]:
current_scores = list(self.sorted_scores())
results: typing.Dict[str, typing.List[typing.Any]] = {
"update": [],
"create": [],
}
for day, score in enumerate(sheets_scores, start=1):
try:
score = int(score)
except ValueError:
continue
hole = [h for h in db_holes if h.hole == day][0]
score_match = [s for s in current_scores if s.hole_id.hole == day]
tweet_id = None
if twitter_score and twitter_score.wordle_day.golf_hole.hole_no == day:
tweet_id = twitter_score.tweet_id
if not score_match:
results["create"].append(
{
"score": score,
"user_id": db_user.user_id,
"game_id": hole.game_id.game_id,
"hole_id": hole.hole_id,
"tweet_id": tweet_id,
}
)
else:
saved_score = score_match[0]
if saved_score.score != score or saved_score.tweet_id != tweet_id:
saved_score.score = score
saved_score.tweet_id = tweet_id
results["update"].append(saved_score)
return results
class ScoreMatrix(ScoreContainer):
def by_user(self):
return self.dict_by("user_id.username", UserRow)
def for_user(self, username):
user_scores = [s for s in self._scores if s.user_id.username == username]
return UserRow(scores=user_scores, username=username)
def by_hole(self):
return self.dict_by("hole_id.hole", ScoreRow)
def for_hole(self, hole_no):
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
return ScoreRow(hole_scores)
def _level_counts(self, level_scores: "ScoreMatrix"):
hole_dict = level_scores.by_hole()
return {k: v.count for k, v in sorted(hole_dict.items())}
def score_breakdown(self):
by_score_dict = self.dict_by("score", ScoreMatrix)
return {
SCORE_NAME_MAP[k]: self._level_counts(v)
for k, v in sorted(by_score_dict.items())
}
def user_rows(self, wordle_day):
hole_no = wordle_day.golf_hole.hole_no
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]

View File

@@ -1,139 +1,3 @@
import collections
import typing
import wordlinator.db.pg
############
# Mappings #
############
SCORE_NAME_MAP = {
1: "Hole-in-1",
2: "Eagle",
3: "Birdie",
4: "Par",
5: "Bogey",
6: "Double Bogey",
7: "Fail",
}
###############
# ScoreMatrix #
###############
T = typing.TypeVar("T", bound="ScoreContainer")
class ScoreContainer:
def __init__(self, scores: typing.List[wordlinator.db.pg.Score]):
self._scores = scores
@staticmethod
def _get_attribute(
score: wordlinator.db.pg.Score, attribute_path: typing.List[str]
):
attribute = score
for path_part in attribute_path:
attribute = getattr(attribute, path_part)
return attribute
def dict_by(
self, attribute_path: str, container_class: typing.Type[T]
) -> typing.Dict[typing.Any, T]:
data_dict = collections.defaultdict(list)
path_parts = attribute_path.split(".")
for score in self._scores:
data_dict[self._get_attribute(score, path_parts)].append(score)
return {k: container_class(v) for k, v in data_dict.items()}
class ScoreRow(ScoreContainer):
@property
def total(self) -> int:
return sum(s.score for s in self._scores)
@property
def count(self) -> int:
return len(self._scores)
@property
def average(self) -> float:
return round(self.total / len(self._scores), 2)
class UserRow(ScoreRow):
def __init__(self, scores, username=None):
super().__init__(scores)
self.username = username or scores[0].user_id.username
@property
def golf_score(self) -> int:
return self.total - (self.count * 4)
def sorted_scores(self):
yield from sorted(self._scores, key=lambda s: s.hole_id.hole)
def raw_values(self):
yield from (s.score for s in self.sorted_scores())
def _present_format(self, score):
if score.tweet_id:
return (
f"[{score.score}]"
f"(https://twitter.com/{self.username}/status/{score.tweet_id})"
)
return score.score
def presentation_values(self, hole_no=None):
res = {s.hole_id.hole: self._present_format(s) for s in self.sorted_scores()}
if hole_no:
for i in range(1, hole_no + 1):
if i not in res:
res[i] = ""
return res
def user_row(self, hole_no=None):
return {
"Name": self.username,
"Score": self.golf_score,
**self.presentation_values(hole_no=hole_no),
}
class ScoreMatrix(ScoreContainer):
def by_user(self):
return self.dict_by("user_id.username", UserRow)
def for_user(self, username):
user_scores = [s for s in self._scores if s.user_id.username == username]
return UserRow(scores=user_scores, username=username)
def by_hole(self):
return self.dict_by("hole_id.hole", ScoreRow)
def for_hole(self, hole_no):
hole_scores = [s for s in self._scores if s.hole_id.hole == hole_no]
return ScoreRow(hole_scores)
def _level_counts(self, level_scores: "ScoreMatrix"):
hole_dict = level_scores.by_hole()
return {k: v.count for k, v in sorted(hole_dict.items())}
def score_breakdown(self):
by_score_dict = self.dict_by("score", ScoreMatrix)
return {
SCORE_NAME_MAP[k]: self._level_counts(v)
for k, v in sorted(by_score_dict.items())
}
def user_rows(self, wordle_day):
hole_no = wordle_day.golf_hole.hole_no
return [u.user_row(hole_no=hole_no) for u in self.by_user().values()]
######################
# Formatting Helpers #
######################
@@ -151,6 +15,7 @@ def column_formats(col, pct):
"width": f"{pct}%",
"minWidth": f"{pct}%",
},
# Plain and markdown over-par
{
"if": {
"column_id": col["id"],
@@ -158,6 +23,28 @@ def column_formats(col, pct):
},
"backgroundColor": "red",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[5]"'),
},
"backgroundColor": "red",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[6]"'),
},
"backgroundColor": "red",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[7]"'),
},
"backgroundColor": "red",
},
# Plain and Markdown par
{
"if": {
"column_id": col["id"],
@@ -165,6 +52,14 @@ def column_formats(col, pct):
},
"backgroundColor": "orange",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[4]"'),
},
"backgroundColor": "orange",
},
# Plain and markdown under par
{
"if": {
"column_id": col["id"],
@@ -172,6 +67,28 @@ def column_formats(col, pct):
},
"backgroundColor": "green",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[3]"'),
},
"backgroundColor": "green",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[2]"'),
},
"backgroundColor": "green",
},
{
"if": {
"column_id": col["id"],
"filter_query": format_string(col, 'contains "[1]"'),
},
"backgroundColor": "green",
},
# Plain no score
{
"if": {
"column_id": col["id"],

View File

@@ -13,6 +13,7 @@ import plotly.graph_objs
import wordlinator.db.pg as db
import wordlinator.twitter
import wordlinator.utils
import wordlinator.utils.scores
import wordlinator.utils.web
###################
@@ -59,7 +60,7 @@ def _scores_from_db(ttl_hash=None):
def scores_from_db():
return wordlinator.utils.web.ScoreMatrix(_scores_from_db(get_ttl_hash()))
return wordlinator.utils.scores.ScoreMatrix(_scores_from_db(get_ttl_hash()))
#################