转换
This commit is contained in:
		
							
								
								
									
										255
									
								
								main.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										255
									
								
								main.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,255 @@ | ||||
| import sys | ||||
| import ezdxf | ||||
| import geojson | ||||
| import json | ||||
| import os | ||||
| from datetime import datetime | ||||
| from pydantic import BaseModel | ||||
| from pyproj import Transformer | ||||
| from pyproj.exceptions import ProjError | ||||
| from shapely.geometry import LineString, Point, Polygon, mapping | ||||
| from shapely.geometry.polygon import LinearRing | ||||
| import math | ||||
|  | ||||
|  | ||||
| # ------------------------------ 基础模型与工具类 ------------------------------ | ||||
| class ConversionParams(BaseModel): | ||||
|     """坐标转换参数模型(Pydantic 数据验证)""" | ||||
|     dxf_abs_path: str          # DXF 文件绝对路径 | ||||
|     result_json_abs_path: str  # 结果 JSON 文件绝对路径 | ||||
|     source_epsg: int           # 源坐标系 EPSG 代码 | ||||
|     target_epsg: int           # 目标坐标系 EPSG 代码 | ||||
|  | ||||
|  | ||||
| class ResponseUtil: | ||||
|     """响应结果工具类""" | ||||
|     @staticmethod | ||||
|     def success(message: str = "操作成功"): | ||||
|         return {"code": 200, "message": message} | ||||
|  | ||||
|     @staticmethod | ||||
|     def error(message: str): | ||||
|         return {"code": 500, "message": message} | ||||
|  | ||||
|  | ||||
| # ------------------------------ DXF 解析核心函数 ------------------------------ | ||||
| def return_error(code, message): | ||||
|     """返回错误信息结构(原 parse_dxf.py 功能)""" | ||||
|     return { | ||||
|         "code": code, | ||||
|         "message": message | ||||
|     } | ||||
|  | ||||
|  | ||||
| def arc_to_linestring(e, segments=36): | ||||
|     """将圆弧转换为 LineString(原 parse_dxf.py 功能)""" | ||||
|     # 获取圆弧中心点、半径和角度(角度转弧度) | ||||
|     cx, cy = e.dxf.center.x, e.dxf.center.y | ||||
|     r = e.dxf.radius | ||||
|     start_angle = math.radians(e.dxf.start_angle) | ||||
|     end_angle = math.radians(e.dxf.end_angle) | ||||
|  | ||||
|     # 处理跨 360 度的圆弧 | ||||
|     if end_angle < start_angle: | ||||
|         end_angle += 2 * math.pi | ||||
|  | ||||
|     # 离散化圆弧为线段 | ||||
|     angle_step = (end_angle - start_angle) / segments | ||||
|     points = [ | ||||
|         ( | ||||
|             cx + r * math.cos(start_angle + i * angle_step), | ||||
|             cy + r * math.sin(start_angle + i * angle_step) | ||||
|         ) | ||||
|         for i in range(segments + 1) | ||||
|     ] | ||||
|     return LineString(points) | ||||
|  | ||||
|  | ||||
| def spline_to_linestring(e, segments=50): | ||||
|     """将样条曲线转换为 LineString(原 parse_dxf.py 功能)""" | ||||
|     points = [tuple(p)[:2] for p in e.approximate(segments)] | ||||
|     return LineString(points) | ||||
|  | ||||
|  | ||||
| def ellipse_to_linestring(e, segments=50): | ||||
|     """将椭圆转换为 LineString(原 parse_dxf.py 功能)""" | ||||
|     points = [tuple(p)[:2] for p in e.flattening(0.01)] | ||||
|     return LineString(points) | ||||
|  | ||||
|  | ||||
| def dxf_to_geojson(dxf_path): | ||||
|     """将 DXF 文件转换为 GeoJSON 结构数据(原 parse_dxf.py 核心功能)""" | ||||
|     try: | ||||
|         # 读取 DXF 文件 | ||||
|         doc = ezdxf.readfile(dxf_path) | ||||
|     except Exception as e: | ||||
|         return return_error(1001, f"DXF 文件读取失败: {e}") | ||||
|  | ||||
|     # 获取模型空间并按图层存储要素 | ||||
|     msp = doc.modelspace() | ||||
|     layers = {} | ||||
|  | ||||
|     # 遍历所有实体并转换为几何对象 | ||||
|     for e in msp: | ||||
|         geom = None  # 几何对象 | ||||
|         layer_name = e.dxf.layer  # 实体所在图层 | ||||
|         properties = {"type": e.dxftype()}  # 实体属性(包含类型) | ||||
|  | ||||
|         try: | ||||
|             # 根据 DXF 实体类型转换为对应几何对象 | ||||
|             if e.dxftype() == 'LINE': | ||||
|                 geom = LineString([(e.dxf.start.x, e.dxf.start.y), (e.dxf.end.x, e.dxf.end.y)]) | ||||
|  | ||||
|             elif e.dxftype() == 'INSERT': | ||||
|                 geom = Point(e.dxf.insert.x, e.dxf.insert.y) | ||||
|  | ||||
|             elif e.dxftype() == 'CIRCLE': | ||||
|                 geom = Point(e.dxf.center.x, e.dxf.center.y).buffer(e.dxf.radius) | ||||
|  | ||||
|             elif e.dxftype() in ['LWPOLYLINE', 'POLYLINE']: | ||||
|                 points = [(p[0], p[1]) for p in e.get_points()] | ||||
|                 if e.is_closed: | ||||
|                     geom = Polygon(LinearRing(points)) | ||||
|                 else: | ||||
|                     geom = LineString(points) | ||||
|  | ||||
|             elif e.dxftype() == 'ARC': | ||||
|                 geom = arc_to_linestring(e) | ||||
|  | ||||
|             elif e.dxftype() == 'SPLINE': | ||||
|                 geom = spline_to_linestring(e) | ||||
|  | ||||
|             elif e.dxftype() == 'ELLIPSE': | ||||
|                 geom = ellipse_to_linestring(e) | ||||
|  | ||||
|             elif e.dxftype() == 'POINT': | ||||
|                 geom = Point(e.dxf.location.x, e.dxf.location.y) | ||||
|  | ||||
|             elif e.dxftype() == 'TEXT': | ||||
|                 geom = Point(e.dxf.insert.x, e.dxf.insert.y) | ||||
|                 properties['text'] = e.dxf.text | ||||
|  | ||||
|             # 若几何对象创建成功,添加到对应图层 | ||||
|             if geom: | ||||
|                 geo_json = geojson.loads(geojson.dumps(mapping(geom))) | ||||
|                 feature = geojson.Feature(geometry=geo_json, properties=properties) | ||||
|                 layers.setdefault(layer_name, []).append(feature) | ||||
|  | ||||
|         except Exception as ex: | ||||
|             print(f"跳过无法解析的实体 [{e.dxftype()}]: {ex}") | ||||
|             continue | ||||
|  | ||||
|     # 返回按图层组织的 GeoJSON 结构 | ||||
|     return { | ||||
|         "layers": [ | ||||
|             { | ||||
|                 "name": layer, | ||||
|                 "type": "FeatureCollection", | ||||
|                 "features": features | ||||
|             } | ||||
|             for layer, features in layers.items() | ||||
|         ] | ||||
|     } | ||||
|  | ||||
|  | ||||
| # ------------------------------ 坐标转换核心函数 ------------------------------ | ||||
| def transform_coordinates(coords, transformer): | ||||
|     """递归转换坐标(支持点、线、面的坐标结构)""" | ||||
|     try: | ||||
|         # 处理嵌套坐标(如面的外环/内环、线的点集) | ||||
|         if isinstance(coords, list) and len(coords) > 0 and isinstance(coords[0], list): | ||||
|             return [transform_coordinates(sub_coords, transformer) for sub_coords in coords] | ||||
|         # 处理单个点坐标([x, y]) | ||||
|         elif isinstance(coords, list) and len(coords) == 2: | ||||
|             x, y = coords | ||||
|             lon, lat = transformer.transform(x, y) | ||||
|             return [round(lon, 6), round(lat, 6)]  # 保留6位小数(可根据需求调整) | ||||
|         # 非坐标结构直接返回 | ||||
|         else: | ||||
|             return coords | ||||
|     except (ProjError, ValueError, IndexError) as e: | ||||
|         print(f"坐标转换失败: 原始坐标={coords}, 错误={str(e)}") | ||||
|         return coords | ||||
|  | ||||
|  | ||||
| def process_dxf(params: ConversionParams): | ||||
|     """整合 DXF 解析 + 坐标转换 + 结果保存(无 exe 调用)""" | ||||
|     # 1. 解析 DXF 文件为 GeoJSON 结构 | ||||
|     dxf_result = dxf_to_geojson(params.dxf_abs_path) | ||||
|     # 检查 DXF 解析是否出错 | ||||
|     if "code" in dxf_result and dxf_result["code"] != 0: | ||||
|         return ResponseUtil.error(f"DXF 解析失败: {dxf_result['message']}") | ||||
|  | ||||
|     # 2. 初始化坐标转换器(EPSG 投影转换) | ||||
|     try: | ||||
|         transformer = Transformer.from_crs( | ||||
|             f"EPSG:{params.source_epsg}", | ||||
|             f"EPSG:{params.target_epsg}", | ||||
|             always_xy=True  # 保持 (x,y) 顺序(避免投影后顺序反转) | ||||
|         ) | ||||
|     except ProjError as e: | ||||
|         return ResponseUtil.error(f"坐标系初始化失败: {str(e)}(请检查 EPSG 代码是否正确)") | ||||
|  | ||||
|     # 3. 提取所有要素并转换坐标 | ||||
|     data = dxf_result | ||||
|     features = [] | ||||
|     # 从图层中提取要素(兼容原 parse_dxf 的输出结构) | ||||
|     if 'layers' in data: | ||||
|         for layer in data['layers']: | ||||
|             features.extend(layer.get('features', [])) | ||||
|     # 兼容标准 GeoJSON 的 FeatureCollection 结构(可选) | ||||
|     elif 'features' in data: | ||||
|         features = data['features'] | ||||
|  | ||||
|     # 批量转换要素坐标 | ||||
|     for feature in features: | ||||
|         geometry = feature.get('geometry', {}) | ||||
|         if geometry and 'coordinates' in geometry: | ||||
|             geometry['coordinates'] = transform_coordinates(geometry['coordinates'], transformer) | ||||
|  | ||||
|     # 4. 保存转换结果到目标路径 | ||||
|     try: | ||||
|         # 确保结果目录存在 | ||||
|         result_dir = os.path.dirname(params.result_json_abs_path) | ||||
|         if not os.path.exists(result_dir): | ||||
|             os.makedirs(result_dir, exist_ok=True) | ||||
|  | ||||
|         # 写入 JSON 文件 | ||||
|         with open(params.result_json_abs_path, 'w', encoding='utf-8') as f: | ||||
|             json.dump(data, f, ensure_ascii=False, indent=2) | ||||
|  | ||||
|         return ResponseUtil.success(f"结果已保存至: {params.result_json_abs_path}") | ||||
|     except Exception as e: | ||||
|         return ResponseUtil.error(f"结果保存失败: {str(e)}") | ||||
|  | ||||
|  | ||||
| # ------------------------------ 命令行入口 ------------------------------ | ||||
| if __name__ == "__main__": | ||||
|     # 命令行参数校验 | ||||
|     if len(sys.argv) != 5: | ||||
|         print("=" * 50) | ||||
|         print("使用方法: python dxf_converter.py <参数1> <参数2> <参数3> <参数4>") | ||||
|         print("参数说明:") | ||||
|         print("  参数1: DXF 文件的绝对路径(例:C:/data/input.dxf)") | ||||
|         print("  参数2: 结果 JSON 文件的绝对路径(例:C:/data/output.json)") | ||||
|         print("  参数3: 源坐标系 EPSG 代码(例:2437 表示北京54平面坐标)") | ||||
|         print("  参数4: 目标坐标系 EPSG 代码(例:4326 表示WGS84经纬度)") | ||||
|         print("=" * 50) | ||||
|         sys.exit(1) | ||||
|  | ||||
|     # 解析命令行参数 | ||||
|     try: | ||||
|         params = ConversionParams( | ||||
|             dxf_abs_path=sys.argv[1], | ||||
|             result_json_abs_path=sys.argv[2], | ||||
|             source_epsg=int(sys.argv[3]), | ||||
|             target_epsg=int(sys.argv[4]) | ||||
|         ) | ||||
|     except ValueError as e: | ||||
|         print(json.dumps(ResponseUtil.error(f"参数格式错误: {str(e)}"), ensure_ascii=False)) | ||||
|         sys.exit(1) | ||||
|  | ||||
|     # 执行转换流程 | ||||
|     result = process_dxf(params) | ||||
|     print(json.dumps(result, ensure_ascii=False)) | ||||
|     sys.exit(0 if result["code"] == 200 else 1) | ||||
		Reference in New Issue
	
	Block a user
	 ZZX9599
					ZZX9599