Files
dxf2geojson/main.py
ZZX9599 7b1129883e 转换
2025-09-11 19:06:51 +08:00

255 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import ezdxf
import geojson
import json
import os
from datetime import datetime
from pydantic import BaseModel
from pyproj import Transformer
from pyproj.exceptions import ProjError
from shapely.geometry import LineString, Point, Polygon, mapping
from shapely.geometry.polygon import LinearRing
import math
# ------------------------------ 基础模型与工具类 ------------------------------
class ConversionParams(BaseModel):
"""坐标转换参数模型Pydantic 数据验证)"""
dxf_abs_path: str # DXF 文件绝对路径
result_json_abs_path: str # 结果 JSON 文件绝对路径
source_epsg: int # 源坐标系 EPSG 代码
target_epsg: int # 目标坐标系 EPSG 代码
class ResponseUtil:
"""响应结果工具类"""
@staticmethod
def success(message: str = "操作成功"):
return {"code": 200, "message": message}
@staticmethod
def error(message: str):
return {"code": 500, "message": message}
# ------------------------------ DXF 解析核心函数 ------------------------------
def return_error(code, message):
"""返回错误信息结构(原 parse_dxf.py 功能)"""
return {
"code": code,
"message": message
}
def arc_to_linestring(e, segments=36):
"""将圆弧转换为 LineString原 parse_dxf.py 功能)"""
# 获取圆弧中心点、半径和角度(角度转弧度)
cx, cy = e.dxf.center.x, e.dxf.center.y
r = e.dxf.radius
start_angle = math.radians(e.dxf.start_angle)
end_angle = math.radians(e.dxf.end_angle)
# 处理跨 360 度的圆弧
if end_angle < start_angle:
end_angle += 2 * math.pi
# 离散化圆弧为线段
angle_step = (end_angle - start_angle) / segments
points = [
(
cx + r * math.cos(start_angle + i * angle_step),
cy + r * math.sin(start_angle + i * angle_step)
)
for i in range(segments + 1)
]
return LineString(points)
def spline_to_linestring(e, segments=50):
"""将样条曲线转换为 LineString原 parse_dxf.py 功能)"""
points = [tuple(p)[:2] for p in e.approximate(segments)]
return LineString(points)
def ellipse_to_linestring(e, segments=50):
"""将椭圆转换为 LineString原 parse_dxf.py 功能)"""
points = [tuple(p)[:2] for p in e.flattening(0.01)]
return LineString(points)
def dxf_to_geojson(dxf_path):
"""将 DXF 文件转换为 GeoJSON 结构数据(原 parse_dxf.py 核心功能)"""
try:
# 读取 DXF 文件
doc = ezdxf.readfile(dxf_path)
except Exception as e:
return return_error(1001, f"DXF 文件读取失败: {e}")
# 获取模型空间并按图层存储要素
msp = doc.modelspace()
layers = {}
# 遍历所有实体并转换为几何对象
for e in msp:
geom = None # 几何对象
layer_name = e.dxf.layer # 实体所在图层
properties = {"type": e.dxftype()} # 实体属性(包含类型)
try:
# 根据 DXF 实体类型转换为对应几何对象
if e.dxftype() == 'LINE':
geom = LineString([(e.dxf.start.x, e.dxf.start.y), (e.dxf.end.x, e.dxf.end.y)])
elif e.dxftype() == 'INSERT':
geom = Point(e.dxf.insert.x, e.dxf.insert.y)
elif e.dxftype() == 'CIRCLE':
geom = Point(e.dxf.center.x, e.dxf.center.y).buffer(e.dxf.radius)
elif e.dxftype() in ['LWPOLYLINE', 'POLYLINE']:
points = [(p[0], p[1]) for p in e.get_points()]
if e.is_closed:
geom = Polygon(LinearRing(points))
else:
geom = LineString(points)
elif e.dxftype() == 'ARC':
geom = arc_to_linestring(e)
elif e.dxftype() == 'SPLINE':
geom = spline_to_linestring(e)
elif e.dxftype() == 'ELLIPSE':
geom = ellipse_to_linestring(e)
elif e.dxftype() == 'POINT':
geom = Point(e.dxf.location.x, e.dxf.location.y)
elif e.dxftype() == 'TEXT':
geom = Point(e.dxf.insert.x, e.dxf.insert.y)
properties['text'] = e.dxf.text
# 若几何对象创建成功,添加到对应图层
if geom:
geo_json = geojson.loads(geojson.dumps(mapping(geom)))
feature = geojson.Feature(geometry=geo_json, properties=properties)
layers.setdefault(layer_name, []).append(feature)
except Exception as ex:
print(f"跳过无法解析的实体 [{e.dxftype()}]: {ex}")
continue
# 返回按图层组织的 GeoJSON 结构
return {
"layers": [
{
"name": layer,
"type": "FeatureCollection",
"features": features
}
for layer, features in layers.items()
]
}
# ------------------------------ 坐标转换核心函数 ------------------------------
def transform_coordinates(coords, transformer):
"""递归转换坐标(支持点、线、面的坐标结构)"""
try:
# 处理嵌套坐标(如面的外环/内环、线的点集)
if isinstance(coords, list) and len(coords) > 0 and isinstance(coords[0], list):
return [transform_coordinates(sub_coords, transformer) for sub_coords in coords]
# 处理单个点坐标([x, y]
elif isinstance(coords, list) and len(coords) == 2:
x, y = coords
lon, lat = transformer.transform(x, y)
return [round(lon, 6), round(lat, 6)] # 保留6位小数可根据需求调整
# 非坐标结构直接返回
else:
return coords
except (ProjError, ValueError, IndexError) as e:
print(f"坐标转换失败: 原始坐标={coords}, 错误={str(e)}")
return coords
def process_dxf(params: ConversionParams):
"""整合 DXF 解析 + 坐标转换 + 结果保存(无 exe 调用)"""
# 1. 解析 DXF 文件为 GeoJSON 结构
dxf_result = dxf_to_geojson(params.dxf_abs_path)
# 检查 DXF 解析是否出错
if "code" in dxf_result and dxf_result["code"] != 0:
return ResponseUtil.error(f"DXF 解析失败: {dxf_result['message']}")
# 2. 初始化坐标转换器EPSG 投影转换)
try:
transformer = Transformer.from_crs(
f"EPSG:{params.source_epsg}",
f"EPSG:{params.target_epsg}",
always_xy=True # 保持 (x,y) 顺序(避免投影后顺序反转)
)
except ProjError as e:
return ResponseUtil.error(f"坐标系初始化失败: {str(e)}(请检查 EPSG 代码是否正确)")
# 3. 提取所有要素并转换坐标
data = dxf_result
features = []
# 从图层中提取要素(兼容原 parse_dxf 的输出结构)
if 'layers' in data:
for layer in data['layers']:
features.extend(layer.get('features', []))
# 兼容标准 GeoJSON 的 FeatureCollection 结构(可选)
elif 'features' in data:
features = data['features']
# 批量转换要素坐标
for feature in features:
geometry = feature.get('geometry', {})
if geometry and 'coordinates' in geometry:
geometry['coordinates'] = transform_coordinates(geometry['coordinates'], transformer)
# 4. 保存转换结果到目标路径
try:
# 确保结果目录存在
result_dir = os.path.dirname(params.result_json_abs_path)
if not os.path.exists(result_dir):
os.makedirs(result_dir, exist_ok=True)
# 写入 JSON 文件
with open(params.result_json_abs_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return ResponseUtil.success(f"结果已保存至: {params.result_json_abs_path}")
except Exception as e:
return ResponseUtil.error(f"结果保存失败: {str(e)}")
# ------------------------------ 命令行入口 ------------------------------
if __name__ == "__main__":
# 命令行参数校验
if len(sys.argv) != 5:
print("=" * 50)
print("使用方法: python dxf_converter.py <参数1> <参数2> <参数3> <参数4>")
print("参数说明:")
print(" 参数1: DXF 文件的绝对路径C:/data/input.dxf")
print(" 参数2: 结果 JSON 文件的绝对路径C:/data/output.json")
print(" 参数3: 源坐标系 EPSG 代码2437 表示北京54平面坐标")
print(" 参数4: 目标坐标系 EPSG 代码4326 表示WGS84经纬度")
print("=" * 50)
sys.exit(1)
# 解析命令行参数
try:
params = ConversionParams(
dxf_abs_path=sys.argv[1],
result_json_abs_path=sys.argv[2],
source_epsg=int(sys.argv[3]),
target_epsg=int(sys.argv[4])
)
except ValueError as e:
print(json.dumps(ResponseUtil.error(f"参数格式错误: {str(e)}"), ensure_ascii=False))
sys.exit(1)
# 执行转换流程
result = process_dxf(params)
print(json.dumps(result, ensure_ascii=False))
sys.exit(0 if result["code"] == 200 else 1)