Python的一些小知识点_14


=Start=

缘由:

我现在的工作内容一般不涉及到具体的代码实现,但工作中也少不了一些需要自动化提高工作效率的内容,所以多少还会写一些代码。Golang现在很火,而且之前也学过一段时间,但是对于字符串处理和网络请求的工作,还是更习惯用Python来实现(用Java也能完成,但是也不太熟,而且实现起这些功能来说略显笨重)。简单记录整理一下最近碰到的几个Python知识点,方便后面有需要的时候参考(才发现上一次记录这个主题还是在2019年5月14日)。

正文:

参考解答:
  1. 用Python往Kafka发消息

安装依赖:

pip install kafka-python

生产消息示例:

import json
from kafka import KafkaProducer

# bootstrap = '192.168.3.4:9092'
bootstrap = ['192.168.3.4:9092', '192.168.3.5:9092', '192.168.3.6:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap)

info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert", "ip":"1.2.3.4", "user_ip":"sec-test", "user_ip_org":"/a/b/c", "user_sso":"user1", "user_sso_org":"/a/b/d", "extra_info":""}

for idx in range(5):
    print(idx)
    producer.send('my_favorite_topic_name', json.dumps(info_dict).encode('utf-8'))
#for
producer.flush()

注意事项:

1. 发布消息的send方法的第2个参数,即要发送的消息内容value需要是bytes字节类型的
producer.send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

所以当你要发送的是json字符串时,要么就是在新建 KafkaProducer 时通过 value_serializer 参数来自动编码,要么就是手动在每一个 send 里面对要发送的 value 内容进行编码处理。

>>> import json
>>> info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert"}

# 方法一
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', info_dict)

# 方法二
>>> producer = KafkaProducer()
>>> producer.send('fizzbuzz', json.dumps(info_dict).encode('utf-8'))

2. 测试的时候如果 send 之后但是 topic 里没消息,可能需要手动 flush 才行
>>> producer.flush()
  1. Python中 dict 的处理
# 将 dict 按json字符串格式写文件
import json
exDict = {'exDict': 'test'}
with open('mydict_file1.txt', 'w') as fp:
     fp.write(json.dumps(exDict)) # use `json.loads` to do the reverse

# Python 3 测试可行,但是写入的内容不是标准的json格式,用的是单引号而非双引号
with open('mydict_file2.txt', 'w') as fp:
    print(exDict, file=fp)
  1. Python中使用 APScheduler 实现定时任务

安装依赖:

$ pip install apscheduler

简单示例:

# 当你的程序只是用来执行调度时可以使用 BlockingScheduler ,它会以阻塞的方式一直运行,以便定期执行任务调度
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

n_seconds = 3

def my_clock():
    print('time is {0}'.format(datetime.now()))

if __name__ == "__main__":
    scheduler = BlockingScheduler()
    scheduler.add_job(my_clock, trigger="interval", seconds=n_seconds, next_run_time=datetime.now())
    scheduler.start()
#if


# 当你不使用 Tornado/Twisted/asyncio/gevent 等框架或模块,又希望调度程序在应用程序的后台运行时使用 BackgroundScheduler ,但是你自己的程序需要一直在线,否则主程序退出了调度任务肯定也就无法正常执行了
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler.schedulers.background import BackgroundScheduler
def job1():
    print('job1: time is {0}'.format(datetime.now()))
    with open('test_BackgroundScheduler.txt', 'a') as fp:
        fp.write('job1: time is {0}\n'.format(datetime.now()))
    #with

if __name__ == "__main__":
    scheduler = BackgroundScheduler()
    scheduler.add_job(job1, trigger='cron', second='*/10')
    scheduler.start()
    while 1: #需要在代码中有一个类似while循环的功能保证主程序在线,否则任务无法按照预期进行调度
        time.sleep(5)
    #while
#if
  1. Python中使用 elasticsearch5 模块进行ES的查询
# 查询body可以通过在页面上用Kibana时,通过浏览器控制台抓包看看Request Payload来快速获取
from elasticsearch5 import Elasticsearch
es = Elasticsearch(hosts="10.20.30.40:9200", timeout=60)

body = {
    # "key": "value"
}

result = es.search(index="index_name-*", body=body) #search()返回的是 dict 类型
hits = result.get('hits')
shards = result.get('_shards')
print('{0}\n{1}'.format(hits, shards))

import jsonpath
data = jsonpath.jsonpath(result, "$.aggregations.aggr_name.buckets[*]")
参考链接:

kafka-python
https://kafka-python.readthedocs.io/en/master/index.html#kafkaproducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send

Unable to send messages to topic in Kafka Python
https://stackoverflow.com/questions/62405458/unable-to-send-messages-to-topic-in-kafka-python

Writing a dictionary to a text file?
https://stackoverflow.com/questions/36965507/writing-a-dictionary-to-a-text-file

APScheduler User guide
https://apscheduler.readthedocs.io/en/3.x/userguide.html

Python 定时任务框架 APScheduler 详解
https://www.cnblogs.com/leffss/p/11912364.html

python BlockingScheduler定时任务
https://blog.csdn.net/lipachong/article/details/99962134

elasticsearch5 5.5.6
https://pypi.org/project/elasticsearch5/
https://elasticsearch-py.readthedocs.io/en/v7.17.7/#example-usage

ElasticSearch的scroll接口搜索示例
https://ixyzero.com/blog/archives/4112.html

JSONPath – XPath for JSON
https://goessner.net/articles/JsonPath/

https://pypi.org/project/jsonpath/

=END=


《 “Python的一些小知识点_14” 》 有 4 条评论

  1. 数据安全加固:深入解析滴滴ES安全认证技术方案
    https://mp.weixin.qq.com/s/VA1LBau7Ed8FCmTLR6r_Eg
    `
    由于ES具有强大的搜索和分析功能,同时也因其开源和易于使用而成为黑客攻击的目标。近些年,业界ES数据泄露事件频发, 以下是一些比较严重的数据泄露案件:

    2021年12月,Socialarks泄露了400GB数据,由于ElasticSearch数据库设置错误,泄露了超过3.18亿条用户记录,涉及到Instagram、领英、Facebook等多个社交平台的用户信息。[1]
    2022年6月,美的(midea.com) 某分站存在Elasticsearch未授权访问漏洞。[2]
    2022年8月, 超过2.8亿条印度公民记录在网上泄露,包括用户账户信息、银行账户信息和个人身份信息。[3]

    滴滴引入 ES 时,也存在ES http 9200端口未授权访问和 Kibana http 5601端口未授权访问问题,为了保障数据安全,滴滴ES团队决定尽快修复这些问题。

    1-问题描述
    问题在于ES整个服务具备认证和鉴权能力,但ES集群单独对外时不具备安全认证能力。ES服务通过 Gateway 对外提供安全认证和鉴权服务,但 ES 集群本身并没有安全认证能力,任何人只要获取到 ES 集群 IP、端口就可以对ES集群进行任何操作。因此,我们需要对 ES 集群增加安全认证能力,并且需要对访问ES集群的admin、gateway、客户端进行安全适配工作。

    2-解决方案
    方案一:ES X-Pack 插件
    方案二:自研 ES 安全插件

    1、自研 ES 安全插件原理简介

    通过自研插件的方式实现 http 请求拦截器,该拦截器用于获取 http请求头携带的账号密码信息,根据本地配置文件中保存的账号密码信息进行匹配认证。如果认证成功可以继续执行后续逻辑,失败则返回认证失败异常。

    2、优势

    架构简单,逻辑清晰,只需在HTTP请求处理环节中进行简单的字符串校验,无需涉及节点内部TCP通信验证。

    支持ES集群滚动重启升级。通过增加动态集群配置能够很方便的开启关闭权限校验,对滚动升级友好。

    支持一键开关安全认证能力,可以快速止损。新增集群动态配置,一键开关安全认证,用户因为安全认证访问异常时可以快速止损。

    Kibana不需要进行代码改造
    1) 只需要在Kibana.yml配置正确账号密码,kibana请求就会自动携带该账号密码可访问ES集群
    2) 登录Kibana页面也需要输入正确的账号密码方可访问,不要额外的认证跳转页面

    避免误操作修改密码导致请求不可用。账号密码配置在elasticsearch.yml 并且强制不可修改

    3、缺点

    只有大账号认证功能,没有鉴权、审计等其他功能
    后期修改密码需要集群节点重启生效

    # 方案选择

    从开发量、易运维、稳定性、易用性等角度综合比较上述两个方案,我们最终决定采用了方案二。下面是采用方案二后ES生态的查询流程:

    1. ES客户端向Gateway发起查询请求。

    2. Gateway对该请求进行认证和鉴权,鉴权通过后会到Admin获取对应集群的访问地址以及访问ES集群的账号密码,并缓存到本地。

    3. Gateway通过步骤2获取到的ES集群账号密码,将查询请求转发到对应ES集群。

    4. ES执行查询逻辑将结果返回给Gateway,Gateway将该结果返回给客户端。至此查询流程结束。
    `

  2. Python中函数参数的默认值
    Default arguments in Python
    https://www.geeksforgeeks.org/default-arguments-in-python/
    `
    def student(firstname, lastname =’Mark’, standard =’Fifth’):
    print(firstname, lastname, ‘studies in’, standard, ‘Standard’)
    #def

    # 1 positional argument
    student(‘John’)

    # 3 positional arguments
    student(‘John’, ‘Gates’, ‘Seventh’)

    # 2 positional arguments
    student(‘John’, ‘Gates’)
    student(‘John’, ‘Seventh’)

    `
    SyntaxError: non-default argument follows default argument
    https://stackoverflow.com/questions/24719368/syntaxerror-non-default-argument-follows-default-argument
    `
    Python的函数中不允许把含有默认值的参数放在不含默认值的参数的前面,这样会导致报错;遇到报错时,只需要把参数的位置调整一下就好。
    `
    Why can’t non-default arguments follow default arguments?
    https://stackoverflow.com/questions/16932825/why-cant-non-default-arguments-follow-default-arguments

  3. Python 定时任务框架 APScheduler 详解
    https://www.cnblogs.com/leffss/p/11912364.html
    `
    # 一个需要传参数才能正常调用的函数
    def job2(x, y):
    print(‘job2′, x, y)

    # 每天 2 点 30 分 5 秒运行
    # 当定时调用需要传参数的函数时,传参数是通过 args 参数中的来传递的
    scheduler.add_job(job2, trigger=’cron’,hour=2,minute=30,second=5, args=[‘hello’, ‘world’])

    用 crontab 的语法来指定要运行的时间点。
    `
    apscheduler.triggers.cron
    https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html

  4. PytzUsageWarning: The zone attribute is specific to pytz’s interface; please migrate to a new time zone provider
    https://stackoverflow.com/questions/69776414/pytzusagewarning-the-zone-attribute-is-specific-to-pytzs-interface-please-mig
    `
    /py_venv/py3/lib/python3.6/site-packages/apscheduler/util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495).

    scheduler = BlockingScheduler(timezone=”Asia/Shanghai”)
    `

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注