Karp 的技术博客
编程语言
Python mp4 转 jpg 脚本
#!/usr/bin/env python  
# _*_ coding:utf-8 _*_

import cv2
import os
import numpy as np
import math
import requests
import json
import base64
from threading import Timer
import time
from pprint import pprint
from PIL import Image, ImageDraw, ImageFont
import sys

# Python 2 only: switch the interpreter-wide default string encoding to UTF-8
# (presumably so that implicit str/unicode conversions of the non-ASCII log
# messages in this script do not fail). Python 3 has neither a reload()
# builtin nor sys.setdefaultencoding(), so the hack is skipped there instead
# of crashing at import time.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 - builtin on Python 2 only
    sys.setdefaultencoding('utf-8')


def curl(url, timeout=30):
    """Send an HTTP GET to ``url`` and return the JSON-decoded response body.

    :param url: address to request.
    :param timeout: seconds to wait for the server before ``requests`` gives
        up; without it a stalled server would block the caller forever.
    :return: whatever ``json.loads`` produces for the response text.
    :raises requests.exceptions.RequestException: on network failure/timeout.
    :raises ValueError: if the body is not valid JSON.
    """
    response = requests.get(url, headers={}, timeout=timeout)
    return json.loads(response.text)


def getInfo(file_path):
    """Collect basic metadata about a video file.

    :param file_path: path of the video file.
    :return: dict that always contains ``name``; ``size`` / ``sizeUnit`` are
        added when the file exists; ``rate``, ``duration`` (whole seconds),
        ``durationHMS``, ``width`` and ``height`` are added only when OpenCV
        manages to open the file.
    """
    info = {"name": os.path.basename(file_path)}

    if os.path.exists(file_path):
        info["size"] = os.path.getsize(file_path)
        info["sizeUnit"] = sizeConvert(info["size"])

    cap = cv2.VideoCapture(file_path)
    try:
        if cap.isOpened():
            rate = cap.get(cv2.CAP_PROP_FPS)  # frames per second
            frame_number = cap.get(cv2.CAP_PROP_FRAME_COUNT)  # total frames
            info["rate"] = rate
            # Some containers report 0 fps; treat that as "shorter than one
            # second" instead of raising ZeroDivisionError.
            info["duration"] = int(frame_number / rate) if rate > 0 else 0
            info["durationHMS"] = timeConvert(info["duration"])
            info["width"] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            info["height"] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        # Release the capture handle even if reading a property failed.
        cap.release()

    return info


def getFrames(file_path, cutTimes):
    """Grab one frame from the video for each requested timestamp.

    :param file_path: path of the video file.
    :param cutTimes: sequence of timestamps in seconds.
    :return: list of ``(timestamp, frame)`` tuples; extraction stops at the
        first timestamp whose frame cannot be read.
    """
    info = getInfo(file_path)
    # 1-based frame numbers corresponding to the requested timestamps.
    cutFrames = [int(info["rate"] * seconds) + 1 for seconds in cutTimes]
    print("cutFrames:", cutFrames)

    grabbed = []
    cap = cv2.VideoCapture(file_path)
    for idx, frame_no in enumerate(cutFrames):
        # The seek position is 0-based, hence the -1.
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no - 1)
        ok, image = cap.read()
        if not ok:
            break
        grabbed.append((cutTimes[idx], image))
        print("截取视频第:" + str(len(grabbed)) + " 帧")
    cap.release()
    return grabbed


