You are not logged in.
Pages: 1
Hi Arnaud!
I've just faced a serious bug in mORMot's WebSockets implementation.
The following testcase can reproduce the bug:
program mORMotBug;
{$I Synopse.inc} // define HASINLINE USETYPEINFO CPU32 CPU64 OWNNORMTOUPPER
{$APPTYPE CONSOLE}
{$R *.res}
uses
FastMM4,
SysUtils,
Classes,
Windows,
SynSqlite3Static,
SynSqlite3,
SynCommons,
SynLog,
SynTests,
mORMot,
mORMotHttpServer,
mORMotHttpClient,
mORMotSqlite3,
mORMotDDD;
const
  /// when true, IMyCommand.Add() notifies every registered callback over
  // WebSockets after each insertion
  // - with true, an ESynCrypto exception 'TAESCFB.DecryptPKCS7: Invalid content'
  // is reported to show up at some point (this is the bug being demonstrated)
  // - with false, no notification is sent and no issue is reported
  FORCE_BUG = true;
  /// port used both by the HTTP server and by the test clients
  HTTP_PORT = '80';
  /// key passed to WebSocketsEnable() on the server and to WebSocketsUpgrade()
  // on the clients
  WEBSOCKET_KEY = 'key';
type
/// notification contract registered by each client via
// IMyCommand.RegisterCallback() and invoked by TTestRest.Add()
ICallback = interface(IInvokable)
  ['{B42B0BBA-AA9C-459B-8731-1C52441361C9}']
  /// called by the server once a new item has been added
  procedure DoCallback(const aDescription: RawUTF8);
end;
var
  /// every callback registered through TTestRest.RegisterCallback()
  // - shared by all TTestRest instances, so access is always guarded by
  // gCriticalSection
  gCallbackArray: array of ICallback;
  /// lock protecting gCallbackArray
  // - initialized and deleted by the main program block
  gCriticalSection: TRTLCriticalSection;
type
/// client-side ICallback implementation
// - one instance is created by each TMyHttpClient
TCallback = class(TInterfacedObject, ICallback)
public
  /// ICallback.DoCallback implementation
  procedure DoCallback(const aDescription: RawUTF8);
end;
/// minimal aggregate used by the test
// - persisted through TSQLRecordTest (see TTestRestFactory.Create)
TTest = class(TSynPersistent)
private
  fDescription: RawUTF8;
published
  /// the only persisted field of the aggregate
  property Description: RawUTF8 read fDescription write fDescription;
end;
// Dynamic array of TTest instances, used as the "out" parameter of GetAll().
TTestObjArray = array of TTest;
/// ORM record class paired with the TTest aggregate
// - exposes the same Description field as TTest
TSQLRecordTest = class(TSQLRecord)
private
  fDescription: RawUTF8;
published
  /// stored counterpart of TTest.Description
  property Description: RawUTF8 read fDescription write fDescription;
end;
/// CQRS query (read) interface for TTest aggregates
IMyQuery = interface(ICQRSService)
  ['{DD402806-39C2-4921-98AA-A575DD1117D6}']
  /// select the aggregate matching the supplied description
  function SelectByDescription(const aDescription: RawUTF8): TCQRSResult;
  /// select all aggregates
  function SelectAll: TCQRSResult;
  /// retrieve the currently selected aggregate
  function Get(out aAggregate: TTest): TCQRSResult;
  /// retrieve all selected aggregates at once
  function GetAll(out aAggregates: TTestObjArray): TCQRSResult;
  /// retrieve the next selected aggregate
  function GetNext(out aAggregate: TTest): TCQRSResult;
  /// number of selected aggregates
  function GetCount: integer;
