-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathjd_gold_creator.py
executable file
·136 lines (120 loc) · 5.16 KB
/
jd_gold_creator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
# @Time : 2021/7/13 1:58 下午
# @File : jd_gold_creator.py
# @Project : jd_scripts
# @Cron : 30 7,19 * * *
# @Desc : 京东APP->排行榜->金榜创造营
import asyncio
import aiohttp
import json
import re
import random
from urllib.parse import unquote, quote
from config import USER_AGENT
from utils.console import println
from utils.jd_init import jd_init
@jd_init
class JdGoldCreator:
headers = {
'referer': 'https://h5.m.jd.com/babelDiy/Zeus/2H5Ng86mUJLXToEo57qWkJkjFPxw/index.html',
'user-agent': USER_AGENT,
}
async def request(self, session, function_id, body=None):
try:
if body is None:
body = {}
url = 'https://api.m.jd.com/client.action?functionId={}&body={}' \
'&appid=content_ecology&clientVersion=10.0.6&client=wh5' \
'&jsonp=jsonp_kr1mdm3p_12m_29&eufv=false'.format(function_id, quote(json.dumps(body)))
response = await session.post(url=url)
text = await response.text()
temp = re.search(r'\((.*)\);', text).group(1)
data = json.loads(temp)
await asyncio.sleep(3)
return data
except Exception as e:
println('{}, 获取数据失败:{}'.format(self.account, e.args))
return None
async def get_index_data(self, session):
"""
获取获得首页数据
:param session:
:return:
"""
return await self.request(session, 'goldCreatorTab', {"subTitleId": "", "isPrivateVote": "0"})
async def do_vote(self, session):
"""
进行投票
:param session:
:param index_data:
:return:
"""
println('{}, 正在获取投票主题...'.format(self.account))
data = await self.get_index_data(session)
if not data or data['code'] != '0':
println('{}, 获取数据失败!'.format(self.account))
return
subject_list = data['result']['subTitleInfos']
stage_id = data['result']['mainTitleHeadInfo']['stageId']
for subject in subject_list:
if 'taskId' not in subject:
continue
body = {
"groupId": subject['matGrpId'],
"stageId": stage_id,
"subTitleId": subject['subTitleId'],
"batchId": subject['batchId'],
"skuId": "",
"taskId": int(subject['taskId']),
}
res = await self.request(session, 'goldCreatorDetail', body)
if res['code'] != '0':
println('{}, 获取主题:《{}》商品列表失败!'.format(self.account, subject['shortTitle']))
else:
println('{}, 获取主题:《{}》商品列表成功, 开始投票!'.format(self.account, subject['shortTitle']))
await asyncio.sleep(2)
task_list = res['result']['taskList']
sku_list = res['result']['skuList']
item_id = res['result']['signTask']['taskItemInfo']['itemId']
sku = random.choice(sku_list)
body = {
"stageId": stage_id,
"subTitleId": subject['subTitleId'],
"skuId": sku['skuId'],
"taskId": int(subject['taskId']),
"itemId": item_id,
"rankId": sku["rankId"],
"type": 1,
"batchId": subject['batchId'],
}
res = await self.request(session, 'goldCreatorDoTask', body)
if res['code'] != '0':
println('{}, 为商品:《{}》投票失败!'.format(self.account, sku['name']))
else:
if 'lotteryCode' in res['result'] and res['result']['lotteryCode'] != '0':
println('{}, 为商品:《{}》投票失败, {}'.format(self.account, sku['name'], res['result']['lotteryMsg']))
elif 'taskCode' in res['result'] and res['result']['taskCode'] == '103':
println('{}, 为商品: 《{}》投票失败, {}!'.format(self.account, sku['name'], res['result']['taskMsg']))
else:
println('{}, 为商品:《{}》投票成功, 获得京豆:{}'.format(self.account, sku['name'], res['result']['lotteryScore']))
for task in task_list:
if task[0]['taskStatus'] == 2:
continue
body = {
"taskId": int(task[0]['taskId']),
"itemId": task[0]['taskItemInfo']['itemId'],
"type": 2,
"batchId": subject['batchId']
}
res = await self.request(session, 'goldCreatorDoTask', body)
println('{}, 做额外任务: 《{}》, 结果:{}!'.format(self.account, task[0]['taskItemInfo']['title'], res))
async def run(self):
"""
:return:
"""
async with aiohttp.ClientSession(headers=self.headers, cookies=self.cookies) as session:
await self.do_vote(session) # 投票
if __name__ == '__main__':
from utils.process import process_start
process_start(JdGoldCreator, '金榜创造营')