Compare commits

...

13 Commits

Author SHA1 Message Date
b43cacea56 Merge pull request 'Add bbox filtering to get mapillary images/sequences' (#8) from remonet/mapillary_download:tykayn/batch-get-mapillary-sequences into tykayn/batch-get-mapillary-sequences
Reviewed-on: tykayn/mapillary_download#8
2024-11-02 18:38:13 +01:00
Rémi Emonet
8bb31a65dc Add bbox filtering to get mapillary images/sequences 2024-11-02 14:39:59 +01:00
efe40c10d2 Merge pull request 'remove JFIF data in case EXIF extraction fails' (#7) from matthias/mapillary_download:workaround/faulty-exif into tykayn/batch-get-mapillary-sequences
Reviewed-on: tykayn/mapillary_download#7
2024-10-27 09:04:42 +01:00
Matthias
0f3a3a8d6e
remove JFIF data in case EXIF extraction fails
Signed-off-by: Matthias <matthias@pebble>
2024-10-27 00:51:41 +02:00
4258a8b84b Merge pull request 'correct text-array to download script' (#6) from matthias/mapillary_download:bugfix/download-array into tykayn/batch-get-mapillary-sequences
Reviewed-on: tykayn/mapillary_download#6
2024-10-26 23:24:17 +02:00
Matthias
795fd20030
correct text-array to download script, ask for usernames if empty, improve argparse, linting to black
Signed-off-by: Matthias <matthias@pebble>
2024-10-26 22:11:45 +02:00
ff90b70d5d add curl_land.sh to fetch sequences of a user 2024-10-25 00:56:27 +02:00
c9cf523562 add script to find user id 2024-10-25 00:54:40 +02:00
Matthias
7d297ed805
fixup! correct typos, adapt chmod of bash scripts
Signed-off-by: Matthias <matthias@pebble>
2024-10-24 01:10:06 +02:00
Matthias
c8404cdd54
add panormax cli
Signed-off-by: Matthias <matthias@pebble>
2024-10-24 00:01:05 +02:00
Matthias
5099f0f368
correct typos, adapt chmod of bash scripts
Signed-off-by: Matthias <matthias@pebble>
2024-10-22 23:12:54 +02:00
7979c16033 Merge pull request 'Fix typo destionation -> destination' (#1) from Robot8A/mapillary_download:robot8a-patch-1 into tykayn/batch-get-mapillary-sequences
Reviewed-on: tykayn/mapillary_download#1
Reviewed-by: tykayn <contact@cipherbliss.com>
2024-10-20 14:32:11 +02:00
8d3661881c Fix typo destionation -> destination 2024-10-20 13:32:22 +02:00
13 changed files with 486 additions and 198 deletions

@ -3,9 +3,9 @@ Simple code to download images in one or several mapillary sequences. The images
## How to use
### Setup env variables
copy the secret variables files to setup your maipllary dev token, and your panoramax OpenStreetMap token.
copy the secret variables file to set up your mapillary dev token, and your panoramax OpenStreetMap token.
```Shell
cp secrets_variables_examples.sh secrets_variables.sh
cp secrets_variables_example.sh secrets_variables.sh
editor secrets_variables.sh
```
@ -13,10 +13,13 @@ editor secrets_variables.sh
To avoid downloading everything at once, this is a two-step process:
- 1/ We set a list of users whose sequences we want to list and download, and get a bash script for each user.
Change the list of users in the batch file `batch_get_username.sh`.
Replace the access token with your own access token and the sequence ids with the ids of the sequences you want to download.
```bash
usernames=( "someone_having_nice_pictures" "someone_else" "oh_look_a_these_usernames" ) # use real accounts, it might work better
```
then, execute:
```bash
./batch_get_username.sh
```
This will give you a few files that the other scripts use to generate a script downloading all sequences of each user: one user, one generated bash script.
@ -25,7 +28,7 @@ To avoid to download everything at once, this is a 2 steps process:
bash script_bash_get_sequences_for_user_SOMEONE.sh
```
**Or all of the generated batch scripts at once.**
```bash
bash script_bash_get_sequences_for_user_*
```
@ -67,7 +70,7 @@ optional arguments:
This is used to restrict uploads to the zones of national Panoramax instances, so that the sequences of multinational users can be reused.
Edit the bounding boxes in `find_france_photos_and_move.py` and change the destination folder.
Running the script will check in the /data folder for every user sequences and move the files being in the bouding boxes in the `destionation` folder path of the script.
Running the script will check every user's sequences in the /data folder and move the files located inside the bounding boxes to the `destination` folder path of the script.
For this script to read the EXIF data of pictures, you will need the Python package `exifread`.
It was tested with pictures taken with the Open Camera app.
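The bounding-box filtering described above can be sketched as follows. The bbox order `(lat_min, lon_min, lat_max, lon_max)` matches `find_france_photos_and_move.py`; the helper name `in_bbox` is ours, not the script's:

```python
FRANCE_BBOX = (42.0, -5.0, 51.0, 10.0)  # (lat_min, lon_min, lat_max, lon_max)

def in_bbox(lat: float, lon: float, bbox: tuple) -> bool:
    """Return True if the point lies inside the bounding box (bounds inclusive)."""
    lat_min, lon_min, lat_max, lon_max = bbox
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max

print(in_bbox(48.85, 2.35, FRANCE_BBOX))  # Paris → True
```

Files whose GPS position passes this test are the ones moved to the destination folder.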
@ -99,9 +102,10 @@ git clone https://github.com/Stefal/mapillary_download.git
cd mapillary_download
python3 -m venv mly_venv
source mly_venv/bin/activate
source secrets_variables.sh
python -m pip install -r requirements.txt
```
Then you can run `python mapillary_download "MLY|xxxx|xxxxxxx" --sequence_ids xxxxxxxxxxx`
Then you can run `python mapillary_download.py $MAPILLARY_DEV_TOKEN --sequence_ids xxxxxxxxxxx`
When you're done with the script, simply run `deactivate` to exit the virtual environment.
On Windows you can create a virtual environment too, or use the prebuilt `mapillary_download.exe` available on the release page.
@ -114,8 +118,10 @@ go in your `data` folder, and to the user subfolder, and assuming you have alrea
```Shell
source mly_venv/bin/activate
source secrets_variables.sh
cd data/some_user
for dir in */; do dir=${dir%?} ; geovisio upload --api-url https://panoramax.openstreetmap.fr "$dir" --token=BLAH_BLAH ; done
for dir in */; do dir=${dir%?} ; panoramax_cli upload --api-url https://panoramax.openstreetmap.fr "$dir" --token=$PANORAMAX_DEV_TOKEN ; done
```
Have fun!

batch_get_username.sh · Normal file → Executable file · 7 changed lines
@ -5,7 +5,12 @@
# List of usernames
# example:
# usernames=( "riri" "fifi" "loulou")
usernames=( "someone_having_nice_pictures" "someone_else" "oh_look_a_these_usernames" )
usernames=( )
if test -z $usernames; then
read -p "Please enter a mapillary username: " ANS
usernames=$ANS
fi
# check env variables are valid
if [ -f "secrets_variables.sh" ]; then

curl_land.sh · Normal file · 8 changed lines
@ -0,0 +1,8 @@
#!/bin/bash
# go to the page of the relevant user with Firefox
# find the GraphQL request that fetches the latest activity
# copy the request as cURL from the Firefox devtools
# replace the value limiting the number of sequences with, say, 10000
# run the command: bash curl_land.sh > out.json
#source secrets_variables.sh
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_
seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' > out.json

@ -12,9 +12,19 @@ import shutil
import exifread
# Bounding box around metropolitan France, with a little margin
france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max)
france_bbox: tuple[float, float, float, float] = (
42.0,
-5.0,
51.0,
10.0,
) # (lat_min, lon_min, lat_max, lon_max)
# Bounding box around Guadeloupe
guadeloupe_bbox: tuple[float, float, float, float] = (15.8, -61.8, 17.3, -59.3)
@ -50,11 +60,11 @@ pf_bbox: tuple[float, float, float, float] = (-27.5, -140.0, -7.5, -134.0)
taaf_bbox: tuple[float, float, float, float] = (-49.5, 68.5, -37.5, 77.5)
# Source directory path
source_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/'
source_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/"
# Destination directory path
destination_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/'
sequence_folder: str = 'principale_sequence'
destination_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/"
sequence_folder: str = "principale_sequence"
count_files_all: int = 0
count_files_moved: int = 0
# Create the destination directory if it doesn't exist
@ -71,18 +81,19 @@ def move_file_if_in_france(filepath, sequence_folder):
latitude, longitude = get_gps_info(filepath)
if latitude and longitude:
print(f'Latitude: {latitude}, Longitude: {longitude}')
print(f"Latitude: {latitude}, Longitude: {longitude}")
if are_lat_lon_in_france(latitude, longitude):
move_file_in_destination(filepath, sequence_folder)
else:
print('Informations GPS non trouvées')
print("Informations GPS non trouvées")
def move_file_in_destination(filepath, sequence_folder):
global count_files_moved
# Move the file into the "photos_in_france" subdirectory
dest_subdir = os.path.join(destination_dir, sequence_folder,
os.path.basename(os.path.dirname(filepath)))
dest_subdir = os.path.join(
destination_dir, sequence_folder, os.path.basename(os.path.dirname(filepath))
)
if not os.path.exists(dest_subdir):
os.makedirs(dest_subdir)
shutil.move(filepath, os.path.join(dest_subdir, filepath))
@ -90,6 +101,7 @@ def move_file_in_destination(filepath, sequence_folder):
print(f"Moved {filepath} to {dest_subdir}")
return True
def are_lat_lon_in_france(gps_lat, gps_lon):
"""
look up which zone of French territory contains the point
@ -117,37 +129,63 @@ def are_lat_lon_in_france(gps_lat, gps_lon):
print("lat lon :", gps_lat, gps_lon)
if (france_bbox[0] <= gps_lat <= france_bbox[2] and france_bbox[1] <= gps_lon <= france_bbox[3]):
if (
france_bbox[0] <= gps_lat <= france_bbox[2]
and france_bbox[1] <= gps_lon <= france_bbox[3]
):
return "France métropolitaine"
elif (taaf_bbox[0] <= gps_lat <= taaf_bbox[2] and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]):
elif (
taaf_bbox[0] <= gps_lat <= taaf_bbox[2]
and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]
):
return "Terres australes et antarctiques françaises"
elif (guyane_bbox[0] <= gps_lat <= guyane_bbox[2] and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]):
elif (
guyane_bbox[0] <= gps_lat <= guyane_bbox[2]
and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]
):
return "Guyane française"
elif (reunion_bbox[0] <= gps_lat <= reunion_bbox[2] and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]):
elif (
reunion_bbox[0] <= gps_lat <= reunion_bbox[2]
and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]
):
return "La Réunion"
elif (wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]):
elif wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]:
return "Wallis-et-Futuna"
elif (stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2] and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]):
elif (
stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2]
and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]
):
return "Saint-Martin et Saint-Barthélemy"
elif (spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]):
elif (
spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]
):
return "Saint-Pierre-et-Miquelon"
elif (mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2] and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]):
elif (
mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2]
and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]
):
return "Mayotte"
elif (martinique_bbox[0] <= gps_lat <= martinique_bbox[2] and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]):
elif (
martinique_bbox[0] <= gps_lat <= martinique_bbox[2]
and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]
):
return "Martinique"
elif (guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2] and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]):
elif (
guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2]
and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]
):
return "Guadeloupe"
elif (pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]):
elif pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]:
return "Polynésie française"
elif (nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]):
elif nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]:
return "Nouvelle-Calédonie"
else:
return None # "Hors de France"
return None # "Hors de France"
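As a design note, the chain of `elif` branches above could equally be driven by a dict of zones; a minimal sketch, with only two of the script's bboxes shown and hypothetical names `ZONES` and `find_zone`:

```python
# zone names and boxes copied from the script; the remaining *_bbox entries are elided here
ZONES = {
    "France métropolitaine": (42.0, -5.0, 51.0, 10.0),
    "Guadeloupe": (15.8, -61.8, 17.3, -59.3),
    # ... add the other *_bbox entries from the script in the same way
}

def find_zone(lat: float, lon: float):
    """Return the name of the first zone whose bbox contains the point, else None."""
    for name, (lat_min, lon_min, lat_max, lon_max) in ZONES.items():
        if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
            return name
    return None  # "Hors de France"
```

This keeps the bboxes and the containment test in one place, so adding a territory is a one-line change.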
def get_gps_info(filepath):
with open(filepath, 'rb') as f:
with open(filepath, "rb") as f:
tags = exifread.process_file(f)
gps_info = {}
@ -155,12 +193,12 @@ def get_gps_info(filepath):
# print("clés exif ", tags.keys())
for tag in tags.keys():
if tag.startswith('GPS'):
if tag.startswith("GPS"):
gps_info[tag] = tags[tag]
# Extract the latitude and longitude information
gps_latitude = convert_rational_to_float(gps_info.get('GPS GPSLatitude'))
gps_longitude = convert_rational_to_float(gps_info.get('GPS GPSLongitude'))
gps_latitude = convert_rational_to_float(gps_info.get("GPS GPSLatitude"))
gps_longitude = convert_rational_to_float(gps_info.get("GPS GPSLongitude"))
if gps_latitude and gps_longitude:
return gps_latitude, gps_longitude
@ -172,22 +210,38 @@ def convert_rational_to_float(rational):
return float(rational.values[0].num) / float(rational.values[0].den)
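Note that `convert_rational_to_float` above uses only the first rational of the tag, i.e. the whole degrees; a full degrees/minutes/seconds conversion would look like this sketch (`dms_to_decimal` is a hypothetical helper, not part of the script):

```python
from fractions import Fraction

def dms_to_decimal(dms) -> float:
    """Convert a [degrees, minutes, seconds] sequence of rationals to decimal degrees."""
    deg, minutes, seconds = (float(Fraction(x)) for x in dms)
    return deg + minutes / 60 + seconds / 3600

print(dms_to_decimal(["48", "51", "24"]))  # ≈ 48.8567
```

For coarse bbox tests over whole territories, the degrees-only value is usually close enough, but it is off by up to one degree.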
if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--source_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/', help='Chemin du répertoire source')
parser.add_argument('--destination_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/', help='Chemin du répertoire destination')
parser.add_argument('--sequence_folder', default='principale_sequence', help='Nom du dossier de séquence')
parser.add_argument(
"--source_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/",
help="Chemin du répertoire source",
)
parser.add_argument(
"--destination_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/",
help="Chemin du répertoire destination",
)
parser.add_argument(
"--sequence_folder",
default="principale_sequence",
help="Nom du dossier de séquence",
)
args = parser.parse_args()
# Walk every file in the source directory and its subdirectories
for root, dirs, files in os.walk(args.source_dir):
for filename in files:
# Check whether the file is an image
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tif')):
if filename.lower().endswith(
(".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tif")
):
filepath = os.path.join(root, filename)
move_file_if_in_france(filepath, sequence_folder)
print('fichiers se situant en france déplacés: ', count_files_moved, ' / ', count_files_all)
print(
"fichiers se situant en france déplacés: ",
count_files_moved,
" / ",
count_files_all,
)

find_user_id.sh · Normal file · 15 changed lines
@ -0,0 +1,15 @@
#!/bin/bash
# fetch the user id and their sequences into a JSON file
echo "Prenez un token oauth sur https://www.mapillary.com/app/user/$1"
USERNAME=$1
response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS)
ID=$(echo "$response" | jq -r '.data.user_by_username.id')
echo "ID: $ID"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_
seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json"
echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'"

@ -1,30 +1,38 @@
import json
import requests
# reads a JSON file listing the photo ids of each sequence and
# fetches the sequence via the API.
import argparse
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--username', type=str, help='Username to get the sequences id of')
parser.add_argument('--dev_token', type=str, help='Your mapillary developer token')
parser.add_argument('--max_sequence', type=str, help='Username to get the sequences id of')
parser.add_argument(
"--username",
type=str,
help="Username to get the sequences id of",
required=True,
)
parser.add_argument("--dev_token", type=str, help="Your mapillary developer token")
parser.add_argument(
"--max_sequence", type=str, help="Limit the amount of retrieved sequence ids"
)
global args
args = parser.parse_args(argv)
print(args)
# Initialise the lists storing the responses
responses = []
sequences = []
def get_image_data_from_sequences():
username = args.username
input_file = "out_"+username+".json"
input_file = "out_" + username + ".json"
# Load the input JSON file
with open(input_file, "r") as file:
@ -32,7 +40,7 @@ def get_image_data_from_sequences():
# Iterate over the nodes to collect the image_ids
nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"]
print( 'séquences : ', len(nodelist))
print("séquences : ", len(nodelist))
image_ids = [node["image_id"] for node in nodelist]
print(image_ids)
@ -41,15 +49,21 @@ def get_image_data_from_sequences():
# Prepare the authorization header for all future requests
header = {"Access-Token": dev_token}
ii=0
ii = 0
limit_requests = 1000000000
# limit_requests = 5  # for testing
# limit_requests = 5  # for testing
# Loop over each image_id to query the Mapillary API
for image_id in image_ids:
ii+=1
ii += 1
if limit_requests >= ii and image_id:
params = {"id": image_id, "fields": "id,sequence"}
request_url = "https://graph.mapillary.com/" + str(image_id)+"?access_token="+dev_token+"&fields=id,sequence"
request_url = (
"https://graph.mapillary.com/"
+ str(image_id)
+ "?access_token="
+ dev_token
+ "&fields=id,sequence"
)
# print("requete: "+request_url)
response = requests.get(request_url)
@ -63,23 +77,31 @@ def get_image_data_from_sequences():
parsed_response["sequence"] = raw_response["sequence"]
sequences.append(parsed_response["sequence"])
print("séquence trouvée: "+str(ii)+"/"+args.max_sequence+" : "+raw_response["sequence"])
print(
"séquence trouvée: "
+ str(ii)
+ "/"
+ args.max_sequence
+ " : "
+ raw_response["sequence"]
)
else:
print(response)
responses.append(parsed_response)
def persist_files():
# Save the new results to the output.json file
output_file = "sequences_"+args.username+".json"
output_file = "sequences_" + args.username + ".json"
with open(output_file, "w") as file:
json.dump(responses, file)
sequence_filename = "sequences_"+args.username+".txt"
sequence_filename = "sequences_" + args.username + ".txt"
with open(sequence_filename, "w") as file:
json.dump(sequences, file)
print('fichier sauvegardé: '+sequence_filename)
print("fichier sauvegardé: " + sequence_filename)
parse_args()

get_user.sh · Normal file → Executable file · 1 changed line
@ -9,6 +9,7 @@ export username=$1
export num_user=$2
echo "télécharger la séquence pour l'utilisateur $username, $num_user"
bash curl_land.sh "$username" "$num_user" > "out_${username}.json"
#FIXME curl_land script not included in this repo
echo "séquences récupérées:"
num_sequences=$(grep -o -w 'image_id' "out_${username}.json" | wc -l)

@ -1,38 +1,60 @@
import requests, json
import os, requests, json
import argparse
from urllib.parse import quote
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--access_token', type=str, help='Your mapillary access token')
parser.add_argument('--username', type=str, help='Username to get the sequences id of')
parser.add_argument('--pictures', type=str, help='Limit of pictures to fetch')
parser.add_argument(
"--access_token",
type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
parser.add_argument(
"--pictures",
type=str,
default=500,
help="Limit of pictures to fetch, max=5000",
)
parser.add_argument(
"--bbox",
type=str,
default=None,
help="Limit to a bounding box, e.g. '-5.5,47.3,-1.2,48.9', use http://bboxfinder.com",
)
global args
args = parser.parse_args(argv)
if __name__ == '__main__':
if __name__ == "__main__":
parse_args()
if args.access_token == None:
print('please provide the access_token')
exit()
mly_key = args.access_token
creator_username = args.username
max_img= args.pictures
max_img = args.pictures
bbox_filter = f'&bbox={args.bbox}' if args.bbox is not None else ''
url = f'https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence'
url = f"https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence{bbox_filter}"
print(url)
response = requests.get(url)
if response.status_code == 200:
json = response.json()
json = response.json()
# deduplicate the sequences
sequences_ids = [obj['sequence'] for obj in json['data']]
unique_ids = list(set(sequences_ids))
print(unique_ids)
# deduplicate the sequences
sequences_ids = [obj["sequence"] for obj in json["data"]]
unique_ids = list(set(sequences_ids))
print(unique_ids)
print("---")
print(" ".join(unique_ids))
else:
print(response)
print(response)
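As a usage note, building the query string with `urllib.parse.urlencode` instead of an f-string takes care of URL-encoding the bbox commas and the `|` characters in the token; a sketch using the same field names as above (token, username, and bbox values are placeholders):

```python
from urllib.parse import urlencode

params = {
    "access_token": "MLY|xxxx|xxxxxxx",   # placeholder token
    "creator_username": "some_user",      # hypothetical username
    "limit": 500,
    "fields": "id,sequence",
}
bbox = "-5.5,47.3,-1.2,48.9"  # optional bounding box
if bbox:
    params["bbox"] = bbox  # only added when a bbox filter is requested

url = "https://graph.mapillary.com/images?" + urlencode(params)
print(url)
```

The resulting URL is equivalent to the hand-built one, with all reserved characters percent-encoded.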

@ -1,7 +1,8 @@
import requests
from requests.adapters import HTTPAdapter
from requests.adapters import Retry
import json
from PIL import Image
import io
import os
import concurrent.futures
import argparse
@ -14,35 +15,98 @@ session = requests.Session()
retries_strategies = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429,502, 503, 504],
)
session.mount('https://', HTTPAdapter(max_retries=retries_strategies))
status_forcelist=[429, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retries_strategies))
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('access_token', type=str, help='Your mapillary access token')
parser.add_argument('--sequence_ids', type=str, nargs='*', help='The mapillary sequence id(s) to download')
parser.add_argument('--image_ids', type=int, nargs='*', help='The mapillary image id(s) to get their sequence id(s)')
parser.add_argument('--destination', type=str, default='data', help='Path destination for the images')
parser.add_argument('--image_limit', type=int, default=None, help='How many images you want to download')
parser.add_argument('--overwrite', default=False, action='store_true', help='overwrite existing images')
parser.add_argument("access_token", type=str, help="Your mapillary access token")
parser.add_argument(
"--sequence_ids",
type=str,
nargs="*",
help="The mapillary sequence id(s) to download",
)
parser.add_argument(
"--image_ids",
type=int,
nargs="*",
help="The mapillary image id(s) to get their sequence id(s)",
)
parser.add_argument(
"--destination",
type=str,
default="data",
help="Path destination for the images",
)
parser.add_argument(
"--image_limit",
type=int,
default=None,
help="How many images you want to download",
)
parser.add_argument(
"--overwrite",
default=False,
action="store_true",
help="overwrite existing images",
)
parser.add_argument("-v", "--version", action="version", version="release 1.6")
args = parser.parse_args(argv)
if args.sequence_ids is None and args.image_ids is None:
parser.error("Please enter at least one sequence id or image id")
return args
def download(url, filepath, metadata=None):
#print(asizeof.asizeof(image)/1024, "MB")
def download(url, filepath, metadata=None):
# print(asizeof.asizeof(image)/1024, "MB")
with open(str(filepath), "wb") as f:
r = session.get(url, stream=True, timeout=6)
image = write_exif(r.content, metadata)
f.write(image)
print("{} downloaded".format(filepath))
try:
img = write_exif(r.content, metadata)
except Exception as e:
print(
f"{filepath} FAILED to write exif data. Error: {e} Retrying with reduced EXIF.".replace(
"\n", " | "
)
)
# write_exif(img_byte_arr, metadata) crashes when JFIF fields present
# so here is a workaround to remove those fields with pillow
# definitely not the most elegant solution...
try:
r = session.get(url, stream=True, timeout=6)
im = Image.open(r.raw)
exif_fields = list(im.info.keys())
# print(f"{filepath} detected exif fields : {exif_fields}")
fields_to_keep = ("exif", "dpi")
for k in exif_fields:
if k not in fields_to_keep:
del im.info[k]
# print(f"{filepath} deleted exif field: {k}")
# done cleaning, now converting pillow image back to bytearray
img_byte_arr = io.BytesIO()
im.save(img_byte_arr, format="JPEG")
img_byte_arr = img_byte_arr.getvalue()
img = write_exif(img_byte_arr, metadata)
except Exception as e:
print(
f"{filepath} FAILED WORKAROUND. Error: {e} Saving image without EXIF data.".replace(
"\n", " | "
)
)
img = r.content
f.write(img)
print("{} downloaded {}".format(filepath, r))
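The rewritten download() is essentially a chain of fallbacks: full EXIF write, then the reduced-EXIF workaround, then raw bytes. A generic sketch of that control flow, with hypothetical stand-in callables instead of network and EXIF code:

```python
def first_success(attempts, final_fallback):
    # Mirror download()'s control flow: try each strategy in order,
    # report failures, and fall back to the last resort.
    for label, attempt in attempts:
        try:
            return attempt()
        except Exception as e:
            print(f"{label} failed: {e}; trying next fallback")
    return final_fallback()

def full_exif():
    raise ValueError("faulty JFIF fields")  # simulated write_exif() failure

result = first_success(
    [("full EXIF", full_exif),
     ("reduced EXIF", lambda: b"jpeg-with-reduced-exif")],
    lambda: b"raw-jpeg-bytes",
)
```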
def get_single_image_data(image_id, mly_header):
req_url = 'https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence'.format(image_id)
req_url = "https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence".format(
image_id
)
r = session.get(req_url, headers=mly_header)
data = r.json()
print(data)
@@ -50,54 +114,68 @@ def get_single_image_data(image_id, mly_header):
def get_image_data_from_sequences(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id)
for i, sequence_id in enumerate(sequences_id):
url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header)
data = r.json()
image_ids = data['data']
image_ids = data["data"]
total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id))
print('getting images data')
print(
"{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
for x in range(0, total_image):
image_id = image_ids[x]['id']
image_id = image_ids[x]["id"]
image_data = get_single_image_data(image_id, mly_header)
image_data['sequence_id'] = sequence_id
image_data["sequence_id"] = sequence_id
yield image_data
def get_image_data_from_sequences__future(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id)
for i, sequence_id in enumerate(sequences_id):
url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header)
data = r.json()
if data.get('data') == []:
print("Empty or wrong sequence {} of {} - id : {}".format(i+1, len(sequences_id), sequence_id))
if data.get("data") == []:
print(
"Empty or wrong sequence {} of {} - id : {}".format(
i + 1, len(sequences_id), sequence_id
)
)
continue
image_ids = data['data']
image_ids = data["data"]
total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id))
print('getting images data')
print(
"{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {}
for x in range(0, total_image):
image_id = image_ids[x]['id']
future_to_url[executor.submit(get_single_image_data, image_id, mly_header)] = image_id
image_id = image_ids[x]["id"]
future_to_url[
executor.submit(get_single_image_data, image_id, mly_header)
] = image_id
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
image_data = future.result()
image_data['sequence_id'] = sequence_id
#print(image_data)
image_data["sequence_id"] = sequence_id
# print(image_data)
yield image_data
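The future-to-id bookkeeping used above can be sketched with a dummy fetch function in place of get_single_image_data() (no network access, fake metadata):

```python
import concurrent.futures

def fetch_image_data(image_id):
    # Stand-in for get_single_image_data(): returns fake metadata
    # instead of calling the Mapillary Graph API.
    return {"id": image_id, "sequence": "seq-1"}

image_ids = ["101", "102", "103"]
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Map each future back to the id it was submitted for, so results
    # can be matched up as they complete in arbitrary order.
    future_to_id = {executor.submit(fetch_image_data, i): i for i in image_ids}
    for future in concurrent.futures.as_completed(future_to_id):
        results[future_to_id[future]] = future.result()
```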
def write_exif(picture, img_metadata):
'''
"""
Write exif metadata
'''
#{'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca',
"""
# {'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca',
# 'captured_at': 1603459736644, 'geometry': {'type': 'Point', 'coordinates': [2.5174596904057, 48.777089857534]}, 'id': '485924785946693'}
with writer.Writer(picture) as image:
image.add_artist(img_metadata)
image.add_camera_make(img_metadata)
@@ -113,61 +191,84 @@ def write_exif(picture, img_metadata):
return updated_image
if __name__ == '__main__':
if __name__ == "__main__":
args = parse_args()
sequence_ids= args.sequence_ids if args.sequence_ids is not None else []
sequence_ids = args.sequence_ids if args.sequence_ids is not None else []
images_ids = args.image_ids
access_token = args.access_token
images_data = []
header = {'Authorization' : 'OAuth {}'.format(access_token)}
header = {"Authorization": "OAuth {}".format(access_token)}
if images_ids:
for image_id in images_ids:
image_data = get_single_image_data(image_id, header)
if 'error' in image_data:
if "error" in image_data:
print("data : ", image_data)
print("Something went wrong! Please check your image id and/or your connection")
print(
"Something went wrong! Please check your image id and/or your connection"
)
sys.exit()
else:
sequence_ids.append(image_data.get('sequence'))
sequence_ids.append(image_data.get("sequence"))
#for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)):
for i,image_data in enumerate(get_image_data_from_sequences__future(sequence_ids, header)):
# for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)):
for i, image_data in enumerate(
get_image_data_from_sequences__future(sequence_ids, header)
):
if args.image_limit is not None and i >= args.image_limit:
break
if 'error' in image_data:
if "error" in image_data:
print("data : ", image_data)
print("Something went wrong! Please check your token and/or your connection")
print(
"Something went wrong! Please check your token and/or your connection"
)
sys.exit()
images_data.append(image_data)
#sys.exit()
# sys.exit()
print('Downloading... this process will take a while, please wait')
print("Downloading... this process will take a while, please wait")
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
for i,image_data in enumerate(images_data):
for i, image_data in enumerate(images_data):
# create a folder for each unique sequence ID to group images by sequence
path_destination = os.path.join(args.destination, image_data['sequence_id'])
path_destination = os.path.join(args.destination, image_data["sequence_id"])
if not os.path.exists(path_destination):
os.makedirs(path_destination)
date_time_image_filename = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000).strftime('%Y-%m-%d_%HH%Mmn%Ss%f')[:-3] + '.jpg'
date_time_image_filename = (
datetime.utcfromtimestamp(
int(image_data["captured_at"]) / 1000
).strftime("%Y-%m-%d_%HH%Mmn%Ss%f")[:-3]
+ ".jpg"
)
path = os.path.join(path_destination, date_time_image_filename)
img_metadata = writer.PictureMetadata(
capture_time = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000),
artist = image_data['creator']['username'],
camera_make = image_data['make'],
camera_model = image_data['model'],
longitude = image_data['geometry']['coordinates'][0],
latitude = image_data['geometry']['coordinates'][1],
picture_type = PictureType("equirectangular") if image_data.get('camera_type') == 'spherical' or image_data.get('camera_type') == 'equirectangular' else PictureType("flat"),
direction = image_data['compass_angle'],
altitude = image_data['altitude'],
capture_time=datetime.utcfromtimestamp(
int(image_data["captured_at"]) / 1000
),
artist=image_data["creator"]["username"],
camera_make=image_data["make"],
camera_model=image_data["model"],
longitude=image_data["geometry"]["coordinates"][0],
latitude=image_data["geometry"]["coordinates"][1],
picture_type=(
PictureType("equirectangular")
if image_data.get("camera_type") == "spherical"
or image_data.get("camera_type") == "equirectangular"
else PictureType("flat")
),
direction=image_data["compass_angle"],
altitude=image_data["altitude"],
)
#print("metadata: ", img_metadata)
#print("path: ", image_data)
# print("metadata: ", img_metadata)
# print("path: ", image_data)
image_exists = os.path.exists(path)
if not args.overwrite and image_exists:
print("{} already exists. Skipping ".format(path))
continue
executor.submit(download, url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)
#download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)
executor.submit(
download,
url=image_data["thumb_original_url"],
filepath=path,
metadata=img_metadata,
)
# download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)
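The image filenames above are derived from the millisecond `captured_at` timestamp. A standalone sketch, using timezone-aware `fromtimestamp` (equivalent in UTC to the `utcfromtimestamp` call in the script) and the sample timestamp from the comment in write_exif:

```python
from datetime import datetime, timezone

def filename_from_captured_at(captured_at_ms):
    # Millisecond epoch -> "YYYY-mm-dd_HHHMMmnSSsmmm.jpg" in UTC,
    # trimming strftime's microseconds down to milliseconds.
    dt = datetime.fromtimestamp(captured_at_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d_%HH%Mmn%Ss%f")[:-3] + ".jpg"

name = filename_from_captured_at(1603459736644)
```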


@@ -1,4 +1,6 @@
requests >= 2.28.2
pytz >= 2023.3
timezonefinder >=6.2.0
pyexiv2 >= 2.8.2
pyexiv2 >= 2.8.2
panoramax_cli >= 1.1.1
pillow >= 11.0.0


@@ -1,5 +1,11 @@
# scripts common shared variables
# this is to copy as a file named "secrets_variables.sh" which is ignored in this git repo
# Mapillary:
# register an application at https://www.mapillary.com/dashboard/developers
# read permissions needed, all other fields can be random. copy the client token
export MAPILLARY_DEV_TOKEN="MLY|blahblah_replace_it"
# Panoramax
# copy the token from your profile settings at https://panoramax.openstreetmap.fr/settings
export PANORAMAX_DEV_TOKEN="yourtokenhere"
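The Python helpers read these exported variables back via `os.environ`; a plain `os.environ[...]` lookup raises KeyError when this file was not sourced. A sketch of a more forgiving lookup (`get_dev_token` is a hypothetical helper, not part of the repo):

```python
import os

def get_dev_token(name="MAPILLARY_DEV_TOKEN"):
    # os.environ[name] raises KeyError if secrets_variables.sh was not
    # sourced; .get() lets us fail with a clearer message instead.
    token = os.environ.get(name)
    if not token:
        raise SystemExit(f"{name} is not set; source secrets_variables.sh first")
    return token

os.environ["MAPILLARY_DEV_TOKEN"] = "MLY|blahblah_replace_it"  # demo only
token = get_dev_token()
```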


@@ -1,53 +1,61 @@
import os
input_file = 'input_file'
import argparse
def parse_args(argv =None):
input_file = "input_file"
def parse_args(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--dev_token', type=str, help='Your mapillary access token')
parser.add_argument('--username', type=str, help='Username to get the sequences id of')
parser.add_argument(
"--dev_token",
type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
global args
args = parser.parse_args(argv)
if __name__ == '__main__':
print("Building the bash script that retrieves the images of each sequence for Mapillary_download (https://github.com/Stefal/mapillary_download.git)")
if __name__ == "__main__":
print(
"Building the bash script that retrieves the images of each sequence for Mapillary_download (https://github.com/Stefal/mapillary_download.git)"
)
parse_args()
username=args.username
username = args.username
input_file = f"sequences_{username}.txt"
if not args.dev_token:
print("Error: the Mapillary developer token is missing; check the secret variables file. Stopping the script.")
exit(1)
if not os.path.exists(input_file) or not os.path.isfile(input_file):
print(f"Error: file '{input_file}' was not found. Stopping the script.")
print(
f"Error: file '{input_file}' was not found. Stopping the script."
)
exit(1)
else:
print(f"File '{input_file}' found.")
output_file = f"script_bash_get_sequences_for_user_{username}.sh"
access_token = "--access_token='"+args.dev_token+"' "
format_string = "/usr/bin/python3 mapillary_download.py {} --sequence_id={}\n"
access_token = "$MAPILLARY_DEV_TOKEN"  # or, to embed the token in clear text: "'" + args.dev_token + "' "
with open(output_file, "w") as output:
with open(input_file, "r") as input_handle:
content = input_handle.read()
import ast

sequences = ast.literal_eval(content)  # safer than eval for file content
for seq in sequences:
full_cmd = f"/usr/bin/python3 mapillary_download.py {access_token} --sequence_id='{seq}' --username={username}\n"
full_cmd = f"python mapillary_download.py {access_token} --sequence_ids={seq}\n"
output.write(full_cmd)
print(output_file)
print("\nBash script generated successfully.")
print(f"Run it to retrieve the photos of user {username}: \n bash {output_file}")
print(
f"Run it to retrieve the photos of user {username}: \n bash {output_file}"
)
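The command-building step of the generator can be isolated; this sketch (`build_commands` is a hypothetical helper) reproduces the f-string used above:

```python
def build_commands(sequences, token_ref="$MAPILLARY_DEV_TOKEN"):
    # One mapillary_download.py invocation per sequence id, one line each,
    # suitable for writing into the generated bash script.
    return [
        f"python mapillary_download.py {token_ref} --sequence_ids={seq}\n"
        for seq in sequences
    ]

cmds = build_commands(["abc123", "def456"])
```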


@@ -1,4 +1,4 @@
#source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py
# source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py
from typing import Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
@@ -31,7 +31,8 @@ class PictureMetadata:
direction: Optional[float] = None
orientation: Optional[int] = 1
class Writer():
class Writer:
def __init__(self, picture: bytes) -> None:
self.content = picture
self.image = pyexiv2.ImageData(picture)
@@ -42,7 +43,7 @@ class Writer():
def __enter__(self):
return self
def __exit__(self, *args) -> None:
self.image.close()
@@ -53,19 +54,28 @@ class Writer():
if self.updated_xmp:
self.image.modify_xmp(self.updated_xmp)
except Exception as e:
print("exception \nexif: {}\nxmp: {}".format(self.updated_exif, self.updated_xmp))
print(
"exception \nexif: {}\nxmp: {}".format(
self.updated_exif, self.updated_xmp
)
)
def close(self) -> None:
self.image.close()
def get_Bytes(self) -> bytes:
return self.image.get_bytes()
def writePictureMetadata(self, metadata: PictureMetadata) -> None:
"""
Override exif metadata on raw picture and return updated bytes
"""
if not metadata.capture_time and not metadata.longitude and not metadata.latitude and not metadata.picture_type:
if (
not metadata.capture_time
and not metadata.longitude
and not metadata.latitude
and not metadata.picture_type
):
return
if metadata.capture_time:
@@ -83,12 +93,20 @@ class Writer():
Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef
"""
if metadata.latitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = "N" if metadata.latitude > 0 else "S"
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(metadata.latitude)
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = (
"N" if metadata.latitude > 0 else "S"
)
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(
metadata.latitude
)
if metadata.longitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = "E" if metadata.longitude > 0 else "W"
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(metadata.longitude)
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = (
"E" if metadata.longitude > 0 else "W"
)
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(
metadata.longitude
)
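GPS coordinates are stored as a hemisphere reference plus degrees/minutes/seconds. A sketch of the decimal-to-DMS split that `_to_exif_dms` performs, using the sample latitude from the metadata comment; the implementation here is an assumption, not the repo's exact code:

```python
def to_dms(decimal_degrees):
    # Split an absolute decimal coordinate into degrees, minutes, seconds.
    value = abs(decimal_degrees)
    degrees = int(value)
    minutes = int((value - degrees) * 60)
    seconds = (value - degrees - minutes / 60) * 3600
    return degrees, minutes, seconds

lat = 48.777089857534  # latitude from the sample metadata above
ref = "N" if lat > 0 else "S"
d, m, s = to_dms(lat)
```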
def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None:
"""
@@ -98,18 +116,24 @@ class Writer():
if altitude is not None:
negative_altitude = 0 if altitude >= 0 else 1
self.updated_exif['Exif.GPSInfo.GPSAltitude'] = f"{int(abs(altitude * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSAltitudeRef'] = negative_altitude
self.updated_exif["Exif.GPSInfo.GPSAltitude"] = (
f"{int(abs(altitude * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSAltitudeRef"] = negative_altitude
def add_direction(self, metadata: PictureMetadata, ref: str = 'T', precision: int = 1000) -> None:
def add_direction(
self, metadata: PictureMetadata, ref: str = "T", precision: int = 1000
) -> None:
"""
Add direction value in GPSImgDirection and GPSImgDirectionRef
"""
direction = metadata.direction
if metadata.direction is not None:
self.updated_exif['Exif.GPSInfo.GPSImgDirection'] = f"{int(abs(direction % 360.0 * precision))} / {precision}"
self.updated_exif['Exif.GPSInfo.GPSImgDirectionRef'] = ref
self.updated_exif["Exif.GPSInfo.GPSImgDirection"] = (
f"{int(abs(direction % 360.0 * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSImgDirectionRef"] = ref
def add_gps_datetime(self, metadata: PictureMetadata) -> None:
"""
@@ -120,15 +144,21 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override GPSInfo time and DatetimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC)
self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime("%H/1 %M/1 %S/1")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime(
"%H/1 %M/1 %S/1"
)
def add_datetimeoriginal(self, metadata: PictureMetadata) -> None:
"""
Add date time in Exif DateTimeOriginal and SubSecTimeOriginal tags
@@ -138,12 +168,18 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override DatetimeOriginal and SubSecTimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset()
if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset)
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
if metadata.capture_time.microsecond != 0:
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = metadata.capture_time.strftime("%f")
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = (
metadata.capture_time.strftime("%f")
)
def add_img_projection(self, metadata: PictureMetadata) -> None:
"""
@@ -162,15 +198,15 @@ class Writer():
if metadata.artist is not None:
self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'")
def add_camera_make(self, metadata: PictureMetadata) -> None:
"""
Add camera manufacture in Exif Make tag
"""
if metadata.camera_make is not None:
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip("'")
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip(
"'"
)
def add_camera_model(self, metadata: PictureMetadata) -> None:
"""
@@ -178,7 +214,9 @@ class Writer():
"""
if metadata.camera_model is not None:
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip("'")
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip(
"'"
)
def format_offset(self, offset: timedelta) -> str:
"""Format offset for OffsetTimeOriginal. Format is like "+02:00" for Paris offset
@@ -197,7 +235,7 @@ class Writer():
"""
new_lat_lon = metadata.longitude is not None and metadata.latitude is not None
if new_lat_lon :
if new_lat_lon:
lon = metadata.longitude
lat = metadata.latitude
@@ -211,14 +249,14 @@ class Writer():
lon = self._from_dms(lon) * (1 if lon_ref == "E" else -1)
lat = self._from_dms(lat) * (1 if lat_ref == "N" else -1)
except KeyError:
return metadata.capture_time # cannot localize, returning same date
return metadata.capture_time  # cannot localize, returning same date
tz_name = tz_finder.timezone_at(lng=lon, lat=lat)
if not tz_name:
return metadata.capture_time # cannot find timezone, returning same date
tz = pytz.timezone(tz_name)
return tz.localize(naive_dt)
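format_offset, whose body is elided from this diff, promises strings like "+02:00" per its docstring. A sketch of an implementation consistent with that docstring (an assumption, not necessarily the repo's code):

```python
from datetime import timedelta

def format_offset(offset):
    # Render a UTC-offset timedelta as "+HH:MM" / "-HH:MM",
    # e.g. "+02:00" for Paris summer time.
    total = int(offset.total_seconds())
    sign = "+" if total >= 0 else "-"
    total = abs(total)
    return f"{sign}{total // 3600:02d}:{(total % 3600) // 60:02d}"
```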
def _from_dms(self, val: str) -> float: