Build a Crypto Price Alert System with Telegram and AWS Lambda

Build a Crypto Price Alert System with Telegram and AWS Lambda

December 18, 2024

7 min read

Build a Crypto Price Alert System with Telegram and AWS Lambda
Watch on YouTube

Introduction to the Project

In this post, we'll build a simple system that sends a Telegram message when a cryptocurrency's price goes above or below a specified threshold. We'll run this system as an AWS Lambda function to keep it lightweight and cost-effective. To kickstart the project, we forked RadzionKit and used it as a base. You can find all the source code in the GitHub repository.

Why You Need a Price Alert System

If you occasionally trade crypto or have a specific price threshold at which you’d like to sell your holdings to make a tangible purchase, a simple notification system can provide peace of mind. With such a system, you won’t need to constantly monitor prices, freeing up your time and reducing stress.

Defining a Price Alert

To represent a price alert, we define it with the following fields:

  • id: A unique identifier for the alert, used in the database.
  • asset: The ID of the cryptocurrency to monitor.
  • targetPrice: The price at which the alert should trigger.
  • condition: Specifies whether the alert triggers when the price goes above or below the target price.
  • isNotified: A flag to indicate if the user has already been notified about this alert, preventing multiple notifications for the same condition.
import { EntityWithId } from "@lib/utils/entities/EntityWithId"

type PriceAlertCondition = "more" | "less"

export type PriceAlert = EntityWithId & {
  asset: string
  targetPrice: number
  condition: PriceAlertCondition
  isNotified: boolean
}

Supporting Multiple Users (Optional)

Since we are the only user of this system, we don't need to include a field for the notification destination. However, you could easily extend this system to support multiple users by adding different Telegram chat IDs for each user.

Setting Up Price Alerts

To set and update price alerts, we use a TypeScript script called setPriceAlert.ts. In this script, we define two alerts for Ethereum: one to trigger when the price exceeds $4,000 and another for when it falls below $3,800. The script first clears all existing alerts from the database and then inserts the newly defined alerts.

import { deleteAllPriceAlerts, putPriceAlert } from "../db/priceAlerts"
import { PriceAlert } from "../entities/PriceAlert"

const rules: Omit<PriceAlert, "id" | "isNotified">[] = [
  {
    asset: "ethereum",
    targetPrice: 3800,
    condition: "less",
  },
  {
    asset: "ethereum",
    targetPrice: 4000,
    condition: "more",
  },
]

const priceAlerts: PriceAlert[] = rules.map((rule, index) => ({
  ...rule,
  id: index.toString(),
  isNotified: false,
}))

const setPriceAlerts = async () => {
  await deleteAllPriceAlerts()

  await Promise.all(priceAlerts.map((alert) => putPriceAlert(alert)))
}

setPriceAlerts()

Managing Price Alerts in the Database

To perform CRUD operations on price alerts, we use a set of functions in the database-related file. These functions leverage utilities from RadzionKit, which simplify interactions with DynamoDB.

import { getPickParams } from "@lib/dynamodb/getPickParams"
import { totalScan } from "@lib/dynamodb/totalScan"
import { PriceAlert } from "../entities/PriceAlert"
import { getEnvVar } from "../getEnvVar"
import { DeleteCommand, PutCommand } from "@aws-sdk/lib-dynamodb"
import { dbDocClient } from "@lib/dynamodb/client"
import { updateItem } from "@lib/dynamodb/updateItem"

const tableName = getEnvVar("PRICE_ALERTS_TABLE_NAME")

export const getPriceAlertItemParams = (id: string) => ({
  TableName: tableName,
  Key: {
    id,
  },
})

export const getAllPriceAlerts = async <T extends (keyof PriceAlert)[]>(
  attributes?: T,
) => {
  return totalScan<Pick<PriceAlert, T[number]>>({
    TableName: tableName,
    ...getPickParams(attributes),
  })
}

export const deletePriceAlert = (id: string) => {
  const command = new DeleteCommand(getPriceAlertItemParams(id))

  return dbDocClient.send(command)
}

export const deleteAllPriceAlerts = async () => {
  const alerts = await getAllPriceAlerts(["id"])

  return Promise.all(alerts.map(({ id }) => deletePriceAlert(id)))
}

export const putPriceAlert = (user: PriceAlert) => {
  const command = new PutCommand({
    TableName: tableName,
    Item: user,
  })

  return dbDocClient.send(command)
}

export const updatePriceAlert = async (
  id: string,
  fields: Partial<PriceAlert>,
) => {
  return updateItem({
    tableName: tableName,
    key: { id },
    fields,
  })
}

Fetching Cryptocurrency Prices

To fetch cryptocurrency prices, we use the CoinGecko API. The getAssetPrices function accepts an array of asset IDs and an optional fiat currency.

