Module: fetch

The fetch module starts at the server where you're logged in and searches for a hashtag. After it gets all the toots your server knows about, it starts looking at where they came from. For each server it finds mentioned, it calls fetch_hashtag_remote(). Each time it connects to a new server, it fetches every toot that server knows about for the hashtag, then looks at the servers mentioned in those toots and adds any new ones to the list of servers to contact. A minimal sketch of this crawl appears below.
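
The crawl amounts to a breadth-first walk over servers. Here is a minimal sketch of that strategy in plain Python, not the module's actual code: the hypothetical fetch_from() callback stands in for fetch_hashtag_remote() and is assumed to return a list of toot dicts, each with a "uri" key.

def crawl(home_server: str, fetch_from) -> list:
    done, todo = set(), {home_server}
    all_toots = []
    while todo:
        server = todo.pop()
        done.add(server)
        toots = fetch_from(server)  # stands in for fetch_hashtag_remote()
        all_toots.extend(toots)
        # Each toot's uri names the server it originated on:
        # https://other.example/objects/abc -> https://other.example
        origins = {"/".join(t["uri"].split("/")[0:3]) for t in toots}
        todo |= origins - done  # queue only servers we haven't visited yet
    return all_toots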

Public API

This code depends on Mastodon.py and uses it to connect to the servers it finds mentioned. If you know anything about the fediverse, you know that there are more than just Mastodon servers out there: there's Pleroma, Akkoma, and various other ActivityPub-compatible servers. Some are derived from Mastodon and implement the same APIs; others don't. Some Mastodon servers offer public read APIs, others don't. Servers that allow public reads of their APIs will send you the details on their toots. Servers that don't allow public reads, or that don't implement a Mastodon-compatible timeline API, are quietly skipped, as the sketch below illustrates.
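
As a concrete illustration, here is a hedged sketch of what such a probe looks like with Mastodon.py: try the server's public hashtag timeline anonymously, and treat any API error as "skip this server". The server name is a placeholder, and this is an illustration of the idea rather than the module's actual code path.

from mastodon import Mastodon, MastodonError

def probe(server_url: str, hashtag: str) -> list | None:
    """Return public toots for a hashtag, or None if the server won't talk to us."""
    try:
        api = Mastodon(api_base_url=server_url)  # anonymous client, no credentials
        return api.timeline_hashtag(hashtag, limit=40)
    except MastodonError:
        return None  # no public reads, or no Mastodon-compatible API: quietly skip

toots = probe("https://example.social", "monsterdon")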

Directory Structure

Fetch organizes data in a directory based on the date of the event. So if the journaldir is data, the hashtag is monsterdon, and the event_date is 2025-10-19, then the directory structure is data/2025/10/19 and all the files will have the monsterdon hashtag in their name. See the example below:

data
├── 2025
│  └── 10
│     ├── 19
│     │  ├── data-monsterdon-analysis.json
│     │  ├── data-monsterdon-fetch.json
│     │  ├── index.md
│     │  ├── monsterdon-20251019.png
│     │  ├── monsterdon-20251019.txt
│     │  ├── monsterdon-beige.party.json
│     │  ├── monsterdon-bolha.us.json
│     │  ... lots more files, one per server...
│     │  ├── wordcloud-monsterdon-20251019-remove.png
│     │  └── wordcloud-monsterdon-20251019-remove.txt

List of files

Every file has the hashtag and the date in its name. If you ran the same analysis on two different hashtags on the same day, none of the files would conflict, though they would all be stored in the same directory. The naming scheme is illustrated in the sketch below.
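
A short illustration of the naming scheme; the journaldir, hashtags, date, and server name are example values taken from the listing above.

from os.path import join

journaldir = "data"
year, month, day = "2025", "10", "19"
dir_path = join(journaldir, year, month, day)  # data/2025/10/19

# Two hashtags analysed on the same day share a directory but not filenames.
for hashtag in ("monsterdon", "dogsofmastodon"):
    print(join(dir_path, f"data-{hashtag}-fetch.json"))   # results file
    print(join(dir_path, f"{hashtag}-beige.party.json"))  # a per-server journal file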

Module for fetching toots for a hashtag.

TimestampEncoder

Bases: JSONEncoder

A JSONEncoder subclass that converts pandas Timestamp objects to ISO-8601 strings

Source code in mastoscore/fetch.py
class TimestampEncoder(JSONEncoder):
    """A JSONEncoder subclass that converts pandas Timestamp objects to ISO-8601 strings"""

    def default(self, o):
        if isinstance(o, Timestamp):
            return o.isoformat()
        if isinstance(o, pd.api.typing.NaTType):
            return None  # serialize NaT as JSON null rather than the string "null"
        return super().default(o)
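
A short usage sketch, assuming the module is importable as mastoscore.fetch: the standard json module can't serialize a pandas Timestamp on its own, but passing cls=TimestampEncoder turns it into an ISO-8601 string.

from json import dumps
import pandas as pd
from mastoscore.fetch import TimestampEncoder

record = {"created_at": pd.Timestamp("2025-10-19T01:00:00Z")}
print(dumps(record, cls=TimestampEncoder))
# {"created_at": "2025-10-19T01:00:00+00:00"}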

check_journaldir(dir_name)

Check if a directory exists and create it if it doesn't.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| dir_name | str | The name of the directory to check/create. | required |

Returns:

| Type | Description |
| ------- | ------- |
| bool | True if the directory already existed or was created; False means we tried to create it and failed. |

Source code in mastoscore/fetch.py
def check_journaldir(dir_name: str) -> bool:
    """
    Check if a directory exists and create it if it doesn't.

    Args:
      dir_name (str): The name of the directory to check/create.

    Returns:
      True if the directory already existed or was created, False means we tried
      to create it and failed.
    """
    global logger

    journaldir = abspath(dir_name)
    if exists(journaldir):
        if isdir(journaldir):
            if access(journaldir, W_OK):
                return True
            else:
                logger.critical(f"'{journaldir}' directory exists but is not writeable")
                return False
        else:
            logger.critical(
                f"Something already exists at '{journaldir}' but it is not a directory"
            )
            return False
    else:
        try:
            makedirs(journaldir)
            logger.warning(f"Created '{journaldir}' successfully.")
            return True
        except Exception as e:
            logger.critical(f"Error creating directory '{journaldir}': {e}")
            return False
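
Typical use is a guard clause before any fetching starts; a small sketch, assuming the module is importable as mastoscore.fetch:

from mastoscore.fetch import check_journaldir

if not check_journaldir("data"):
    raise SystemExit("journal directory is not writeable; aborting")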

create_journal_directory(config)

Create a hierarchical directory structure for journal files.

Config Values Used

| Option | Description |
| ------- | ------- |
| mastoscore:journaldir | Base directory for journal files |
| mastoscore:event_year | Year as string (YYYY) |
| mastoscore:event_month | Month as string (MM) |
| mastoscore:event_day | Day as string (DD) |

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |

Returns:

| Type | Description |
| ------- | ------- |
| str or None | Full path to the created directory, or None if creation failed |

Source code in mastoscore/fetch.py
def create_journal_directory(config: ConfigParser) -> str | None:
    """
    Create a hierarchical directory structure for journal files.

    ## Config Values Used
    | Option | Description |
    | ------- | ------- |
    | mastoscore:journaldir | Base directory for journal files
    | mastoscore:event_year | Year as string (YYYY)
    | mastoscore:event_month | Month as string (MM)
    | mastoscore:event_day | Day as string (DD)

    Args:
      config: A ConfigParser object from the config module

    Returns:
      Full path to the created directory, or None if creation failed
    """
    global logger

    # Get date components from config
    year = config.get("mastoscore", "event_year")
    month = config.get("mastoscore", "event_month")
    day = config.get("mastoscore", "event_day")
    base_dir = config.get("mastoscore", "journaldir")

    # Create the full path
    dir_path = join(base_dir, year, month, day)
    dir_path = abspath(dir_path)

    # Check if directory exists
    if exists(dir_path):
        if isdir(dir_path):
            if access(dir_path, W_OK):
                return dir_path
            else:
                logger.critical(f"Directory '{dir_path}' exists but is not writeable")
                return None
        else:
            logger.critical(f"Path '{dir_path}' exists but is not a directory")
            return None

    # Create directory structure
    try:
        makedirs(dir_path, exist_ok=True)
        logger.debug(f"Created directory structure: '{dir_path}'")
        return dir_path
    except Exception as e:
        logger.critical(f"Error creating directory structure '{dir_path}': {e}")
        return None
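
A minimal usage sketch, building a ConfigParser by hand with just the four options this function reads; a real run would normally get its config from the config module.

from configparser import ConfigParser
from mastoscore.fetch import create_journal_directory

config = ConfigParser()
config["mastoscore"] = {
    "journaldir": "data",
    "event_year": "2025",
    "event_month": "10",
    "event_day": "19",
}
dir_path = create_journal_directory(config)  # e.g. /abs/path/to/data/2025/10/19
if dir_path is None:
    raise SystemExit("could not create journal directory")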

fetch(config, progress=None, cancel_token=None)

This is the top-level function that will download toots and store them in a JSON cache. This function will create a tooter and log in to the server named in the cred_file.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| progress | FetchProgress | A thread-safe webapi FetchProgress object used to track partial progress for reporting | None |
| cancel_token | CancelToken | A thread-safe webapi CancelToken object that the thread checks to determine if it should stop mid-fetch | None |

Config Parameters Used

  • mastoscore:max: Max number of toots to pull from a server (default: 2000)
  • fetch:overwrite: If True, overwrite files (re-fetch). If False and a file exists for a server, skip it. Default: False
  • fetch:parallel_workers: int, how many fetches to run in parallel. Default: 5
  • mastoscore:api_base_url: Starting server for our first connection
  • mastoscore:cred_file: Implicitly used when we create our Tooter
  • mastoscore:debug: Level for debug logging
  • mastoscore:hashtag: Hashtag to search for
  • mastoscore:timezone: Timezone for any non-UTC times
  • mastoscore:journalfile: prefix for journal files.

Returns:

| Type | Description |
| ------- | ------- |
| dict or None | A summary dict with servers_done, servers_fail, total_toots, duration_seconds, and output_file, or None on failure |

Source code in mastoscore/fetch.py
def fetch(config: ConfigParser, progress: FetchProgress | None = None,
    cancel_token: CancelToken | None = None) -> dict | None:
    """
    This is the top-level function that will download toots and store them in a JSON cache. This
    function will create a [tooter](module-tooter.md) and login to the server named in the `cred_file`.

    Args:
      config: A ConfigParser object from the [config](module-config.md) module
      progress: A thread-safe [webapi](reference/module-webapi.md) FetchProgress object
        used to track partial progress for reporting.
      cancel_token: A thread-safe [webapi](reference/module-webapi.md) CancelToken object
        that the thread checks to determine if it should stop mid-fetch.

    ## Config Parameters Used
    - mastoscore:max: Max number of toots to pull from a server (default: 2000)
    - fetch:overwrite: If True, overwrite files (re-fetch). If False and
      a file exists for a server, skip it. Default: False
    - fetch:parallel_workers: int, how many fetches to run in parallel. Default: 5
    - mastoscore:api_base_url: Starting server for our first connection
    - mastoscore:cred_file: Implicitly used when we create our [Tooter](module-tooter.md)
    - mastoscore:debug: Level for debug logging
    - mastoscore:hashtag: Hashtag to search for
    - mastoscore:timezone: Timezone for any non-UTC times
    - mastoscore:journalfile: prefix for journal files.

    Returns:
      A summary dict with servers_done, servers_fail, total_toots,
      duration_seconds, and output_file, or None on failure.
    """

    overwrite = config.getboolean("fetch", "overwrite", fallback=False)
    parallel_workers = config.getint("fetch", "parallel_workers", fallback=5)
    maxtoots = config.getint("mastoscore", "max")
    journalfile = config.get("mastoscore", "journalfile")
    hashtag = config.get("mastoscore", "hashtag")
    debug = get_debug_level(config, "mastoscore")
    api_base_url = config.get("mastoscore", "api_base_url")
    timezone = pytimezone(config.get("mastoscore", "timezone"))

    fresults = {}
    start_time = datetime.datetime.now(tz=timezone)
    end_time = datetime.datetime.now(tz=timezone)
    fetch_duration = datetime.timedelta(0)
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        level=logging.ERROR,
        datefmt="%H:%M:%S",
    )
    logger.setLevel(debug)

    oldest_date, newest_date = get_fetch_window(config)
    oldest_str = oldest_date.strftime("%Y-%m-%d %H:%M:%S")
    newest_str = newest_date.strftime("%Y-%m-%d %H:%M:%S")
    logger.debug(f"Fetch window: {oldest_str} to {newest_str}")

    # Create directory structure
    dir_path = create_journal_directory(config)
    # Make sure we can write data before we try to fetch it
    if dir_path is None:
        return None

    try:
        t = Tooter(config, "fetch")
    except Exception as e:
        logger.critical(f"Failed to create Tooter for {api_base_url}")
        logger.critical(e)
        raise RuntimeError(f"Failed to create Tooter: {e}") from e

    logger.debug(
        f"Looking for at most {maxtoots} toots visible from {t.api_base_url} with #{hashtag} since {oldest_str}"
    )

    # Collect toots from paginated generator
    toots = []
    for page in t.search_hashtag(hashtag, oldest_date, maxtoots):
        toots.extend(page)

    if not toots:
        logger.error(
            "We found 0 toots for hashtag %s on %s", f"#{hashtag}", api_base_url
        )
        return None
    else:
        logger.debug(f"Found {len(toots)} local toots")
        df = toots2df(toots, api_base_url)
        write_journal(config, df, api_base_url.split("/")[2])

    # Look for non-local statuses. Let's figure out how many remote servers we need
    # to contact. This splits a URI like https://example.net/blah/blah/blah on slashes,
    # keeps the first three elements, and rejoins them with slashes, producing https://example.net
    uris = ["/".join(s.split("/")[0:3]) for s in df["uri"]]

    # servers_done holds the list of servers that we've already contacted
    # servers_todo holds the list we still need to contact
    servers_done = set()
    servers_todo = set(uris)
    servers_fail = set()
    total_toots = len(df)
    try:
        # don't need to contact our own server, because we already got the local toots.
        servers_todo.remove(api_base_url)
    except Exception:
        logger.warning(f"api_base_url ({api_base_url}) wasn't in the set.")
    servers_done.add(api_base_url)

    # Thread-safe lock for shared state
    servers_lock = threading.Lock()

    # Initialize progress tracking
    if progress:
        # Add the initial server (api_base_url) as completed
        progress.server_status[api_base_url] = {'state': 'done', 'toots': total_toots}
        # Initialize all remaining servers as pending
        with servers_lock:
            for uri in servers_todo:
                progress.server_status[uri] = {'state': 'pending', 'toots': 0}
        progress.update(len(servers_todo), len(servers_done), len(servers_fail), total_toots)

    # Clean up dataframes we don't need anymore
    del df
    del toots

    logger.info(f"Starting parallel fetch: {len(servers_todo)} servers to process with {parallel_workers} workers")

    # Parallel fetch using ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {}  # future -> uri mapping

        # Submit initial batch of work
        initial_count = min(parallel_workers, len(servers_todo))
        for _ in range(initial_count):
            with servers_lock:
                if servers_todo:
                    uri = servers_todo.pop()
                else:
                    break
            future = executor.submit(fetch_single_server, config, uri, dir_path, journalfile, overwrite, progress, cancel_token)
            futures[future] = uri

        logger.debug(f"Initial batch: {len(futures)} futures submitted, {len(servers_todo)} servers remaining")

        # Process completions and submit new work
        while futures:
            # Wait for at least one future to complete
            done, pending = wait(futures.keys(), return_when=FIRST_COMPLETED)

            for future in done:
                # Check for cancellation
                if cancel_token and cancel_token.is_cancelled():
                    logger.info("Fetch cancelled by user")
                    futures.clear()
                    break

                uri = futures[future]
                _, toots_result, newuris = future.result()

                with servers_lock:
                    if toots_result is None:
                        servers_fail.add(uri)
                        logger.debug(f"Failed: {uri}")
                    else:
                        servers_done.add(uri)
                        total_toots += len(toots_result)

                        # Add newly discovered servers
                        if newuris:
                            new_count = 0
                            for server in set(newuris):
                                if server not in servers_done and server not in servers_todo:
                                    servers_todo.add(server)
                                    # Add to progress status as pending
                                    if progress:
                                        progress.server_status[server] = {'state': 'pending', 'toots': 0}
                                    new_count += 1
                            if new_count > 0:
                                logger.info(f"Discovered {new_count} new servers from {uri}")

                    # Update progress
                    if progress:
                        progress.update(len(servers_todo), len(servers_done), len(servers_fail), total_toots, uri)

                    logger.info(
                        f"Processed {uri}. Todo: {len(servers_todo)}, Done: {len(servers_done)}, Fail: {len(servers_fail)}"
                    )

                    # Submit next server if available
                    if servers_todo and not (cancel_token and cancel_token.is_cancelled()):
                        next_uri = servers_todo.pop()
                        new_future = executor.submit(fetch_single_server, config, next_uri, dir_path, journalfile, overwrite, progress, cancel_token)
                        futures[new_future] = next_uri
                        logger.debug(f"Submitted: {next_uri}")

                # Remove completed future
                del futures[future]

        logger.debug(f"Exited loop: {len(futures)} futures remaining, {len(servers_todo)} servers in todo")

    end_time = datetime.datetime.now(tz=timezone)
    fetch_start = start_time.strftime("%a %e %b %Y %H:%M:%S %Z")
    fetch_end = end_time.strftime("%a %e %b %Y %H:%M:%S %Z")
    fetch_duration = end_time - start_time
    duration_string = ""
    fetch_hours, fetch_seconds = divmod(int(fetch_duration.total_seconds()), 3600)
    if fetch_hours > 1:
        duration_string = f"{fetch_hours} hours"
    elif fetch_hours == 1:
        duration_string = f"{fetch_hours} hour"

    fetch_minutes, fetch_seconds = divmod(fetch_seconds, 60)
    if fetch_minutes > 1:
        duration_string = f"{duration_string} {fetch_minutes} minutes"
    elif fetch_minutes == 1:
        duration_string = f"{duration_string} {fetch_minutes} minute"

    if fetch_seconds > 1:
        duration_string = f"{duration_string} {fetch_seconds} seconds"
    elif fetch_seconds == 1:
        duration_string = f"{duration_string} {fetch_seconds} second"
    else:
        duration_string = f"{duration_string} exactly"

    fresults["total_toots"] = total_toots
    fresults["servers_done"] = list(servers_done)
    fresults["servers_fail"] = list(servers_fail)
    fresults["oldest_date"] = oldest_date.strftime("%a %e %b %Y %H:%M:%S %Z")
    fresults["fetch_time"] = fetch_start
    fresults["fetch_end"] = fetch_end
    fresults["fetch_duration"] = duration_string
    fresults["fetch_version"] = __version__

    write_json(config, "fetch", fresults)
    logger.info(
        f"Done! Collected {total_toots} toots from {len(servers_done)} servers with {len(servers_fail)} failures."
    )

    # Return summary for API
    return {
        "servers_done": list(servers_done),
        "servers_fail": list(servers_fail),
        "total_toots": total_toots,
        "duration_seconds": fetch_duration.total_seconds(),
        "output_file": dir_path
    }
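
A hedged sketch of a typical call. The option names mirror the lists above; api_base_url and cred_file are placeholders for a real account, a real run normally builds this config via the config module rather than by hand, and get_fetch_window() may read additional event options (such as an event time) not shown here.

from configparser import ConfigParser
from mastoscore.fetch import fetch

config = ConfigParser()
config["mastoscore"] = {
    "api_base_url": "https://example.social",
    "cred_file": "example.secret",  # placeholder credential file
    "hashtag": "monsterdon",
    "journaldir": "data",
    "journalfile": "monsterdon",
    "timezone": "UTC",
    "max": "2000",
    "debug": "INFO",
    "event_year": "2025",
    "event_month": "10",
    "event_day": "19",
}
config["fetch"] = {"overwrite": "false", "parallel_workers": "5"}

summary = fetch(config)
if summary:
    print(f"{summary['total_toots']} toots from {len(summary['servers_done'])} servers")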

fetch_hashtag_remote(config, server, progress=None, cancel_token=None)

Given the base URL of a server (as extracted from a toot's uri), create a Tooter for it, connect, and fetch the statuses matching the hashtag.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| server | str | The api_base_url of a server to fetch from | required |
| progress | FetchProgress | Optional progress tracker for reporting incremental progress | None |
| cancel_token | CancelToken | Optional token that can tell us we need to cancel | None |

Config Parameters Used

| Option | Description |
| ------- | ------- |
| mastoscore:max | Max number of toots to pull from a server (default: 2000) |
| mastoscore:hashtag | Hashtag to search for |

Returns:

| Type | Description |
| ------- | ------- |
| list or None | List of statuses in the raw JSON format from the API. Fields are not normalised or converted in any way; since not all ActivityPub servers are exactly the same, it's not even certain which fields you'll get. |

Source code in mastoscore/fetch.py
def fetch_hashtag_remote(config: ConfigParser,
    server: str,
    progress: FetchProgress | None = None,
    cancel_token: CancelToken | None = None) -> list | None:
    """
    Given the base URL of a server (as extracted from a toot's uri), create a
    Tooter for it, connect, and fetch the statuses matching the hashtag.

    Args:
      config: A ConfigParser object from the [config](module-config.md) module
      server: The api_base_url of a server to fetch from
      progress: Optional progress tracker for reporting incremental progress
      cancel_token: Optional token that can tell us we need to cancel

    ## Config Parameters Used
    | Option | Description |
    | ------- | ------- |
    | mastoscore:max | Max number of toots to pull from a server (default: 2000)
    | mastoscore:hashtag | Hashtag to search for

    Returns:
      List of statuses in the raw JSON format from the API. Fields are not
      normalised or converted in any way. Since not all ActivityPub servers
      are exactly the same, it's not even certain which fields you'll get.
    """

    maxtoots = config.getint("mastoscore", "max")
    hashtag = config.get("mastoscore", "hashtag")
    oldest_date, newest_date = get_fetch_window(config)

    # Make the tooter that will do the searching.
    try:
        t = Tooter(config, "fetch", server)
        logger.debug(f"Tooter created for {server}")
    except Exception as e:
        logger.error(f"Failed to create Tooter for {server}")
        logger.error(e)
        return None

    try:
        # Collect toots from paginated generator
        all_toots = []
        for page in t.search_hashtag(hashtag, oldest_date, maxtoots):
            # Check for cancellation between pages
            if cancel_token and cancel_token.is_cancelled():
                logger.debug(f"Fetch cancelled for {server}")
                return None

            all_toots.extend(page)
            # Update progress with current toot count
            if progress:
                progress.update_server_progress(server, len(all_toots))

        return all_toots if all_toots else None
    except Exception:
        logger.error(f"fetch_hashtag_remote: failure fetching {hashtag} from {server}.")
        return None

fetch_single_server(config, uri, dir_path, journalfile, overwrite, progress=None, cancel_token=None)

Fetch toots from a single server (worker function for parallel execution). Assumes the journal directory already exists.

Returns:

| Type | Description |
| ------- | ------- |
| tuple | (uri, toots_list, new_server_uris), or (uri, None, None) on failure |

Source code in mastoscore/fetch.py
def fetch_single_server(config: ConfigParser, uri: str, dir_path: str,
    journalfile: str, overwrite: bool, progress: FetchProgress | None = None,
    cancel_token: CancelToken | None = None) -> tuple:
    """
    Fetch toots from a single server (worker function for parallel execution). Assumes
    the journal directory already exists.

    Returns:
        tuple: (uri, toots_list, new_server_uris) or (uri, None, None) on failure
    """
    logger = logging.getLogger(__name__)
    server = uri.split("/")[2]
    jfilename = join(dir_path, f"{journalfile}-{server}.json")

    # Check if file exists and we shouldn't overwrite
    if not overwrite:
        if exists(jfilename) and (stat(jfilename).st_size > 0):
            logger.warning(f"{jfilename} exists, skipping")
            return (uri, None, None)

    # Mark server as started
    if progress:
        progress.start_server(uri)

    # Fetch from remote server with progress updates
    newtoots = fetch_hashtag_remote(config, uri, progress, cancel_token)
    if newtoots is None:
        logger.debug(f"Got no toots back from {uri}")
        if progress:
            progress.complete_server(uri, 0, failed=True)
        return (uri, None, None)

    # Convert to dataframe and write
    try:
        df = toots2df(newtoots, uri)
        if not write_journal(config, df, server):
            if progress:
                progress.complete_server(uri, len(newtoots), failed=True)
            return (uri, None, None)
    except Exception as e:
        logger.error(f"Failed to convert {len(newtoots)} toots from {uri}: {e}")
        if progress:
            progress.complete_server(uri, 0, failed=True)
        return (uri, None, None)

    # Mark server as complete
    if progress:
        progress.complete_server(uri, len(newtoots), failed=False)

    # Extract new server URIs
    newuris = ["/".join(s["uri"].split("/")[0:3]) for s in newtoots]

    return (uri, newtoots, newuris)

read_json(config, filename)

Given a config and a filename (a fragment, like 'analysis'), figure out the path to the right JSON file. Read the file, if it exists, and return its contents in a dict.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| filename | str | The JSON file name to open | required |

Returns:

| Type | Description |
| ------- | ------- |
| dict | A dictionary of the contents of the file, if successful; an empty dict if there are problems. |

Source code in mastoscore/fetch.py
def read_json(config: ConfigParser, filename: str) -> dict:
    """
    Given a config and a filename (a fragment, like 'analysis'), figure out the path
    to the right JSON file. Read the file, if it exists, and return its contents
    in a dict.

    Args:
        config: A ConfigParser object from the [config](module-config.md) module
        filename: The JSON file name to open

    Returns:
        A dictionary of the contents of the file, if it is successful. Return empty
        dict if there are problems.

    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get("mastoscore", "journalfile")
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return {}

    jfilename = join(dir_path, base_name)
    try:
        with open(jfilename, "r") as jfile:
            analysis = load(jfile)
        logger.debug(
            f"Opened {jfilename} and read {len(analysis.keys())} keys from it."
        )
    except Exception:
        logger.warning(f"Failed to read {filename} analysis in {jfilename}")
        return {}  # honor the documented contract: empty dict on any problem

    return analysis

reset_fetch(config)

Delete all fetch data files for a given configuration.

Removes:

  • All per-server journal files ({hashtag}-{server}.json)
  • Result files (data-{journalfile}-*.json), including the fetch results file (data-{journalfile}-fetch.json)

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |

Config Parameters Used

  • mastoscore:hashtag: Hashtag to search for
  • mastoscore:journalfile: slug for journal files

Returns:

dict with:

  • files_deleted: list of dicts with 'path' and 'size' for each file
  • count: number of files deleted
  • total_size: total bytes deleted

Source code in mastoscore/fetch.py
def reset_fetch(config: ConfigParser) -> dict:
    """
    Delete all fetch data files for a given configuration.

    Removes:
    - All per-server journal files ({hashtag}-{server}.json)
    - Result files (data-{journalfile}-*.json), including the fetch results file

    Args:
      config: A ConfigParser object from the config module

    ## Config Parameters Used
    - mastoscore:hashtag: Hashtag to search for
    - mastoscore:journalfile: slug for journal files

    Returns:
    dict with:
    - files_deleted: list of dicts with 'path' and 'size' for each file
    - count: number of files deleted
    - total_size: total bytes deleted
    """
    logger = logging.getLogger(__name__)

    journalfile = config.get("mastoscore", "journalfile")
    hashtag = config.get("mastoscore", "hashtag")

    dir_path = create_journal_directory(config)

    if dir_path is None or not exists(dir_path):
        return {"files_deleted": [], "count": 0, "total_size": 0}

    deleted_files = []
    total_size = 0

    # Pattern 1: result files (data-{journalfile}-fetch.json, data-{journalfile}-analysis.json, ...)
    data_files = join(dir_path, f"data-{journalfile}-*.json")

    # Pattern 2: per-server journal files ({hashtag}-{server}.json); journalfile is typically the hashtag
    journal_files = join(dir_path, f"{hashtag}-*.json")

    # Find and delete all matching files
    for pattern in [data_files, journal_files]:
        for filepath in glob(pattern):
            try:
                # Get file size before deleting
                file_size = stat(filepath).st_size
                remove(filepath)
                deleted_files.append({
                    'path': filepath,
                    'size': file_size
                })
                total_size += file_size
                logger.info(f"Deleted: {filepath} ({file_size} bytes)")
            except Exception as e:
                logger.error(f"Failed to delete {filepath}: {e}")

    return {
        "files_deleted": deleted_files,
        "count": len(deleted_files),
        "total_size": total_size
    }
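
Usage is straightforward; the returned summary makes it easy to report what was removed. A sketch, where config is the same ConfigParser used for the original fetch run:

from mastoscore.fetch import reset_fetch

summary = reset_fetch(config)
print(f"deleted {summary['count']} files ({summary['total_size']} bytes)")
for entry in summary["files_deleted"]:
    print(f"  {entry['path']}: {entry['size']} bytes")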

toots2df(toots, api_base_url)

Take in a list of toots from a Tooter object and turn it into a pandas DataFrame with the data normalised.

Args:

  • toots: list. A list of toots in the same format as returned by the search_hashtag() API
  • api_base_url: string. Expected to include protocol, like https://server.example.com.

Returns:

A Pandas DataFrame that contains all the toots, normalised. Normalisation includes:

  • Converting date fields like created_at to timezone-aware datetime objects
  • Converting integer fields like reblogs_count to integers
  • Adding some columns (see below)
  • Discarding all but a few columns. So many different systems return different columns, and I'm only using a few of them, so I just discard everything else. This cuts down on storage and processing time.

Synthetic columns added:

  • server: The server the toot originated on, taken from each toot's uri: other.example.social for a toot at https://other.example.social/...
  • userid: The user's name in person@server.example.com format. Note it does not have the leading @ because tagging people is optional.
  • local: Boolean that is True if the toot comes from the api_base_url server. False otherwise.
  • source: The server the toot was fetched from: server.example.com if the api_base_url is https://server.example.com. I might be talking to server.example.com, but they've sent me a copy of a toot that lives on other.example.social.

Source code in mastoscore/fetch.py
def toots2df(toots: list, api_base_url: str) -> pd.DataFrame:
    """
    Take in a list of toots from a Tooter object and turn it into a
    pandas DataFrame with the data normalised.

    Args:
    - toots: list. A list of toots in the same format as returned by the search_hashtag() API
    - api_base_url: string. Expected to include protocol, like `https://server.example.com`.

    Returns:
    A Pandas DataFrame that contains all the toots normalised. Normalisation includes:
    - Converting date fields like `created_at` to timezone-aware `datetime` objects
    - Converting integer fields like `reblogs_count` to integers
    - Adding some columns (see below)
    - Discarding all but a few columns. So many different systems return different columns, and I'm only
      using a few of them. So I just discard everything else. This cuts down on storage and processing time.

    ## Synthetic columns added
    - server: The server the toot originated on, taken from each toot's `uri`: `other.example.social` for a toot at `https://other.example.social/...`
    - userid: The user's name in `person@server.example.com` format. Note it does not have the leading `@` because tagging people is optional.
    - local: Boolean that is **True** if the toot comes from the `api_base_url` server. **False** otherwise.
    - source: The server the toot was fetched from: `server.example.com` if the `api_base_url` is `https://server.example.com`. I might be talking to `server.example.com`, but they've sent me a copy of a toot that lives on `other.example.social`.
    """

    df = pd.json_normalize(toots)
    df["source"] = api_base_url.split("/")[2]
    df["local"] = df["uri"].str.startswith(api_base_url)
    # make a new "server" column off of uris
    df["server"] = [n.split("/")[2] for n in df["uri"]]
    df["userid"] = df["account.username"] + "@" + df["server"]
    df["reblogs_count"] = df["reblogs_count"].astype(int)
    df["replies_count"] = df["replies_count"].astype(int)
    df["favourites_count"] = df["favourites_count"].astype(int)
    df["created_at"] = pd.to_datetime(df["created_at"], utc=True, format="ISO8601")

    # Define the columns to keep, all others will be deleted
    desired_columns = {
        "account.display_name",
        "account.indexable",
        "account.url",
        "content",
        "created_at",
        "external_replies_count",
        "favourites_count",
        "id",
        "in_reply_to_id",
        "local",
        "max_boosts",
        "max_faves",
        "max_replies",
        "most_toots",
        "num_toots",
        "preamble",
        "reblogs_count",
        "replies_count",
        "required",
        "self_reply_count",
        "server",
        "source",
        "uri",
        "url",
        "userid",
    }
    # Get the intersection of desired columns and actual columns
    columns_to_keep = list(desired_columns.intersection(df.columns))

    # Create new data frame with only desired columns, implicitly discarding all others
    small_df = df[columns_to_keep]

    return small_df
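
A small demonstration of the synthetic columns, with two hand-built toots: one local to the queried server and one it relayed from elsewhere. Only the fields this function actually touches are included; the server names are placeholders.

from mastoscore.fetch import toots2df

toots = [
    {"uri": "https://example.social/users/alice/statuses/1",
     "account": {"username": "alice"},
     "content": "hello #monsterdon",
     "created_at": "2025-10-19T01:00:00Z",
     "reblogs_count": 2, "replies_count": 0, "favourites_count": 5},
    {"uri": "https://other.example/objects/abc",
     "account": {"username": "bob"},
     "content": "boo #monsterdon",
     "created_at": "2025-10-19T01:05:00Z",
     "reblogs_count": 0, "replies_count": 1, "favourites_count": 1},
]
df = toots2df(toots, "https://example.social")
print(df[["userid", "server", "local", "source"]].to_string(index=False))
# userid: alice@example.social / bob@other.example
# server: example.social / other.example  (from each toot's uri)
# local:  True / False
# source: example.social for both (the server we fetched from)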

update_json(config, filename, results)

Given a filename and a dictionary, read the JSON file, merge the provided dictionary into it, then save the final result to the same location.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| filename | str | The JSON file name to write | required |
| results | dict | A JSON dictionary to write into that file | required |

Returns:

| Type | Description |
| ------- | ------- |
| bool | True if it is successful, False otherwise |

Source code in mastoscore/fetch.py
def update_json(config: ConfigParser, filename: str, results: dict) -> bool:
    """
    Given a filename and a dictionary, read the JSON file, merge the
    provided dictionary into it, then save the final result to the same
    location.

    Args:
        config: A ConfigParser object from the [config](module-config.md) module
        filename: The JSON file name to write
        results: A JSON dictionary to write into that file

    Returns:
        True if it is successful, False otherwise
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get("mastoscore", "journalfile")
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    jfilename = join(dir_path, base_name)
    try:
        with open(jfilename, "r") as jfile:
            analysis = load(jfile)
        logger.debug(
            f"Opened {jfilename} and read {len(analysis.keys())} keys from it."
        )
    except Exception as e:
        logger.critical(f"Failed to read {filename} analysis in {jfilename}")
        logger.critical(e)
        return False

    # append keys from results into analysis. Will overwrite if any are
    # the same.
    analysis.update(results)

    # Write the updated data back to the file
    try:
        with open(jfilename, "w") as jfile:
            dump(analysis, jfile, cls=TimestampEncoder)
        logger.debug(f"Now {len(analysis.keys())} keys.")
        logger.info(f"wrote {base_name}")
        return True
    except Exception as e:
        logger.critical(f"Failed to write {filename} analysis to {jfilename}")
        logger.critical(e)
        return False

write_journal(config, df, server)

Takes a dataframe and the server it represents, and calls pandas.DataFrame.to_json() to write it to a corresponding JSON journal file in a hierarchical directory structure: journaldir/year/month/day/journalfile-server.json.

Config Parameters Used

| Option | Description |
| ------- | ------- |
| mastoscore:journalfile | Journal file template |

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| df | DataFrame | A Pandas DataFrame full of toots to write out. | required |
| server | str | The server's hostname, used in the journal file name | required |

Returns:

| Type | Description |
| ------- | ------- |
| bool | True if successful, False otherwise |

Source code in mastoscore/fetch.py
def write_journal(config: ConfigParser, df: pd.DataFrame, server: str) -> bool:
    """
    Takes a dataframe and the server it represents, and calls
    [pandas.DataFrame.to_json()](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html)
    to write it to a corresponding JSON journal file in a hierarchical
    directory structure: `journaldir/year/month/day/journalfile-server.json`.

    ## Config Parameters Used
    | Option | Description |
    | ------- | ------- |
    | mastoscore:journalfile | Journal file template

    Args:
      config: A ConfigParser object from the [config](module-config.md) module
      df: A Pandas DataFrame full of toots to write out.
      server: The server's hostname, used in the journal file name

    Returns:
      True if successful, False otherwise
    """
    logger = logging.getLogger(__name__)

    journalfile = config.get("mastoscore", "journalfile")

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    jfilename = join(dir_path, f"{journalfile}-{server}.json")

    try:
        df.to_json(jfilename, orient="records", date_format="iso", date_unit="s")
        logger.info(f"Wrote {len(df)} total toots to {jfilename}")
    except Exception as e:
        logger.critical(f"Failed to write {len(df)} toots to {jfilename}")
        logger.critical(e)
        return False
    return True

write_json(config, filename, results)

Given a config and a filename (which is a fragment, like 'analysis'), write the dictionary into a JSON file. This will overwrite any existing JSON file of the same name. Use update_json() to update without clobbering the original.

Parameters:

| Name | Type | Description | Default |
| ------- | ------- | ------- | ------- |
| config | ConfigParser | A ConfigParser object from the config module | required |
| filename | str | The JSON file name to write | required |
| results | dict | A JSON dictionary to write into that file | required |

Returns:

| Type | Description |
| ------- | ------- |
| bool | True if it is successful, False otherwise |

Source code in mastoscore/fetch.py
def write_json(config: ConfigParser, filename: str, results: dict) -> bool:
    """
    Given a config and a filename (which is a fragment, like 'analysis'), write
    the dictionary into a JSON file. This will overwrite any existing JSON file
    of the same name. Use update_json() to update without clobbering the original.

    Args:
        config: A ConfigParser object from the [config](module-config.md) module
        filename: The JSON file name to write
        results: A JSON dictionary to write into that file

    Returns:
        True if it is successful, False otherwise
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get("mastoscore", "journalfile")
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    rfilename = join(dir_path, base_name)
    try:
        with open(rfilename, "w") as rfile:
            dump(results, rfile, cls=TimestampEncoder)
    except Exception as e:
        logger.critical(f"Failed to write {filename} results to {rfilename}")
        logger.critical(e)
        return False

    logger.info(f"wrote {base_name}")
    return True
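
The three JSON helpers compose into a simple write / merge / read cycle. A sketch, with config built as in the fetch example above:

from mastoscore.fetch import read_json, update_json, write_json

write_json(config, "analysis", {"total_toots": 1024})   # create or clobber
update_json(config, "analysis", {"unique_users": 87})   # merge a new key in
print(read_json(config, "analysis"))
# {'total_toots': 1024, 'unique_users': 87}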