Skip to content

Websocket

Available Events

CometBFT offers the following events to which you can subscribe via Websockets

EventNewBlock            = "NewBlock"
EventNewBlockHeader      = "NewBlockHeader"
EventNewBlockEvents      = "NewBlockEvents"
EventNewEvidence         = "NewEvidence"
EventTx                  = "Tx"
EventValidatorSetUpdates = "ValidatorSetUpdates"

Example

The following example shows how to subsribe to them

Install dependency

pip install websockets
python
import websockets
import asyncio
import json
import gc


class Event:
    def __init__(self, cfg):
        self.cfg = cfg

    async def init(self):
        asyncio.create_task(self.websocket_loop())

    async def websocket_loop(self):
        base_wait_time = self.cfg.get('base_wait_time', 1)
        max_retries = self.cfg.get('max_retries', 5)
        retry_attempts = 0

        while True:
            try:
                print(f'Initiating websocket connection...')
                uri = self.cfg.get('ws_masternode')
                async with websockets.connect(uri) as ws:
                    await self.on_open(ws)
                    try:
                        async for message in ws:
                            await self.on_message(ws, message)
                            # Reset retry attempts on successful message
                            retry_attempts = 0
                    except websockets.ConnectionClosed as e:
                        await self.on_close(ws, e.code, e.reason)
            except Exception as e:
                await self.on_error(e)
                gc.collect()

                retry_attempts += 1
                if retry_attempts > max_retries:
                    print(f'Max retries reached. Stopping websocket loop.')
                    break

                # Exponential backoff, cap at 60 seconds
                wait_secs = min(base_wait_time * (2 ** (retry_attempts - 1)), 60)
                print(f'Websocket reconnect after {wait_secs} seconds')
                await asyncio.sleep(wait_secs)

    async def on_message(self, ws, msg):
        print(f'New event --> {msg}')

    async def on_error(self, error):
        print(f'Websocket error: {error}')

    async def on_close(self, ws, status_code, msg):
        print(f'Websocket connection closed with code {status_code} and message {msg}')

    async def on_open(self, ws):
        print("Websocket connection opened")

        # Sending subscription message
        subscribe_message = {
            "jsonrpc": "2.0",
            "method": "subscribe",
            "id": 0,
            "params": {
                "query": f"tm.event='{self.cfg.get('event')}'"
            }
        }

        await ws.send(json.dumps(subscribe_message))
        print("Sent subscription message")


if __name__ == '__main__':
    cfg = {
        # Replace with your WebSocket URI
        'ws_masternode': 'ws://127.0.0.1:26657/websocket',
        'base_wait_time': 1,
        'max_retries': 5,
        'event': 'Tx'
    }

    event = Event(cfg)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(event.init())
    loop.run_forever()