1 module database.postgresql.connection; 2 3 import std.algorithm; 4 import std.array; 5 import std.conv : to; 6 import std.regex : ctRegex, matchFirst; 7 import std.string; 8 import std.traits; 9 import std.uni : sicmp; 10 import std.utf : decode, UseReplacementDchar; 11 import std.format; 12 import std.datetime; 13 14 import database.postgresql.exception; 15 import database.postgresql.packet; 16 import database.postgresql.protocol; 17 import database.postgresql.type; 18 import database.postgresql.socket; 19 import database.postgresql.row; 20 import database.postgresql.appender; 21 22 struct ConnectionStatus 23 { 24 bool ready; 25 TransactionStatus transaction = TransactionStatus.Idle; 26 27 ulong affected; 28 ulong lastInsertId; 29 } 30 31 struct ConnectionNotice 32 { 33 enum Severity : ubyte 34 { 35 ERROR = 1, 36 FATAL, 37 PANIC, 38 WARNING, 39 NOTICE, 40 DEBUG, 41 INFO, 42 LOG, 43 } 44 45 Severity severity; 46 uint position; 47 const(char)[] message; 48 const(char)[] code; 49 const(char)[] hint; 50 const(char)[] detail; 51 const(char)[] where; 52 const(char)[] schema; 53 const(char)[] table; 54 const(char)[] column; 55 const(char)[] type; 56 const(char)[] constraint; 57 58 string toString() const 59 { 60 auto writer = appender!string; 61 toString(writer); 62 return writer.data; 63 } 64 65 void toString(W)(ref W writer) const 66 { 67 writer.formattedWrite("%s(%s) %s", severity, code, message); 68 } 69 } 70 71 private struct ConnectionSettings 72 { 73 this(const(char)[] connectionString) 74 { 75 parse(connectionString); 76 } 77 78 void parse(const(char)[] connectionString) 79 { 80 auto remaining = connectionString; 81 82 auto indexValue = remaining.indexOf("="); 83 while (!remaining.empty) 84 { 85 auto indexValueEnd = remaining.indexOf(";", indexValue); 86 if (indexValueEnd <= 0) 87 indexValueEnd = remaining.length; 88 89 auto name = strip(remaining[0..indexValue]); 90 auto value = strip(remaining[indexValue+1..indexValueEnd]); 91 92 switch (name) 93 { 94 case "host": 95 host = value; 96 break; 97 case "user": 98 user = value; 99 break; 100 case "pwd": 101 pwd = value; 102 break; 103 case "db": 104 db = value; 105 break; 106 case "port": 107 port = to!ushort(value); 108 break; 109 default: 110 throw new PgSQLException(format("Bad connection string: %s", connectionString)); 111 } 112 113 if (indexValueEnd == remaining.length) 114 return; 115 116 remaining = remaining[indexValueEnd+1..$]; 117 indexValue = remaining.indexOf("="); 118 } 119 120 throw new PgSQLException(format("Bad connection string: %s", connectionString)); 121 } 122 123 ConnectionOptions options = ConnectionOptions.Default; 124 125 const(char)[] host; 126 const(char)[] user; 127 const(char)[] pwd; 128 const(char)[] db; 129 ushort port = 3306; 130 } 131 132 private struct ServerInfo 133 { 134 const(char)[] versionString; 135 const(char)[] encoding; 136 const(char)[] application; 137 const(char)[] timeZone; 138 139 uint processId; 140 uint cancellationKey; 141 } 142 143 @property string placeholders(size_t x, bool parens = true) 144 { 145 if (x) 146 { 147 auto app = appender!string; 148 if (parens) 149 { 150 app.reserve(x + x - 1); 151 152 app.put('('); 153 foreach (i; 0..x - 1) 154 app.put("?,"); 155 app.put('?'); 156 app.put(')'); 157 } 158 else 159 { 160 app.reserve(x + x + 1); 161 162 foreach (i; 0..x - 1) 163 app.put("?,"); 164 app.put('?'); 165 } 166 return app.data; 167 } 168 169 return null; 170 } 171 172 @property string placeholders(T)(T x, bool parens = true) if (is(typeof(() { auto y = x.length; }))) 173 { 174 return x.length.placeholders(parens); 175 } 176 177 enum ConnectionOptions 178 { 179 Default = 0 180 } 181 182 class Connection 183 { 184 this(string connectionString, ConnectionOptions options = ConnectionOptions.Default) 185 { 186 settings_ = ConnectionSettings(connectionString); 187 settings_.options = options; 188 connect(); 189 } 190 191 this(const(char)[] host, const(char)[] user, const(char)[] pwd, const(char)[] db, ushort port = 5432) 192 { 193 this(host, user, pwd, db, port, ConnectionOptions.Default); 194 } 195 196 this(const(char)[] host, const(char)[] user, const(char)[] pwd, const(char)[] db, ushort port = 5432, ConnectionOptions options = ConnectionOptions.Default) 197 { 198 settings_.host = host; 199 settings_.user = user; 200 settings_.pwd = pwd; 201 settings_.db = db; 202 settings_.port = port; 203 settings_.options = options; 204 205 connect(); 206 } 207 208 void ping() 209 { 210 } 211 212 const(char)[] schema() const 213 { 214 return schema_; 215 } 216 217 const(ConnectionNotice)[] notices() const 218 { 219 return notices_; 220 } 221 222 void execute(Args...)(const(char)[] sql, Args args) 223 { 224 query(sql, args); 225 } 226 227 void set(T)(const(char)[] variable, T value) 228 { 229 //query("set session ?=?", PgSQLFragment(variable), value); 230 } 231 232 const(char)[] get(const(char)[] variable) 233 { 234 const(char)[] result; 235 query("show session variables like ?", variable, (PgSQLRow row) { 236 result = row[1].peek!(const(char)[]).dup; 237 }); 238 239 return result; 240 } 241 242 void startTransaction() 243 { 244 if (inTransaction) 245 { 246 throw new PgSQLErrorException("PgSQL does not support nested transactions - commit or rollback before starting a new transaction"); 247 } 248 249 query("start transaction"); 250 251 assert(inTransaction); 252 } 253 254 void commit() 255 { 256 if (!inTransaction) 257 { 258 throw new PgSQLErrorException("No active transaction"); 259 } 260 261 query("commit"); 262 263 assert(!inTransaction); 264 } 265 266 void rollback() 267 { 268 if (connected) 269 { 270 if (status_.transaction != TransactionStatus.Inside) 271 { 272 throw new PgSQLErrorException("No active transaction"); 273 } 274 275 query("rollback"); 276 277 assert(!inTransaction); 278 } 279 } 280 281 @property bool inTransaction() const 282 { 283 return connected && (status_.transaction == TransactionStatus.Inside); 284 } 285 286 @property ulong lastInsertId() const 287 { 288 return status_.lastInsertId; 289 } 290 291 @property ulong affected() const 292 { 293 return status_.affected; 294 } 295 296 @property bool connected() const 297 { 298 return socket_.connected; 299 } 300 301 void close() 302 { 303 socket_.close(); 304 } 305 306 void reuse() 307 { 308 ensureConnected(); 309 310 if (inTransaction) 311 { 312 rollback; 313 } 314 } 315 316 package: 317 318 @property bool busy() 319 { 320 return busy_; 321 } 322 323 @property void busy(bool value) 324 { 325 busy_ = value; 326 } 327 328 @property bool pooled() 329 { 330 return pooled_; 331 } 332 333 @property void pooled(bool value) 334 { 335 pooled_ = value; 336 } 337 338 @property DateTime releaseTime() 339 { 340 return releaseTime_; 341 } 342 343 @property void releaseTime(DateTime value) 344 { 345 releaseTime_ = value; 346 } 347 348 private: 349 350 void close_() 351 { 352 close(); 353 } 354 355 void query(Args...)(const(char)[] sql, Args args) 356 { 357 //scope(failure) close_(); 358 359 static if (args.length == 0) 360 { 361 enum shouldDiscard = true; 362 } 363 else 364 { 365 enum shouldDiscard = !isCallable!(args[args.length - 1]); 366 } 367 368 enum argCount = shouldDiscard ? args.length : (args.length - 1); 369 370 static if (argCount) 371 { 372 auto querySQL = prepareSQL(sql, args[0..argCount]); 373 } 374 else 375 { 376 auto querySQL = sql; 377 } 378 379 send(querySQL); 380 381 auto answer = retrieve(); 382 if (isStatus(answer)) 383 { 384 eatStatuses(answer); 385 } 386 else 387 { 388 static if (!shouldDiscard) 389 { 390 resultSetText(answer, args[args.length - 1]); 391 } 392 else 393 { 394 discardAll(answer); 395 } 396 } 397 } 398 399 void connect() 400 { 401 socket_.connect(settings_.host, settings_.port); 402 403 auto startup = OutputPacket(&out_); 404 startup.put!uint(0x00030000); 405 startup.putz("user"); 406 startup.putz(settings_.user); 407 if (!settings_.db.empty()) 408 { 409 startup.putz("database"); 410 startup.putz(settings_.db); 411 } 412 startup.put!ubyte(0); 413 startup.finalize(0); 414 415 socket_.write(startup.get()); 416 417 if (eatAuth(retrieve())) 418 eatAuth(retrieve()); 419 eatStatuses(retrieve()); 420 } 421 422 void send(Args...)(Args args) 423 { 424 ensureConnected(); 425 426 auto cmd = OutputPacket(OutputMessageType.Query, &out_); 427 foreach (ref arg; args) 428 cmd.put!(const(char[]))(arg); 429 cmd.put!ubyte(0); 430 cmd.finalize(); 431 432 socket_.write(cmd.get()); 433 } 434 435 void ensureConnected() 436 { 437 if (!socket_.connected) 438 { 439 connect(); 440 } 441 } 442 443 bool isStatus(InputPacket packet) 444 { 445 auto id = packet.type; 446 447 switch (id) with (InputMessageType) 448 { 449 case ErrorResponse: 450 case NoticeResponse: 451 case ReadyForQuery: 452 case NotificationResponse: 453 case CommandComplete: 454 return true; 455 default: 456 return false; 457 } 458 } 459 460 InputPacket retrieve(ubyte control) 461 { 462 //scope(failure) close_(); 463 464 ubyte[4] header; 465 socket_.read(header); 466 467 auto len = native!uint(header.ptr) - 4; 468 in_.length = len; 469 socket_.read(in_); 470 471 if (in_.length != len) 472 { 473 throw new PgSQLConnectionException("Wrong number of bytes read"); 474 } 475 476 return InputPacket(control, &in_); 477 } 478 479 InputPacket retrieve() 480 { 481 //scope(failure) close_(); 482 483 ubyte[5] header; 484 socket_.read(header); 485 486 auto len = native!uint(header.ptr + 1) - 4; 487 in_.length = len; 488 socket_.read(in_); 489 490 if (in_.length != len) 491 { 492 throw new PgSQLConnectionException("Wrong number of bytes read"); 493 } 494 495 return InputPacket(header[0], &in_); 496 } 497 498 bool eatAuth(InputPacket packet) 499 { 500 //scope(failure) close_(); 501 502 auto type = cast(InputMessageType)packet.type; 503 504 switch (type) with (InputMessageType) 505 { 506 case Authentication: 507 auto auth = packet.eat!uint; 508 auto reply = OutputPacket(OutputMessageType.PasswordMessage, &out_); 509 510 switch (auth) 511 { 512 case 0: 513 return false; 514 case 2: 515 goto default; 516 case 3: 517 reply.putz(settings_.pwd); 518 break; 519 case 5: 520 static char[32] MD5toHex(T...)(in T data) 521 { 522 import std.ascii : LetterCase; 523 import std.digest.md : md5Of, toHexString; 524 return md5Of(data).toHexString!(LetterCase.lower); 525 } 526 527 auto salt = packet.eat!(ubyte[])(4); 528 reply.put("md5"); 529 reply.putz(MD5toHex(MD5toHex(settings_.pwd, settings_.user), salt)); 530 break; 531 case 6: // SCM 532 case 7: // GSS 533 case 8: 534 case 9: 535 case 10: // SASL 536 case 11: 537 case 12: 538 goto default; 539 default: 540 throw new PgSQLProtocolException(format("Unsupported authentication method: %s", auth)); 541 } 542 543 reply.finalize(0); 544 socket_.write(reply.get()); 545 break; 546 case NoticeResponse: 547 eatNoticeResponse(packet); 548 break; 549 case ErrorResponse: 550 eatNoticeResponse(packet); 551 throwError(true); 552 break; 553 default: 554 throw new PgSQLProtocolException(format("Unexpected message: %s", type)); 555 } 556 557 return true; 558 } 559 560 void eatParameterStatus(InputPacket packet) 561 { 562 assert(packet.type == InputMessageType.ParameterStatus); 563 auto name = packet.eatz(); 564 auto value = packet.eatz(); 565 566 switch (name) 567 { 568 case "server_version": 569 server_.versionString = value.dup; 570 break; 571 case "server_encoding": 572 server_.encoding = value.dup; 573 break; 574 case "application_name": 575 server_.application = value.dup; 576 break; 577 case "TimeZone": 578 server_.timeZone = value.dup; 579 break; 580 default: 581 break; 582 } 583 assert(packet.empty()); 584 } 585 586 void eatBackendKeyData(InputPacket packet) 587 { 588 assert(packet.type == InputMessageType.BackendKeyData); 589 590 server_.processId = packet.eat!uint; 591 server_.cancellationKey = packet.eat!uint; 592 } 593 594 void eatNoticeResponse(InputPacket packet) 595 { 596 assert(packet.type == InputMessageType.NoticeResponse || packet.type == InputMessageType.ErrorResponse); 597 598 ConnectionNotice notice; 599 auto field = packet.eat!ubyte; 600 while (field) 601 { 602 auto value = packet.eatz(); 603 import database.postgresql.row : hashOf; 604 605 switch (field) with (NoticeMessageField) 606 { 607 case Severity: 608 case SeverityLocal: 609 switch (hashOf(value)) with (ConnectionNotice.Severity) 610 { 611 case hashOf("ERROR"): 612 notice.severity = ERROR; 613 break; 614 case hashOf("FATAL"): 615 notice.severity = FATAL; 616 break; 617 case hashOf("PANIC"): 618 notice.severity = PANIC; 619 break; 620 case hashOf("WARNING"): 621 notice.severity = WARNING; 622 break; 623 case hashOf("DEBUG"): 624 notice.severity = DEBUG; 625 break; 626 case hashOf("INFO"): 627 notice.severity = INFO; 628 break; 629 case hashOf("LOG"): 630 notice.severity = LOG; 631 break; 632 default: 633 break; 634 } 635 break; 636 case Code: 637 notice.code = value.idup; 638 break; 639 case Message: 640 notice.message = value.idup; 641 break; 642 case Detail: 643 notice.detail = value.idup; 644 break; 645 case Hint: 646 notice.hint = value.idup; 647 break; 648 case Position: 649 notice.position = value.to!uint; 650 break; 651 case Where: 652 notice.where = value.idup; 653 break; 654 case Schema: 655 notice.schema = value.idup; 656 break; 657 case Table: 658 notice.table = value.idup; 659 break; 660 case Column: 661 notice.column = value.idup; 662 break; 663 case DataType: 664 notice.type = value.idup; 665 break; 666 case Constraint: 667 notice.constraint = value.idup; 668 break; 669 case File: 670 case Line: 671 case Routine: 672 break; 673 default: 674 //writeln(" notice: ", cast(char)field, " ", value); 675 break; 676 } 677 field = packet.eat!ubyte; 678 } 679 680 notices_ ~= notice; 681 } 682 683 void eatCommandComplete(InputPacket packet) 684 { 685 assert(packet.type == InputMessageType.CommandComplete); 686 import database.postgresql.row : hashOf; 687 688 auto tag = packet.eatz().splitter(' '); 689 auto command = tag.front(); 690 tag.popFront(); 691 692 switch (hashOf(command)) 693 { 694 case hashOf("INSERT"): 695 status_.lastInsertId = tag.front().to!ulong; 696 tag.popFront(); 697 status_.affected = tag.front().to!ulong; 698 break; 699 case hashOf("SELECT"): 700 case hashOf("DELETE"): 701 case hashOf("UPDATE"): 702 case hashOf("MOVE"): 703 case hashOf("FETCH"): 704 case hashOf("COPY"): 705 status_.lastInsertId = 0; 706 status_.affected = tag.empty() ? 0 : tag.front().to!ulong; 707 break; 708 case hashOf("CREATE"): 709 case hashOf("DROP"): 710 status_.lastInsertId = 0; 711 break; 712 default: 713 throw new PgSQLProtocolException(format("Unexpected command tag: %s", command)); 714 } 715 } 716 717 auto eatStatus(InputPacket packet) 718 { 719 auto type = cast(InputMessageType)packet.type(); 720 721 switch (type) with (InputMessageType) 722 { 723 case ParameterStatus: 724 eatParameterStatus(packet); 725 break; 726 case BackendKeyData: 727 eatBackendKeyData(packet); 728 break; 729 case ReadyForQuery: 730 status_.transaction = cast(TransactionStatus)packet.eat!ubyte; 731 status_.ready = true; 732 break; 733 case NoticeResponse: 734 eatNoticeResponse(packet); 735 break; 736 case ErrorResponse: 737 eatNoticeResponse(packet); 738 throwError(true); 739 break; 740 case CommandComplete: 741 eatCommandComplete(packet); 742 break; 743 default: 744 throw new PgSQLProtocolException(format("Unexpected message: %s", type)); 745 } 746 747 return type; 748 } 749 750 void throwError(bool force) 751 { 752 foreach (ref notice; notices_) 753 { 754 switch (notice.severity) with (ConnectionNotice.Severity) 755 { 756 case PANIC: 757 case ERROR: 758 case FATAL: 759 throw new PgSQLErrorException(cast(string)notice.message); 760 default: 761 break; 762 } 763 } 764 765 if (force) 766 throw new PgSQLErrorException(cast(string)notices_.front().message); 767 } 768 769 void eatStatuses(InputPacket packet) 770 { 771 notices_.length = 0; 772 773 auto status = eatStatus(packet); 774 while (status != InputMessageType.ReadyForQuery) 775 status = eatStatus(retrieve()); 776 } 777 778 void skipColumnDef(ref InputPacket packet) 779 { 780 packet.skipz(); 781 packet.skip(18); 782 } 783 784 void columnDef(ref InputPacket packet, ref PgSQLColumn def) 785 { 786 auto name = packet.eatz(); 787 columns_ ~= name; 788 def.name = columns_[$-name.length..$]; 789 790 packet.skip(6); 791 792 def.type = cast(PgColumnTypes)packet.eat!uint; 793 def.length = packet.eat!short; 794 def.modifier = packet.eat!int; 795 def.format = cast(FormatCode)packet.eat!short; 796 } 797 798 void columnDefs(size_t count, ref PgSQLColumn[] defs, ref InputPacket packet) 799 { 800 defs.length = count; 801 foreach (i; 0..count) 802 columnDef(packet, defs[i]); 803 } 804 805 bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLRow)) 806 { 807 static if (is(ReturnType!(RowHandler) == void)) 808 { 809 handler(row); 810 return true; 811 } 812 else 813 { 814 return handler(row); // return type must be bool 815 } 816 } 817 818 bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) 819 { 820 static if (is(ReturnType!(RowHandler) == void)) 821 { 822 handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); 823 return true; 824 } 825 else 826 { 827 return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool 828 } 829 } 830 831 bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow)) 832 { 833 static if (is(ReturnType!(RowHandler) == void)) 834 { 835 handler(header, row); 836 return true; 837 } 838 else 839 { 840 return handler(header, row); // return type must be bool 841 } 842 } 843 844 bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == PgSQLRow)) 845 { 846 static if (is(ReturnType!(RowHandler) == void)) 847 { 848 handler(i, header, row); 849 return true; 850 } 851 else 852 { 853 return handler(i, header, row); // return type must be bool 854 } 855 } 856 857 void resultSetRowText(InputPacket packet, PgSQLHeader header, ref PgSQLRow row) 858 { 859 assert(row.columns.length == header.length); 860 861 assert(packet.type == InputMessageType.DataRow); 862 const rowlen = packet.eat!ushort(); 863 864 foreach(i, ref column; header) 865 { 866 if (i < rowlen) 867 { 868 if (column.format == FormatCode.Text) 869 { 870 eatValueText(packet, column, row.get_(i)); 871 } 872 else 873 { 874 assert(false); 875 } 876 } 877 else 878 { 879 row.get_(i) = PgSQLValue(column.name, PgColumnTypes.NULL, null, 0); 880 } 881 } 882 assert(packet.empty); 883 } 884 885 void resultSetText(RowHandler)(InputPacket packet, RowHandler handler) 886 { 887 columns_.length = 0; 888 889 auto columns = cast(size_t)packet.eat!ushort; 890 891 columnDefs(columns, header_, packet); 892 row_.header_(header_); 893 894 size_t index; 895 while (true) 896 { 897 auto row = retrieve(); 898 if (isStatus(row)) 899 { 900 eatStatuses(row); 901 break; 902 } 903 904 resultSetRowText(row, header_, row_); 905 if (!callHandler(handler, index++, header_, row_)) 906 { 907 discardUntilStatus(); 908 break; 909 } 910 } 911 } 912 913 void discardAll(InputPacket packet) 914 { 915 auto columns = cast(size_t)packet.eatLenEnc(); 916 columnDefs(columns, header_, packet); 917 918 discardUntilStatus(); 919 } 920 921 void discardUntilStatus() 922 { 923 while (true) 924 { 925 auto row = retrieve(); 926 if (isStatus(row)) 927 { 928 eatStatuses(row); 929 break; 930 } 931 } 932 } 933 934 auto prepareSQL(Args...)(const(char)[] sql, Args args) 935 { 936 auto estimated = sql.length; 937 size_t argCount; 938 939 foreach(i, arg; args) 940 { 941 static if (is(typeof(arg) == typeof(null))) 942 { 943 ++argCount; 944 estimated += 4; 945 } 946 else static if (is(Unqual!(typeof(arg)) == PgSQLValue)) 947 { 948 ++argCount; 949 final switch(arg.type) with (PgColumnTypes) 950 { 951 case NULL: 952 estimated += 4; 953 break; 954 case CHAR: 955 estimated += 2; 956 break; 957 case BOOL: 958 estimated += 2; 959 break; 960 case INT2: 961 estimated += 6; 962 break; 963 case INT4: 964 estimated += 7; 965 break; 966 case INT8: 967 estimated += 15; 968 break; 969 case REAL: 970 case DOUBLE: 971 estimated += 8; 972 break; 973 case UNKNOWN: 974 case MONEY: 975 case POINT: 976 case LINE: 977 case LSEG: 978 case PATH: 979 case POLYGON: 980 case TINTERVAL: 981 case CIRCLE: 982 case BOX: 983 case JSON: 984 case JSONB: 985 case XML: 986 case MACADDR: 987 case MACADDR8: 988 case INET: 989 case CIDR: 990 case NAME: 991 case TEXT: 992 case INTERVAL: 993 case BIT: 994 case VARBIT: 995 case NUMERIC: 996 case UUID: 997 case CHARA: 998 case BYTEA: 999 case VARCHAR: 1000 estimated += 4 + arg.peek!(const(char)[]).length; 1001 break; 1002 case DATE: 1003 estimated += 10; 1004 break; 1005 case TIME: 1006 case TIMETZ: 1007 estimated += 22; 1008 break; 1009 case TIMESTAMP: 1010 case TIMESTAMPTZ: 1011 estimated += 30; 1012 break; 1013 } 1014 } 1015 else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) 1016 { 1017 argCount += arg.length; 1018 estimated += arg.length * 6; 1019 } 1020 else static if (isSomeString!(typeof(arg)) || is(Unqual!(typeof(arg)) == PgSQLRawString) || is(Unqual!(typeof(arg)) == PgSQLFragment) || is(Unqual!(typeof(arg)) == PgSQLBinary)) 1021 { 1022 ++argCount; 1023 estimated += 2 + arg.length; 1024 } 1025 else 1026 { 1027 ++argCount; 1028 estimated += 6; 1029 } 1030 } 1031 1032 sql_.clear; 1033 sql_.reserve(max(8192, estimated)); 1034 1035 alias AppendFunc = bool function(ref Appender!(char[]), ref const(char)[] sql, ref size_t, const(void)*) @safe pure nothrow; 1036 AppendFunc[Args.length] funcs; 1037 const(void)*[Args.length] addrs; 1038 1039 foreach (i, Arg; Args) 1040 { 1041 static if (is(Arg == enum)) 1042 { 1043 funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(OriginalType!Arg); }(); 1044 addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(cast(OriginalType!(Unqual!Arg))args[i]); 1045 } 1046 else 1047 { 1048 funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(Arg); }(); 1049 addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(args[i]); 1050 } 1051 } 1052 1053 size_t indexArg; 1054 foreach (i; 0..Args.length) 1055 { 1056 if (!funcs[i](sql_, sql, indexArg, addrs[i])) 1057 throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg)); 1058 } 1059 1060 if (copyUpToNext(sql_, sql)) 1061 { 1062 ++indexArg; 1063 while (copyUpToNext(sql_, sql)) 1064 ++indexArg; 1065 throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg)); 1066 } 1067 1068 return sql_.data; 1069 } 1070 1071 Socket socket_; 1072 PgSQLHeader header_; 1073 PgSQLRow row_; 1074 char[] columns_; 1075 char[] schema_; 1076 ubyte[] in_; 1077 ubyte[] out_; 1078 ubyte seq_; 1079 Appender!(char[]) sql_; 1080 1081 ConnectionStatus status_; 1082 ConnectionSettings settings_; 1083 ServerInfo server_; 1084 1085 ConnectionNotice[] notices_; 1086 1087 bool busy_; 1088 bool pooled_; 1089 DateTime releaseTime_; 1090 } 1091 1092 private auto copyUpToNext(ref Appender!(char[]) app, ref const(char)[] sql) 1093 { 1094 size_t offset; 1095 dchar quote = '\0'; 1096 1097 while (offset < sql.length) 1098 { 1099 auto ch = decode!(UseReplacementDchar.no)(sql, offset); 1100 switch (ch) 1101 { 1102 case '?': 1103 if (!quote) 1104 { 1105 app.put(sql[0..offset - 1]); 1106 sql = sql[offset..$]; 1107 return true; 1108 } 1109 else 1110 { 1111 goto default; 1112 } 1113 case '\'': 1114 case '\"': 1115 case '`': 1116 if (quote == ch) 1117 { 1118 quote = '\0'; 1119 } 1120 else if (!quote) 1121 { 1122 quote = ch; 1123 } 1124 goto default; 1125 case '\\': 1126 if (quote && (offset < sql.length)) 1127 decode!(UseReplacementDchar.no)(sql, offset); 1128 goto default; 1129 default: 1130 break; 1131 } 1132 } 1133 app.put(sql[0..offset]); 1134 sql = sql[offset..$]; 1135 return false; 1136 } 1137 1138 private bool appendNextValue(T)(ref Appender!(char[]) app, ref const(char)[] sql, ref size_t indexArg, const(void)* arg) 1139 { 1140 static if (isArray!T && !isSomeString!(OriginalType!T)) 1141 { 1142 foreach (i, ref v; *cast(T*)arg) 1143 { 1144 if (copyUpToNext(app, sql)) 1145 { 1146 appendValue(app, v); 1147 ++indexArg; 1148 } 1149 else 1150 { 1151 return false; 1152 } 1153 } 1154 } 1155 else 1156 { 1157 if (copyUpToNext(app, sql)) 1158 { 1159 appendValue(app, *cast(T*)arg); 1160 ++indexArg; 1161 } 1162 else 1163 { 1164 return false; 1165 } 1166 } 1167 1168 return true; 1169 }