#1 2015-09-23 10:50:37

oz
Member
Registered: 2015-09-02
Posts: 95

Serious bug/issue regarding WebSockets

Hi Arnaud!

I've just faced a serious bug in mORMot's WebSockets implementation.
The following testcase can reproduce the bug:

program mORMotBug;

{$I Synopse.inc} // define HASINLINE USETYPEINFO CPU32 CPU64 OWNNORMTOUPPER

{$APPTYPE CONSOLE}

{$R *.res}

uses
  FastMM4,
  SysUtils,
  Classes,
  Windows,
  SynSqlite3Static,
  SynSqlite3,
  SynCommons,
  SynLog,
  SynTests,
  mORMot,
  mORMotHttpServer,
  mORMotHttpClient,
  mORMotSqlite3,
  mORMotDDD;

const
  /// when true, TTestRest.Add() calls DoCallback() on every ICallback stored in
  // gCallbackArray, i.e. sends WebSocket notifications to the connected clients
  // - the report states that under load this eventually raises ESynCrypto
  // 'TAESCFB.DecryptPKCS7: Invalid content' in a client WebSockets thread
  // - when false, Add() returns right after ORMAdd() and no notification is sent
  FORCE_BUG = true;

  /// TCP port used by the TSQLHttpServer and by every test client
  HTTP_PORT = '80';
  /// key given to WebSocketsEnable() on the server and WebSocketsUpgrade() on
  // the client - presumably used for the AES encryption of the binary frames
  WEBSOCKET_KEY = 'key';

type
  /// client-side notification interface, implemented by TCallback and
  // registered on the server through IMyCommand.RegisterCallback()
  ICallback = interface(IInvokable)
    ['{B42B0BBA-AA9C-459B-8731-1C52441361C9}']
    /// invoked by the server from TTestRest.Add() when FORCE_BUG is true
    procedure DoCallback(const aDescription: RawUTF8);
  end;

var
  /// server-side list of every callback registered via TTestRest.RegisterCallback()
  // - shared by all TTestRest instances: every access is done within gCriticalSection
  gCallbackArray: array of ICallback;
  /// protects gCallbackArray; initialized and deleted in the main program block
  gCriticalSection: TRTLCriticalSection;

