#1 Yesterday 18:52:23

firstfriday
Member
Registered: 2015-07-21
Posts: 33

Access a Database with Threads

I was experiencing intermittent crashes and database errors when using latest mORMot2 + SQLite (3.51.2) in a multi-threaded scenario.

Scenario:

  • Multiple worker threads (e.g. 4 threads)

  • All threads access the same SQLite database file.

I prepared a small example which runs with no other dependencies. Just create a console application named SqliteMormotThreadTest and paste everything into it.
Place a breakpoint at bm12345. I would be very thankful if somebody can help me.

program SqliteMormotThreadTest;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections,
  mormot.db.sql,
  mormot.db.raw.sqlite3,
  mormot.db.sql.sqlite3,
  mormot.db.raw.sqlite3.static,
  mormot.core.base,
  mormot.core.os,
  mormot.core.text;

type
  // Selects which workload a worker runs: read-only queries or inserts.
  TTestMode = (tmSelectOnly, tmInsert);

  // Per-worker counters, updated by the worker while it runs.
  TRunStats = record
    // Set from GetCurrentThreadId at the start of TWorker.Execute.
    ThreadId: Cardinal;
    // Number of loop passes completed, whether they succeeded or failed.
    Iterations: Integer;
    // Passes that reached the end of the work section without an exception.
    SuccessCount: Integer;
    // Passes that ended in the exception handler.
    ErrorCount: Integer;
    // "ClassName: Message" of the most recent caught exception.
    LastError: string;
  end;

  // Console logger: Line holds a TMonitor lock on FLock around Writeln so
  // that output lines from different threads are written one at a time.
  TConsoleLog = class
  private
    // Lock object, created in the class constructor and freed in the
    // class destructor.
    class var FLock: TObject;
  public
    class constructor Create;
    class destructor Destroy;
    // Writes one timestamped line to the console.
    class procedure Line(const S: string); static;
  end;

  // Test thread. Each loop pass checks the file header, creates connection
  // properties and a connection for FDbFile, runs one statement and then
  // disconnects again.
  TWorker = class(TThread)
  private
    // Path of the SQLite database file to open.
    FDbFile: string;
    // Workload dispatched by Execute.
    FMode: TTestMode;
    // Number of loop passes to perform.
    FIterations: Integer;
    // Counters exposed through the Stats property.
    FStats: TRunStats;
    // Logs Msg prefixed with the calling thread's id.
    procedure Log(const Msg: string);
    // Runs "select 1" FIterations times.
    procedure ExecuteSelectOnly;
    // Inserts one row into thread_test FIterations times.
    procedure ExecuteInsert;
    // Raises an exception unless FDbFile exists and starts with the
    // "SQLite format 3" signature.
    procedure VerifyHeader;
  protected
    procedure Execute; override;
  public
    // The thread is created suspended; the caller must call Start.
    constructor Create(const ADbFile: string; AMode: TTestMode; AIterations: Integer);
    // Returns a copy of the counters record.
    property Stats: TRunStats read FStats;
  end;

// Allocates the object that Line uses as its monitor lock.
class constructor TConsoleLog.Create;
begin
  FLock := TObject.Create;
end;

// Releases the lock object allocated by the class constructor.
class destructor TConsoleLog.Destroy;
begin
  FLock.Free;
end;

// Writes "<hh:nn:ss.zzz> <S>" to standard output. The timestamp is taken and
// the line is written while holding the monitor on FLock, so lines from
// concurrent callers do not interleave.
class procedure TConsoleLog.Line(const S: string);
var
  Stamp: string;
begin
  TMonitor.Enter(FLock);
  try
    Stamp := FormatDateTime('hh:nn:ss.zzz', Now);
    Writeln(Stamp + ' ' + S);
  finally
    TMonitor.Exit(FLock);
  end;
end;

// Creates a suspended worker for the given database file.
//   ADbFile     - path of the SQLite database file to open on every pass
//   AMode       - workload that Execute dispatches to
//   AIterations - number of loop passes to run
// The thread does not free itself; the creator must call Start and later
// free the instance.
constructor TWorker.Create(const ADbFile: string; AMode: TTestMode; AIterations: Integer);
begin
  inherited Create(True);
  FreeOnTerminate := False;
  FDbFile := ADbFile;
  FMode := AMode;
  FIterations := AIterations;
  // TRunStats holds a managed string (LastError), so it must not be cleared
  // with FillChar: that bypasses reference counting. Default() resets the
  // record through normal assignment semantics.
  FStats := Default(TRunStats);
