From df27414d5d7d3d1e75d2ee4e7172d4e68a416316 Mon Sep 17 00:00:00 2001
From: Jonathan Adamczewski
Date: Thu, 17 Nov 2022 00:15:42 -0800
Subject: [PATCH] Support for threads and related changes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Omit the horizontal rule between tweets where one is a reply to the
  previous tweet
- Add an anchor to each tweet
- Remove the "Replying to" header - instead, link @names to the tweet
  (on Twitter) being replied to
- When outputting replies to self that don't immediately follow the
  tweet being replied to, print an up arrow ↑ that links to the anchor
  of the previous tweet in the thread
- When outputting a tweet that has replies that don't immediately
  follow it, print down arrows ↓ that link to the anchor of each reply

(I'm not sure the method of linking is actually useful - ids and
filenames are valid, but may need to be expressed differently to work
as intended in practice)
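As an illustration of the intended output (hypothetical ids 111/222 and
username "user"), a self-reply posted a month after the original should
render roughly like this. In the November file:

    <a name="s111"></a>
    > First tweet in the thread.

     [Thu Nov 17 08:00:00 +0000 2022](https://twitter.com/user/status/111)

     [↓](2022-12-01-Tweet-Archive-2022-12.md#s222)

    ----

and in the December file:

    [↑](2022-11-01-Tweet-Archive-2022-11.md#s111)
    <a name="s222"></a>
    > The reply, one month later.

     [Fri Dec 16 09:00:00 +0000 2022](https://twitter.com/user/status/222)

Note that a tweet's down arrows are only flushed when a later tweet is
written to the same file.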
---
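The cross-referencing in main() runs two passes over grouped_tweets
before any file is written: pass one records which file each tweet id
lands in, pass two collects replies whose target exists in the archive.
A condensed, standalone sketch of those passes (the two ExtractedTweet
values are made up):

    from collections import defaultdict, namedtuple

    ExtractedTweet = namedtuple('ExtractedTweet', 'id reply_to_self reply_to_id body')

    # hypothetical thread split across two monthly files
    grouped_tweets = {
        '2022-11-01-Tweet-Archive-2022-11.md': [ExtractedTweet('111', False, '', '> first')],
        '2022-12-01-Tweet-Archive-2022-12.md': [ExtractedTweet('222', True, '111', '> reply')],
    }

    # pass 1: map each tweet id to the file it will be written to
    tweet_files = {t.id: filename
                   for filename, tweets in grouped_tweets.items()
                   for t in tweets}

    # pass 2: for each tweet, list the replies to it that are in the archive
    forward_refs = defaultdict(list)
    for tweets in grouped_tweets.values():
        for t in tweets:
            if t.reply_to_id and t.reply_to_id in tweet_files:
                forward_refs[t.reply_to_id].append(t.id)

    assert tweet_files['222'] == '2022-12-01-Tweet-Archive-2022-12.md'
    assert forward_refs['111'] == ['222']

Pass one has to finish before pass two so that replies living in later
files still resolve, and both run before the write loop so the ↑/↓
links can point across files.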
 parser.py | 74 ++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 59 insertions(+), 15 deletions(-)

diff --git a/parser.py b/parser.py
index cc34439..3d76a1a 100755
--- a/parser.py
+++ b/parser.py
@@ -24,6 +24,7 @@
 import os
 import re
 import shutil
+from collections import namedtuple
 
 def read_json_from_js_file(filename):
     """Reads the contents of a Twitter-produced .js file into a dictionary."""
@@ -42,35 +43,38 @@ def extract_username(account_js_filename):
     account = read_json_from_js_file(account_js_filename)
     return account[0]['account']['username']
 
+ExtractedTweet = namedtuple('ExtractedTweet', 'id reply_to_self reply_to_id body')
+
 def tweet_json_to_markdown(tweet, username, archive_media_folder, output_media_folder_name, tweet_icon_path):
     """Converts a JSON-format tweet into markdown. Returns tuple of timestamp and markdown."""
+
     tweet = tweet['tweet']
     timestamp_str = tweet['created_at']
     timestamp = int(round(datetime.datetime.strptime(timestamp_str, '%a %b %d %X %z %Y').timestamp())) # Example: Tue Mar 19 14:05:17 +0000 2019
     body = tweet['full_text']
     tweet_id_str = tweet['id_str']
+
     # replace t.co URLs with their original versions
     if 'entities' in tweet and 'urls' in tweet['entities']:
         for url in tweet['entities']['urls']:
             if 'url' in url and 'expanded_url' in url:
                 body = body.replace(url['url'], url['expanded_url'])
+
     # if the tweet is a reply, construct a header that links the names of the accounts being replied to the tweet being replied to
-    header = ''
+    in_reply_to_status_id = ''
+    reply_to_self = False
     if 'in_reply_to_status_id' in tweet:
+        in_reply_to_status_id = tweet['in_reply_to_status_id']
         # match and remove all occurences of '@username ' at the start of the body
         replying_to = re.match(r'^(@[0-9A-Za-z_]* )*', body)[0]
         if replying_to:
-            body = body[len(replying_to):]
+            # some old tweets lack 'in_reply_to_screen_name': use it if present, otherwise fall back to the first @name in the body
+            in_reply_to_screen_name = tweet['in_reply_to_screen_name'] if 'in_reply_to_screen_name' in tweet else replying_to.split()[0]
+            body = f'[{replying_to[:-1]}](https://twitter.com/{in_reply_to_screen_name}/status/{in_reply_to_status_id}) ' + body[len(replying_to):]
         else:
-            # no '@username ' in the body: we're replying to self
-            replying_to = f'@{username}'
-        names = replying_to.split()
-        # some old tweets lack 'in_reply_to_screen_name': use it if present, otherwise fall back to names[0]
-        in_reply_to_screen_name = tweet['in_reply_to_screen_name'] if 'in_reply_to_screen_name' in tweet else names[0]
-        # create a list of names of the form '@name1, @name2 and @name3' - or just '@name1' if there is only one name
-        name_list = ', '.join(names[:-1]) + (f' and {names[-1]}' if len(names) > 1 else names[0])
-        in_reply_to_status_id = tweet['in_reply_to_status_id']
-        header += f'Replying to [{name_list}](https://twitter.com/{in_reply_to_screen_name}/status/{in_reply_to_status_id})\n\n'
+            # decide later if a header should be written
+            reply_to_self = True
+
     # replace image URLs with markdown image links to local files
     if 'entities' in tweet and 'media' in tweet['entities'] and 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
         original_url = tweet['entities']['media'][0]['url']
@@ -100,11 +104,18 @@ def tweet_json_to_markdown(tweet, username, archive_media_f
             print(f'Warning: missing local file: {local_filename}. Using original link instead: {original_url} (expands to {original_expanded_url})')
             markdown += f'![]({original_url})'
         body = body.replace(original_url, markdown)
+
     # make the body a quote
     body = '> ' + '\n> '.join(body.splitlines())
+
+    # give every tweet an anchor
+    body = f'<a name="s{tweet_id_str}"></a>\n' + body
+
     # append the original Twitter URL as a link
-    body = header + body + f'\n\n [{timestamp_str}](https://twitter.com/{username}/status/{tweet_id_str})'
-    return timestamp, body
+    body += f'\n\n [{timestamp_str}](https://twitter.com/{username}/status/{tweet_id_str})'
+
+    return timestamp, ExtractedTweet(id=tweet_id_str, reply_to_self=reply_to_self, reply_to_id=in_reply_to_status_id, body=body)
+
 
 
 def main():
@@ -141,6 +152,8 @@ def main():
     if not os.path.isfile(tweet_icon_path):
         shutil.copy('assets/images/favicon.ico', tweet_icon_path);
 
+    connected_tweets = {}
+
     # Parse the tweets
     username = extract_username(account_js_filename)
     tweets_markdown = []
@@ -148,7 +161,10 @@ def main():
         print(f'Parsing {tweets_js_filename}...')
         json = read_json_from_js_file(tweets_js_filename)
         tweets_markdown += [tweet_json_to_markdown(tweet, username, archive_media_folder, output_media_folder_name, tweet_icon_path) for tweet in json]
-    print(f'Parsed {len(tweets_markdown)} tweets and replies by {username}.')
+
+        for tweet in json:
+            if 'in_reply_to_status_id' in tweet['tweet']:
+                connected_tweets[tweet['tweet']['id']] = tweet['tweet']
 
     # Sort tweets with oldest first
     tweets_markdown.sort(key=lambda tup: tup[0])
@@ -162,9 +178,37 @@ def main():
         filename = f'{dt.year}-{dt.month:02}-01-Tweet-Archive-{dt.year}-{dt.month:02}.md' # change to group by day or year or timestamp
         grouped_tweets[filename].append(md)
 
+    tweet_files = {}
+    for filename, md in grouped_tweets.items():
+        for t in md:
+            tweet_files[t.id] = filename
+
+    forward_refs = defaultdict(list)
+    for filename, md in grouped_tweets.items():
+        for t in md:
+            if t.reply_to_id and t.reply_to_id in tweet_files:
+                forward_refs[t.reply_to_id].append(t.id)
+
     # Write into files
     for filename, md in grouped_tweets.items():
-        md_string = '\n\n----\n\n'.join(md)
+        md_string = ''
+        prev_id = ''
+        for t in md:
+            # link forward to replies to the previous tweet that don't immediately follow it
+            first_forward = True
+            for s in forward_refs[prev_id]:
+                if s != t.id:
+                    md_string += '\n\n' if first_forward else ''
+                    first_forward = False
+                    md_string += f' [↓]({tweet_files[s]}#s{s})'
+            if not t.reply_to_id or t.reply_to_id != prev_id:
+                # not a reply to the last output tweet: write a horizontal rule
+                md_string += '\n\n----\n' if md_string else ''
+                if t.reply_to_self:
+                    # a reply to self, but not to the most recently output tweet: link up to the tweet being replied to
+                    md_string += f'\n[↑]({tweet_files[t.reply_to_id]}#s{t.reply_to_id})' if t.reply_to_id in tweet_files else '↑'
+            md_string += f'\n{t.body}'
+            prev_id = t.id
         with open(filename, 'w', encoding='utf-8') as f:
             f.write(md_string)
     print(f'Wrote tweets to *.md, with images and video embedded from {output_media_folder_name}')