def timeConvert(seconds, str=True):
    """Split a duration in seconds into hours, minutes and seconds.

    :param seconds: duration in seconds.
    :param str: when truthy return a formatted string (``H:M:S`` if there is
        at least one hour, otherwise ``M:Ss``); when falsy return the tuple
        ``(hours, minutes, seconds)``.
    """
    parts = (seconds // 3600, seconds % 3600 // 60, seconds % 60)
    if not str:
        return parts
    if parts[0] > 0:
        return '{:.0f}:{:.0f}:{:.0f}'.format(*parts)
    return '{:.0f}:{:.0f}s'.format(*parts[1:])


def sizeConvert(size):
    """Render a byte count with the largest fitting unit (GB, MB, KB, Bytes).

    The value is floor-divided by the unit, so fractions are discarded.
    """
    units = ((1024 ** 3, 'GB'), (1024 ** 2, 'MB'), (1024, 'KB'))
    for factor, suffix in units:
        if size >= factor:
            return str(size // factor) + suffix
    return str(size) + 'Bytes'


def imgResize(img, width=1980):
    """Scale ``img`` proportionally so that its width becomes ``width``.

    Cubic interpolation is used when enlarging, area interpolation otherwise.
    """
    src_h, src_w = img.shape[:2]
    target_size = (width, int(src_h * width / src_w))
    method = cv2.INTER_CUBIC if width > src_w else cv2.INTER_AREA
    return cv2.resize(img, target_size, interpolation=method)


def imgAddImg(imgDst, imgSrc, xOffset=0, yOffset=0, copy=False):
    """Paste ``imgSrc`` into ``imgDst`` with its top-left corner at the offset.

    :param copy: when true the paste happens on a copy of ``imgDst``;
        otherwise ``imgDst`` itself is modified.
    :return: the image that received the paste.
    """
    canvas = imgDst.copy() if copy else imgDst
    rows = slice(yOffset, yOffset + imgSrc.shape[0])
    cols = slice(xOffset, xOffset + imgSrc.shape[1])
    canvas[rows, cols] = imgSrc
    return canvas


def imgAddText(img, text, xOffset=0, yOffset=0):
    """Draw ``text`` on ``img`` in near-white with OpenCV's built-in font.

    :param xOffset: x coordinate of the text origin.
    :param yOffset: y coordinate of the text origin.
    :return: the value returned by ``cv2.putText``.
    """
    # cv2.putText(image, text, origin, fontFace, fontScale, color, thickness,
    # lineType). The previous code passed cv2.LINE_AA in the fontFace slot,
    # where its numeric value is read as a font flag; it is a line type and
    # belongs in the last argument.
    return cv2.putText(img, text, (xOffset, yOffset), cv2.FONT_HERSHEY_SIMPLEX,
                       0.7, (249, 249, 249), 2, cv2.LINE_AA)


def imgAddTextUTF8(img, text, xOffset=0, yOffset=0):
    """Draw ``text`` in black on a copy of ``img`` using the SimHei font.

    The image is round-tripped through PIL so a TrueType font can be used.

    :return: a new BGR image with the text rendered at the given offset.
    """
    # OpenCV stores channels as BGR while PIL expects RGB.
    pil_image = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

    # Font file path and point size.
    typeface = ImageFont.truetype("simhei.ttf", 20, encoding="utf-8")
    ImageDraw.Draw(pil_image).text((xOffset, yOffset), text, (0, 0, 0), font=typeface)

    # Back from PIL (RGB) to an OpenCV (BGR) array.
    return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)


def imgWhite(height, width):
    """Return an all-white 3-channel ``uint8`` image of the given size.

    :param height: number of rows; truncated to ``int``.
    :param width: number of columns; truncated to ``int``.
    :return: array of shape ``(height, width, 3)`` filled with 255.
    """
    # Allocate the white canvas directly instead of building a gray image,
    # converting it to BGR and overwriting every pixel; this also works for
    # zero-sized canvases, which the colour conversion rejects.
    return np.full((int(height), int(width), 3), 255, dtype=np.uint8)


def mp4toJpg(f_video, f_img):
    """Grab a frame from a video and save it as an image file.

    The frame at second 1 is used (second 0 for clips shorter than one
    second). It is resized to 480 px wide, stamped with its timestamp, pasted
    onto a white canvas and written to ``f_img``.

    :param f_video: path of the source video.
    :param f_img: path of the image to write.
    :raises RuntimeError: if the video cannot be opened or no frame could be
        read from it.
    """
    start = cv2.getTickCount()

    # Contact-sheet layout: thumbnail width, thumbnails per row, gaps between
    # thumbnails and outer margins (all in pixels).
    small_pic_width = 480
    small_pic_per_line = 1
    small_pic_gap_x = 0
    small_pic_gap_y = 0
    small_pic_top = 0
    small_pic_bottom = 0
    small_pic_left = 0
    small_pic_right = 0

    # Read the metadata; 'duration' is only present when the video opened.
    info = getInfo(f_video)
    if 'duration' not in info:
        raise RuntimeError('视频无法打开 %s' % f_video)

    if info['duration'] == 0:  # clip shorter than one second
        print('视频不足一秒 %s' % f_video)
        t_imgs = getFrames(f_video, [0])
    else:
        t_imgs = getFrames(f_video, [1])

    # Without this guard an empty result only surfaced later as an obscure
    # OpenCV error while building a 0x0 canvas.
    if not t_imgs:
        raise RuntimeError('未能从视频中截取到任何帧 %s' % f_video)

    # Resize every frame and stamp it with its timestamp.
    imgs_fixed = []
    for timestamp, img in t_imgs:
        img_resize = imgResize(img, small_pic_width)
        imgs_fixed.append(imgAddText(img_resize, timeConvert(timestamp), 10, 25))
    small_pic_cnt = len(imgs_fixed)
    h, w = imgs_fixed[-1].shape[:2]

    # Create the blank canvas; the height mirrors the width formula so that
    # non-zero margins/gaps would be accounted for in both directions.
    rows = int(math.ceil(small_pic_cnt * 1.0 / small_pic_per_line))
    height = small_pic_top + small_pic_bottom + rows * (small_pic_gap_y + h) - small_pic_gap_y
    width = small_pic_left + small_pic_right + small_pic_per_line * (small_pic_gap_x + w) - small_pic_gap_x
    print("h = %.0f, w = %.0f" % (h, w))
    print("height = %.0f, width = %.0f" % (height, width))
    img_backgroud = imgWhite(height, width)

    # The video-info header overlay (imgAddTextUTF8) is currently disabled;
    # only the file name and directory are logged.
    filepath, filename = os.path.split(f_video)
    print('file name %s' % filename)
    print('file path %s' % filepath)

    # Paste the thumbnails onto the canvas, row by row.
    for i, small_pic in enumerate(imgs_fixed):
        xx = i % small_pic_per_line
        yy = i // small_pic_per_line
        print(r"正在将第(%d,%d)放入背景" % (xx + 1, yy + 1))
        off_x = int(small_pic_left + (small_pic_gap_x + w) * xx)
        off_y = int(small_pic_top + (small_pic_gap_y + h) * yy)
        img_backgroud = imgAddImg(img_backgroud, small_pic, off_x, off_y)

    end = cv2.getTickCount()
    time_spent = (end - start) / cv2.getTickFrequency() / 60
    print("耗时%.1f min" % time_spent)
    cv2.imwrite(f_img, img_backgroud)


def curlGet(url, timeout=30):
    """Send an HTTP GET to ``url`` and return the JSON-decoded response body.

    :param url: address to request.
    :param timeout: seconds to wait for the server; without a timeout a
        stalled server would block the polling loop forever.
    :return: the decoded JSON document.
    :raises requests.exceptions.RequestException: on network failure/timeout.
    :raises ValueError: if the body is not valid JSON.
    """
    r = requests.get(url, timeout=timeout)
    # Decode the JSON text into Python objects.
    return json.loads(r.text)


def curlPost(url, in_value, timeout=30):
    """POST ``in_value`` to ``url`` and return the JSON-decoded response body.

    :param url: address to request.
    :param in_value: request body, handed to ``requests.post`` as ``data``.
    :param timeout: seconds to wait for the server; without a timeout a
        stalled server would block the polling loop forever.
    :return: the decoded JSON document.
    :raises requests.exceptions.RequestException: on network failure/timeout.
    :raises ValueError: if the body is not valid JSON.
    """
    r = requests.post(url, in_value, timeout=timeout)
    return json.loads(r.text)


def getphoneVideo():
    """Ask the local service for the next phone video awaiting review.

    :return: the ``data`` member of the reply when its ``status`` is 200,
        otherwise an empty dict (the status and message are printed). The
        original author's sample payload is
        ``{"user_id":10000008058,"passport":"xkdkkd","full_name":"kdkdk","base64":""}``.
    """
    endpoint = 'http://127.0.0.1:8080/Aider/phone/getphoneVideo?channel_id=%d' % CHANNEL_ID
    reply = curlGet(endpoint)
    if reply['status'] != 200:
        print('请求结果异常 %s : %s' % (reply['status'], reply['msg']))
        return {}

    return reply['data']


def savephoneVideoUrl(user_id, base64_str):
    """Upload the base64-encoded screenshot of a user's phone video.

    :param user_id: id of the user the screenshot belongs to.
    :param base64_str: screenshot content, base64 encoded.
    :return: the ``data`` member of the reply when its ``status`` is 200,
        otherwise an empty dict (the status and message are printed).
    """
    endpoint = 'http://127.0.0.1:8080/Aider/phone/savephoneVideoUrl'
    payload = {'channel_id': CHANNEL_ID, 'user_id': user_id, 'base64': base64_str}
    reply = curlPost(endpoint, payload)

    if reply['status'] != 200:
        print('请求结果异常 %s : %s' % (reply['status'], reply['msg']))
        return {}

    return reply['data']


def createMp4File(base64_str, user_id):
    """Decode a base64 string and store it as ``/tmp/phone/<user_id>.mp4``.

    :param base64_str: base64-encoded video content.
    :param user_id: used as the file name.
    :return: path of the written file.
    """
    video_bytes = base64.b64decode(base64_str)
    # NOTE(review): user_id is interpolated into the path unchecked - confirm
    # it can only ever be a numeric id.
    mp4_file = '/tmp/phone/%s.mp4' % user_id

    # The target directory is not guaranteed to exist on a fresh machine.
    target_dir = os.path.dirname(mp4_file)
    if not os.path.isdir(target_dir):
        os.makedirs(target_dir)

    # "with" closes the handle even when the write fails.
    with open(mp4_file, 'wb') as fh:
        fh.write(video_bytes)

    return mp4_file


def fileToBase64(file_path):
    """Read a file in binary mode and return its content base64 encoded."""
    with open(file_path, "rb") as handle:
        raw = handle.read()

    # b64encode encodes, b64decode decodes.
    return base64.b64encode(raw)


def checkphone():
    """Process one pending phone video, then schedule the next run.

    Fetches a video awaiting review, writes it to disk, renders a screenshot
    with ``mp4toJpg`` and uploads the screenshot. Any failure is printed and
    does not stop the polling; the function re-arms itself with a 10-second
    timer on every call.
    """
    print("------ checkphone 时间:%s ------" % time.ctime())

    print('------ 开始phone视频截取 ------')

    # "raise X(msg)", "except X as err" and print(err) are valid on both
    # Python 2.6+ and Python 3, unlike the Python-2-only forms used before.
    try:
        print('开始获取phone视频详情')
        # Fetch the next video to review.
        phone_video_info = getphoneVideo()

        if phone_video_info == {}:
            raise Exception(" -- 暂无需要审核视频 -- ")

        user_id = phone_video_info['user_id']
        mp4_file = createMp4File(phone_video_info['base64'], user_id)
        print('mp4文件地址%s' % mp4_file)

        # Path of the screenshot to generate.
        img_file = "/tmp/phone/%s.jpg" % user_id
        print('Mp4 %s 转 Jpg :%s' % (mp4_file, img_file))
        mp4toJpg(mp4_file, img_file)

        if not os.path.exists(img_file):
            raise Exception('-- 图片文件生成失败 --')

        img_base64 = fileToBase64(img_file)

        print('保存phone视频截图 %d' % user_id)

        if savephoneVideoUrl(user_id, img_base64) == {}:
            raise Exception('-- 保存phone 视频接口抛错 --')

    # A separate "except IOError" used to follow this handler; IOError is a
    # subclass of Exception, so that branch could never run and was removed.
    except Exception as err:
        print(err)
    else:
        print("phone 视频截图上传成功")

    print('------ 结束phone视频截取 ------')

    # Schedule the next check in 10 seconds, whatever the outcome was.
    Timer(10, checkphone).start()


# Channel id sent with every API request; production uses a single channel only.
CHANNEL_ID = 123

if __name__ == '__main__':
    # Run the first check immediately; checkphone() re-schedules itself with a
    # 10-second timer at the end of every run.
    checkphone()
踩坑教训
AWS 非公开桶资源 PHP SDK 解决方案

<?php


use Aws\S3\S3Client;
use Model\Main\Config\DbModel;

class AwsOss
{
    /** File-type flag: the object is an image. */
    const IMG = 1;
    /** File-type flag: the object is an MP4 video. */
    const MP4_VIDEO = 2;

    /**
     * Download the object a URL points to and return it as a base64 data URI.
     *
     * @cli php cli.php Test getObjectUrl
     * @param string $url      URL whose path is the object key in the bucket
     * @param int    $fileType self::IMG (1) for images, self::MP4_VIDEO (2) for MP4 video
     * @return string data URI ("data:<mime>;base64,<payload>")
     * @throws \Exception
     */
    public static function getUrlBase64(string $url, int $fileType)
    {
        // NOTE(review): AWS_OSS_PUBLIC_TAG is not declared in the visible part
        // of this class - confirm it is defined, otherwise this line is fatal.
        $config = DbModel::formatDbConfig(AwsOss::AWS_OSS_PUBLIC_TAG);

        return self::getKycBase64File($url, $config, $fileType);
    }

    /**
     * Fetch an object from S3 and encode it as a base64 data URI.
     *
     * @param string $url      URL whose path is the object key
     * @param array  $config   bucket configuration; reads the keys 'bucket' and 'sdk_params'
     * @param int    $fileType self::IMG (1) for images, anything else is treated as MP4 video
     * @return string data URI ("data:<mime>;base64,<payload>")
     * @throws \Exception when the URL has no path, the download yields no
     *                    content, or the image type cannot be detected
     */
    private static function getKycBase64File($url, array $config, $fileType = self::IMG)
    {
        $bucket = $config['bucket'];
        $sdKParams = $config['sdk_params'];

        // The object key must not carry a leading or trailing slash.
        $fileKey = trim((string) parse_url($url, PHP_URL_PATH), '/');
        if ('' === $fileKey) {
            throw new \Exception('URL 中缺少文件路径');
        }

        $s3Client = new S3Client($sdKParams);
        // Download the object.
        $result = $s3Client->getObject(['Bucket' => $bucket, 'Key' => $fileKey]);

        // 'Body' is a stream object, so it has to be cast to a string before
        // it can be compared with '' (the old object-vs-string check could
        // never be true) and before it is handed to the string functions below.
        $fileContent = isset($result['Body']) ? (string) $result['Body'] : '';
        if ('' === $fileContent) {
            throw new \Exception('文件资源拉取失败');
        }

        if (self::IMG == $fileType) {
            // Detect the image MIME type from the binary content.
            $imageInfo = getimagesizefromstring($fileContent);
            if (false === $imageInfo) {
                throw new \Exception('无法识别图片格式');
            }
            $type = $imageInfo['mime'];
        } else {
            // MP4 video is 'video/mp4'; 'audio/mp4' denotes audio-only MP4.
            $type = 'video/mp4';
        }

        return 'data:' . $type . ';base64,' . chunk_split(base64_encode($fileContent));
    }

}
编程语言
使用`JSON.stringify()`生成漂亮格式的JSON字符串

使用JSON.stringify()生成漂亮格式的JSON字符串

编程语言
Swoole 使用 Xhprof 调试 HTTP服务

昨天服务出了问题,生产环境代码需要优化

编程语言
PHP - 将GD图像转换回二进制数据

我有一个通过 imagecreatefromstring 创建的 GD 图像资源。经过一些图像处理后,我想将其转换回二进制数据。该怎么做?手册中找不到相关的函数...