We’ve learned how to use Redux Middleware to manage WebSockets in a recent article. We’ve also got to know the Redux Toolkit Query and how it can improve our code. Interestingly, we can also use RTK Query to deal with a WebSocket connection. This article teaches how to establish a WebSocket connection with RTK Query and receive and send updates.
We model the examples after both the official documentation and code written by Lenz Weber, an RTK Query contributor.
The cache lifecycle
The first feature of RTK Query that we need to get familiar with is the onCacheEntryAdded function. RTK Query calls it whenever it adds a new entry to the cache. That happens when our React application accesses a particular endpoint for the first time, or for the first time in a while.
If you want to know more about how the cache behaves, check out Introduction to Redux Toolkit Query with TypeScript.
To explain how it works, let’s expand on an example from the previous article.
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react';
import Photo from './photo';

export const api = createApi({
  reducerPath: 'api',
  baseQuery: fetchBaseQuery({
    baseUrl: process.env.REACT_APP_API_URL,
  }),
  endpoints: (builder) => ({
    getPhotoById: builder.query<Photo, number>({
      query: (photoId: number) => `photos/${photoId}`,
      async onCacheEntryAdded(photoId, { cacheDataLoaded, cacheEntryRemoved }) {
        console.log(
          `The application requests the photo with the id ${photoId} for the first time in a while`,
        );
        try {
          const photoResponse = await cacheDataLoaded;
          console.log(
            `The /photos/${photoId} endpoint answered`,
            photoResponse,
          );
        } catch {
          // if cacheEntryRemoved resolves before cacheDataLoaded,
          // cacheDataLoaded throws an error
        }
        await cacheEntryRemoved;
        console.log(
          `No component subscribed to the data from /photos/${photoId} for the last 60 seconds`,
        );
      },
    }),
    // ...
  }),
});
The crucial parts of the above code are the two promises the onCacheEntryAdded function provides:
- cacheDataLoaded – resolves when the initial request from our query responds,
- cacheEntryRemoved – resolves when RTK Query removes the entry from the cache. By default, that happens when no component has used the entry for the last 60 seconds (the keepUnusedDataFor option changes this period).
To sum it up, RTK Query calls the onCacheEntryAdded function when our application requests a given endpoint for the first time, or when quite some time has passed since any of our components used it. It provides us with promises that we can use to take advantage of the cache lifecycle.
Connecting to a WebSocket using Redux Toolkit Query
In API with NestJS #26. Real-time chat with WebSockets, we’ve created a backend application with a chat that uses socket.io. First, let’s create an enum containing all possible events.
enum ChatEvent {
  SendMessage = 'send_message',
  RequestAllMessages = 'request_all_messages',
  SendAllMessages = 'send_all_messages',
  ReceiveMessage = 'receive_message',
}
One possible approach is to call an endpoint returning all chat messages sent in the past. Then, we can use the onCacheEntryAdded function to establish a WebSocket connection.
Redux Toolkit Query provides the updateCachedData function that we can use to modify the cache. It uses the Immer library under the hood, so we can safely mutate the draft object and let Immer produce the next immutable state.
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react';
import ChatMessage from './chatMessage';
import { io } from 'socket.io-client';
import ChatEvent from './chatEvent';
import User from './user';

export const api = createApi({
  reducerPath: 'api',
  baseQuery: fetchBaseQuery({
    baseUrl: process.env.REACT_APP_API_URL,
    credentials: 'include',
  }),
  endpoints: (builder) => ({
    logIn: builder.mutation<User, { email: string; password: string }>({
      query: (logInData) => ({
        url: 'authentication/log-in',
        method: 'POST',
        body: logInData,
      }),
    }),
    getMessages: builder.query<ChatMessage[], void>({
      query: () => 'chat-messages',
      async onCacheEntryAdded(
        arg, // the query argument; unused, because getMessages takes none
        { cacheDataLoaded, cacheEntryRemoved, updateCachedData },
      ) {
        try {
          await cacheDataLoaded;
          // the /chat-messages endpoint responded already

          const socket = io(process.env.REACT_APP_API_URL, {
            withCredentials: true,
          });

          socket.on(ChatEvent.ReceiveMessage, (message: ChatMessage) => {
            updateCachedData((draft) => {
              draft.push(message);
            });
          });

          await cacheEntryRemoved;
        } catch {
          // if cacheEntryRemoved resolved before cacheDataLoaded,
          // cacheDataLoaded throws
        }
      },
    }),
  }),
});
Above, we add the logIn mutation, because our API needs authentication. This is also why we add credentials: 'include' in the configuration of the fetchBaseQuery.
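The draft-based update that updateCachedData performs can be pictured with a small stand-in. The sketch below is not Immer and not the RTK Query API: applyRecipe is a hypothetical helper, and the id and content fields of ChatMessage are assumed for illustration. It only shows the idea that the recipe mutates a copy while the previous state stays untouched.

```typescript
// Assumed message shape; the real ChatMessage type may differ.
interface ChatMessage {
  id: number;
  content: string;
}

// Hypothetical helper: copies the state, lets the recipe mutate the copy,
// and returns the copy as the next state. Immer does this far more cleverly.
function applyRecipe<T>(state: readonly T[], recipe: (draft: T[]) => void): T[] {
  const draft = [...state];
  recipe(draft);
  return draft;
}

const cachedMessages: readonly ChatMessage[] = [{ id: 1, content: 'Hello' }];
const incoming: ChatMessage = { id: 2, content: 'Hi there' };

// Same shape as the recipe passed to updateCachedData in the endpoint above.
const nextMessages = applyRecipe(cachedMessages, (draft) => {
  draft.push(incoming);
});

console.log(cachedMessages.length); // 1
console.log(nextMessages.length); // 2
```

Because the previous array is never modified, components that compare state by reference can tell that the list of messages has changed.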
In the API we’re dealing with, the /chat-messages endpoint does not exist. Instead of using it, we can emit the request_all_messages event and listen to the response in the send_all_messages event.
Because of the above, we can use the queryFn function instead and ignore the baseQuery.
getMessages: builder.query<ChatMessage[], void>({
  queryFn: () => ({ data: [] }),
  async onCacheEntryAdded(
    arg, // the query argument; unused, because getMessages takes none
    { cacheDataLoaded, cacheEntryRemoved, updateCachedData },
  ) {
    try {
      await cacheDataLoaded;

      const socket = io(process.env.REACT_APP_API_URL, {
        withCredentials: true,
      });

      socket.on('connect', () => {
        socket.emit(ChatEvent.RequestAllMessages);
      });

      socket.on(ChatEvent.SendAllMessages, (messages: ChatMessage[]) => {
        updateCachedData((draft) => {
          draft.splice(0, draft.length, ...messages);
        });
      });

      socket.on(ChatEvent.ReceiveMessage, (message: ChatMessage) => {
        updateCachedData((draft) => {
          draft.push(message);
        });
      });

      await cacheEntryRemoved;
    } catch {
      // if cacheEntryRemoved resolved before cacheDataLoaded,
      // cacheDataLoaded throws
    }
  },
}),
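The splice call in the send_all_messages handler may look unusual. It replaces the contents of the draft in place, which matters because reassigning the draft parameter inside a recipe would have no effect on the cache. A minimal illustration with plain arrays, using made-up string values instead of real chat messages:

```typescript
const draft: string[] = ['stale message'];
const messages: string[] = ['first', 'second'];

// Keep a second reference to verify that the array object stays the same.
const sameReference = draft;

// Remove every existing element and insert the new ones at index 0.
draft.splice(0, draft.length, ...messages);

console.log(draft); // ['first', 'second']
console.log(sameReference === draft); // true
```

Immer also allows a recipe to return a brand new value instead of mutating the draft, so returning the messages array from the recipe should be an equivalent alternative.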
Cleaning up the connection
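Once Redux Toolkit Query removes the cache entry, we should stop listening to the stream. With socket.io, we can remove the listeners using the off function. Note that off only detaches the listeners. The connection itself stays open until we call socket.disconnect().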
getMessages: builder.query<ChatMessage[], void>({
  queryFn: () => ({ data: [] }),
  async onCacheEntryAdded(
    arg, // the query argument; unused, because getMessages takes none
    { cacheDataLoaded, cacheEntryRemoved, updateCachedData },
  ) {
    try {
      await cacheDataLoaded;

      const socket = io(process.env.REACT_APP_API_URL, {
        withCredentials: true,
      });

      socket.on('connect', () => {
        socket.emit(ChatEvent.RequestAllMessages);
      });

      socket.on(ChatEvent.SendAllMessages, (messages: ChatMessage[]) => {
        updateCachedData((draft) => {
          draft.splice(0, draft.length, ...messages);
        });
      });

      socket.on(ChatEvent.ReceiveMessage, (message: ChatMessage) => {
        updateCachedData((draft) => {
          draft.push(message);
        });
      });

      await cacheEntryRemoved;

      // the cache entry is gone, so stop reacting to socket events
      socket.off('connect');
      socket.off(ChatEvent.SendAllMessages);
      socket.off(ChatEvent.ReceiveMessage);
    } catch {
      // if cacheEntryRemoved resolved before cacheDataLoaded,
      // cacheDataLoaded throws
    }
  },
}),
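Calling off with only the event name removes every listener registered for that event. If other code listens on the same socket, it may be safer to keep a reference to each handler and pass it to off, which removes just that one. The sketch below demonstrates the difference with TinyEmitter, a hypothetical hand-written stand-in that models only the on, off and emit methods. It is not the socket.io client itself.

```typescript
type Listener = (payload: string) => void;

// Hypothetical stand-in for a socket; only on, off and emit are modelled.
class TinyEmitter {
  private listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): void {
    const current = this.listeners.get(event) ?? [];
    this.listeners.set(event, [...current, listener]);
  }

  // With a listener, removes only that listener; without one, removes all.
  off(event: string, listener?: Listener): void {
    if (!listener) {
      this.listeners.delete(event);
      return;
    }
    const current = this.listeners.get(event) ?? [];
    this.listeners.set(event, current.filter((item) => item !== listener));
  }

  emit(event: string, payload: string): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload);
    }
  }
}

const emitter = new TinyEmitter();
const received: string[] = [];

const handleCacheUpdate: Listener = (payload) => {
  received.push(`cache: ${payload}`);
};
const handleLogging: Listener = (payload) => {
  received.push(`log: ${payload}`);
};

emitter.on('receive_message', handleCacheUpdate);
emitter.on('receive_message', handleLogging);
emitter.emit('receive_message', 'first');

// Clean up only the cache listener; the logging listener keeps working.
emitter.off('receive_message', handleCacheUpdate);
emitter.emit('receive_message', 'second');

console.log(received);
// ['cache: first', 'log: first', 'log: second']
```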
Sending chat messages with Redux Toolkit Query
The official documentation does not have an example of sending messages using WebSockets. To do that with Redux Toolkit Query, we need a way to share the WebSocket connection across different endpoints.
To do that, we can look at the example written by Lenz Weber. Since we use cookie-based authentication in our WebSocket API, we should not connect immediately. Instead, let’s create a function that creates the socket on first use and returns the same instance afterward.
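import { io, Socket } from 'socket.io-client';

// created lazily, so that we connect only after the user has logged in
let socket: Socket | undefined;

function getSocket() {
  if (!socket) {
    socket = io(process.env.REACT_APP_API_URL, {
      withCredentials: true,
    });
  }
  return socket;
}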
Our API sends an acknowledgment when it receives a message. Thanks to that, our mutation can return a promise that resolves when our API recognizes the message we’ve sent.
The last step is to use the above function in our query and the mutation.
sendMessage: builder.mutation<ChatMessage, string>({
  queryFn: (chatMessageContent: string) => {
    const socket = getSocket();
    return new Promise((resolve) => {
      // the callback runs when the API acknowledges the message
      socket.emit(ChatEvent.SendMessage, chatMessageContent, (message: ChatMessage) => {
        resolve({ data: message });
      });
    });
  },
}),
getMessages: builder.query<ChatMessage[], void>({
  queryFn: () => ({ data: [] }),
  async onCacheEntryAdded(
    arg, // the query argument; unused, because getMessages takes none
    { cacheDataLoaded, cacheEntryRemoved, updateCachedData },
  ) {
    try {
      await cacheDataLoaded;

      const socket = getSocket();

      const requestAllMessages = () => {
        socket.emit(ChatEvent.RequestAllMessages);
      };

      socket.on('connect', requestAllMessages);
      // the shared socket might be connected already, in which case
      // the connect event does not fire again
      if (socket.connected) {
        requestAllMessages();
      }

      socket.on(ChatEvent.SendAllMessages, (messages: ChatMessage[]) => {
        updateCachedData((draft) => {
          draft.splice(0, draft.length, ...messages);
        });
      });

      socket.on(ChatEvent.ReceiveMessage, (message: ChatMessage) => {
        updateCachedData((draft) => {
          draft.push(message);
        });
      });

      await cacheEntryRemoved;

      socket.off('connect');
      socket.off(ChatEvent.SendAllMessages);
      socket.off(ChatEvent.ReceiveMessage);
    } catch {
      // if cacheEntryRemoved resolved before cacheDataLoaded,
      // cacheDataLoaded throws
    }
  },
}),
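The promise wrapping inside sendMessage can be looked at in isolation. The sketch below is an illustration under assumptions: fakeSocket is a hypothetical stand-in that acknowledges every message right away, sendWithAck is a made-up helper name, and the id and content fields of ChatMessage are assumed. It shows how a callback-based acknowledgment turns into a promise resolving with the { data } shape that a queryFn returns.

```typescript
// Assumed message shape; the real ChatMessage type may differ.
interface ChatMessage {
  id: number;
  content: string;
}

type Ack = (message: ChatMessage) => void;

// Hypothetical stand-in for the socket: it acknowledges every emitted
// message asynchronously, the way a server response would arrive later.
const fakeSocket = {
  emit(_event: string, content: string, ack: Ack): void {
    Promise.resolve().then(() => ack({ id: 1, content }));
  },
};

// Wraps the callback-based acknowledgment in a promise.
function sendWithAck(content: string): Promise<{ data: ChatMessage }> {
  return new Promise((resolve) => {
    fakeSocket.emit('send_message', content, (message) => {
      resolve({ data: message });
    });
  });
}

sendWithAck('Hello').then((result) => {
  console.log(result.data); // { id: 1, content: 'Hello' }
});
```

One thing to keep in mind with this pattern: if the acknowledgment never arrives, the promise never settles and the mutation stays in its loading state, so a real application would probably want some form of timeout.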
In our API, the backend sends back our new message to all of the chat users through the ChatEvent.ReceiveMessage event. Thanks to that, we don’t need to invalidate the cache after sending a message. If your API works in a different way and you want to invalidate the cache, check out Introduction to Redux Toolkit Query with TypeScript.
Because our sendMessage mutation responds with a promise, we can use the isLoading property to know that a chat message is yet to be processed by our API.
import React, { ChangeEvent, FormEvent, useState } from 'react';
import { useSendMessageMutation } from './api/api';

const MessageForm = () => {
  const [message, setMessage] = useState('');
  const [sendMessage, { isLoading }] = useSendMessageMutation();

  const handleChange = (event: ChangeEvent<HTMLInputElement>) => {
    setMessage(event.target.value);
  };

  const handleSubmit = (event: FormEvent<HTMLFormElement>) => {
    event.preventDefault();
    sendMessage(message);
    setMessage('');
  };

  return (
    <form onSubmit={handleSubmit}>
      {/* value makes the input controlled, so setMessage('') clears it */}
      <input name="message" value={message} onChange={handleChange} />
      <button disabled={isLoading}>submit</button>
    </form>
  );
};

export default MessageForm;
Summary
In this article, we’ve figured out a way to use Redux Toolkit Query to listen to chat messages and send them. To do that, we’ve drawn on examples from both the official documentation and an RTK Query contributor, and we’ve learned how to use the cache lifecycle to our advantage. All of the above has broadened our knowledge of Redux Toolkit Query.
I’ve been creating a project using RTK Query + Websockets and this post helped me a lot. I was able to implement everything I wanted using the official docs and this post. Thanks!
Thanks! Didn’t want to create a separate POST endpoint for the data as I already have a WS connection. This post helped me to understand how to emit events from the client-side
I’m attempting to use RTK Query together with Firestore’s real-time updates. This post should be helpful, thanks!
Thanks a lot
where can we find the source code for this tutorial?
Is there any way we can write unit test cases with jest for the same
I was wondering how to deal with the socket connection being lost when the page is refreshed?