mORMot and Open Source friends
Check-in [d79db41d83]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:{5781} try to reduce TSynThreadPool futex/lock contention
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: d79db41d834e097248b9d63a71db9efae285cfc0
User & Date: ab 2020-03-07 15:44:50
Context
2020-03-07
16:05
{5782} added padding to avoid TSynThreadPool locks cpu cache line collision on 32-bit (ensure it is >128) check-in: f31fcb67b3 user: ab tags: trunk
15:44
{5781} try to reduce TSynThreadPool futex/lock contention check-in: d79db41d83 user: ab tags: trunk
15:21
{5780} optimized server socket accept() on Linux - from https://github.com/synopse/mORMot/pull/278 check-in: 4db0ea3636 user: ab tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to SynCrtSock.pas.

810
811
812
813
814
815
816

817
818
819
820
821
822
823
...
829
830
831
832
833
834
835

836
837
838
839
840
841
842
...
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
....
7081
7082
7083
7084
7085
7086
7087
7088

7089
7090
7091
7092
7093
7094
7095
....
7117
7118
7119
7120
7121
7122
7123
7124

7125
7126
7127
7128
7129
7130
7131
....
7132
7133
7134
7135
7136
7137
7138

7139
7140
7141

7142
7143
7144
7145
7146


7147








7148
7149
7150
7151
7152


7153
7154
7155
7156
7157
7158
7159
7160
7161
7162
7163
7164
7165
7166
7167
7168
7169
7170
7171
....
7198
7199
7200
7201
7202
7203
7204
7205
7206
7207
7208
7209
7210
7211
7212
7213
7214
7215
7216
7217
7218
7219
7220
7221
7222
7223
7224
7225
7226
7227
7228
7229
7230
7231
7232
7233
7234
7235
7236
7237
7238
....
7252
7253
7254
7255
7256
7257
7258

7259
7260
7261
7262
7263
7264
7265
7266

7267
7268
7269
7270
7271
7272
7273
....
7303
7304
7305
7306
7307
7308
7309
7310
7311
7312
7313
7314
7315
7316
7317
7318
7319
7320
7321
7322
7323
7324
7325
7326
  TSynThreadPoolSubThread = class(TSynThread)
  protected
    fOwner: TSynThreadPool;
    fNotifyThreadStartName: AnsiString;
    fThreadNumber: integer;
    {$ifndef USE_WINIOCP}
    fProcessingContext: pointer;

    fEvent: TEvent;
    {$endif USE_WINIOCP}
    procedure NotifyThreadStart(Sender: TSynThread);
    procedure DoTask(Context: pointer); // exception-safe call of fOwner.Task()
  public
    /// initialize the thread
    constructor Create(Owner: TSynThreadPool); reintroduce;
................................................................................

  {$M+}
  /// a simple Thread Pool, used e.g. for fast handling HTTP requests
  // - implemented over I/O Completion Ports under Windows, or a classical
  // Event-driven approach under Linux/POSIX
  TSynThreadPool = class
  protected

    fSubThread: TObjectList; // holds TSynThreadPoolSubThread
    fRunningThreads: integer;
    fExceptionsCount: integer;
    fOnTerminate: TNotifyThreadEvent;
    fOnThreadStart: TNotifyThreadEvent;
    fTerminated: boolean;
    fContentionAbortCount: cardinal;
................................................................................
    fContentionAbortDelay: integer;
    {$ifdef USE_WINIOCP}
    fRequestQueue: THandle; // IOCP has its own internal queue
    {$else}
    fQueuePendingContext: boolean;
    fPendingContext: array of pointer;
    fPendingContextCount: integer;
    fSafe: TRTLCriticalSection;
    function GetPendingContextCount: integer;
    function PopPendingContext: pointer;
    function QueueLength: integer; virtual;
    {$endif USE_WINIOCP}
    /// end thread on IO error
    function NeedStopOnIOError: boolean; virtual;
    /// process to be executed after notification
