Python批量推送模板消息
⽬录
1. 依赖包
Django==3.1.3
django-redis==4.12.1
redis==3.5.3
requests==2.24.0
2. 消息推送的步骤
1. 获取 access_token;
2. 构建数据;
3. 发送推送请求。
3. 功能描述
3.1. 注意事项
1. 要注意模板消息的类型,⼀次性订阅消息 和 长期订阅消息。
1. ⼀次性订阅消息
⼀次性订阅消息⽤于解决⽤户使⽤⼩程序后,后续服务环节的通知问题。⽤户⾃主订阅后,开发者可不限时间地下发⼀条对应的服务消息;每条消息可单独订阅或退订。
2. 长期订阅消息
⼀次性订阅消息可满⾜⼩程序的⼤部分服务场景需求,但线下公共服务领域存在⼀次性订阅⽆法满⾜的场景,如航班延误,需根据航班实时动态来多次发送消息提醒。为便于服务,我们提供了长期性订阅消息,⽤户订阅⼀次后,开发者可长期下发多条消息。
⽬前长期性订阅消息仅向政务民⽣、医疗、交通、⾦融、教育等线下公共服务开放,后期将逐步⽀持到其他线下公共服务业务。
2. access_token的有效期⽬前为2个⼩时,需定时刷新,重复获取将导致上次获取的access_token失效。
3.2. 消息推送模板的参数说明
参数是否必填说明
touser是接收者openid
开发程序template_id是模板ID
url否模板跳转链接(海外帐号没有跳转能⼒)
miniprogram否跳⼩程序所需数据,不需跳⼩程序可不⽤传该数据
appid是所需跳转到的⼩程序appid(该⼩程序appid必须与发模板消息的是绑定关联关系,暂不⽀持⼩游戏)pagepath否所需跳转到⼩程序的具体页⾯路径,⽀持带参数,(⽰例index?foo=bar),暂不⽀持⼩游戏data是模板数据
color否模板内容字体颜⾊,不填默认为⿊⾊
3.3. 消息推送模块代码
import copy
import redis
import requests
f import settings
from django_redis import get_redis_connection
class WeChatMsg:
def__init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None):        self.app_id = app_id
self.secret = secret
"touser":"",
"template_id":"",
"url":"",
"data":{
"first":{
"value":"",
},
"keyword1":{
"value":"",
},
"keyword2":{
"value":"",
},
"remark":{
"value":"",
},
}
}
if template:
self.access_token =None
self.user_list =list()
self.data_list =list()
def_get_token(self, force_update=False):
key_name ='wechat_token_{}'.format(self.app_id)
if force_update:
self.access_token =None
else:
self.access_token = get_data(key_name)
if not self.access_token:
url ="api.weixin.qq/cgi-bin/token?"
payload ={
'grant_type':'client_credential',
'appid': self.app_id,
'secret': self.secret,
}
response = (url, params=payload, timeout=50)
access_token = response.json().get("access_token")
if access_token:
set_data(key_name, access_token, ex=7100)
self.access_token = get_data(key_name)
print('access_token', self.access_token)
def make_data_list(self):
dels import User
user_openid_list = User.objects.filter(
pk__in=self.user_list
).exclude(
openid=''
openid__isnull=True
).values_list('openid', flat=True)
for openid in user_openid_list:
user_template = copy.plate)
user_template['touser']= openid
self.data_list.append(user_template)
def post_data(self):
# 获取 token
self._get_token()
url ="api.weixin.qq/cgi-bin/message/template/send?access_token={}".format(self.access_token) # 准备数据
if self.user_list:
self.make_data_list()
# 发送请求
for data in self.data_list:
json_template = json.dumps(data)
res = requests.post(url, data=json_template)
print('post_data', )
def get_redis():
"""
获取 Redis 的连接
"""
pool = get_redis_connection('default').connection_pool
r = redis.Redis(connection_pool=pool)
return r
def set_data(name, value,**kwargs):
# 将单条数据存⼊redis缓存
r = get_redis()
value = json.dumps(value)
r.set(name, value,**kwargs)
def get_data(name):
# 取出单条数据
r = get_redis()
value = r.get(name)
if value:
value = json.loads(value)
return value
3.4. 消息推送模块的调⽤
def send_wechat_msg():    w = WeChatMsg()
"touser":"",
"template_id":"",
"url":"",
"data":{
"first":{
"value":"",
},
"keyword1":{
"value":"",
},
"keyword2":{
"value":"",
},
"remark":{
"value":"",
},
}
}
w.user_list =[1,2]
w.post_data()