Refactor shared logic objects, add write
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
token.json
|
||||
@@ -25,6 +25,9 @@ ipython = "^8.4.0"
|
||||
[tool.poetry.scripts]
|
||||
wordlinator = "wordlinator.app:sync_main"
|
||||
|
||||
[tool.mypy]
|
||||
ignore_missing_imports = true
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
@@ -1,18 +1,17 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import datetime
|
||||
|
||||
import rich
|
||||
import rich.table
|
||||
|
||||
import wordlinator.sheets
|
||||
import wordlinator.twitter
|
||||
|
||||
WORDLE_DAY_ZERO = datetime.date(2021, 6, 19)
|
||||
WORDLE_TODAY_NUMBER = (datetime.date.today() - WORDLE_DAY_ZERO).days
|
||||
import wordlinator.utils
|
||||
|
||||
|
||||
async def get_scores(wordle_day=WORDLE_TODAY_NUMBER):
|
||||
async def get_scores(
|
||||
wordle_day: wordlinator.utils.WordleDay = wordlinator.utils.WORDLE_TODAY,
|
||||
):
|
||||
users = wordlinator.sheets.SheetsClient().get_users()
|
||||
|
||||
twitter_client = wordlinator.twitter.TwitterClient()
|
||||
@@ -21,7 +20,7 @@ async def get_scores(wordle_day=WORDLE_TODAY_NUMBER):
|
||||
|
||||
for user in users:
|
||||
user_scores = await twitter_client.get_user_wordles(user)
|
||||
day_score = [s for s in user_scores if s.wordle_no == wordle_day]
|
||||
day_score = [s for s in user_scores if s.wordle_day == wordle_day]
|
||||
scores[user] = day_score[0] if day_score else None
|
||||
|
||||
return scores
|
||||
@@ -68,9 +67,13 @@ def _get_day():
|
||||
"--wordle-day", type=int, help="The wordle day number for the score report."
|
||||
)
|
||||
args = parser.parse_args()
|
||||
wordle_day = args.wordle_day or WORDLE_TODAY_NUMBER
|
||||
if args.days_ago:
|
||||
wordle_day = WORDLE_TODAY_NUMBER - args.days_ago
|
||||
wordle_day = wordlinator.utils.WORDLE_TODAY
|
||||
if args.wordle_day:
|
||||
wordle_day = wordlinator.utils.WordleDay.from_wordle_no(args.wordle_day)
|
||||
elif args.days_ago:
|
||||
wordle_day = wordlinator.utils.WordleDay.from_wordle_no(
|
||||
wordle_day.wordle_no - args.days_ago
|
||||
)
|
||||
return wordle_day
|
||||
|
||||
|
||||
|
||||
@@ -1,23 +1,50 @@
|
||||
import itertools
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
import google.oauth2.credentials
|
||||
import google.oauth2.service_account
|
||||
import googleapiclient.discovery
|
||||
import rich
|
||||
import rich.table
|
||||
|
||||
import wordlinator.utils
|
||||
|
||||
SPREADSHEET_ID = "1POoklzvD643pvdMAleFxrecN50IMv2NdQBs9h43Hw8E"
|
||||
SHEET_NAME = "Round 1"
|
||||
SHEET_NAME = os.getenv("SHEET_NAME", "AutoTest")
|
||||
USER_RANGE = "A2:A1000"
|
||||
SCORE_RANGE = "C2:T1000"
|
||||
|
||||
|
||||
class SheetsClient:
|
||||
FILLER_VALUE = "7"
|
||||
|
||||
def __init__(
|
||||
self, sheet_id=SPREADSHEET_ID, sheet_name=SHEET_NAME, user_range=USER_RANGE
|
||||
self,
|
||||
wordle_day: wordlinator.utils.WordleDay = wordlinator.utils.WORDLE_TODAY,
|
||||
sheet_id=SPREADSHEET_ID,
|
||||
sheet_name=SHEET_NAME,
|
||||
user_range=USER_RANGE,
|
||||
score_range=SCORE_RANGE,
|
||||
):
|
||||
creds = {"developerKey": os.getenv("SHEET_API_KEY")}
|
||||
env_path = os.getenv("SHEET_TOKEN_FILE_PATH")
|
||||
if env_path:
|
||||
token_file = pathlib.Path(env_path)
|
||||
else:
|
||||
token_file = pathlib.Path.cwd() / "token.json"
|
||||
if token_file.exists():
|
||||
cred_obj = google.oauth2.service_account.Credentials
|
||||
creds = {"credentials": cred_obj.from_service_account_file(str(token_file))}
|
||||
self.client = googleapiclient.discovery.build("sheets", "v4", **creds)
|
||||
self.sheet_id = sheet_id
|
||||
self.sheet_name = sheet_name
|
||||
self.user_range = user_range
|
||||
self.score_range = score_range
|
||||
if not wordle_day.golf_hole:
|
||||
self.game_round = None
|
||||
else:
|
||||
self.game_round = wordle_day.golf_hole.hole_no
|
||||
|
||||
def _get_sheet_values(self, range):
|
||||
sheets = self.client.spreadsheets()
|
||||
@@ -28,12 +55,74 @@ class SheetsClient:
|
||||
rows = self._get_sheet_values(f"{self.sheet_name}!{self.user_range}")
|
||||
return list(filter(None, [row[0] for row in rows]))
|
||||
|
||||
def _normalize_scores(self, scores_value, completed_only=True):
|
||||
if not self.game_round:
|
||||
# Can't normalize if we're not in a game.
|
||||
return scores_value
|
||||
scores = []
|
||||
for score in scores_value or []:
|
||||
scores.append(score or self.FILLER_VALUE)
|
||||
expected_len = self.game_round
|
||||
if completed_only:
|
||||
expected_len = expected_len - 1
|
||||
current_len = len(scores)
|
||||
if current_len < expected_len:
|
||||
filler = [self.FILLER_VALUE] * (expected_len - current_len)
|
||||
scores = scores + filler
|
||||
return scores
|
||||
|
||||
def score_dict(self, names, scores, completed_only=True):
|
||||
score_data = dict(itertools.zip_longest(names, scores))
|
||||
for name in score_data:
|
||||
score_data[name] = self._normalize_scores(
|
||||
score_data[name], completed_only=completed_only
|
||||
)
|
||||
return score_data
|
||||
|
||||
def get_scores(self):
|
||||
sheets = self.client.spreadsheets()
|
||||
result = (
|
||||
sheets.values()
|
||||
.batchGet(
|
||||
spreadsheetId=self.sheet_id,
|
||||
ranges=[
|
||||
f"{self.sheet_name}!{self.user_range}",
|
||||
f"{self.sheet_name}!{self.score_range}",
|
||||
],
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
ranges = result.get("valueRanges", [])
|
||||
names = [row[0] for row in ranges[0].get("values", [])]
|
||||
scores = ranges[1].get("values", [])
|
||||
return self.score_dict(names, scores)
|
||||
|
||||
def write_scores(self, score_dict):
|
||||
body = {"values": list(score_dict.values())}
|
||||
sheets = self.client.spreadsheets()
|
||||
result = (
|
||||
sheets.values()
|
||||
.update(
|
||||
spreadsheetId=self.sheet_id,
|
||||
range=f"{self.sheet_name}!{self.score_range}",
|
||||
body=body,
|
||||
valueInputOption="USER_ENTERED",
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
users = SheetsClient().get_users()
|
||||
table = rich.table.Table("Username", title="WordleGolf Players")
|
||||
for user in users:
|
||||
table.add_row(user)
|
||||
scores = SheetsClient().get_scores()
|
||||
score_cols = [rich.table.Column(f"{i}") for i in range(1, 19)]
|
||||
table = rich.table.Table(
|
||||
rich.table.Column("Username", min_width=20),
|
||||
*score_cols,
|
||||
title="WordleGolf Players",
|
||||
)
|
||||
for name, score_list in scores.items():
|
||||
table.add_row(name, *score_list)
|
||||
rich.print(table)
|
||||
|
||||
|
||||
|
||||
@@ -10,8 +10,12 @@ import dateutil.parser
|
||||
import httpx
|
||||
import rich
|
||||
|
||||
import wordlinator.utils
|
||||
|
||||
BASE_URL = "https://api.twitter.com/2"
|
||||
WORDLE_RE = re.compile(r"Wordle(\w+)? (?P<number>\d+) (?P<score>[X\d])/6")
|
||||
WORDLE_RE = re.compile(
|
||||
r"Wordle(\w+)? (?P<number>\d+) (?P<score>[X\d])/6", re.IGNORECASE
|
||||
)
|
||||
TOKEN = os.getenv("TWITTER_TOKEN")
|
||||
|
||||
|
||||
@@ -50,7 +54,7 @@ class WordleTweet:
|
||||
|
||||
created_at: datetime.datetime
|
||||
text: str
|
||||
wordle_no: int
|
||||
wordle_day: wordlinator.utils.WordleDay
|
||||
raw_score: int
|
||||
user: TwitterUser
|
||||
|
||||
@@ -68,7 +72,7 @@ class WordleTweet:
|
||||
if not wordle:
|
||||
return None
|
||||
|
||||
wordle_no = int(wordle.groupdict()["number"])
|
||||
wordle_no = wordle.groupdict()["number"]
|
||||
score = wordle.groupdict()["score"]
|
||||
score = int(score) if score.isdigit() else 7
|
||||
|
||||
@@ -79,7 +83,7 @@ class WordleTweet:
|
||||
return cls(
|
||||
created_at=created,
|
||||
text=tweet["text"],
|
||||
wordle_no=wordle_no,
|
||||
wordle_day=wordlinator.utils.WordleDay.from_wordle_no(wordle_no),
|
||||
raw_score=score,
|
||||
user=twitter_user,
|
||||
)
|
||||
|
||||
47
wordlinator/utils.py
Normal file
47
wordlinator/utils.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import dataclasses
|
||||
import datetime
|
||||
import typing
|
||||
|
||||
WORDLE_DAY_ZERO = datetime.date(2021, 6, 19)
|
||||
|
||||
WORDLE_GOLF_ROUND_DATES = [datetime.date(2022, 5, 9), datetime.date(2022, 5, 30)]
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class GolfHole:
|
||||
game_no: int
|
||||
hole_no: int
|
||||
|
||||
@classmethod
|
||||
def from_date(cls, date: datetime.date):
|
||||
for game_no, start_date in enumerate(WORDLE_GOLF_ROUND_DATES, start=1):
|
||||
hole_value = (date - start_date).days + 1
|
||||
if 1 <= hole_value <= 18:
|
||||
return cls(game_no=game_no, hole_no=hole_value)
|
||||
return None
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class WordleDay:
|
||||
wordle_no: int
|
||||
date: datetime.date
|
||||
golf_hole: typing.Optional[GolfHole]
|
||||
|
||||
@classmethod
|
||||
def from_wordle_no(cls, wordle_no: int):
|
||||
wordle_no = int(wordle_no)
|
||||
date = WORDLE_DAY_ZERO + datetime.timedelta(days=wordle_no)
|
||||
golf_hole = GolfHole.from_date(date)
|
||||
return cls(wordle_no=wordle_no, date=date, golf_hole=golf_hole)
|
||||
|
||||
@classmethod
|
||||
def from_date(cls, date: datetime.date):
|
||||
wordle_no = (date - WORDLE_DAY_ZERO).days
|
||||
golf_hole = GolfHole.from_date(date)
|
||||
return cls(wordle_no=wordle_no, date=date, golf_hole=golf_hole)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.wordle_no == other.wordle_no
|
||||
|
||||
|
||||
WORDLE_TODAY = WordleDay.from_date(datetime.date.today())
|
||||
Reference in New Issue
Block a user