................................................................................
  {$ifdef USE_WINIOCP}
  fRequestQueue := CreateIoCompletionPort(aOverlapHandle, 0, 0, NumberOfThreads);
  if fRequestQueue=INVALID_HANDLE_VALUE then begin
    fRequestQueue := 0;
    exit;
  end;
  {$else}
  InitializeCriticalSection(fSafe);

  fQueuePendingContext := aQueuePendingContext;
  {$endif}
  // now create the worker threads
  fSubThread := TObjectList.Create(true);
  for i := 1 to NumberOfThreads do
    fSubThread.Add(TSynThreadPoolSubThread.Create(Self));
end;
................................................................................
    while (fRunningThreads>0) and (GetTick64<endtix) do
      SleepHiRes(5);
    fSubThread.Free;
  finally
    {$ifdef USE_WINIOCP}
    CloseHandle(fRequestQueue);
    {$else}
    DeleteCriticalSection(fSafe);

    {$endif USE_WINIOCP}
  end;
  inherited Destroy;
end;

function TSynThreadPool.Push(aContext: pointer; aWaitOnContention: boolean): boolean;
  {$ifdef USE_WINIOCP}
................................................................................
  // USE_WINIOCP variant: hands aContext over to the completion port
  // - aContext is passed as the last argument of PostQueuedCompletionStatus()
  // - returns whatever PostQueuedCompletionStatus() returned
  function Enqueue: boolean;
  begin // IOCP has its own queue
    result := PostQueuedCompletionStatus(fRequestQueue,0,0,aContext);
  end;
  {$else}
  // non-IOCP variant: give aContext to an idle sub-thread, or store it as pending
  // - the whole search + queueing is done while holding fSafe
  // - returns true if a sub-thread with fProcessingContext=nil was found (its
  // fEvent is then signalled), or if aContext was appended to fPendingContext[]
  // - returns false if no sub-thread is idle and either fQueuePendingContext is
  // false or the QueueLength limit is exceeded
  function Enqueue: boolean;
  var i, n: integer;

      thread: ^TSynThreadPoolSubThread; // walks the fSubThread.List items
  begin
    result := false;

    EnterCriticalsection(fSafe);
    try
      // first pass: look for a sub-thread with no context assigned
      thread := pointer(fSubThread.List);
      for i := 1 to fSubThread.Count do
        if thread^.fProcessingContext=nil then begin


          thread^.fProcessingContext := aContext;








          thread^.fEvent.SetEvent; // signalled while fSafe is still held
          result := true; // found one available thread
          exit; // the finally block below releases fSafe
        end else
          inc(thread);


      // no idle sub-thread: fall back to the pending queue, if enabled
      if not fQueuePendingContext then
        exit;
      n := fPendingContextCount;
      if n+fSubThread.Count>QueueLength then
        exit; // too many connection limit reached (see QueueIsFull)
      if n=length(fPendingContext) then
        SetLength(fPendingContext,n+n shr 3+64); // grow by 1/8th plus 64 slots
      fPendingContext[n] := aContext;
      inc(fPendingContextCount);
      result := true; // added in pending queue
    finally
      LeaveCriticalsection(fSafe);
    end;
  end;
  {$endif}
var tix, starttix, endtix: Int64;
begin
  result := false;
  if (self=nil) or fTerminated then
................................................................................

{$ifndef USE_WINIOCP}
// returns the number of contexts currently stored in fPendingContext[]
// - the counter is read while holding fSafe
// - returns 0 if the pool instance is nil, is terminated, or if the
// fPendingContext[] array has not been allocated
function TSynThreadPool.GetPendingContextCount: integer;
begin
  if (self<>nil) and not fTerminated and (fPendingContext<>nil) then begin
    EnterCriticalsection(fSafe);
    try
      result := fPendingContextCount;
    finally
      LeaveCriticalsection(fSafe);
    end;
  end else
    result := 0;
end;

// returns true if pending queueing is enabled and the pending contexts count
// plus the number of sub-threads is greater than QueueLength
// - always false when fQueuePendingContext is not set
function TSynThreadPool.QueueIsFull: boolean;
begin
  result := false;
  if fQueuePendingContext then
    result := GetPendingContextCount+fSubThread.Count>QueueLength;
end;

// extracts the oldest pending context, i.e. fPendingContext[0]
// - the remaining entries are shifted down by one slot, while holding fSafe
// - returns nil if nothing is pending, or if the pool instance is nil, is
// terminated, or never allocated its fPendingContext[] array
function TSynThreadPool.PopPendingContext: pointer;
var remaining: integer;
begin
  result := nil;
  if (self<>nil) and not fTerminated and (fPendingContext<>nil) then begin
    EnterCriticalsection(fSafe);
    try
      remaining := fPendingContextCount-1;
      if remaining>=0 then begin
        result := fPendingContext[0];
        fPendingContextCount := remaining;
        // move the remaining entries to the start of the array
        Move(fPendingContext[1],fPendingContext[0],remaining*SizeOf(pointer));
      end;
    finally
      LeaveCriticalsection(fSafe);
    end;
  end;
end;

// returns the limit used by Enqueue and QueueIsFull to reject new contexts
// - this default implementation returns a fixed, deliberately high value
function TSynThreadPool.QueueLength: integer;
const
  DEFAULT_QUEUE_LENGTH = 10000;
begin
  result := DEFAULT_QUEUE_LENGTH;
end;
................................................................................

// initialize one worker thread of the supplied pool
// - stores Owner and copies its fOnTerminate callback before anything else
// - when USE_WINIOCP is not defined, also allocates the fEvent on which
// Execute waits
// - the inherited constructor is called last, with false as argument
// NOTE(review): presumably this argument is CreateSuspended, so that the thread
// starts at once - confirm against TSynThread.Create
constructor TSynThreadPoolSubThread.Create(Owner: TSynThreadPool);
begin
  fOwner := Owner; // ensure it is set ASAP: on Linux, Execute raises immediately
  fOnTerminate := Owner.fOnTerminate;
  {$ifndef USE_WINIOCP}
  // assuming SyncObjs.TEvent argument order: ManualReset=false, InitialState=false
  fEvent := TEvent.Create(nil,false,false,'');

  {$endif}
  inherited Create(false);
end;

// finalize the worker thread
// - the inherited destructor runs first, then fEvent is released (only when
// USE_WINIOCP is not defined)
// NOTE(review): presumably this order ensures Execute is no longer waiting on
// fEvent when it is freed - confirm against the inherited destructor
destructor TSynThreadPoolSubThread.Destroy;
begin
  inherited Destroy;
  {$ifndef USE_WINIOCP}

  fEvent.Free;
  {$endif}
end;

