添加UDP工具类

This commit is contained in:
qdxkrs 2022-02-15 15:41:45 +08:00
parent c27ff27f9e
commit 2920bdb516
7 changed files with 641 additions and 0 deletions

296
.gitignore vendored Normal file
View File

@ -0,0 +1,296 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# spec
manage.spec
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
staticfiles/
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# pyenv
.python-version
# Environments
.venv
venv/
ENV/
.vscode
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
### Node template
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# nyc test coverage
.nyc_output
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Typescript v1 declaration files
typings/
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
### Linux template
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs*
### VisualStudioCode template
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
### Windows template
# Windows thumbnail cache files
Thumbs.db
ehthumbs.db
ehthumbs_vista.db
# Dump file
*.stackdump
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### macOS template
# General
*.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### SublimeText template
# Cache files for Sublime Text
*.tmlanguage.cache
*.tmPreferences.cache
*.stTheme.cache
# Workspace files are user-specific
*.sublime-workspace
# Project files should be checked into the repository, unless a significant
# proportion of contributors will probably not be using Sublime Text
# *.sublime-project
# SFTP configuration file
sftp-config.json
# Package control specific files
Package Control.last-run
Package Control.ca-list
Package Control.ca-bundle
Package Control.system-ca-bundle
Package Control.cache/
Package Control.ca-certs/
Package Control.merged-ca-bundle
Package Control.user-ca-bundle
oscrypto-ca-bundle.crt
bh_unicode_properties.cache
# Sublime-github package stores a github token in this file
# https://packagecontrol.io/packages/sublime-github
GitHub.sublime-settings
### Vim template
# Swap
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-v][a-z]
[._]sw[a-p]
# Session
Session.vim
# Temporary
.netrwhist
# Auto-generated tag files
tags
### VirtualEnv template
# Virtualenv
[Bb]in
[Ii]nclude
[Ll]ib
[Ll]ib64
[Ss]cripts
pyvenv.cfg
pip-selfcheck.json
.env
### Project template
izan/media/
.pytest_cache/

BIN
dms_client.db Normal file

Binary file not shown.

View File

@ -0,0 +1,95 @@
#!/usr/bin/python3
# coding= utf-8
import sqlite3
# @param text 文本
# @return TrueFalse不是
def __string(text):
return True if isinstance(text, str) else False
# 检查文本类型是否为浮点型
# @param text 文本
# @return TrueFalse不是
def __float(text):
if __string(text):
try:
return True if float("{0}".format(text)) else False
except Exception:
return False
else:
return True if isinstance(text, float) else False
# 检查文本类型是否为浮点型
# @param text 文本
# @return TrueFalse不是
def __int(text):
if __string(text):
try:
return True if float("{0}".format(text)) else False
except Exception:
return False
else:
return True if isinstance(text, float) else False
# 检查文本类型是否为数字型
# @param text 文本
# @return TrueFalse不是
def __number(text):
return True if re.search("[^0-9]", text) == None else False
def func_year(s):
print('func_year:', s)
def func_month(s):
print('func_month:', s)
def create_data_collection_info_table():
con = sqlite3.connect("../dms_client.db")
cur = con.cursor()
sql = "CREATE TABLE IF NOT EXISTS data_collection_info(id INTEGER PRIMARY KEY,collection_code TEXT,function_name TEXT,describe TEXT)"
cur.execute(sql)
# ①:添加单条数据
data = "1,'Desire',5,'test'"
cur.execute('INSERT INTO data_collection_info VALUES (%s)' % data)
# ②:添加单条数据
cur.execute("INSERT INTO data_collection_info values(?,?,?,?)", (6, "zgq", 20, 'test'))
# ③:添加多条数据
cur.executemany('INSERT INTO data_collection_info VALUES (?,?,?,?)',
[(3, 'name3', 19, 'test'), (4, 'name4', 26, 'test')])
cur.execute("UPDATE data_collection_info SET collection_code=? WHERE id=?", ('test1', 19))
con.commit()
# 关闭游标
cur.close()
# 断开数据库连接
con.close()
# 动态调用函数
# @param tag 标签名
# @param text 文本
# @return True:OK, False:NG
def item_check(func_name, text):
if type and text:
try:
# 调用导入模块中的函数,并传参
return eval("__{0}".format(func_name))(text)
except Exception:
return False
else:
return False
if __name__ == '__main__':
result = item_check("num", 123.23)
print(result)
strs = ['year', 'month']
for s in strs:
globals().get('func_%s' % s)(s)

46
util/UDP_Receive.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/python3
# coding= utf-8
import socket
import sys
import struct
# 本机信息
host_ip = socket.gethostbyname(socket.gethostname())
# 组播组IP和端口
mcast_group_ip = '239.255.255.252'
mcast_group_port = 5678
def receiver():
# 建立接收 udp socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# linux能绑定网卡这里绑定组播IP地址不会服错windows没法绑定网卡只能绑定本网卡IP地址
if "linux" in sys.platform:
# 绑定到的网卡名如果自己的不是eth0则修改
nic_name = 0
# 监听的组播地址
sock.setsockopt(socket.SOL_SOCKET, 25, nic_name)
sock.bind((mcast_group_ip, mcast_group_port))
else:
sock.bind((host_ip, mcast_group_port))
# 加入组播组
mq_request = struct.pack("=4sl", socket.inet_aton(mcast_group_ip), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mq_request)
# 设置非阻塞
sock.setblocking(True)
while True:
try:
data, address = sock.recvfrom(4096)
except socket.error as e:
print(f"while receive message error occur:{e}")
else:
print("Receive Data!")
print("FROM: ", address)
print("DATA: ", data.decode('utf-8'))
if __name__ == "__main__":
receiver()