type
  /// client-side ICallback implementation - one instance per TMyHttpClient
  TCallback = class(TInterfacedObject, ICallback)
  public
    /// notification handler - its implementation is intentionally empty
    procedure DoCallback(const aDescription: RawUTF8);
  end;


  /// simple aggregate used by the test
  // - mapped to TSQLRecordTest by TTestRestFactory
  TTest = class(TSynPersistent)
  private
    fDescription: RawUTF8;
  published
    /// free-text description (set to 'test-...' by the test clients)
    property Description: RawUTF8 read fDescription write fDescription;
  end;
  /// dynamic array of TTest, as returned by IMyQuery.GetAll()
  TTestObjArray = array of TTest;

  /// ORM table persisting TTest aggregates (see TTestRestFactory.Create)
  TSQLRecordTest = class(TSQLRecord)
  private
    fDescription: RawUTF8;
  published
    /// stored counterpart of TTest.Description
    property Description: RawUTF8 read fDescription write fDescription;
  end;

  /// CQRS query interface for TTest aggregates
  IMyQuery = interface(ICQRSService)
    ['{DD402806-39C2-4921-98AA-A575DD1117D6}']
    /// select the aggregate(s) whose Description matches aDescription
    function SelectByDescription(const aDescription: RawUTF8): TCQRSResult;
    /// select all stored aggregates
    function SelectAll: TCQRSResult;
    /// retrieve the aggregate from the previous selection (presumably the current one)
    function Get(out aAggregate: TTest): TCQRSResult;
    /// retrieve all aggregates from the previous selection
    function GetAll(out aAggregates: TTestObjArray): TCQRSResult;
    /// retrieve the next aggregate from the previous selection
    function GetNext(out aAggregate: TTest): TCQRSResult;
    /// presumably the number of selected aggregates - not implemented in
    // TTestRest itself, so it comes from its ancestor
    function GetCount: integer;
  end;

  /// CQRS command interface for TTest aggregates
  // - Delete, DeleteAll, Commit and Rollback are not implemented in TTestRest
  // itself, so they presumably come from TDDDRepositoryRestCommand
  IMyCommand = interface(IMyQuery)
    ['{F0E4C64C-B43A-491B-85E9-FD136843BFCB}']
    /// add a new aggregate (and notify registered callbacks if FORCE_BUG)
    function Add(const aAggregate: TTest): TCQRSResult;
    /// update the selected aggregate
    function Update(const aUpdatedAggregate: TTest): TCQRSResult;
    function Delete: TCQRSResult;
    function DeleteAll: TCQRSResult;
    /// the clients call it after their Add() calls, expecting cqrsSuccess
    function Commit: TCQRSResult;
    function Rollback: TCQRSResult;
    /// append aInterface to the server-side gCallbackArray
    function RegisterCallback(const aInterface: ICallback): TCQRSResult;
  end;

  /// infrastructure REST class implementing IMyCommand and IMyQuery for TTest
  // - registered as sicClientDriven in TMyTestCase.StartServer, i.e. (as stated
  // in the thread) one server-side instance per connected client
  // - all instances share the global gCallbackArray
  TTestRest = class(TDDDRepositoryRestCommand,IMyCommand,IMyQuery)
  public
    // query methods: thin wrappers around the inherited ORM* helpers
    function SelectByDescription(const aDescription: RawUTF8): TCQRSResult;
    function SelectAll: TCQRSResult;
    function Get(out aAggregate: TTest): TCQRSResult;
    function GetAll(out aAggregates: TTestObjArray): TCQRSResult;
    function GetNext(out aAggregate: TTest): TCQRSResult;
    // command methods
    function Add(const aAggregate: TTest): TCQRSResult;
    function Update(const aUpdatedAggregate: TTest): TCQRSResult;
    function RegisterCallback(const aInterface: ICallback): TCQRSResult;
  end;

  /// factory creating TTestRest instances on the server
  // - injected as resolver in TMyTestCase.StartServer
  TTestRestFactory = class(TDDDRepositoryRestFactory)
  public
    /// binds IMyCommand/TTestRest/TTest/TSQLRecordTest to the aRest storage
    constructor Create(aRest: TSQLRest; aOwner: TDDDRepositoryRestManager=nil); reintroduce;
  end;

  /// top-level test suite, run from the main program block
  TMyTests = class(TSynTestsLogged)
  published
    /// registers TMyTestCase
    procedure MyTests;
  end;

  /// test case doing the actual work
  // - NOTE(review): the steps below assume published methods run in
  // declaration order (database cleanup, server start, then clients) - confirm
  TMyTestCase = class(TSynTestCase)
  private
    /// REST server, created in StartServer and freed in CleanUp
    fRestServer: TSQLRestServerDB;
    /// WebSockets-enabled HTTP server, created in StartServer and freed in CleanUp
    fHttpServer: TSQLHttpServer;
    /// runs aClients concurrent TMyThread clients
    // - each client performs aRequests IMyCommand.Add() calls, then a Commit
    // - waits until all client threads are finished
    // - NOTE(review): check the implementation for how the result is computed
    function ClientTest(const aClients, aRequests: integer):boolean;
  protected
    /// releases all registered callbacks, then the HTTP and REST servers
    procedure CleanUp; override;
  published
    /// deletes the database file left by any previous run
    procedure DeleteOldDatabase;
    /// starts the DDD REST server and its HTTP/WebSockets server
    procedure StartServer;
    /// one client, in the main thread, doing sequential Add() calls then Commit()
    procedure SingleClientTest;
    /// concurrent access with multiple clients
    // - reported to eventually fail with ESynCrypto when FORCE_BUG is true
    procedure MultiClientTest;
  end;

  /// custom TSQLHttpClientWebsockets exposing the remote IMyCommand interface
  TMyHttpClient=class(TSQLHttpClientWebsockets)
  private
    /// model owned by this instance: created in Create, freed in Destroy
    fModel: TSQLModel;
    /// remote IMyCommand interface, resolved inside SetUser
    fMyCommand: IMyCommand;
    /// local TCallback instance, registered on the server inside SetUser
    fCallback: ICallback;
  public
    /// creates the client for aServer:aPort with its own TSQLRecordTest model
    constructor Create(const aServer,aPort: RawUTF8); //overload;
    /// releases both interfaces, runs the inherited destructor, then frees the model
    destructor Destroy; override;
    /// authenticates, upgrades the connection to WebSockets, resolves
    // IMyCommand (sicClientDriven) and registers fCallback on the server
    function SetUser(const aUserName, aPassword: RawUTF8; aHashedPassword: Boolean=false): boolean; reintroduce;
    /// remote command/query interface - only assigned by a successful SetUser
    property MyCommand: IMyCommand read fMyCommand;
  end;

  /// client thread used by TMyTestCase.ClientTest
  TMyThread = class(TThread)
  private
    /// client owned by the thread, created and authenticated in the constructor
    fHttpClient: TMyHttpClient;
    /// number of Add() calls performed by Execute
    fRequestCount: integer;
    /// client index, used in the generated descriptions
    fId: integer;
    /// set when Execute detects a failure
    fIsError: boolean;
  protected
    procedure Execute; override;
  public
    /// creates the thread suspended; its client connects from the calling thread
    constructor Create(const aId, aRequestCount: integer);
    /// frees the owned client
    destructor Destroy; override;
    /// true if the thread reported a failure
    property IsError: boolean read fIsError;
  end;