end;

// Forwards Msg to the shared console logger, prefixed with the id of the
// thread that is making the call.
procedure TWorker.Log(const Msg: string);
var
  Prefixed: string;
begin
  Prefixed := Format('[TID=%d] %s', [GetCurrentThreadId, Msg]);
  TConsoleLog.Line(Prefixed);
end;

// Sanity-checks the database file before it is opened.
// Raises Exception when FDbFile does not exist, is shorter than 16 bytes, or
// does not begin with the 15-character signature "SQLite format 3".
procedure TWorker.VerifyHeader;
const
  HeaderSize = 16;
  SignatureLen = 15;
  Signature: AnsiString = 'SQLite format 3';
var
  Source: TFileStream;
  Raw: array[0..HeaderSize - 1] of Byte;
  Found: AnsiString;
begin
  if not FileExists(FDbFile) then
    raise Exception.Create('DB file not found: ' + FDbFile);

  // Shared read access, so other openers of the file are not blocked.
  Source := TFileStream.Create(FDbFile, fmOpenRead or fmShareDenyNone);
  try
    if Source.Size < HeaderSize then
      raise Exception.Create('DB file too small: ' + FDbFile);
    Source.ReadBuffer(Raw, HeaderSize);
    // Only the first 15 bytes are compared; the 16th byte is read but ignored.
    SetString(Found, PAnsiChar(@Raw[0]), SignatureLen);
    if Found <> Signature then
      raise Exception.Create('Invalid SQLite header: ' + string(Found));
  finally
    Source.Free;
  end;
end;

// Runs "select 1" FIterations times. Every pass verifies the file header,
// creates its own connection properties and connection, executes the query,
// checks that the single returned value is 1, and tears everything down.
// Failures inside a pass are counted in FStats.ErrorCount and logged; they do
// not abort the loop. An exception raised by VerifyHeader is not caught here.
procedure TWorker.ExecuteSelectOnly;
var
  I: Integer;
  Props: TSQLDBSQLite3ConnectionProperties;
  Conn: TSQLDBConnection;
  Stmt: ISQLDBStatement;
  V: Int64;
begin
  for I := 1 to FIterations do
  begin
    if Terminated then
      Exit;

    VerifyHeader;
    Props := nil;
    Conn := nil;
    try
      try
        Log(Format('ITER=%d pre-props DB="%s"', [I, FDbFile]));
        Props := TSQLDBSQLite3ConnectionProperties.Create(FDbFile, '', '', '');

        Log(Format('ITER=%d pre-connect Props=%p', [I, Pointer(Props)]));
        Conn := Props.NewConnection;
        if Conn = nil then
          raise Exception.Create('NewConnection returned nil');

        Log(Format('ITER=%d pre-Connect Conn=%p', [I, Pointer(Conn)]));
        Conn.Connect;
        Log(Format('ITER=%d connected Conn=%p', [I, Pointer(Conn)]));

        Stmt := Conn.NewStatementPrepared('select 1', True);
        Stmt.ExecutePrepared;
        if not Stmt.Step then
          raise Exception.Create('select 1 returned no row');
        V := Stmt.ColumnInt(0);
        if V <> 1 then
          raise Exception.CreateFmt('select 1 returned %d', [V]);

        Inc(FStats.SuccessCount);
      except
        on E: Exception do
        begin
          Inc(FStats.ErrorCount); // bm12345
          FStats.LastError := E.ClassName + ': ' + E.Message;
          Log(Format('ITER=%d ERROR=%s', [I, FStats.LastError]));
        end;
      end;
    finally
      try
        // Release the statement first: it belongs to Conn and must not
        // outlive the connection or the properties object.
        Stmt := nil;
        if Conn <> nil then
          Conn.Disconnect;
      except
        on E: Exception do
          Log('Disconnect ERROR=' + E.Message);
      end;
      // The instance returned by NewConnection is owned by the caller;
      // clearing the variable alone would leak it.
      Conn.Free;
      Props.Free;
    end;

    Inc(FStats.Iterations);
  end;
end;

