2025-03-16 19:59:39
Azure OpenAI和OpenAI提供的服务基本一致的,除了了Azure 更新会慢一点之外,最大的区别是请求路径不同。
对于Azure而言,需要去Azure AI Foundry 里创建部署
部署名称就是请求时的URL参数的一部分,举例如下
https://xxx.openai.azure.com/openai/deployments/{your_deployment_name}/chat/completions?api-version=2024-12-01-preview
api-version
需要根据个人情况选择,新版本的包含更多的功能。
那么问题就简单了,如果要做代理转发,需要做的事情就是创建一个模型名称(如gpt-4-mini)和部署名称的映射表,写一个简单的程序读取body,然后动态拼接为正确的URL,然后请求返回即可。
如果你会写lua,那么直接搭配下openresty就可以了。然而……
为了图简单,语言就选Python吧。在网上找到了一个 Python Web framework的性能对比图,还有一个比较新的对比图
信我的,不要选择blacksheep(后面会讲)。这次我们就选择第二名的sanic,文档很好,语法很像flask,原生支持asyncio,星标也很多
非流式响应直接用httpx发请求,然后返回就行了,非常简单,记得跟着把状态码也设置了
代码如下,非常简单,应该不用解释就能看懂
import os import httpx from sanic import Sanic from sanic import json as json_response from sanic.request import Request client = httpx.AsyncClient( http2=True, timeout=httpx.Timeout( connect=15.0, read=300.0, write=300.0, pool=10.0, ), ) app = Sanic(__name__) url = os.getenv("URL") api_key = os.getenv("API_KEY") @app.route("/v1/chat/completions", methods=["POST"]) async def chat_completions(request: Request): body = request.json if body.get("stream"): pass else: return await non_stream(body) async def non_stream(body): response = await client.post(url, json=body, headers={"api-key": api_key}) return json_response(response.json(), status=response.status_code) if __name__ == "__main__": app.run(host="127.0.0.1", port=8000, debug=True, dev=True, auto_reload=True)
Sanic的流式也很简单,我们直接使用 httpx的stream
就可以。
需要注意的是,如果请求参数错误,那么Azure会给返回400类错误,此时不能返回SSE,而且普通的json。这里可以通过response.aread()
来读取响应体。
也就是说代码大概长这样
async with client.stream("POST", url, json=body, headers={"api-key": api_key}) as response: if response.status_code != 200: error = await response.aread() # 由于 aread() 直接返回了bytes,所以就用raw方法返回,设置content-type为application json,没必要在反序列化一次用json返回 return raw(error, content_type="application/json", status=response.status_code)
如果请求正常开始返回,那么就先设置设置content-type
server = await request.respond(content_type="text/event-stream")
然后去迭代
async for chunk in response.aiter_text(): await server.send(chunk)
恭喜你,用50行代码实现了代理服务!至于 text-embedding 这种根本不支持流式的模型,甚至可以写一个通用的函数,反正就是原样发送、原样返回
当然, 你可以根据自己的需求进一步扩展。比如说……
content_filter_results
之类的)用Python操作json可比用Go方便多了!不用两眼一发黑的写type真的是太幸福了!
具体操作空间,那留给自己想象啦!
最开始我选择了blacksheep,因为这个最快嘛,文档看起来也不错。
后来发现踩了很多坑,直接整个一天时间没了🫠……
Blacksheep用SSE是这样子滴:
import asyncio from collections.abc import AsyncIterable from blacksheep import Application, get from blacksheep.server.sse import ServerSentEvent, ServerSentEventsResponse app = Application() # An AsyncGenerator yielding ServerSentEvent... async def events_provider() -> AsyncIterable[ServerSentEvent]: for i in range(3): yield ServerSentEvent({"message": f"Hello World {i}"}) await asyncio.sleep(1) # A request handler returning a streaming response bound to the generator... @get("/events") def events_handler(): return ServerSentEventsResponse(events_provider)
ServerSentEvent
会自动json序列化你的传入的参数,正常OpenAI最后一个响应是[DONE]
然而用它你发现……你永远无法正确返回 [DONE]
,比如
yield ServerSentEvent("[DONE]") yield ServerSentEvent(["DONE"])
你会发现这引号是怎么回事🤡
解决方案是自定义他的json dumps,硬编码一下,如果是[DONE]
的时候直接返回
from blacksheep.settings.json import default_json_dumps, json_settings def custom_dumps(value): if value == "[DONE]": return value else: return default_json_dumps(value) json_settings.use(dumps=custom_dumps)
Blacksheep是使用的异步生成器,看yield
和async
就知道。但是在流式请求的时候,如果azure返回了错误json,我们也要返回错误json给客户端,而不是返回SSE。
然而……一旦你用了yield+async,这个函数就是异步生成器函数了,你可以使用return
结束生成器,但是却不能使用 return 123
这样的表达式。
所以试图在 as response
后判断状态码,然后试图返回一个json的操作,比如
return json({"message": "Hello, World!"})
都是不行滴!
实际上,当你路由中调用return ServerSentEventsResponse(events_provider)
后,整个请求只可能以SSE的格式返回了🫠
聪明的你可能会想着既然不能return
,那我yield
一下
if response.status_code != HTTPStatus.OK: content = await response.aread() yield content return
IDE没报错,但是运行时……
TypeError: Argument 'event' has incorrect type (expected blacksheep.contents.ServerSentEvent, got bytes)
别想着改type annotation了,不管用的🤣
我也想到了这个办法,先在路由后调用 client.stream()
然后看status code是不是200,如果是,那么走 return ServerSentEventsResponse
否则就是 return json
恭喜你!发现了新的坑!你会发现……
raise StreamClosed() httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.
那尝试手动进入,不用async with
了
stream_ctx = client.stream( ) stream_response = await stream_ctx.__aenter__() .... return ServerSentEventsResponse(partial(stream_provider, stream_ctx, stream_response))
很好
line 155, in stream_provider async for c in stream_response.aiter_text(): httpx.ReadTimeout
那就闭包,用前朝的剑指挥今朝的兵!这样的话,其实上面错误差不多😂
给Azure返回的chunks
都缓存起来,等都返回完了,把全部chunks
交stream_provider
,流式直接变非流👍真有你的
抛出一个自定义异常的办法也许管用,但是我一直没接住……
所以,不要使用 blacksheep,否则你的人生会变得不幸
完整代码可见 https://gist.github.com/BennyThink/94ac6e088feb1cec829cf7c280c56783
2024-12-12 14:22:30
经常混币圈、股票和外汇的朋友们都知道有一个东西叫 K线,大概长这样
同时你也可以选择添加更多的图表,如布林线、MACD、RSI等,用这些技术指标来辅助决策。
这些技术参数很有用,如果能把一段时间的结果交给LLM去分析,应该比凭着感觉瞎买更靠谱吧!
那这些技术参数是怎么来的呢……?
去花钱买!这个世界上还有什么是钞能力无法解决的事情吗?
https://taapi.io/ 已经有人做好了!有股票 也有加密货币的数据。就是……免费版频率限制有些严重,想要获取多点数据就等几个小时吧。一下子升级个 Pro 要花14.99欧元,在PoC阶段好像也不太值得……
实际上,这些技术参数,全部都是使用K线数据,由客户端计算出来的。K线数据很容易获取,很多交易所都提供API,但是计算这些参数需要比较强的数学知识😅
还好已经有人写好了相关的库,虽然是 C/C++写的,但是不怕啦早就有人写好了 Python的wrapper
我们需要这个东西来计算数据,这个库支持200多个技术指标,包括RSI,MACD
安装方式可以参考官方文档
以币安的合约数据为例,非常简单
pip install binance-futures-connector from binance.um_futures import UMFutures um_client = UMFutures() candles=um_client.klines(symbol=‘BTCUSDT’, interval=‘1m’)
这样我们就获取到了 1分钟时间间隔的数据
转换成 pandas的DataFrame
方便后续处理
pd.DataFrame(candles)
布林线使用收盘价格进行计算,而且需要多组数据才可以计算出来
upperband, middleband, lowerband = talib.BBANDS(self.df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
同样也是收盘价
rsi6 = talib.RSI(self.df["close"], timeperiod=6) rsi12 = talib.RSI(self.df["close"], timeperiod=12) rsi24 = talib.RSI(self.df["close"], timeperiod=24)
收盘价
dif, dea, macd = talib.MACD(self.df["close"], fastperiod=12, slowperiod=26, signalperiod=9)
包括SMA和EMA
sma7 = talib.SMA(self.df["close"], timeperiod=7) sma25 = talib.SMA(self.df["close"], timeperiod=25) sma99 = talib.SMA(self.df["close"], timeperiod=99) ema7 = talib.EMA(self.df["close"], timeperiod=7) ema25 = talib.EMA(self.df["close"], timeperiod=25) ema99 = talib.EMA(self.df["close"], timeperiod=99)
LLM更适合处理json数据,pandas就很方便啦
self.df.to_json(orient="records", date_format="epoch", date_unit="s")
有了数据之后就可以把他们导出成 json,然后调用LLM进行处理。
这里需要找一个可用的LLM,比如 Mistral AI、OpenAI 、Gemini或者 Claude。由于国内注册比较麻烦,而且风控很严格,建议大家使用「头顶冒火」的API接口
使用什么模型因人而异,一次数据量比较大,gpt-4o可能是比较好的选择,Claude Opus和o1-preview比较贵,但是也可以考虑,头顶冒火网站也都有的
然后要做的事情就是写提示词,让AI分析结果了!🧐
很麻烦是不是……不要怕,我已经写好并且开源了。使用方式详见下一页!⬇️⬇️⬇️ 部分RSS阅读器可能无法显示下一页的内容,请点击链接打开网站阅读
2024-11-03 20:53:15
Cloduflare WARP真的非常好用,并且支持代理模式。启用这个模式之后,它会在本机监听一个socks端口,应用程序可以配置到这个端口来使用代理。
对于服务器来说,一般使用 warp-cli
warp-cli proxy port 60606 warp-cli mode proxy
之后你就可以 curl
啦
https_proxy=socks5://127.0.0.1:60606 http_proxy=socks5://127.0.0.1:60606 curl ipv4.win IP: 104.28.157.116 CLOUDFLARE.COM CLOUDFLARE.COM
然而,Go的程序不支持 socks 代理,要手动加transport我可没那个功夫去加。
好消息是,Go默认是尊重环境变量http_proxy
的。那么就要想办法给socks代理转换为http代理
这事很简单嘛!用pproxy
就好了,别的不会,这个肯定很会!
pproxy -v -l http://127.0.0.1:8118 -r socks5://127.0.0.1:60606 https_proxy=http://127.0.0.1:8118 http_proxy=http://127.0.0.1:8118 curl ipv4.win curl: (52) Empty reply from server # pproxy logs Serving on ipv? 127.0.0.1:8118 by http http 127.0.0.1:45012 -> socks5 127.0.0.1:60606 -> ipv4.win:80 Unknown remote protocol from 127.0.0.1
怎么报错了呢🤨
可能是 pproxy
的问题,那么用gost
!
gost -L http://127.0.0.1:8118 -F socks5://127.0.0.1:60606 2024/11/03 12:32:43 route.go:700: http://127.0.0.1:8118 on 127.0.0.1:8118 2024/11/03 12:32:46 http.go:162: [http] 127.0.0.1:33284 -> http://127.0.0.1:8118 -> ipv4.win:80 2024/11/03 12:32:46 http.go:257: [route] 127.0.0.1:33284 -> http://127.0.0.1:8118 -> 1@socks5://127.0.0.1:60606 -> ipv4.win:80 2024/11/03 12:32:46 http.go:280: [http] 127.0.0.1:33284 -> 127.0.0.1:8118 : unexpected EOF
那……Privoxy
forward-socks5 / 127.0.0.1:60606 .
也不行!
甚至直接在 Firefox里设置socks5代理,也不行🤨
任何网站都打不开
偶然取消 DNS请求的勾选,就成功了……突然恍然大悟,WARP可能不支持远程解析DNS
那么要么用回 socks4
pproxy -v -l http://127.0.0.1:8118 -r socks4://127.0.0.1:60606 gost -L http://127.0.0.1:8118 -F socks4://127.0.0.1:60606
要么给加上DNS的支持
gost -L "http://127.0.0.1:8118?dns=1.1.1.1" -F socks5://127.0.0.1:60606
人生中宝贵的几个小时就这么没了。
2024-10-12 15:10:27
既然是容器,那么也可以跑数据库的吧?那只要再加上一个PHP,就可以跑 WordPress、Typecho之类的应用了!
在 Azure上,用 Container Apps也是可以做到serverless PHP应用的,具体来说就是:
一个pod里开两个容器(不太推荐这种方式),一个是php-apache,一个是MySQL,通过挂载 Azure Files作为volumes来做数据存储。
为什么想这么玩,是因为:
使用这种方案的好处是,在访问量不是很大的情况下,成本应该可以忽略不计,甚至可以在无访问时缩放到0副本,真正“无服务器” 😂
创建资源组,创建一个vnet,因为存储不想公开给所有人,也方便以后和其他虚拟机内网互通。
创建的时候,Container Apps Environment 选择已有的虚拟网络
配置镜像,这个镜像是我自己构建的,基于 php:8.3-apache,支持MySQL、PostgreSQL、SQLite,添加了 mod_rewrite,足够给WordPress和Typecho用
Primary Service选择 Azure Files
我创建了两个,一个是数据库的,一个是网站文件的
在创建完存储后,去复制一下access key
在容器环境中添加 SMB
依次输入信息,这里没有补全,不要输入错了哦
在容器环境中创建完之后,就可以到容器应用中添加啦
这里需要注意,数据库的卷可能需要如下额外挂载参数
uid=999,gid=999,nobrl,mfsymlinks,cache=none
编辑容器,添加volume就可以了
然后添加一个数据库的sidecar
如果需要配置环境变量,也可以一起配置了
最终结果是这样的两个容器,在一个pod里,我这种穷人自然只能选择最低配置啦
默认存储是可以公网访问的(需要用户名密码),为了更安全,我们可以配置为只有某几个虚拟网络可以访问
可能需要进入 console,执行一下创建数据库之类的操作,这点就自行发挥了。
在不使用的时候,容器可以缩为0(可以配置为最低1副本)
在有请求的时候,真的能访问耶,而且还可以自动扩容!
正常的应用不应该这样配置的,两个容器在同一个pod内耦合也是不理想的。
如果想要利用 Azure的优良线路,正常来说应该选择如下方法之一
最后……
PHP用什么跑不好要用这个,真是只有真正的赛博精神病才能够想出来这种操作🤡
2024-10-06 15:24:51
我的所有的代码最终都做成了 Docker image,因此可以非常方便的拿出来部署,自然也可以部署在各种 managed container或者managed k8s的环境。
更重要的是,Azure Container Apps 提供一定的免费额度,一般普通用户都够用的了!
这次的情况有点特殊:
通常来说,在 Azure 上的虚拟机,如果是同一个数据中心的,想要内网互通,直接都创建在一个虚拟网络下就可以了,默认情况下就是互通的,而且默认的安全组是允许同一个虚拟网络的通信的。
如果是不同的数据中心,或者不同的订阅,那么就要创建不同的虚拟网络,不同的网段,然后创建 peering
创建的过程非常简单,鼠标点点下一步就可以了
容器应用需要使用独享的虚拟网络,因此没法重复利用虚拟机已有的虚拟网络。
在创建容器应用时,选择数据中心,这里就建议就近选择啦。然后Container Apps Environment 点击create new
在新的窗口中选择 Networking,配置自己的虚拟网络,如果你的容器不需要被访问,那么甚至可以选择 internal
下一步需要配置我们使用什么镜像
需要注意的是 Command Override和Arguments Override这两个。Command Override并不是docker
和docker-compose
中的command
,比如你像 docker-compose.yml
里这样写的
services: generic: image: ghcr.io/webp-pt/webplb:latest command: webplb -mode=worker -queue=generic
,把这个webplb -mode=worker -queue=generic
复制到 Azure Portal上,那就错了……
那就简单了嘛!直接把 webplb -mode=worker -queue=generic
这一串粘贴到 Arguments Override的文本框,恭喜你又错了🎉。
因为,每一个参数都要用逗号和空格隔开,正确的形式是这样的
webplb, -mode=worker, -queue=generic
再接下来选择容器的配置,小到 0.25C 0.5G RAM,大到 4C 8G都在这里,如需环境变量也可以一起配置
再下一步选择端口映射,一个是容器的一个是发布的端口
创建的过程比较慢,大概要10-20分钟。如果你的应用有 ingress,那么还会看到一个 Application URL。
容器创建好之后,在 Monitoring – Console 可以连接到容器,或者说是pod内部,之后可以用ping……嗯?😐 其实是通的,nc一下就知道了,我检查了很多地方也检查了安全组的ICMP配置,但是还是没法ping通不过就这样了吧😐
通过http请求数量去扩容,实时的,默认就应该有这条规则,可以根据自身的情况进行配置。在满足条件时会自动扩容
需要使用 Custom,azure使用的是KEDA, 在metadata中填入两个字段
比如下图的写法,就是CPU使用率超过80%就触发;内存也同理,只需要把cpu改成memory就可以
在这里可以配置缩放的上下限,上限是1000,下限……可以选择0。
对于 Container Apps来说,最低副本数量是可以为0的。这意味着,如果你的网站没有收到任何流量或请求,ACA 将会自动缩减到零副本,从而节省资源。
当然这样有一定的冷启动时间……所以如果希望长时间运行,可以改成1或者任何合适的数值。
需要注意的是,缩放出来的新的容器的配置,都和最开始创建时配置的是一样的哦。
轮询间隔30秒(KEDA),冷却期300秒。这些参数无法改动。
如果镜像更新了,或者需要更改配置,在 Azure Portal上也可以轻松完成
Containers – Edit and deploy 在下面container image这里就可以重新配置了,包括环境变量在内。
当然了懂得都懂,一个pod可以包含多个container,这里也可以加另外一个container的。
这我是没想到的!
在急需扩容的时候,我的代码却出了问题,导致程序直接卡死并不退出,无法触发扩容规则🤡
在 Azure 容器应用中设置缩放规则 https://learn.microsoft.com/zh-cn/azure/container-apps/scale-app?pivots=azure-portal
Scaling options in Azure Container Apps https://techcommunity.microsoft.com/t5/apps-on-azure-blog/scaling-options-in-azure-container-apps/ba-p/3878282