{ TTestRest }

/// Selects the TTest row(s) whose Description equals aDescription
// - the last ORMSelectOne() argument is true for an empty aDescription
// (presumably forcing a "bad request" answer - confirm against ORMSelectOne)
function TTestRest.SelectByDescription(const aDescription: RawUTF8): TCQRSResult;
var
  isEmptyDescription: boolean;
begin
  isEmptyDescription := aDescription='';
  Result := ORMSelectOne('Description=?', [aDescription], isEmptyDescription);
end;

/// Selects every stored TTest: empty WHERE clause and no bound parameters
function TTestRest.SelectAll: TCQRSResult;
const
  NO_WHERE = '';
begin
  Result := ORMSelectAll(NO_WHERE, []);
end;

/// Fills aAggregate from the previous selection
// - simply delegates to the inherited ORMGetAggregate() helper
function TTestRest.Get(out aAggregate: TTest): TCQRSResult;
begin
  Result :=
    ORMGetAggregate(aAggregate);
end;

/// Fills aAggregates with every aggregate of the previous selection
// - simply delegates to the inherited ORMGetAllAggregates() helper
function TTestRest.GetAll(out aAggregates: TTestObjArray): TCQRSResult;
begin
  Result :=
    ORMGetAllAggregates(aAggregates);
end;

/// Fills aAggregate with the next aggregate of the previous selection
// - simply delegates to the inherited ORMGetNextAggregate() helper
function TTestRest.GetNext(out aAggregate: TTest): TCQRSResult;
begin
  Result :=
    ORMGetNextAggregate(aAggregate);
end;

/// Adds aAggregate via ORMAdd(), then (if FORCE_BUG) notifies every callback
// registered in gCallbackArray
// - subscribers are notified only if ORMAdd() returned cqrsSuccess: an
// aggregate which was not added is not announced
// - a callback raising an exception (e.g. a disconnected client) is removed
// from gCallbackArray; the list is walked downwards so that deleting the
// current index does not shift the entries still to be visited
// - NOTE(review): DoCallback() is called while gCriticalSection is held, so a
// slow client delays every other Add()/RegisterCallback()/CleanUp() call
function TTestRest.Add(const aAggregate: TTest): TCQRSResult;
var
  i: integer;
begin
  result := ORMAdd(aAggregate);
  if (result<>cqrsSuccess) or not FORCE_BUG then
    exit;
  EnterCriticalSection(gCriticalSection);
  try
    for i := high(gCallbackArray) downto 0 do // downwards for InterfaceArrayDelete()
      try
        gCallbackArray[i].DoCallback('');
      except
        InterfaceArrayDelete(gCallbackArray,i); // unsubscribe the callback on failure
      end;
  finally
    LeaveCriticalSection(gCriticalSection);
  end;
end;

/// Updates the stored aggregate from aUpdatedAggregate
// - simply delegates to the inherited ORMUpdate() helper
function TTestRest.Update(const aUpdatedAggregate: TTest): TCQRSResult;
begin
  Result :=
    ORMUpdate(aUpdatedAggregate);
end;


{ TTestRestFactory }

/// Passes the types of this test to the inherited DDD repository factory
constructor TTestRestFactory.Create(aRest: TSQLRest;
  aOwner: TDDDRepositoryRestManager);
