=Start=
缘由:
我现在的工作内容一般不涉及到具体的代码实现,但工作中也少不了一些需要自动化提高工作效率的内容,所以多少还会写一些代码。Golang现在很火,而且之前也学过一段时间,但是对于字符串处理和网络请求的工作,还是更习惯用Python来实现(用Java也能完成,但是也不太熟,而且实现起这些功能来说略显笨重)。简单记录整理一下最近碰到的几个Python知识点,方便后面有需要的时候参考(才发现上一次记录这个主题还是在2019年5月14日)。
正文:
参考解答:
- 用Python往Kafka发消息
安装依赖:
pip install kafka-python
生产消息示例:
import json
from kafka import KafkaProducer
# bootstrap = '192.168.3.4:9092'
bootstrap = ['192.168.3.4:9092', '192.168.3.5:9092', '192.168.3.6:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap)
info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert", "ip":"1.2.3.4", "user_ip":"sec-test", "user_ip_org":"/a/b/c", "user_sso":"user1", "user_sso_org":"/a/b/d", "extra_info":""}
for idx in range(5):
print(idx)
producer.send('my_favorite_topic_name', json.dumps(info_dict).encode('utf-8'))
#for
producer.flush()
注意事项:
1. 发布消息的send方法的第2个参数,即要发送的消息内容value需要是bytes字节类型的
producer.send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
所以当你要发送的是json字符串时,要么就是在新建 KafkaProducer 时通过 value_serializer 参数来自动编码,要么就是手动在每一个 send 里面对要发送的 value 内容进行编码处理。
>>> import json
>>> info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert"}
# 方法一
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', info_dict)
# 方法二
>>> producer = KafkaProducer()
>>> producer.send('fizzbuzz', json.dumps(info_dict).encode('utf-8'))
2. 测试的时候如果 send 之后但是 topic 里没消息,可能需要手动 flush 才行
>>> producer.flush()
- Python中 dict 的处理
# 将 dict 按json字符串格式写文件
import json
exDict = {'exDict': 'test'}
with open('mydict_file1.txt', 'w') as fp:
fp.write(json.dumps(exDict)) # use `json.loads` to do the reverse
# Python 3 测试可行,但是写入的内容不是标准的json格式,用的是单引号而非双引号
with open('mydict_file2.txt', 'w') as fp:
print(exDict, file=fp)
- Python中使用 APScheduler 实现定时任务
安装依赖:
$ pip install apscheduler
简单示例:
# 当你的程序只是用来执行调度时可以使用 BlockingScheduler ,它会以阻塞的方式一直运行,以便定期执行任务调度
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
n_seconds = 3
def my_clock():
print('time is {0}'.format(datetime.now()))
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(my_clock, trigger="interval", seconds=n_seconds, next_run_time=datetime.now())
scheduler.start()
#if
# 当你不使用 Tornado/Twisted/asyncio/gevent 等框架或模块,又希望调度程序在应用程序的后台运行时使用 BackgroundScheduler ,但是你自己的程序需要一直在线,否则主程序退出了调度任务肯定也就无法正常执行了
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
def job1():
print('job1: time is {0}'.format(datetime.now()))
with open('test_BackgroundScheduler.txt', 'a') as fp:
fp.write('job1: time is {0}\n'.format(datetime.now()))
#with
if __name__ == "__main__":
scheduler = BackgroundScheduler()
scheduler.add_job(job1, trigger='cron', second='*/10')
scheduler.start()
while 1: #需要在代码中有一个类似while循环的功能保证主程序在线,否则任务无法按照预期进行调度
time.sleep(5)
#while
#if
- Python中使用 elasticsearch5 模块进行ES的查询
# 查询body可以通过在页面上用Kibana时,通过浏览器控制台抓包看看Request Payload来快速获取
from elasticsearch5 import Elasticsearch
es = Elasticsearch(hosts="10.20.30.40:9200", timeout=60)
body = {
# "key": "value"
}
result = es.search(index="index_name-*", body=body) #search()返回的是 dict 类型
hits = result.get('hits')
shards = result.get('_shards')
print('{0}\n{1}'.format(hits, shards))
import jsonpath
data = jsonpath.jsonpath(result, "$.aggregations.aggr_name.buckets[*]")
参考链接:
kafka-python
https://kafka-python.readthedocs.io/en/master/index.html#kafkaproducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send
Unable to send messages to topic in Kafka Python
https://stackoverflow.com/questions/62405458/unable-to-send-messages-to-topic-in-kafka-python
Writing a dictionary to a text file?
https://stackoverflow.com/questions/36965507/writing-a-dictionary-to-a-text-file
APScheduler User guide
https://apscheduler.readthedocs.io/en/3.x/userguide.html
Python 定时任务框架 APScheduler 详解
https://www.cnblogs.com/leffss/p/11912364.html
python BlockingScheduler定时任务
https://blog.csdn.net/lipachong/article/details/99962134
elasticsearch5 5.5.6
https://pypi.org/project/elasticsearch5/
https://elasticsearch-py.readthedocs.io/en/v7.17.7/#example-usage
ElasticSearch的scroll接口搜索示例
https://ixyzero.com/blog/archives/4112.html
JSONPath – XPath for JSON
https://goessner.net/articles/JsonPath/
https://pypi.org/project/jsonpath/
=END=
《 “Python的一些小知识点_14” 》 有 4 条评论
数据安全加固:深入解析滴滴ES安全认证技术方案
https://mp.weixin.qq.com/s/VA1LBau7Ed8FCmTLR6r_Eg
`
由于ES具有强大的搜索和分析功能,同时也因其开源和易于使用而成为黑客攻击的目标。近些年,业界ES数据泄露事件频发, 以下是一些比较严重的数据泄露案件:
2021年12月,Socialarks泄露了400GB数据,由于ElasticSearch数据库设置错误,泄露了超过3.18亿条用户记录,涉及到Instagram、领英、Facebook等多个社交平台的用户信息。[1]
2022年6月,美的(midea.com) 某分站存在Elasticsearch未授权访问漏洞。[2]
2022年8月, 超过2.8亿条印度公民记录在网上泄露,包括用户账户信息、银行账户信息和个人身份信息。[3]
滴滴引入 ES 时,也存在ES http 9200端口未授权访问和 Kibana http 5601端口未授权访问问题,为了保障数据安全,滴滴ES团队决定尽快修复这些问题。
1-问题描述
问题在于ES整个服务具备认证和鉴权能力,但ES集群单独对外时不具备安全认证能力。ES服务通过 Gateway 对外提供安全认证和鉴权服务,但 ES 集群本身并没有安全认证能力,任何人只要获取到 ES 集群 IP、端口就可以对ES集群进行任何操作。因此,我们需要对 ES 集群增加安全认证能力,并且需要对访问ES集群的admin、gateway、客户端进行安全适配工作。
2-解决方案
方案一:ES X-Pack 插件
方案二:自研 ES 安全插件
1、自研 ES 安全插件原理简介
通过自研插件的方式实现 http 请求拦截器,该拦截器用于获取 http请求头携带的账号密码信息,根据本地配置文件中保存的账号密码信息进行匹配认证。如果认证成功可以继续执行后续逻辑,失败则返回认证失败异常。
2、优势
架构简单,逻辑清晰,只需在HTTP请求处理环节中进行简单的字符串校验,无需涉及节点内部TCP通信验证。
支持ES集群滚动重启升级。通过增加动态集群配置能够很方便的开启关闭权限校验,对滚动升级友好。
支持一键开关安全认证能力,可以快速止损。新增集群动态配置,一键开关安全认证,用户因为安全认证访问异常时可以快速止损。
Kibana不需要进行代码改造
1) 只需要在Kibana.yml配置正确账号密码,kibana请求就会自动携带该账号密码可访问ES集群
2) 登录Kibana页面也需要输入正确的账号密码方可访问,不要额外的认证跳转页面
避免误操作修改密码导致请求不可用。账号密码配置在elasticsearch.yml 并且强制不可修改
3、缺点
只有大账号认证功能,没有鉴权、审计等其他功能
后期修改密码需要集群节点重启生效
# 方案选择
从开发量、易运维、稳定性、易用性等角度综合比较上述两个方案,我们最终决定采用了方案二。下面是采用方案二后ES生态的查询流程:
1. ES客户端向Gateway发起查询请求。
2. Gateway对该请求进行认证和鉴权,鉴权通过后会到Admin获取对应集群的访问地址以及访问ES集群的账号密码,并缓存到本地。
3. Gateway通过步骤2获取到的ES集群账号密码,将查询请求转发到对应ES集群。
4. ES执行查询逻辑将结果返回给Gateway,Gateway将该结果返回给客户端。至此查询流程结束。
`
Python中函数参数的默认值
Default arguments in Python
https://www.geeksforgeeks.org/default-arguments-in-python/
`
def student(firstname, lastname =’Mark’, standard =’Fifth’):
print(firstname, lastname, ‘studies in’, standard, ‘Standard’)
#def
# 1 positional argument
student(‘John’)
# 3 positional arguments
student(‘John’, ‘Gates’, ‘Seventh’)
# 2 positional arguments
student(‘John’, ‘Gates’)
student(‘John’, ‘Seventh’)
`
SyntaxError: non-default argument follows default argument
https://stackoverflow.com/questions/24719368/syntaxerror-non-default-argument-follows-default-argument
`
Python的函数中不允许把含有默认值的参数放在不含默认值的参数的前面,这样会导致报错;遇到报错时,只需要把参数的位置调整一下就好。
`
Why can’t non-default arguments follow default arguments?
https://stackoverflow.com/questions/16932825/why-cant-non-default-arguments-follow-default-arguments
Python 定时任务框架 APScheduler 详解
https://www.cnblogs.com/leffss/p/11912364.html
`
# 一个需要传参数才能正常调用的函数
def job2(x, y):
print(‘job2′, x, y)
# 每天 2 点 30 分 5 秒运行
# 当定时调用需要传参数的函数时,传参数是通过 args 参数中的来传递的
scheduler.add_job(job2, trigger=’cron’,hour=2,minute=30,second=5, args=[‘hello’, ‘world’])
用 crontab 的语法来指定要运行的时间点。
`
apscheduler.triggers.cron
https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html
PytzUsageWarning: The zone attribute is specific to pytz’s interface; please migrate to a new time zone provider
https://stackoverflow.com/questions/69776414/pytzusagewarning-the-zone-attribute-is-specific-to-pytzs-interface-please-mig
`
/py_venv/py3/lib/python3.6/site-packages/apscheduler/util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495).
scheduler = BlockingScheduler(timezone=”Asia/Shanghai”)
`