// Inserts one row into thread_test FIterations times. Every pass verifies the
// file header, creates its own connection properties and connection, binds
// the current thread id and the iteration number, executes the INSERT, and
// tears everything down. Failures inside a pass are counted in
// FStats.ErrorCount and logged; they do not abort the loop. An exception
// raised by VerifyHeader is not caught here.
procedure TWorker.ExecuteInsert;
var
  I: Integer;
  Props: TSQLDBSQLite3ConnectionProperties;
  Conn: TSQLDBConnection;
  Stmt: ISQLDBStatement;
begin
  for I := 1 to FIterations do
  begin
    if Terminated then
      Exit;

    VerifyHeader;
    Props := nil;
    Conn := nil;
    try
      try
        Props := TSQLDBSQLite3ConnectionProperties.Create(FDbFile, '', '', '');

        Conn := Props.NewConnection;
        if Conn = nil then
          raise Exception.Create('NewConnection returned nil');
        Conn.Connect;

        Stmt := Conn.NewStatementPrepared(
          'insert into thread_test(thread_id, iteration_no, created_utc) values(?,?,strftime(''%Y-%m-%d %H:%M:%f'',''now''))',
          False);
        Stmt.Bind(1, GetCurrentThreadId);
        Stmt.Bind(2, I);
        Stmt.ExecutePrepared;
        Inc(FStats.SuccessCount);
      except
        on E: Exception do
        begin
          Inc(FStats.ErrorCount);
          FStats.LastError := E.ClassName + ': ' + E.Message;
          Log(Format('ITER=%d ERROR=%s', [I, FStats.LastError]));
        end;
      end;
    finally
      try
        // Release the statement first: it belongs to Conn and must not
        // outlive the connection or the properties object.
        Stmt := nil;
        if Conn <> nil then
          Conn.Disconnect;
      except
        on E: Exception do
          Log('Disconnect ERROR=' + E.Message);
      end;
      // The instance returned by NewConnection is owned by the caller;
      // clearing the variable alone would leak it.
      Conn.Free;
      Props.Free;
    end;

    Inc(FStats.Iterations);
  end;
end;

// Thread entry point: records the thread id, runs the workload selected by
// FMode, and logs a one-line summary of the counters when it is done.
procedure TWorker.Execute;
var
  Summary: string;
begin
  FStats.ThreadId := GetCurrentThreadId;
  Log('started');

  if FMode = tmSelectOnly then
    ExecuteSelectOnly
  else if FMode = tmInsert then
    ExecuteInsert;

  Summary := Format('finished Iter=%d Ok=%d Err=%d Last="%s"',
    [FStats.Iterations, FStats.SuccessCount, FStats.ErrorCount, FStats.LastError]);
  Log(Summary);
end;

// Prepares the test database: creates the containing directory if needed,
// creates the thread_test table if it does not exist yet, and removes all
// rows from it.
//   DbFile - path of the SQLite database file
// Exceptions raised while connecting or executing propagate to the caller
// after the connection and properties objects have been released.
procedure EnsureDatabase(const DbFile: string);
var
  Dir: string;
  Props: TSQLDBSQLite3ConnectionProperties;
  Conn: TSQLDBConnection;
  Stmt: ISQLDBStatement;
begin
  // A bare file name has no directory part, and ForceDirectories must not be
  // called with an empty string.
  Dir := ExtractFilePath(DbFile);
  if Dir <> '' then
    ForceDirectories(Dir);

  Props := TSQLDBSQLite3ConnectionProperties.Create(DbFile, '', '', '');
  try
    Conn := Props.NewConnection;
    try
      Conn.Connect;

      Stmt := Conn.NewStatementPrepared(
        'create table if not exists thread_test (' +
        ' id integer primary key autoincrement,' +
        ' thread_id integer not null,' +
        ' iteration_no integer not null,' +
        ' created_utc text not null' +
        ')', False);
      Stmt.ExecutePrepared;

      // As with the other non-SELECT statements in this program,
      // ExecutePrepared alone runs the statement; there are no rows to step.
      Stmt := Conn.NewStatementPrepared('delete from thread_test', False);
      Stmt.ExecutePrepared;
    finally
      try
        // Release the statement before its connection goes away, including
        // on the exception path.
        Stmt := nil;
        Conn.Disconnect;
      finally
        // The instance returned by NewConnection is owned by the caller.
        Conn.Free;
      end;
    end;
  finally
    Props.Free;
  end;
end;