begin
  inherited Create(
    IMyCommand,      // interface served
    TTestRest,       // implementation class
    TTest,           // aggregate class
    aRest,           // storage
    TSQLRecordTest,  // ORM table mapped to TTest
    aOwner);
end;

/// Appends aInterface to the shared gCallbackArray, under gCriticalSection
// - always answers cqrsSuccess
function TTestRest.RegisterCallback(const aInterface: ICallback): TCQRSResult;
begin
  Result := cqrsSuccess;
  EnterCriticalSection(gCriticalSection);
  try
    InterfaceArrayAdd(gCallbackArray, aInterface);
  finally
    LeaveCriticalSection(gCriticalSection);
  end;
end;

{ TMyTests }

/// Registers the single test case class of this program
procedure TMyTests.MyTests;
begin
  AddCase(
    [TMyTestCase]);
end;

{ TMyTestCase }

/// Releases every registered callback (last one first), then the HTTP server
// and finally the REST server
procedure TMyTestCase.CleanUp;
begin
  EnterCriticalSection(gCriticalSection);
  try
    while length(gCallbackArray)>0 do
      InterfaceArrayDelete(gCallbackArray, high(gCallbackArray));
  finally
    LeaveCriticalSection(gCriticalSection);
  end;
  // FreeAndNil() does nothing on a nil reference, so no Assigned() check is needed
  FreeAndNil(fHttpServer);
  FreeAndNil(fRestServer);
end;

/// Removes the <exename>.db3 file from a previous run, and checks it is gone
procedure TMyTestCase.DeleteOldDatabase;
var
  dbFile: TFileName;
begin
  dbFile := ChangeFileExt(ParamStr(0), '.db3');
  if FileExists(dbFile) then
    SysUtils.DeleteFile(dbFile);
  CheckNot(FileExists(dbFile));
end;

/// Creates the SQLite3-backed REST server with the TTestRest service
// (sicClientDriven), then the HTTP server with WebSockets enabled
procedure TMyTestCase.StartServer;
var
  dbFile: TFileName;
begin
  dbFile := ChangeFileExt(ParamStr(0), '.db3');
  fRestServer := TSQLRestServerDB.CreateWithOwnModel([TSQLRecordTest], dbFile, true);
  fRestServer.DB.Synchronous := smNormal;
  fRestServer.DB.LockingMode := lmExclusive;
  fRestServer.CreateMissingTables();
  TInterfaceFactory.RegisterInterfaces([TypeInfo(IMyQuery),TypeInfo(IMyCommand)]);
  fRestServer.ServiceContainer.InjectResolver([TTestRestFactory.Create(fRestServer)],true);
  fRestServer.ServiceDefine(TTestRest,[IMyCommand],sicClientDriven);
  fHttpServer := TSQLHttpServer.Create(HTTP_PORT, fRestServer, '+', useBidirSocket);
  fHttpServer.WebSocketsEnable(fRestServer, WEBSOCKET_KEY);
end;

/// Stress test: 20 concurrent clients, 50 Add() calls each
procedure TMyTestCase.MultiClientTest;
const
  CLIENT_COUNT = 20;
  REQUESTS_PER_CLIENT = 50;
begin
  ClientTest(CLIENT_COUNT, REQUESTS_PER_CLIENT);
end;

/// One client in the main thread: authenticates, adds MAX aggregates named
// 'test-0'..'test-999', then commits, checking every cqrsSuccess result
procedure TMyTestCase.SingleClientTest;
const
  MAX = 1000;
var
  client: TMyHttpClient;
  item: TTest;
  n: integer;
begin
  client := TMyHttpClient.Create('localhost', HTTP_PORT);
  try
    Check(client.SetUser('Admin', 'synopse'));
    item := TTest.Create;
    try
      n := 0;
      while n<MAX do begin
        item.Description := FormatUTF8('test-%',[n]);
        Check(client.MyCommand.Add(item)=cqrsSuccess);
        inc(n);
      end;
      Check(client.MyCommand.Commit=cqrsSuccess);
    finally
      item.Free;
    end;
  finally
    client.Free;
  end;
end;

