Python的一些小知识点_14


=Start=

缘由:

我现在的工作内容一般不涉及到具体的代码实现,但工作中也少不了一些需要自动化提高工作效率的内容,所以多少还会写一些代码。Golang现在很火,而且之前也学过一段时间,但是对于字符串处理和网络请求的工作,还是更习惯用Python来实现(用Java也能完成,但是也不太熟,而且实现起这些功能来说略显笨重)。简单记录整理一下最近碰到的几个Python知识点,方便后面有需要的时候参考(才发现上一次记录这个主题还是在2019年5月14日)。

正文:

参考解答:
  1. 用Python往Kafka发消息

安装依赖:

pip install kafka-python

生产消息示例:

import json
from kafka import KafkaProducer

# bootstrap = '192.168.3.4:9092'
bootstrap = ['192.168.3.4:9092', '192.168.3.5:9092', '192.168.3.6:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap)

info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert", "ip":"1.2.3.4", "user_ip":"sec-test", "user_ip_org":"/a/b/c", "user_sso":"user1", "user_sso_org":"/a/b/d", "extra_info":""}

for idx in range(5):
    print(idx)
    producer.send('my_favorite_topic_name', json.dumps(info_dict).encode('utf-8'))
#for
producer.flush()

注意事项:

1. 发布消息的send方法的第2个参数,即要发送的消息内容value需要是bytes字节类型的
producer.send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

所以当你要发送的是json字符串时,要么就是在新建 KafkaProducer 时通过 value_serializer 参数来自动编码,要么就是手动在每一个 send 里面对要发送的 value 内容进行编码处理。

>>> import json
>>> info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert"}

# 方法一
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', info_dict)

# 方法二
>>> producer = KafkaProducer()
>>> producer.send('fizzbuzz', json.dumps(info_dict).encode('utf-8'))

2. 测试的时候如果 send 之后但是 topic 里没消息,可能需要手动 flush 才行
>>> producer.flush()
  1. Python中 dict 的处理
# 将 dict 按json字符串格式写文件
import json
exDict = {'exDict': 'test'}
with open('mydict_file1.txt', 'w') as fp:
     fp.write(json.dumps(exDict)) # use `json.loads` to do the reverse

# Python 3 测试可行,但是写入的内容不是标准的json格式,用的是单引号而非双引号
with open('mydict_file2.txt', 'w') as fp:
    print(exDict, file=fp)
  1. Python中使用 APScheduler 实现定时任务

安装依赖:

$ pip install apscheduler

简单示例:

# 当你的程序只是用来执行调度时可以使用 BlockingScheduler ,它会以阻塞的方式一直运行,以便定期执行任务调度
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

n_seconds = 3

def my_clock():
    print('time is {0}'.format(datetime.now()))

if __name__ == "__main__":
    scheduler = BlockingScheduler()
    scheduler.add_job(my_clock, trigger="interval", seconds=n_seconds, next_run_time=datetime.now())
    scheduler.start()
#if


# 当你不使用 Tornado/Twisted/asyncio/gevent 等框架或模块,又希望调度程序在应用程序的后台运行时使用 BackgroundScheduler ,但是你自己的程序需要一直在线,否则主程序退出了调度任务肯定也就无法正常执行了
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler.schedulers.background import BackgroundScheduler
def job1():
    print('job1: time is {0}'.format(datetime.now()))
    with open('test_BackgroundScheduler.txt', 'a') as fp:
        fp.write('job1: time is {0}\n'.format(datetime.now()))
    #with

if __name__ == "__main__":
    scheduler = BackgroundScheduler()
    scheduler.add_job(job1, trigger='cron', second='*/10')
    scheduler.start()
    while 1: #需要在代码中有一个类似while循环的功能保证主程序在线,否则任务无法按照预期进行调度
        time.sleep(5)
    #while
#if
  1. Python中使用 elasticsearch5 模块进行ES的查询
# 查询body可以通过在页面上用Kibana时,通过浏览器控制台抓包看看Request Payload来快速获取
from elasticsearch5 import Elasticsearch
es = Elasticsearch(hosts="10.20.30.40:9200", timeout=60)

body = {
    # "key": "value"
}

result = es.search(index="index_name-*", body=body) #search()返回的是 dict 类型
hits = result.get('hits')
shards = result.get('_shards')
print('{0}\n{1}'.format(hits, shards))

import jsonpath
data = jsonpath.jsonpath(result, "$.aggregations.aggr_name.buckets[*]")
参考链接:

kafka-python
https://kafka-python.readthedocs.io/en/master/index.html#kafkaproducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send

Unable to send messages to topic in Kafka Python
https://stackoverflow.com/questions/62405458/unable-to-send-messages-to-topic-in-kafka-python

Writing a dictionary to a text file?
https://stackoverflow.com/questions/36965507/writing-a-dictionary-to-a-text-file

APScheduler User guide
https://apscheduler.readthedocs.io/en/3.x/userguide.html

Python 定时任务框架 APScheduler 详解
https://www.cnblogs.com/leffss/p/11912364.html

python BlockingScheduler定时任务
https://blog.csdn.net/lipachong/article/details/99962134

elasticsearch5 5.5.6
https://pypi.org/project/elasticsearch5/
https://elasticsearch-py.readthedocs.io/en/v7.17.7/#example-usage

ElasticSearch的scroll接口搜索示例
https://ixyzero.com/blog/archives/4112.html

JSONPath – XPath for JSON
https://goessner.net/articles/JsonPath/

https://pypi.org/project/jsonpath/

=END=


发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注