Python应用指南:高德交通态势数据(一)
在现代城市的脉络中,交通流量如同流动的血液,交通流量的动态变化对出行规划和城市管理提出了更高的要求。为了应对这一挑战,高德地图推出了交通态势查询API,旨在为开发者提供一个强大的工具,用于实时获取指定区域或道路的交通状况。通过集成此API,应用和服务可以及时响应交通拥堵、事故等突发事件,从而帮助用户优化行程安排,提高出行效率,并为智能交通系统的构建贡献力量。本文聚焦于利用高德交通态势查询API中的矩形区域查询功能,探索如何高效地获取矩形范围内的路况信息和拥堵程度。
我们先来看一下官方的说明文档和参数配置:交通态势查询-高级 API 文档-开发指南-Web服务 API | 高德地图API
我们可以看到其中可以配置的参数有道路等级和矩形的左上右下的对角线坐标,这里的坐标使用的需要是GCJ-02,另外对角线距离不能超过10公里,矩形区域内交通态势查询:可选:JSON、XML,本文选择了JSON格式,方便后面导出为shp格式,道路等级一般选择6,这样获取的路况信息是最全的,有其他需求也可以自行调整;
roads可返回的所有可选道路信息;
先讲一下方法思路,一共三个步骤;
方法思路
- 通过python写一个根据中心点和半径生成左上右下的对角线坐标的脚本;
- 获取对角线坐标;
- 写入对角线坐标,并通过交通态势查询 API查询数据保存到shp文件中;
第一步:根据配置中心点坐标和半径来生成矩形,并打印左上右下的对角线坐标;
这个需要使用高德的坐标拾取器工具在地图上选择目标区域的中心点坐标,通过配置一下中心坐标和半径距离,工具将自动生成左下和右上的对角线坐标;
完整代码#运行环境 Python 3.11
from math import cos, radiansdef generate_rectangle(center_point, radius_km):"""通过中心点和半径生成矩形范围参数:center_point: 中心点坐标 (格式: "经度,纬度")radius_km: 半径(单位:公里)返回:rectangle: 矩形范围字符串 (格式: "左下角经度,左下角纬度;右上角经度,右上角纬度")"""try:# 解析中心点坐标center_lon, center_lat = map(float, center_point.split(','))# 经度1度约等于111公里,纬度1度约等于111*cos(纬度)公里# 将半径转换为度数lat_diff = radius_km / 111.0lon_diff = radius_km / (111.0 * cos(radians(center_lat)))# 计算矩形四个角的坐标min_lon = center_lon - lon_diff # 左下角经度min_lat = center_lat - lat_diff # 左下角纬度max_lon = center_lon + lon_diff # 右上角经度max_lat = center_lat + lat_diff # 右上角纬度# 格式化为字符串,保留6位小数rectangle = f"{min_lon:.6f},{min_lat:.6f};{max_lon:.6f},{max_lat:.6f}"return rectangleexcept Exception as e:print(f"生成矩形范围时出错: {str(e)}")return Noneif __name__ == "__main__":# 示例:上海某点坐标和1公里半径center_point = "121.446433,31.22321" # 中心点坐标radius = 2.0 # 半径2公里# 生成矩形范围rectangle = generate_rectangle(center_point, radius)if rectangle:print(f"中心点: {center_point}")print(f"半径: {radius}公里")print(f"矩形范围: {rectangle}")
将生成的左上和右下对角线坐标应用于下一个脚本中进行运行就可以得到shp文件,因为数据获取限制,获取半径最好不要超过5公里;
第二步:输入坐标点参数后,通过高德地图的交通态势查询API获取结果并导出为JSON格式,并由JSON数据将被转换为SHP文件导出,由于高德地图使用的是GCJ-02坐标系,因此在导出前脚本已自动将坐标转换为WGS84坐标系;
完整代码#运行环境 Python 3.11
import requests
import geopandas as gpd
from shapely.geometry import LineString
from datetime import datetime
import os
import math
import numpy as np
from shapely.ops import transform
from functools import partial# 坐标转换参数
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626 # π
a = 6378245.0 # 长半轴
ee = 0.00669342162296594323 # 扁率def gcj02towgs84(lng, lat):"""GCJ02(火星坐标系)转GPS84:param lng:火星坐标系的经度:param lat:火星坐标系纬度:return:"""if out_of_china(lng, lat):return lng, latdlat = transformlat(lng - 105.0, lat - 35.0)dlng = transformlng(lng - 105.0, lat - 35.0)radlat = lat / 180.0 * pimagic = math.sin(radlat)magic = 1 - ee * magic * magicsqrtmagic = math.sqrt(magic)dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)mglat = lat + dlatmglng = lng + dlngreturn [lng * 2 - mglng, lat * 2 - mglat]def transformlat(lng, lat):ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0return retdef transformlng(lng, lat):ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0return retdef out_of_china(lng, lat):"""判断是否在国内,不在国内不做偏移"""if lng < 72.004 or lng > 137.8347:return Trueif lat < 0.8293 or lat > 55.8271:return Truereturn Falsedef transform_geometry(geom):"""转换几何对象的坐标"""def transform_coords(x, y, z=None):wgs_x, wgs_y = gcj02towgs84(x, y)return wgs_x, wgs_yreturn transform(transform_coords, geom)def get_and_save_traffic_status(rectangle, key='你的key', output_dir='output'):try:# 初始化数据字典data = {'name': [],'status': [], # 路况状态码 (将转换为double)'direction': [],'angle': [],'speed': [],'geometry': []}# 构建API请求URLurl = f'https://restapi.amap.com/v3/traffic/status/rectangle?rectangle={rectangle}&output=json&extensions=all&key={key}&level=6'# 发送请求并获取JSON响应res = requests.get(url, timeout=10).json()# 遍历每条道路数据for road in res['trafficinfo']['roads']:try:polylines = [(float(y[0]), float(y[1])) for y in[x.split(',') for x in road['polyline'].split(';')]]# 创建线几何对象line = LineString(polylines)# 转换为WGS84坐标系wgs84_line = transform_geometry(line)data['geometry'].append(wgs84_line)data['name'].append(road.get('name', ''))# 将status转换为float类型data['status'].append(float(road.get('status', '0')))data['direction'].append(road.get('direction', ''))data['angle'].append(float(road.get('angle', 0)))data['speed'].append(int(road.get('speed', 0)))except:continue# 创建GeoDataFrame对象gdf = gpd.GeoDataFrame(data, geometry='geometry', crs='EPSG:4326')# 确保status列为float64类型gdf['status'] = gdf['status'].astype(np.float64)# 创建输出目录os.makedirs(output_dir, exist_ok=True)# 生成文件名filename = f'traffic_status_{datetime.now().strftime("%Y%m%d%H%M%S")}_wgs84.shp'# 保存为shapefileoutput_path = os.path.join(output_dir, filename)gdf.to_file(output_path, encoding='utf-8')print(f"已保存WGS84坐标系数据到: {output_path}")except Exception as e:print(f"处理出错: {str(e)}")if __name__ == "__main__":rectangle = '121.425363,31.205192;121.467503,31.241228' # 替换生成的坐标get_and_save_traffic_status(rectangle)
这里结果会保存在脚本所在目录的output/traffic_status_2025xxxxxx_wgs84.shp,直接把导出结果在arcgis加载即可,其中的status字段是路况字段,在可视化的过程中,符号系统字段选择status字段,这样我们就可以在图上直观的看出拥堵的位置区域;
status | 路况 | 0:未知;1:畅通;2:缓行;3:拥堵 |
数据更新频率参考高德的拥堵延时指数应该是5分钟自动刷新,可以通过增加脚本定时自动运行的逻辑来实现全时段监测;
文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。