32
util/UDP_Sender.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/python3
# coding= utf-8
import time
import struct
import socket
# 本机信息
host_ip = socket.gethostname()
host_port = 6501
# 组播组IP和端口
mcast_group_ip = '239.255.255.252'
mcast_group_port = 5678
def sender():
send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
send_sock.bind((host_ip, host_port))
# 设置存活时长
ttl_bin = struct.pack('@i', 255)
send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
while True:
data = '12345 english 汉字#测试'
send_sock.sendto(str(data).encode('utf-8'), (mcast_group_ip, mcast_group_port))
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}: send finish.')
time.sleep(10)
if __name__ == "__main__":
sender()

81
util/find_GPS_image.py Normal file
View File

@ -0,0 +1,81 @@
import exifread
import re
import json
import requests
def latitude_and_longitude_convert_to_decimal_system(*arg):
"""
经纬度转为小数, param arg:
:return: 十进制小数
"""
return float(arg[0]) + ((float(arg[1]) + (float(arg[2].split('/')[0]) / float(arg[2].split('/')[-1]) / 60)) / 60)
def find_GPS_image(pic_path):
GPS = {}
date = ''
with open(pic_path, 'rb') as f:
tags = exifread.process_file(f)
for tag, value in tags.items():
if re.match('GPS GPSLatitudeRef', tag):
GPS['GPSLatitudeRef'] = str(value)
elif re.match('GPS GPSLongitudeRef', tag):
GPS['GPSLongitudeRef'] = str(value)
elif re.match('GPS GPSAltitudeRef', tag):
GPS['GPSAltitudeRef'] = str(value)
elif re.match('GPS GPSLatitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLatitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLatitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSLongitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLongitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLongitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSAltitude', tag):
GPS['GPSAltitude'] = str(value)
elif re.match('.*Date.*', tag):
date = str(value)
return {'GPS_information': GPS, 'date_information': date}
def find_address_from_GPS(GPS):
"""
使用Geocoding API把经纬度坐标转换为结构化地址
:param GPS:
:return:
"""
secret_key = 'zbLsuDDL4CS2U0M4KezOZZbGUY9iWtVf'
if not GPS['GPS_information']:
return '该照片无GPS信息'
lat, lng = GPS['GPS_information']['GPSLatitude'], GPS['GPS_information']['GPSLongitude']
baidu_map_api = "http://api.map.baidu.com/geocoder/v2/?ak={0}&callback=renderReverse&location={1},{2}s&output=json&pois=0".format(
secret_key, lat, lng)
response = requests.get(baidu_map_api)
content = response.text.replace("renderReverse&&renderReverse(", "")[:-1]
baidu_map_address = json.loads(content)
formatted_address = baidu_map_address["result"]["formatted_address"]
province = baidu_map_address["result"]["addressComponent"]["province"]
city = baidu_map_address["result"]["addressComponent"]["city"]
district = baidu_map_address["result"]["addressComponent"]["district"]
return formatted_address,province,city,district
pic_path = 'D:\pythonjob\pic-time&location\DJI_0001.jpg'
GPS_info = find_GPS_image(pic_path)
address = find_address_from_GPS(GPS=GPS_info)
#print(GPS_info)
#print(address)
x = list(GPS_info.values())
#print(x)
time = x[1]
gps_dict_formate = x[0]
y = list(gps_dict_formate.values())
information = '拍照时间:'+time+',拍照地点:'+str(address[0])+'(经度:'+str(y[2])+' '+str(y[3])+',纬度:'+str(y[0])+' '+str(y[1])+',高度:'+str(y[5])+'米)'
print(pic_path)
print(information)

View File

@ -0,0 +1,91 @@
# coding: utf-8
# Authorboxker
# Mailicjb@foxmail.com
import sqlite3
import os
class SimpleSQLite3Tool:
"""
simpleToolSql for sqlite3
简单数据库工具类
编写这个类主要是为了封装sqlite继承此类复用方法
"""
def __init__(self, filename="stsql"):
"""
初始化数据库默认文件名 stsql.db
filename文件名
"""
self.filename = filename
self.db = sqlite3.connect(self.filename)
self.c = self.db.cursor()
def close(self):
"""
关闭数据库
"""
self.c.close()
self.db.close()
def execute(self, sql, param=None):
"""
执行数据库的增
sqlsql语句
param数据可以是list或tuple亦可是None
retutn成功返回True
"""
try:
if param is None:
self.c.execute(sql)
else:
if type(param) is list:
self.c.executemany(sql, param)
else:
self.c.execute(sql, param)
count = self.db.total_changes
self.db.commit()
except Exception as e:
print(e)
return False, e
if count > 0:
return True
else:
return False
def query(self, sql, param=None):
"""
查询语句
sqlsql语句
param参数,可为None
retutn成功返回True
"""
if param is None:
self.c.execute(sql)
else:
self.c.execute(sql, param)
return self.c.fetchall()
# def set(self,table,field=" * ",where="",isWhere=False):
# self.table = table
# self.filed = field
# if where != "" :
# self.where = where
# self.isWhere = True
# return True
if __name__ == "__main__":
# 数据库文件位置
sql = SimpleSQLite3Tool("../dms_client.db")
# f = sql.execute("create table test (id int not null,name text not null,age int);")
# print("ok")
# sql.execute("insert into test (id,name,age) values (?,?,?);", [(1, 'abc', 15), (2, 'bca', 16)])
# res = sql.query("select * from test;")
# print(res)
# sql.execute("insert into test (id,name) values (?,?);", (3, 'bac'))
# res = sql.query("select * from test where id=?;", (3,))
res = sql.query("select * from data_collection_info;")
print(res)
sql.close()