/// Runs aClients concurrent TMyThread instances, each one performing
// aRequests IMyCommand.Add() calls followed by a Commit()
// - every thread is started as soon as it is created; the method then blocks
// until all of them have terminated
// - returns true only if no thread reported IsError and no thread ended with
// an unhandled exception (TThread.FatalException); each faulty thread also
// fails a CheckNot()
// - waits with TThread.WaitFor on each thread, so it is not bound to the
// 64-handle (MAXIMUM_WAIT_OBJECTS) limit of WaitForMultipleObjects()
// - aClients<=0 does nothing and returns true
function TMyTestCase.ClientTest(const aClients, aRequests: integer):boolean;
var
  i: integer;
  threads: array of TMyThread;
  failed: boolean;
begin
  result := true;
  if aClients<=0 then
    exit;
  SetLength(threads, aClients); // zero-filled: entries not created yet stay nil
  try
    for i := 0 to high(threads) do begin
      threads[i] := TMyThread.Create(i, aRequests);
      threads[i].Resume;
    end;
    for i := 0 to high(threads) do begin
      threads[i].WaitFor;
      failed := threads[i].IsError or (threads[i].FatalException<>nil);
      CheckNot(failed);
      if failed then
        result := false;
    end;
  finally
    // Free is nil-safe: threads never created (TMyThread.Create raised) are skipped
    for i := 0 to high(threads) do
      threads[i].Free;
  end;
end;

{ TMyHttpClient }

/// Creates the owned TSQLRecordTest model and the local TCallback instance,
// then the inherited WebSockets client using that model
constructor TMyHttpClient.Create(const aServer,aPort: RawUTF8);
var
  ownModel: TSQLModel;
begin
  ownModel := TSQLModel.Create([TSQLRecordTest]);
  fModel := ownModel;        // freed in Destroy
  fCallback := TCallback.Create; // registered on the server by SetUser
  inherited Create(aServer, aPort, ownModel);
end;

/// Releases the callback and IMyCommand references, runs the inherited
// destructor, and only then frees the model it was created with
destructor TMyHttpClient.Destroy;
begin
  fCallback:=nil;
  // released before "inherited" - presumably the sicClientDriven proxy still
  // needs the connection to release its server-side instance; verify
  fMyCommand:=nil;
  inherited;
  fModel.Free; // freed last: the inherited client was created with this model
end;

/// Authenticates, upgrades the connection to WebSockets, resolves the remote
// IMyCommand (sicClientDriven) and registers fCallback on the server
// - returns false, leaving the later steps undone, as soon as one step fails:
// authentication, the WebSockets upgrade (which returns a non-empty error
// message on failure), the interface resolution, or the callback registration
// - the original code called fMyCommand.RegisterCallback() even after a
// failed authentication, dereferencing a nil interface
function TMyHttpClient.SetUser(const aUserName, aPassword: RawUTF8; aHashedPassword: Boolean=false): boolean;
var
  upgradeError: RawUTF8;
begin
  result := inherited SetUser(aUserName, aPassword, aHashedPassword);
  if not result then
    exit;
  upgradeError := WebSocketsUpgrade(WEBSOCKET_KEY);
  if upgradeError<>'' then begin
    result := false;
    exit;
  end;
  ServiceDefine([IMyCommand],sicClientDriven);
  result := Services.Resolve(IMyCommand, fMyCommand) and
    (fMyCommand.RegisterCallback(fCallback)=cqrsSuccess);
end;


{ TMyThread }

/// Creates the thread suspended (ClientTest resumes it), then creates and
// authenticates its own client from the calling thread
// - a failed SetUser() is no longer ignored: it is recorded in fIsError, so
// that callers checking IsError see it
constructor TMyThread.Create(const aID, aRequestCount: integer);
begin
  inherited Create(true);
  fRequestCount := aRequestCount;
  fId := aID;
  fHttpClient := TMyHttpClient.Create('localhost', HTTP_PORT);
  fIsError := not fHttpClient.SetUser('Admin', 'synopse');
end;

/// Lets the inherited TThread destructor wait for Execute to finish, and only
// then frees the owned client
// - freeing fHttpClient first, as before, could release it while Execute was
// still using it, if the thread was destroyed before terminating (e.g. when
// an exception aborts ClientTest)
// - fields remain accessible after "inherited": the instance memory is only
// released once the whole destructor chain has returned
destructor TMyThread.Destroy;
begin
  inherited;
  fHttpClient.Free;
end;

