=Start=
缘由:
我现在的工作内容一般不涉及到具体的代码实现,但工作中也少不了一些需要自动化提高工作效率的内容,所以多少还会写一些代码。Golang现在很火,而且之前也学过一段时间,但是对于字符串处理和网络请求的工作,还是更习惯用Python来实现(用Java也能完成,但是也不太熟,而且实现起这些功能来说略显笨重)。简单记录整理一下最近碰到的几个Python知识点,方便后面有需要的时候参考(才发现上一次记录这个主题还是在2019年5月14日)。
正文:
参考解答:
- 用Python往Kafka发消息
安装依赖:
pip install kafka-python
生产消息示例:
import json
from kafka import KafkaProducer
# bootstrap = '192.168.3.4:9092'
bootstrap = ['192.168.3.4:9092', '192.168.3.5:9092', '192.168.3.6:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap)
info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert", "ip":"1.2.3.4", "user_ip":"sec-test", "user_ip_org":"/a/b/c", "user_sso":"user1", "user_sso_org":"/a/b/d", "extra_info":""}
for idx in range(5):
print(idx)
producer.send('my_favorite_topic_name', json.dumps(info_dict).encode('utf-8'))
#for
producer.flush()
注意事项:
1. 发布消息的send方法的第2个参数,即要发送的消息内容value需要是bytes字节类型的
producer.send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
所以当你要发送的是json字符串时,要么就是在新建 KafkaProducer 时通过 value_serializer 参数来自动编码,要么就是手动在每一个 send 里面对要发送的 value 内容进行编码处理。
>>> import json
>>> info_dict = {"timestr":"2022-10-24 01:02:03", "msg_type":"alert"}
# 方法一
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', info_dict)
# 方法二
>>> producer = KafkaProducer()
>>> producer.send('fizzbuzz', json.dumps(info_dict).encode('utf-8'))
2. 测试的时候如果 send 之后但是 topic 里没消息,可能需要手动 flush 才行
>>> producer.flush()
- Python中 dict 的处理
# 将 dict 按json字符串格式写文件
import json
exDict = {'exDict': 'test'}
with open('mydict_file1.txt', 'w') as fp:
fp.write(json.dumps(exDict)) # use `json.loads` to do the reverse
# Python 3 测试可行,但是写入的内容不是标准的json格式,用的是单引号而非双引号
with open('mydict_file2.txt', 'w') as fp:
print(exDict, file=fp)
- Python中使用 APScheduler 实现定时任务
安装依赖:
$ pip install apscheduler
简单示例:
# 当你的程序只是用来执行调度时可以使用 BlockingScheduler ,它会以阻塞的方式一直运行,以便定期执行任务调度
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
n_seconds = 3
def my_clock():
print('time is {0}'.format(datetime.now()))
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(my_clock, trigger="interval", seconds=n_seconds, next_run_time=datetime.now())
scheduler.start()
#if
# 当你不使用 Tornado/Twisted/asyncio/gevent 等框架或模块,又希望调度程序在应用程序的后台运行时使用 BackgroundScheduler ,但是你自己的程序需要一直在线,否则主程序退出了调度任务肯定也就无法正常执行了
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
def job1():
print('job1: time is {0}'.format(datetime.now()))
with open('test_BackgroundScheduler.txt', 'a') as fp:
fp.write('job1: time is {0}\n'.format(datetime.now()))
#with
if __name__ == "__main__":
scheduler = BackgroundScheduler()
scheduler.add_job(job1, trigger='cron', second='*/10')
scheduler.start()
while 1: #需要在代码中有一个类似while循环的功能保证主程序在线,否则任务无法按照预期进行调度
time.sleep(5)
#while
#if
- Python中使用 elasticsearch5 模块进行ES的查询
# 查询body可以通过在页面上用Kibana时,通过浏览器控制台抓包看看Request Payload来快速获取
from elasticsearch5 import Elasticsearch
es = Elasticsearch(hosts="10.20.30.40:9200", timeout=60)
body = {
# "key": "value"
}
result = es.search(index="index_name-*", body=body) #search()返回的是 dict 类型
hits = result.get('hits')
shards = result.get('_shards')
print('{0}\n{1}'.format(hits, shards))
import jsonpath
data = jsonpath.jsonpath(result, "$.aggregations.aggr_name.buckets[*]")
参考链接:
kafka-python
https://kafka-python.readthedocs.io/en/master/index.html#kafkaproducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send
Unable to send messages to topic in Kafka Python
https://stackoverflow.com/questions/62405458/unable-to-send-messages-to-topic-in-kafka-python
Writing a dictionary to a text file?
https://stackoverflow.com/questions/36965507/writing-a-dictionary-to-a-text-file
APScheduler User guide
https://apscheduler.readthedocs.io/en/3.x/userguide.html
Python 定时任务框架 APScheduler 详解
https://www.cnblogs.com/leffss/p/11912364.html
python BlockingScheduler定时任务
https://blog.csdn.net/lipachong/article/details/99962134
elasticsearch5 5.5.6
https://pypi.org/project/elasticsearch5/
https://elasticsearch-py.readthedocs.io/en/v7.17.7/#example-usage
ElasticSearch的scroll接口搜索示例
https://ixyzero.com/blog/archives/4112.html
JSONPath – XPath for JSON
https://goessner.net/articles/JsonPath/
https://pypi.org/project/jsonpath/
=END=