NLP模型工程化部署
文章目录
- 一、理论-微服务、测试与GPU
- 1)微服务架构
- 2)代码测试
- 3)GPU使用
- 二、实践-封装微服务,编写测试用例和脚本,并观察GPU
- 1)微服务封装(RestFul和RPC)
- ①RestFul接口
- ②RPC接口
- 2)测试编写(unit_test\api_test\load_test)
- (1)单元测试
- (2)接口测试
- (3)压力测试
- 3)GPU使用和监控
- 三、理论-docker、CICD与K8S
- 四、实践-构建镜像与CICD脚本
- 1)构建镜像
- 2)CI/CD脚本
一、理论-微服务、测试与GPU
- 学习目标与成果
1)微服务架构
①单服务架构
②微服务架构
③微服务特点
④主要技术栈
1)HTTP协议做RESTFul接口
基于flask web框架实现restful接口
2)远程RPC调用
基于grpc
- restful还是rpc
2)代码测试
- 具体实践举例
代码测试
压力测试
压力测试工具
3)GPU使用
可以使用命令,每一秒刷新一次
watch -n 1 nvidia-smi
二、实践-封装微服务,编写测试用例和脚本,并观察GPU
1)微服务封装(RestFul和RPC)
①RestFul接口
(1)配置日志logger
import logging.handlers# 获取logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)# 生成文件handler,打印到文件
# 按天滚动的log,一天滚动一次,只保留最近7个日志文件(即保留最近7天)
file_handler = logging.handlers.TimedRotatingFileHandler('./logs/root.log', 'D', 1, 7, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)# 设置formatter
# 打印日志时间、级别、文件名、行号、函数名字、内容
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(funcName)s() - %(message)s'
)# 将formatter设置到两个handler
file_handler.setFormatter(formatter)# 将handler设置到logger
logger.addHandler(file_handler)
(2)编写RESTFUL API
1>编写hello_resource,配置健康检查路由
#hello_resource.py
from flask_restful import Resourceclass HelloResource(Resource):"""hello路由快速检查服务是否健康"""def get(self):return 'hello'
2>编写seg_resource,配置分词路由
import json
import timefrom flask import request
from flask_restful import Resourcefrom online import loggerclass SegResource(Resource):"""分词路由主要调用segment.seg"""def __init__(self, segment):# 使用传过来的segment对象,进行后面的分词self.segment = segmentdef post(self):data = request.get_json() # 解析输入json为一个dictinit_time = time.time()result = {'status': 'OK', # 本次请求返回状态'msg': '' # 额外说明}request_id = data.get('request_id') # 支持传入request_id,便于线上追踪请求try:assert data, "请确保输入不为空"# 从data取用户输入的各种参数content, model, enable_offset, enable_stop_word, use_ner = \data['content'], data.get('model'), data.get('enable_offset', False), \data.get('enable_stop_word', False), data.get('use_ner', False)logger.info('request_id: {}, model: {}, enable_offset: {}, enable_stop_word: {}, use_ner: {}, ''content: {} ...'.format(request_id, model, enable_offset, enable_stop_word, use_ner,content[:100]))# 调用segment对象的seg方法r = self.segment.seg(content, model=model, enable_offset=enable_offset,enable_stop_word=enable_stop_word, use_ner=use_ner)result['result'] = list(r) # 将分词结果存放在result里面except Exception as e:# 出现异常,打印异常栈,更改本次请求状态为ERRORlogger.exception(e)result['status'] = 'ERROR'result['msg'] = str(e)logger.info('request_id: {}, result: {} ..., cost time: {}s'.format(request_id, json.dumps(result, ensure_ascii=False)[:200], time.time() - init_time))return result
3>编写其余resource
4>编写http server
#server.py
import sysfrom flask import Flask
from flask_restful import Apifrom online import logger
from online.http.resources.dict_resource import DictResource
from online.http.resources.hello_resource import HelloResource
from online.http.resources.pos_resource import PosResource
from online.http.resources.seg_resource import SegResource
from segment.segment import Segmentdef start_server(port=8000):# 如果输入第1个参数,将第1个参数解析为端口号if len(sys.argv) > 1:port = int(sys.argv[1])# 实例化flask appapp = Flask(__name__)app.config.update(RESTFUL_JSON=dict(ensure_ascii=False)) # 设置ensure_ascii=False,确保接口返回的中文正常api = Api(app)# 实例化segment对象,准备传入到各个resource里面segment = Segment()resource_class_kwargs = {'segment': segment}# 为api添加hello路由、seg路由、pos路由、dict路由api.add_resource(HelloResource, '/') # hello路由用于快速检查服务可用性api.add_resource(SegResource, '/seg', resource_class_kwargs=resource_class_kwargs) # seg路由用于分词api.add_resource(PosResource, '/pos', resource_class_kwargs=resource_class_kwargs) # pos路由用于词性标注api.add_resource(DictResource, '/dict', resource_class_kwargs=resource_class_kwargs) # dict路由用于管理词典# 启动服务,设置host port# host='0.0.0.0',表示外部机器可以访问,必须设置为0.0.0.0# threaded=False,表示我们的主程序是单线程模式,需要一个一个处理请求# (我们的word_graph对象不是线程安全的)logger.info('server starts port {}'.format(port))app.run(debug=False, host='0.0.0.0', port=port, threaded=False)if __name__ == '__main__':start_server()
- 启动APP server
执行脚本start_http_server.sh
cd ..
python -m online.http.server 8000
- 发送POSTman请求
①发送hello相关链接请求
curl 0.0.0.0:8000
服务器打印
客户端回复
②测试seg分割模型
postman发送请求
发送seg模块相关
②RPC接口
1>定义proto接口,然后生成
2>基础实践
syntax = "proto3";service Segment {rpc seg (SegRequest) returns (SegResponse) {}rpc pos (SegRequest) returns (PosResponse) {}rpc add_word (AddWordRequest) returns (Bool) {}rpc delete_word (DeleteWordRequest) returns (Bool) {}
}message SegRequest {string content = 1;string model = 2;bool enable_stop_word = 3;bool use_ner = 4;
}message SegResponse {message Term {string word = 1;int32 start_index = 2;int32 end_index = 3;}repeated Term terms = 1;
}message PosResponse {message Term {string word = 1;int32 start_index = 2;int32 end_index = 3;string pos = 4;}repeated Term terms = 1;
}message Bool {bool status = 1;
}message AddWordRequest {string word = 1;string pos = 2;int32 freq = 3;
}message DeleteWordRequest {string word = 1;
}
编译脚本命令
cd ..
python -m grpc_tools.protoc -Ionline/rpc/ --python_out=online/rpc/ --grpc_python_out=online/rpc/ segment.proto
客户端代码
import grpcfrom online.rpc.segment_pb2 import SegRequest, AddWordRequest, DeleteWordRequest # 引入Request类
from online.rpc.segment_pb2_grpc import SegmentStub # 引入stub,和服务端交互class SegmentClient(object):"""客户端代码,提供给使用方直接import使用作用:定义各种接口将原始函数输入封装为Request对象发送Request到server端,获得返回的Response解析Response对象为python基本类型,返回给用户"""def __init__(self, host, port):"""声明host, port 创建channel通过channel创建stub对象"""channel = grpc.insecure_channel('{}:{}'.format(host, port))self.stub = SegmentStub(channel)def seg(self, content, model, enable_stop_word=False, use_ner=False):"""定义seg接口"""# 将参数封装成request对象request = SegRequest(content=content, model=model, enable_stop_word=enable_stop_word,use_ner=use_ner)# 调用stub.seg方法,传入request对象,得到response对象response = self.stub.seg(request)# 将response对象解析成list of tuple,返回给用户words = [(term.word, term.start_index, term.end_index) for term in response.terms]return wordsdef pos(self, content, model, enable_stop_word=False, use_ner=False):"""定义pos接口"""request = SegRequest(content=content, model=model, enable_stop_word=enable_stop_word,use_ner=use_ner)response = self.stub.pos(request)words = [(term.word, term.start_index, term.end_index, term.pos) for term in response.terms]return wordsdef add_word(self, word, pos, freq):"""定义add_word接口"""request = AddWordRequest(word=word, pos=pos, freq=freq)response = self.stub.add_word(request)status = response.statusreturn statusdef delete_word(self, word):"""定义delete_word接口"""request = DeleteWordRequest(word=word)response = self.stub.delete_word(request)status = response.statusreturn status
2)测试编写(unit_test\api_test\load_test)
(1)单元测试
单元测试代码
import unittest
from segment.segment import Segmentclass MyTestCase(unittest.TestCase):def setUp(self) -> None:self.segment = Segment()def test_seg(self):content = '百年来,我们党始终筑牢“一切为了人民”这一基石。“一切为了人民”是中国共产党保持旺盛生机的密码。' \'十九届五中全会建议稿中突出强调了“扎实推动共同富裕”,这样表述,这在党的全会文件中还是第一次,彰显了“发展为人民”的理念。' \'“江山就是人民,人民就是江山”,中国共产党每一个历史转折,每一个伟大胜利背后,都蕴藏着巨大的人民力量。' \'一百年来,党始终与广大人民群众同舟共济、生死与共,在革命、建设、改革的风雨考验中,矢志不渝为了人民,' \'中国“红船”才能勇往直前,击鼓催征稳驭舟。'print('content: ', content)print('seg(content)')words = list(self.segment.seg(content))print(words)print('seg(content, model=\'HMM\')')words = list(self.segment.seg(content, model='HMM'))print(words)print('seg(content, model=\'CRF\')')words = list(self.segment.seg(content, model='CRF'))print(words)print('seg(content, model=\'DL\')')words = list(self.segment.seg(content, model='DL'))print(words)if __name__ == '__main__':unittest.main()
测试graph
import unittestfrom segment.word_tokenizer.word_graph import WordGraph, Nodeclass TestGraph(unittest.TestCase):def test_graph(self):graph = WordGraph()graph.insert_start_word(WordGraph.NODE_S) # 0graph.insert_start_word(Node('我', 1, 'core_dict')) # 1graph.insert_start_word(Node('喜', 2, 'core_dict')) # 2graph.insert_start_word(Node('喜欢', 4, 'model_word_dict')) # 3graph.insert_start_word(Node('欢', 1, 'core_dict')) # 4graph.insert_end_words([1])graph.insert_end_words([2, 3])graph.insert_end_words([4])graph.insert_end_words([5])graph.insert_end_words([5])route = graph.calculate()print(graph)print(route)assert route[0][0] == 5 # 确保最优路径权重为5if __name__ == '__main__':unittest.main()
(2)接口测试
①test_http_api.py
import json
import random
import unittestimport requestsHOST = '127.0.0.1'
PORT = 8000class MyTestCase(unittest.TestCase):def setUp(self) -> None:self.samples = []with open('tests/data/samples.txt', 'r', encoding='utf-8') as f:for line in f:line = line.strip()if not line:continueself.samples.append(line)def test_seg(self):print('test_seg~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')for sample in self.samples:data = {'content': sample,'model': _sample(['hmm', 'crf', 'dl']),'enable_offset': _sample([True, False])}print(json.dumps(data, ensure_ascii=False))r = requests.post('http://{}:{}/seg'.format(HOST, PORT), json=data)print(r.text)assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'print('\n')def test_pos(self):print('test_pos~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')for sample in self.samples:data = {'content': sample,'model': _sample(['hmm', 'crf']),}print(json.dumps(data, ensure_ascii=False))r = requests.post('http://{}:{}/pos'.format(HOST, PORT), json=data)print(r.text)assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'print('\n')def test_dict(self):print('test_dict~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')print('add word')data = {'word': '深度之眼', 'pos': 'nt', 'freq': 50}r = requests.post('http://{}:{}/dict'.format(HOST, PORT), json=data)print(r.text)assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'print('delete word')data = {'word': '深度之眼'}r = requests.delete('http://{}:{}/dict'.format(HOST, PORT), json=data)print(r.text)assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'def _sample(values):return random.sample(values, 1)[0]if __name__ == '__main__':unittest.main()
②test_rpc_api.py
import random
import unittestfrom online.rpc.segment_client import SegmentClientHOST = '127.0.0.1'
PORT = 8000class MyTestCase(unittest.TestCase):def setUp(self) -> None:self.client = SegmentClient(host=HOST, port=PORT)self.samples = []with open('tests/data/samples.txt', 'r', encoding='utf-8') as f:for line in f:line = line.strip()if not line:continueself.samples.append(line)def test_seg(self):print('test_seg~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')for sample in self.samples:data = {'content': sample,'model': _sample(['hmm', 'crf', 'dl'])}r = self.client.seg(**data)print(r)print('\n')def test_pos(self):print('test_pos~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')for sample in self.samples:data = {'content': sample,'model': _sample(['hmm', 'crf']),}r = self.client.pos(**data)print(r)print('\n')def test_dict(self):print('test_dict~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')print('add word')data = {'word': '深度之眼', 'pos': 'nt', 'freq': 10}self.client.add_word(**data)print('delete word')data = {'word': '深度之眼'}self.client.delete_word(**data)def _sample(values):return random.sample(values, 1)[0]if __name__ == '__main__':unittest.main()
(3)压力测试
①request.json
{"content": "【惊险一幕!英国伦敦上空一架飞机被三道闪电击中后继续飞行】6月6日,一架飞机在英国伦敦上空飞行时,遭到来自不同方向的三道闪电击中。视频显示,这架航班正穿过风雨交加的天空时,忽然三道闪电从云层中射出,击中了飞机。闪电击中飞机后,空中爆发出隆隆的雷声,飞机看似未受损坏继续飞行。(北京青年报编辑 许彦明)http://t.cn/A62eV2KY【#老奶奶去探病自己躺病床上睡着了# 老爷爷端着伤手 一脸无","model": "crf"
}
②跑测试脚本
siege -c 1 -t 1M --content-type "application/json" "http://127.0.0.1:8000/seg POST <request.json"
siege压力测试工具
测试结果
③观察CPU使用,内存使用,是否存在内存泄漏
3)GPU使用和监控
1)改造bilstm_crf_predictor\bilstm_crf_model,启动GPU加速
2)启动http_server,编辑request.json,更改模型为dl,压测
3)nvidia-smi观察gpu显存占用
三、理论-docker、CICD与K8S
四、实践-构建镜像与CICD脚本
1)构建镜像
- 学习目标
掌握dockerfile的编写,镜像构建以及构建的实际技巧
2)CI/CD脚本
- 学习目标
掌握CI/CD流水线,自动化脚本的编写,pipeline的定义