/// Thread body: performs fRequestCount IMyCommand.Add() calls with the
// descriptions 'test-<id>-<i>', then a single Commit()
// - does nothing if fIsError is already set when the thread starts
// - any failure sets fIsError: a non-cqrsSuccess result, and also (unlike
// before) an exception raised by a remote call; the exception is then
// re-raised so that TThread keeps it in FatalException
procedure TMyThread.Execute;
var
  i: integer;
  test: TTest;
  success: boolean;
begin
  if fIsError then
    exit;
  try
    test := TTest.Create;
    try
      success := true;
      for i := 0 to fRequestCount-1 do begin
        test.Description := FormatUTF8('test-%-%',[fId, i]);
        if fHttpClient.MyCommand.Add(test)<>cqrsSuccess then begin
          success := false;
          break;
        end;
      end;
      if success then
        success := fHttpClient.MyCommand.Commit=cqrsSuccess;
      if not success then
        raise Exception.Create('Something went wrong!');
    finally
      test.Free;
    end;
  except
    fIsError := true;
    raise;
  end;
end;

{ TCallback }

/// Client-side notification handler: deliberately empty, since this test only
// exercises the server-to-client WebSockets call itself
procedure TCallback.DoCallback(const aDescription: RawUTF8);
begin
  // nothing to do
end;