end;
/// CQRS command (write) interface for TTest aggregates
// - inherits all query methods from IMyQuery
IMyCommand = interface(IMyQuery)
  ['{F0E4C64C-B43A-491B-85E9-FD136843BFCB}']
  /// add a new aggregate
  function Add(const aAggregate: TTest): TCQRSResult;
  /// update the currently selected aggregate
  function Update(const aUpdatedAggregate: TTest): TCQRSResult;
  /// delete the currently selected aggregate
  function Delete: TCQRSResult;
  /// delete all selected aggregates
  function DeleteAll: TCQRSResult;
  /// apply the pending changes
  function Commit: TCQRSResult;
  /// discard the pending changes
  function Rollback: TCQRSResult;
  /// register a client callback to be notified by Add()
  function RegisterCallback(const aInterface: ICallback): TCQRSResult;
end;
/// infrastructure REST repository implementing both IMyQuery and IMyCommand
// for TTest aggregates
// - methods not declared here (Delete, Commit, ...) are expected to come from
// the TDDDRepositoryRestCommand parent
TTestRest = class(TDDDRepositoryRestCommand, IMyCommand, IMyQuery)
public
  // IMyQuery methods
  function SelectByDescription(const aDescription: RawUTF8): TCQRSResult;
  function SelectAll: TCQRSResult;
  function Get(out aAggregate: TTest): TCQRSResult;
  function GetAll(out aAggregates: TTestObjArray): TCQRSResult;
  function GetNext(out aAggregate: TTest): TCQRSResult;
  // IMyCommand methods
  function Add(const aAggregate: TTest): TCQRSResult;
  function Update(const aUpdatedAggregate: TTest): TCQRSResult;
  function RegisterCallback(const aInterface: ICallback): TCQRSResult;
end;
/// factory binding IMyCommand, TTestRest, TTest and TSQLRecordTest together
TTestRestFactory = class(TDDDRepositoryRestFactory)
public
  /// initialize the factory for the supplied REST instance
  constructor Create(aRest: TSQLRest; aOwner: TDDDRepositoryRestManager=nil); reintroduce;
end;
/// test suite: its single published method registers TMyTestCase
TMyTests = class(TSynTestsLogged)
published
  /// register the test case(s) to be run
  procedure MyTests;
end;
/// the test case performing the actual work
// - published methods are the individual test steps, in declaration order
TMyTestCase = class(TSynTestCase)
private
  /// REST server, created by StartServer
  fRestServer: TSQLRestServerDB;
  /// HTTP/WebSockets server publishing fRestServer, created by StartServer
  fHttpServer: TSQLHttpServer;
  /// run aClients concurrent clients, each performing aRequests requests
  // - blocks until every client thread has finished
  function ClientTest(const aClients, aRequests: integer):boolean;
protected
  /// release the registered callbacks and both servers
  procedure CleanUp; override;
published
  /// remove any database file left over by a previous run
  procedure DeleteOldDatabase;
  /// start the whole DDD server (REST + HTTP)
  procedure StartServer;
  /// straightforward access from one thread with one client
  procedure SingleClientTest;
  /// concurrent access from several clients - this is the step that fails
  procedure MultiClientTest;
end;
/// WebSockets-capable HTTP client wrapping the remote IMyCommand service
TMyHttpClient=class(TSQLHttpClientWebsockets)
private
  /// model owned by this client (released in Destroy)
  fModel: TSQLModel;
  /// remote service instance, assigned inside SetUser
  fMyCommand: IMyCommand;
  /// callback instance registered on the server by SetUser
  fCallback: ICallback;
public
  /// create the client with its own model and callback
  constructor Create(const aServer,aPort: RawUTF8); //overload;
  /// release the interfaces, the connection and the model
  destructor Destroy; override;
  /// log in, upgrade the connection to WebSockets and resolve MyCommand
  function SetUser(const aUserName, aPassword: RawUTF8; aHashedPassword: Boolean=false): boolean; reintroduce;
  /// the remote IMyCommand service (nil until SetUser has succeeded)
  property MyCommand: IMyCommand read fMyCommand;
end;
/// worker thread started by TMyTestCase.ClientTest
// - owns one TMyHttpClient and sends fRequestCount Add() requests through it
TMyThread = class(TThread)
private
  fHttpClient: TMyHttpClient;
  fRequestCount: integer;
  fId: integer;
  fIsError: boolean;
