Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Comment: | {5798} refactoring of the HTTP socket server for better stability on high load - set directly the client IP without calling GetRemoteIP/GetPeerName - do not ACCEPT any new connection when the queue is full (wait ContentionAbortDelay = 5 seconds) |
---|---|
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
62688161a7e100f0f5c164ba5fb98315 |
User & Date: | ab 2020-03-10 23:19:42 |
2020-03-11
| ||
00:17 | {5799} socket HTTP server will return only meaningful HTTP headers to the processing code check-in: 93592d95a9 user: ab tags: trunk | |
2020-03-10
| ||
23:19 | {5798} refactoring of the HTTP socket server for better stability on high load - set directly the client IP without calling GetRemoteIP/GetPeerName - do not ACCEPT any new connection when the queue is full (wait ContentionAbortDelay = 5 seconds) check-in: 62688161a7 user: ab tags: trunk | |
21:26 | {5797} fixed recent memory leak regression in TSynCache check-in: 350928a9a8 user: ab tags: trunk | |
Changes to SynBidirSock.pas.
3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 |
/// create a new connection instance for an accepted socket and register it
// - aSocket is the accepted socket, forwarded as such to ConnectionAdd()
// - aRemoteIp is passed to the fStreamClass constructor
// - aConnection receives the new instance, or nil if nothing was created
// - returns false if this instance is Terminated, otherwise returns the
// result of ConnectionAdd()
// - NOTE(review): when ConnectionAdd() returns false, aConnection still
// references the instance created here - who releases it on that path is
// not visible from this method, so confirm against the callers
function TAsynchConnections.ConnectionCreate(aSocket: TSocket; const aRemoteIp: RawUTF8;
  out aConnection: TAsynchConnection): boolean;
begin // you can override this class then call ConnectionAdd
  // an "out" class reference is not initialized by the compiler: set it
  // explicitly so the caller never reads a random pointer when Terminated
  aConnection := nil;
  if Terminated then
    result := false
  else begin
    aConnection := fStreamClass.Create(aRemoteIP);
    result := ConnectionAdd(aSocket, aConnection);
  end;
