Compare commits

...

2 Commits

8 changed files with 398 additions and 189 deletions

7
batch_get_username.sh Normal file → Executable file
View File

@ -5,7 +5,12 @@
# Liste des usernames
# example:
# usernames=( "riri" "fifi" "loulou")
usernames=( "someone_having_nice_pictures" "someone_else" "oh_look_a_these_usernames" )
usernames=( )
if test -z $usernames; then
read -p "Please enter a mapillary username: " ANS
usernames=$ANS
fi
# check env variables are valid
if [ -f "secrets_variables.sh" ]; then

View File

@ -12,9 +12,19 @@ import shutil
import exifread
# Définition du rectangle entourant la France métropolitaine et un peu autour
france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max)
france_bbox: tuple[float, float, float, float] = (
42.0,
-5.0,
51.0,
10.0,
) # (lat_min, lon_min, lat_max, lon_max)
# Définition du rectangle entourant la France métropolitaine et un peu autour
france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max)
france_bbox: tuple[float, float, float, float] = (
42.0,
-5.0,
51.0,
10.0,
) # (lat_min, lon_min, lat_max, lon_max)
# Définition du rectangle entourant la Guadeloupe
guadeloupe_bbox: tuple[float, float, float, float] = (15.8, -61.8, 17.3, -59.3)
@ -50,11 +60,11 @@ pf_bbox: tuple[float, float, float, float] = (-27.5, -140.0, -7.5, -134.0)
taaf_bbox: tuple[float, float, float, float] = (-49.5, 68.5, -37.5, 77.5)
# Chemin du répertoire source
source_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/'
source_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/"
# Chemin du répertoire destination
destination_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/'
sequence_folder: str = 'principale_sequence'
destination_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/"
sequence_folder: str = "principale_sequence"
count_files_all: int = 0
count_files_moved: int = 0
# Crée le répertoire destination si il n'existe pas
@ -71,18 +81,19 @@ def move_file_if_in_france(filepath, sequence_folder):
latitude, longitude = get_gps_info(filepath)
if latitude and longitude:
print(f'Latitude: {latitude}, Longitude: {longitude}')
print(f"Latitude: {latitude}, Longitude: {longitude}")
if are_lat_lon_in_france(latitude, longitude):
move_file_in_destination(filepath, sequence_folder)
else:
print('Informations GPS non trouvées')
print("Informations GPS non trouvées")
def move_file_in_destination(filepath, sequence_folder):
global count_files_moved
# Déplace le fichier dans le sous-répertoire "photos_in_france"
dest_subdir = os.path.join(destination_dir, sequence_folder,
os.path.basename(os.path.dirname(filepath)))
dest_subdir = os.path.join(
destination_dir, sequence_folder, os.path.basename(os.path.dirname(filepath))
)
if not os.path.exists(dest_subdir):
os.makedirs(dest_subdir)
shutil.move(filepath, os.path.join(dest_subdir, filepath))
@ -90,6 +101,7 @@ def move_file_in_destination(filepath, sequence_folder):
print(f"Moved {filepath} to {dest_subdir}")
return True
def are_lat_lon_in_france(gps_lat, gps_lon):
"""
recherche d'une zone du territoire français
@ -117,37 +129,63 @@ def are_lat_lon_in_france(gps_lat, gps_lon):
print("lat lon :", gps_lat, gps_lon)
if (france_bbox[0] <= gps_lat <= france_bbox[2] and france_bbox[1] <= gps_lon <= france_bbox[3]):
if (
france_bbox[0] <= gps_lat <= france_bbox[2]
and france_bbox[1] <= gps_lon <= france_bbox[3]
):
return "France métropolitaine"
elif (taaf_bbox[0] <= gps_lat <= taaf_bbox[2] and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]):
elif (
taaf_bbox[0] <= gps_lat <= taaf_bbox[2]
and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]
):
return "Terres australes et antarctiques françaises"
elif (guyane_bbox[0] <= gps_lat <= guyane_bbox[2] and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]):
elif (
guyane_bbox[0] <= gps_lat <= guyane_bbox[2]
and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]
):
return "Guyane française"
elif (reunion_bbox[0] <= gps_lat <= reunion_bbox[2] and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]):
elif (
reunion_bbox[0] <= gps_lat <= reunion_bbox[2]
and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]
):
return "La Réunion"
elif (wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]):
elif wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]:
return "Wallis-et-Futuna"
elif (stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2] and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]):
elif (
stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2]
and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]
):
return "Saint-Martin et Saint-Barthélemy"
elif (spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]):
elif (
spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]
):
return "Saint-Pierre-et-Miquelon"
elif (mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2] and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]):
elif (
mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2]
and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]
):
return "Mayotte"
elif (martinique_bbox[0] <= gps_lat <= martinique_bbox[2] and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]):
elif (
martinique_bbox[0] <= gps_lat <= martinique_bbox[2]
and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]
):
return "Martinique"
elif (guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2] and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]):
elif (
guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2]
and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]
):
return "Guadeloupe"
elif (pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]):
elif pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]:
return "Polynésie française"
elif (nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]):
elif nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]:
return "Nouvelle-Calédonie"
else:
return None # "Hors de France"
return None # "Hors de France"
def get_gps_info(filepath):
with open(filepath, 'rb') as f:
with open(filepath, "rb") as f:
tags = exifread.process_file(f)
gps_info = {}
@ -155,12 +193,12 @@ def get_gps_info(filepath):
# print("clés exif ", tags.keys())
for tag in tags.keys():
if tag.startswith('GPS'):
if tag.startswith("GPS"):
gps_info[tag] = tags[tag]
# Extraction des informations de latitude et de longitude
gps_latitude = convert_rational_to_float(gps_info.get('GPS GPSLatitude'))
gps_longitude = convert_rational_to_float(gps_info.get('GPS GPSLongitude'))
gps_latitude = convert_rational_to_float(gps_info.get("GPS GPSLatitude"))
gps_longitude = convert_rational_to_float(gps_info.get("GPS GPSLongitude"))
if gps_latitude and gps_longitude:
return gps_latitude, gps_longitude
@ -172,22 +210,38 @@ def convert_rational_to_float(rational):
return float(rational.values[0].num) / float(rational.values[0].den)
if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--source_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/', help='Chemin du répertoire source')
parser.add_argument('--destination_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/', help='Chemin du répertoire destination')
parser.add_argument('--sequence_folder', default='principale_sequence', help='Nom du dossier de séquence')
parser.add_argument(
"--source_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/",
help="Chemin du répertoire source",
)
parser.add_argument(
"--destination_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/",
help="Chemin du répertoire destination",
)
parser.add_argument(
"--sequence_folder",
default="principale_sequence",
help="Nom du dossier de séquence",
)
args = parser.parse_args()
# Parcourt tous les fichiers dans le répertoire source et ses sous-répertoires
for root, dirs, files in os.walk(args.source_dir):
for filename in files:
# Vérifie si le fichier est une image
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tif')):
if filename.lower().endswith(
(".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tif")
):
filepath = os.path.join(root, filename)
move_file_if_in_france(filepath, sequence_folder)
print('fichiers se situant en france déplacés: ', count_files_moved, ' / ', count_files_all)
print(
"fichiers se situant en france déplacés: ",
count_files_moved,
" / ",
count_files_all,
)

View File

@ -4,12 +4,12 @@ echo "Prenez un token oauth sur https://www.mapillary.com/app/user/$1"
USERNAME=$1
response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers')
response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS)
ID=$(echo "$response" | jq -r '.data.user_by_username.id')
echo "ID: $ID"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' > "out_${1}.json"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json"
echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'"

View File

@ -1,30 +1,38 @@
import json
import requests
# lit un json listant les id de photo de chaque séquence et va
# chercher la séquence par API.
import argparse
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--username', type=str, help='Username to get the sequences id of', required=True)
parser.add_argument('--dev_token', type=str, help='Your mapillary developer token')
parser.add_argument('--max_sequence', type=str, help='Limit the amount of retrieved sequence ids')
parser.add_argument(
"--username",
type=str,
help="Username to get the sequences id of",
required=True,
)
parser.add_argument("--dev_token", type=str, help="Your mapillary developer token")
parser.add_argument(
"--max_sequence", type=str, help="Limit the amount of retrieved sequence ids"
)
global args
args = parser.parse_args(argv)
print(args)
# Initialisation de la liste pour stocker les réponses
responses = []
sequences = []
def get_image_data_from_sequences():
username = args.username
input_file = "out_"+username+".json"
input_file = "out_" + username + ".json"
# Chargement du fichier JSON d'entrée
with open(input_file, "r") as file:
@ -32,7 +40,7 @@ def get_image_data_from_sequences():
# Itération sur les noeuds pour collectionner les image_ids
nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"]
print( 'séquences : ', len(nodelist))
print("séquences : ", len(nodelist))
image_ids = [node["image_id"] for node in nodelist]
print(image_ids)
@ -41,15 +49,21 @@ def get_image_data_from_sequences():
# Préparation de la tête d'autorisation pour toutes les futures requêtes
header = {"Access-Token": dev_token}
ii=0
ii = 0
limit_requests = 1000000000
# limit_requests = 5 # pour tester
# limit_requests = 5 # pour tester
# Boucle sur chaque image_id pour interroger l'API Mapillary
for image_id in image_ids:
ii+=1
ii += 1
if limit_requests >= ii and image_id:
params = {"id": image_id, "fields": "id,sequence"}
request_url = "https://graph.mapillary.com/" + str(image_id)+"?access_token="+dev_token+"&fields=id,sequence"
request_url = (
"https://graph.mapillary.com/"
+ str(image_id)
+ "?access_token="
+ dev_token
+ "&fields=id,sequence"
)
# print("requete: "+request_url)
response = requests.get(request_url)
@ -63,23 +77,31 @@ def get_image_data_from_sequences():
parsed_response["sequence"] = raw_response["sequence"]
sequences.append(parsed_response["sequence"])
print("séquence trouvée: "+str(ii)+"/"+args.max_sequence+" : "+raw_response["sequence"])
print(
"séquence trouvée: "
+ str(ii)
+ "/"
+ args.max_sequence
+ " : "
+ raw_response["sequence"]
)
else:
print(response)
responses.append(parsed_response)
def persist_files():
# Sauvegarde des nouveaux résultats dans le fichier output.json
output_file = "sequences_"+args.username+".json"
output_file = "sequences_" + args.username + ".json"
with open(output_file, "w") as file:
json.dump(responses, file)
sequence_filename = "sequences_"+args.username+".txt"
sequence_filename = "sequences_" + args.username + ".txt"
with open(sequence_filename, "w") as file:
json.dump(sequences, file)
print('fichier sauvegardé: '+sequence_filename)
print("fichier sauvegardé: " + sequence_filename)
parse_args()

View File

@ -1,38 +1,51 @@
import requests, json
import os, requests, json
import argparse
from urllib.parse import quote
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--access_token', type=str, help='Your mapillary access token')
parser.add_argument('--username', type=str, help='Username to get the sequences id of')
parser.add_argument('--pictures', type=str, help='Limit of pictures to fetch')
parser.add_argument(
"--access_token",
type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
parser.add_argument(
"--pictures",
type=str,
default=500,
help="Limit of pictures to fetch, max=5000",
)
global args
args = parser.parse_args(argv)
if __name__ == '__main__':
if __name__ == "__main__":
parse_args()
if args.access_token == None:
print('please provide the access_token')
exit()
mly_key = args.access_token
creator_username = args.username
max_img= args.pictures
max_img = args.pictures
url = f'https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence'
url = f"https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence"
print(url)
response = requests.get(url)
if response.status_code == 200:
json = response.json()
json = response.json()
# tri des séquences uniques
sequences_ids = [obj['sequence'] for obj in json['data']]
unique_ids = list(set(sequences_ids))
print(unique_ids)
# tri des séquences uniques
sequences_ids = [obj["sequence"] for obj in json["data"]]
unique_ids = list(set(sequences_ids))
print(unique_ids)
else:
print(response)
print(response)

View File

@ -14,35 +14,67 @@ session = requests.Session()
retries_strategies = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429,502, 503, 504],
)
session.mount('https://', HTTPAdapter(max_retries=retries_strategies))
status_forcelist=[429, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retries_strategies))
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('access_token', type=str, help='Your mapillary access token')
parser.add_argument('--sequence_ids', type=str, nargs='*', help='The mapillary sequence id(s) to download')
parser.add_argument('--image_ids', type=int, nargs='*', help='The mapillary image id(s) to get their sequence id(s)')
parser.add_argument('--destination', type=str, default='data', help='Path destination for the images')
parser.add_argument('--image_limit', type=int, default=None, help='How many images you want to download')
parser.add_argument('--overwrite', default=False, action='store_true', help='overwrite existing images')
parser.add_argument("access_token", type=str, help="Your mapillary access token")
parser.add_argument(
"--sequence_ids",
type=str,
nargs="*",
help="The mapillary sequence id(s) to download",
)
parser.add_argument(
"--image_ids",
type=int,
nargs="*",
help="The mapillary image id(s) to get their sequence id(s)",
)
parser.add_argument(
"--destination",
type=str,
default="data",
help="Path destination for the images",
)
parser.add_argument(
"--image_limit",
type=int,
default=None,
help="How many images you want to download",
)
parser.add_argument(
"--overwrite",
default=False,
action="store_true",
help="overwrite existing images",
)
parser.add_argument("-v", "--version", action="version", version="release 1.6")
args = parser.parse_args(argv)
if args.sequence_ids is None and args.image_ids is None:
parser.error("Please enter at least one sequence id or image id")
return args
def download(url, filepath, metadata=None):
#print(asizeof.asizeof(image)/1024, "MB")
def download(url, filepath, metadata=None):
# print(asizeof.asizeof(image)/1024, "MB")
with open(str(filepath), "wb") as f:
r = session.get(url, stream=True, timeout=6)
image = write_exif(r.content, metadata)
try:
image = write_exif(r.content, metadata)
except Exception as e:
print(f"FAILED to write exif data for {filepath}. Error: {e}")
f.write(image)
print("{} downloaded".format(filepath))
print("{} downloaded {}".format(filepath, r))
def get_single_image_data(image_id, mly_header):
req_url = 'https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence'.format(image_id)
req_url = "https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence".format(
image_id
)
r = session.get(req_url, headers=mly_header)
data = r.json()
print(data)
@ -50,54 +82,68 @@ def get_single_image_data(image_id, mly_header):
def get_image_data_from_sequences(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id)
for i, sequence_id in enumerate(sequences_id):
url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header)
data = r.json()
image_ids = data['data']
image_ids = data["data"]
total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id))
print('getting images data')
print(
"{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
for x in range(0, total_image):
image_id = image_ids[x]['id']
image_id = image_ids[x]["id"]
image_data = get_single_image_data(image_id, mly_header)
image_data['sequence_id'] = sequence_id
image_data["sequence_id"] = sequence_id
yield image_data
def get_image_data_from_sequences__future(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id)
for i, sequence_id in enumerate(sequences_id):
url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header)
data = r.json()
if data.get('data') == []:
print("Empty or wrong sequence {} of {} - id : {}".format(i+1, len(sequences_id), sequence_id))
if data.get("data") == []:
print(
"Empty or wrong sequence {} of {} - id : {}".format(
i + 1, len(sequences_id), sequence_id
)
)
continue
image_ids = data['data']
image_ids = data["data"]
total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id))
print('getting images data')
print(
"{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {}
for x in range(0, total_image):
image_id = image_ids[x]['id']
future_to_url[executor.submit(get_single_image_data, image_id, mly_header)] = image_id
image_id = image_ids[x]["id"]
future_to_url[
executor.submit(get_single_image_data, image_id, mly_header)
] = image_id
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
image_data = future.result()
image_data['sequence_id'] = sequence_id
#print(image_data)
image_data["sequence_id"] = sequence_id
# print(image_data)
yield image_data
def write_exif(picture, img_metadata):
'''
"""
Write exif metadata
'''
#{'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca',
"""
# {'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca',
# 'captured_at': 1603459736644, 'geometry': {'type': 'Point', 'coordinates': [2.5174596904057, 48.777089857534]}, 'id': '485924785946693'}
with writer.Writer(picture) as image:
image.add_artist(img_metadata)
image.add_camera_make(img_metadata)
@ -113,61 +159,84 @@ def write_exif(picture, img_metadata):
return updated_image
if __name__ == '__main__':
if __name__ == "__main__":
args = parse_args()
sequence_ids= args.sequence_ids if args.sequence_ids is not None else []
sequence_ids = args.sequence_ids if args.sequence_ids is not None else []
images_ids = args.image_ids
access_token = args.access_token
images_data = []
header = {'Authorization' : 'OAuth {}'.format(access_token)}
header = {"Authorization": "OAuth {}".format(access_token)}
if images_ids:
for image_id in images_ids:
image_data = get_single_image_data(image_id, header)
if 'error' in image_data:
if "error" in image_data:
print("data : ", image_data)
print("something wrong happened ! Please check your image id and/or your connection")
print(
"something wrong happened ! Please check your image id and/or your connection"
)
sys.exit()
else:
sequence_ids.append(image_data.get('sequence'))
sequence_ids.append(image_data.get("sequence"))
#for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)):
for i,image_data in enumerate(get_image_data_from_sequences__future(sequence_ids, header)):
# for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)):
for i, image_data in enumerate(
get_image_data_from_sequences__future(sequence_ids, header)
):
if args.image_limit is not None and i >= args.image_limit:
break
if 'error' in image_data:
if "error" in image_data:
print("data : ", image_data)
print("something wrong happened ! Please check your token and/or your connection")
print(
"something wrong happened ! Please check your token and/or your connection"
)
sys.exit()
images_data.append(image_data)
#sys.exit()
# sys.exit()
print('downloading.. this process will take a while. please wait')
print("downloading.. this process will take a while. please wait")
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
for i,image_data in enumerate(images_data):
for i, image_data in enumerate(images_data):
# create a folder for each unique sequence ID to group images by sequence
path_destination = os.path.join(args.destination, image_data['sequence_id'])
path_destination = os.path.join(args.destination, image_data["sequence_id"])
if not os.path.exists(path_destination):
os.makedirs(path_destination)
date_time_image_filename = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000).strftime('%Y-%m-%d_%HH%Mmn%Ss%f')[:-3] + '.jpg'
date_time_image_filename = (
datetime.utcfromtimestamp(
int(image_data["captured_at"]) / 1000
).strftime("%Y-%m-%d_%HH%Mmn%Ss%f")[:-3]
+ ".jpg"
)
path = os.path.join(path_destination, date_time_image_filename)
img_metadata = writer.PictureMetadata(
capture_time = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000),
artist = image_data['creator']['username'],
camera_make = image_data['make'],
camera_model = image_data['model'],
longitude = image_data['geometry']['coordinates'][0],
latitude = image_data['geometry']['coordinates'][1],
picture_type = PictureType("equirectangular") if image_data.get('camera_type') == 'spherical' or image_data.get('camera_type') == 'equirectangular' else PictureType("flat"),
direction = image_data['compass_angle'],
altitude = image_data['altitude'],
capture_time=datetime.utcfromtimestamp(
int(image_data["captured_at"]) / 1000
),
artist=image_data["creator"]["username"],
camera_make=image_data["make"],
camera_model=image_data["model"],
longitude=image_data["geometry"]["coordinates"][0],
latitude=image_data["geometry"]["coordinates"][1],
picture_type=(
PictureType("equirectangular")
if image_data.get("camera_type") == "spherical"
or image_data.get("camera_type") == "equirectangular"
else PictureType("flat")
),
direction=image_data["compass_angle"],
altitude=image_data["altitude"],
)
#print("metadata: ", img_metadata)
#print("path: ", image_data)
# print("metadata: ", img_metadata)
# print("path: ", image_data)
image_exists = os.path.exists(path)
if not args.overwrite and image_exists:
print("{} already exists. Skipping ".format(path))
continue
executor.submit(download, url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)
#download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)
executor.submit(
download,
url=image_data["thumb_original_url"],
filepath=path,
metadata=img_metadata,
)
# download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)

View File

@ -1,53 +1,61 @@
import os
input_file = 'input_file'
import argparse
def parse_args(argv =None):
input_file = "input_file"
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--dev_token', type=str, help='Your mapillary access token')
parser.add_argument('--username', type=str, help='Username to get the sequences id of')
parser.add_argument(
"--dev_token",
type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
global args
args = parser.parse_args(argv)
if __name__ == '__main__':
print("Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)")
if __name__ == "__main__":
print(
"Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)"
)
parse_args()
username=args.username
username = args.username
input_file = f"sequences_{username}.txt"
if not args.dev_token:
print(f"Erreur : Le token de développeur de mapillary manque, vérifiez le fichier de variables secretes. Arrêt du script.")
exit(1)
if not os.path.exists(input_file) or not os.path.isfile(input_file):
print(f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script.")
print(
f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script."
)
exit(1)
else:
print(f"Fichier '{input_file}' trouvé.")
output_file = f"script_bash_get_sequences_for_user_{username}.sh"
access_token = "--access_token='"+args.dev_token+"' "
format_string = "/usr/bin/python3 mapillary_download.py {} --sequence_id={}\n"
access_token = "$MAPILLARY_DEV_TOKEN" # or, if you want to use the password in clear text: "'"+args.dev_token+"' "
with open(output_file, "w") as output:
with open(input_file, "r") as input_handle:
content = input_handle.read()
sequences = eval(content)
for seq in sequences:
full_cmd = f"/usr/bin/python3 mapillary_download.py {access_token} --sequence_id='{seq}' --username={username}\n"
full_cmd = f"python mapillary_download.py {access_token} --sequence_ids={seq}\n"
output.write(full_cmd)
print(output_file)
print(f"\n Script Bash généré avec succès.")
print(f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}")
print(
f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}"
)

View File

@ -1,4 +1,4 @@
#source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py
# source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py
from typing import Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
@ -31,7 +31,8 @@ class PictureMetadata:
direction: Optional[float] = None
orientation: Optional[int] = 1
class Writer():
class Writer:
def __init__(self, picture: bytes) -> None:
self.content = picture
self.image = pyexiv2.ImageData(picture)
@ -42,7 +43,7 @@ class Writer():
def __enter__(self):
return self
def __exit__(self, *args) -> None:
self.image.close()
@ -53,19 +54,28 @@ class Writer():
if self.updated_xmp:
self.image.modify_xmp(self.updated_xmp)
except Exception as e:
print("exception \nexif: {}\nxmp: {}".format(self.updated_exif, self.updated_xmp))
print(
"exception \nexif: {}\nxmp: {}".format(
self.updated_exif, self.updated_xmp
)
)
def close(self) -> None:
self.image.close()
def get_Bytes(self) -> bytes:
return self.image.get_bytes()
def writePictureMetadata(self, metadata: PictureMetadata) -> None:
"""
Override exif metadata on raw picture and return updated bytes
"""
if not metadata.capture_time and not metadata.longitude and not metadata.latitude and not metadata.picture_type:
if (
not metadata.capture_time
and not metadata.longitude
and not metadata.latitude
and not metadata.picture_type
):
return
if metadata.capture_time:
@ -83,12 +93,20 @@ class Writer():
Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef
"""
if metadata.latitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = "N" if metadata.latitude > 0 else "S"
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(metadata.latitude)
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = (
"N" if metadata.latitude > 0 else "S"
)
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(
metadata.latitude
)
if metadata.longitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = "E" if metadata.longitude > 0 else "W"
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(metadata.longitude)
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = (
"E" if metadata.longitude > 0 else "W"
)
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(
metadata.longitude
)
def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None:
"""
@ -98,18 +116,24 @@ class Writer():
if altitude is not None:
negative_altitude = 0 if altitude >= 0 else 1
self.updated_exif['Exif.GPSInfo.GPSAltitude'] = f"{int(abs(altitude * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSAltitudeRef'] = negative_altitude
self.updated_exif["Exif.GPSInfo.GPSAltitude"] = (
f"{int(abs(altitude * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSAltitudeRef"] = negative_altitude
def add_direction(self, metadata: PictureMetadata, ref: str = 'T', precision: int = 1000) -> None:
def add_direction(
self, metadata: PictureMetadata, ref: str = "T", precision: int = 1000
) -> None:
"""
Add direction value in GPSImgDirection and GPSImgDirectionRef
"""
direction = metadata.direction
if metadata.direction is not None:
self.updated_exif['Exif.GPSInfo.GPSImgDirection'] = f"{int(abs(direction % 360.0 * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSImgDirectionRef'] = ref
self.updated_exif["Exif.GPSInfo.GPSImgDirection"] = (
f"{int(abs(direction % 360.0 * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSImgDirectionRef"] = ref
def add_gps_datetime(self, metadata: PictureMetadata) -> None:
"""
@ -120,15 +144,21 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override GPSInfo time and DatetimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC)
self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime("%H/1 %M/1 %S/1")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime(
"%H/1 %M/1 %S/1"
)
def add_datetimeoriginal(self, metadata: PictureMetadata) -> None:
"""
Add date time in Exif DateTimeOriginal and SubSecTimeOriginal tags
@ -138,12 +168,18 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override DatetimeOriginal and SubSecTimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
if metadata.capture_time.microsecond != 0:
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = metadata.capture_time.strftime("%f")
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = (
metadata.capture_time.strftime("%f")
)
def add_img_projection(self, metadata: PictureMetadata) -> None:
"""
@ -162,15 +198,15 @@ class Writer():
if metadata.artist is not None:
self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'")
def add_camera_make(self, metadata: PictureMetadata) -> None:
"""
Add camera manufacture in Exif Make tag
"""
if metadata.camera_make is not None:
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip("'")
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip(
"'"
)
def add_camera_model(self, metadata: PictureMetadata) -> None:
"""
@ -178,7 +214,9 @@ class Writer():
"""
if metadata.camera_model is not None:
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip("'")
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip(
"'"
)
def format_offset(self, offset: timedelta) -> str:
"""Format offset for OffsetTimeOriginal. Format is like "+02:00" for paris offset
@ -197,7 +235,7 @@ class Writer():
"""
new_lat_lon = metadata.longitude is not None and metadata.latitude is not None
if new_lat_lon :
if new_lat_lon:
lon = metadata.longitude
lat = metadata.latitude
@ -211,14 +249,14 @@ class Writer():
lon = self._from_dms(lon) * (1 if lon_ref == "E" else -1)
lat = self._from_dms(lat) * (1 if lat_ref == "N" else -1)
except KeyError:
return metadata.capture_time # canot localize, returning same date
return metadata.capture_time # canot localize, returning same date
tz_name = tz_finder.timezone_at(lng=lon, lat=lat)
if not tz_name:
return metadata.capture_time # cannot find timezone, returning same date
tz = pytz.timezone(tz_name)
return tz.localize(naive_dt)
def _from_dms(self, val: str) -> float: