""" PROGRAMMES TRANSPORT INTERURBAIN ET TERRITOIRES """ ############################################################################################### ############################################################################################### """ PREMIERE PARTIE : DONNEES PONCTUELLES """ ############################################################################################### ############################################################################################### """ 1 Exploitation de la BPE : base permanente des équipements de l'Insee """ #Imports nécessaires import geopandas as gpd import numpy as np import pandas as pd from shapely.geometry import Point #Téléchargement du fichier de l'Insee # Source : https://www.data.gouv.fr/datasets/base-permanente-des-equipements-1 fic="./tele/insee/BPE/BPE24.zip" bpe=pd.read_csv(fic, compression='zip',sep=';',decimal=",",usecols=["NOMRS","CNOMRS","SDOM","TYPEQU","SIRET","LAMBERT_X","LAMBERT_Y", "EPSG","QUALITE_GEOLOC","DCIRIS"], dtype={"SIRET":"str"}) #On enlève les équipements sans géocodage bpe=bpe.dropna(subset=["LAMBERT_X"]) bpe=bpe.dropna(subset=["LAMBERT_Y"]) bpe["LAMBERT_X"]=bpe["LAMBERT_X"].astype(float) bpe["LAMBERT_Y"]=bpe["LAMBERT_Y"].astype(float) #Limiter à l'EPSG=2154 (métropole) bpe=bpe[bpe["EPSG"]==2154] bpe['geometry'] = bpe.apply(lambda row: Point(row['LAMBERT_X'], row['LAMBERT_Y']), axis=1) bpe_gdf = gpd.GeoDataFrame(bpe, geometry='geometry') bpe_gdf=bpe_gdf.set_crs("EPSG:2154") # Les coordonnées sont à la base en Lambert93 bpe_gdf=bpe_gdf.to_crs(3035) # Transfert en projection Inspire LAEA (ESPG 3035) bpe_gdf["LAEA_X"]=bpe_gdf.geometry.x bpe_gdf["LAEA_Y"]=bpe_gdf.geometry.y def base_1000(x) : return (1000*np.floor(x/1000)) bpe_1000=bpe_gdf.loc[:,("SDOM","LAEA_X","LAEA_Y")] bpe_1000["X_centre"]=base_1000(bpe_1000["LAEA_X"]).astype(int) bpe_1000["Y_centre"]=base_1000(bpe_1000["LAEA_Y"]).astype(int) bpe_1000["domaine"]=bpe_1000["SDOM"].str[0] bpe_1000=pd.pivot_table(bpe_1000.loc[:,("X_centre","Y_centre","domaine","LAEA_X")],index=("X_centre","Y_centre"),columns="domaine",values="LAEA_X",aggfunc="count") bpe_1000_code=bpe_1000 bpe_tempo=bpe_1000_code.index.to_frame(index=False) bpe_1000_code=pd.concat([bpe_tempo, bpe_1000_code.reset_index(drop=True)], axis=1) bpe_1000_code.info() bpe_1000_code["code_inspire"]="FR_CRS3035RES1000mN"+bpe_1000_code["Y_centre"].astype(str)+"E"+bpe_1000_code["X_centre"].astype(str) bpe_1000_code=bpe_1000_code.loc[:,("code_inspire","A","B","C","D","E","F")] bpe_1000_code=bpe_1000_code.set_index("code_inspire") fic="./resultats/bpe/bpe_1000_code.csv" bpe_1000_code.to_csv(fic) #Si besoin, export d'une petite partie bpe_gdf_test=bpe_gdf.loc[88550:89000,:] fic="./tele/insee/BPE/BPE_test.shp" bpe_gdf_test.to_file(fic) # Traitement d'un équipement particulier. Exemple : les pharmacies bpe_pharma_gdf=bpe_gdf[bpe_gdf["TYPEQU"]=="D307"] def base_1000(x) : return (1000*np.floor(x/1000)) bpe_pharma=bpe_pharma_gdf.loc[:,("TYPEQU","LAEA_X","LAEA_Y")] bpe_pharma["X_centre"]=base_1000(bpe_pharma["LAEA_X"]).astype(int) bpe_pharma["Y_centre"]=base_1000(bpe_pharma["LAEA_Y"]).astype(int) bpe_pharma=pd.pivot_table(bpe_pharma.loc[:,("X_centre","Y_centre","TYPEQU","LAEA_X")],index=("X_centre","Y_centre"),columns="TYPEQU",values="LAEA_X",aggfunc="count") bpe_pharma_code=bpe_pharma bpe_tempo=bpe_pharma_code.index.to_frame(index=False) bpe_pharma_code=pd.concat([bpe_tempo, bpe_pharma_code.reset_index(drop=True)], axis=1) bpe_pharma_code["code_inspire"]="FR_CRS3035RES1000mN"+bpe_pharma_code["Y_centre"].astype(str)+"E"+bpe_pharma_code["X_centre"].astype(str) bpe_pharma_code=bpe_pharma_code.loc[:,("code_inspire","D307")] bpe_pharma_code=bpe_pharma_code.set_index("code_inspire") fic="./resultats/bpe/bpe_pharma_code.csv" bpe_pharma_code.to_csv(fic) ############################################################################################### ############################################################################################### """ 1.2 DONNEES 2 : les établissements et emplois """ #Traitement des établissements #On part d'un fichier national Fichier Sirene à télécharger StockEtablissement_utf8.zip et à dézipper #https://www.data.gouv.fr/fr/datasets/base-sirene-des-entreprises-et-de-leurs-etablissements-siren-siret/ #séparation en plusieurs fichiers du fichier source pour arriver à le traiter. Attention, le pgm ne s'arrête pas. Il faut l'interrompre. chunk_size = 1000000 def write_chunk(part, lines): with open('./tele/etab/data_part_'+ str(part) +'.csv', 'w') as f_out: f_out.write(header) f_out.writelines(lines) with open("./tele/etab/StockEtablissement_utf8.csv", "r") as f: count = 0 header = f.readline() lines = [] for line in f: count += 1 lines.append(line) if count % chunk_size == 0: write_chunk(count // chunk_size, lines) lines = [] # write remainder if len(lines) > 0: write_chunk((count // chunk_size) + 1, lines) # Importations nécessaires import pandas as pd import numpy as np # Ecriture d'un fichier complet plus léger def is_float(x): try: float(x) except ValueError: return False return True for i in range(1,43) : # Attention : le chiffre final dépend du nombre de fichiers data_part fic="./tele/etab/data_part_"+str(i)+".csv" etab_temp=pd.read_csv(fic,usecols=["trancheEffectifsEtablissement","coordonneeLambertAbscisseEtablissement","coordonneeLambertOrdonneeEtablissement", "caractereEmployeurEtablissement"], dtype={'trancheEffectifsEtablissement':str, 'caractereEmployeurEtablissement':str}) etab_temp=etab_temp.loc[(etab_temp["caractereEmployeurEtablissement"]=='O') & (etab_temp["coordonneeLambertAbscisseEtablissement"].apply(lambda x: is_float(x))) & (etab_temp["coordonneeLambertOrdonneeEtablissement"].apply(lambda x: is_float(x)))] etab_temp=etab_temp.dropna() if i==1 : etab=etab_temp else : etab=pd.concat([etab,etab_temp]) etab=etab.loc[:,("trancheEffectifsEtablissement","coordonneeLambertAbscisseEtablissement","coordonneeLambertOrdonneeEtablissement")] etab.to_csv("./tele/etab/etab.csv") from shapely.geometry import Point import geopandas as gpd #Réimport etab fic="./tele/etab/etab.csv" etab=pd.read_csv(fic,usecols=["trancheEffectifsEtablissement","coordonneeLambertAbscisseEtablissement","coordonneeLambertOrdonneeEtablissement"], dtype={'trancheEffectifsEtablissement':str, 'coordonneeLambertAbscisseEtablissement':float, 'coordonneeLambertOrdonneeEtablissement':float}) etab=etab.loc[(etab["coordonneeLambertOrdonneeEtablissement"]>6000000) & (etab["coordonneeLambertOrdonneeEtablissement"]<7200000)] etab=etab.rename(columns={"coordonneeLambertAbscisseEtablissement":"X_centre","coordonneeLambertOrdonneeEtablissement":"Y_centre"}) etab['geometry'] = etab.apply(lambda row: Point(row['X_centre'], row['Y_centre']), axis=1) etab_gdf = gpd.GeoDataFrame(etab, geometry='geometry') etab_gdf=etab_gdf.set_crs("EPSG:2154") # Coordonnées en Lambert93 etab_gdf=etab_gdf.to_crs(3035) # Transfert en projection Inspire LAEA (ESPG 3035) etab_gdf["LAEA_X"]=etab_gdf.geometry.x etab_gdf["LAEA_Y"]=etab_gdf.geometry.y #Regroupement en dalles de 1000 sur la France et Corse etab_1000=etab_gdf.loc[:,("trancheEffectifsEtablissement","LAEA_X","LAEA_Y")] def centre_1000(x) : return (1000*np.floor(x/1000))+500 def effectif(x) : #Fonction d'estimation des effectifs if x=="00" : y=0 elif x=="01" : y=1 elif x=="02" : y=4 elif x=="11" : y=15 elif x=="12" : y=35 elif x=="21" : y=75 elif x=="22" : y=150 elif x=="31" : y=225 elif x=="32" : y=375 elif x=="41" : y=750 elif x=="42" : y=1500 elif x=="51" : y=3500 elif x=="52" : y=7500 elif x=="53" : y=10000 else : y=0 return y #Réduction à la France et Corse etab_1000["X_centre"]=centre_1000(etab_1000["LAEA_X"]) etab_1000["Y_centre"]=centre_1000(etab_1000["LAEA_Y"]) etab_1000["Effectifs"]=etab_1000["trancheEffectifsEtablissement"].apply(lambda x: effectif(x)) etab_1000=etab_1000.loc[etab_1000["Effectifs"]>0] etab_1000=etab_1000.loc[:,("Effectifs","X_centre","Y_centre")] etab_1000=etab_1000.groupby(["X_centre","Y_centre"],observed=True).sum() etab_1000=etab_1000.reset_index() etab_1000=etab_1000.loc[:,("X_centre","Y_centre","Effectifs")] fic="./tele/etab/etab_1000.csv" etab_1000.to_csv(fic) geometry2 = [Point(xy) for xy in zip(etab_1000['X_centre'], etab_1000['Y_centre'])] etab_gdf_1000=gpd.GeoDataFrame(etab_1000.loc[:,("Effectifs")], geometry=geometry2, crs="3035") fic2="./tele/etab/etab_1000.shp" etab_gdf_1000.to_file(fic2) etab_1000_code=etab_1000 etab_1000_code["X_centre"]=etab_1000_code["X_centre"].astype(int)-500 etab_1000_code["Y_centre"]=etab_1000_code["Y_centre"].astype(int)-500 etab_1000_code["code_inspire"]="FR_CRS3035RES1000mN"+etab_1000_code["Y_centre"].astype(str)+"E"+etab_1000_code["X_centre"].astype(str) etab_1000_code=etab_1000_code.loc[:,("code_inspire","Effectifs")] etab_1000_code=etab_1000_code.set_index("code_inspire") fic="./resultats/emplois/etab_1000_code.csv" etab_1000_code.to_csv(fic) #Génération d'un raster sur Etab #Cette partie sert juste à une visualisation du résultat sous forme de raster import rasterio from rasterio.features import rasterize # Charger le fichier vecteur avec Geopandas fic="./tele/etab/etab_1000.shp" vector_data = gpd.read_file(fic, engine="pyogrio") def bas(x,y) : return y*np.floor(x/y) def haut(x,y) : return y*np.ceil(x/y) # Définir les limites et la résolution du raster xmin1, ymin1, xmax1, ymax1 = vector_data.total_bounds xmin=bas(xmin1,1000) ymin=bas(ymin1,1000) xmax=haut(xmax1,1000) ymax=haut(ymax1,1000) print("xmin1 :",np.round(xmin1)," xmin :",np.round(xmin) ) print("xmax1 :",np.round(xmax1)," xmax :",np.round(xmax) ) print("ymin1 :",np.round(ymin1)," ymin :",np.round(ymin) ) print("ymax1 :",np.round(ymax1)," ymax :",np.round(ymax) ) cell_size = 1000 # Taille de cellule width = int((xmax - xmin) / cell_size) height = int((ymax - ymin) / cell_size) print("width : ",width) print("height : ",height) # Fonction qui transforme linéairement au-dessus de 100 jusqu'à 255 (limite) def transfo_256(x) : if x<100 : y=x else : y= 100 + round(155*(x/50000)) if y>255 : y=255 return y vector_data["Eff_256"]=vector_data["Effectifs"].apply(lambda x: transfo_256(x)) # Fonction pour ajouter les poids des points dans chaque cellule def generate_rasterize_shapes(df): return ((geom, weight) for geom, weight in zip(df.geometry, df["Eff_256"])) # Rasterisation rasterized = rasterize( generate_rasterize_shapes(vector_data), out_shape=(height, width), transform=rasterio.transform.from_bounds(xmin, ymin, xmax, ymax, width, height), fill=0, all_touched=True, merge_alg=rasterio.enums.MergeAlg.add # Somme des poids dans chaque cellule ) rasterized = rasterized.astype('uint8') # QGIS bloque sinon # Sauvegarde en GeoTIFF fic2="./resultats/emplois/raster_etab_1000.tif" with rasterio.open( fic2, "w", driver="GTiff", height=height, width=width, count=1, dtype=rasterized.dtype, crs=vector_data.crs, transform=rasterio.transform.from_bounds(xmin, ymin, xmax, ymax, width, height) ) as dst: dst.write(rasterized, 1) ############################################################################################### ############################################################################################### """ SECONDE PARTIE : RESEAUX """ ############################################################################################### ############################################################################################### """ 2.1 RESEAU 1 : gestion des trains Il s'agit de récupérer les données open data de la SNCF pour déterminer les temps de trajet entre gares, gares qui seront ramenées à leur localisation au 1000 m Calcul identique aux cars (2.2) avec spécificités sur le repère et l'affectation non univoque du type. """ # Imports nécessaires import geopandas as gpd import numpy as np import pandas as pd from shapely.geometry import Point from datetime import datetime, time def centre_1000(x) : return int(1000*np.floor(x/1000)) def lecture_y_1000(a) : y=a[19:26] return y def lecture_x_1000(a) : x=a[27:34] return x def formule(A,B) : texte="LINESTRING ("+str(lecture_x_1000(A))+" "+str(lecture_y_1000(A))+","+str(lecture_x_1000(B))+" "+str(lecture_y_1000(B))+")" return texte def calcul_trajets_sncf(nom,date_cible=20260310) : repertoire="./tele/" fic=repertoire+nom+"/stop_times.txt" print("Commencement du calcul de "+nom) #Récupération des horaires stop_times=pd.read_csv(fic, usecols=["trip_id","arrival_time","departure_time","stop_id","stop_sequence"],parse_dates=["arrival_time","departure_time"], date_format="%H:%M:%S",dtype={'trip_id':str,'stop_id': str, 'stop_sequence': np.int32}) stop_times["arrival_time"]=pd.to_datetime(stop_times["arrival_time"], format="%H:%M:%S", errors='coerce') stop_times["departure_time"]=pd.to_datetime(stop_times["departure_time"], format="%H:%M:%S", errors='coerce') horaire_6 = datetime.strptime("06:30:00","%H:%M:%S").time() horaire_9 = datetime.strptime("08:30:00","%H:%M:%S").time() horaire_17 = datetime.strptime("16:30:00","%H:%M:%S").time() horaire_19 = datetime.strptime("18:30:00","%H:%M:%S").time() #récupération des parcours fic=repertoire+nom+"/trips.txt" trips=pd.read_csv(fic, usecols=["route_id","service_id","trip_id"],dtype={'route_id': str, 'service_id': str,'trip_id':str}) #récupération des calendriers """ A ce stade, il faut calculer les services actifs pour la date cible (située dans les bornes de calendar et non supprimée par calendar_dates (exception_type=1 : ok roule mais 2 : ko ne roule pas). Puis ensuite, récupérer les "trip_id" actifs liés à ces "services_id" actifs, pour aboutir aux stop_times concernés """ fic=repertoire+nom+"/calendar.txt" #Si le fichier existe, on l'importe, sinon on en crée un vide. try : calendar=pd.read_csv(fic, usecols=["service_id","tuesday","start_date","end_date"], dtype={"service_id": str,"start_date":np.int32,"end_date":np.int32}) except : calendar=pd.DataFrame(columns=["service_id","tuesday","start_date","end_date"]) fic=repertoire+nom+"/calendar_dates.txt" calendar_dates=pd.read_csv(fic, usecols=["service_id","date","exception_type"], dtype={'service_id': str,'date':np.int32}) calendar=calendar[(calendar["start_date"]date_cible) & (calendar["tuesday"]==1)] calendar_dates_1=calendar_dates[(calendar_dates["date"]==date_cible) & (calendar_dates["exception_type"]==1)] calendar_dates_2=calendar_dates[(calendar_dates["date"]==date_cible) & (calendar_dates["exception_type"]==2)] calendar_total = calendar[~calendar["service_id"].isin(calendar_dates_2['service_id'])] services_cible=pd.concat([calendar_total.loc[:,"service_id"],calendar_dates_1.loc[:,"service_id"]], ignore_index=True) services_cible=services_cible.drop_duplicates() trips_cible=trips[trips["service_id"].isin(services_cible)] trips_cible=trips_cible.drop_duplicates() stop_times=stop_times[stop_times["trip_id"].isin(trips_cible["trip_id"])] stop_times=stop_times.reset_index(drop=True) print(len(stop_times)) #Fabrication dfrom datetime import datetime, timees itinéraires élémentaires (départ arrivée) avec calcul du temps de trajet j=0 trajets=pd.DataFrame(columns=["A","B","duree","heure"]) for i in range(1,len(stop_times)) : #if i%100000==0 : print(i) #La description des trajets peut commencer à 0, 1 ou 2,... Le paramètre retenu est qu'il y ait continuité de séquence et de parcours if (stop_times.loc[i,"stop_sequence"]==stop_times.loc[i-1,"stop_sequence"]+1) & (stop_times.loc[i,"trip_id"]==stop_times.loc[i-1,"trip_id"]) :# & cond : trajets.loc[j,"A"]=stop_times.loc[i-1,"stop_id"] trajets.loc[j,"B"]=stop_times.loc[i,"stop_id"] trajets.loc[j,"duree"]=pd.to_timedelta((stop_times.loc[i,"departure_time"]- stop_times.loc[i-1,"departure_time"]), unit='s').total_seconds() try : if ((stop_times.loc[i-1,"departure_time"].time()>horaire_6) and (stop_times.loc[i-1,"departure_time"].time()horaire_17) and (stop_times.loc[i-1,"departure_time"].time()=date_cible) & (calendar["tuesday"]==1)] calendar_dates_1=calendar_dates[(calendar_dates["date"]==date_cible) & (calendar_dates["exception_type"]==1)] calendar_dates_2=calendar_dates[(calendar_dates["date"]==date_cible) & (calendar_dates["exception_type"]==2)] calendar_total = calendar[~calendar["service_id"].isin(calendar_dates_2['service_id'])] services_cible=pd.concat([calendar_total.loc[:,"service_id"],calendar_dates_1.loc[:,"service_id"]], ignore_index=True) services_cible=services_cible.drop_duplicates() trips_cible=trips[trips["service_id"].isin(services_cible)] trips_cible=trips_cible.drop_duplicates() stop_times=stop_times[stop_times["trip_id"].isin(trips_cible["trip_id"])] stop_times=stop_times.reset_index(drop=True) print(len(stop_times)) #Fabrication des itinéraires élémentaires (départ arrivée) avec calcul du temps de trajet j=0 trajets=pd.DataFrame(columns=["A","B","duree","heure"]) for i in range(1,len(stop_times)) : #if i%100000==0 : print(i) #La description des trajets peut commencer à 0, 1 ou 2,... Le paramètre retenu est qu'il y ait continuité de séquence et de parcours if (stop_times.loc[i,"stop_sequence"]==stop_times.loc[i-1,"stop_sequence"]+1) & (stop_times.loc[i,"trip_id"]==stop_times.loc[i-1,"trip_id"]) :# & cond : trajets.loc[j,"A"]=stop_times.loc[i-1,"stop_id"] trajets.loc[j,"B"]=stop_times.loc[i,"stop_id"] trajets.loc[j,"duree"]=pd.to_timedelta((stop_times.loc[i,"departure_time"]- stop_times.loc[i-1,"departure_time"]), unit='s').total_seconds() try : if ((stop_times.loc[i-1,"departure_time"].time()>horaire_6) and (stop_times.loc[i-1,"departure_time"].time()horaire_17) and (stop_times.loc[i-1,"departure_time"].time()3000000) & (lignes_route_1["depart_x"]<4300000)] # Pour bien limiter à la France Metropolitaine + Corse G_route = nx.from_pandas_edgelist(lignes_route_1, "point1", "point2",["duree","longueur"]) # A partir de là, on va essayer de raffiner le graphe G_route en regroupant les arêtes si le noeud n'a que deux voisins # et si ce n'est pas un result_point # Fonction de contraction : def contract_nodes(G, node): # Identifier neighbors = list(G.neighbors(node)) duree=0 longueur=0 for neighbor in neighbors: duree=duree+G[node][neighbor]["duree"] longueur=longueur+G[node][neighbor]["longueur"] G.remove_edge(node, neighbor) G.remove_node(node) G.add_edge(neighbors[0],neighbors[1],duree=duree,longueur=longueur) return G #Parcours du graphe for i in range(0,8) : # boucle 6 fois liste=list(G_route.nodes()) for node in liste : if G_route.degree(node)!=2 or node in list(result_points.index) : continue else : contract_nodes(G_route,node) # Préparation de l'export export_lignes_route_1 = nx.to_pandas_edgelist(G_route) def coord_code(node) : y=int(node[19:26])+500 x=int(node[27:34])+500 return (x,y) def linestring(source,target) : x1,y1=coord_code(source) x2,y2=coord_code(target) texte="LINESTRING ("+str(x1)+" "+str(y1)+","+str(x2)+" "+str(y2)+")" return texte export_lignes_route_1["geometrie"]=export_lignes_route_1.apply(lambda row: linestring(row["source"],row["target"]), axis=1) #Export de route_1 avec un champ WKT (Well Known Text) pour des lignes entre le départ et l'arrivée fic="./tele/bdcarto/export_lignes_route_1.csv" export_lignes_route_1.to_csv(fic) ############################################################################################### ############################################################################################### """ 2.4 RESEAU 4 : gestion des routes locales Il s'agit de traiter les routes locales dans une logique réseau On commence par identifier les carrefours les plus importants et proches de la grille de base, puis s'il n'y a pas de carrefour, on coupe la route au niveau du point le plus proche d'un point de base Puis on vient simplifier le graphe par des méthodes de contraction. """ import geopandas as gpd import numpy as np import pandas as pd import networkx as nx #Import des routes de la Bd carto fic="./tele/bdcarto/TRONCON_ROUTE_BDCARTO.shp" roads = gpd.read_file(fic, engine="pyogrio") # Sélection avec nature de type routière ou bac (on a enlevé le type autoroutier traité au 2.3) roads=roads.loc[roads["nature"].isin(["Bac ou liaison maritime","Bretelle","Route à 1 chaussée","Route à 2 chaussées","Route empierrée"])] # On affecte une vitesse minimale de 20 (car certaines routes à 0 dans le fichier IGN) roads.loc[roads["vitesse_mo"]<20,"vitesse_mo"]=20 # Réindexation roads.reset_index(drop=True,inplace=True) roads=roads.to_crs(3035) # Transfert en projection Inspire # Processus par dalle from shapely.ops import nearest_points, split from shapely.ops import substring from shapely.geometry import Point, MultiPoint, LineString # définition de fonctions pour le processus suivant def arrondi_10(x) : y=np.round(x, decimals=-1) return y # Codage du point pour le repérer avec une seule variable (point arrondi au préalable à 10 m près) def codage_10(point) : calcul=(point[0]*100000)+(point[1])/10 return calcul # Codage d'un point non arrondi def codage2_10(x,y) : calcul=((arrondi_10(x))*100000)+(arrondi_10(y))/10 return calcul def codage_1000(x,y) : calcul="FR_CRS3035RES1000mN"+str(y-500)+"E"+str(x-500) return calcul # lecture d'un code_10 def coord_code_10(node) : a=int(np.floor(node/1000000)) b=int(np.mod(node,1000000)) x=a*10 y=b*10 return (x,y) # pour écrire une ligne en WKT (Well Known Text) à partir de points codés def linestring(source,target) : x1,y1=coord_code_10(source) x2,y2=coord_code_10(target) texte="LINESTRING ("+str(x1)+" "+str(y1)+","+str(x2)+" "+str(y2)+")" return texte # Distance entre noeuds du graphe def distance_noeuds(G,node1,node2) : x1,y1=coord_code_10(node1) x2,y2=coord_code_10(node2) point1 = Point(x1, y1) point2 = Point(x2, y2) distance = point1.distance(point2) return distance # Fonction de contraction de graphe (on enlève les noeuds inutiles en cumulant les temps et en gardant les points_routes def contract_nodes(G, node): # Identifier les voisins neighbors = list(G.neighbors(node)) temps0=G[node][neighbors[0]]["temps"] longueur0=G[node][neighbors[0]]["longueur"] G.remove_edge(node, neighbors[0]) temps1=G[node][neighbors[1]]["temps"] longueur1=G[node][neighbors[1]]["longueur"] G.remove_edge(node, neighbors[1]) G.remove_node(node) G.add_edge(neighbors[0],neighbors[1],temps=temps0+temps1,longueur=longueur0+longueur1) return G """ # variante def contract_nodes(G, node): # Identifier neighbors = list(G.neighbors(node)) temps=0 longueur=0 for neighbor in neighbors: temps=temps+G[node][neighbor]["temps"] longueur=longueur+G[node][neighbor]["longueur"] G.remove_edge(node, neighbor) G.remove_node(node) G.add_edge(neighbors[0],neighbors[1],temps=temps,longueur=longueur) return G """ # Fonction de dilatation des noeuds de 3 : on éclate le noeud pour le supprimer def dilate_nodes_3(G, node): # Identifier les voisins neighbors = list(G.neighbors(node)) temps0=G[node][neighbors[0]]["temps"] longueur0=G[node][neighbors[0]]["longueur"] G.remove_edge(node, neighbors[0]) temps1=G[node][neighbors[1]]["temps"] longueur1=G[node][neighbors[1]]["longueur"] G.remove_edge(node, neighbors[1]) temps2=G[node][neighbors[2]]["temps"] longueur2=G[node][neighbors[2]]["longueur"] G.remove_edge(node, neighbors[2]) G.remove_node(node) G.add_edge(neighbors[0],neighbors[1],temps=temps0+temps1,longueur=longueur0+longueur1) G.add_edge(neighbors[0],neighbors[2],temps=temps0+temps2,longueur=longueur0+longueur2) G.add_edge(neighbors[1],neighbors[2],temps=temps1+temps2,longueur=longueur1+longueur2) return G # Fonction pour aimanter un noeud vers un noeud proche. Attention, on aimante en fonction des points de base def aimant_node(G,node,points_routes) : # Identification du point proche x,y=coord_code_10(node) points_filt = points_routes[ (points_routes["x_base"] >= x - 500) & (points_routes["x_base"] <= x + 500) & (points_routes["y_base"] >= y - 500) & (points_routes["y_base"] <= y + 500) ] # On prend le premier node2=points_filt.first_valid_index() #Changement des arêtes concernées en affectant node2 à la place de node, si node2 existe if node2 : neighbors = list(G.neighbors(node)) for neighbor in neighbors: temps=G[node][neighbor]["temps"] longueur=G[node][neighbor]["longueur"] G.remove_edge(node, neighbor) if node2 != neighbor : dist=distance_noeuds(G,node2,neighbor) # On ajuste si la distance*1.2 est supérieure à la longueur recensée if dist*1.2 > longueur : temps=temps*dist*1.2/longueur longueur=dist*1.2 G.add_edge(node2,neighbor,temps=temps,longueur=longueur) #Suppression du noeud G.remove_node(node) return G #Fonction pour enlever les noeuds qui ne sont pas des points proches def delete_node(G,node,points_routes) : liste=list(G_route.nodes()) for node in liste : if node in list(points_routes.index) : continue else : neighbors = list(G.neighbors(node)) for neighbor in neighbors: G.remove_edge(node, neighbor) G.remove_node(node) return G #Limites en projection Inspire : #Xmin : 3200000 #Xmax : 4200000 sans Corse, 4300000 avec Corse #Ymin : 2000000 avec Corse, 2100000 sans Corse #Ymax : 3200000 xinit=3200000 yinit=2000000 graphe_total=[] lignes_total=[] compteur=0 for i in range(0,111) : # 0-111 total / 60-65 test for j in range(0,121) : # 0-121 total/ 70-75 test xmin=xinit+(10000*i) xmax=xmin+1000+10000 # ymin=yinit+(10000*j) ymax=ymin+1000+10000 # # On sélectionne roads avec une marge de 2 km roads_extrait=roads.cx[xmin-2000:xmax+2000,ymin-2000:ymax+2000] # Si l'extrait est vide, on passe au suivant if roads_extrait.empty : continue roads_geom=roads_extrait.geometry.union_all() #On crée une grille des points espacés de 1000 m au centre des carrés kilométriques points = [] for x in range(xmin, xmax, 1000): for y in range(ymin, ymax, 1000): points.append(Point(x+500, y+500)) grille = gpd.GeoDataFrame(geometry=points) grille.crs = 'EPSG:3035' ############ # Segmentation des routes suivant les points les plus proches de la grille # On va d'abord chercher les points sur la route qui soient les plus proches de la grille # en écartant ceux qui sont à plus de 250 m en x ou en y (carré autour de chaque point de la grille) # On obtient un fichier que l'on appelle points_routes. Ces points sont en référence à la grille. #Repérer d'abord # 1. les carrefours, prendre celui de degré le plus important, sinon le plus proche # 2. en l'absence de carrefour, prendre le point le plus proche pts_on_roads = [] route_ids = [] dx_list = [] dy_list = [] x_base_list = [] y_base_list = [] for l, pt in grille.iterrows(): x_base = pt.geometry.x y_base = pt.geometry.y roads_grille=roads_extrait.cx[x_base-500:x_base+500,y_base-500:y_base+500] if roads_grille.empty : continue # Extraire les nœuds (début et fin de chaque ligne) noeuds = [] for geom in roads_grille.geometry: coords = list(geom.coords) noeuds.append(Point(coords[0])) noeuds.append(Point(coords[-1])) noeuds_gdf = gpd.GeoDataFrame(geometry=noeuds, crs=roads_grille.crs) noeuds_df = pd.DataFrame(noeuds_gdf.geometry.apply(lambda p: (p.x, p.y))) noeuds_df.columns = ["coords"] # Compter combien de segments y arrivent noeud_degre = noeuds_df.value_counts() # Sélection des intersections les plus importantes de degré >3 intersection_nodes = noeud_degre[(noeud_degre==noeud_degre.max()) & (noeud_degre >= 3)] # Si cette série n'est pas vide, on récupère le point le plus près du point originel if intersection_nodes.empty==False : multi_point = MultiPoint(intersection_nodes.index) p_on_line, _ = nearest_points(multi_point, pt.geometry) pts_on_roads.append(p_on_line) route_ids.append(0) x_base_list.append(x_base) y_base_list.append(y_base) # Sinon, on vient chercher le point le plus proche else : # trouver la route la plus proche nearest_idx = roads_extrait.distance(pt.geometry).idxmin() line = roads_extrait.loc[nearest_idx].geometry # point projeté SUR la route p_on_line, _ = nearest_points(line, pt.geometry) dx = abs(pt.geometry.x - p_on_line.x) dy = abs(pt.geometry.y - p_on_line.y) x_base = pt.geometry.x y_base = pt.geometry.y # filtre pour écarter les points à plus de 250 m en x et en y if dx < 250 and dy < 250: pts_on_roads.append(p_on_line) route_ids.append(nearest_idx) x_base_list.append(x_base) y_base_list.append(y_base) # GeoDataFrame des points projetés gardés points_routes = gpd.GeoDataFrame( {"route_id": route_ids, "x_base": x_base_list, "y_base": y_base_list}, geometry=pts_on_roads, crs=roads_extrait.crs ) # On segmente ensuite le fichier routes au niveau des points_routes en conservant les attributs segments = [] for idx, route in roads_extrait.iterrows(): line = route.geometry # points associés à cette route pts = points_routes.loc[points_routes["route_id"] == idx, "geometry"].tolist() # distances le long de la ligne distances = sorted(set([round(line.project(p), 3) for p in pts])) # si aucun point → on garde la ligne entière if len(distances) == 0 : data = route.drop("geometry").to_dict() data["geometry"] = line segments.append(data) continue prev = 0 for d in distances: seg = substring(line, prev, d) if not seg.is_empty and seg.length > 0: data = route.drop("geometry").to_dict() # garde tous les attributs data["geometry"] = seg segments.append(data) prev = d # dernier segment if prev < line.length: seg = substring(line, prev, line.length) if not seg.is_empty and seg.length > 0: data = route.drop("geometry").to_dict() data["geometry"] = seg segments.append(data) segmented_roads = gpd.GeoDataFrame(segments, crs=roads_extrait.crs) # On indexe le fichier points_routes avec un code points_routes["x"]=points_routes.geometry.x points_routes["y"]=points_routes.geometry.y points_routes["index_geo"]=codage2_10(points_routes["x"],points_routes["y"]) points_routes["index_geo"]=points_routes["index_geo"].astype(int) points_routes=points_routes.set_index("index_geo") # On reprend aussi le code du point de 1000 correspondant CELA POSE PROBLEME points_routes["code_1000"]=codage_1000(points_routes["x_base"],points_routes["y_base"]) #points_routes["code_500"]=points_routes["code_500"].astype(int) # A supprimer ##################### # Création du graphe networkx # On va d'abord simplifier le fichier segmented_roads en ne gardant que les points de début et fin # et en ajoutant une variable longueur puis d'autres variables concernant les points initiaux et finaux # et le temps de traversée start = segmented_roads.geometry.apply(lambda g: g.coords[0]) end = segmented_roads.geometry.apply(lambda g: g.coords[-1]) start = start.apply(lambda p: (arrondi_10(p[0]), arrondi_10(p[1]))) end = end.apply(lambda p: (arrondi_10(p[0]), arrondi_10(p[1]))) new_geometry = [LineString([s, e]) for s, e in zip(start, end)] lignes_route = segmented_roads.copy() lignes_route.geometry = new_geometry lignes_route["longueur"]=segmented_roads['geometry'].length lignes_route["point1"]=start.apply(lambda p: codage_10(p)) # Codage du point pour le repérer avec une seule variable lignes_route["point1"]=lignes_route["point1"].astype('int') lignes_route["point2"]=end.apply(lambda p: codage_10(p)) lignes_route["point2"]=lignes_route["point2"].astype('int') #Calcul du temps de traversée lignes_route["temps"]=(lignes_route["longueur"]*3600)/(lignes_route["vitesse_mo"]*1000) #Calcul du temps de traversée # Suppression des lignes ayant même point initial et final lignes_route=lignes_route[lignes_route["point1"]!=lignes_route["point2"]] # Suppression des duplicatas lignes_route=lignes_route.drop("fid",axis=1) lignes_route=lignes_route.drop_duplicates(keep='first') # Création du graphe G_route = nx.from_pandas_edgelist(lignes_route, "point1", "point2",["temps","longueur"]) G_route = nx.Graph(G_route, duplicates=False) # On contracte les noeuds for k in range(0,1) : # boucle 6 fois liste=list(G_route.nodes()) for node in liste : if G_route.degree(node)!=2 or node in list(points_routes.index) : continue else : contract_nodes(G_route,node) # Cette fois-ci on ne garde que ceux ayant trois degrés et n'étant pas dans points_routes. for k in range(0,5) : # boucle 6 fois liste=list(G_route.nodes()) for node in liste : if G_route.degree(node)!=3 or node in list(points_routes.index) : continue else : dilate_nodes_3(G_route,node) # On amène les carrefours restants sur un point proche s'il y en a un liste=list(G_route.nodes()) for node in liste : if G_route.degree(node)<2 or node in list(points_routes.index) : continue else : aimant_node(G_route,node,points_routes) # On achève en éliminant tous les noeuds qui ne sont pas des points proches delete_node(G_route,node,points_routes) #Transformation du graphe en dataframe graphe_df = nx.to_pandas_edgelist(G_route) compteur=compteur+1 if compteur == 1 : graphe_total=graphe_df #lignes_total=lignes_route # A supprimer après test #segments_total=segmented_roads # A supprimer après test points_total=points_routes else : graphe_total=pd.concat([graphe_total,graphe_df]) #lignes_total=pd.concat([lignes_total,lignes_route]) # A supprimer après test #segments_total=pd.concat([segments_total,segmented_roads]) points_total=pd.concat([points_total,points_routes]) # Ajout de la géométrie #graphe_df["geometrie"]=graphe_df.apply(lambda row: linestring(row["source"],row["target"]), axis=1) print(i) graphe_total.reset_index(drop=True,inplace=True) #lignes_total=lignes_total.drop_duplicates(keep='first') # A supprimer après test # Export du graphe fic="./resultats/graphe_route/graphe_total.csv" graphe_total.to_csv(fic) #Export des points points_export=points_total.loc[:,("code_1000","x_base","y_base")] fic="./resultats/graphe_route/points_total.csv" points_export.to_csv(fic) print("export graphe") # Ajout de la géométrie graphe_total["geometrie"]=graphe_total.apply(lambda row: linestring(row["source"],row["target"]), axis=1) fic="./resultats/graphe_route/graphe_total_geo.csv" graphe_total.to_csv(fic) #Réimport si besoin fic="./resultats/graphe_route/graphe_total.csv" graphe_total=pd.read_csv(fic) #Réimport si besoin fic="./resultats/graphe_route/points_total.csv" points_total=pd.read_csv(fic) points_total=points_total.set_index("index_geo") # Mises au format et export graphe_total["temps"]=graphe_total["temps"].astype('int') graphe_total["longueur"]=graphe_total["longueur"].astype('int') graphe_total["source"]=graphe_total["source"].astype('int') graphe_total["target"]=graphe_total["target"].astype('int') graphe_total.drop(['Unnamed: 0'], axis=1, inplace=True) graphe_total=graphe_total.drop_duplicates(keep='first') graphe_total.reset_index(drop=True,inplace=True) def inspire_code_10(x) : a=10*(x//1000000) #a=10000*np.floor(a/1000) a=1000*(a//1000) b=10*(x%1000000) b=1000*(b//1000) calcul="FR_CRS3035RES1000mN"+str(b)+"E"+str(a) return calcul graphe_total["source_inspire"]=graphe_total["source"].apply(inspire_code_10) graphe_total["target_inspire"]=graphe_total["target"].apply(inspire_code_10) graphe_1000=graphe_total.loc[:,("source_inspire","target_inspire","temps","longueur")] graphe_1000=graphe_1000.rename(columns={"temps":"duree"}) graphe_1000=graphe_1000.drop_duplicates(keep='first') fic="./resultats/graphe_route/graphe_1000.csv" graphe_1000.to_csv(fic) ############################################################################################### ############################################################################################### """ 2.5 RESEAU 5 : agrégation Il s'agit de ramener les fichiers de graphes (trains, cars, autoroutes, routes) en un seul. """ #Imports nécessaires import geopandas as gpd import numpy as np import pandas as pd from shapely.ops import nearest_points, split from shapely.ops import substring from shapely.geometry import Point, MultiPoint, LineString import networkx as nx # lecture d'un code_1000 def coord_code_1000(node) : b=int(node[19:26])+500 a=int(node[27:34])+500 return (a,b) # Distance entre noeuds du graphe def distance_noeuds(node1,node2) : x1,y1=coord_code_1000(node1) x2,y2=coord_code_1000(node2) point1 = Point(x1, y1) point2 = Point(x2, y2) distance = point1.distance(point2) return distance # pour écrire une ligne en WKT (Well Known Text) à partir de points codés def linestring_1000(source,target) : x1,y1=coord_code_1000(source) x2,y2=coord_code_1000(target) texte="LINESTRING ("+str(x1)+" "+str(y1)+","+str(x2)+" "+str(y2)+")" return texte # traitement des routes locales fic="./resultats/graphe_route/graphe_1000.csv" graphe_route = pd.read_csv(fic) graphe_route.drop(['Unnamed: 0'], axis=1, inplace=True) graphe_route.rename(columns={"source_inspire": "source"},inplace=True) graphe_route.rename(columns={"target_inspire": "target"},inplace=True) graphe_route["nbre"]=0 graphe_route["type"]="R" graphe_route["heure"]="NC" graphe_route.info() RangeIndex: 835568 entries, 0 to 835567 Data columns (total 7 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 source 835568 non-null object 1 target 835568 non-null object 2 duree 835568 non-null int64 3 longueur 835568 non-null int64 4 nbre 835568 non-null int64 5 type 835568 non-null object 6 heure 835568 non-null object dtypes: int64(3), object(4) memory usage: 44.6+ MB # traitement des autoroutes fic="./resultats/graphe_autoroute/graphe_autoroute.csv" graphe_auto = pd.read_csv(fic) graphe_auto.drop(['Unnamed: 0'], axis=1, inplace=True) graphe_auto.drop(['geometrie'], axis=1, inplace=True) # A vérifier : pourquoi existe-t-il des "inf" dans le calcul ? graphe_auto.replace([np.inf, -np.inf], 10000, inplace=True) graphe_auto["duree"]=graphe_auto["duree"].astype('int') graphe_auto["longueur"]=graphe_auto["longueur"].astype('int') graphe_auto["nbre"]=0 graphe_auto["type"]="A" graphe_auto["heure"]="NC" # traitement des trains fic="./resultats/graphe_train/trajets.csv" graphe_train= pd.read_csv(fic) graphe_train.drop(['Unnamed: 0'], axis=1, inplace=True) graphe_train.rename(columns={"code_inspire_A": "source"},inplace=True) graphe_train.rename(columns={"code_inspire_B": "target"},inplace=True) graphe_train["longueur"]=graphe_train.apply(lambda row: distance_noeuds(row["source"],row["target"]), axis=1) graphe_train["longueur"]=graphe_train["longueur"].astype('int') # Traitement des TC fic="./resultats/graphe_tc/graphe_tc.csv" graphe_tc= pd.read_csv(fic) graphe_tc.drop(['Unnamed: 0'], axis=1, inplace=True) graphe_tc.rename(columns={"code_inspire_A": "source"},inplace=True) graphe_tc.rename(columns={"code_inspire_B": "target"},inplace=True) #Conditions liées à une erreur notamment de korrigo avec des chiffres négatifs... graphe_tc=graphe_tc[~graphe_tc['source'].str.contains('-', regex=True)] graphe_tc=graphe_tc[~graphe_tc['target'].str.contains('-', regex=True)] graphe_tc["longueur"]=graphe_tc.apply(lambda row: distance_noeuds(row["source"],row["target"]), axis=1) graphe_tc["longueur"]=graphe_tc["longueur"].astype('int') # Agrégation des quatre graphe_total=pd.concat([graphe_route, graphe_auto, graphe_train,graphe_tc]) graphe_total.reset_index(drop=True,inplace=True) fic="./resultats/graphe_total/graphe_total.csv" graphe_total.to_csv(fic) # Ajout de la géométrie graphe_total_geo=graphe_total graphe_total_geo["geometrie"]=graphe_total_geo.apply(lambda row: linestring_1000(row["source"],row["target"]), axis=1) fic="./resultats/graphe_total/graphe_total_geo.csv" graphe_total.to_csv(fic) ############################################################################################### ############################################################################################### """ TROISIEME PARTIE : CALCULS DE RESEAU ET DE CENTRALITE """ ############################################################################################### ############################################################################################### """ 3.1 CENTRALITE 1 : exemple de calculs, ici sur les emplois On recherche le nombre d'emplois situés à 15 minutes, et aussi le temps nécessaire pour accéder à 1000 emplois """ #Imporst nécessaires import geopandas as gpd import numpy as np import pandas as pd import networkx as nx #Import du graphe fic="./resultats/graphe_total/graphe_total.csv" graphe_total=pd.read_csv(fic) graphe_total.drop(['Unnamed: 0'], axis=1, inplace=True) #Il faudra vérifier le problème des durées négatives graphe_total=graphe_total[graphe_total["duree"]>0] #Calcul de duree_tc (durée en transport en commun) : #si TC : durée identique, si route : durée avec vitesse de 10 km/h (moyen terme entre à pieds ou à vélo). Ceci pour lisser les TC. for i in graphe_total.index : if graphe_total.loc[i,"type"]=="C" or graphe_total.loc[i,"type"]=="T" : graphe_total.loc[i,"duree_tc"]=graphe_total.loc[i,"duree"] else : graphe_total.loc[i,"duree_tc"]=graphe_total.loc[i,"longueur"]*36/100 # 3 600 secondes / 10 000 mètres # lecture d'un code_1000 def coord_code_y_1000(node) : a=int(node[19:26])+500 return a def coord_code_x_1000(node) : b=int(node[27:34])+500 return b graphe_total["source_x"]=graphe_total["source"].apply(coord_code_x_1000) graphe_total["source_y"]=graphe_total["source"].apply(coord_code_y_1000) # Création du graphe G = nx.from_pandas_edgelist(graphe_total, "source", "target",["duree","duree_tc","longueur","type"]) G = nx.Graph(G, duplicates=False) #Récupérer les données total_ajouts pour sommer les poids des noeuds concernés fic="./resultats/emplois/etab_1000_code.csv" total_500_ajouts=pd.read_csv(fic) total_500_ajouts=total_500_ajouts.set_index("code_inspire") #Limitation aux noeuds situés dans la variable "source" de graphe_total total_500_ajouts_emplois=total_500_ajouts[total_500_ajouts.index.isin(graphe_total["source"])] # dictionnaire {noeud: poids} poids_noeuds = { n: total_500_ajouts.loc[n, "Effectifs"] if n in total_500_ajouts.index else 0 for n in G.nodes } """ #Version plus concise poids_noeuds = ( total_500_ajouts["Emplois"] .reindex(G.nodes, fill_value=0) .to_dict() ) """ # affectation au graphe nx.set_node_attributes(G, poids_noeuds, "Effectifs") print(G.nodes["FR_CRS3035RES1000mN2929000E3212000"]["Effectifs"]) # Réponse attendue : 42 #parcours de l'index for i in total_500_ajouts_emplois.index : #print(i) #Bien vérifier s'il n'y a pas un décalage d'une unité. #Recherche des noeuds situés à moins de 15 minutes (900 secondes) length= nx.single_source_dijkstra_path_length(G, i, cutoff=900, weight="duree") #print(length.keys()) somme=0 #parcours sur ces noeuds pour sommer les emplois for j in length.keys() : try : ajout=total_500_ajouts.loc[j,"Effectifs"] ajout=np.nan_to_num(ajout, nan=0) somme+=ajout except : continue total_500_ajouts_emplois.loc[i,"Effectifs_15"]=somme def temps_pour_1000_emplois(G,poids="duree"): resultats = {} for noeud in G.nodes(): # 1. Calcul des plus courts chemins depuis 'noeud' distances = nx.single_source_dijkstra_path_length(G, noeud, weight=poids,cutoff=2700) # 2. Tri des noeuds par distance croissante noeuds_trie = sorted(distances.items(), key=lambda x: x[1]) # 3. Cumul des emplois cumul = 0 temps_max = 0 for (n, d) in noeuds_trie: cumul += G.nodes[n]["Effectifs"] temps_max = d if cumul >= 1000: break resultats[noeud] = temps_max return resultats # Exemple d'utilisation et export # On pourra successivement traiter la durée routière ("duree") et la durée en transport en commun (duree_tc) # Et nommer différemment le fichier exporté resultats = temps_pour_1000_emplois(G,"duree_tc") df_result = pd.DataFrame.from_dict(resultats, orient='index') df_result = df_result.rename(columns={0:"Temps_1000_emplois"}) df_result["source_x"]=df_result.index.map(coord_code_x_1000) df_result["source_y"]=df_result.index.map(coord_code_y_1000) fic="./resultats/test_centralite/temps_1000_emplois_tc.csv" df_result.to_csv(fic,sep=";",decimal=",") #Export du fichier des emplois à 15 mn total_500_ajouts_emplois["source_x"]=total_500_ajouts_emplois.index.map(coord_code_x_1000) total_500_ajouts_emplois["source_y"]=total_500_ajouts_emplois.index.map(coord_code_y_1000) total_500_ajouts_emplois["Effectifs_15"]=total_500_ajouts_emplois["Effectifs_15"].astype(int) fic="./resultats/test_centralite/total_500_ajouts_emplois.csv" total_500_ajouts_emplois.to_csv(fic,sep=";",decimal=",") ############################################################################################### ############################################################################################### """ QUATRIEME PARTIE : EXPLOITATION """ ############################################################################################### ############################################################################################### """ 4 EXPLOITATION : EXEMPLE SUR LES EMPLOIS Création d'un rsater avec trois variables : durée routière pour aboutir à 1000 emplois, durée en transport en commun, population """ #Imports nécessaires import pandas as pd import numpy as np import rasterio from rasterio.transform import from_origin #Import du fichier traité au 3, avec "duree" fic="./resultats/test_centralite/temps_1000_emplois.csv" temps_emploi=pd.read_csv(fic,sep=";",decimal=",",index_col=0) #Import du fichier traité au 3, avec "duree_tc" fic="./resultats/test_centralite/temps_1000_emplois_tc.csv" temps_emploi_tc=pd.read_csv(fic,sep=";",decimal=",",index_col=0) temps_emploi_tc = temps_emploi_tc.rename(columns={"Temps_1000_emplois":"Temps_1000_emplois_tc"}) temps_emploi_tc["Temps_1000_emplois_tc"]=temps_emploi_tc["Temps_1000_emplois_tc"].astype(int) #Import du fichier Inseeavec le carroyage de la population fic="./tele/insee/rp2021_carreaux_1km_csv.zip" insee=pd.read_csv(fic, compression='zip',sep=";",decimal=".",index_col=0) insee["pop"]=insee["pop"].astype(int) # Elaboration du dataframe support de la numérisation df_base=pd.merge(temps_emploi.loc[:,"Temps_1000_emplois"],insee.loc[:,"pop"],how="left",left_index=True,right_index=True) df=pd.merge(df_base,temps_emploi_tc.loc[:,"Temps_1000_emplois_tc"],how="left",left_index=True,right_index=True) # lecture d'un code_1000 def coord_code_y_1000(node) : a=int(node[19:26])+500 return a def coord_code_x_1000(node) : b=int(node[27:34])+500 return b df["x"]=df.index.map(coord_code_x_1000) df["y"]=df.index.map(coord_code_y_1000) df["pop"]=df["pop"].fillna(0) df["pop"]=df["pop"].astype(int) # 1. Paramètres de la grille x_min, x_max = df['x'].min(), df['x'].max() y_min, y_max = df['y'].min(), df['y'].max() resolution = 1000 # 1 km en mètres # 2. Taille du raster width = int((x_max - x_min) / resolution) + 1 height = int((y_max - y_min) / resolution) + 1 # 3. Tableau 3D initialisé avec np.nan raster = np.full((height, width, 3), np.nan, dtype=np.float32) # 4. Remplissage du tableau for _, row in df.iterrows(): col = int((row['x'] - x_min) / resolution) row_idx = int((y_max - row['y']) / resolution) if row['pop']!=0 : raster[row_idx, col, 0] = row['Temps_1000_emplois']#*row['pop'] raster[row_idx, col, 1] = row['Temps_1000_emplois_tc']#*row['pop'] raster[row_idx, col, 2] = row['pop'] # 5. Export en GeoTIFF output_file = './resultats/test_centralite/raster_emplois.tif' transform = from_origin(x_min, y_max, resolution, resolution) with rasterio.open( output_file, 'w', driver='GTiff', height=height, width=width, count=3, dtype=raster.dtype, crs='EPSG:3035', transform=transform, nodata=np.nan, ) as dst: for i in range(3): dst.write(raster[:, :, i], i + 1) print(f"Raster exporté : {output_file}")