Merge pull request #48 from antebrl/25-iptv-session-management

25 iptv session management
This commit is contained in:
Ante Brähler
2025-01-06 03:02:52 +01:00
committed by GitHub
19 changed files with 422 additions and 62 deletions

View File

@@ -1,8 +1,10 @@
const ChannelService = require('../services/ChannelService');
module.exports = {
getChannels(req, res) {
res.json(ChannelService.getChannels());
const channels = ChannelService.getFilteredChannels(req.query);
res.json(channels);
},
getCurrentChannel(req, res) {

View File

@@ -4,7 +4,7 @@ const ProxyHelperService = require('../services/proxy/ProxyHelperService');
module.exports = {
channel(req, res) {
let { url: targetUrl, channelId, headers } = req.query;
let { url: targetUrl, channelId, headers, id } = req.query;
if(!targetUrl) {
const channel = channelId ?
@@ -16,6 +16,11 @@ module.exports = {
return;
}
targetUrl = channel.url;
if(id) {
targetUrl += `?id=${id}`;
}
if(channel.headers && channel.headers.length > 0) {
headers = Buffer.from(JSON.stringify(channel.headers)).toString('base64');
}

View File

@@ -4,6 +4,7 @@ class Channel {
this.id = Channel.nextId++;
this.name = name;
this.url = url;
this.sessionProvider = null;
this.avatar = avatar;
this.mode = mode;
this.headers = headers;
@@ -14,6 +15,15 @@ class Channel {
restream() {
return this.mode === 'restream';
}
toFrontendJson() {
if(!this.sessionProvider) {
return this;
}
//Remove sessionProvider
const { sessionProvider, ...sanitizedObject } = this;
return sanitizedObject;
}
}
module.exports = Channel;

View File

@@ -35,10 +35,10 @@ proxyRouter.get('/key', proxyController.key);
app.use('/proxy', proxyRouter);
const PORT = 5000;
const server = app.listen(PORT, () => {
const server = app.listen(PORT, async () => {
console.log(`Server listening on Port ${PORT}`);
if (ChannelService.getCurrentChannel().restream()) {
streamController.start(process.env.DEFAULT_CHANNEL_URL);
await streamController.start(ChannelService.getCurrentChannel());
}
});

View File

@@ -1,5 +1,6 @@
const streamController = require('./restream/StreamController');
const Channel = require('../models/Channel');
const storageService = require('./restream/StorageService');
class ChannelService {
constructor() {
@@ -33,6 +34,17 @@ class ChannelService {
return this.channels;
}
getFilteredChannels({ playlist, group }) {
let filtered = this.channels;
if (playlist) {
filtered = filtered.filter(ch => ch.playlist && ch.playlist == playlist);
}
if (group) {
filtered = filtered.filter(ch => ch.group && ch.group.toLowerCase() === group.toLowerCase());
}
return filtered;
}
addChannel({ name, url, avatar, mode, headersJson, group = false, playlist = false }) {
const existing = this.channels.find(channel => channel.url === url);
@@ -47,7 +59,7 @@ class ChannelService {
return newChannel;
}
setCurrentChannel(id) {
async setCurrentChannel(id) {
const nextChannel = this.channels.find(channel => channel.id === id);
if (!nextChannel) {
throw new Error('Channel does not exist');
@@ -55,11 +67,11 @@ class ChannelService {
if (this.currentChannel !== nextChannel) {
if (nextChannel.restream()) {
streamController.stop(this.currentChannel.id);
streamController.stop(nextChannel.id);
streamController.start(nextChannel);
streamController.stop(this.currentChannel);
storageService.deleteChannelStorage(nextChannel.id);
await streamController.start(nextChannel);
} else {
streamController.stop(this.currentChannel.id);
streamController.stop(this.currentChannel);
}
this.currentChannel = nextChannel;
}
@@ -74,7 +86,7 @@ class ChannelService {
return this.channels.find(channel => channel.id === id);
}
deleteChannel(id) {
async deleteChannel(id) {
const channelIndex = this.channels.findIndex(channel => channel.id === id);
if (channelIndex === -1) {
throw new Error('Channel does not exist');
@@ -84,12 +96,12 @@ class ChannelService {
if (this.currentChannel.id === id) {
if (deletedChannel.restream()) {
streamController.stop(deletedChannel.id);
streamController.stop(deletedChannel);
}
this.currentChannel = this.channels.length > 0 ? this.channels[0] : null;
if (this.currentChannel?.restream()) {
streamController.start(this.currentChannel);
await streamController.start(this.currentChannel);
}
}
@@ -97,7 +109,7 @@ class ChannelService {
return this.currentChannel;
}
updateChannel(id, updatedAttributes) {
async updateChannel(id, updatedAttributes) {
const channelIndex = this.channels.findIndex(channel => channel.id === id);
if (channelIndex === -1) {
throw new Error('Channel does not exist');
@@ -112,9 +124,9 @@ class ChannelService {
if (this.currentChannel.id == id) {
if (streamChanged) {
streamController.stop(channel.id);
streamController.stop(channel);
if (channel.restream()) {
streamController.start(channel);
await streamController.start(channel);
}
}
}

View File

@@ -34,25 +34,25 @@ class PlaylistService {
}
updatePlaylist(playlistUrl, updatedAttributes) {
async updatePlaylist(playlistUrl, updatedAttributes) {
const channels = ChannelService
.getChannels()
.filter(channel => channel.playlist === playlistUrl);
for(let channel of channels) {
channel = ChannelService.updateChannel(channel.id, updatedAttributes);
channel = await ChannelService.updateChannel(channel.id, updatedAttributes);
}
return channels;
}
deletePlaylist(playlistUrl) {
async deletePlaylist(playlistUrl) {
const channels = ChannelService
.getChannels()
.filter(channel => channel.playlist === playlistUrl);
for(const channel of channels) {
ChannelService.deleteChannel(channel.id);
await ChannelService.deleteChannel(channel.id);
}
return channels;

View File

@@ -7,12 +7,19 @@ const STORAGE_PATH = process.env.STORAGE_PATH;
function startFFmpeg(nextChannel) {
console.log('Starting FFmpeg process with channel:', nextChannel.id);
if (currentFFmpegProcess) {
console.log('Gracefully terminate previous ffmpeg-Prozess...');
currentFFmpegProcess.kill('SIGTERM');
// if (currentFFmpegProcess) {
// console.log('Gracefully terminate previous ffmpeg-Prozess...');
// await stopFFmpeg();
// }
let channelUrl = nextChannel.url;
if(nextChannel.sessionProvider) {
const sessionQuery = nextChannel.sessionProvider.getSessionQuery();
const querySeparator = channelUrl.includes('?') ? '&' : '?';
channelUrl += `${querySeparator}${sessionQuery}`;
}
const channelUrl = nextChannel.url;
currentChannelId = nextChannel.id;
const headers = nextChannel.headers;
@@ -41,25 +48,36 @@ function startFFmpeg(nextChannel) {
console.error(`stderr: ${data}`);
});
currentFFmpegProcess.on('close', (code) => {
console.log(`ffmpeg-Process terminated with code: ${code}`);
// currentFFmpegProcess.on('close', (code) => {
// console.log(`ffmpeg-Process terminated with code: ${code}`);
// currentFFmpegProcess = null;
// //Restart if crashed
// if (code !== null && code !== 255) {
// console.log(`Restarting FFmpeg process with channel: ${nextChannel.id}`);
// //wait 1 second before restarting
// setTimeout(() => startFFmpeg(nextChannel), 2000);
// }
});
// // currentFFmpegProcess = null;
// // //Restart if crashed
// // if (code !== null && code !== 255) {
// // console.log(`Restarting FFmpeg process with channel: ${nextChannel.id}`);
// // //wait 1 second before restarting
// // setTimeout(() => startFFmpeg(nextChannel), 2000);
// // }
// });
}
function stopFFmpeg() {
if (currentFFmpegProcess) {
console.log('Gracefully terminate ffmpeg-Process...');
currentFFmpegProcess.kill('SIGTERM');
currentFFmpegProcess = null;
}
return new Promise((resolve, reject) => {
if (currentFFmpegProcess) {
console.log('Gracefully terminate ffmpeg-Process...');
currentFFmpegProcess.on('close', (code) => {
console.log(`ffmpeg-Process terminated with code: ${code}`);
currentFFmpegProcess = null;
resolve();
});
currentFFmpegProcess.kill('SIGTERM');
} else {
console.log('No ffmpeg process is running.');
resolve();
}
});
}
function isFFmpegRunning() {

View File

@@ -1,19 +1,32 @@
const ffmpegService = require('./FFmpegService');
const storageService = require('./StorageService');
const SessionFactory = require('../session/SessionFactory');
function start(nextChannel) {
async function start(nextChannel) {
console.log('Starting channel', nextChannel.id);
storageService.createChannelStorage(nextChannel.id);
if (!ffmpegService.isFFmpegRunning()) {
ffmpegService.startFFmpeg(nextChannel);
nextChannel.sessionProvider = SessionFactory.getSessionProvider(nextChannel.url);
if(nextChannel.sessionProvider) {
await nextChannel.sessionProvider.createSession();
}
ffmpegService.startFFmpeg(nextChannel);
}
function stop(channelId) {
async function stop(channel) {
console.log('Stopping channel', channel.id);
if (ffmpegService.isFFmpegRunning()) {
ffmpegService.stopFFmpeg();
await ffmpegService.stopFFmpeg();
}
storageService.deleteChannelStorage(channelId);
if (channel.sessionProvider) {
channel.sessionProvider.destroySession();
channel.sessionProvider = null;
}
storageService.deleteChannelStorage(channel.id);
}
module.exports = {

View File

@@ -0,0 +1,14 @@
const StreamedSuSession = require('./StreamedSuSession');
class SessionFactory {
static getSessionProvider(channelUrl) {
switch (true) {
case channelUrl.includes('vipstreams.in'): //StreamedSU
return new StreamedSuSession(channelUrl, 'https://secure.embedme.top');
default:
return null;
}
}
}
module.exports = SessionFactory;

View File

@@ -0,0 +1,22 @@
//Implement this interface for your specific session provider
class SessionHandler {
constructor() {
if (this.constructor === SessionHandler) {
throw new Error("Abstract class cannot be instantiated");
}
}
async createSession(url, interval) {
throw new Error("Method 'startSession()' must be implemented");
}
destroySession() {
throw new Error("Method 'destroySession()' must be implemented");
}
getSessionQuery() {
throw new Error("Method 'getSessionQuery()' must be implemented");
}
}
module.exports = SessionHandler;

View File

@@ -0,0 +1,101 @@
const SessionHandler = require('./SessionHandler');
class StreamedSuSession extends SessionHandler {
constructor(channelUrl, baseUrl) {
super();
this.channelUrl = channelUrl;
this.baseUrl = baseUrl;
this.checkInterval = null;
this.sessionData = null;
}
async #initSession() {
console.log('Creating session:', this.channelUrl);
try {
const response = await fetch(`${this.baseUrl}/init-session`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
path: new URL(this.channelUrl).pathname,
})
});
if (!response.ok) {
console.log('Failed to initialize session: ', response);
throw new Error('Failed to initialize session');
}
this.sessionData = await response.json();
return this.sessionData;
} catch (error) {
console.error('Session initialization failed:', error);
throw error;
}
}
async #checkSession() {
if (!this.sessionData?.id) {
return false;
}
console.log('Checking session:', this.sessionData.id);
try {
const response = await fetch(`${this.baseUrl}/check/${this.sessionData.id}`);
return response.status === 200;
} catch (error) {
console.error('Session check failed:', error);
return false;
}
}
#startAutoCheck(interval = 15000) {
if (this.checkInterval) {
this.stopAutoCheck();
}
this.checkInterval = setInterval(async () => {
const isValid = await this.#checkSession();
if (!isValid) {
console.log('Session aborted');
this.#initSession();
}
}, interval);
}
#stopAutoCheck() {
if (this.checkInterval) {
clearInterval(this.checkInterval);
this.checkInterval = null;
}
}
// Public Methods
async createSession(interval = 15000) {
if (!this.sessionData) {
await this.#initSession();
this.#startAutoCheck(interval);
}
return this.getSessionQuery();
}
destroySession() {
this.#stopAutoCheck();
this.sessionData = null;
return true;
}
getSessionQuery() {
if (!this.sessionData?.id) {
return '';
}
return `id=${this.sessionData.id}`;
}
}
module.exports = StreamedSuSession;

View File

@@ -11,31 +11,31 @@ module.exports = (io, socket) => {
}
});
socket.on('set-current-channel', (id) => {
socket.on('set-current-channel', async (id) => {
try {
const nextChannel = ChannelService.setCurrentChannel(id);
io.emit('channel-selected', nextChannel); // Broadcast to all clients
const nextChannel = await ChannelService.setCurrentChannel(id);
io.emit('channel-selected', nextChannel.toFrontendJson()); // Broadcast to all clients
} catch (err) {
console.error(err);
socket.emit('app-error', { message: err.message });
}
});
socket.on('delete-channel', (id) => {
socket.on('delete-channel', async (id) => {
try {
const current = ChannelService.deleteChannel(id);
const current = await ChannelService.deleteChannel(id);
io.emit('channel-deleted', id); // Broadcast to all clients
io.emit('channel-selected', current);
io.emit('channel-selected', current.toFrontendJson());
} catch (err) {
console.error(err);
socket.emit('app-error', { message: err.message });
}
});
socket.on('update-channel', ({ id, updatedAttributes }) => {
socket.on('update-channel', async ({ id, updatedAttributes }) => {
try {
const updatedChannel = ChannelService.updateChannel(id, updatedAttributes);
io.emit('channel-updated', updatedChannel); // Broadcast to all clients
const updatedChannel = await ChannelService.updateChannel(id, updatedAttributes);
io.emit('channel-updated', updatedChannel.toFrontendJson()); // Broadcast to all clients
} catch (err) {
console.error(err);
socket.emit('app-error', { message: err.message });

View File

@@ -19,11 +19,11 @@ module.exports = (io, socket) => {
});
socket.on('update-playlist', ({ playlist, updatedAttributes }) => {
socket.on('update-playlist', async ({ playlist, updatedAttributes }) => {
try {
const channels = PlaylistService.updatePlaylist(playlist, updatedAttributes);
channels.forEach(channel => {
io.emit('channel-updated', channel);
io.emit('channel-updated', channel.toFrontendJson());
});
} catch (err) {
console.error(err);
@@ -32,13 +32,13 @@ module.exports = (io, socket) => {
});
socket.on('delete-playlist', (playlist) => {
socket.on('delete-playlist', async (playlist) => {
try {
const channels = PlaylistService.deletePlaylist(playlist);
channels.forEach(channel => {
io.emit('channel-deleted', channel.id);
});
io.emit('channel-selected', ChannelService.getCurrentChannel());
io.emit('channel-selected', ChannelService.getCurrentChannel().toFrontendJson());
} catch (err) {
console.error(err);
socket.emit('app-error', { message: err.message });

View File

@@ -10,6 +10,8 @@ import apiService from './services/ApiService';
import SettingsModal from './components/SettingsModal';
import { ToastProvider } from './components/notifications/ToastContext';
import ToastContainer from './components/notifications/ToastContainer';
import SessionFactory from './services/session/SessionFactory';
import { SessionHandler } from './services/session/SessionHandler';
function App() {
const [channels, setChannels] = useState<Channel[]>([]);
@@ -23,6 +25,9 @@ function App() {
const [searchQuery, setSearchQuery] = useState('');
const [editChannel, setEditChannel] = useState<Channel | null>(null);
const [sessionProvider, setSessionProvider] = useState<SessionHandler | null>(null);
const [sessionQuery, setSessionQuery] = useState<string | undefined>(undefined);
useEffect(() => {
apiService
.request<Channel[]>('/channels/', 'GET')
@@ -40,6 +45,7 @@ function App() {
};
const channelSelectedListener = (nextChannel: Channel) => {
checkSession(nextChannel, selectedChannel?.url != nextChannel.url);
setSelectedChannel(nextChannel);
};
@@ -81,6 +87,24 @@ function App() {
socketService.connect();
const checkSession = (channel : Channel, urlHasChanged : boolean | undefined) => {
const newProvider = SessionFactory.getSessionProvider(channel.url, setSessionQuery);
if(!newProvider || channel.mode === 'restream') {
sessionProvider?.destroySession();
setSessionProvider(null);
return;
}
if(newProvider?.type() != sessionProvider?.type() || urlHasChanged) {
sessionProvider?.destroySession();
setSessionProvider(null);
setSessionProvider(newProvider);
sessionProvider?.createSession();
}
};
return () => {
socketService.unsubscribeFromEvent('channel-added', channelAddedListener);
socketService.unsubscribeFromEvent('channel-selected', channelSelectedListener);
@@ -154,7 +178,7 @@ function App() {
/>
</div>
<VideoPlayer channel={selectedChannel} syncEnabled={syncEnabled} />
<VideoPlayer channel={selectedChannel} sessionQuery={sessionQuery} syncEnabled={syncEnabled} />
</div>
<div className="col-span-12 lg:col-span-4">

View File

@@ -5,10 +5,11 @@ import { ToastContext } from './notifications/ToastContext';
interface VideoPlayerProps {
channel: Channel | null;
sessionQuery: string | undefined;
syncEnabled: boolean;
}
function VideoPlayer({ channel, syncEnabled }: VideoPlayerProps) {
function VideoPlayer({ channel, sessionQuery, syncEnabled }: VideoPlayerProps) {
const videoRef = useRef<HTMLVideoElement>(null);
const hlsRef = useRef<Hls | null>(null);
const { addToast, removeToast, clearToasts, editToast } = useContext(ToastContext);
@@ -50,9 +51,11 @@ function VideoPlayer({ channel, syncEnabled }: VideoPlayerProps) {
},
});
const querySeparator = channel.url.includes('?') ? '&' : '?';
const sourceLinks: Record<ChannelMode, string> = {
direct: channel.url,
proxy: import.meta.env.VITE_BACKEND_URL + '/proxy/channel', //TODO: needs update for multi-channel streaming
direct: sessionQuery ? channel.url + querySeparator + sessionQuery : channel.url,
//TODO: needs update for multi-channel streaming
proxy: sessionQuery ? import.meta.env.VITE_BACKEND_URL + '/proxy/channel?' + sessionQuery : import.meta.env.VITE_BACKEND_URL + '/proxy/channel',
restream: import.meta.env.VITE_BACKEND_URL + '/streams/' + channel.id + "/" + channel.id + ".m3u8", //e.g. http://backend:3000/streams/1/1.m3u8
};
@@ -214,7 +217,8 @@ function VideoPlayer({ channel, syncEnabled }: VideoPlayerProps) {
hlsRef.current.destroy();
}
};
}, [channel?.url, channel?.mode, syncEnabled]);
}, [channel?.url, channel?.mode, syncEnabled, sessionQuery]);
const handleVideoClick = (event: React.MouseEvent<HTMLVideoElement>) => {
if (videoRef.current?.muted) {

View File

@@ -7,6 +7,7 @@ const apiService = {
* Execute API request
* @param path - Path (e.g. "/channels/")
* @param method - HTTP-Method (GET, POST, etc.)
* @param api_url - The API URL (default: API_BASE_URL + '/api')
* @param body - The request body (e.g. POST)
* @returns Ein Promise with the parsed JSON response to class T
*/

View File

@@ -0,0 +1,15 @@
import { SessionHandler } from "./SessionHandler";
import { StreamedSuSession } from "./StreamedSuSession";
class SessionFactory {
static getSessionProvider(channelUrl: string, setSessionQuery: React.Dispatch<React.SetStateAction<string | undefined>>): SessionHandler | null {
switch (true) {
case channelUrl.includes('vipstreams.in'): //StreamedSU
return new StreamedSuSession(channelUrl, 'https://secure.embedme.top', setSessionQuery);
default:
return null;
}
}
}
export default SessionFactory;

View File

@@ -0,0 +1,9 @@
//Implement this interface for your specific session provider
interface SessionHandler {
createSession(interval?: number): Promise<void>;
destroySession(): boolean;
//getSessionQuery(): string;
type(): string;
}
export type { SessionHandler };

View File

@@ -0,0 +1,110 @@
import { SessionHandler } from "./SessionHandler";
class StreamedSuSession implements SessionHandler {
private baseUrl: string;
private channelUrl: string;
private checkInterval: number | null;
private sessionId: string | null;
private setSessionQuery: React.Dispatch<React.SetStateAction<string | undefined>>;
constructor(channelUrl: string, baseUrl: string, setSessionQuery: React.Dispatch<React.SetStateAction<string | undefined>>) {
this.channelUrl = channelUrl;
this.baseUrl = baseUrl;
this.checkInterval = null;
this.sessionId = null;
this.setSessionQuery = setSessionQuery;
}
private async initSession(): Promise<any> {
console.log('Creating session:', this.channelUrl);
try {
const response = await fetch(`${this.baseUrl}/init-session`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
path: new URL(this.channelUrl).pathname,
})
});
if (!response.ok) {
throw new Error('Failed to initialize session');
}
const sessionData = await response.json();
this.sessionId = sessionData.id;
this.setSessionQuery(`id=${this.sessionId}`);
return sessionData.id;
} catch (error) {
console.error('Session initialization failed:', error);
throw error;
}
}
private async checkSession(): Promise<boolean> {
if (!this.sessionId) {
return false;
}
console.log('Checking session:', this.sessionId);
try {
const response = await fetch(`${this.baseUrl}/check/${this.sessionId}`);
return response.status === 200;
} catch (error) {
console.error('Session check failed:', error);
return false;
}
}
private startAutoCheck(interval: number = 15000): void {
if (this.checkInterval) {
this.stopAutoCheck();
}
this.checkInterval = window.setInterval(async () => {
const isValid = await this.checkSession();
if (!isValid) {
console.log('Session aborted');
this.initSession();
}
}, interval);
}
private stopAutoCheck(): void {
if (this.checkInterval) {
window.clearInterval(this.checkInterval);
this.checkInterval = null;
}
}
// Public Methods
async createSession(interval: number = 15000): Promise<void> {
if (!this.sessionId) {
await this.initSession();
this.startAutoCheck(interval);
}
}
destroySession(): boolean {
console.log('Destroying session:', this.sessionId);
this.stopAutoCheck();
this.sessionId = null;
this.setSessionQuery(undefined);
return true;
}
// getSessionQuery(): string {
// console.log('Session ID:', this.sessionId);
// if (!this.sessionId) {
// return '';
// }
// return `id=${this.sessionId}`;
// }
type(): string {
return 'streamed-su';
}
}
export { StreamedSuSession };