end;
function TAsynchConnections.ConnectionAdd(aSocket: TSocket;
aConnection: TAsynchConnection): boolean;
begin
result := false; // caller should release aSocket
|
| |
3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 |
function TAsynchConnections.ConnectionCreate(aSocket: TSocket; const aRemoteIp: RawUTF8; out aConnection: TAsynchConnection): boolean; begin // you can override this class then call ConnectionAdd if Terminated then result := false else begin aConnection := fStreamClass.Create(aRemoteIP); result := ConnectionAdd(aSocket,aConnection); end; end; function TAsynchConnections.ConnectionAdd(aSocket: TSocket; aConnection: TAsynchConnection): boolean; begin result := false; // caller should release aSocket |
Changes to SynCrtSock.pas.
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 ... 294 295 296 297 298 299 300 301 302 303 304 305 306 307 ... 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 ... 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 ... 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 ... 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 ... 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 ... 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 .... 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 .... 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 .... 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 .... 4869 4870 4871 4872 4873 4874 4875 4876 4877 4878 4879 4880 4881 4882 4883 4884 4885 4886 4887 4888 4889 4890 4891 4892 4893 4894 4895 4896 4897 4898 .... 4930 4931 4932 4933 4934 4935 4936 4937 4938 4939 4940 4941 4942 4943 .... 5056 5057 5058 5059 5060 5061 5062 5063 5064 5065 5066 5067 5068 5069 5070 5071 5072 5073 5074 5075 5076 5077 5078 5079 5080 5081 5082 5083 5084 .... 6075 6076 6077 6078 6079 6080 6081 6082 6083 6084 6085 6086 6087 6088 6089 6090 .... 6220 6221 6222 6223 6224 6225 6226 6227 6228 6229 6230 6231 6232 6233 6234 6235 .... 6260 6261 6262 6263 6264 6265 6266 6267 6268 6269 6270 6271 6272 6273 6274 6275 6276 6277 6278 6279 6280 6281 .... 6484 6485 6486 6487 6488 6489 6490 6491 6492 6493 6494 6495 6496 6497 6498 6499 6500 6501 6502 6503 6504 6505 .... 6598 6599 6600 6601 6602 6603 6604 6605 6606 6607 6608 6609 6610 6611 6612 6613 .... 6881 6882 6883 6884 6885 6886 6887 6888 6889 6890 6891 6892 6893 6894 6895 6896 6897 6898 6899 6900 6901 6902 6903 6904 6905 6906 6907 6908 .... 6921 6922 6923 6924 6925 6926 6927 6928 6929 6930 6931 6932 6933 6934 6935 .... 
7121 7122 7123 7124 7125 7126 7127 7128 7129 7130 7131 7132 7133 7134 7135 .... 7196 7197 7198 7199 7200 7201 7202 7203 7204 7205 7206 7207 7208 7209 7210 7211 7212 7213 7214 7215 7216 7217 7218 .... 7389 7390 7391 7392 7393 7394 7395 7396 7397 7398 7399 7400 7401 7402 7403 7404 7405 7406 7407 .... 7435 7436 7437 7438 7439 7440 7441 7442 7443 7444 7445 7446 7447 7448 7449 ..... 12743 12744 12745 12746 12747 12748 12749 12750 12751 12752 12753 12754 12755 12756 |
fBytesOut: Int64; fSocketLayer: TCrtSocketLayer; fSockInEofError: integer; fTLS, fWasBind: boolean; // updated by every SockSend() call fSndBuf: SockString; fSndBufLen: integer; // updated during UDP connection, accessed via PeerAddress/PeerPort fPeerAddr: TSockAddr; {$ifdef MSWINDOWS} fSecure: TSChannelClient; {$endif MSWINDOWS} procedure SetInt32OptionByIndex(OptName, OptVal: integer); virtual; procedure AcceptRequest(aClientSock: TSocket; aRemoteIP: PSockString); public /// common initialization of all constructors // - do not call directly, but use Open / Bind constructors instead constructor Create(aTimeOut: PtrInt=10000); reintroduce; virtual; /// connect to aServer:aPort // - you may ask for a TLS secured client connection (only available under // Windows by now, using the SChannel API) ................................................................................ constructor Bind(const aPort: SockString; aLayer: TCrtSocketLayer=cslTCP); /// low-level internal method called by Open() and Bind() constructors // - raise an ECrtSocket exception on error // - you may ask for a TLS secured client connection (only available under // Windows by now, using the SChannel API) procedure OpenBind(const aServer, aPort: SockString; doBind: boolean; aSock: integer=-1; aLayer: TCrtSocketLayer=cslTCP; aTLS: boolean=false); /// initialize SockIn for receiving with read[ln](SockIn^,...) // - data is buffered, filled as the data is available // - read(char) or readln() is indeed very fast // - multithread applications would also use this SockIn pseudo-text file // - by default, expect CR+LF as line feed (i.e. the HTTP way) procedure CreateSockIn(LineBreak: TTextLineBreakStyle=tlbsCRLF; InputBufferSize: Integer=1024); ................................................................................ 
// - raw Data is sent directly to OS: no LF/CRLF is appened to the block procedure Write(const Data: SockString); /// direct accept an new incoming connection on a bound socket // - instance should have been setup as a server via a previous Bind() call // - returns nil on error or a ResultClass instance on success // - if ResultClass is nil, will return a plain TCrtSocket, but you may // specify e.g. THttpServerSocket if you expect incoming HTTP requests function AcceptIncoming(RemoteIP: PSockString=nil; ResultClass: TCrtSocketClass=nil): TCrtSocket; /// remote IP address of the last packet received (SocketLayer=slUDP only) function PeerAddress: SockString; /// remote IP port of the last packet received (SocketLayer=slUDP only) function PeerPort: integer; /// set the TCP_NODELAY option for the connection // - default 1 (true) will disable the Nagle buffering algorithm; it should // only be set for applications that send frequent small bursts of information ................................................................................ 
/// Socket API based HTTP/1.1 server class used by THttpServer Threads THttpServerSocket = class(THttpSocket) protected fMethod: SockString; fURL: SockString; fKeepAliveClient: boolean; fRemoteIP: SockString; fRemoteConnectionID: THttpServerConnectionID; fServer: THttpServer; public /// create the socket according to a server // - will register the THttpSocketCompress functions from the server constructor Create(aServer: THttpServer); reintroduce; /// main object function called after aClientSock := Accept + Create // - initialize the internal TCrtSocket with the supplied accepted socket // - caller will then use the GetRequest method below to // get the request procedure InitRequest(aClientSock: TSocket; const aRemoteIP: SockString=''); /// main object function called after aClientSock := Accept + Create: // - get Command, Method, URL, Headers and Body (if withBody is TRUE) // - get sent data in Content (if withBody=true and ContentLength<>0) // - returned enumeration will indicates the processing state function GetRequest(withBody: boolean; headerMaxTix: Int64): THttpServerSocketGetRequestResult; virtual; /// contains the method ('GET','POST'.. e.g.) after GetRequest() property Method: SockString read fMethod; ................................................................................ property URL: SockString read fURL; /// true if the client is HTTP/1.1 and 'Connection: Close' is not set // - default HTTP/1.1 behavior is "keep alive", unless 'Connection: Close' // is specified, cf. RFC 2068 page 108: "HTTP/1.1 applications that do not // support persistent connections MUST include the "close" connection option // in every message" property KeepAliveClient: boolean read fKeepAliveClient write fKeepAliveClient; /// the recognized client IP, after a call to GetRequest() // - is either the raw connection IP to the current server socket, or // a custom header value set by a local proxy, e.g. 
// THttpServerGeneric.RemoteIPHeader='X-Real-IP' for nginx property RemoteIP: SockString read fRemoteIP; /// the recognized connection ID, after a call to GetRequest() // - identifies either the raw connection on the current server, or is // a custom header value set by a local proxy, e.g. // THttpServerGeneric.RemoteConnIDHeader='X-Conn-ID' for nginx property RemoteConnectionID: THttpServerConnectionID read fRemoteConnectionID; end; ................................................................................ // override THttpServer.Request() function or, if you need a lower-level access // (change the protocol, e.g.) THttpServer.Process() method itself THttpServerResp = class(TSynThread) protected fServer: THttpServer; fServerSock: THttpServerSocket; fClientSock: TSocket; fConnectionID: THttpServerConnectionID; /// main thread loop: read request from socket, send back answer procedure Execute; override; public /// initialize the response thread for the corresponding incoming socket // - this version will get the request directly from an incoming socket constructor Create(aSock: TSocket; aServer: THttpServer); reintroduce; overload; /// initialize the response thread for the corresponding incoming socket // - this version will handle KeepAlive, for such an incoming request constructor Create(aServerSock: THttpServerSocket; aServer: THttpServer); reintroduce; overload; virtual; /// the associated socket to communicate with the client property ServerSock: THttpServerSocket read fServerSock; /// the associated main HTTP server instance ................................................................................ 
/// how many threads are currently running in this thread pool property RunningThreads: integer read fRunningThreads; /// how many tasks were rejected due to thread pool contention // - if this number is high, consider setting a higher number of threads, // or profile and tune the Task method property ContentionAbortCount: cardinal read fContentionAbortCount; /// milliseconds delay to reject a connection due to contention // - default is 5000, i.e. 5 seconds for IOCP, and disabled otherwise // (since aQueuePendingContext is supposed to be used) property ContentionAbortDelay: integer read fContentionAbortDelay write fContentionAbortDelay; /// total milliseconds spent waiting for an available slot in the queue // - contention won't fail immediately, but will retry until ContentionAbortDelay // - any high number here requires code refactoring of the Task method property ContentionTime: Int64 read fContentionTime; /// how many times the pool waited for an available slot in the queue ................................................................................ // identified as HTTP/1.1 keep alive, or HTTP body length is bigger than 1 MB TSynThreadPoolTHttpServer = class(TSynThreadPool) protected fServer: THttpServer; {$ifndef USE_WINIOCP} function QueueLength: integer; override; {$endif} // here aContext is a pointer(TSocket=THandle) value procedure Task(aCaller: TSynThread; aContext: Pointer); override; procedure TaskAbort(aContext: Pointer); override; public /// initialize a thread pool with the supplied number of threads // - Task() overridden method processs the HTTP request set by Push() // - up to 256 threads can be associated to a Thread Pool constructor Create(Server: THttpServer; NumberOfThreads: Integer=32); reintroduce; ................................................................................ 
property MaximumAllowedContentLength: cardinal read fMaximumAllowedContentLength write SetMaximumAllowedContentLength; /// defines request/response internal queue length // - default value if 1000, which sounds fine for most use cases // - for THttpApiServer, will return 0 if the system does not support HTTP // API 2.0 (i.e. under Windows XP or Server 2003) // - for THttpServer, will shutdown any incoming accepted socket if the // internal TSynThreadPool.PendingContextCount+ThreadCount exceeds this limit // - increase this value if you don't have any load-balancing in place, and // in case of e.g. many 503 HTTP answers or if many "QueueFull" messages // appear in HTTP.sys log files (normally in // C:\Windows\System32\LogFiles\HTTPERR\httperr*.log) - may appear with // thousands of concurrent clients accessing at once the same server - // see @http://msdn.microsoft.com/en-us/library/windows/desktop/aa364501 // - you can use this property with a reverse-proxy as load balancer, e.g. ................................................................................ procedure AppendChar(chr: AnsiChar; var dest: shortstring); {$ifdef FPC}inline;{$endif} begin inc(dest[0]); dest[ord(dest[0])] := chr; end; procedure IP4Text(const ip4addr; var result: SockString); var b: array[0..3] of byte absolute ip4addr; s: shortstring; i: PtrInt; begin if cardinal(ip4addr)=0 then result := '' else if cardinal(ip4addr)=$0100007f then result := '127.0.0.1' else begin s := ''; i := 0; repeat AppendI32(b[i],s); if i=3 then break; AppendChar('.',s); ................................................................................ 
procedure IPText(const sin: TVarSin; var result: SockString); begin if sin.sin_family=AF_INET then IP4Text(sin.sin_addr,result) else begin result := GetSinIP(sin); // AF_INET6 may be optimized in a future revision if result='::1' then result := '127.0.0.1'; // IP6 localhost loopback benefits of matching IP4 end; end; function IsPublicIP(ip4: cardinal): boolean; begin result := false; case ip4 and 255 of // ignore IANA private IP4 address spaces ................................................................................ end; procedure TCrtSocket.SetInt32OptionByIndex(OptName, OptVal: integer); begin SetInt32Option(Sock,OptName,OptVal); end; procedure TCrtSocket.AcceptRequest(aClientSock: TSocket; aRemoteIP: PSockString); begin CreateSockIn; // use SockIn by default if not already initialized: 2x faster {$ifdef LINUX} // on Linux fd returned from accept() inherits all parent fd options // except O_NONBLOCK and O_ASYNC; fSock := aClientSock; {$else} // on other OS inheritance is undefined, so call OpenBind to set all fd options OpenBind('','',false,aClientSock, fSocketLayer); // set the ACCEPTed aClientSock Linger := 5; // should remain open for 5 seconds after a closesocket() call {$endif} if (aRemoteIP<>nil) and (aRemoteIP^='') then aRemoteIP^ := GetRemoteIP(aClientSock); end; procedure TCrtSocket.OpenBind(const aServer, aPort: SockString; doBind: boolean; aSock: integer; aLayer: TCrtSocketLayer; aTLS: boolean); const BINDTXT: array[boolean] of string[4] = ('open','bind'); BINDMSG: array[boolean] of string = ('Is a server running on this address:port?', 'Another process may be currently listening to this port!'); begin fSocketLayer := aLayer; ................................................................................ 
end; end; {$ifdef SYNCRTDEBUGLOW} TSynLog.Add.Log(sllCustom2, 'OpenBind(%:%) % sock=% (accept=%) ', [fServer,fPort,BINDTXT[doBind], fSock, aSock], self); {$endif} end; procedure TCrtSocket.SockSend(const Values: array of const); var i: integer; tmp: shortstring; begin for i := 0 to high(Values) do with Values[i] do ................................................................................ end; procedure TCrtSocket.Write(const Data: SockString); begin SndLow(pointer(Data),length(Data)); end; function TCrtSocket.AcceptIncoming(RemoteIP: PSockString; ResultClass: TCrtSocketClass): TCrtSocket; var client: TSocket; sin: TVarSin; begin result := nil; if (self=nil) or (fSock<=0) then exit; client := Accept(fSock,sin); if client<=0 then exit; if ResultClass=nil then ResultClass := TCrtSocket; result := ResultClass.Create; result.AcceptRequest(client,RemoteIP); end; function TCrtSocket.SockInRead(Content: PAnsiChar; Length: integer; UseOnlySockIn: boolean): integer; var len,res: integer; // read Length bytes from SockIn^ buffer + Sock if necessary begin ................................................................................ ServerThreadPoolCount: integer; KeepAliveTimeOut: integer); begin fInternalHttpServerRespList := {$ifdef FPC}TFPList{$else}TList{$endif}.Create; InitializeCriticalSection(fProcessCS); fSock := TCrtSocket.Bind(aPort); // BIND + LISTEN fServerKeepAliveTimeOut := KeepAliveTimeOut; // 30 seconds by default if fThreadPool<>nil then {$ifndef USE_WINIOCP}if not fThreadPool.QueuePendingContext then{$endif} fThreadPool.ContentionAbortDelay := 5000; // 5 seconds default // event handlers set before inherited Create to be visible in childs fOnHttpThreadStart := OnStart; SetOnTerminate(OnStop); if fThreadRespClass=nil then fThreadRespClass := THttpServerResp; if fSocketClass=nil then fSocketClass := THttpServerSocket; ................................................................................ 
{.$define MONOTHREAD} // define this not to create a thread at every connection (not recommended) procedure THttpServer.Execute; var ClientSock: TSocket; ClientSin: TVarSin; {$ifdef MONOTHREAD} ClientCrtSock: THttpServerSocket; endtix: Int64; {$endif} begin // THttpServerGeneric thread preparation: launch any OnHttpThreadStart event NotifyThreadStart(self); // main server process loop if Sock.Sock>0 then ................................................................................ DirectShutdown(ClientSock); finally ClientCrtSock.Free; end; {$else} if Assigned(fThreadPool) then begin // use thread pool to process the request header, and probably its body if not fThreadPool.Push(pointer(PtrUInt(ClientSock)),{waitoncontention=}true) then begin // returned false if there is no idle thread in the pool, and queue is full DirectShutdown(ClientSock); // expects the proxy to balance to another server continue; end; end else // default implementation creates one thread for each incoming socket fThreadRespClass.Create(ClientSock, self); {$endif MONOTHREAD} end; except on Exception do ; // any exception would break and release the thread end; EnterCriticalSection(fProcessCS); ................................................................................ Resume; end; {$endif} { THttpServerResp } constructor THttpServerResp.Create(aSock: TSocket; aServer: THttpServer); var c: THttpServerSocketClass; begin if aServer=nil then c := THttpServerSocket else c := aServer.fSocketClass; fClientSock := aSock; // ensure it is set ASAP: on Linux, Execute raises immediately Create(c.Create(aServer),aServer); end; constructor THttpServerResp.Create(aServerSock: THttpServerSocket; aServer: THttpServer); begin fServer := aServer; fServerSock := aServerSock; fOnTerminate := fServer.fOnTerminate; ................................................................................ 
begin fServer.NotifyThreadStart(self); try try if fClientSock<>0 then begin // direct call from incoming socket aSock := fClientSock; fClientSock := 0; // mark no need to Shutdown and close fClientSock fServerSock.InitRequest(aSock); // now fClientSock is in fServerSock if fServer<>nil then HandleRequestsProcess; end else begin // call from TSynThreadPoolTHttpServer -> handle first request if not fServerSock.fBodyRetrieved then fServerSock.GetBody; fServer.Process(fServerSock,ConnectionID,self); ................................................................................ fCompress := aServer.fCompress; fCompressAcceptEncoding := aServer.fCompressAcceptEncoding; fSocketLayer:=aServer.Sock.SocketLayer; TCPPrefix := aServer.TCPPrefix; end; end; procedure THttpServerSocket.InitRequest(aClientSock: TSocket; const aRemoteIP: SockString); begin fRemoteIP := aRemoteIP; AcceptRequest(aClientSock, @fRemoteIP); end; function THttpServerSocket.GetRequest(withBody: boolean; headerMaxTix: Int64): THttpServerSocketGetRequestResult; var P: PAnsiChar; status: cardinal; pending: integer; reason, allheaders: SockString; begin result := grError; try // abort now with no exception if socket is obviously broken if fServer<>nil then begin pending := SockInPending(100,{alsosocket=}true); if (pending<0) or (fServer=nil) or fServer.Terminated then exit; end; // 1st line is command: 'GET /path HTTP/1.1' e.g. ................................................................................ 
Content := ''; // get headers and content GetHeader; if fServer<>nil then begin // nil from TRTSPOverHTTPServer if fServer.fRemoteIPHeaderUpper<>'' then begin P := FindHeader(pointer(Headers),length(Headers),fServer.fRemoteIPHeaderUpper); if (P<>nil) and (P^<>#0) then fRemoteIP := P; end; if fServer.fRemoteConnIDHeaderUpper<>'' then begin P := FindHeader(pointer(Headers),length(Headers),fServer.fRemoteConnIDHeaderUpper); if P<>nil then fRemoteConnectionID := GetNextItemUInt64(P); end; end; ................................................................................ for i := 0 to fSubThreadCount-1 do {$ifdef USE_WINIOCP} PostQueuedCompletionStatus(fRequestQueue,0,0,nil); {$else} fSubThread[i].fEvent.SetEvent; {$endif} {$ifndef USE_WINIOCP} // cleanup now any pending task for i := 0 to fPendingContextCount-1 do TaskAbort(fPendingContext[i]); {$endif} // wait for threads to finish, with 30 seconds TimeOut endtix := GetTick64+30000; while (fRunningThreads>0) and (GetTick64<endtix) do SleepHiRes(5); ................................................................................ if result then exit; inc(fContentionCount); if (fContentionAbortDelay>0) and aWaitOnContention then begin tix := GetTick64; starttix := tix; endtix := tix+fContentionAbortDelay; // default 5 sec repeat if tix-starttix<50 then // wait for an available slot in the queue SleepHiRes(1) else SleepHiRes(10); tix := GetTick64; if fTerminated then exit; if Enqueue then begin result := true; // thread pool acquired the client sock break; end; until fTerminated or (tix>endtix); inc(fContentionTime,tix-starttix); end; if not result then inc(fContentionAbortCount); ................................................................................ 
{$endif USE_WINIOCP} procedure TSynThreadPoolTHttpServer.Task(aCaller: TSynThread; aContext: Pointer); var ServerSock: THttpServerSocket; headertix: Int64; res: THttpServerSocketGetRequestResult; begin if fServer.Terminated then exit; ServerSock := fServer.fSocketClass.Create(fServer); try ServerSock.InitRequest(TSocket(PtrUInt(aContext))); // get Header of incoming request in the thread pool headertix := fServer.HeaderRetrieveAbortDelay; if headertix>0 then headertix := headertix+GetTick64; res := ServerSock.GetRequest({withbody=}false,headertix); if (fServer=nil) or fServer.Terminated then exit; ................................................................................ finally FreeAndNil(ServerSock); end; end; procedure TSynThreadPoolTHttpServer.TaskAbort(aContext: Pointer); begin DirectShutdown(TSocket(PtrUInt(aContext))); end; {$ifdef MSWINDOWS} { ************ http.sys / HTTP API low-level direct access } ................................................................................ procedure Initialize; var i: integer; begin for i := 0 to high(NormToUpper) do NormToUpper[i] := i; for i := ord('a') to ord('z') do dec(NormToUpper[i],32); {$ifdef MSWINDOWS} Assert( {$ifdef CPU64} (sizeof(HTTP_REQUEST)=864) and (sizeof(HTTP_SSL_INFO)=48) and (sizeof(HTTP_DATA_CHUNK_INMEMORY)=32) and (sizeof(HTTP_DATA_CHUNK_FILEHANDLE)=32) and |
> > < > > > < | > > > > > > < > < < < < < < < < < < > | | | > > > > | | > > > > | | < < < < < < < < < < < < < < < < > > > > > > > > > > > > > > > < | | > < | < > > > | | < | | > > < | | | < < < < < < > > | | | | > > | | < < < | > |
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 ... 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 ... 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 ... 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 ... 648 649 650 651 652 653 654 655 656 657 658 659 660 661 ... 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 ... 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 ... 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 .... 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 .... 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 .... 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 .... 4877 4878 4879 4880 4881 4882 4883 4884 4885 4886 4887 4888 4889 4890 .... 4922 4923 4924 4925 4926 4927 4928 4929 4930 4931 4932 4933 4934 4935 4936 4937 4938 4939 4940 4941 4942 4943 4944 4945 4946 4947 4948 4949 4950 .... 5063 5064 5065 5066 5067 5068 5069 5070 5071 5072 5073 5074 5075 5076 5077 5078 5079 5080 5081 5082 5083 5084 5085 5086 5087 5088 5089 5090 5091 .... 6082 6083 6084 6085 6086 6087 6088 6089 6090 6091 6092 6093 6094 6095 6096 .... 6226 6227 6228 6229 6230 6231 6232 6233 6234 6235 6236 6237 6238 6239 6240 6241 .... 6266 6267 6268 6269 6270 6271 6272 6273 6274 6275 6276 6277 6278 6279 6280 6281 6282 6283 6284 6285 6286 6287 6288 .... 6491 6492 6493 6494 6495 6496 6497 6498 6499 6500 6501 6502 6503 6504 6505 6506 6507 6508 6509 6510 6511 6512 6513 .... 6606 6607 6608 6609 6610 6611 6612 6613 6614 6615 6616 6617 6618 6619 6620 6621 .... 6889 6890 6891 6892 6893 6894 6895 6896 6897 6898 6899 6900 6901 6902 6903 6904 6905 6906 6907 6908 6909 6910 6911 6912 .... 
6925 6926 6927 6928 6929 6930 6931 6932 6933 6934 6935 6936 6937 6938 6939 .... 7125 7126 7127 7128 7129 7130 7131 7132 7133 7134 7135 7136 7137 7138 7139 .... 7200 7201 7202 7203 7204 7205 7206 7207 7208 7209 7210 7211 7212 7213 7214 7215 7216 7217 7218 7219 7220 7221 7222 .... 7393 7394 7395 7396 7397 7398 7399 7400 7401 7402 7403 7404 7405 7406 7407 7408 7409 7410 .... 7438 7439 7440 7441 7442 7443 7444 7445 7446 7447 7448 7449 7450 7451 7452 ..... 12746 12747 12748 12749 12750 12751 12752 12753 12754 12755 12756 12757 12758 12759 12760 |
fBytesOut: Int64; fSocketLayer: TCrtSocketLayer; fSockInEofError: integer; fTLS, fWasBind: boolean; // updated by every SockSend() call fSndBuf: SockString; fSndBufLen: integer; // set by AcceptRequest() from TVarSin fRemoteIP: SockString; // updated during UDP connection, accessed via PeerAddress/PeerPort fPeerAddr: TSockAddr; {$ifdef MSWINDOWS} fSecure: TSChannelClient; {$endif MSWINDOWS} procedure SetInt32OptionByIndex(OptName, OptVal: integer); virtual; public /// common initialization of all constructors // - do not call directly, but use Open / Bind constructors instead constructor Create(aTimeOut: PtrInt=10000); reintroduce; virtual; /// connect to aServer:aPort // - you may ask for a TLS secured client connection (only available under // Windows by now, using the SChannel API) ................................................................................ constructor Bind(const aPort: SockString; aLayer: TCrtSocketLayer=cslTCP); /// low-level internal method called by Open() and Bind() constructors // - raise an ECrtSocket exception on error // - you may ask for a TLS secured client connection (only available under // Windows by now, using the SChannel API) procedure OpenBind(const aServer, aPort: SockString; doBind: boolean; aSock: integer=-1; aLayer: TCrtSocketLayer=cslTCP; aTLS: boolean=false); /// initialize the instance with the supplied accepted socket // - is called from a bound TCP Server, just after Accept() procedure AcceptRequest(aClientSock: TSocket; aClientSin: PVarSin); /// initialize SockIn for receiving with read[ln](SockIn^,...) // - data is buffered, filled as the data is available // - read(char) or readln() is indeed very fast // - multithread applications would also use this SockIn pseudo-text file // - by default, expect CR+LF as line feed (i.e. the HTTP way) procedure CreateSockIn(LineBreak: TTextLineBreakStyle=tlbsCRLF; InputBufferSize: Integer=1024); ................................................................................ 
// - raw Data is sent directly to OS: no LF/CRLF is appened to the block procedure Write(const Data: SockString); /// direct accept an new incoming connection on a bound socket // - instance should have been setup as a server via a previous Bind() call // - returns nil on error or a ResultClass instance on success // - if ResultClass is nil, will return a plain TCrtSocket, but you may // specify e.g. THttpServerSocket if you expect incoming HTTP requests function AcceptIncoming(ResultClass: TCrtSocketClass=nil): TCrtSocket; /// remote IP address after AcceptRequest() call over TCP // - is either the raw connection IP to the current server socket, or // a custom header value set by a local proxy as retrieved by inherited // THttpServerSocket.GetRequest, searching the header named in // THttpServerGeneric.RemoteIPHeader (e.g. 'X-Real-IP' for nginx) property RemoteIP: SockString read fRemoteIP write fRemoteIP; /// remote IP address of the last packet received (SocketLayer=slUDP only) function PeerAddress: SockString; /// remote IP port of the last packet received (SocketLayer=slUDP only) function PeerPort: integer; /// set the TCP_NODELAY option for the connection // - default 1 (true) will disable the Nagle buffering algorithm; it should // only be set for applications that send frequent small bursts of information ................................................................................ 
/// Socket API based HTTP/1.1 server class used by THttpServer Threads THttpServerSocket = class(THttpSocket) protected fMethod: SockString; fURL: SockString; fKeepAliveClient: boolean; fRemoteConnectionID: THttpServerConnectionID; fServer: THttpServer; public /// create the socket according to a server // - will register the THttpSocketCompress functions from the server // - once created, caller should call AcceptRequest() to accept the socket constructor Create(aServer: THttpServer); reintroduce; /// main object function called after aClientSock := Accept + Create: // - get Command, Method, URL, Headers and Body (if withBody is TRUE) // - get sent data in Content (if withBody=true and ContentLength<>0) // - returned enumeration will indicates the processing state function GetRequest(withBody: boolean; headerMaxTix: Int64): THttpServerSocketGetRequestResult; virtual; /// contains the method ('GET','POST'.. e.g.) after GetRequest() property Method: SockString read fMethod; ................................................................................ property URL: SockString read fURL; /// true if the client is HTTP/1.1 and 'Connection: Close' is not set // - default HTTP/1.1 behavior is "keep alive", unless 'Connection: Close' // is specified, cf. RFC 2068 page 108: "HTTP/1.1 applications that do not // support persistent connections MUST include the "close" connection option // in every message" property KeepAliveClient: boolean read fKeepAliveClient write fKeepAliveClient; /// the recognized connection ID, after a call to GetRequest() // - identifies either the raw connection on the current server, or is // a custom header value set by a local proxy, e.g. // THttpServerGeneric.RemoteConnIDHeader='X-Conn-ID' for nginx property RemoteConnectionID: THttpServerConnectionID read fRemoteConnectionID; end; ................................................................................ 
// override THttpServer.Request() function or, if you need a lower-level access // (change the protocol, e.g.) THttpServer.Process() method itself THttpServerResp = class(TSynThread) protected fServer: THttpServer; fServerSock: THttpServerSocket; fClientSock: TSocket; fClientSin: TVarSin; fConnectionID: THttpServerConnectionID; /// main thread loop: read request from socket, send back answer procedure Execute; override; public /// initialize the response thread for the corresponding incoming socket // - this version will get the request directly from an incoming socket constructor Create(aSock: TSocket; const aSin: TVarSin; aServer: THttpServer); reintroduce; overload; /// initialize the response thread for the corresponding incoming socket // - this version will handle KeepAlive, for such an incoming request constructor Create(aServerSock: THttpServerSocket; aServer: THttpServer); reintroduce; overload; virtual; /// the associated socket to communicate with the client property ServerSock: THttpServerSocket read fServerSock; /// the associated main HTTP server instance ................................................................................ /// how many threads are currently running in this thread pool property RunningThreads: integer read fRunningThreads; /// how many tasks were rejected due to thread pool contention // - if this number is high, consider setting a higher number of threads, // or profile and tune the Task method property ContentionAbortCount: cardinal read fContentionAbortCount; /// milliseconds delay to reject a connection due to contention // - default is 5000, i.e. 5 seconds wait for some room to be available // in the IOCP or aQueuePendingContext internal list // - during this delay, no new connection is available (i.e. 
Accept is not // called), so that a load balancer could detect the contention and switch // to another instance in the pool, or a direct client may eventually have // its connection rejected, so won't start sending data property ContentionAbortDelay: integer read fContentionAbortDelay write fContentionAbortDelay; /// total milliseconds spent waiting for an available slot in the queue // - contention won't fail immediately, but will retry until ContentionAbortDelay // - any high number here requires code refactoring of the Task method property ContentionTime: Int64 read fContentionTime; /// how many times the pool waited for an available slot in the queue ................................................................................ // identified as HTTP/1.1 keep alive, or HTTP body length is bigger than 1 MB TSynThreadPoolTHttpServer = class(TSynThreadPool) protected fServer: THttpServer; {$ifndef USE_WINIOCP} function QueueLength: integer; override; {$endif} // here aContext is a THttpServerSocket instance procedure Task(aCaller: TSynThread; aContext: Pointer); override; procedure TaskAbort(aContext: Pointer); override; public /// initialize a thread pool with the supplied number of threads // - Task() overridden method processes the HTTP request set by Push() // - up to 256 threads can be associated to a Thread Pool constructor Create(Server: THttpServer; NumberOfThreads: Integer=32); reintroduce; ................................................................................ property MaximumAllowedContentLength: cardinal read fMaximumAllowedContentLength write SetMaximumAllowedContentLength; /// defines request/response internal queue length // - default value is 1000, which sounds fine for most use cases // - for THttpApiServer, will return 0 if the system does not support HTTP // API 2.0 (i.e.
under Windows XP or Server 2003) // - for THttpServer, will shutdown any incoming accepted socket if the // internal TSynThreadPool.PendingContextCount+ThreadCount exceeds this limit; // each pending connection is a THttpServerSocket instance in the queue // - increase this value if you don't have any load-balancing in place, and // in case of e.g. many 503 HTTP answers or if many "QueueFull" messages // appear in HTTP.sys log files (normally in // C:\Windows\System32\LogFiles\HTTPERR\httperr*.log) - may appear with // thousands of concurrent clients accessing at once the same server - // see @http://msdn.microsoft.com/en-us/library/windows/desktop/aa364501 // - you can use this property with a reverse-proxy as load balancer, e.g. ................................................................................ procedure AppendChar(chr: AnsiChar; var dest: shortstring); {$ifdef FPC}inline;{$endif} begin inc(dest[0]); dest[ord(dest[0])] := chr; end; var IP4local: SockString; // contains '127.0.0.1' procedure IP4Text(const ip4addr; var result: SockString); var b: array[0..3] of byte absolute ip4addr; s: shortstring; i: PtrInt; begin if cardinal(ip4addr)=0 then result := '' else if cardinal(ip4addr)=$0100007f then result := IP4local else begin s := ''; i := 0; repeat AppendI32(b[i],s); if i=3 then break; AppendChar('.',s); ................................................................................ procedure IPText(const sin: TVarSin; var result: SockString); begin if sin.sin_family=AF_INET then IP4Text(sin.sin_addr,result) else begin result := GetSinIP(sin); // AF_INET6 may be optimized in a future revision if result='::1' then result := IP4local; // IP6 localhost loopback benefits of matching IP4 end; end; function IsPublicIP(ip4: cardinal): boolean; begin result := false; case ip4 and 255 of // ignore IANA private IP4 address spaces ................................................................................ 
end; procedure TCrtSocket.SetInt32OptionByIndex(OptName, OptVal: integer); begin SetInt32Option(Sock,OptName,OptVal); end; procedure TCrtSocket.OpenBind(const aServer, aPort: SockString; doBind: boolean; aSock: integer; aLayer: TCrtSocketLayer; aTLS: boolean); const BINDTXT: array[boolean] of string[4] = ('open','bind'); BINDMSG: array[boolean] of string = ('Is a server running on this address:port?', 'Another process may be currently listening to this port!'); begin fSocketLayer := aLayer; ................................................................................ end; end; {$ifdef SYNCRTDEBUGLOW} TSynLog.Add.Log(sllCustom2, 'OpenBind(%:%) % sock=% (accept=%) ', [fServer,fPort,BINDTXT[doBind], fSock, aSock], self); {$endif} end; procedure TCrtSocket.AcceptRequest(aClientSock: TSocket; aClientSin: PVarSin); begin {$ifdef LINUX} // on Linux fd returned from accept() inherits all parent fd options // except O_NONBLOCK and O_ASYNC; fSock := aClientSock; {$else} // on other OS inheritance is undefined, so call OpenBind to set all fd options OpenBind('','',false,aClientSock, fSocketLayer); // set the ACCEPTed aClientSock Linger := 5; // should remain open for 5 seconds after a closesocket() call {$endif} if aClientSin<>nil then IPText(aClientSin^,fRemoteIP); end; procedure TCrtSocket.SockSend(const Values: array of const); var i: integer; tmp: shortstring; begin for i := 0 to high(Values) do with Values[i] do ................................................................................ 
end; procedure TCrtSocket.Write(const Data: SockString); begin SndLow(pointer(Data),length(Data)); end; function TCrtSocket.AcceptIncoming(ResultClass: TCrtSocketClass): TCrtSocket; var client: TSocket; sin: TVarSin; begin result := nil; if (self=nil) or (fSock<=0) then exit; client := Accept(fSock,sin); if client<=0 then exit; if ResultClass=nil then ResultClass := TCrtSocket; result := ResultClass.Create; result.AcceptRequest(client,@sin); result.CreateSockIn; // use SockIn with 1KB input buffer: 2x faster end; function TCrtSocket.SockInRead(Content: PAnsiChar; Length: integer; UseOnlySockIn: boolean): integer; var len,res: integer; // read Length bytes from SockIn^ buffer + Sock if necessary begin ................................................................................ ServerThreadPoolCount: integer; KeepAliveTimeOut: integer); begin fInternalHttpServerRespList := {$ifdef FPC}TFPList{$else}TList{$endif}.Create; InitializeCriticalSection(fProcessCS); fSock := TCrtSocket.Bind(aPort); // BIND + LISTEN fServerKeepAliveTimeOut := KeepAliveTimeOut; // 30 seconds by default if fThreadPool<>nil then fThreadPool.ContentionAbortDelay := 5000; // 5 seconds default // event handlers set before inherited Create to be visible in childs fOnHttpThreadStart := OnStart; SetOnTerminate(OnStop); if fThreadRespClass=nil then fThreadRespClass := THttpServerResp; if fSocketClass=nil then fSocketClass := THttpServerSocket; ................................................................................ 
{.$define MONOTHREAD} // define this not to create a thread at every connection (not recommended) procedure THttpServer.Execute; var ClientSock: TSocket; ClientSin: TVarSin; ClientCrtSock: THttpServerSocket; {$ifdef MONOTHREAD} endtix: Int64; {$endif} begin // THttpServerGeneric thread preparation: launch any OnHttpThreadStart event NotifyThreadStart(self); // main server process loop if Sock.Sock>0 then ................................................................................ DirectShutdown(ClientSock); finally ClientCrtSock.Free; end; {$else} if Assigned(fThreadPool) then begin // use thread pool to process the request header, and probably its body ClientCrtSock := fSocketClass.Create(self); ClientCrtSock.AcceptRequest(ClientSock,@ClientSin); if not fThreadPool.Push(pointer(PtrUInt(ClientCrtSock)),{waitoncontention=}true) then begin // returned false if there is no idle thread in the pool, and queue is full ClientCrtSock.Free; // will call DirectShutdown(ClientSock) end; end else // default implementation creates one thread for each incoming socket fThreadRespClass.Create(ClientSock,ClientSin,self); {$endif MONOTHREAD} end; except on Exception do ; // any exception would break and release the thread end; EnterCriticalSection(fProcessCS); ................................................................................ Resume; end; {$endif} { THttpServerResp } constructor THttpServerResp.Create(aSock: TSocket; const aSin: TVarSin; aServer: THttpServer); var c: THttpServerSocketClass; begin fClientSock := aSock; fClientSin := aSin; if aServer=nil then c := THttpServerSocket else c := aServer.fSocketClass; Create(c.Create(aServer),aServer); // on Linux, Execute raises during Create end; constructor THttpServerResp.Create(aServerSock: THttpServerSocket; aServer: THttpServer); begin fServer := aServer; fServerSock := aServerSock; fOnTerminate := fServer.fOnTerminate; ................................................................................ 
begin fServer.NotifyThreadStart(self); try try if fClientSock<>0 then begin // direct call from incoming socket aSock := fClientSock; fClientSock := 0; // fServerSock owns fClientSock fServerSock.AcceptRequest(aSock,@fClientSin); if fServer<>nil then HandleRequestsProcess; end else begin // call from TSynThreadPoolTHttpServer -> handle first request if not fServerSock.fBodyRetrieved then fServerSock.GetBody; fServer.Process(fServerSock,ConnectionID,self); ................................................................................ fCompress := aServer.fCompress; fCompressAcceptEncoding := aServer.fCompressAcceptEncoding; fSocketLayer:=aServer.Sock.SocketLayer; TCPPrefix := aServer.TCPPrefix; end; end; function THttpServerSocket.GetRequest(withBody: boolean; headerMaxTix: Int64): THttpServerSocketGetRequestResult; var P: PAnsiChar; status: cardinal; pending: integer; reason, allheaders: SockString; begin result := grError; try // use SockIn with 1KB buffer if not already initialized: 2x faster CreateSockIn; // abort now with no exception if socket is obviously broken if fServer<>nil then begin pending := SockInPending(100,{alsosocket=}true); if (pending<0) or (fServer=nil) or fServer.Terminated then exit; end; // 1st line is command: 'GET /path HTTP/1.1' e.g. ................................................................................ Content := ''; // get headers and content GetHeader; if fServer<>nil then begin // nil from TRTSPOverHTTPServer if fServer.fRemoteIPHeaderUpper<>'' then begin P := FindHeader(pointer(Headers),length(Headers),fServer.fRemoteIPHeaderUpper); if (P<>nil) and (P^<>#0) then fRemoteIP := P; // real Internet IP (replace 127.0.0.1 from a proxy) end; if fServer.fRemoteConnIDHeaderUpper<>'' then begin P := FindHeader(pointer(Headers),length(Headers),fServer.fRemoteConnIDHeaderUpper); if P<>nil then fRemoteConnectionID := GetNextItemUInt64(P); end; end; ................................................................................ 
for i := 0 to fSubThreadCount-1 do {$ifdef USE_WINIOCP} PostQueuedCompletionStatus(fRequestQueue,0,0,nil); {$else} fSubThread[i].fEvent.SetEvent; {$endif} {$ifndef USE_WINIOCP} // cleanup now any pending task (e.g. THttpServerSocket instance) for i := 0 to fPendingContextCount-1 do TaskAbort(fPendingContext[i]); {$endif} // wait for threads to finish, with 30 seconds TimeOut endtix := GetTick64+30000; while (fRunningThreads>0) and (GetTick64<endtix) do SleepHiRes(5); ................................................................................ if result then exit; inc(fContentionCount); if (fContentionAbortDelay>0) and aWaitOnContention then begin tix := GetTick64; starttix := tix; endtix := tix+fContentionAbortDelay; // default 5 sec repeat // during this delay, no new connection is ACCEPTed if tix-starttix<50 then // wait for an available slot in the queue SleepHiRes(1) else SleepHiRes(10); tix := GetTick64; if fTerminated then exit; if Enqueue then begin result := true; // thread pool acquired or queued the client sock break; end; until fTerminated or (tix>endtix); inc(fContentionTime,tix-starttix); end; if not result then inc(fContentionAbortCount); ................................................................................ {$endif USE_WINIOCP} procedure TSynThreadPoolTHttpServer.Task(aCaller: TSynThread; aContext: Pointer); var ServerSock: THttpServerSocket; headertix: Int64; res: THttpServerSocketGetRequestResult; begin ServerSock := aContext; try if fServer.Terminated then exit; // get Header of incoming request in the thread pool headertix := fServer.HeaderRetrieveAbortDelay; if headertix>0 then headertix := headertix+GetTick64; res := ServerSock.GetRequest({withbody=}false,headertix); if (fServer=nil) or fServer.Terminated then exit; ................................................................................ 
finally FreeAndNil(ServerSock); end; end; procedure TSynThreadPoolTHttpServer.TaskAbort(aContext: Pointer); begin THttpServerSocket(aContext).Free; end; {$ifdef MSWINDOWS} { ************ http.sys / HTTP API low-level direct access } ................................................................................ procedure Initialize; var i: integer; begin for i := 0 to high(NormToUpper) do NormToUpper[i] := i; for i := ord('a') to ord('z') do dec(NormToUpper[i],32); IP4local := '127.0.0.1'; // use var string with refcount=1 to avoid allocation {$ifdef MSWINDOWS} Assert( {$ifdef CPU64} (sizeof(HTTP_REQUEST)=864) and (sizeof(HTTP_SSL_INFO)=48) and (sizeof(HTTP_DATA_CHUNK_INMEMORY)=32) and (sizeof(HTTP_DATA_CHUNK_FILEHANDLE)=32) and |
Changes to SynProtoRTSPHTTP.pas.
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
aConnection := nil; get := nil; result := false; log := fLog.Enter('ConnectionCreate(%)', [aSocket], self); try sock := TProxySocket.Create(nil); try sock.InitRequest(aSocket,aRemoteIP); if (sock.GetRequest({withBody=}false, {headertix=}0)=grHeaderReceived) and (sock.URL <> '') then begin if log<>nil then log.Log(sllTrace, 'ConnectionCreate received % % %', [sock.Method, sock.URL, sock.HeaderGetText], self); cookie := sock.HeaderGetValue('X-SESSIONCOOKIE'); if cookie = '' then |
| > > |
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
aConnection := nil; get := nil; result := false; log := fLog.Enter('ConnectionCreate(%)', [aSocket], self); try sock := TProxySocket.Create(nil); try sock.AcceptRequest(aSocket,nil); sock.RemoteIP := aRemoteIP; sock.CreateSockIn; // faster header process (released below once not needed) if (sock.GetRequest({withBody=}false, {headertix=}0)=grHeaderReceived) and (sock.URL <> '') then begin if log<>nil then log.Log(sllTrace, 'ConnectionCreate received % % %', [sock.Method, sock.URL, sock.HeaderGetText], self); cookie := sock.HeaderGetValue('X-SESSIONCOOKIE'); if cookie = '' then |
Changes to SynopseCommit.inc.
1 |
'1.18.5797'
|
| |
1 |
'1.18.5798'
|