protected
  /// send the requests, then Commit
  procedure Execute; override;
public
  /// create the thread suspended, with its own logged-in client
  constructor Create(const aId, aRequestCount: integer);
  /// release the client
  destructor Destroy; override;
  /// true when Execute detected a failure
  property IsError: boolean read fIsError;
end;
{ TTestRest }
/// select the record whose Description equals aDescription
// - the "aDescription is empty" flag is forwarded as last ORMSelectOne()
// argument
function TTestRest.SelectByDescription(
  const aDescription: RawUTF8): TCQRSResult;
var
  descriptionIsEmpty: boolean;
begin
  descriptionIsEmpty := (aDescription = '');
  result := ORMSelectOne('Description=?', [aDescription], descriptionIsEmpty);
end;
/// select every record: ORMSelectAll() is called with an empty WHERE clause
// and no bound parameter
function TTestRest.SelectAll: TCQRSResult;
begin
  result := ORMSelectAll('', []);
end;
/// return the current aggregate by delegating to ORMGetAggregate()
function TTestRest.Get(out aAggregate: TTest): TCQRSResult;
begin
  result := ORMGetAggregate(aAggregate);
end;
/// return all aggregates by delegating to ORMGetAllAggregates()
function TTestRest.GetAll(
  out aAggregates: TTestObjArray): TCQRSResult;
begin
  result := ORMGetAllAggregates(aAggregates);
end;
/// return the next aggregate by delegating to ORMGetNextAggregate()
function TTestRest.GetNext(out aAggregate: TTest): TCQRSResult;
begin
  result := ORMGetNextAggregate(aAggregate);
end;
/// add the aggregate via ORMAdd(), then - when FORCE_BUG is set - notify every
// registered callback
// - the result is always the one returned by ORMAdd()
// - a callback raising an exception is removed from gCallbackArray
function TTestRest.Add(const aAggregate: TTest): TCQRSResult;
var
  ndx: integer;
begin
  result := ORMAdd(aAggregate);
  if FORCE_BUG then
  begin
    EnterCriticalSection(gCriticalSection);
    try
      // walk backwards so that InterfaceArrayDelete() does not shift the
      // entries still to be visited
      ndx := high(gCallbackArray);
      while ndx >= 0 do
      begin
        try
          gCallbackArray[ndx].DoCallback('');
        except
          // unsubscribe the callback on failure
          InterfaceArrayDelete(gCallbackArray, ndx);
        end;
        dec(ndx);
      end;
    finally
      LeaveCriticalSection(gCriticalSection);
    end;
  end;
end;
/// update the aggregate by delegating to ORMUpdate()
function TTestRest.Update(
  const aUpdatedAggregate: TTest): TCQRSResult;
begin
  result := ORMUpdate(aUpdatedAggregate);
end;
{ TTestRestFactory }

/// initialize the factory: IMyCommand is implemented by TTestRest, which maps
// the TTest aggregate to the TSQLRecordTest ORM class on aRest
constructor TTestRestFactory.Create(aRest: TSQLRest;
  aOwner: TDDDRepositoryRestManager);
begin
  inherited Create(
    IMyCommand, TTestRest, TTest, aRest, TSQLRecordTest, aOwner);
end;
/// append aInterface to the global callback list, under gCriticalSection
// - always returns cqrsSuccess
function TTestRest.RegisterCallback(
  const aInterface: ICallback): TCQRSResult;
begin
  result := cqrsSuccess;
  EnterCriticalSection(gCriticalSection);
  try
    InterfaceArrayAdd(gCallbackArray, aInterface);
  finally
    LeaveCriticalSection(gCriticalSection);
  end;
end;
{ TMyTests }
/// register TMyTestCase as the only test case of this suite
procedure TMyTests.MyTests;
begin
  AddCase([TMyTestCase]);
