mORMot and Open Source friends
Check-in [9fe77d6f21]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:{5865} try to enable SynDBPostgres PostgreSQL ORM bulk insert/update/delete via nested array binding
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 9fe77d6f21406f4c38af5639f410f4d3ffffa5a6
User & Date: ab 2020-03-21 16:59:37
Context
2020-03-21
18:01
{5866} introduce TAESCTR.ComposeIV() modular method instead of TAESCTRNIST - from https://github.com/synopse/mORMot/pull/290 check-in: 204c90705c user: ab tags: trunk
16:59
{5865} try to enable SynDBPostgres PostgreSQL ORM bulk insert/update/delete via nested array binding check-in: 9fe77d6f21 user: ab tags: trunk
16:26
{5864} try to enable SynDBPostgres bulk insert via 'insert into ... values (unnest...)' - more feedback and testing are needed! check-in: ae580adfca user: ab tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to SQLite3/mORMot.pas.

643
644
645
646
647
648
649
650

651
652
653
654
655
656
657
658
.....
27373
27374
27375
27376
27377
27378
27379




























27380
27381
27382
27383
27384
27385
27386
27387
27388

27389
27390
27391
27392
27393
27394
27395
.....
27399
27400
27401
27402
27403
27404
27405
27406

27407
27408
27409
27410
27411
27412

27413
27414
27415
27416
27417
27418
27419
    InlinedParams: TJSONObjectDecoderParams;
    /// internal pointer over field names to be used after Decode() call
    // - either FieldNames, either Fields[] array as defined in Decode()
    DecodedFieldNames: PRawUTF8Array;
    /// the ID=.. value as sent within the JSON object supplied to Decode()
    DecodedRowID: TID;
    /// internal pointer over field types to be used after Decode() call
    // - to create 'INSERT INTO ... SELECT UNNEST(...)' statement for very

    // efficient bulk insertion in a PostgreSQL database
    DecodedFieldTypesToUnnest: PSQLDBFieldTypeArray;
    /// decode the JSON object fields into FieldNames[] and FieldValues[]
    // - if Fields=nil, P should be a true JSON object, i.e. defined
    // as "COL1"="VAL1" pairs, stopping at '}' or ']'; otherwise, Fields[]
    // contains column names and expects a JSON array as "VAL1","VAL2".. in P
    // - P should be after the initial '{' or '[' character, i.e. at first field
    // - P returns the next object start or nil on unexpected end of input
................................................................................
  try
    case Occasion of
    soUpdate: begin
      if FieldCount=0 then
        raise EORMException.Create('Invalid EncodeAsSQLPrepared(0)');
      W.AddShort('update ');
      W.AddString(TableName);




























      W.AddShort(' set ');
      for F := 0 to FieldCount-1 do begin // append 'COL1=?,COL2=?'
        W.AddString(DecodedFieldNames^[F]);
        W.AddShort('=?,');
      end;
      W.CancelLastComma;
      W.AddShort(' where ');
      W.AddString(UpdateIDFieldName);
      W.Add('=','?');

    end;
    soInsert: begin
      if boInsertOrIgnore in BatchOptions then
        W.AddShort('insert or ignore into ') else
      if boInsertOrReplace in BatchOptions then
        W.AddShort('insert or replace into ') else
        W.AddShort('insert into ');
................................................................................
        W.Add(' ','(');
        for F := 0 to FieldCount-1 do begin // append 'COL1,COL2'
          W.AddString(DecodedFieldNames^[F]);
          W.Add(',');
        end;
        W.CancelLastComma;
        W.AddShort(') values (');
        if DecodedFieldTypesToUnnest<>nil then begin

          for F := 0 to FieldCount-1 do begin
            W.AddShort('unnest(?::');
            W.AddShort(PG_FT[DecodedFieldTypesToUnnest^[F]]);
            W.AddShort('[]),');
          end;
        end else

          W.AddStrings('?,',FieldCount);
        W.CancelLastComma;
        W.Add(')');
      end;
    end;
    else
      raise EORMException.CreateUTF8('Unexpected EncodeAsSQLPrepared(%)',[ord(Occasion)]);






|
>
|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
>







 







|
>




<
|
>







643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
.....
27374
27375
27376
27377
27378
27379
27380
27381
27382
27383
27384
27385
27386
27387
27388
27389
27390
27391
27392
27393
27394
27395
27396
27397
27398
27399
27400
27401
27402
27403
27404
27405
27406
27407
27408
27409
27410
27411
27412
27413
27414
27415
27416
27417
27418
27419
27420
27421
27422
27423
27424
27425
.....
27429
27430
27431
27432
27433
27434
27435
27436
27437
27438
27439
27440
27441