begin
  // gCriticalSection protects gCallbackArray for the whole run; the outer
  // try..finally guarantees it is deleted even if the tests raise
  InitializeCriticalSection(gCriticalSection);
  try
    with TMyTests.Create('mORMot DDD Test') do
    try
      Run;
    finally
      Free;
    end;
    WriteLn(#13#10'Done - Press ENTER to Exit');
    ReadLn;
  finally
    DeleteCriticalSection(gCriticalSection);
  end;
end.

The issue happens under heavy server load.
At some point there will happen an ESynCrypto Exception: 'TAESCFB.DecryptPKCS7: Invalid content' while sending WebSocket notifications inside IMyCommand.Add() !
If you set "FORCE_BUG = false;" then there won't be any issues. This option will disable WebSocket notifications inside IMyCommand.Add method.
Kind regards,
oz.

Offline

#2 2015-09-23 13:19:42

ab
Administrator
From: France
Registered: 2010-06-21
Posts: 14,242
Website

Re: Serious bug/issue regarding WebSockets

I did not have the ESynCrypto exception.

I received a 404 error, mainly due to the fact that the callbacks are a global list.
This is, IMHO, a wrong design for your sample.
Callbacks should be private to each instance, and released/unregistered ASAP from the client side.
You are sharing callbacks, and some are still trying to be notified even if their TMyHttpClient owner is not there any more.
IMHO it should be in each TTestRest to maintain the callback list.

Offline

#3 2015-09-23 16:20:14

oz
Member
Registered: 2015-09-02
Posts: 95

Re: Serious bug/issue regarding WebSockets

Hi Arnaud,

first of all: thanks for your quick reply, as usual!

Yes, you are right, the 404 error can happen! But that's not the problem. As far as I understand, it's not caused by the global list used to store the callbacks on the server side.
This global list is only used by the server. The clients keep their callback instances in the private fCallback: ICallback variable. Such 404 errors can always happen, because the client to be notified could already have disconnected. That 404 error is properly caught by the try...except block inside TTestRest.Add().

I'm quite sure that this is not the problem. Let me describe my thoughts in more detail:

TTestRest is implementing IMyCommand in sicClientDriven mode.
This means that each connected client has its own TTestRest object living on server side.
That's why some kind of global list has to be used; otherwise each callback list (which would then be private to TTestRest) would only contain one single entry. Access to the global list is secured using critical sections, so there can't be any threading issues.

By the way: Imho the included demo "Project31ChatServer.dpr" is doing exactly the same at the end. The server-side interface is using "sicShared", which means that there is only one instance on server, shared by all clients. So, at the end, "fConnected: array of IChatCallback;" used in the demo to store the callbacks is exactly the same which is done by my testcase.
I think I'm right about that, or did I miss the point?

Arnaud, I'm quite sure that there's an issue with the WebSockets implementation under heavy server load.

In my real project, those callbacks on the server side are not stored within a global list. I'm using my own custom session handling inside my business logic tier, which does not depend on mORMot's sessions. In the end, there's something like a TmySession instance for each connected client. Every TmySession holds one IMyCallback reference to the callback interface registered by the client. Everything runs perfectly fine until I run the heavy-load stress test. From time to time the 404 errors occur, but that's ok: they are caught, and those 404 errors can't be prevented. They just happen sometimes.

I'm pretty sure there's an issue with WebSockets, because of strange testcase behaviour.
70 out of 100 times the testcase will produce 404 errors. Ok.
29 out of 100 times the testcase runs without errors. Ok.
BUT: Sometimes, maybe 1 out of 100 times there are exceptions in:
Stacktrace:

TAESAbstract.DecryptPKCS7('PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~؁ù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹­äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜',True)
TWebSocketProtocolBinary.AfterGetFrame((focBinary, 'PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~؁ù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹­äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜'))
TWebSocketProcess.GetFrame((focBinary, 'PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~؁ù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹­äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜'),1,False)
TWebSocketProcess.ProcessLoop
TWebSocketProcessClientThread.Execute
ThreadProc($7EEE12E0)
ThreadWrapper($7EF88580)

The exception is raised here:

/// Decrypts Input and strips its PKCS#7 padding
// - PKCS#7 always appends 1..AESBlockSize bytes, each holding the pad length,
// so the routine now rejects a padding byte of 0 and padding longer than the
// data, checks every padding byte, and rejects an empty decrypted buffer
// (the previous code read result[0] in that case)
// - raises ESynCrypto('...DecryptPKCS7: Invalid content') on malformed input
function TAESAbstract.DecryptPKCS7(const Input: RawByteString;
  IVAtBeginning: boolean): RawByteString;
var len,iv,padding,i: integer;
begin
  len := length(Input);
  DecryptLen(len,iv,pointer(Input),IVAtBeginning);
  if len<=0 then
    raise ESynCrypto.CreateUTF8('%.DecryptPKCS7: Invalid content',[self]);
  SetString(result,nil,len);
  Decrypt(@PByteArray(Input)^[iv],pointer(result),len);
  padding := ord(result[len]); // result[1..len]
  if (padding=0) or (padding>AESBlockSize) or (padding>len) then
    raise ESynCrypto.CreateUTF8('%.DecryptPKCS7: Invalid content',[self]);
  for i := len-padding+1 to len-1 do
    if ord(result[i])<>padding then
      raise ESynCrypto.CreateUTF8('%.DecryptPKCS7: Invalid content',[self]);
  SetLength(result,len-padding);
end;

It's hard to reproduce this issue, because most of the times everything is ok. But sometimes, things start going wrong.

I hope you get my point. Feel free to ask any further questions!
I could rewrite the testcase if it helps, but I don't see that there's something wrong with it.

Kind regards,
oz.

Offline

#4 2015-09-23 16:26:52

oz
Member
Registered: 2015-09-02
Posts: 95

Re: Serious bug/issue regarding WebSockets

One more thing:
Most of the times there are 404 errors prior to the exception stated above.
But just right now at the moment the exception occured without any prior 404 error in my testcase.
Maybe that information helps.
Kind regards,
oz.

Offline

#5 2015-09-23 19:13:32

oz
Member
Registered: 2015-09-02
Posts: 95

Re: Serious bug/issue regarding WebSockets

ab wrote:

I did not have the ESynCrypto exception.

I received a 404 error, mainly due to the fact that the callbacks are a global list.
This IMHO a wrong design of your sample.
Callbacks should be private to each instance, and released/unregistered ASAP from the client side.
You are sharing callbacks, and some are still trying to be notified even if their TMyHttpClient owner is not there any more.
IMHO it should be in each TTestRest to maintain the callback list.

Hey, it's me again ;)
After re-reading your post: the usage of this is to notify all connected clients that some event has occured. So the design is based on the chat server sample.

Offline

#6 2015-09-24 08:03:00

ab
Administrator
From: France
Registered: 2010-06-21
Posts: 14,242
Website

Re: Serious bug/issue regarding WebSockets

Access to the sockets is protected by critical sections.
See how GetFrame and SendFrame are implemented in TWebSocketProcess.
So I doubt there is some race condition here.

As you can see, the issue is in TWebSocketProcessClientThread.
So it occurs on the client side, not on the server side.
The issue may come from clients still receiving notifications in their thread, during the short time of their destruction, since the callback has not been notified as released to the server side.

Once you get rid of the 404 issue, i.e. once the callbacks are properly notified as removed to the server side, before the client is destroyed, I'm quite confident you would not have other issues.

Offline

Board footer

Powered by FluxBB