end;
{ TMyTestCase }
/// empty the global callback list, then release the HTTP and REST servers
procedure TMyTestCase.CleanUp;
begin
  EnterCriticalSection(gCriticalSection);
  try
    // always remove the last entry, i.e. from the highest index down to 0
    while length(gCallbackArray) > 0 do
      InterfaceArrayDelete(gCallbackArray, high(gCallbackArray));
  finally
    LeaveCriticalSection(gCriticalSection);
  end;
  // FreeAndNil() is a no-op on a nil reference
  FreeAndNil(fHttpServer);
  FreeAndNil(fRestServer);
end;
/// delete the .db3 file sitting next to the executable, if any, and check
// that it is gone
procedure TMyTestCase.DeleteOldDatabase;
var
  dbFile: string;
begin
  dbFile := ChangeFileExt(ParamStr(0), '.db3');
  if FileExists(dbFile) then
    SysUtils.DeleteFile(dbFile);
  CheckNot(FileExists(dbFile));
end;
/// create the REST server over the .db3 file, publish the IMyCommand service,
// then start the HTTP server with WebSockets enabled
procedure TMyTestCase.StartServer;
begin
  fRestServer := TSQLRestServerDB.CreateWithOwnModel(
    [TSQLRecordTest], ChangeFileExt(ParamStr(0), '.db3'), true);
  fRestServer.DB.Synchronous := smNormal;
  fRestServer.DB.LockingMode := lmExclusive;
  fRestServer.CreateMissingTables();
  TInterfaceFactory.RegisterInterfaces(
    [TypeInfo(IMyQuery), TypeInfo(IMyCommand)]);
  fRestServer.ServiceContainer.InjectResolver(
    [TTestRestFactory.Create(fRestServer)], true);
  // one TTestRest instance per client
  fRestServer.ServiceDefine(TTestRest, [IMyCommand], sicClientDriven);
  fHttpServer := TSQLHttpServer.Create(
    HTTP_PORT, fRestServer, '+', useBidirSocket);
  fHttpServer.WebSocketsEnable(fRestServer, WEBSOCKET_KEY);
end;
/// stress step: 20 concurrent clients, 50 requests each
// - per-thread failures are reported by the checks made inside ClientTest()
procedure TMyTestCase.MultiClientTest;
begin
  ClientTest(20, 50);
end;
/// single-threaded step: one client adds MAX items, then commits
procedure TMyTestCase.SingleClientTest;
const
  MAX = 1000;
var
  client: TMyHttpClient;
  item: TTest;
  n: integer;
begin
  client := TMyHttpClient.Create('localhost', HTTP_PORT);
  try
    Check(client.SetUser('Admin', 'synopse'));
    item := TTest.Create;
    try
      // the same TTest instance is reused for every request
      n := 0;
      while n < MAX do
      begin
        item.Description := FormatUTF8('test-%', [n]);
        Check(client.MyCommand.Add(item) = cqrsSuccess);
        inc(n);
      end;
      Check(client.MyCommand.Commit = cqrsSuccess);
    finally
      item.Free;
    end;
  finally
    client.Free;
  end;
end;
/// run aClients concurrent client threads, each sending aRequests requests
// - blocks until every thread has finished
// - each thread is started right after its creation, as before
// - returns true only if at least one client was requested and no thread
// reported an error; returns false when aClients <= 0
// - every thread's IsError flag is also reported through CheckNot()
function TMyTestCase.ClientTest(const aClients, aRequests: integer):boolean;
var
  i: integer;
  workers: array of TMyThread;
  allOk: boolean;
begin
  result := false;
  if aClients <= 0 then
    exit; // nothing to run, and no array element to wait on
  allOk := true;
  SetLength(workers, aClients); // entries start as nil
  try
    for i := 0 to high(workers) do
    begin
      workers[i] := TMyThread.Create(i, aRequests);
      workers[i].Resume;
    end;
    // wait on each thread in turn: unlike WaitForMultipleObjects(), this is
    // not limited to MAXIMUM_WAIT_OBJECTS (64) handles
    for i := 0 to high(workers) do
      workers[i].WaitFor;
  finally
    // also reached when a constructor raised: release what was created
    for i := 0 to high(workers) do
      if workers[i] <> nil then
      begin
        if workers[i].IsError then
          allOk := false;
        CheckNot(workers[i].IsError);
        workers[i].Free;
      end;
    SetLength(workers, 0);
  end;
  result := allOk;
