From 2b30d2b80d026ddf71d111e4487ec7ec72f19a36 Mon Sep 17 00:00:00 2001 From: Matthias Date: Fri, 25 Oct 2024 23:19:29 +0200 Subject: [PATCH] correct text-array to download script, ask for usernames if empty, improve argparse, linting to black Signed-off-by: Matthias --- batch_get_username.sh | 7 +- find_france_photos_and_move.py | 124 +++++++++++++------ find_user_id.sh | 4 +- get_sequences_of_username.py | 54 ++++++--- images_par_username.py | 49 +++++--- mapillary_download.py | 201 +++++++++++++++++++++---------- text_array_to_download_script.py | 53 ++++---- writer.py | 96 ++++++++++----- 8 files changed, 399 insertions(+), 189 deletions(-) mode change 100644 => 100755 batch_get_username.sh diff --git a/batch_get_username.sh b/batch_get_username.sh old mode 100644 new mode 100755 index da8cab1..248e564 --- a/batch_get_username.sh +++ b/batch_get_username.sh @@ -5,7 +5,12 @@ # Liste des usernames # example: # usernames=( "riri" "fifi" "loulou") -usernames=( "someone_having_nice_pictures" "someone_else" "oh_look_a_these_usernames" ) +usernames=( ) + +if test -z $usernames; then + read -p "Please enter a mapillary username: " ANS + usernames=$ANS +fi # check env variables are valid if [ -f "secrets_variables.sh" ]; then diff --git a/find_france_photos_and_move.py b/find_france_photos_and_move.py index 2a81845..008915f 100644 --- a/find_france_photos_and_move.py +++ b/find_france_photos_and_move.py @@ -12,9 +12,19 @@ import shutil import exifread # Définition du rectangle entourant la France métropolitaine et un peu autour -france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max) +france_bbox: tuple[float, float, float, float] = ( + 42.0, + -5.0, + 51.0, + 10.0, +) # (lat_min, lon_min, lat_max, lon_max) # Définition du rectangle entourant la France métropolitaine et un peu autour -france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max) +france_bbox: tuple[float, float, float, float] = ( + 42.0, + -5.0, + 51.0, + 10.0, +) # (lat_min, lon_min, lat_max, lon_max) # Définition du rectangle entourant la Guadeloupe guadeloupe_bbox: tuple[float, float, float, float] = (15.8, -61.8, 17.3, -59.3) @@ -50,11 +60,11 @@ pf_bbox: tuple[float, float, float, float] = (-27.5, -140.0, -7.5, -134.0) taaf_bbox: tuple[float, float, float, float] = (-49.5, 68.5, -37.5, 77.5) # Chemin du répertoire source -source_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/' +source_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/" # Chemin du répertoire destination -destination_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/' -sequence_folder: str = 'principale_sequence' +destination_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/" +sequence_folder: str = "principale_sequence" count_files_all: int = 0 count_files_moved: int = 0 # Crée le répertoire destination si il n'existe pas @@ -71,18 +81,19 @@ def move_file_if_in_france(filepath, sequence_folder): latitude, longitude = get_gps_info(filepath) if latitude and longitude: - print(f'Latitude: {latitude}, Longitude: {longitude}') + print(f"Latitude: {latitude}, Longitude: {longitude}") if are_lat_lon_in_france(latitude, longitude): move_file_in_destination(filepath, sequence_folder) else: - print('Informations GPS non trouvées') + print("Informations GPS non trouvées") def move_file_in_destination(filepath, sequence_folder): global count_files_moved # Déplace le fichier dans le sous-répertoire "photos_in_france" - dest_subdir = os.path.join(destination_dir, sequence_folder, - os.path.basename(os.path.dirname(filepath))) + dest_subdir = os.path.join( + destination_dir, sequence_folder, os.path.basename(os.path.dirname(filepath)) + ) if not os.path.exists(dest_subdir): os.makedirs(dest_subdir) shutil.move(filepath, os.path.join(dest_subdir, filepath)) @@ -90,6 +101,7 @@ def move_file_in_destination(filepath, sequence_folder): print(f"Moved {filepath} to {dest_subdir}") return True + def are_lat_lon_in_france(gps_lat, gps_lon): """ recherche d'une zone du territoire français @@ -117,37 +129,63 @@ def are_lat_lon_in_france(gps_lat, gps_lon): print("lat lon :", gps_lat, gps_lon) - if (france_bbox[0] <= gps_lat <= france_bbox[2] and france_bbox[1] <= gps_lon <= france_bbox[3]): + if ( + france_bbox[0] <= gps_lat <= france_bbox[2] + and france_bbox[1] <= gps_lon <= france_bbox[3] + ): return "France métropolitaine" - elif (taaf_bbox[0] <= gps_lat <= taaf_bbox[2] and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]): + elif ( + taaf_bbox[0] <= gps_lat <= taaf_bbox[2] + and taaf_bbox[1] <= gps_lon <= taaf_bbox[3] + ): return "Terres australes et antarctiques françaises" - elif (guyane_bbox[0] <= gps_lat <= guyane_bbox[2] and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]): + elif ( + guyane_bbox[0] <= gps_lat <= guyane_bbox[2] + and guyane_bbox[1] <= gps_lon <= guyane_bbox[3] + ): return "Guyane française" - elif (reunion_bbox[0] <= gps_lat <= reunion_bbox[2] and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]): + elif ( + reunion_bbox[0] <= gps_lat <= reunion_bbox[2] + and reunion_bbox[1] <= gps_lon <= reunion_bbox[3] + ): return "La Réunion" - elif (wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]): + elif wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]: return "Wallis-et-Futuna" - elif (stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2] and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]): + elif ( + stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2] + and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3] + ): return "Saint-Martin et Saint-Barthélemy" - elif (spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]): + elif ( + spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3] + ): return "Saint-Pierre-et-Miquelon" - elif (mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2] and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]): + elif ( + mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2] + and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3] + ): return "Mayotte" - elif (martinique_bbox[0] <= gps_lat <= martinique_bbox[2] and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]): + elif ( + martinique_bbox[0] <= gps_lat <= martinique_bbox[2] + and martinique_bbox[1] <= gps_lon <= martinique_bbox[3] + ): return "Martinique" - elif (guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2] and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]): + elif ( + guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2] + and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3] + ): return "Guadeloupe" - elif (pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]): + elif pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]: return "Polynésie française" - elif (nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]): + elif nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]: return "Nouvelle-Calédonie" else: - return None # "Hors de France" + return None # "Hors de France" def get_gps_info(filepath): - with open(filepath, 'rb') as f: + with open(filepath, "rb") as f: tags = exifread.process_file(f) gps_info = {} @@ -155,12 +193,12 @@ def get_gps_info(filepath): # print("clés exif ", tags.keys()) for tag in tags.keys(): - if tag.startswith('GPS'): + if tag.startswith("GPS"): gps_info[tag] = tags[tag] # Extraction des informations de latitude et de longitude - gps_latitude = convert_rational_to_float(gps_info.get('GPS GPSLatitude')) - gps_longitude = convert_rational_to_float(gps_info.get('GPS GPSLongitude')) + gps_latitude = convert_rational_to_float(gps_info.get("GPS GPSLatitude")) + gps_longitude = convert_rational_to_float(gps_info.get("GPS GPSLongitude")) if gps_latitude and gps_longitude: return gps_latitude, gps_longitude @@ -172,22 +210,38 @@ def convert_rational_to_float(rational): return float(rational.values[0].num) / float(rational.values[0].den) - - - -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--source_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/', help='Chemin du répertoire source') - parser.add_argument('--destination_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/', help='Chemin du répertoire destination') - parser.add_argument('--sequence_folder', default='principale_sequence', help='Nom du dossier de séquence') + parser.add_argument( + "--source_dir", + default="/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/", + help="Chemin du répertoire source", + ) + parser.add_argument( + "--destination_dir", + default="/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/", + help="Chemin du répertoire destination", + ) + parser.add_argument( + "--sequence_folder", + default="principale_sequence", + help="Nom du dossier de séquence", + ) args = parser.parse_args() # Parcourt tous les fichiers dans le répertoire source et ses sous-répertoires for root, dirs, files in os.walk(args.source_dir): for filename in files: # Vérifie si le fichier est une image - if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tif')): + if filename.lower().endswith( + (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tif") + ): filepath = os.path.join(root, filename) move_file_if_in_france(filepath, sequence_folder) - print('fichiers se situant en france déplacés: ', count_files_moved, ' / ', count_files_all) \ No newline at end of file + print( + "fichiers se situant en france déplacés: ", + count_files_moved, + " / ", + count_files_all, + ) diff --git a/find_user_id.sh b/find_user_id.sh index e68dd05..d43bb57 100644 --- a/find_user_id.sh +++ b/find_user_id.sh @@ -4,12 +4,12 @@ echo "Prenez un token oauth sur https://www.mapillary.com/app/user/$1" USERNAME=$1 -response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers') +response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS) ID=$(echo "$response" | jq -r '.data.user_by_username.id') echo "ID: $ID" -curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' > "out_${1}.json" +curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json" echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'" diff --git a/get_sequences_of_username.py b/get_sequences_of_username.py index 3fe8f1c..4c3a8f2 100644 --- a/get_sequences_of_username.py +++ b/get_sequences_of_username.py @@ -1,30 +1,38 @@ import json import requests + # lit un json listant les id de photo de chaque séquence et va # chercher la séquence par API. import argparse -def parse_args(argv =None): + +def parse_args(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--username', type=str, help='Username to get the sequences id of', required=True) - parser.add_argument('--dev_token', type=str, help='Your mapillary developer token') - parser.add_argument('--max_sequence', type=str, help='Limit the amount of retrieved sequence ids') + parser.add_argument( + "--username", + type=str, + help="Username to get the sequences id of", + required=True, + ) + parser.add_argument("--dev_token", type=str, help="Your mapillary developer token") + parser.add_argument( + "--max_sequence", type=str, help="Limit the amount of retrieved sequence ids" + ) global args args = parser.parse_args(argv) print(args) - # Initialisation de la liste pour stocker les réponses responses = [] sequences = [] + def get_image_data_from_sequences(): username = args.username - input_file = "out_"+username+".json" - + input_file = "out_" + username + ".json" # Chargement du fichier JSON d'entrée with open(input_file, "r") as file: @@ -32,7 +40,7 @@ def get_image_data_from_sequences(): # Itération sur les noeuds pour collectionner les image_ids nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"] - print( 'séquences : ', len(nodelist)) + print("séquences : ", len(nodelist)) image_ids = [node["image_id"] for node in nodelist] print(image_ids) @@ -41,15 +49,21 @@ def get_image_data_from_sequences(): # Préparation de la tête d'autorisation pour toutes les futures requêtes header = {"Access-Token": dev_token} - ii=0 + ii = 0 limit_requests = 1000000000 -# limit_requests = 5 # pour tester + # limit_requests = 5 # pour tester # Boucle sur chaque image_id pour interroger l'API Mapillary for image_id in image_ids: - ii+=1 + ii += 1 if limit_requests >= ii and image_id: params = {"id": image_id, "fields": "id,sequence"} - request_url = "https://graph.mapillary.com/" + str(image_id)+"?access_token="+dev_token+"&fields=id,sequence" + request_url = ( + "https://graph.mapillary.com/" + + str(image_id) + + "?access_token=" + + dev_token + + "&fields=id,sequence" + ) # print("requete: "+request_url) response = requests.get(request_url) @@ -63,23 +77,31 @@ def get_image_data_from_sequences(): parsed_response["sequence"] = raw_response["sequence"] sequences.append(parsed_response["sequence"]) - print("séquence trouvée: "+str(ii)+"/"+args.max_sequence+" : "+raw_response["sequence"]) + print( + "séquence trouvée: " + + str(ii) + + "/" + + args.max_sequence + + " : " + + raw_response["sequence"] + ) else: print(response) responses.append(parsed_response) + def persist_files(): # Sauvegarde des nouveaux résultats dans le fichier output.json - output_file = "sequences_"+args.username+".json" + output_file = "sequences_" + args.username + ".json" with open(output_file, "w") as file: json.dump(responses, file) - sequence_filename = "sequences_"+args.username+".txt" + sequence_filename = "sequences_" + args.username + ".txt" with open(sequence_filename, "w") as file: json.dump(sequences, file) - print('fichier sauvegardé: '+sequence_filename) + print("fichier sauvegardé: " + sequence_filename) parse_args() diff --git a/images_par_username.py b/images_par_username.py index a181bcf..e2f6551 100644 --- a/images_par_username.py +++ b/images_par_username.py @@ -1,38 +1,51 @@ -import requests, json +import os, requests, json import argparse from urllib.parse import quote -def parse_args(argv =None): + +def parse_args(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--access_token', type=str, help='Your mapillary access token') - parser.add_argument('--username', type=str, help='Username to get the sequences id of') - parser.add_argument('--pictures', type=str, help='Limit of pictures to fetch') + parser.add_argument( + "--access_token", + type=str, + default=os.environ["MAPILLARY_DEV_TOKEN"], + help="Your mapillary access token", + ) + parser.add_argument( + "--username", + type=str, + required=True, + help="Username to get the sequences id of", + ) + parser.add_argument( + "--pictures", + type=str, + default=500, + help="Limit of pictures to fetch, max=5000", + ) global args args = parser.parse_args(argv) -if __name__ == '__main__': +if __name__ == "__main__": parse_args() - if args.access_token == None: - print('please provide the access_token') - exit() - mly_key = args.access_token creator_username = args.username - max_img= args.pictures + max_img = args.pictures - url = f'https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence' + url = f"https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence" + print(url) response = requests.get(url) if response.status_code == 200: - json = response.json() + json = response.json() - # tri des séquences uniques - sequences_ids = [obj['sequence'] for obj in json['data']] - unique_ids = list(set(sequences_ids)) - print(unique_ids) + # tri des séquences uniques + sequences_ids = [obj["sequence"] for obj in json["data"]] + unique_ids = list(set(sequences_ids)) + print(unique_ids) else: - print(response) + print(response) diff --git a/mapillary_download.py b/mapillary_download.py index be13deb..47badc0 100644 --- a/mapillary_download.py +++ b/mapillary_download.py @@ -14,35 +14,67 @@ session = requests.Session() retries_strategies = Retry( total=5, backoff_factor=1, - status_forcelist=[429,502, 503, 504], - ) -session.mount('https://', HTTPAdapter(max_retries=retries_strategies)) + status_forcelist=[429, 502, 503, 504], +) +session.mount("https://", HTTPAdapter(max_retries=retries_strategies)) -def parse_args(argv =None): + +def parse_args(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('access_token', type=str, help='Your mapillary access token') - parser.add_argument('--sequence_ids', type=str, nargs='*', help='The mapillary sequence id(s) to download') - parser.add_argument('--image_ids', type=int, nargs='*', help='The mapillary image id(s) to get their sequence id(s)') - parser.add_argument('--destination', type=str, default='data', help='Path destination for the images') - parser.add_argument('--image_limit', type=int, default=None, help='How many images you want to download') - parser.add_argument('--overwrite', default=False, action='store_true', help='overwrite existing images') + parser.add_argument("access_token", type=str, help="Your mapillary access token") + parser.add_argument( + "--sequence_ids", + type=str, + nargs="*", + help="The mapillary sequence id(s) to download", + ) + parser.add_argument( + "--image_ids", + type=int, + nargs="*", + help="The mapillary image id(s) to get their sequence id(s)", + ) + parser.add_argument( + "--destination", + type=str, + default="data", + help="Path destination for the images", + ) + parser.add_argument( + "--image_limit", + type=int, + default=None, + help="How many images you want to download", + ) + parser.add_argument( + "--overwrite", + default=False, + action="store_true", + help="overwrite existing images", + ) parser.add_argument("-v", "--version", action="version", version="release 1.6") args = parser.parse_args(argv) if args.sequence_ids is None and args.image_ids is None: parser.error("Please enter at least one sequence id or image id") return args -def download(url, filepath, metadata=None): - #print(asizeof.asizeof(image)/1024, "MB") + +def download(url, filepath, metadata=None): + # print(asizeof.asizeof(image)/1024, "MB") with open(str(filepath), "wb") as f: r = session.get(url, stream=True, timeout=6) - image = write_exif(r.content, metadata) + try: + image = write_exif(r.content, metadata) + except Exception as e: + print(f"FAILED to write exif data for {filepath}. Error: {e}") f.write(image) - print("{} downloaded".format(filepath)) + print("{} downloaded {}".format(filepath, r)) def get_single_image_data(image_id, mly_header): - req_url = 'https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence'.format(image_id) + req_url = "https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence".format( + image_id + ) r = session.get(req_url, headers=mly_header) data = r.json() print(data) @@ -50,54 +82,68 @@ def get_single_image_data(image_id, mly_header): def get_image_data_from_sequences(sequences_id, mly_header): - for i,sequence_id in enumerate(sequences_id): - url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id) + for i, sequence_id in enumerate(sequences_id): + url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id) r = requests.get(url, headers=header) data = r.json() - image_ids = data['data'] + image_ids = data["data"] total_image = len(image_ids) - print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id)) - print('getting images data') + print( + "{} images in sequence {} of {} - id : {}".format( + total_image, i + 1, len(sequences_id), sequence_id + ) + ) + print("getting images data") for x in range(0, total_image): - image_id = image_ids[x]['id'] + image_id = image_ids[x]["id"] image_data = get_single_image_data(image_id, mly_header) - image_data['sequence_id'] = sequence_id + image_data["sequence_id"] = sequence_id yield image_data def get_image_data_from_sequences__future(sequences_id, mly_header): - for i,sequence_id in enumerate(sequences_id): - url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id) + for i, sequence_id in enumerate(sequences_id): + url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id) r = requests.get(url, headers=header) data = r.json() - if data.get('data') == []: - print("Empty or wrong sequence {} of {} - id : {}".format(i+1, len(sequences_id), sequence_id)) + if data.get("data") == []: + print( + "Empty or wrong sequence {} of {} - id : {}".format( + i + 1, len(sequences_id), sequence_id + ) + ) continue - image_ids = data['data'] + image_ids = data["data"] total_image = len(image_ids) - print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id)) - print('getting images data') + print( + "{} images in sequence {} of {} - id : {}".format( + total_image, i + 1, len(sequences_id), sequence_id + ) + ) + print("getting images data") with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to_url = {} for x in range(0, total_image): - image_id = image_ids[x]['id'] - future_to_url[executor.submit(get_single_image_data, image_id, mly_header)] = image_id + image_id = image_ids[x]["id"] + future_to_url[ + executor.submit(get_single_image_data, image_id, mly_header) + ] = image_id for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] image_data = future.result() - image_data['sequence_id'] = sequence_id - #print(image_data) + image_data["sequence_id"] = sequence_id + # print(image_data) yield image_data def write_exif(picture, img_metadata): - ''' + """ Write exif metadata - ''' - #{'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca', + """ + # {'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca', # 'captured_at': 1603459736644, 'geometry': {'type': 'Point', 'coordinates': [2.5174596904057, 48.777089857534]}, 'id': '485924785946693'} - + with writer.Writer(picture) as image: image.add_artist(img_metadata) image.add_camera_make(img_metadata) @@ -113,61 +159,84 @@ def write_exif(picture, img_metadata): return updated_image -if __name__ == '__main__': - +if __name__ == "__main__": + args = parse_args() - sequence_ids= args.sequence_ids if args.sequence_ids is not None else [] + sequence_ids = args.sequence_ids if args.sequence_ids is not None else [] images_ids = args.image_ids access_token = args.access_token images_data = [] - header = {'Authorization' : 'OAuth {}'.format(access_token)} + header = {"Authorization": "OAuth {}".format(access_token)} if images_ids: for image_id in images_ids: image_data = get_single_image_data(image_id, header) - if 'error' in image_data: + if "error" in image_data: print("data : ", image_data) - print("something wrong happened ! Please check your image id and/or your connection") + print( + "something wrong happened ! Please check your image id and/or your connection" + ) sys.exit() else: - sequence_ids.append(image_data.get('sequence')) + sequence_ids.append(image_data.get("sequence")) - #for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)): - for i,image_data in enumerate(get_image_data_from_sequences__future(sequence_ids, header)): + # for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)): + for i, image_data in enumerate( + get_image_data_from_sequences__future(sequence_ids, header) + ): if args.image_limit is not None and i >= args.image_limit: break - if 'error' in image_data: + if "error" in image_data: print("data : ", image_data) - print("something wrong happened ! Please check your token and/or your connection") + print( + "something wrong happened ! Please check your token and/or your connection" + ) sys.exit() images_data.append(image_data) - #sys.exit() + # sys.exit() - print('downloading.. this process will take a while. please wait') + print("downloading.. this process will take a while. please wait") with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - for i,image_data in enumerate(images_data): + for i, image_data in enumerate(images_data): # create a folder for each unique sequence ID to group images by sequence - path_destination = os.path.join(args.destination, image_data['sequence_id']) + path_destination = os.path.join(args.destination, image_data["sequence_id"]) if not os.path.exists(path_destination): os.makedirs(path_destination) - date_time_image_filename = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000).strftime('%Y-%m-%d_%HH%Mmn%Ss%f')[:-3] + '.jpg' + date_time_image_filename = ( + datetime.utcfromtimestamp( + int(image_data["captured_at"]) / 1000 + ).strftime("%Y-%m-%d_%HH%Mmn%Ss%f")[:-3] + + ".jpg" + ) path = os.path.join(path_destination, date_time_image_filename) img_metadata = writer.PictureMetadata( - capture_time = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000), - artist = image_data['creator']['username'], - camera_make = image_data['make'], - camera_model = image_data['model'], - longitude = image_data['geometry']['coordinates'][0], - latitude = image_data['geometry']['coordinates'][1], - picture_type = PictureType("equirectangular") if image_data.get('camera_type') == 'spherical' or image_data.get('camera_type') == 'equirectangular' else PictureType("flat"), - direction = image_data['compass_angle'], - altitude = image_data['altitude'], + capture_time=datetime.utcfromtimestamp( + int(image_data["captured_at"]) / 1000 + ), + artist=image_data["creator"]["username"], + camera_make=image_data["make"], + camera_model=image_data["model"], + longitude=image_data["geometry"]["coordinates"][0], + latitude=image_data["geometry"]["coordinates"][1], + picture_type=( + PictureType("equirectangular") + if image_data.get("camera_type") == "spherical" + or image_data.get("camera_type") == "equirectangular" + else PictureType("flat") + ), + direction=image_data["compass_angle"], + altitude=image_data["altitude"], ) - #print("metadata: ", img_metadata) - #print("path: ", image_data) + # print("metadata: ", img_metadata) + # print("path: ", image_data) image_exists = os.path.exists(path) if not args.overwrite and image_exists: print("{} already exists. Skipping ".format(path)) continue - executor.submit(download, url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata) - #download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata) + executor.submit( + download, + url=image_data["thumb_original_url"], + filepath=path, + metadata=img_metadata, + ) + # download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata) diff --git a/text_array_to_download_script.py b/text_array_to_download_script.py index 98b7d88..6e79f3b 100644 --- a/text_array_to_download_script.py +++ b/text_array_to_download_script.py @@ -1,53 +1,62 @@ import os - -input_file = 'input_file' - import argparse -def parse_args(argv =None): + +input_file = "input_file" + + +def parse_args(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--dev_token', type=str, help='Your mapillary access token') - parser.add_argument('--username', type=str, help='Username to get the sequences id of') + parser.add_argument( + "--dev_token", + type=str, + default=os.environ["MAPILLARY_DEV_TOKEN"], + help="Your mapillary access token", + ) + parser.add_argument( + "--username", + type=str, + required=True, + help="Username to get the sequences id of", + ) global args args = parser.parse_args(argv) - -if __name__ == '__main__': - print("Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)") +if __name__ == "__main__": + print( + "Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)" + ) parse_args() - username=args.username + username = args.username input_file = f"sequences_{username}.txt" - if not args.dev_token: - print(f"Erreur : Le token de développeur de mapillary manque, vérifiez le fichier de variables secretes. Arrêt du script.") - exit(1) - if not os.path.exists(input_file) or not os.path.isfile(input_file): - print(f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script.") + print( + f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script." + ) exit(1) else: print(f"Fichier '{input_file}' trouvé.") - output_file = f"script_bash_get_sequences_for_user_{username}.sh" - access_token = "--access_token='"+args.dev_token+"' " - format_string = "/usr/bin/python3 mapillary_download.py {} --sequence_id={}\n" - + access_token = "$MAPILLARY_DEV_TOKEN" # or, if you want to use the password in clear text: "'"+args.dev_token+"' " + format_string = "python mapillary_download.py {} --sequence_ids {}\n" with open(output_file, "w") as output: with open(input_file, "r") as input_handle: content = input_handle.read() sequences = eval(content) for seq in sequences: - full_cmd = f"/usr/bin/python3 mapillary_download.py {access_token} --sequence_id='{seq}' --username={username}\n" + full_cmd = f"python mapillary_download.py {access_token} --sequence_ids {seq}\n" output.write(full_cmd) print(output_file) print(f"\n Script Bash généré avec succès.") - print(f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}") - + print( + f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}" + ) diff --git a/writer.py b/writer.py index 2f81e67..7533103 100644 --- a/writer.py +++ b/writer.py @@ -1,4 +1,4 @@ -#source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py +# source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py from typing import Optional, Tuple from datetime import datetime, timedelta from dataclasses import dataclass @@ -31,7 +31,8 @@ class PictureMetadata: direction: Optional[float] = None orientation: Optional[int] = 1 -class Writer(): + +class Writer: def __init__(self, picture: bytes) -> None: self.content = picture self.image = pyexiv2.ImageData(picture) @@ -42,7 +43,7 @@ class Writer(): def __enter__(self): return self - + def __exit__(self, *args) -> None: self.image.close() @@ -53,19 +54,28 @@ class Writer(): if self.updated_xmp: self.image.modify_xmp(self.updated_xmp) except Exception as e: - print("exception \nexif: {}\nxmp: {}".format(self.updated_exif, self.updated_xmp)) + print( + "exception \nexif: {}\nxmp: {}".format( + self.updated_exif, self.updated_xmp + ) + ) def close(self) -> None: self.image.close() def get_Bytes(self) -> bytes: return self.image.get_bytes() - + def writePictureMetadata(self, metadata: PictureMetadata) -> None: """ Override exif metadata on raw picture and return updated bytes """ - if not metadata.capture_time and not metadata.longitude and not metadata.latitude and not metadata.picture_type: + if ( + not metadata.capture_time + and not metadata.longitude + and not metadata.latitude + and not metadata.picture_type + ): return if metadata.capture_time: @@ -83,12 +93,20 @@ class Writer(): Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef """ if metadata.latitude is not None: - self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = "N" if metadata.latitude > 0 else "S" - self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(metadata.latitude) + self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = ( + "N" if metadata.latitude > 0 else "S" + ) + self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms( + metadata.latitude + ) if metadata.longitude is not None: - self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = "E" if metadata.longitude > 0 else "W" - self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(metadata.longitude) + self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = ( + "E" if metadata.longitude > 0 else "W" + ) + self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms( + metadata.longitude + ) def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None: """ @@ -98,18 +116,24 @@ class Writer(): if altitude is not None: negative_altitude = 0 if altitude >= 0 else 1 - self.updated_exif['Exif.GPSInfo.GPSAltitude'] = f"{int(abs(altitude * precision))} / {precision}" - self.updated_exif['Exif.GPSInfo.GPSAltitudeRef'] = negative_altitude + self.updated_exif["Exif.GPSInfo.GPSAltitude"] = ( + f"{int(abs(altitude * precision))} / {precision}" + ) + self.updated_exif["Exif.GPSInfo.GPSAltitudeRef"] = negative_altitude - def add_direction(self, metadata: PictureMetadata, ref: str = 'T', precision: int = 1000) -> None: + def add_direction( + self, metadata: PictureMetadata, ref: str = "T", precision: int = 1000 + ) -> None: """ Add direction value in GPSImgDirection and GPSImgDirectionRef """ direction = metadata.direction if metadata.direction is not None: - self.updated_exif['Exif.GPSInfo.GPSImgDirection'] = f"{int(abs(direction % 360.0 * precision))} / {precision}" - self.updated_exif['Exif.GPSInfo.GPSImgDirectionRef'] = ref + self.updated_exif["Exif.GPSInfo.GPSImgDirection"] = ( + f"{int(abs(direction % 360.0 * precision))} / {precision}" + ) + self.updated_exif["Exif.GPSInfo.GPSImgDirectionRef"] = ref def add_gps_datetime(self, metadata: PictureMetadata) -> None: """ @@ -120,15 +144,21 @@ class Writer(): metadata.capture_time = self.localize(metadata.capture_time, metadata) # for capture time, override GPSInfo time and DatetimeOriginal - self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") + self.updated_exif["Exif.Photo.DateTimeOriginal"] = ( + metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") + ) offset = metadata.capture_time.utcoffset() if offset is not None: - self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset) + self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset( + offset + ) utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC) self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d") - self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime("%H/1 %M/1 %S/1") - + self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime( + "%H/1 %M/1 %S/1" + ) + def add_datetimeoriginal(self, metadata: PictureMetadata) -> None: """ Add date time in Exif DateTimeOriginal and SubSecTimeOriginal tags @@ -138,12 +168,18 @@ class Writer(): metadata.capture_time = self.localize(metadata.capture_time, metadata) # for capture time, override DatetimeOriginal and SubSecTimeOriginal - self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") + self.updated_exif["Exif.Photo.DateTimeOriginal"] = ( + metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") + ) offset = metadata.capture_time.utcoffset() if offset is not None: - self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset) + self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset( + offset + ) if metadata.capture_time.microsecond != 0: - self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = metadata.capture_time.strftime("%f") + self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = ( + metadata.capture_time.strftime("%f") + ) def add_img_projection(self, metadata: PictureMetadata) -> None: """ @@ -162,15 +198,15 @@ class Writer(): if metadata.artist is not None: self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'") - def add_camera_make(self, metadata: PictureMetadata) -> None: """ Add camera manufacture in Exif Make tag """ if metadata.camera_make is not None: - self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip("'") - + self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip( + "'" + ) def add_camera_model(self, metadata: PictureMetadata) -> None: """ @@ -178,7 +214,9 @@ class Writer(): """ if metadata.camera_model is not None: - self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip("'") + self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip( + "'" + ) def format_offset(self, offset: timedelta) -> str: """Format offset for OffsetTimeOriginal. Format is like "+02:00" for paris offset @@ -197,7 +235,7 @@ class Writer(): """ new_lat_lon = metadata.longitude is not None and metadata.latitude is not None - if new_lat_lon : + if new_lat_lon: lon = metadata.longitude lat = metadata.latitude @@ -211,14 +249,14 @@ class Writer(): lon = self._from_dms(lon) * (1 if lon_ref == "E" else -1) lat = self._from_dms(lat) * (1 if lat_ref == "N" else -1) except KeyError: - return metadata.capture_time # canot localize, returning same date + return metadata.capture_time # canot localize, returning same date tz_name = tz_finder.timezone_at(lng=lon, lat=lat) if not tz_name: return metadata.capture_time # cannot find timezone, returning same date tz = pytz.timezone(tz_name) - + return tz.localize(naive_dt) def _from_dms(self, val: str) -> float: