Karp 的技术博客
#!/usr/bin/env python  
# _*_ coding:utf-8 _*_

import cv2
import os
import numpy as np
import math
import requests
import json
import base64
from threading import Timer
import time
from pprint import pprint
from PIL import Image, ImageDraw, ImageFont
import sys

# Python 2 only: site.py removes sys.setdefaultencoding at startup, so the
# module has to be reloaded to get it back before switching the implicit
# str/unicode codec to UTF-8. Python 3 has no built-in reload() and already
# uses UTF-8 as its default encoding, so the hack is skipped there.
if sys.version_info[0] < 3:
    reload(sys)
    sys.setdefaultencoding('utf-8')


def curl(url):
    """Issue a GET request to ``url`` with an empty header dict.

    Returns the response body parsed as JSON.
    """
    reply = requests.get(url, headers={})
    return json.loads(reply.text)


def getInfo(file_path):
    """Collect basic metadata about a video file.

    Args:
        file_path: Path of the video file.

    Returns:
        A dict that always contains "name" (the file name without its
        directory). When the path exists it also contains "size" (bytes) and
        "sizeUnit" (human-readable size). When OpenCV can open the file it
        additionally contains "rate" (frame rate reported by OpenCV),
        "duration" (whole seconds), "durationHMS" (formatted duration),
        "width" and "height" (pixels).
    """
    info = {"name": os.path.split(file_path)[1]}

    if os.path.exists(file_path):
        info["size"] = os.path.getsize(file_path)
        info["sizeUnit"] = sizeConvert(info["size"])

    cap = cv2.VideoCapture(file_path)
    try:
        if cap.isOpened():
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_number = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            info["rate"] = rate
            # cap.get() yields 0 for properties the backend cannot report;
            # treat a non-positive (or NaN) frame rate as "duration unknown"
            # instead of dividing by zero.
            info["duration"] = int(frame_number / rate) if rate > 0 else 0
            info["durationHMS"] = timeConvert(info["duration"])
            info["width"] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            info["height"] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        # Release the capture handle whether or not the file could be opened.
        cap.release()

    return info


def getFrames(file_path, cutTimes):
    """Grab one frame from the video for each requested timestamp.

    Args:
        file_path: Path of the video file.
        cutTimes: Sequence of timestamps, in seconds, at which to grab a frame.

    Returns:
        A list of ``(timestamp, frame)`` tuples in request order. Grabbing
        stops at the first frame that cannot be read, so the list may be
        shorter than ``cutTimes``.
    """
    fps = getInfo(file_path)["rate"]
    cutFrames = [int(fps * moment) + 1 for moment in cutTimes]
    print("cutFrames:", cutFrames)

    grabbed = []
    cap = cv2.VideoCapture(file_path)
    for ordinal, (moment, frame_no) in enumerate(zip(cutTimes, cutFrames), 1):
        # Frame numbers above are 1-based; the seek position is 0-based.
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no - 1)
        ok, frame = cap.read()
        if not ok:
            break
        grabbed.append((moment, frame))
        print("截取视频第:" + str(ordinal) + " 帧")
    cap.release()
    return grabbed


def timeConvert(seconds, str=True):
    """Split a duration in seconds into hours, minutes and seconds.

    Args:
        seconds: Duration in seconds.
        str: When truthy, return a formatted string ("H:M:S" if there is at
            least one hour, otherwise "M:Ss"); when falsy, return the
            ``(hours, minutes, seconds)`` tuple.
    """
    hours, within_hour = divmod(seconds, 3600)
    minutes = within_hour // 60
    secs = seconds % 60

    if not str:
        return hours, minutes, secs
    if hours > 0:
        return '{:.0f}:{:.0f}:{:.0f}'.format(hours, minutes, secs)
    return '{:.0f}:{:.0f}s'.format(minutes, secs)


def sizeConvert(size):
    """Format a byte count using the largest binary unit that fits.

    The value is floor-divided by the unit, so 1536 becomes "1KB". Counts
    below 1024 are reported as "<n>Bytes".
    """
    units = (('GB', 1024 ** 3), ('MB', 1024 ** 2), ('KB', 1024))
    for suffix, factor in units:
        if size >= factor:
            return str(size // factor) + suffix
    return str(size) + 'Bytes'


def imgResize(img, width=1980):
    """Scale an image to the given width, keeping its aspect ratio.

    Cubic interpolation is used when enlarging and area interpolation
    otherwise. Returns the resized image.
    """
    src_h, src_w = img.shape[:2]
    target_size = (width, int(src_h * width / src_w))
    method = cv2.INTER_CUBIC if width > src_w else cv2.INTER_AREA
    return cv2.resize(img, target_size, interpolation=method)


def imgAddImg(imgDst, imgSrc, xOffset=0, yOffset=0, copy=False):
    """Paste ``imgSrc`` into ``imgDst`` with its top-left corner at the offset.

    Args:
        imgDst: Destination image array; modified in place unless ``copy``.
        imgSrc: Image array to paste.
        xOffset: Column of the paste position in ``imgDst``.
        yOffset: Row of the paste position in ``imgDst``.
        copy: When truthy, paste into a copy and leave ``imgDst`` untouched.

    Returns:
        The array that received the paste.
    """
    canvas = imgDst.copy() if copy else imgDst
    rows = imgSrc.shape[0]
    cols = imgSrc.shape[1]
    canvas[yOffset:yOffset + rows, xOffset:xOffset + cols] = imgSrc
    return canvas


def imgAddText(img, text, xOffset=0, yOffset=0):
    """Draw ``text`` onto ``img`` using one of OpenCV's built-in Hershey fonts.

    Args:
        img: Image to draw on; passed straight to ``cv2.putText``.
        text: String to render. For text the built-in fonts cannot render,
            imgAddTextUTF8 draws with a TrueType font instead.
        xOffset: X coordinate of the text origin passed to ``cv2.putText``.
        yOffset: Y coordinate of the text origin passed to ``cv2.putText``.

    Returns:
        The value returned by ``cv2.putText``.
    """
    # cv2.putText(img, text, org, fontFace, fontScale, color, thickness, lineType)
    # cv2.LINE_AA is a line type, not a font: it used to sit in the fontFace
    # slot, where its value (16) was read as FONT_ITALIC. Pass a real font
    # and request anti-aliasing through lineType instead.
    return cv2.putText(
        img,
        text,
        (xOffset, yOffset),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.7,
        (249, 249, 249),
        2,
        cv2.LINE_AA,
    )


def imgAddTextUTF8(img, text, xOffset=0, yOffset=0):
    """Draw ``text`` in black on a BGR image using the SimHei TrueType font.

    The image is converted to a PIL image for drawing and converted back, so
    a new array is returned.
    """
    # OpenCV stores channels as BGR while PIL expects RGB.
    canvas = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

    pen = ImageDraw.Draw(canvas)
    # Font file path and a 20-point size.
    typeface = ImageFont.truetype("simhei.ttf", 20, encoding="utf-8")
    pen.text((xOffset, yOffset), text, (0, 0, 0), font=typeface)

    # Back to an OpenCV-style BGR array.
    return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGB2BGR)


def imgWhite(height, width):
    """Return a solid white 3-channel ``uint8`` image.

    Args:
        height: Number of rows; truncated to an int.
        width: Number of columns; truncated to an int.

    Returns:
        A ``(height, width, 3)`` array with every value set to 255.
    """
    # Build the array directly instead of creating a grayscale image,
    # converting it to BGR and then overwriting every pixel.
    return np.full((int(height), int(width), 3), 255, dtype=np.uint8)


def mp4toJpg(f_video, f_img):
    """Write a timestamped thumbnail of a video to an image file.

    One frame is requested at 1 s, or at 0 s when the reported duration is
    zero. Each grabbed frame is scaled to 480 px wide, stamped with its
    timestamp, pasted onto a white canvas laid out as a grid, and the canvas
    is written to ``f_img``.

    Args:
        f_video: Path of the source video.
        f_img: Path of the image file to write.
    """
    tick_begin = cv2.getTickCount()

    # Layout settings: thumbnail width, thumbnails per row, gaps and margins.
    tile_width = 480
    tiles_per_row = 1
    gap_x = gap_y = 0
    margin_top = margin_left = margin_right = 0

    # Choose the sampling time from the video metadata.
    meta = getInfo(f_video)
    if meta['duration'] == 0:  # guard against clips shorter than one second
        print('视频不足一秒 %s' % f_video)
        sample_times = [0]
    else:
        sample_times = [1]
    stamped_frames = getFrames(f_video, sample_times)
    tile_count = len(stamped_frames)

    # Scale every frame and draw its timestamp as a watermark.
    tiles = [
        imgAddText(imgResize(frame, tile_width), timeConvert(stamp), 10, 25)
        for stamp, frame in stamped_frames
    ]
    # The size of the last tile is used for the whole grid.
    tile_h, tile_w = tiles[-1].shape[:2] if tiles else (0, 0)

    # Create the blank canvas.
    canvas_h = (math.ceil(tile_count * 1.0 / tiles_per_row)) * tile_h
    canvas_w = margin_left + margin_right + tiles_per_row * (gap_x + tile_w) - gap_x
    print("h = %.0f, w = %.0f" % (tile_h, tile_w))
    print("height = %.0f, width = %.0f" % (canvas_h, canvas_w))
    canvas = imgWhite(canvas_h, canvas_w)

    folder, base_name = os.path.split(f_video)
    print('file name %s' % base_name)
    print('file path %s' % folder)

    # Paste each thumbnail into its grid cell.
    for index, tile in enumerate(tiles):
        row, col = divmod(index, tiles_per_row)
        print(r"正在将第(%d,%d)放入背景" % (col + 1, row + 1))
        left = int(margin_left + (gap_x + tile_w) * col)
        top = int(margin_top + (gap_y + tile_h) * row)
        canvas = imgAddImg(canvas, tile, left, top)

    elapsed_min = (cv2.getTickCount() - tick_begin) / cv2.getTickFrequency() / 60
    print("耗时%.1f min" % elapsed_min)
    cv2.imwrite(f_img, canvas)


def curlGet(url):
    """GET ``url`` and return the response body decoded from JSON."""
    body = requests.get(url).text
    return json.loads(body)


def curlPost(url, in_value):
    """POST ``in_value`` as the request data to ``url``; return the JSON-decoded body."""
    reply = requests.post(url, data=in_value)
    return json.loads(reply.text)


# Fetch the phone video that is waiting for review
def getphoneVideo():
    """Ask the local service for the next phone video to review.

    Returns the "data" field of the response when its status is 200;
    otherwise prints the status and message and returns an empty dict.
    """
    reply = curlGet('http://127.0.0.1:8080/Aider/phone/getphoneVideo?channel_id=%d' % CHANNEL_ID)

    if reply['status'] != 200:
        print('请求结果异常 %s : %s' % (reply['status'], reply['msg']))
        return {}

    # Example payload:
    # {"user_id":10000008058,"passport":"xkdkkd","full_name":"kdkdk","base64":""}
    return reply['data']