end;
{ TMyHttpClient }
/// create the client-owned model and callback, then connect to aServer:aPort
// - both fields are set before the inherited constructor is called
constructor TMyHttpClient.Create(const aServer,aPort: RawUTF8);
begin
  fModel := TSQLModel.Create([TSQLRecordTest]);
  fCallback := TCallback.Create;
  inherited Create(aServer, aPort, fModel);
end;
/// release the callback and service references, then the connection, and
// finally the model
destructor TMyHttpClient.Destroy;
begin
  // drop the interface references before the inherited destructor runs
  fCallback := nil;
  fMyCommand := nil;
  inherited Destroy;
  // the model is released after the inherited destructor, as it was passed to
  // the inherited constructor
  fModel.Free;
end;
/// log in, upgrade the connection to WebSockets, resolve the remote
// IMyCommand service and register this client's callback on the server
// - returns true only if the login succeeded, the service was resolved and
// the callback registration returned cqrsSuccess
// - when the login fails, MyCommand is left untouched and no registration is
// attempted, instead of calling RegisterCallback on a nil interface
function TMyHttpClient.SetUser(const aUserName, aPassword: RawUTF8; aHashedPassword: Boolean=false): boolean;
begin
  result := inherited SetUser(aUserName, aPassword, aHashedPassword);
  // the upgrade is attempted whatever the login outcome, as before
  WebSocketsUpgrade(WEBSOCKET_KEY);
  if not result then
    exit;
  ServiceDefine([IMyCommand], sicClientDriven);
  Services.Resolve(IMyCommand, fMyCommand);
  // short-circuit evaluation: no call is made if the service is not resolved
  result := (fMyCommand <> nil) and
    (fMyCommand.RegisterCallback(fCallback) = cqrsSuccess);
end;
{ TMyThread }
/// create the thread suspended, then create and log in its own HTTP client
// - the client is created and logged in from the calling thread, before
// Execute runs
// - a failed login is recorded in IsError instead of being silently ignored
constructor TMyThread.Create(const aID, aRequestCount: integer);
begin
  inherited Create(true);
  fRequestCount := aRequestCount;
  fId := aId;
  fHttpClient := TMyHttpClient.Create('localhost', HTTP_PORT);
  fIsError := not fHttpClient.SetUser('Admin', 'synopse');
end;
/// release the HTTP client, then the thread itself
// - the client is released before the inherited destructor is called
destructor TMyThread.Destroy;
begin
  fHttpClient.Free;
  inherited Destroy;
end;
/// send fRequestCount Add() requests through this thread's client, then
// Commit
// - stops at the first request which does not return cqrsSuccess
// - IsError is set on any failure: a non-success result, an unresolved
// MyCommand service, or an exception raised by a remote call (which
// previously left IsError false)
// - as before, a failure ends the thread with an exception
procedure TMyThread.Execute;
var
  i: integer;
  test: TTest;
  success: boolean;
begin
  test := TTest.Create;
  try
    try
      // MyCommand is nil when the client could not log in
      success := fHttpClient.MyCommand <> nil;
      i := 0;
      while success and (i < fRequestCount) do
      begin
        test.Description := FormatUTF8('test-%-%', [fID, i]);
        success := fHttpClient.MyCommand.Add(test) = cqrsSuccess;
        inc(i);
      end;
      if success then
        success := fHttpClient.MyCommand.Commit = cqrsSuccess;
    except
      // flag the failure before letting the exception end the thread
      fIsError := true;
      raise;
    end;
    if not success then
    begin
      fIsError := true;
      raise Exception.Create('Something went wrong!');
    end;
  finally
    test.Free;
  end;
