import io
import json
import mimetypes
from sentry_sdk._compat import text_type, PY2
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.session import Session
from sentry_sdk.utils import json_dumps, capture_internal_exceptions
if TYPE_CHECKING:
from typing import Any
from typing import Optional
from typing import Union
from typing import Dict
from typing import List
from typing import Iterator
from sentry_sdk._types import Event, EventDataCategory
def parse_json(data):
# type: (Union[bytes, text_type]) -> Any
# on some python 3 versions this needs to be bytes
if not PY2 and isinstance(data, bytes):
data = data.decode("utf-8", "replace")
return json.loads(data)
class Envelope(object):
def __init__(
self,
headers=None, # type: Optional[Dict[str, Any]]
items=None, # type: Optional[List[Item]]
):
# type: (...) -> None
if headers is not None:
headers = dict(headers)
self.headers = headers or {}
if items is None:
items = []
else:
items = list(items)
self.items = items
@property
def description(self):
# type: (...) -> str
return "envelope with %s items (%s)" % (
len(self.items),
", ".join(x.data_category for x in self.items),
)
def add_event(
self, event # type: Event
):
# type: (...) -> None
self.add_item(Item(payload=PayloadRef(json=event), type="event"))
def add_transaction(
self, transaction # type: Event
):
# type: (...) -> None
self.add_item(Item(payload=PayloadRef(json=transaction), type="transaction"))
def add_profile(
self, profile # type: Any
):
# type: (...) -> None
self.add_item(Item(payload=PayloadRef(json=profile), type="profile"))
def add_checkin(
self, checkin # type: Any
):
# type: (...) -> None
self.add_item(Item(payload=PayloadRef(json=checkin), type="check_in"))
def add_session(
self, session # type: Union[Session, Any]
):
# type: (...) -> None
if isinstance(session, Session):
session = session.to_json()
self.add_item(Item(payload=PayloadRef(json=session), type="session"))
def add_sessions(
self, sessions # type: Any
):
# type: (...) -> None
self.add_item(Item(payload=PayloadRef(json=sessions), type="sessions"))
def add_item(
self, item # type: Item
):
# type: (...) -> None
self.items.append(item)
def get_event(self):
# type: (...) -> Optional[Event]
for items in self.items:
event = items.get_event()
if event is not None:
return event
return None
def get_transaction_event(self):
# type: (...) -> Optional[Event]
for item in self.items:
event = item.get_transaction_event()
if event is not None:
return event
return None
def __iter__(self):
# type: (...) -> Iterator[Item]
return iter(self.items)
def serialize_into(
self, f # type: Any
):
# type: (...) -> None
f.write(json_dumps(self.headers))
f.write(b"\n")
for item in self.items:
item.serialize_into(f)
def serialize(self):
# type: (...) -> bytes
out = io.BytesIO()
self.serialize_into(out)
return out.getvalue()
@classmethod
def deserialize_from(
cls, f # type: Any
):
# type: (...) -> Envelope
headers = parse_json(f.readline())
items = []
while 1:
item = Item.deserialize_from(f)
if item is None:
break
items.append(item)
return cls(headers=headers, items=items)
@classmethod
def deserialize(
cls, bytes # type: bytes
):
# type: (...) -> Envelope
return cls.deserialize_from(io.BytesIO(bytes))
def __repr__(self):
# type: (...) -> str
return "