1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import os, json, time, asyncio, requests, websockets
from dotenv import load_dotenv
load_dotenv()
# Runtime configuration, read from environment variables at import time.
# API_BASE and WS_URL have no default and are None when unset.
API_BASE = os.environ.get("API_BASE")
WS_URL = os.environ.get("WS_URL")
SYMBOL = os.environ.get("SYMBOL", "BTC_USD")
API_KEY = os.environ.get("API_KEY", "")
# Price level that each streamed tick is compared against.
THRESHOLD = float(os.environ.get("THRESHOLD", "65000"))
# Bearer-token auth header; left empty when no API key is configured.
HEADERS = {}
if API_KEY:
    HEADERS["Authorization"] = f"Bearer {API_KEY}"
def get_last_price() -> float:
    """Fetch the latest price for ``SYMBOL`` with a single REST request.

    Sends ``GET {API_BASE}/v1/market/price`` with ``symbol=SYMBOL`` as the
    query parameter and ``HEADERS`` as the request headers, using a
    10-second timeout. The path, parameters and response shape are
    examples; adjust them to your provider.

    Returns:
        The price as a float, taken from the top-level ``"price"`` field
        or, when that field is absent or null, from ``["data"]["price"]``.

    Raises:
        requests.RequestException: on a network failure, a timeout, or an
            HTTP error status (via ``raise_for_status``).
        KeyError, TypeError, ValueError: if the decoded body carries no
            price in either location, or the value is not numeric.
    """
    url = f"{API_BASE}/v1/market/price"
    response = requests.get(url, params={"symbol": SYMBOL}, headers=HEADERS, timeout=10)
    response.raise_for_status()
    data = response.json()
    # Fall back to the nested shape only when the top-level field is really
    # missing; an `or` here would also discard a legitimate falsy value (0).
    price = data.get("price")
    if price is None:
        price = data["data"]["price"]
    return float(price)
def _parse_price(raw_msg):
    """Return the price in one raw WS message as a float, or None if it has none."""
    try:
        payload = json.loads(raw_msg)
        # e.g., {"type":"ticker","symbol":"BTC_USD","price":"65123.45"}
        price = payload.get("price")
        if price is None:
            price = payload["data"]["price"]
        return float(price)
    except (AttributeError, KeyError, TypeError, ValueError):
        # Invalid JSON, a non-dict payload, no price field, or a
        # non-numeric price: this message is not a usable price tick.
        return None


async def stream_prices():
    """Stream prices for ``SYMBOL`` over a WebSocket and print threshold alerts.

    Connects to ``WS_URL``, sends a ticker subscribe message, then prints
    every received price and an alert line whenever the price is at or
    above ``THRESHOLD``. The subscribe message and the message schema are
    examples; adjust them to your provider.

    Messages that do not carry a parseable price are logged and skipped
    rather than dropping the connection. After any disconnect, whether an
    error or a normal close, the loop waits and reconnects with
    exponential backoff (1s doubling up to 60s, reset to 1s after each
    successful connect).

    This coroutine loops forever; it only ends by cancellation or by a
    non-``Exception`` error propagating out.
    """
    subscribe_msg = json.dumps({"op": "subscribe", "channel": "ticker", "symbol": SYMBOL})
    backoff = 1
    while True:
        try:
            async with websockets.connect(WS_URL, ping_interval=20, ping_timeout=20) as ws:
                await ws.send(subscribe_msg)
                print(f"Subscribed to {SYMBOL} @ {WS_URL}")
                backoff = 1  # reset backoff on successful connect
                async for msg in ws:
                    price = _parse_price(msg)
                    if price is None:
                        # Keep the connection: one unusable message must not
                        # trigger a reconnect/resubscribe cycle.
                        print(f"Skipping message without a price: {repr(msg)[:200]}")
                        continue
                    print(f"Price: {price}")
                    # NOTE: level-triggered, so this fires on every tick at or
                    # above the threshold, not only on the first crossing.
                    if price >= THRESHOLD:
                        print(f"🚨 ALERT: {SYMBOL} crossed {THRESHOLD} (now {price})")
                        # TODO: send webhook/email/Slack here
        except Exception as e:
            # Connection-level boundary: report any failure and retry.
            print(f"WS error: {e}; reconnecting in {backoff}s")
        else:
            # The receive loop ended without an exception, i.e. the
            # connection was closed normally.
            print(f"WS connection closed; reconnecting in {backoff}s")
        # Wait after every disconnect so a server that keeps closing the
        # connection cannot drive a tight reconnect loop.
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, 60)
if __name__ == "__main__":
    # One-off REST request for a starting price. Best-effort: a failure is
    # reported but does not prevent the stream from starting.
    try:
        p = get_last_price()
    except Exception as e:
        print(f"REST bootstrap failed: {e}")
    else:
        print(f"Bootstrap {SYMBOL} price via REST: {p}")
    # stream_prices() loops forever, so Ctrl-C is the normal way to stop it;
    # catch the interrupt to exit quietly instead of dumping a traceback.
    try:
        asyncio.run(stream_prices())
    except KeyboardInterrupt:
        print("Stopped by user")
|