// Runs one multi-threaded test round and prints the results.
//   DbFile      - database file handed to every worker
//   ThreadCount - number of worker threads to create
//   Iterations  - loop passes per worker
//   Mode        - workload each worker runs
// All workers are created first, then started, then waited for; afterwards a
// SUMMARY line per worker and one DONE line with the totals are logged.
procedure RunTest(const DbFile: string; ThreadCount, Iterations: Integer; Mode: TTestMode);
var
  Pool: TObjectList<TWorker>;
  Worker: TWorker;
  Snapshot: TRunStats;
  OkSum: Integer;
  ErrSum: Integer;
  Title: string;
begin
  if Mode = tmSelectOnly then
    Title := 'SELECT'
  else if Mode = tmInsert then
    Title := 'INSERT';

  TConsoleLog.Line(Format('RUN mode=%s threads=%d iterations=%d db="%s"',
    [Title, ThreadCount, Iterations, DbFile]));

  // The list owns the workers and frees them when it is freed.
  Pool := TObjectList<TWorker>.Create(True);
  try
    while Pool.Count < ThreadCount do
      Pool.Add(TWorker.Create(DbFile, Mode, Iterations));

    for Worker in Pool do
      Worker.Start;

    for Worker in Pool do
      Worker.WaitFor;

    OkSum := 0;
    ErrSum := 0;
    for Worker in Pool do
    begin
      // Take one copy of the counters instead of reading the property
      // once per field.
      Snapshot := Worker.Stats;
      Inc(OkSum, Snapshot.SuccessCount);
      Inc(ErrSum, Snapshot.ErrorCount);
      TConsoleLog.Line(Format(
        'SUMMARY tid=%d iter=%d ok=%d err=%d last="%s"',
        [Snapshot.ThreadId,
         Snapshot.Iterations,
         Snapshot.SuccessCount,
         Snapshot.ErrorCount,
         Snapshot.LastError]));
    end;

    TConsoleLog.Line(Format('DONE mode=%s totalOk=%d totalErr=%d',
      [Title, OkSum, ErrSum]));
  finally
    Pool.Free;
  end;
end;

const
  // Default database location, used when no path is given on the command line.
  dbpath='..\..\test_thread.db';
var
  DbFile: string;
begin
  try
    ReportMemoryLeaksOnShutdown := True;

    // Resolve the database path first so that every later step, including
    // the cleanup below, works on the same file.
    if ParamCount > 0 then
      DbFile := ExpandFileName(ParamStr(1))
    else
      DbFile := ExpandFileName(dbpath);

    // start fresh: delete the file that will actually be used, not only the
    // default path
    DeleteFile(DbFile);

    TConsoleLog.Line('EXE=' + ParamStr(0));
    TConsoleLog.Line('DB =' + DbFile);
    // create empty database
    EnsureDatabase(DbFile);

    // fill it with data without threads: the worker is never started, its
    // methods are called directly on the main thread
    TConsoleLog.Line('INSERT DATA NON THREADED.');

    var w1:TWorker;
    w1:=TWorker.Create(DbFile,TTestMode.tmInsert,500);
    try
      w1.ExecuteInsert;
      w1.ExecuteSelectOnly;
    finally
      // VerifyHeader exceptions are not caught inside the worker methods, so
      // the worker must be released on that path as well.
      w1.free;
    end;

    // test threading
    TConsoleLog.Line('INSERT DATA THREADED.');
    RunTest(DbFile, 1, 200, tmSelectOnly);  // works with one thread
    RunTest(DbFile, 4, 500, tmSelectOnly);  // error with >1 thread, see bm12345
    //RunTest(DbFile, 4, 500, tmInsert); // fails also

    TConsoleLog.Line('ALL TESTS FINISHED.');
  except
    on E: Exception do
    begin
      TConsoleLog.Line('FATAL ' + E.ClassName + ': ' + E.Message);
    end;
  end;
  // Keep the console window open until Enter is pressed.
  ReadLn;
end.

Offline

#2 Yesterday 18:59:07

ab
Administrator
From: France
Registered: 2010-06-21
Posts: 15,420
Website

Re: Access a Database with Threads

Our SQLite3 wrapper has already its own lock.
Otherwise, it is not thread safe.

Offline

#3 Today 01:42:59

firstfriday
Member
Registered: 2015-07-21
Posts: 33

Re: Access a Database with Threads

Could you please be more specific? E.g. which functions suit my requirements, or where to find the info?

Offline

#4 Today 08:40:18

pvn0
Member
From: Slovenia
Registered: 2018-02-12
Posts: 221

Re: Access a Database with Threads

You are using mORMot framework but without the ORM engine instead opting to create low level connection to the database, that's fine but you probably want to read some sqlite documentation:
https://sqlite.org/threadsafe.html

By default the mORMot sqlite static library implementation sets the sqlite threading mode to multi-thread instead of the default serialized, so the following statement from the sqlite documentation is important:

Multi-thread. In this mode, SQLite can be safely used by multiple threads provided that no single database connection nor any object derived from database connection, such as a prepared statement, is used in two or more threads at the same time

AB is referring to the TSqlDataBase class found in mormot.db.raw.sqlite3.pas; this is the wrapper that will handle concurrency for you and that you should be using if you decide not to use the ORM engine.

By the way, did you use AI to generate that example?

Last edited by pvn0 (Today 08:41:19)

Offline

#5 Today 16:53:32

firstfriday
Member
Registered: 2015-07-21
Posts: 33

Re: Access a Database with Threads

Btw: Yes I did use GPT to create this example. But it was based on my real code.

And Claude Sonnet 4.6 explained to me what I did wrong.

Btw: I really appreciate that you and Arnaud responded fast.
But "Our SQLite3 wrapper has already its own lock." and to "use TSqlDataBase" didn't help me to understand  my problem.

I should have done:
1) use WALMode
2) create ONE TSqlDBSQLite3ConnectionProperties per application
3) use ONE instance of conn:=Props.NewConnection for every thread

I will post my running example for others having to solve this problem.

Offline

#6 Today 16:56:47

firstfriday
Member
Registered: 2015-07-21
Posts: 33

Re: Access a Database with Threads

Accessing a SQLite database with multiple threads without the ORM: direct data access using mORMot2.

program SqliteMormotThreadTest_ThreadSafe;

{$APPTYPE CONSOLE}


uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections,
  mormot.db.sql,
  mormot.db.raw.sqlite3,
  mormot.db.sql.sqlite3,
  mormot.db.raw.sqlite3.static,
  mormot.core.base,
  mormot.core.os,
  mormot.core.text;

type
  // Test thread that opens one connection of its own from the shared
  // connection properties and runs 'SELECT 1' on it FIterations times.
  TWorker = class(TThread)
  private
    // Shared connection properties passed in by the creator. The worker only
    // calls NewConnection on it and never frees it.
    FProps:TSQLDBSQLite3ConnectionProperties;
    // Number of queries to run.
    FIterations: Integer;
    // Queries that returned the expected value.
    FOkCount: Integer;
    // Queries that ended in the exception handler.
    FErrCount: Integer;
    // "ClassName: Message" of the most recent caught exception.
    FLastError: string;
    // Formats a pointer as a zero-padded hexadecimal string.
    function PtrToStr(P: Pointer): string;
    // Writes one timestamped line to the console while holding LogCS.
    procedure Log(const Msg: string);
  protected
    procedure Execute; override;
  public
    // The thread is created suspended; the caller must call Start.
    constructor Create(AProps: TSQLDBSQLite3ConnectionProperties; AIterations: Integer);
  end;

var
  // Guards Writeln in TWorker.Log. Created and freed by the main block.
  LogCS: TCriticalSection;

{ TWorker }

// Creates a suspended worker.
//   AProps      - shared connection properties; stored by reference, the
//                 worker does not take ownership
//   AIterations - number of queries Execute will run
// The caller must call Start and free the instance afterwards.
constructor TWorker.Create(AProps: TSqlDBSQLite3ConnectionProperties; AIterations: Integer);
begin
  inherited Create(True);
  FIterations := AIterations;
  FProps := AProps;
  FreeOnTerminate := False;
end;

// Returns the address held in P as hexadecimal text, zero-padded to two
// digits per byte of a pointer.
function TWorker.PtrToStr(P: Pointer): string;
var
  Address: NativeUInt;
  Digits: Integer;
begin
  Address := NativeUInt(P);
  Digits := SizeOf(Pointer) * 2;
  Result := IntToHex(Address, Digits);
end;

// Writes "<hh:nn:ss.zzz> <Msg>" to standard output. The timestamp is taken
// and the line is written while LogCS is held, so lines from different
// workers do not interleave.
procedure TWorker.Log(const Msg: string);
var
  Stamp: string;
begin
  LogCS.Enter;
  try
    Stamp := FormatDateTime('hh:nn:ss.zzz', Now);
    Writeln(Stamp + ' ' + Msg);
  finally
    LogCS.Leave;
  end;
