本 Python 代码主要用于处理和分析 OpenFOAM(一种用于计算流体力学的软件)生成的数据,并提供了一些实用的工具函数。
完整代码下载:https://download.csdn.net/download/huanghm88/89909280
# -*- coding: utf-8 -*-
import argparse
import json
import os
import re
import socket
import time
from io import StringIO

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import peakutils
from scipy.interpolate import interp1d
# Absolute path of the directory containing this module, so resources can be
# located relative to the package rather than the current working directory.
TensorFOAM_path = os.path.dirname(os.path.abspath(__file__))


def test():
    """Smoke-test hook: prints a marker so users can verify the module loads."""
    print('Success!')


class RingBuffer():
    """A 1D ring buffer using numpy arrays"""

    def __init__(self, length):
        # Fixed-size float32 storage; `index` is the next write position.
        self.data = np.zeros(length, dtype='f')
        self.index = 0

    def extend(self, x):
        """adds ndarray x to ring buffer"""
        # Wrap write positions around the end of the buffer (oldest data
        # is overwritten once the buffer is full).
        x_index = (self.index + np.arange(x.size)) % self.data.size
        self.data[x_index] = x
        self.index = x_index[-1] + 1

    def get(self):
        """Returns the first-in-first-out data in the ring buffer"""
        idx = (self.index + np.arange(self.data.size)) % self.data.size
        return self.data[idx]


def freq_domain_analysis(data, const, threshold=0.5, min_distance=30) -> dict:
    """Compute time- and frequency-domain statistics of a force time series.

    Parameters
    ----------
    data : array-like, 2-D
        Time history; column 0 is time, column 2 is read as Cd.
        NOTE(review): the "Cl" statistics below operate on the WHOLE row
        slice, not a single column — confirm the intended column layout
        against the caller before relying on Cl_mean / Cl_RMS.
    const : dict
        Must contain 'D' (characteristic length) and 'U' (reference
        velocity); may contain 'interval' = [start, stop] row slice.
    threshold, min_distance : float, int
        Forwarded to ``peakutils.indexes`` for spectral peak detection.

    Returns
    -------
    dict
        Mean/RMS statistics, sampling info, the amplitude-weighted
        shedding frequency, and the Strouhal number St = f * D / U.
    """
    if 'interval' in const:
        interval = const['interval']
    else:
        interval = [None, None]  # no slicing: use the full record
    data = np.array(data)
    num_selected_points = data[interval[0]:interval[1]].shape[0]
    cl_mean = np.mean(data[interval[0]:interval[1]])
    cl_rms = np.sqrt(np.sum(data[interval[0]:interval[1]] ** 2) / num_selected_points)
    cd_mean = np.mean(data[interval[0]:interval[1], 2])
    Cl = data[interval[0]:interval[1]]
    # Sampling period from the first two time stamps (assumes uniform sampling).
    t_s = data[1, 0] - data[0, 0]
    f_s = 1 / t_s
    F = np.fft.fft(Cl)
    f = np.fft.fftfreq(num_selected_points, t_s)
    # Restrict peak picking to the non-negative half of the spectrum.
    mask = np.where(f >= 0)
    peaks_index = peakutils.indexes(np.abs(F[mask]) / num_selected_points,
                                    thres=threshold, min_dist=min_distance)
    peaks_x = np.array(f[peaks_index])
    peaks_y = np.array(np.abs(F[peaks_index]) / num_selected_points)
    # Amplitude-weighted average of the detected peak frequencies.
    shedding_frequency = np.sum(peaks_x * peaks_y) / peaks_y.sum()
    strouhal_number = shedding_frequency * const['D'] / const['U']
    result = {
        'Cl_mean': cl_mean,
        'Cl_RMS': cl_rms,
        'Cd_mean': cd_mean,
        'num_selected_points': num_selected_points,
        'num_all_points': data.shape[0],
        'sampling_frequency': f_s,
        'sampling_period': t_s,
        'shedding_frequency': shedding_frequency,
        'strouhal_number': strouhal_number
    }
    return result


