基于MySQL的企业专利数据高效查询与统计实现
背景
在进行产业链/产业评估工作时,我们需要对企业的专利进行评估,其中一个重要指标是统计企业每一年的专利数量。本文基于MySQL数据库,通过公司名称查询该公司每年的专利数,实现了高效的专利数据统计。
流程
项目流程概述如下:
目标:根据给定的企业名单,查询出每个企业每年的专利数量及专利得分。
任务分为两步:
- 构建专利申请人数据表;
- 利用Python查询并导出数据至Excel表。
难点与注意事项⚠️:
-
设计高效的专利申请人数据表,以便通过申请人名称快速查询专利信息。
-
编写高效的企业专利SQL查询语句。
-
确定专利得分:考虑到一项专利可能有多个申请人,根据申请人的位置来定义不同的得分公式。
专利得分公式: s c o r e = 1 p o s i t i o n \text{专利得分公式: } score = \frac{1}{position} 专利得分公式: score=position1
构建专利申请人数据表
专利数据库相关的文章:3500多万家专利数据存入MySQL数据库。
由于专利数据包含超过3000万条记录,且每项专利可能有多位申请人,直接检索是否包含目标申请人效率较低。因此,我们构建了一个专利申请人
表(patent_p
),将每个申请人作为单独的记录,并对申请人字段建立索引,便于快速查询。表结构如下:
CREATE TABLE patent_p (id INT AUTO_INCREMENT PRIMARY KEY,applicant VARCHAR(255),publication_number VARCHAR(31),application_date DATE,publication_date DATE,grant_publication_date DATE,score DOUBLE
);
字段说明:
- 专利公开号:作为专利的唯一标识符,便于后续关联专利表。
- 申请人:每条记录仅包含一个申请人,以便在此字段上建立索引,加速检索。
- 日期字段:用于按照年份筛选专利数据。
注意:原始专利表中的申请人可能有多位,故在
专利申请人
表中将每个申请人独立存储,再对申请人字段建立索引,从而大幅提升检索效率。
使用以下SQL语句为 applicant
添加索引:
CREATE INDEX idx_applicant ON patent_p(applicant);
这条语句会在 patent_p
表的 applicant
列上创建一个索引 idx_applicant
,从而提高在该列上进行查询的效率。
处理申请人拆分的代码如下所示:
def filter_company(applicant):"""从原始的多个申请人,拆分成一个一个的申请人"""if applicant is None or not isinstance(applicant, str):return []split_pattern = r"[;;]"applicant = re.split(split_pattern, applicant)applicant = map(str.strip, applicant)return list(filter(lambda x: len(x) >= 4, applicant))
具体的数据导入代码:
import os
import re
import pymysql
import pandas as pd
from tqdm import tqdmPASSWORD = "数据库密码"
DATABASE = "数据库名"# 专利字段映射
Patent_Table_Column = {"申请人": "applicant","专利公开号": "publication_number","申请日": "application_date","申请公布日": "publication_date","授权公布日": "grant_publication_date",
}def filter_company(applicant):"""提取中文公司名称,并去除空格"""if applicant is None or not isinstance(applicant, str):return []split_pattern = r"[;;]"applicant = re.split(split_pattern, applicant)applicant = map(str.strip, applicant)return list(filter(lambda x: len(x) >= 4, applicant))def insert_sql_by_csv(file_name):df = pd.read_csv(file_name, low_memory=False)BATCH_SIZE = 3000table_column_en = list(Patent_Table_Column.values())# 连接到MySQL数据库connection = pymysql.connect(host="localhost", # MySQL数据库的主机user="root", # MySQL用户名password=PASSWORD, # MySQL密码database=DATABASE, # 你要插入数据的数据库charset="utf8mb4",cursorclass=pymysql.cursors.DictCursor,)try:with connection.cursor() as cursor:sql = f"""INSERT INTO patent_p ({", ".join(table_column_en)}, score) VALUES (%s, %s, %s, %s, %s, %s);""".strip()batch_data = []for _, row in tqdm(df.iterrows(), total=len(df)):d = {}applicants = []for zh_k, en_k in Patent_Table_Column.items():item = row[zh_k]if pd.isna(item):item = Noneif zh_k == "申请人":applicants = filter_company(item)else:d[en_k] = itemfor pos, applicant in enumerate(applicants):d["applicant"] = applicantd["score"] = 1 / (pos + 1)tmp_values = tuple([d[k] for k in table_column_en + ["score"]])batch_data.append(tmp_values)if len(batch_data) >= BATCH_SIZE:cursor.executemany(sql, batch_data)# 清空批次batch_data = []if batch_data:cursor.executemany(sql, batch_data)connection.commit()except Exception as e:print(f"插入数据时出现错误: {e}")connection.rollback()finally:connection.close()if __name__ == "__main__":folder = "/xxx/3571万专利申请全量数据1985-2022年/"print(f"文件总数: {len(os.listdir(folder))}")cnt = 0for file_name in os.listdir(folder):if file_name.endswith(".csv"):cnt += 1filename = os.path.join(folder, file_name)print(cnt, file_name)insert_sql_by_csv(filename)
该表建成后的效果如下所示:
企业专利查询
在构建完企业信息数据库后,我们添加了公司的年度专利统计数据(2016年至2022年各年专利数量及总得分)。最终查询效果如下:
示例SQL查询语句:
SELECT applicant AS company_name, YEAR(application_date) AS year, COUNT(*) AS cnt, SUM(score)
FROM patent_p
WHERE applicant='深圳大学'
GROUP BY YEAR(application_date);
查询结果如下所示:
查询结果解释
该查询语句的作用如下:
-
select 子句:
applicant as company_name
:将applicant
列重命名为company_name
,表示公司名称。YEAR(application_date) as year
:提取application_date
的年份,并将其命名为year
。count(*) as cnt
:计算每年提交的专利申请数量。sum(score)
:计算该公司每年所有专利申请的得分总和。
-
from 子句:从
patent_p
表中获取数据。 -
where 子句:筛选出
applicant
字段值等于指定公司名称的记录。 -
group by 子句:按
application_date
的年份分组,统计每年的数据。
该查询将返回指定公司每年专利申请数量(cnt
)及年度专利得分(sum(score)
)。具体Python代码实现如下:
import os
import pandas as pd
import pymysql
# import argparsedatabase = "数据库名"
password = "数据库密码"connection = pymysql.connect(host="localhost", # MySQL数据库的主机user="root", # MySQL用户名password=password, # MySQL密码database=database, # 插入数据的数据库charset="utf8mb4",cursorclass=pymysql.cursors.DictCursor,
)columns = list(range(1985, 2024)) + ["专利件数", "专利得分"]def get_patent_statistics_by_name(name):if not name:return {}sql = f"""select applicant as company_name, YEAR(application_date) as year, count(*) as cnt, sum(score) from patent_p where applicant='{name}'group by YEAR(application_date);"""with connection.cursor() as cursor:data = cursor.execute(sql)data = cursor.fetchall()ans = {}cnt = 0score = 0for k in columns:ans[k] = Nonefor item in data:cnt += item.get("cnt", 0)score += item.get("sum(score)", 0)year = item.get("year", None)if year:ans[year] = item.get("cnt", 0)ans["专利得分"] = scoreans["专利件数"] = cntreturn pd.Series(ans)def add_patent_data(input_file, company_name_field="企业名称"):print("open", input_file)# 读取 CSV 文件df = pd.read_csv(input_file, low_memory=False)df[columns] = df[company_name_field].apply(get_patent_statistics_by_name)folder_path = os.path.dirname(input_file)output_file = os.path.basename(input_file).split(".")[0] + "_专利统计.xlsx"# 保存更新后的数据到 CSV 文件output_file = os.path.join(folder_path, output_file)df.to_excel(output_file, index=False)print(f"专利数据已成功添加到文件:{output_file}")if __name__ == "__main__":# parser = argparse.ArgumentParser(description="Add patent counts to industry.csv")# parser.add_argument("input_file", help="The input CSV file with industry data")# parser.add_argument(# "-name", "--name", default="企业名称", help="The column name for company names"# )# args = parser.parse_args()# # 调用函数处理文件# add_patent_data(args.input_file, args.name)folder = "/.../pku_industry/csv_folder_test"for file in os.listdir(folder):if not file.endswith(".csv"):continuefile_name = os.path.join(folder, file)add_patent_data(file_name)connection.close()
经过上述专利申请人表
的构建流程,能够大幅提升企业专利信息的检索速度,为产业链分析提供强大的数据支持。