end;

// Thread entry point. Opens ONE connection for this thread from the shared
// properties, runs 'SELECT 1' FIterations times on it, then disconnects and
// frees the connection. A failing query is counted in FErrCount and logged
// without aborting the loop. A failure to connect is not caught here, and in
// that case the SUMMARY line is not written.
procedure TWorker.Execute;
var
  Conn: TSqlDBSQLite3Connection;
  Stmt: ISqlDBStatement;
  I: Integer;
  // 64-bit, so the "= 1" check is made on the full column value instead of
  // a value narrowed to Integer.
  V: Int64;
begin
  Conn := FProps.NewConnection as TSqlDBSQLite3Connection;
  try
    Conn.Connect;
    Log(Format('START tid=%d conn=%s', [GetCurrentThreadId, PtrToStr(Pointer(Conn))]));

    Conn.DB.BusyTimeout := 5000;
    try
      for I := 1 to FIterations do
      begin
        Stmt := nil; // explicitly reset before every call
        try
          Stmt := Conn.NewStatementPrepared('SELECT 1', True);
          Stmt.ExecutePrepared;
          if not Stmt.Step then
            raise Exception.Create('No row');
          V := Stmt.ColumnInt(0);
          if V <> 1 then
            raise Exception.CreateFmt('Bad result: %d', [V]);
          Stmt := nil; // release the statement before the next iteration
          Inc(FOkCount);
        except
          on E: Exception do
          begin
            // Drop the failed statement before logging.
            Stmt := nil;
            Inc(FErrCount);
            FLastError := E.ClassName + ': ' + E.Message;
            Log(Format('ERR tid=%d iter=%d %s', [GetCurrentThreadId, I, FLastError]));
          end;
        end;
      end;
    finally
      Conn.Disconnect;
    end;
  finally
    // The instance returned by NewConnection is owned by this thread.
    Conn.Free;
  end;
  Log(Format('SUMMARY tid=%d ok=%d err=%d last="%s"', [GetCurrentThreadId, FOkCount, FErrCount, FLastError]));
end;

//
// *** Main ***
//
//TSQLDBSQLite3ConnectionProperties   ← ONE in the application, stores only configurations parameter for the database, no access to database.
//        │
//        ├── ThreadSafeConnection    ← handles connection pooling  via threadvars
//        │
//        └── NewConnection           ← returns a fresh TSqlDBConnection instance
var
  Props: TSQLDBSQLite3ConnectionProperties;
  Workers: array of TWorker;
  I: Integer;
  ThreadCount: Integer;
  Iterations: Integer;

begin
  ReportMemoryLeaksOnShutdown := True;

  LogCS := TCriticalSection.Create; // only for log
  try
    ThreadCount := 10;
    Iterations := 1150000;

    // One properties object for the whole application, shared by all workers.
    Props := TSqlDBSQLite3ConnectionProperties.Create('test.db', '', '', '');
    try
      // Configure the database once through a short-lived setup connection.
      // This sits inside the try/finally so Props is released even if the
      // setup fails.
      var SetupConn: TSqlDBSQLite3Connection;
      SetupConn := Props.NewConnection as TSqlDBSQLite3Connection;
      try
        SetupConn.Connect;
        SetupConn.DB.WALMode       := True;       // journal_mode=WAL
        SetupConn.DB.BusyTimeout   := 5000;       // busy_timeout=5000ms
        SetupConn.DB.Synchronous   := smNormal;   // PRAGMA synchronous=NORMAL
        SetupConn.Disconnect;
      finally
        SetupConn.Free;
      end;

      // SetLength zero-fills the array, so the cleanup loop below can call
      // Free on slots that were never assigned.
      SetLength(Workers, ThreadCount);
      try
        for I := 0 to ThreadCount - 1 do
          Workers[I] := TWorker.Create(Props, Iterations);

        for I := 0 to ThreadCount - 1 do
          Workers[I].Start;

        for I := 0 to ThreadCount - 1 do
          Workers[I].WaitFor;
      finally
        // Free the workers on every path, and before Props, because they
        // hold a reference to it.
        for I := 0 to High(Workers) do
          Workers[I].Free;
      end;
    finally
      Props.Free;
    end;
  finally
    LogCS.Free;
  end;

  Writeln('DONE');
  // Keep the console window open until Enter is pressed.
  Readln;
end.

Offline

Board footer

Powered by FluxBB