17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 | class Tooter(Mastodon):
credentials: dict = {}
hostname: str = ""
files: dict = {}
def __init__(self, config, phase: str, server: str = ""):
    """
    Configure logging and open a connection through the Mastodon client.

    If ``server`` is a non-empty string, the client is initialised without
    an access token, using ``server`` as the API base URL. Otherwise the
    ``cred_file`` option is read via ``config.get(phase, "cred_file")`` and
    passed as the access token; ``self.me()`` is then called and the
    account id and username are stored on the instance as ``id``, ``name``
    and ``acct``.

    Args:
        config: Configuration object. It is handed to ``get_debug_level``
            and must support ``config.get(phase, "cred_file")``.
        phase: Key passed to ``get_debug_level`` and used as the first
            argument of ``config.get``.
        server: API base URL for an anonymous connection. The default ""
            selects the credentials-file login instead.

    Raises:
        RuntimeError: If the login raises ``FileNotFoundError``, or raises
            any exception whose text contains "API base URL is required".
            Every other exception propagates unchanged.
    """
    # The debug level may come back as a name (INFO, DEBUG) or a number (10, 20).
    self.debug = get_debug_level(config, phase)
    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        level=logging.ERROR,
        datefmt="%H:%M:%S",
    )
    self.logger = logging.getLogger("tooter")
    self.logger.setLevel(self.debug)

    # Options that both connection modes pass to the parent constructor.
    client_options = {
        "user_agent": "mastoscore",
        "debug_requests": False,
        "feature_set": "pleroma",
        "request_timeout": 10,
    }

    if server != "":
        # Anonymous connection: only a base URL, no access token.
        self.logger.debug(f"Connecting to {server} anonymously")
        super().__init__(api_base_url=server, **client_options)
        self.logger.debug(f"Connected to {self.api_base_url} (anonymous)")
        return

    try:
        self.cred_file = config.get(phase, "cred_file")
        self.logger.debug(f"Logging in from {self.cred_file}")
        super().__init__(access_token=self.cred_file, **client_options)
        account = self.me()
        self.id = account["id"]
        self.name = account.username
        self.acct = self.name
        self.logger.debug(
            f"Logged in as {account.username} on {self.api_base_url} (uid {account.id})"
        )
    except FileNotFoundError as err:
        raise RuntimeError(f"Credentials file not found: {self.cred_file}. Please create it with your Mastodon access token.") from err
    except Exception as err:
        # Only the "missing base URL" failure is translated; anything else
        # is re-raised exactly as it arrived.
        if "API base URL is required" not in str(err):
            raise
        raise RuntimeError(f"Invalid or missing credentials in {self.cred_file}. The file should contain a valid Mastodon access token.") from err
def check_toot_ages(
    self, tootlist: Sequence[Dict[str, Any]], oldest_date: datetime.datetime
) -> List[Dict[str, Any]]:
    """
    Keep only the toots that were created after a cutoff.

    Args:
        tootlist: Toots to examine. Each one must have a "created_at"
            entry that can be compared with ``oldest_date``.
        oldest_date: Cutoff. A toot created exactly at, or before, this
            moment is dropped.

    Returns:
        A new list holding the toots strictly newer than ``oldest_date``,
        in their original order.
    """
    return [toot for toot in tootlist if toot["created_at"] > oldest_date]
def search_hashtag(
    self, hashtag: str, oldest_date: datetime.datetime, max: int = 2000
) -> Generator[List[Dict[str, Any]], None, None]:
    """
    Given a hashtag, search the public timeline for that hashtag. Yield pages
    of toots that are newer than oldest_date.

    Fetching stops as soon as one of these happens:
      * a page contains a toot that is not newer than oldest_date,
      * max toots have been yielded (the last page is trimmed so the total
        never exceeds max),
      * the server has no further pages.

    Args:
        hashtag: String of the hashtag to search for
        oldest_date: Oldest date that we will accept
        max: Maximum number of toots to yield in total (default: 2000).
            A value of zero or less yields nothing and makes no API call.

    Returns:
        Yields pages of toots (up to 40 per page) as they are fetched.
        Pages with no acceptable toots are not yielded.
    """
    # NOTE: the parameter name `max` shadows the builtin inside this method;
    # it is part of the public signature, so it is kept as is.
    if max <= 0:
        return

    toots_so_far: int = 0
    # According to https://docs.joinmastodon.org/methods/timelines/#tag
    # max page size, by default, is 40
    pagesize: int = min(max, 40)
    page: List[Dict[str, Any]] = self.timeline_hashtag(hashtag, limit=pagesize)  # type: ignore
    if not page:
        self.logger.error(f"No toots found for hashtag: {hashtag}")
        return

    # Keep fetching toots until we get to one that is older than oldest_date,
    # we reach the limit, or we get to the end. fetch_next() returns None
    # when there is no further page, so test truthiness rather than len().
    while page:
        new_enough: List[Dict[str, Any]] = self.check_toot_ages(page, oldest_date)  # type: ignore
        # if the check ages function returns fewer than we sent to it
        # we have hit the end of the list of acceptable toots
        hit_age_limit = len(new_enough) < len(page)
        # Never hand out more than the caller asked for.
        toots_to_keep = new_enough[: max - toots_so_far]
        toots_so_far += len(toots_to_keep)
        self.logger.debug(
            f"Added {len(toots_to_keep)} more toots, {toots_so_far} total"
        )
        # Yield this page
        if toots_to_keep:
            yield toots_to_keep
        if hit_age_limit:
            self.logger.debug("stopping due to age")
            return
        # Check the limit BEFORE asking for another page, so we neither
        # waste an API call nor overshoot the requested maximum.
        if toots_so_far >= max:
            self.logger.debug(
                f"More than {max} toots. (We got {toots_so_far}). Stopping"
            )
            return
        # According to https://docs.joinmastodon.org/api/rate-limits/#per-ip
        # the rate limit is 300 calls in 5 minutes. If we have 2000 toots we have
        # to fetch in pages of 40, that's 50 API calls + some authentication calls
        self.logger.debug(f"Requesting {pagesize} more toots")
        page = self.fetch_next(page)  # type: ignore
|