# Save the screenshot taken from a phone video
def savephoneVideoUrl(user_id, base64_str):
    """Upload a base64-encoded screenshot for ``user_id`` to the local service.

    Returns the "data" field of the response when its status is 200;
    otherwise prints the status and message and returns an empty dict.
    """
    payload = {'channel_id': CHANNEL_ID, 'user_id': user_id, 'base64': base64_str}
    reply = curlPost('http://127.0.0.1:8080/Aider/phone/savephoneVideoUrl', payload)

    if reply['status'] != 200:
        print('请求结果异常 %s : %s' % (reply['status'], reply['msg']))
        return {}

    # Example payload:
    # {"user_id":10000008058,"passport":"xkdkkd","full_name":"kdkdk","base64":""}
    return reply['data']


# Decode a base64 string into an mp4 file
def createMp4File(base64_str, user_id):
    """Decode ``base64_str`` and write it to ``/tmp/phone/<user_id>.mp4``.

    Args:
        base64_str: Base64-encoded file content.
        user_id: Value substituted into the output file name.

    Returns:
        The path of the written file.

    Raises:
        IOError/OSError: If the directory cannot be created or the file
            cannot be written.
    """
    video_bytes = base64.b64decode(base64_str)
    mp4_file = '/tmp/phone/%s.mp4' % user_id

    # The target directory is not guaranteed to exist; create it on demand
    # and tolerate it already being there.
    target_dir = os.path.dirname(mp4_file)
    try:
        os.makedirs(target_dir)
    except OSError:
        if not os.path.isdir(target_dir):
            raise

    # The context manager closes the handle even if the write fails.
    with open(mp4_file, 'wb') as out:
        out.write(video_bytes)

    return mp4_file


# Encode a file's content as a base64 string
def fileToBase64(file_path):
    """Read ``file_path`` in binary mode and return its content base64-encoded."""
    with open(file_path, "rb") as source:
        raw = source.read()

    # b64encode encodes; b64decode is the inverse.
    return base64.b64encode(raw)


# Check for a phone video to review and upload its screenshot
def checkphone():
    """Process one pending phone video, then schedule the next run.

    Steps: fetch the pending video from the service, decode it to an mp4
    file, render a screenshot with mp4toJpg, and upload the screenshot as
    base64. Any failure is printed rather than propagated, and the function
    always re-arms a 10-second Timer that calls it again.
    """
    print("------ checkphone 时间:%s ------" % time.ctime())

    print('------ 开始phone视频截取 ------')

    try:
        print('开始获取phone视频详情')
        # Fetch the video awaiting review
        phone_video_info = getphoneVideo()

        if phone_video_info == {}:
            raise Exception(" -- 暂无需要审核视频 -- ")

        user_id = phone_video_info['user_id']
        mp4_file = createMp4File(phone_video_info['base64'], user_id)
        print('mp4文件地址%s' % mp4_file)

        # Path of the screenshot file
        img_file = "/tmp/phone/%s.jpg" % user_id
        print('Mp4 %s 转 Jpg :%s' % (mp4_file, img_file))
        mp4toJpg(mp4_file, img_file)

        if not os.path.exists(img_file):
            raise Exception('-- 图片文件生成失败 --')

        img_base64 = fileToBase64(img_file)

        print('保存phone视频截图 %d' % user_id)

        if savephoneVideoUrl(user_id, img_base64) == {}:
            raise Exception('-- 保存phone 视频接口抛错 --')

    except IOError as err:
        # Must come before the generic handler: IOError is a subclass of
        # Exception, so listed after it this branch could never run.
        print("Error: 没有找到文件或读取文件失败")
        # Also show the underlying error so the cause is not hidden.
        print(err)
    except Exception as err:
        print(err)
    else:
        print("phone 视频截图上传成功")

    print('------ 结束phone视频截取 ------')

    # Schedule the next check in 10 seconds, regardless of the outcome.
    Timer(10, checkphone).start()


# Channel id sent with every service request; production uses a single channel only
CHANNEL_ID = 123

if __name__ == '__main__':
    # Run the first check right away; checkphone() re-schedules itself with a 10-second Timer
    checkphone()

python

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2024年10月21日 07:33
7

目录

来自 《Python mp4 转 jpg 脚本》