def read_foam_file(path, mandatory=False, saver=False, dimension=3):
    """Parse an OpenFOAM output file into a pandas DataFrame.

    Handles ``system/probes`` definitions and ``postProcessing`` results
    (``forces.dat``, probe ``p``, probe ``U``).  When ``dimension == 2``
    the Uz columns of a ``U`` file are dropped.

    NOTE(review): the visible source is truncated after the 'U' branch;
    additional postProcessing file types may be handled in the full file.
    """
    if path.split('/')[-2] == 'system':
        if path.split('/')[-1] == 'probes':
            with open(path, 'r') as f:
                content_total = f.read()
            # Strip the parentheses that wrap each probe coordinate triple.
            right_str = re.sub(r'\);?', '', re.sub(r'[ \t]*\(', '', content_total))
            annotation_num = 0
            for line in right_str.split('\n'):
                # The first line starting with a (signed) digit is real data;
                # everything before it is header/annotation.
                if re.search(r'^-?\d+', line):
                    break
                annotation_num += 1
            right_content = StringIO(right_str)
            data_frame_obj = pd.read_csv(right_content, sep=' ',
                                         skiprows=annotation_num,
                                         header=None, names=['x', 'y', 'z'])
        else:
            data_frame_obj = False
            assert data_frame_obj, f'Unknown system/file type\n{path}'
    elif path.split('/')[-4] == 'postProcessing':
        with open(path, 'r') as f:
            content_total = f.read()
            # BUGFIX: was f.seek(0.000) — a float offset raises TypeError
            # on text-mode files in Python 3.
            f.seek(0)
            content_lines = f.readlines()
        # Count the leading '#' comment lines of the OpenFOAM table header.
        annotation_num = 0
        for line in content_lines:
            if line[0] == '#':
                annotation_num += 1
            else:
                break
        if path.split('/')[-1] == 'forces.dat':
            column_name = ['Time']
            column_name.extend(['Fpx', 'Fpy', 'Fpz'])
            column_name.extend(['Fvx', 'Fvy', 'Fvz'])
            column_name.extend(['Fox', 'Foy', 'Foz'])
            column_name.extend(['Mpx', 'Mpy', 'Mpz'])
            column_name.extend(['Mvx', 'Mvy', 'Mvz'])
            column_name.extend(['Mox', 'Moy', 'Moz'])
            # Normalize whitespace and drop parentheses so the table parses
            # as tab-separated values.
            right_content = StringIO(
                re.sub(r'\)', '', re.sub(r'\(', '',
                re.sub(r'\t+', '\t', re.sub(r' +', '\t',
                re.sub(r'# ', '', re.sub(r'[ \t]+\n', '\n', content_total)))))))
            data_frame_obj = pd.read_csv(right_content, sep='\t',
                                         skiprows=annotation_num, header=None,
                                         index_col=False, names=column_name)
        elif path.split('/')[-1] == 'p':
            right_content = StringIO(
                re.sub(r'\t\n', '\n', re.sub(r' +', '\t',
                re.sub(r'# ', '', re.sub(r'[ \t]+\n', '\n', content_total)))))
            # skiprows=annotation_num-1 keeps the last header line so pandas
            # uses it as column names.
            data_frame_obj = pd.read_csv(right_content, sep='\t',
                                         skiprows=annotation_num - 1,
                                         index_col=False)
        elif path.split('/')[-1] == 'U':
            # One probe per header line after 'Time'; three velocity
            # components per probe.
            column_name = ['Time']
            for n in range(annotation_num - 1):
                column_name.append(f'Ux_{n}')
                column_name.append(f'Uy_{n}')
                column_name.append(f'Uz_{n}')
            right_content = StringIO(
                re.sub(r' +', '\t', re.sub(r'[\(\)]', '',
                re.sub(r'# ', '', re.sub(r'[ \t]+\n', '\n', content_total)))))
            data_frame_obj = pd.read_csv(right_content, sep='\t',
                                         skiprows=annotation_num, header=None,
                                         index_col=False, names=column_name)
            if dimension == 2:
                drop_column = [i for i in column_name if re.search(r'^Uz_\d', i)]
                data_frame_obj.drop(drop_column, axis=1, inplace=True)
        # NOTE(review): original source truncated here — further
        # `elif path.split('/')[-1] == ...` branches are not visible.
    # Presumably the full file returns the parsed frame — TODO confirm
    # against the complete source.
    return data_frame_obj