255 lines
9.1 KiB
Python
255 lines
9.1 KiB
Python
import sys
|
||
import ezdxf
|
||
import geojson
|
||
import json
|
||
import os
|
||
from datetime import datetime
|
||
from pydantic import BaseModel
|
||
from pyproj import Transformer
|
||
from pyproj.exceptions import ProjError
|
||
from shapely.geometry import LineString, Point, Polygon, mapping
|
||
from shapely.geometry.polygon import LinearRing
|
||
import math
|
||
|
||
|
||
# ------------------------------ 基础模型与工具类 ------------------------------
|
||
class ConversionParams(BaseModel):
|
||
"""坐标转换参数模型(Pydantic 数据验证)"""
|
||
dxf_abs_path: str # DXF 文件绝对路径
|
||
result_json_abs_path: str # 结果 JSON 文件绝对路径
|
||
source_epsg: int # 源坐标系 EPSG 代码
|
||
target_epsg: int # 目标坐标系 EPSG 代码
|
||
|
||
|
||
class ResponseUtil:
|
||
"""响应结果工具类"""
|
||
@staticmethod
|
||
def success(message: str = "操作成功"):
|
||
return {"code": 200, "message": message}
|
||
|
||
@staticmethod
|
||
def error(message: str):
|
||
return {"code": 500, "message": message}
|
||
|
||
|
||
# ------------------------------ DXF 解析核心函数 ------------------------------
|
||
def return_error(code, message):
|
||
"""返回错误信息结构(原 parse_dxf.py 功能)"""
|
||
return {
|
||
"code": code,
|
||
"message": message
|
||
}
|
||
|
||
|
||
def arc_to_linestring(e, segments=36):
|
||
"""将圆弧转换为 LineString(原 parse_dxf.py 功能)"""
|
||
# 获取圆弧中心点、半径和角度(角度转弧度)
|
||
cx, cy = e.dxf.center.x, e.dxf.center.y
|
||
r = e.dxf.radius
|
||
start_angle = math.radians(e.dxf.start_angle)
|
||
end_angle = math.radians(e.dxf.end_angle)
|
||
|
||
# 处理跨 360 度的圆弧
|
||
if end_angle < start_angle:
|
||
end_angle += 2 * math.pi
|
||
|
||
# 离散化圆弧为线段
|
||
angle_step = (end_angle - start_angle) / segments
|
||
points = [
|
||
(
|
||
cx + r * math.cos(start_angle + i * angle_step),
|
||
cy + r * math.sin(start_angle + i * angle_step)
|
||
)
|
||
for i in range(segments + 1)
|
||
]
|
||
return LineString(points)
|
||
|
||
|
||
def spline_to_linestring(e, segments=50):
|
||
"""将样条曲线转换为 LineString(原 parse_dxf.py 功能)"""
|
||
points = [tuple(p)[:2] for p in e.approximate(segments)]
|
||
return LineString(points)
|
||
|
||
|
||
def ellipse_to_linestring(e, segments=50):
|
||
"""将椭圆转换为 LineString(原 parse_dxf.py 功能)"""
|
||
points = [tuple(p)[:2] for p in e.flattening(0.01)]
|
||
return LineString(points)
|
||
|
||
|
||
def dxf_to_geojson(dxf_path):
|
||
"""将 DXF 文件转换为 GeoJSON 结构数据(原 parse_dxf.py 核心功能)"""
|
||
try:
|
||
# 读取 DXF 文件
|
||
doc = ezdxf.readfile(dxf_path)
|
||
except Exception as e:
|
||
return return_error(1001, f"DXF 文件读取失败: {e}")
|
||
|
||
# 获取模型空间并按图层存储要素
|
||
msp = doc.modelspace()
|
||
layers = {}
|
||
|
||
# 遍历所有实体并转换为几何对象
|
||
for e in msp:
|
||
geom = None # 几何对象
|
||
layer_name = e.dxf.layer # 实体所在图层
|
||
properties = {"type": e.dxftype()} # 实体属性(包含类型)
|
||
|
||
try:
|
||
# 根据 DXF 实体类型转换为对应几何对象
|
||
if e.dxftype() == 'LINE':
|
||
geom = LineString([(e.dxf.start.x, e.dxf.start.y), (e.dxf.end.x, e.dxf.end.y)])
|
||
|
||
elif e.dxftype() == 'INSERT':
|
||
geom = Point(e.dxf.insert.x, e.dxf.insert.y)
|
||
|
||
elif e.dxftype() == 'CIRCLE':
|
||
geom = Point(e.dxf.center.x, e.dxf.center.y).buffer(e.dxf.radius)
|
||
|
||
elif e.dxftype() in ['LWPOLYLINE', 'POLYLINE']:
|
||
points = [(p[0], p[1]) for p in e.get_points()]
|
||
if e.is_closed:
|
||
geom = Polygon(LinearRing(points))
|
||
else:
|
||
geom = LineString(points)
|
||
|
||
elif e.dxftype() == 'ARC':
|
||
geom = arc_to_linestring(e)
|
||
|
||
elif e.dxftype() == 'SPLINE':
|
||
geom = spline_to_linestring(e)
|
||
|
||
elif e.dxftype() == 'ELLIPSE':
|
||
geom = ellipse_to_linestring(e)
|
||
|
||
elif e.dxftype() == 'POINT':
|
||
geom = Point(e.dxf.location.x, e.dxf.location.y)
|
||
|
||
elif e.dxftype() == 'TEXT':
|
||
geom = Point(e.dxf.insert.x, e.dxf.insert.y)
|
||
properties['text'] = e.dxf.text
|
||
|
||
# 若几何对象创建成功,添加到对应图层
|
||
if geom:
|
||
geo_json = geojson.loads(geojson.dumps(mapping(geom)))
|
||
feature = geojson.Feature(geometry=geo_json, properties=properties)
|
||
layers.setdefault(layer_name, []).append(feature)
|
||
|
||
except Exception as ex:
|
||
print(f"跳过无法解析的实体 [{e.dxftype()}]: {ex}")
|
||
continue
|
||
|
||
# 返回按图层组织的 GeoJSON 结构
|
||
return {
|
||
"layers": [
|
||
{
|
||
"name": layer,
|
||
"type": "FeatureCollection",
|
||
"features": features
|
||
}
|
||
for layer, features in layers.items()
|
||
]
|
||
}
|
||
|
||
|
||
# ------------------------------ 坐标转换核心函数 ------------------------------
|
||
def transform_coordinates(coords, transformer):
|
||
"""递归转换坐标(支持点、线、面的坐标结构)"""
|
||
try:
|
||
# 处理嵌套坐标(如面的外环/内环、线的点集)
|
||
if isinstance(coords, list) and len(coords) > 0 and isinstance(coords[0], list):
|
||
return [transform_coordinates(sub_coords, transformer) for sub_coords in coords]
|
||
# 处理单个点坐标([x, y])
|
||
elif isinstance(coords, list) and len(coords) == 2:
|
||
x, y = coords
|
||
lon, lat = transformer.transform(x, y)
|
||
return [round(lon, 6), round(lat, 6)] # 保留6位小数(可根据需求调整)
|
||
# 非坐标结构直接返回
|
||
else:
|
||
return coords
|
||
except (ProjError, ValueError, IndexError) as e:
|
||
print(f"坐标转换失败: 原始坐标={coords}, 错误={str(e)}")
|
||
return coords
|
||
|
||
|
||
def process_dxf(params: ConversionParams):
|
||
"""整合 DXF 解析 + 坐标转换 + 结果保存(无 exe 调用)"""
|
||
# 1. 解析 DXF 文件为 GeoJSON 结构
|
||
dxf_result = dxf_to_geojson(params.dxf_abs_path)
|
||
# 检查 DXF 解析是否出错
|
||
if "code" in dxf_result and dxf_result["code"] != 0:
|
||
return ResponseUtil.error(f"DXF 解析失败: {dxf_result['message']}")
|
||
|
||
# 2. 初始化坐标转换器(EPSG 投影转换)
|
||
try:
|
||
transformer = Transformer.from_crs(
|
||
f"EPSG:{params.source_epsg}",
|
||
f"EPSG:{params.target_epsg}",
|
||
always_xy=True # 保持 (x,y) 顺序(避免投影后顺序反转)
|
||
)
|
||
except ProjError as e:
|
||
return ResponseUtil.error(f"坐标系初始化失败: {str(e)}(请检查 EPSG 代码是否正确)")
|
||
|
||
# 3. 提取所有要素并转换坐标
|
||
data = dxf_result
|
||
features = []
|
||
# 从图层中提取要素(兼容原 parse_dxf 的输出结构)
|
||
if 'layers' in data:
|
||
for layer in data['layers']:
|
||
features.extend(layer.get('features', []))
|
||
# 兼容标准 GeoJSON 的 FeatureCollection 结构(可选)
|
||
elif 'features' in data:
|
||
features = data['features']
|
||
|
||
# 批量转换要素坐标
|
||
for feature in features:
|
||
geometry = feature.get('geometry', {})
|
||
if geometry and 'coordinates' in geometry:
|
||
geometry['coordinates'] = transform_coordinates(geometry['coordinates'], transformer)
|
||
|
||
# 4. 保存转换结果到目标路径
|
||
try:
|
||
# 确保结果目录存在
|
||
result_dir = os.path.dirname(params.result_json_abs_path)
|
||
if not os.path.exists(result_dir):
|
||
os.makedirs(result_dir, exist_ok=True)
|
||
|
||
# 写入 JSON 文件
|
||
with open(params.result_json_abs_path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
return ResponseUtil.success(f"结果已保存至: {params.result_json_abs_path}")
|
||
except Exception as e:
|
||
return ResponseUtil.error(f"结果保存失败: {str(e)}")
|
||
|
||
|
||
# ------------------------------ 命令行入口 ------------------------------
|
||
if __name__ == "__main__":
|
||
# 命令行参数校验
|
||
if len(sys.argv) != 5:
|
||
print("=" * 50)
|
||
print("使用方法: python dxf_converter.py <参数1> <参数2> <参数3> <参数4>")
|
||
print("参数说明:")
|
||
print(" 参数1: DXF 文件的绝对路径(例:C:/data/input.dxf)")
|
||
print(" 参数2: 结果 JSON 文件的绝对路径(例:C:/data/output.json)")
|
||
print(" 参数3: 源坐标系 EPSG 代码(例:2437 表示北京54平面坐标)")
|
||
print(" 参数4: 目标坐标系 EPSG 代码(例:4326 表示WGS84经纬度)")
|
||
print("=" * 50)
|
||
sys.exit(1)
|
||
|
||
# 解析命令行参数
|
||
try:
|
||
params = ConversionParams(
|
||
dxf_abs_path=sys.argv[1],
|
||
result_json_abs_path=sys.argv[2],
|
||
source_epsg=int(sys.argv[3]),
|
||
target_epsg=int(sys.argv[4])
|
||
)
|
||
except ValueError as e:
|
||
print(json.dumps(ResponseUtil.error(f"参数格式错误: {str(e)}"), ensure_ascii=False))
|
||
sys.exit(1)
|
||
|
||
# 执行转换流程
|
||
result = process_dxf(params)
|
||
print(json.dumps(result, ensure_ascii=False))
|
||
sys.exit(0 if result["code"] == 200 else 1) |