#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import cv2
import os
import numpy as np
import math
import requests
import json
import base64
from threading import Timer
import time
from pprint import pprint
from PIL import Image, ImageDraw, ImageFont
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
def curl(url):
headers = {}
response = requests.get(url, headers=headers)
# pprint(response)
return json.loads(response.text)
def getInfo(file_path):
info = {}
filepath, filename = os.path.split(file_path)
info["name"] = filename
if os.path.exists(file_path):
info["size"] = os.path.getsize(file_path)
info["sizeUnit"] = sizeConvert(info["size"])
cap = cv2.VideoCapture(file_path)
if cap.isOpened():
# get方法参数按顺序对应下表(从0开始编号)
rate = cap.get(5) # 帧速率
frame_number = cap.get(7) # 视频文件的帧数
info["rate"] = rate
info["duration"] = int(frame_number / rate)
info["durationHMS"] = timeConvert(info["duration"])
info["width"] = int(cap.get(3))
info["height"] = int(cap.get(4))
cap.release()
return info
def getFrames(file_path, cutTimes):
'''
file_path: 文件名
cutTimes: 抽取帧的时间数组,时间单位为s
return [时间, 帧图像]数组
'''
t_frames = []
info = getInfo(file_path)
cutFrames = [int(info["rate"] * x) + 1 for x in cutTimes]
print("cutFrames:", cutFrames)
cap = cv2.VideoCapture(file_path)
cut_cnt = 0
for cutFrame in cutFrames:
cap.set(cv2.CAP_PROP_POS_FRAMES, cutFrame - 1)
ret, frame = cap.read()
if ret:
t_frames.append((cutTimes[cut_cnt], frame))
cut_cnt += 1
print("截取视频第:" + str(cut_cnt) + " 帧")
else:
break
cap.release()
return t_frames
def timeConvert(seconds, str=True):
h = seconds // 3600
m = seconds % 3600 // 60
s = seconds % 60
if str:
if h > 0:
return '{:.0f}:{:.0f}:{:.0f}'.format(h, m, s)
else:
return '{:.0f}:{:.0f}s'.format(m, s)
else:
return h, m, s
def sizeConvert(size): # 单位换算
K, M, G = 1024, 1024 ** 2, 1024 ** 3
if size >= G:
return str(size // G) + 'GB'
elif size >= M:
return str(size // M) + 'MB'
elif size >= K:
return str(size // K) + 'KB'
else:
return str(size) + 'Bytes'
def imgResize(img, width=1980):
'''
将图片等比拉伸至宽为width
'''
h, w = img.shape[:2]
height = int(h * width / w)
if width > w: # 放大图像
img_new = cv2.resize(img, (width, height), interpolation=cv2.INTER_CUBIC)
else:
img_new = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)
return img_new
def imgAddImg(imgDst, imgSrc, xOffset=0, yOffset=0, copy=False):
if copy:
imgDst = imgDst.copy()
imgDst[yOffset:yOffset + imgSrc.shape[0], xOffset:xOffset + imgSrc.shape[1]] = imgSrc
return imgDst
def imgAddText(img, text, xOffset=0, yOffset=0):
# cv2.putText(图像,需要添加字符串,需要绘制的坐标,字体类型,字号,字体颜色,字体粗细)
img2 = cv2.putText(img, text, (xOffset, yOffset), cv2.LINE_AA, 0.7, (249, 249, 249), 2)
return img2
def imgAddTextUTF8(img, text, xOffset=0, yOffset=0):
img_cv2_RGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # cv2和PIL中颜色的hex码的储存顺序不同
img_PIL = Image.fromarray(img_cv2_RGB)
draw = ImageDraw.Draw(img_PIL)
font = ImageFont.truetype("simhei.ttf", 20, encoding="utf-8") # 参数1:字体文件路径,参数2:字体大小
draw.text((xOffset, yOffset), text, (0, 0, 0), font=font) # 参数1:打印坐标,参数2:文本,参数3:字体颜色,参数4:字体
# PIL图片转cv2 图片
img_cv2_textAdded = cv2.cvtColor(np.array(img_PIL), cv2.COLOR_RGB2BGR)
return img_cv2_textAdded
def imgWhite(height, width):
height = int(height)
width = int(width)
img = np.zeros((height, width), dtype=np.uint8)
# img = np.zeros((height,width,3), dtype=np.uint8)
img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
img[:, :, :] = 255
return img
def mp4toJpg(f_video, f_img):
start = cv2.getTickCount()
small_pic_width = 480
small_pic_cnt = 1
small_pic_per_line = 1
small_pic_gap_x = 0
small_pic_gap_y = 0
small_pic_top = 0
small_pic_bottom = 0
small_pic_left = 0
small_pic_right = 0
time_reduce = 0
# 获取信息
info = getInfo(f_video)
if info['duration'] == 0: # 预防不足一秒的场景
print('视频不足一秒 %s' % f_video)
t_imgs = getFrames(f_video, [0])
else:
t_imgs = getFrames(f_video, [1])
small_pic_cnt = len(t_imgs)
# 对图像进行缩放,添加时间戳水印
imgs_fixed = []
h, w = (0, 0)
for time, img in t_imgs:
# print(r"正在加工图片%d"%time)
img_resize = imgResize(img, small_pic_width)
img_add_time = imgAddText(img_resize, timeConvert(time), 10, 25)
h, w = img_add_time.shape[:2]
# cv2.imwrite(str(time) + "_time.png", img_add_time)
imgs_fixed.append(img_add_time)
# 先画一张空白图
height = (math.ceil(small_pic_cnt * 1.0 / small_pic_per_line)) * h
width = small_pic_left + small_pic_right + small_pic_per_line * (small_pic_gap_x + w) - small_pic_gap_x
print("h = %.0f, w = %.0f" % (h, w))
print("height = %.0f, width = %.0f" % (height, width))
img_backgroud = imgWhite(height, width)
# 在大图上加入视频信息
filepath, filename = os.path.split(f_video)
print('file name %s' % filename)
print('file path %s' % filepath)
# img_backgroud = imgAddTextUTF8(img_backgroud, "name : %s"%filename, 0, 0)
# img_backgroud = imgAddTextUTF8(img_backgroud, "time : %s"%info["durationHMS"], 20, 45)
# img_backgroud = imgAddTextUTF8(img_backgroud, "size : %s"%(info["sizeUnit"]), 20, 70)
# img_backgroud = imgAddTextUTF8(img_backgroud, "px : %dx%d"%(info["width"],info["height"]), 20, 95)
# 在大图上加入视频截图
i = 0
while i < small_pic_cnt:
xx = i % small_pic_per_line
yy = int(i / small_pic_per_line)
print(r"正在将第(%d,%d)放入背景" % (xx + 1, yy + 1))
off_x = int(small_pic_left + (small_pic_gap_x + w) * xx)
off_y = int(small_pic_top + (small_pic_gap_y + h) * yy)
img_backgroud = imgAddImg(img_backgroud, imgs_fixed[i], off_x, off_y)
# imgAddImg(img_backgroud, img_backgroud, 0, 0)
i += 1
end = cv2.getTickCount()
time_spent = (end - start) / cv2.getTickFrequency() / 60
print("耗时%.1f min" % time_spent)
cv2.imwrite(f_img, img_backgroud)
# cv2.imshow("img", img_backgroud)
# cv2.waitKey(0)
def curlGet(url):
# 请求接口
r = requests.get(url)
# json字符串转换字典格式
return json.loads(r.text)
def curlPost(url, in_value):
r = requests.post(url, in_value)
return json.loads(r.text)
# 获取phone需要审核的视频
def getphoneVideo():
url = 'http://127.0.0.1:8080/Aider/phone/getphoneVideo?channel_id=%d' % CHANNEL_ID
dictinfo = curlGet(url)
if dictinfo['status'] == 200:
# {"user_id":10000008058,"passport":"xkdkkd","full_name":"kdkdk","base64":""}
return dictinfo['data']
print('请求结果异常 %s : %s' % (dictinfo['status'], dictinfo['msg']))
return {}
# 保存phone视频截图
def savephoneVideoUrl(user_id, base64_str):
url = 'http://127.0.0.1:8080/Aider/phone/savephoneVideoUrl'
dictinfo = curlPost(url, {'channel_id': CHANNEL_ID, 'user_id': user_id, 'base64': base64_str})
if dictinfo['status'] == 200:
# {"user_id":10000008058,"passport":"xkdkkd","full_name":"kdkdk","base64":""}
return dictinfo['data']
print('请求结果异常 %s : %s' % (dictinfo['status'], dictinfo['msg']))
return {}
# base64 转换 mp4 文件
def createMp4File(base64_str, user_id):
img_data = base64.b64decode(base64_str)
mp4_file = '/tmp/phone/%s.mp4' % user_id
file = open(mp4_file, 'wb')
file.write(img_data)
file.close()
return mp4_file
# 文件 转换 base64 字符串
def fileToBase64(file_path):
with open(file_path, "rb") as f:
# b64encode是编码,b64decode是解码
base64_data = base64.b64encode(f.read())
return base64_data
# 检测 phone视频
def checkphone():
print("------ checkphone 时间:%s ------" % time.ctime())
print('------ 开始phone视频截取 ------')
try:
print('开始获取phone视频详情')
# 获取视频
phone_video_info = getphoneVideo()
if phone_video_info == {}:
raise Exception, " -- 暂无需要审核视频 -- "
user_id = phone_video_info['user_id']
mp4_file = createMp4File(phone_video_info['base64'], user_id)
print('mp4文件地址%s' % mp4_file)
# 截图文件地址
img_file = "/tmp/phone/%s.jpg" % user_id
print('Mp4 %s 转 Jpg :%s' % (mp4_file, img_file))
mp4toJpg(mp4_file, img_file)
if os.path.exists(img_file) == False:
raise Exception, '-- 图片文件生成失败 --'
img_base64 = fileToBase64(img_file)
print('保存phone视频截图 %d' % user_id)
if savephoneVideoUrl(user_id, img_base64) == {}:
raise Exception, '-- 保存phone 视频接口抛错 --'
except Exception, err:
print err
except IOError:
print("Error: 没有找到文件或读取文件失败")
else:
print("phone 视频截图上传成功")
print('------ 结束phone视频截取 ------')
# 如果count小于10,开始下一次调度
Timer(10, checkphone).start()
#生产仅单一渠道使用
CHANNEL_ID = 123
if __name__ == '__main__':
# 指定10秒后执行print_time函数
checkphone()
版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。