#include "vodTCPServer.h"
#include <process.h>
#pragma comment( lib, "ws2_32.lib" )
using namespace GXTVod;
//--------------------------------------------------
#ifdef SEPARATE_THREAD
CTCPServer::CTCPServer( unsigned short nPort,
CTCPServerEventCallBack *pEventCallBack,
int nAutoDisconnectTick,
int nProcessThreadCount )
: m_nPort( nPort ),
m_pEventCallBack( pEventCallBack ),
m_nAutoDisconnectTick( nAutoDisconnectTick ),
m_nProcessThreadCount( nProcessThreadCount )
#else
CTCPServer::CTCPServer( unsigned short nPort,
CTCPServerEventCallBack *pEventCallBack,
int nAutoDisconnectTick )
: m_nPort( nPort ),
m_pEventCallBack( pEventCallBack ),
m_nAutoDisconnectTick( nAutoDisconnectTick )
#endif
{
m_ListenSocket = INVALID_SOCKET;
m_hAcceptThread = NULL;
m_pFirstClient = NULL;
m_bActive = false;
m_nSessionCount = 0;
InitializeCriticalSection( &m_csClientSession );
#ifdef SEPARATE_THREAD
/*m_hProcessSem = CreateSemaphore( NULL, 0, 0x7FFFFFFF, NULL );
if ( m_hProcessSem == NULL )
{
throw "Error create m_hProcessSem!";
}*/
m_pProcessThreadList = NULL;
#else
m_pThreadPool = new ThreadPool( 8 );
if ( m_pThreadPool == NULL )
{
throw "Error create m_hThreadPool!";
}
#endif
}
//------------------------------------------------------
CTCPServer::~CTCPServer()
{
Stop();
DeleteCriticalSection( &m_csClientSession );
#ifdef SEPARATE_THREAD
//CloseHandle( m_hProcessSem );
#else
delete m_pThreadPool;
#endif
}
//------------------------------------------------------
int CTCPServer::Start()
{
if ( m_bActive )
{
return START_ERROR_ALREADYSTARTED;
}
WORD wMajorVersion, wMinorVersion;
WSADATA wsaData;
WORD VersionReqd;
int ret;
wMajorVersion = MAJOR_VERSION;
wMinorVersion = MINOR_VERSION;
VersionReqd = MAKEWORD( wMajorVersion, wMinorVersion );
ret = WSAStartup( VersionReqd, &wsaData );
if ( ret != 0 )
{
return START_ERROR_WSASTARTUP;
}
ret = InitListenSocket();
if ( START_SUCCESS != ret)
{
return ret;
}
m_hAcceptThread = reinterpret_cast< HANDLE >( _beginthreadex( NULL,
0,
reinterpret_cast< LPBEGIN_THREADEX_FUNC >( AcceptThread ),
this,
0,
NULL ) );
#ifdef SEPARATE_THREAD
m_pProcessThreadList = new HANDLE[ m_nProcessThreadCount ];
if ( m_pProcessThreadList == NULL )
{
return START_ERROR_ECREATETHREADLIST;
}
for ( int i = 0; i < m_nProcessThreadCount; i++ )
{
m_pProcessThreadList[ i ] = reinterpret_cast< HANDLE >( _beginthreadex( NULL,
0,
reinterpret_cast< LPBEGIN_THREADEX_FUNC >( ProcessThread ),
this,
0,
NULL ) );
if ( m_pProcessThreadList[ i ] == NULL )
{
return START_ERROR_ECREATETHREAD;
}
}
#else
m_hWorkerThread = reinterpret_cast< HANDLE >( _beginthreadex( NULL,
0,
reinterpret_cast< LPBEGIN_THREADEX_FUNC >( WorkerThread ),
this,
0,
NULL ) );
if ( m_hWorkerThread == NULL )
{
return START_ERROR_ECREATETHREAD;
}
#endif
m_bActive = true;
return START_SUCCESS;
}
//----------------------------------------------------------------------------
void CTCPServer::Stop()
{
if ( !m_bActive )
{
return;
}
m_bActive = false;
if ( m_ListenSocket != INVALID_SOCKET )
{
closesocket( m_ListenSocket );
m_ListenSocket = INVALID_SOCKET;
DisconnectAll();
}
CloseHandle( m_hAcceptThread );
#ifdef SEPARATE_THREAD
if ( m_pProcessThreadList != NULL )
{
WaitForMultipleObjects( m_nProcessThreadCount, m_pProcessThreadList, TRUE, INFINITE );
for ( int i = 0; i < m_nProcessThreadCount; i++ )
{
CloseHandle( m_pProcessThreadList[ i ] );
}
delete[] m_pProcessThreadList;
m_pProcessThreadList = NULL;
}
#else
while ( m_hWorkerThread != NULL ) //线程在判断ListenSocket为INVALID时会退出,这里需要等待一下。或者用WaitForsingleobject?
{
Sleep( 50 );
}
CloseHandle( m_hWorkerThread );
#endif
RemoveAllSession();
}
//--------------------------------------------------------------------------
int CTCPServer::InitListenSocket()
{
//create recv socket
int ret;
m_ListenSocket = socket( AF_INET, SOCK_STREAM, 0 );
if( m_ListenSocket == INVALID_SOCKET )
{
return START_ERROR_INVALIDSOCKET;
}
struct sockaddr_in Local_Addr;
ZeroMemory( &Local_Addr, sizeof( Local_Addr ) );
Local_Addr.sin_family = AF_INET;
Local_Addr.sin_port = htons( m_nPort );
Local_Addr.sin_addr.S_un.S_addr = INADDR_ANY;
ret = bind( m_ListenSocket, ( sockaddr* )&Local_Addr, sizeof( struct sockaddr_in ) );
if( ret == SOCKET_ERROR )
{
closesocket( m_ListenSocket );
m_ListenSocket = INVALID_SOCKET;
return START_ERROR_BIND;
}
ret = listen( m_ListenSocket, SOMAXCONN );
if ( ret == SOCKET_ERROR )
{
closesocket( m_ListenSocket );
m_ListenSocket = INVALID_SOCKET;
return START_ERROR_LISTEN;
}
return START_SUCCESS;
}
//----------------------------------------------------------------------
DWORD WINAPI CTCPServer::AcceptThread( CTCPServer *pServer )
{
SOCKET AcceptSocket;
struct sockaddr Client;
int nLen = sizeof( Client );
do
{
AcceptSocket = accept( pServer->GetSocket(), ( struct sockaddr FAR * )&Client, &nLen);
if ( INVALID_SOCKET != AcceptSocket )
{
pServer->AddSession( AcceptSocket );
pServer->OnAccept( AcceptSocket );
}
} while ( AcceptSocket != INVALID_SOCKET );
return 1;
}
//------------------------------------------------------------------------
#ifndef SEPARATE_THREAD
DWORD WINAPI CTCPServer::WorkerThread( CTCPServer *pServer )
{
int nLen;
char buffer[ MAX_SOCKET_BUFFLEN ] = { 0 };
while ( pServer->GetSocket() != INVALID_SOCKET )
{
struct timeval timeout = { 0, 1000 };
fd_set fdRead;
FD_ZERO( &fdRead );
//将会话的SOCKET加入fdRead列表
CSessionNode* pSession = pServer->GetNextSession( NULL ); //first node;
while ( pSession )
{
FD_SET( pSession->GetSocket(), &fdRead );
CSessionNode* pNextSession = pServer->GetNextSession( pSession );
pSession->Release();
pSession = pNextSession;
}
if ( fdRead.fd_count == 0 )
{
Sleep( 500 );
continue;
}
//等待1ms, 这里不能block, 因为在block期间可能会有新的连接建立并发送数据,但这里的fdRead并没有包含
int nRet = select( 0, &fdRead, NULL, NULL, &timeout );
if ( nRet == SOCKET_ERROR )
{
break;
}
if ( nRet == 0 )
{
continue;
}
pSession = pServer->GetNextSession( NULL ); //first node;
while ( pSession )
{
if ( pServer->CheckAutoDisconnect( pSession ) )
{
pServer->OnDisconnect( pSession );
break;
}
if ( FD_ISSET( pSession->GetSocket(), &fdRead ) )
{
nLen = recv( pSession->GetSocket(), buffer, MAX_SOCKET_BUFFLEN, 0 );
if ( SOCKET_ERROR == nLen || 0 == nLen )
{
pServer->OnDisconnect( pSession );
break;
}
else
{
pSession->UpdateActive();
pServer->OnProcessData( pSession, buffer, nLen );
}
}
CSessionNode* pNextSession = pServer->GetNextSession( pSession );
pSession->Release();
pSession = pNextSession;
}//end of while( pSession )
}/