import { addQueryParams } from "@lib/utils/query/addQueryParams"
import { FiatCurrency } from "../FiatCurrency"
import { queryUrl } from "@lib/utils/query/queryUrl"
import { recordMap } from "@lib/utils/record/recordMap"

type Input = {
  ids: string[]
  fiatCurrency?: FiatCurrency
}

type Response = Record<string, Record<FiatCurrency, number>>

const baseUrl = "https://api.coingecko.com/api/v3/simple/price"

export const getAssetPrices = async ({ ids, fiatCurrency = "usd" }: Input) => {
  const url = addQueryParams(baseUrl, {
    ids: ids.join(","),
    vs_currencies: fiatCurrency,
  })

  const result = await queryUrl<Response>(url)

  return recordMap(result, (value) => value[fiatCurrency])
}

Constructing the API Request

To construct the URL, we start with the baseUrl and append the asset IDs and currency query parameters using the addQueryParams function from RadzionKit. The data is then fetched with the queryUrl helper function. Since we only need results in a single currency, we simplify the nested response structure by flattening it with the recordMap function.

Sending Telegram Notifications

To receive notifications about price changes, we use the node-telegram-bot-api library. The sendPriceChangeAlert function takes the price and asset as input, initializes a Telegram bot instance, and sends a message to the specified chat ID, ensuring timely alerts for significant price movements.

import { getEnvVar } from "../getEnvVar"
import TelegramBot from "node-telegram-bot-api"

type Input = {
  price: number
  asset: string
}

export const sendPriceChangeAlert = async ({ price, asset }: Input) => {
  const bot = new TelegramBot(getEnvVar("TELEGRAM_BOT_TOKEN"))

  const message = `${asset} price: ${price}`

  return bot.sendMessage(getEnvVar("TELEGRAM_BOT_CHAT_ID"), message)
}

Managing Environment Variables

To ensure a single source of truth for environment variables, we use a getEnvVar function. This function throws an error if a required variable is missing.

type VariableName =
  | "TELEGRAM_BOT_TOKEN"
  | "TELEGRAM_BOT_CHAT_ID"
  | "SENTRY_KEY"
  | "PRICE_ALERTS_TABLE_NAME"

export const getEnvVar = (name: VariableName): string => {
  const value = process.env[name]
  if (!value) {
    throw new Error(`Missing ${name} environment variable`)
  }

  return value
}

Running the Price Watcher

With all the essential components in place, we can bring them together in the core function of our system: runPriceWatcher.

import { match } from "@lib/utils/match"
import { getAssetPrices } from "../../../lib/chain/price/utils/getAssetPrices"
import { getAllPriceAlerts, updatePriceAlert } from "../db/priceAlerts"
import { sendPriceChangeAlert } from "./sendPriceChangeAlert"

export const runPriceWatcher = async () => {
  const items = await getAllPriceAlerts()

  const assets = items.map((item) => item.asset)

  const assetPrices = await getAssetPrices({ ids: assets })

  await Promise.all(
    items.map(async ({ id, isNotified, condition, asset, targetPrice }) => {
      const price = assetPrices[asset]
      const isConditionMet = match(condition, {
        more: () => price > targetPrice,
        less: () => price < targetPrice,
      })

      if (isConditionMet && !isNotified) {
        await sendPriceChangeAlert({ price, asset })

        return updatePriceAlert(id, {
          isNotified: true,
        })
      } else if (!isConditionMet && isNotified) {
        return updatePriceAlert(id, {
          isNotified: false,
        })
      }
    }),
  )
}

How the Watcher Works

First, runPriceWatcher retrieves all the price alerts from the database and extracts the asset IDs to fetch their current prices. For each alert, it checks if the condition is met and whether the user has already been notified. If the condition is met and the user hasn't been notified, a Telegram message is sent, and the alert is updated accordingly. Conversely, if the condition is no longer met but the user was previously notified, the notification flag is reset to ensure the system behaves as expected.

We'll deploy our code as an AWS Lambda function, wrapping it with Sentry to receive notifications about any potential issues.

import { AWSLambda } from "@sentry/serverless"
import { getEnvVar } from "./getEnvVar"
import { runPriceWatcher } from "./core/runPriceWatcher"

AWSLambda.init({
  dsn: getEnvVar("SENTRY_KEY"),
})

exports.handler = AWSLambda.wrapHandler(runPriceWatcher)

Setting Up Resources with Terraform

To set up the necessary resources for our watcher, we'll use Terraform. To schedule the function to run every 10 minutes, we'll configure a CloudWatch Event Rule. You can find the Terraform code in the GitHub repository.

Conclusion

By combining AWS Lambda, Telegram notifications, and Terraform for resource management, we’ve created a simple yet effective system to monitor cryptocurrency prices. With this setup, you can stay informed without constantly checking the market, and the flexibility of the code allows for easy customization and expansion. Happy coding!