27442
27443
27444
27445
27446
27447
27448
27449
27450
    InlinedParams: TJSONObjectDecoderParams;
    /// internal pointer over field names to be used after Decode() call
    // - either FieldNames, either Fields[] array as defined in Decode()
    DecodedFieldNames: PRawUTF8Array;
    /// the ID=.. value as sent within the JSON object supplied to Decode()
    DecodedRowID: TID;
    /// internal pointer over field types to be used after Decode() call
    // - to create 'INSERT INTO ... SELECT UNNEST(...)' or 'UPDATE ... FROM
    // SELECT UNNEST(...)' statements for very efficient bulk writes in a
    // PostgreSQL database
    DecodedFieldTypesToUnnest: PSQLDBFieldTypeArray;
    /// decode the JSON object fields into FieldNames[] and FieldValues[]
    // - if Fields=nil, P should be a true JSON object, i.e. defined
    // as "COL1"="VAL1" pairs, stopping at '}' or ']'; otherwise, Fields[]
    // contains column names and expects a JSON array as "VAL1","VAL2".. in P
    // - P should be after the initial '{' or '[' character, i.e. at first field
    // - P returns the next object start or nil on unexpected end of input
................................................................................
  try
    case Occasion of
    soUpdate: begin
      if FieldCount=0 then
        raise EORMException.Create('Invalid EncodeAsSQLPrepared(0)');
      W.AddShort('update ');
      W.AddString(TableName);
      if DecodedFieldTypesToUnnest<>nil then begin
        // PostgreSQL bulk update via nested array binding
        W.AddShort(' as t set ');
        for F := 0 to FieldCount-1 do begin
          W.AddString(DecodedFieldNames^[F]);
          W.AddShort('=v.');
          W.AddString(DecodedFieldNames^[F]);
          W.Add(',');
        end;
        W.CancelLastComma;
        W.AddShort(' from ( select');
        for F := 0 to FieldCount-1 do begin
          W.AddShort(' unnest(?::');
          W.AddShort(PG_FT[DecodedFieldTypesToUnnest^[F]]);
          W.AddShort('[]),');
        end;
        W.AddShort(' unnest(?::int8[]) ) as v('); // last param is ID
        for F := 0 to FieldCount-1 do begin
          W.AddString(DecodedFieldNames^[F]);
          W.Add(',');
        end;
        W.AddString(UpdateIDFieldName);
        W.AddShort(') where t.');
        W.AddString(UpdateIDFieldName);
        W.AddShort('=v.');
        W.AddString(UpdateIDFieldName);
      end else begin
        // regular UPDATE statement
        W.AddShort(' set ');
        for F := 0 to FieldCount-1 do begin // append 'COL1=?,COL2=?'
          W.AddString(DecodedFieldNames^[F]);
          W.AddShort('=?,');
        end;
        W.CancelLastComma;
        W.AddShort(' where ');
        W.AddString(UpdateIDFieldName);
        W.Add('=','?'); // last param is ID
      end;
    end;
    soInsert: begin
      if boInsertOrIgnore in BatchOptions then
        W.AddShort('insert or ignore into ') else
      if boInsertOrReplace in BatchOptions then
        W.AddShort('insert or replace into ') else
        W.AddShort('insert into ');
................................................................................
        W.Add(' ','(');
        for F := 0 to FieldCount-1 do begin // append 'COL1,COL2'
          W.AddString(DecodedFieldNames^[F]);
          W.Add(',');
        end;
        W.CancelLastComma;
        W.AddShort(') values (');
        if DecodedFieldTypesToUnnest<>nil then
          // PostgreSQL bulk insert via nested array binding
          for F := 0 to FieldCount-1 do begin
            W.AddShort('unnest(?::');
            W.AddShort(PG_FT[DecodedFieldTypesToUnnest^[F]]);
            W.AddShort('[]),');

          end else
          // regular INSERT statement
          W.AddStrings('?,',FieldCount);
        W.CancelLastComma;
        W.Add(')');
      end;
    end;
    else
      raise EORMException.CreateUTF8('Unexpected EncodeAsSQLPrepared(%)',[ord(Occasion)]);

Changes to SQLite3/mORMotDB.pas.

1096
1097
1098
1099
1100
1101
1102


1103
1104
1105
1106
1107
1108
1109
1110
1111
....
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
              break; // do not send too much items at once, for better speed
          finally
            tmp.Done;
          end;
        end;
      end;
      mDelete: begin


        SQL := FormatUTF8('delete from % where %=?',
          [fTableName,fStoredClassMapping^.RowIDFieldName]);
        n := BatchEnd-BatchBegin+1;
        if n+1>=max then begin
          n := max; // do not send too much items at once, for better speed
          BatchEnd := BatchBegin+max-1;
        end;
        SetLength(Values,1);
        SetLength(Values[0],n);