{$ifdef USE_WINIOCP}
function GetQueuedCompletionStatus(CompletionPort: THandle;
  var lpNumberOfBytesTransferred: DWORD; var lpCompletionKey: PtrUInt;
................................................................................
        break;
      if Context<>nil then
        DoTask(Context);
      {$else}
      fEvent.WaitFor(INFINITE);
      if fOwner.fTerminated then
        break;
      EnterCriticalSection(fOwner.fSafe);
      Context := fProcessingContext; { TODO : use Interlocked*() to reduce OS calls? }
      LeaveCriticalSection(fOwner.fSafe);
      while Context<>nil do begin
        DoTask(Context);
        Context := fOwner.PopPendingContext; // unqueue any pending context
      end;
      EnterCriticalSection(fOwner.fSafe);
      fProcessingContext := nil; // indicates this thread is now available
      LeaveCriticalSection(fOwner.fSafe);
      {$endif USE_WINIOCP}
    until fOwner.fTerminated;
  finally
    InterlockedDecrement(fOwner.fRunningThreads);
  end;
end;







>







 







>







 







|







 







|
>







 







|
>







 







>



>
|




>
>
|
>
>
>
>
>
>
>
>
|
|
|
|
<
>
>











|







 







|



|













|







|







 







>








>







 







|
|
|




|

|







810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
...
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
...
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
....
7083
7084
7085
7086
7087
7088
7089
7090
7091
7092
7093
7094
7095
7096
7097
7098
....
7120
7121
7122
7123
7124
7125
7126
7127
7128
7129
7130
7131
7132
7133
7134
7135
....
7136
7137
7138
7139
7140
7141
7142
7143
7144
7145
7146
7147
7148
7149
7150
7151
7152
7153
7154
7155
7156
7157
7158
7159
7160
7161
7162
7163
7164
7165
7166
7167

7168
7169
7170
7171
7172
7173
7174
7175
7176
7177
7178
7179
7180
7181
7182
7183
7184
7185
7186
7187
7188
....
7215
7216
7217
7218
7219
7220
7221
7222
7223
7224
7225
7226
7227
7228
7229
7230
7231
7232
7233
7234
7235
7236
7237
7238
7239
7240
7241
7242
7243
7244
7245
7246
7247
7248
7249
7250
7251
7252
7253
7254
7255
....
7269
7270
7271
7272
7273
7274
7275
7276
7277
7278
7279
7280
7281
7282
7283
7284
7285
7286
7287
7288
7289
7290
7291
7292
....
7322
7323
7324
7325
7326
7327
7328
7329
7330
7331
7332
7333
7334
7335
7336
7337
7338
7339
7340
7341
7342
7343
7344
7345
  TSynThreadPoolSubThread = class(TSynThread)
  protected
    fOwner: TSynThreadPool;
    fNotifyThreadStartName: AnsiString;
    fThreadNumber: integer;
    {$ifndef USE_WINIOCP}
    fProcessingContext: pointer;
    fSafeProcessingContext: TRTLCriticalSection;
    fEvent: TEvent;
    {$endif USE_WINIOCP}
    procedure NotifyThreadStart(Sender: TSynThread);
    procedure DoTask(Context: pointer); // exception-safe call of fOwner.Task()
  public
    /// initialize the thread
    constructor Create(Owner: TSynThreadPool); reintroduce;
................................................................................

  {$M+}
  /// a simple Thread Pool, used e.g. for fast handling HTTP requests
  // - implemented over I/O Completion Ports under Windows, or a classical
  // Event-driven approach under Linux/POSIX
  TSynThreadPool = class
  protected
    fSafeSubThread: TRTLCriticalSection;
    fSubThread: TObjectList; // holds TSynThreadPoolSubThread
    fRunningThreads: integer;
    fExceptionsCount: integer;
    fOnTerminate: TNotifyThreadEvent;
    fOnThreadStart: TNotifyThreadEvent;
    fTerminated: boolean;
    fContentionAbortCount: cardinal;
................................................................................
    fContentionAbortDelay: integer;
    {$ifdef USE_WINIOCP}
    fRequestQueue: THandle; // IOCP has its own internal queue
    {$else}
    fQueuePendingContext: boolean;
    fPendingContext: array of pointer;
    fPendingContextCount: integer;
    fSafePendingContext: TRTLCriticalSection; // but last to avoid CPU cache issue
    function GetPendingContextCount: integer;
    function PopPendingContext: pointer;
    function QueueLength: integer; virtual;
    {$endif USE_WINIOCP}
    /// end thread on IO error
    function NeedStopOnIOError: boolean; virtual;
    /// process to be executed after notification
................................................................................
  {$ifdef USE_WINIOCP}
  fRequestQueue := CreateIoCompletionPort(aOverlapHandle, 0, 0, NumberOfThreads);
  if fRequestQueue=INVALID_HANDLE_VALUE then begin
    fRequestQueue := 0;
    exit;
  end;
  {$else}
  InitializeCriticalSection(fSafeSubThread);
  InitializeCriticalSection(fSafePendingContext);
  fQueuePendingContext := aQueuePendingContext;
  {$endif}
  // now create the worker threads
  fSubThread := TObjectList.Create(true);
  for i := 1 to NumberOfThreads do
    fSubThread.Add(TSynThreadPoolSubThread.Create(Self));
end;
................................................................................
    while (fRunningThreads>0) and (GetTick64<endtix) do
      SleepHiRes(5);
    fSubThread.Free;
  finally
    {$ifdef USE_WINIOCP}
    CloseHandle(fRequestQueue);
    {$else}
    DeleteCriticalSection(fSafeSubThread);
    DeleteCriticalSection(fSafePendingContext);
    {$endif USE_WINIOCP}
  end;
  inherited Destroy;
end;

function TSynThreadPool.Push(aContext: pointer; aWaitOnContention: boolean): boolean;
  {$ifdef USE_WINIOCP}
................................................................................
  // USE_WINIOCP variant: hands aContext over to the completion port
  // - aContext is passed as the last argument of PostQueuedCompletionStatus()
  // - returns whatever PostQueuedCompletionStatus() returned
  function Enqueue: boolean;
  begin // IOCP has its own queue
    result := PostQueuedCompletionStatus(fRequestQueue,0,0,aContext);
  end;
  {$else}
  // non-IOCP variant: give aContext to an idle sub-thread, or store it as pending
  // - step 1: scan fSubThread under fSafeSubThread for a sub-thread with
  // fProcessingContext=nil, and assign aContext to it under its own
  // fSafeProcessingContext lock
  // - step 2: once fSafeSubThread is released, signal the fEvent of this thread
  // - step 3: if no thread was idle, append aContext to fPendingContext[] under
  // fSafePendingContext, unless fQueuePendingContext is false or the
  // QueueLength limit is exceeded
  // - returns true if aContext was assigned or queued, false otherwise
  function Enqueue: boolean;
  var i, n: integer;
      found: TSynThreadPoolSubThread; // the idle sub-thread, nil if none
      thread: ^TSynThreadPoolSubThread; // walks the fSubThread.List items
  begin
    result := false;
    found := nil;
    EnterCriticalsection(fSafeSubThread);
    try
      thread := pointer(fSubThread.List);
      for i := 1 to fSubThread.Count do
        // NOTE(review): this nil test is done before entering the sub-thread
        // fSafeProcessingContext - only fSafeSubThread is held at this point
        if thread^.fProcessingContext=nil then begin
          found := thread^;
          EnterCriticalSection(found.fSafeProcessingContext);
          found.fProcessingContext := aContext;
          LeaveCriticalSection(found.fSafeProcessingContext);
          break;
        end else
          inc(thread);
    finally
      LeaveCriticalsection(fSafeSubThread);
    end;
    if found<>nil then begin
      found.fEvent.SetEvent; // signalled with no lock held
      result := true; // found one available thread
      exit;
    end;

    // no idle sub-thread: fall back to the pending queue, if enabled
    EnterCriticalsection(fSafePendingContext);
    try
      if not fQueuePendingContext then
        exit; // the finally block below releases fSafePendingContext
      n := fPendingContextCount;
      if n+fSubThread.Count>QueueLength then
        exit; // too many connection limit reached (see QueueIsFull)
      if n=length(fPendingContext) then
        SetLength(fPendingContext,n+n shr 3+64); // grow by 1/8th plus 64 slots
      fPendingContext[n] := aContext;
      inc(fPendingContextCount);
      result := true; // added in pending queue
    finally
      LeaveCriticalsection(fSafePendingContext);
    end;
  end;
  {$endif}
var tix, starttix, endtix: Int64;
begin
  result := false;
  if (self=nil) or fTerminated then
................................................................................

{$ifndef USE_WINIOCP}
// returns the number of contexts currently stored in fPendingContext[]
// - the counter is read while holding fSafePendingContext
// - returns 0 if the pool instance is nil, is terminated, or if the
// fPendingContext[] array has not been allocated
function TSynThreadPool.GetPendingContextCount: integer;
begin
  if (self<>nil) and not fTerminated and (fPendingContext<>nil) then begin
    EnterCriticalsection(fSafePendingContext);
    try
      result := fPendingContextCount;
    finally
      LeaveCriticalsection(fSafePendingContext);
    end;
  end else
    result := 0;
end;

// returns true if pending queueing is enabled and the pending contexts count
// plus the number of sub-threads is greater than QueueLength
// - always false when fQueuePendingContext is not set
function TSynThreadPool.QueueIsFull: boolean;
begin
  result := false;
  if fQueuePendingContext then
    result := GetPendingContextCount+fSubThread.Count>QueueLength;
end;

// extracts the oldest pending context, i.e. fPendingContext[0]
// - the remaining entries are shifted down by one slot, while holding
// fSafePendingContext
// - returns nil if nothing is pending, or if the pool instance is nil, is
// terminated, or never allocated its fPendingContext[] array
function TSynThreadPool.PopPendingContext: pointer;
var remaining: integer;
begin
  result := nil;
  if (self<>nil) and not fTerminated and (fPendingContext<>nil) then begin
    EnterCriticalsection(fSafePendingContext);
    try
      remaining := fPendingContextCount-1;
      if remaining>=0 then begin
        result := fPendingContext[0];
        fPendingContextCount := remaining;
        // move the remaining entries to the start of the array
        Move(fPendingContext[1],fPendingContext[0],remaining*SizeOf(pointer));
      end;
    finally
      LeaveCriticalsection(fSafePendingContext);
    end;
  end;
end;

// returns the limit used by Enqueue and QueueIsFull to reject new contexts
// - this default implementation returns a fixed, deliberately high value
function TSynThreadPool.QueueLength: integer;
const
  DEFAULT_QUEUE_LENGTH = 10000;
begin
  result := DEFAULT_QUEUE_LENGTH;
end;
................................................................................

// initialize one worker thread of the supplied pool
// - stores Owner and copies its fOnTerminate callback before anything else
// - when USE_WINIOCP is not defined, also allocates the fEvent on which
// Execute waits, and the fSafeProcessingContext lock protecting
// fProcessingContext
// - the inherited constructor is called last, with false as argument
// NOTE(review): presumably this argument is CreateSuspended, so that the thread
// starts at once - confirm against TSynThread.Create
constructor TSynThreadPoolSubThread.Create(Owner: TSynThreadPool);
begin
  fOwner := Owner; // ensure it is set ASAP: on Linux, Execute raises immediately
  fOnTerminate := Owner.fOnTerminate;
  {$ifndef USE_WINIOCP}
  // assuming SyncObjs.TEvent argument order: ManualReset=false, InitialState=false
  fEvent := TEvent.Create(nil,false,false,'');
  InitializeCriticalSection(fSafeProcessingContext);
  {$endif}
  inherited Create(false);
end;

// finalize the worker thread
// - the inherited destructor runs first, then the fSafeProcessingContext lock
// and fEvent are released (only when USE_WINIOCP is not defined)
// NOTE(review): presumably this order ensures Execute no longer uses the lock
// nor the event when they are released - confirm against the inherited destructor
destructor TSynThreadPoolSubThread.Destroy;
begin
  inherited Destroy;
  {$ifndef USE_WINIOCP}
  DeleteCriticalSection(fSafeProcessingContext);
  fEvent.Free;
  {$endif}
end;

{$ifdef USE_WINIOCP}
function GetQueuedCompletionStatus(CompletionPort: THandle;
  var lpNumberOfBytesTransferred: DWORD; var lpCompletionKey: PtrUInt;
................................................................................
        break;
      if Context<>nil then
        DoTask(Context);
      {$else}
      fEvent.WaitFor(INFINITE);
      if fOwner.fTerminated then
        break;
      EnterCriticalSection(fSafeProcessingContext);
      Context := fProcessingContext;
      LeaveCriticalSection(fSafeProcessingContext);
      while Context<>nil do begin
        DoTask(Context);
        Context := fOwner.PopPendingContext; // unqueue any pending context
      end;
      EnterCriticalSection(fSafeProcessingContext);
      fProcessingContext := nil; // indicates this thread is now available
      LeaveCriticalSection(fSafeProcessingContext);
      {$endif USE_WINIOCP}
    until fOwner.fTerminated;
  finally
    InterlockedDecrement(fOwner.fRunningThreads);
  end;
end;

Changes to SynopseCommit.inc.

1
'1.18.5780'
|
1
'1.18.5781'