o2o-infinith-backend/app/integrations/google_places.py

62 lines
2.2 KiB
Python

from http import HTTPMethod
from common.utils import http_request
# Root URL shared by every Places API (v1) endpoint used in this module.
PLACES_BASE = "https://places.googleapis.com/v1"

# Comma-separated list of place attributes requested from the API; it is sent
# as the value of the ``X-Goog-FieldMask`` request header.
FIELD_MASK = ",".join(
    (
        "places.id",
        "places.displayName",
        "places.formattedAddress",
        "places.rating",
        "places.userRatingCount",
        "places.internationalPhoneNumber",
        "places.websiteUri",
        "places.googleMapsUri",
        "places.primaryTypeDisplayName",
        "places.regularOpeningHours",
        "places.reviews",
    )
)
class GooglePlacesClient:
def __init__(self, api_key: str):
self.api_key = api_key
def _headers(self) -> dict:
return {
"Content-Type": "application/json",
"X-Goog-Api-Key": self.api_key,
"X-Goog-FieldMask": FIELD_MASK,
}
async def fetch_place(self, query: str) -> dict | None:
resp = await http_request(
HTTPMethod.POST,
url=f"{PLACES_BASE}/places:searchText",
headers=self._headers(),
json_body={"textQuery": query, "languageCode": "ko", "regionCode": "KR", "maxResultCount": 3},
timeout=15,
label="google-places",
)
if not resp or not resp.is_success:
return None
places = resp.json().get("places", [])
return places[0] if places else None
async def get_place(self, query: str) -> dict | None:
p = await self.fetch_place(query)
if not p:
return None
return {
"name": (p.get("displayName") or {}).get("text", ""),
"rating": p.get("rating"),
"reviewCount": p.get("userRatingCount", 0),
"address": p.get("formattedAddress", ""),
"phone": p.get("internationalPhoneNumber", ""),
"clinicWebsite": p.get("websiteUri", ""),
"mapsUrl": p.get("googleMapsUri", ""),
"placeId": p.get("id", ""),
"category": (p.get("primaryTypeDisplayName") or {}).get("text", ""),
"topReviews": [
{
"stars": r.get("rating", 0),
"text": ((r.get("text") or {}).get("text", ""))[:500],
"date": r.get("publishTime", ""),
}
for r in (p.get("reviews") or [])[:10]
],
}