end;
{ TCallback }
// ICallback.DoCallback implementation: intentionally does nothing.
// aDescription is ignored; the only statement is a commented-out trace.
procedure TCallback.DoCallback(const aDescription: RawUTF8);
begin
//writeln('callback!');
end;
// Program entry point: run the test suite, then wait for ENTER.
// The global critical section is now deleted even if the run raises.
begin
  InitializeCriticalSection(gCriticalSection);
  try
    with TMyTests.Create('mORMot DDD Test') do
    try
      Run;
    finally
      Free;
    end;
    WriteLn(#13#10'Done - Press ENTER to Exit');
    ReadLn;
  finally
    DeleteCriticalSection(gCriticalSection);
  end;
end.
The issue happens under heavy server load.
At some point an ESynCrypto exception, 'TAESCFB.DecryptPKCS7: Invalid content', is raised while sending WebSocket notifications inside IMyCommand.Add()!
If you set "FORCE_BUG = false;" then there won't be any issues. This option will disable WebSocket notifications inside IMyCommand.Add method.
Kind regards,
oz.
Offline
I did not have the ESynCrypto exception.
I received a 404 error, mainly due to the fact that the callbacks are a global list.
This is IMHO a wrong design of your sample.
Callbacks should be private to each instance, and released/unregistered ASAP from the client side.
You are sharing callbacks, and some are still trying to be notified even if their TMyHttpClient owner is not there any more.
IMHO it should be in each TTestRest to maintain the callback list.
Offline
Hi Arnaud,
first of all: thanks for your quick reply, as usual!
Yes, you are right, the 404 error can happen! But that's not the problem. As far as I understand, it's not because a global list is used to store callbacks on the server side.
This global list is only used by the server. The clients keep their callback instances in the private fCallback: ICallback variable. Such 404 errors can always happen, because the client which should be notified could already have disconnected. That 404 error is properly caught by the try...except block inside TTestRest.Add().
I'm quite sure that this is not the problem. Let me describe my thoughts in more detail:
TTestRest is implementing IMyCommand in sicClientDriven mode.
This means that each connected client has its own TTestRest object living on server side.
That's why some kind of global list has to be used; otherwise each callback list (which would then be private to TTestRest) would only contain one single entry. Access to the global list is secured using critical sections, so there can't be any threading issues.
By the way: Imho the included demo "Project31ChatServer.dpr" is doing exactly the same at the end. The server-side interface is using "sicShared", which means that there is only one instance on server, shared by all clients. So, at the end, "fConnected: array of IChatCallback;" used in the demo to store the callbacks is exactly the same which is done by my testcase.
I think i'm right about that, or did I miss the point?
Arnaud, i'm quite sure that there's an issue with WebSockets implementation under heavy server load.
In my real project, those callbacks on the server side are not stored within a global list. I'm using my own custom session handling inside my business logic tier, which does not depend on mORMot's sessions. In the end, there's something like a TmySession instance for each connected client. Every TmySession has one IMyCallback reference to the callback interface registered by the client. Everything runs perfectly fine until running the heavy load stress test. From time to time the 404 errors occur, but that's OK: they are caught, and those 404 errors can't be prevented. They just happen sometimes.
I'm pretty sure there's an issue with WebSockets, because of strange testcase behaviour.
70 out of 100 times the testcase will produce 404 errors. Ok.
29 out of 100 times the testcase runs without errors. Ok.
BUT: Sometimes, maybe 1 out of 100 times there are exceptions in:
Stacktrace:
TAESAbstract.DecryptPKCS7('PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~Øù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜',True)
TWebSocketProtocolBinary.AfterGetFrame((focBinary, 'PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~Øù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜'))
TWebSocketProcess.GetFrame((focBinary, 'PƒäÈZ#]2gzþ¥éÁŒNçSá¶î¢âM”G'#4#5'~xp`*væ©hW'#$A'†'#$1F'¥NÃgPcc@œeÁ€67$æù'#$1E'$dþ>^‘'#$1C'ª8GCOA½$'#$E',šEÇÞGŒ'#$1C'ÕfÙÔ¶°'#$1C'â'#3#$11'Ëÿm¿Œ+Ç'#$F#$B'•'#$1A'Ü'#$B'›[ÈZ¹Ö'#$C'='#$10#$10'ÿÿ'#8'6´'#$15'T¾?ÊY¼–á!@WÆÚEŒŒJ'#$F'1æ2,ušÌXJçß'#$14'P#…Šïü9érÀ?wöûŒ'#$12#$12#$12#$10'Ýw<TnëÀÓûžk²¹F|'#$C'‰ùEôÏYj!§O'#8#$E'§'#$A'[Ý«i7Sàå&(×€ý®Ëï¹jË~Øù7•'#$F']¨'#$15'ؾ«ëÃ'#7'à×.‹äFÝÖÖáãf„îÐ'#$A'1 â^3óuÜa(¸'#$D'˜'),1,False)
TWebSocketProcess.ProcessLoop
TWebSocketProcessClientThread.Execute
ThreadProc($7EEE12E0)
ThreadWrapper($7EF88580)
The exception is raised here:
// Quoted from the library: decrypts Input into a new string, reads the last
// decrypted byte as the padding length and removes that many trailing bytes.
// Raises ESynCrypto ('%.DecryptPKCS7: Invalid content') when that byte is
// greater than AESBlockSize.
function TAESAbstract.DecryptPKCS7(const Input: RawByteString;
IVAtBeginning: boolean): RawByteString;
var len,iv,padding: integer;
begin
len := length(Input);
// NOTE(review): DecryptLen presumably adjusts len and sets iv to the offset
// of the encrypted payload within Input - confirm against the library
DecryptLen(len,iv,pointer(Input),IVAtBeginning);
SetString(result,nil,len);
Decrypt(@PByteArray(Input)^[iv],pointer(result),len);
// NOTE(review): reads result[len]; behavior for len=0 is not visible here
padding := ord(result[len]); // result[1..len]
// only an upper bound is checked, so a decrypted frame with a wrong last
// byte (e.g. from corrupted or mis-keyed input) is rejected here
if padding>AESBlockSize then // <- THIS IS WHERE IT HAPPENS
raise ESynCrypto.CreateUTF8('%.DecryptPKCS7: Invalid content',[self]);
SetLength(result,len-padding);
end;
It's hard to reproduce this issue, because most of the times everything is ok. But sometimes, things start going wrong.
I hope you get my point. Feel free to ask any further questions!
I could rewrite the testcase if it helps, but I don't see that there's something wrong with it.
Kind regards,
oz.
Offline
One more thing:
Most of the times there are 404 errors prior to the exception stated above.
But just now the exception occurred without any prior 404 error in my testcase.
Maybe that information helps.
Kind regards,
oz.
Offline
I did not have the ESynCrypto exception.
I received a 404 error, mainly due to the fact that the callbacks are a global list.
This is IMHO a wrong design of your sample.
Callbacks should be private to each instance, and released/unregistered ASAP from the client side.
You are sharing callbacks, and some are still trying to be notified even if their TMyHttpClient owner is not there any more.
IMHO it should be in each TTestRest to maintain the callback list.
Hey, it's me again
After re-reading your post: the purpose of this is to notify all connected clients that some event has occurred. So the design is based on the chat server sample.
Offline
Access to the sockets is protected by critical sections.
See how GetFrame and SendFrame are implemented in TWebSocketProcess.
So I doubt there is some race condition here.
As you can see, the issue is in TWebSocketProcessClientThread.
So it occurs on the client side, not on the server side.
The issue may come from clients still receiving notifications in their thread, during the short time of their destruction, since the callback has not been notified as released to the server side.
Once you get rid of the 404 issue, i.e. once the callbacks are properly notified as removed to the server side, before the client is destroyed, I'm quite confident you would not have other issues.
Offline
Pages: 1