................................................................................
      raise ESQLDBException.CreateUTF8(
        '%.JSONDecodedPrepareToSQL(%): No column for [%] field in table %',
        [self,StoredClass,Decoder.FieldNames[f],fTableName]);
    Types[f] := fFieldsExternal[k].ColumnType;
  end;
  // compute SQL statement and associated bound parameters
  Decoder.DecodedFieldNames := pointer(ExternalFields);
  if (Occasion=soInsert) and not Assigned(fProperties.OnBatchInsert) and
     (fProperties.DBMS=dPostgreSQL) and (cCreate in fProperties.BatchSendingAbilities) then
    // enable SynDBPostgres bulk insert via 'insert into ... values (unnest...)'
    Decoder.DecodedFieldTypesToUnnest := @Types;
  result := Decoder.EncodeAsSQLPrepared(fTableName,Occasion,
    fStoredClassMapping^.RowIDFieldName,BatchOptions);
  if Occasion=soUpdate then
    if Decoder.FieldCount=MAX_SQLFIELDS then
      raise EParsingException.CreateUTF8('Too many fields for '+






>
>
|
|







 







|
<







1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
....
1893
1894
1895
1896
1897
1898
1899
1900

1901
1902
1903
1904
1905
1906
1907
              break; // do not send too much items at once, for better speed
          finally
            tmp.Done;
          end;
        end;
      end;
      mDelete: begin
        if fProperties.DBMS=dPostgreSQL then // for SynDBPostgres array binding
          SQL := 'delete from % where %=ANY(?)' else
          SQL := 'delete from % where %=?';
        SQL := FormatUTF8(SQL,[fTableName,fStoredClassMapping^.RowIDFieldName]);
        n := BatchEnd-BatchBegin+1;
        if n+1>=max then begin
          n := max; // do not send too much items at once, for better speed
          BatchEnd := BatchBegin+max-1;
        end;
        SetLength(Values,1);
        SetLength(Values[0],n);
................................................................................
      raise ESQLDBException.CreateUTF8(
        '%.JSONDecodedPrepareToSQL(%): No column for [%] field in table %',
        [self,StoredClass,Decoder.FieldNames[f],fTableName]);
    Types[f] := fFieldsExternal[k].ColumnType;
  end;
  // compute SQL statement and associated bound parameters
  Decoder.DecodedFieldNames := pointer(ExternalFields);
  if (fProperties.DBMS=dPostgreSQL) and not Assigned(fProperties.OnBatchInsert) then

    // enable SynDBPostgres bulk insert via 'insert into ... values (unnest...)'
    Decoder.DecodedFieldTypesToUnnest := @Types;
  result := Decoder.EncodeAsSQLPrepared(fTableName,Occasion,
    fStoredClassMapping^.RowIDFieldName,BatchOptions);
  if Occasion=soUpdate then
    if Decoder.FieldCount=MAX_SQLFIELDS then
      raise EParsingException.CreateUTF8('Too many fields for '+

Changes to SynDBPostgres.pas.

661
662
663
664
665
666
667
668


669
670
671
672
673
674
675
    GlobalUnLock;
  end;
  if PQ.IsThreadSafe <> 1 then
    raise ESQLDBPostgres.Create('libpq should be compiled in threadsafe mode');
  fDBMS := dPostgreSQL;
  FillOidMapping;
  inherited Create(aServerName, aDatabaseName, aUserID, aPassWord);
  // TSQLRestStorageExternal.JSONDecodedPrepareToSQL -> DecodedFieldTypesToUnnest


  fOnBatchInsert := nil;
end;

function TSQLDBPostgresConnectionProperties.NewConnection: TSQLDBConnection;
begin
  result := TSQLDBPostgresConnection.Create(self);
end;






|
>
>







661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
    GlobalUnLock;
  end;
  if PQ.IsThreadSafe <> 1 then
    raise ESQLDBPostgres.Create('libpq should be compiled in threadsafe mode');
  fDBMS := dPostgreSQL;
  FillOidMapping;
  inherited Create(aServerName, aDatabaseName, aUserID, aPassWord);
  // TSQLRestStorageExternal.JSONDecodedPrepareToSQL will detect it and set
  // DecodedFieldTypesToUnnest -> fast bulk insert/delete/update
  fBatchSendingAbilities := [cCreate, cDelete, cUpdate];
  fOnBatchInsert := nil;
end;

function TSQLDBPostgresConnectionProperties.NewConnection: TSQLDBConnection;
begin
  result := TSQLDBPostgresConnection.Create(self);
end;

Changes to SynopseCommit.inc.

1
'1.18.5864'
|
1
'1.18.5865'