1 module database.postgresql.connection;
2 
3 import std.algorithm;
4 import std.array;
5 import std.conv : to;
6 import std.regex : ctRegex, matchFirst;
7 import std.string;
8 import std.traits;
9 import std.uni : sicmp;
10 import std.utf : decode, UseReplacementDchar;
11 import std.format;
12 import std.datetime;
13 
14 import database.postgresql.exception;
15 import database.postgresql.packet;
16 import database.postgresql.protocol;
17 import database.postgresql.type;
18 import database.postgresql.socket;
19 import database.postgresql.row;
20 import database.postgresql.appender;
21 
/// Session state tracked by a `Connection` while it processes server messages.
struct ConnectionStatus
{
    /// Set to `true` whenever a ReadyForQuery message is processed.
    bool ready;
    /// Transaction state byte taken from the most recent ReadyForQuery message.
    TransactionStatus transaction = TransactionStatus.Idle;

    /// Row count parsed from the most recent CommandComplete tag.
    ulong affected;
    /// First numeric field of the most recent INSERT command tag.
    ulong lastInsertId;
}
30 
/// One decoded NoticeResponse or ErrorResponse message.
struct ConnectionNotice
{
    /// Severity levels. Numbering starts at 1 and `ERROR` is the first member,
    /// so a default-initialised notice reports `ERROR`.
    enum Severity : ubyte
    {
        ERROR = 1,
        FATAL,
        PANIC,
        WARNING,
        NOTICE,
        DEBUG,
        INFO,
        LOG,
    }

    Severity severity;          /// parsed severity level
    uint position;              /// value of the position field, 0 when absent
    const(char)[] message;      /// primary message text
    const(char)[] code;         /// code field (printed by `toString`)
    const(char)[] hint;         /// hint text
    const(char)[] detail;       /// detail text
    const(char)[] where;        /// "where" field
    const(char)[] schema;       /// schema name field
    const(char)[] table;        /// table name field
    const(char)[] column;       /// column name field
    const(char)[] type;         /// data type name field
    const(char)[] constraint;   /// constraint name field

    /// Returns the notice rendered as `SEVERITY(code) message`.
    string toString() const
    {
        auto sink = appender!string;
        this.toString(sink);
        return sink.data;
    }

    /// Writes the `SEVERITY(code) message` rendering into `writer`.
    void toString(W)(ref W writer) const
    {
        formattedWrite(writer, "%s(%s) %s", severity, code, message);
    }
}
70 
/// Connection parameters, either assigned field by field or parsed from a
/// `name=value;name=value` connection string.
private struct ConnectionSettings
{
    /// Parses `connectionString` immediately; see `parse` for the format.
    this(const(char)[] connectionString)
    {
        parse(connectionString);
    }

    /**
     * Parses a semicolon-separated list of `name=value` pairs.
     *
     * Recognised names are `host`, `user`, `pwd`, `db` and `port`. Names and
     * values are stripped of surrounding whitespace.
     *
     * Throws: `PgSQLException` when the string is empty, ends with `;`,
     * contains a segment without `=`, or uses an unknown name. The `port`
     * conversion through `std.conv.to` can additionally throw when the value
     * is not a valid `ushort`.
     */
    void parse(const(char)[] connectionString)
    {
        auto remaining = connectionString;

        while (!remaining.empty)
        {
            const indexValue = remaining.indexOf("=");

            // Without this guard a segment lacking '=' would be sliced with
            // index -1 and fail with a range violation rather than the
            // documented PgSQLException.
            if (indexValue < 0)
                throw new PgSQLException(format("Bad connection string: %s", connectionString));

            auto indexValueEnd = remaining.indexOf(";", indexValue);
            if (indexValueEnd < 0)
                indexValueEnd = remaining.length;

            auto name = strip(remaining[0..indexValue]);
            auto value = strip(remaining[indexValue+1..indexValueEnd]);

            switch (name)
            {
                case "host":
                    host = value;
                    break;
                case "user":
                    user = value;
                    break;
                case "pwd":
                    pwd = value;
                    break;
                case "db":
                    db = value;
                    break;
                case "port":
                    port = to!ushort(value);
                    break;
                default:
                    throw new PgSQLException(format("Bad connection string: %s", connectionString));
            }

            // Last pair consumed: parsing succeeded.
            if (indexValueEnd == remaining.length)
                return;

            remaining = remaining[indexValueEnd+1..$];
        }

        // Reached for an empty string or one that ends with ';'.
        throw new PgSQLException(format("Bad connection string: %s", connectionString));
    }

    ConnectionOptions options = ConnectionOptions.Default;

    const(char)[] host;
    const(char)[] user;
    const(char)[] pwd;
    const(char)[] db;
    /// PostgreSQL's default port, matching the default used by the
    /// host-based `Connection` constructors (was 3306, the MySQL port).
    ushort port = 5432;
}
131 
/// Values reported by the server through ParameterStatus and BackendKeyData messages.
private struct ServerInfo
{
    const(char)[] versionString;  /// value of the `server_version` parameter
    const(char)[] encoding;       /// value of the `server_encoding` parameter
    const(char)[] application;    /// value of the `application_name` parameter
    const(char)[] timeZone;       /// value of the `TimeZone` parameter

    uint processId;               /// first integer of the BackendKeyData message
    uint cancellationKey;         /// second integer of the BackendKeyData message
}
142 
/**
 * Builds a comma-separated list of `x` question-mark placeholders.
 *
 * Params:
 *   x      = number of placeholders to generate
 *   parens = wrap the list in parentheses when `true`
 *
 * Returns: e.g. `"(?,?,?)"` or `"?,?,?"` for `x == 3`; `null` when `x` is 0.
 */
@property string placeholders(size_t x, bool parens = true)
{
    if (!x)
        return null;

    auto app = appender!string;

    // x question marks and x - 1 commas, plus two characters for the
    // parentheses. The original reserve sizes were swapped between the two
    // branches, so the parenthesised form under-reserved.
    app.reserve(x + (x - 1) + (parens ? 2 : 0));

    if (parens)
        app.put('(');

    foreach (i; 0..x - 1)
        app.put("?,");
    app.put('?');

    if (parens)
        app.put(')');

    return app.data;
}
171 
/// Convenience overload: generates one placeholder per element of `x`
/// (anything exposing a `length`).
@property string placeholders(T)(T x, bool parens = true) if (is(typeof(() { auto y = x.length; })))
{
    const count = x.length;
    return placeholders(count, parens);
}
176 
/// Option flags accepted by the `Connection` constructors; only the default
/// value is defined.
enum ConnectionOptions
{
    Default        = 0
}
181 
182 class Connection
183 {
184     this(string connectionString, ConnectionOptions options = ConnectionOptions.Default)
185     {
186         settings_ = ConnectionSettings(connectionString);
187         settings_.options = options;
188         connect();
189     }
190 
    /// Connects with explicit parameters and `ConnectionOptions.Default`;
    /// forwards to the six-parameter constructor.
    this(const(char)[] host, const(char)[] user, const(char)[] pwd, const(char)[] db, ushort port = 5432)
    {
        this(host, user, pwd, db, port, ConnectionOptions.Default);
    }
195 
196     this(const(char)[] host, const(char)[] user, const(char)[] pwd, const(char)[] db, ushort port = 5432, ConnectionOptions options = ConnectionOptions.Default)
197     {
198         settings_.host = host;
199         settings_.user = user;
200         settings_.pwd = pwd;
201         settings_.db = db;
202         settings_.port = port;
203         settings_.options = options;
204 
205         connect();
206     }
207 
    /// Currently a no-op: the body is empty and nothing is sent to the server.
    void ping()
    {
    }
211 
    /// Returns the stored schema name. Nothing in the visible part of this
    /// file assigns `schema_`, so it may always be empty — TODO confirm.
    const(char)[] schema() const
    {
        return schema_;
    }
216 
    /// Returns the notices collected since the list was last cleared
    /// (`eatStatuses` clears it before processing a batch of status messages).
    const(ConnectionNotice)[] notices() const
    {
        return notices_;
    }
221 
    /// Public entry point for running a statement; forwards `sql` and `args`
    /// unchanged to the private `query`.
    void execute(Args...)(const(char)[] sql, Args args)
    {
        query(sql, args);
    }
226 
    /// Currently a no-op: the statement that would apply the setting is
    /// commented out, so `variable` and `value` are ignored.
    void set(T)(const(char)[] variable, T value)
    {
        //query("set session ?=?", PgSQLFragment(variable), value);
    }
231 
232     const(char)[] get(const(char)[] variable)
233     {
234         const(char)[] result;
235         query("show session variables like ?", variable, (PgSQLRow row) {
236             result = row[1].peek!(const(char)[]).dup;
237         });
238 
239         return result;
240     }
241 
242     void startTransaction()
243     {
244         if (inTransaction)
245         {
246             throw new PgSQLErrorException("PgSQL does not support nested transactions - commit or rollback before starting a new transaction");
247         }
248 
249         query("start transaction");
250 
251         assert(inTransaction);
252     }
253 
254     void commit()
255     {
256         if (!inTransaction)
257         {
258             throw new PgSQLErrorException("No active transaction");
259         }
260 
261         query("commit");
262 
263         assert(!inTransaction);
264     }
265 
266     void rollback()
267     {
268         if (connected)
269         {
270             if (status_.transaction != TransactionStatus.Inside)
271             {
272                 throw new PgSQLErrorException("No active transaction");
273             }
274 
275             query("rollback");
276 
277             assert(!inTransaction);
278         }
279     }
280 
281     @property bool inTransaction() const
282     {
283         return connected && (status_.transaction == TransactionStatus.Inside);
284     }
285 
    /// First numeric field of the most recent INSERT command tag, as stored
    /// by `eatCommandComplete`.
    @property ulong lastInsertId() const
    {
        return status_.lastInsertId;
    }
290 
    /// Row count taken from the most recent CommandComplete tag, as stored
    /// by `eatCommandComplete`.
    @property ulong affected() const
    {
        return status_.affected;
    }
295 
    /// Reports the socket's own `connected` state.
    @property bool connected() const
    {
        return socket_.connected;
    }
300 
    /// Closes the underlying socket. No message is sent to the server first.
    void close()
    {
        socket_.close();
    }
305 
306     void reuse()
307     {
308         ensureConnected();
309 
310         if (inTransaction)
311         {
312             rollback;
313         }
314     }
315 
316 package:
317 
    /// Package-level flag; presumably marks the connection as checked out
    /// by a pool — confirm against the caller.
    @property bool busy()
    {
        return busy_;
    }

    /// Sets the `busy` flag.
    @property void busy(bool value)
    {
        busy_ = value;
    }
327 
    /// Package-level flag; presumably marks the connection as owned by a
    /// pool — confirm against the caller.
    @property bool pooled()
    {
        return pooled_;
    }

    /// Sets the `pooled` flag.
    @property void pooled(bool value)
    {
        pooled_ = value;
    }
337 
    /// Package-level timestamp; presumably the moment the connection was
    /// last released to a pool — confirm against the caller.
    @property DateTime releaseTime()
    {
        return releaseTime_;
    }

    /// Sets the release timestamp.
    @property void releaseTime(DateTime value)
    {
        releaseTime_ = value;
    }
347 
348 private:
349 
    /// Private wrapper around `close()`. In the visible part of this file it
    /// is referenced only from commented-out `scope(failure)` guards.
    void close_()
    {
        close();
    }
354 
355     void query(Args...)(const(char)[] sql, Args args)
356     {
357         //scope(failure) close_();
358 
359         static if (args.length == 0)
360         {
361             enum shouldDiscard = true;
362         }
363         else
364         {
365             enum shouldDiscard = !isCallable!(args[args.length - 1]);
366         }
367 
368         enum argCount = shouldDiscard ? args.length : (args.length - 1);
369 
370         static if (argCount)
371         {
372             auto querySQL = prepareSQL(sql, args[0..argCount]);
373         }
374         else
375         {
376             auto querySQL = sql;
377         }
378 
379         send(querySQL);
380 
381         auto answer = retrieve();
382         if (isStatus(answer))
383         {
384             eatStatuses(answer);
385         }
386         else
387         {
388             static if (!shouldDiscard)
389             {
390                 resultSetText(answer, args[args.length - 1]);
391             }
392             else
393             {
394                 discardAll(answer);
395             }
396         }
397     }
398 
399     void connect()
400     {
401         socket_.connect(settings_.host, settings_.port);
402 
403         auto startup = OutputPacket(&out_);
404         startup.put!uint(0x00030000);
405         startup.putz("user");
406         startup.putz(settings_.user);
407         if (!settings_.db.empty())
408         {
409             startup.putz("database");
410             startup.putz(settings_.db);
411         }
412         startup.put!ubyte(0);
413         startup.finalize(0);
414 
415         socket_.write(startup.get());
416 
417         if (eatAuth(retrieve()))
418             eatAuth(retrieve());
419         eatStatuses(retrieve());
420     }
421 
422     void send(Args...)(Args args)
423     {
424         ensureConnected();
425 
426         auto cmd = OutputPacket(OutputMessageType.Query, &out_);
427         foreach (ref arg; args)
428             cmd.put!(const(char[]))(arg);
429         cmd.put!ubyte(0);
430         cmd.finalize();
431 
432         socket_.write(cmd.get());
433     }
434 
435     void ensureConnected()
436     {
437         if (!socket_.connected)
438         {
439             connect();
440         }
441     }
442 
443     bool isStatus(InputPacket packet)
444     {
445         auto id = packet.type;
446 
447         switch (id) with (InputMessageType)
448         {
449             case ErrorResponse:
450             case NoticeResponse:
451             case ReadyForQuery:
452             case NotificationResponse:
453             case CommandComplete:
454                 return true;
455             default:
456                 return false;
457         }
458     }
459 
460     InputPacket retrieve(ubyte control)
461     {
462         //scope(failure) close_();
463 
464         ubyte[4] header;
465         socket_.read(header);
466 
467         auto len = native!uint(header.ptr) - 4;
468         in_.length = len;
469         socket_.read(in_);
470 
471         if (in_.length != len)
472         {
473             throw new PgSQLConnectionException("Wrong number of bytes read");
474         }
475 
476         return InputPacket(control, &in_);
477     }
478 
479     InputPacket retrieve()
480     {
481         //scope(failure) close_();
482 
483         ubyte[5] header;
484         socket_.read(header);
485 
486         auto len = native!uint(header.ptr + 1) - 4;
487         in_.length = len;
488         socket_.read(in_);
489 
490         if (in_.length != len)
491         {
492             throw new PgSQLConnectionException("Wrong number of bytes read");
493         }
494 
495         return InputPacket(header[0], &in_);
496     }
497 
498     bool eatAuth(InputPacket packet)
499     {
500         //scope(failure) close_();
501 
502         auto type = cast(InputMessageType)packet.type;
503 
504         switch (type) with (InputMessageType)
505         {
506             case Authentication:
507                 auto auth = packet.eat!uint;
508                 auto reply = OutputPacket(OutputMessageType.PasswordMessage, &out_);
509 
510                 switch (auth)
511                 {
512                     case 0:
513                         return false;
514                     case 2:
515                         goto default;
516                     case 3:
517                         reply.putz(settings_.pwd);
518                         break;
519                     case 5:
520                         static char[32] MD5toHex(T...)(in T data)
521                         {
522                             import std.ascii : LetterCase;
523                             import std.digest.md : md5Of, toHexString;
524                             return md5Of(data).toHexString!(LetterCase.lower);
525                         }
526 
527                         auto salt = packet.eat!(ubyte[])(4);
528                         reply.put("md5");
529                         reply.putz(MD5toHex(MD5toHex(settings_.pwd, settings_.user), salt));
530                         break;
531                     case 6: // SCM
532                     case 7: // GSS
533                     case 8:
534                     case 9:
535                     case 10: // SASL
536                     case 11:
537                     case 12:
538                         goto default;
539                     default:
540                         throw new PgSQLProtocolException(format("Unsupported authentication method: %s", auth));
541                 }
542 
543                 reply.finalize(0);
544                 socket_.write(reply.get());
545                 break;
546             case NoticeResponse:
547                 eatNoticeResponse(packet);
548                 break;
549             case ErrorResponse:
550                 eatNoticeResponse(packet);
551                 throwError(true);
552                 break;
553             default:
554                 throw new PgSQLProtocolException(format("Unexpected message: %s", type));
555         }
556 
557         return true;
558     }
559 
560     void eatParameterStatus(InputPacket packet)
561     {
562         assert(packet.type == InputMessageType.ParameterStatus);
563         auto name = packet.eatz();
564         auto value = packet.eatz();
565 
566         switch (name)
567         {
568             case "server_version":
569                 server_.versionString = value.dup;
570                 break;
571             case "server_encoding":
572                 server_.encoding = value.dup;
573                 break;
574             case "application_name":
575                 server_.application = value.dup;
576                 break;
577             case "TimeZone":
578                 server_.timeZone = value.dup;
579                 break;
580             default:
581                 break;
582         }
583         assert(packet.empty());
584     }
585 
    /// Stores the two integers of a BackendKeyData message: first the
    /// backend process id, then the cancellation key.
    void eatBackendKeyData(InputPacket packet)
    {
        assert(packet.type == InputMessageType.BackendKeyData);

        server_.processId = packet.eat!uint;
        server_.cancellationKey = packet.eat!uint;
    }
593 
594     void eatNoticeResponse(InputPacket packet)
595     {
596         assert(packet.type == InputMessageType.NoticeResponse || packet.type == InputMessageType.ErrorResponse);
597 
598         ConnectionNotice notice;
599         auto field = packet.eat!ubyte;
600         while (field)
601         {
602             auto value = packet.eatz();
603             import database.postgresql.row : hashOf;
604 
605             switch (field) with (NoticeMessageField)
606             {
607                 case Severity:
608                 case SeverityLocal:
609                     switch (hashOf(value)) with (ConnectionNotice.Severity)
610                     {
611                         case hashOf("ERROR"):
612                             notice.severity = ERROR;
613                             break;
614                         case hashOf("FATAL"):
615                             notice.severity = FATAL;
616                             break;
617                         case hashOf("PANIC"):
618                             notice.severity = PANIC;
619                             break;
620                         case hashOf("WARNING"):
621                             notice.severity = WARNING;
622                             break;
623                         case hashOf("DEBUG"):
624                             notice.severity = DEBUG;
625                             break;
626                         case hashOf("INFO"):
627                             notice.severity = INFO;
628                             break;
629                         case hashOf("LOG"):
630                             notice.severity = LOG;
631                             break;
632                         default:
633                             break;
634                     }
635                     break;
636                 case Code:
637                     notice.code = value.idup;
638                     break;
639                 case Message:
640                     notice.message = value.idup;
641                     break;
642                 case Detail:
643                     notice.detail = value.idup;
644                     break;
645                 case Hint:
646                     notice.hint = value.idup;
647                     break;
648                 case Position:
649                     notice.position = value.to!uint;
650                     break;
651                 case Where:
652                     notice.where = value.idup;
653                     break;
654                 case Schema:
655                     notice.schema = value.idup;
656                     break;
657                 case Table:
658                     notice.table = value.idup;
659                     break;
660                 case Column:
661                     notice.column = value.idup;
662                     break;
663                 case DataType:
664                     notice.type = value.idup;
665                     break;
666                 case Constraint:
667                     notice.constraint = value.idup;
668                     break;
669                 case File:
670                 case Line:
671                 case Routine:
672                     break;
673                 default:
674                     //writeln("  notice: ", cast(char)field, " ", value);
675                     break;
676             }
677             field = packet.eat!ubyte;
678         }
679 
680         notices_ ~= notice;
681     }
682 
683     void eatCommandComplete(InputPacket packet)
684     {
685         assert(packet.type == InputMessageType.CommandComplete);
686         import database.postgresql.row : hashOf;
687 
688         auto tag = packet.eatz().splitter(' ');
689         auto command = tag.front();
690         tag.popFront();
691 
692         switch (hashOf(command))
693         {
694             case hashOf("INSERT"):
695                 status_.lastInsertId = tag.front().to!ulong;
696                 tag.popFront();
697                 status_.affected = tag.front().to!ulong;
698                 break;
699             case hashOf("SELECT"):
700             case hashOf("DELETE"):
701             case hashOf("UPDATE"):
702             case hashOf("MOVE"):
703             case hashOf("FETCH"):
704             case hashOf("COPY"):
705                 status_.lastInsertId = 0;
706                 status_.affected = tag.empty() ? 0 : tag.front().to!ulong;
707                 break;
708             case hashOf("CREATE"):
709             case hashOf("DROP"):
710                 status_.lastInsertId = 0;
711                 break;
712             default:
713                 throw new PgSQLProtocolException(format("Unexpected command tag: %s", command));
714         }
715     }
716 
717     auto eatStatus(InputPacket packet)
718     {
719         auto type = cast(InputMessageType)packet.type();
720 
721         switch (type) with (InputMessageType)
722         {
723             case ParameterStatus:
724                 eatParameterStatus(packet);
725                 break;
726             case BackendKeyData:
727                 eatBackendKeyData(packet);
728                 break;
729             case ReadyForQuery:
730                 status_.transaction = cast(TransactionStatus)packet.eat!ubyte;
731                 status_.ready = true;
732                 break;
733             case NoticeResponse:
734                 eatNoticeResponse(packet);
735                 break;
736             case ErrorResponse:
737                 eatNoticeResponse(packet);
738                 throwError(true);
739                 break;
740             case CommandComplete:
741                 eatCommandComplete(packet);
742                 break;
743             default:
744                 throw new PgSQLProtocolException(format("Unexpected message: %s", type));
745         }
746 
747         return type;
748     }
749 
750     void throwError(bool force)
751     {
752         foreach (ref notice; notices_)
753         {
754             switch (notice.severity) with (ConnectionNotice.Severity)
755             {
756                 case PANIC:
757                 case ERROR:
758                 case FATAL:
759                     throw new PgSQLErrorException(cast(string)notice.message);
760                 default:
761                     break;
762             }
763         }
764 
765         if (force)
766             throw new PgSQLErrorException(cast(string)notices_.front().message);
767     }
768 
769     void eatStatuses(InputPacket packet)
770     {
771         notices_.length = 0;
772 
773         auto status = eatStatus(packet);
774         while (status != InputMessageType.ReadyForQuery)
775             status = eatStatus(retrieve());
776     }
777 
    /// Advances past one field description without decoding it: the name
    /// (presumably zero-terminated, via `skipz`) followed by 18 bytes, which
    /// matches the number of bytes `columnDef` consumes after the name.
    void skipColumnDef(ref InputPacket packet)
    {
        packet.skipz();
        packet.skip(18);
    }
783 
784     void columnDef(ref InputPacket packet, ref PgSQLColumn def)
785     {
786         auto name = packet.eatz();
787         columns_ ~= name;
788         def.name = columns_[$-name.length..$];
789 
790         packet.skip(6);
791 
792         def.type = cast(PgColumnTypes)packet.eat!uint;
793         def.length = packet.eat!short;
794         def.modifier = packet.eat!int;
795         def.format = cast(FormatCode)packet.eat!short;
796     }
797 
798     void columnDefs(size_t count, ref PgSQLColumn[] defs, ref InputPacket packet)
799     {
800         defs.length = count;
801         foreach (i; 0..count)
802             columnDef(packet, defs[i]);
803     }
804 
805     bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLRow))
806     {
807         static if (is(ReturnType!(RowHandler) == void))
808         {
809             handler(row);
810             return true;
811         }
812         else
813         {
814             return handler(row); // return type must be bool
815         }
816     }
817 
818     bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow))
819     {
820         static if (is(ReturnType!(RowHandler) == void))
821         {
822             handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row);
823             return true;
824         }
825         else
826         {
827             return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool
828         }
829     }
830 
831     bool callHandler(RowHandler)(RowHandler handler, size_t, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLRow))
832     {
833         static if (is(ReturnType!(RowHandler) == void))
834         {
835             handler(header, row);
836             return true;
837         }
838         else
839         {
840             return handler(header, row); // return type must be bool
841         }
842     }
843 
844     bool callHandler(RowHandler)(RowHandler handler, size_t i, PgSQLHeader header, PgSQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == PgSQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == PgSQLRow))
845     {
846         static if (is(ReturnType!(RowHandler) == void))
847         {
848             handler(i, header, row);
849             return true;
850         }
851         else
852         {
853             return handler(i, header, row); // return type must be bool
854         }
855     }
856 
857     void resultSetRowText(InputPacket packet, PgSQLHeader header, ref PgSQLRow row)
858     {
859         assert(row.columns.length == header.length);
860 
861         assert(packet.type == InputMessageType.DataRow);
862         const rowlen = packet.eat!ushort();
863 
864         foreach(i, ref column; header)
865         {
866             if (i < rowlen)
867             {
868                 if (column.format == FormatCode.Text)
869                 {
870                     eatValueText(packet, column, row.get_(i));
871                 }
872                 else
873                 {
874                     assert(false);
875                 }
876             }
877             else
878             {
879                 row.get_(i) = PgSQLValue(column.name, PgColumnTypes.NULL, null, 0);
880             }
881         }
882         assert(packet.empty);
883     }
884 
885     void resultSetText(RowHandler)(InputPacket packet, RowHandler handler)
886     {
887         columns_.length = 0;
888 
889         auto columns = cast(size_t)packet.eat!ushort;
890 
891         columnDefs(columns, header_, packet);
892         row_.header_(header_);
893 
894         size_t index;
895         while (true)
896         {
897             auto row = retrieve();
898             if (isStatus(row))
899             {
900                 eatStatuses(row);
901                 break;
902             }
903 
904             resultSetRowText(row, header_, row_);
905             if (!callHandler(handler, index++, header_, row_))
906             {
907                 discardUntilStatus();
908                 break;
909             }
910         }
911     }
912 
913     void discardAll(InputPacket packet)
914     {
915         auto columns = cast(size_t)packet.eatLenEnc();
916         columnDefs(columns, header_, packet);
917 
918         discardUntilStatus();
919     }
920 
921     void discardUntilStatus()
922     {
923         while (true)
924         {
925             auto row = retrieve();
926             if (isStatus(row))
927             {
928                 eatStatuses(row);
929                 break;
930             }
931         }
932     }
933 
    /**
     * Builds the final statement text by substituting `args` into `sql`.
     *
     * The result is assembled in the reusable `sql_` appender, so the
     * returned slice is only valid until the next call.
     *
     * Throws: `PgSQLErrorException` when the number of arguments does not
     * match the number of placeholders found in `sql`.
     *
     * NOTE(review): `appendNextValue` is defined outside this view and
     * `copyUpToNext` is only partly visible. They presumably copy statement
     * text up to the next placeholder and append the encoded value —
     * confirm there.
     */
    auto prepareSQL(Args...)(const(char)[] sql, Args args)
    {
        // First pass: count the values and estimate the output size so the
        // appender can reserve once.
        auto estimated = sql.length;
        size_t argCount;

        foreach(i, arg; args)
        {
            static if (is(typeof(arg) == typeof(null)))
            {
                ++argCount;
                estimated += 4;
            }
            else static if (is(Unqual!(typeof(arg)) == PgSQLValue))
            {
                ++argCount;
                // per-type size guess for a dynamically typed value
                final switch(arg.type) with (PgColumnTypes)
                {
                    case NULL:
                        estimated += 4;
                        break;
                    case CHAR:
                        estimated += 2;
                        break;
                    case BOOL:
                        estimated += 2;
                        break;
                    case INT2:
                        estimated += 6;
                        break;
                    case INT4:
                        estimated += 7;
                        break;
                    case INT8:
                        estimated += 15;
                        break;
                    case REAL:
                    case DOUBLE:
                        estimated += 8;
                        break;
                    case UNKNOWN:
                    case MONEY:
                    case POINT:
                    case LINE:
                    case LSEG:
                    case PATH:
                    case POLYGON:
                    case TINTERVAL:
                    case CIRCLE:
                    case BOX:
                    case JSON:
                    case JSONB:
                    case XML:
                    case MACADDR:
                    case MACADDR8:
                    case INET:
                    case CIDR:
                    case NAME:
                    case TEXT:
                    case INTERVAL:
                    case BIT:
                    case VARBIT:
                    case NUMERIC:
                    case UUID:
                    case CHARA:
                    case BYTEA:
                    case VARCHAR:
                        // text-like values: their length plus a small fixed allowance
                        estimated += 4 + arg.peek!(const(char)[]).length;
                        break;
                    case DATE:
                        estimated += 10;
                        break;
                    case TIME:
                    case TIMETZ:
                        estimated += 22;
                        break;
                    case TIMESTAMP:
                    case TIMESTAMPTZ:
                        estimated += 30;
                        break;
                }
            }
            else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg)))
            {
                // a non-string array counts as one value per element
                argCount += arg.length;
                estimated += arg.length * 6;
            }
            else static if (isSomeString!(typeof(arg)) || is(Unqual!(typeof(arg)) == PgSQLRawString) || is(Unqual!(typeof(arg)) == PgSQLFragment) || is(Unqual!(typeof(arg)) == PgSQLBinary))
            {
                ++argCount;
                estimated += 2 + arg.length;
            }
            else
            {
                ++argCount;
                estimated += 6;
            }
        }

        sql_.clear;
        sql_.reserve(max(8192, estimated));

        // Second pass setup: one type-erased append function and one
        // argument address per argument, so the substitution loop below is
        // a plain runtime loop.
        alias AppendFunc = bool function(ref Appender!(char[]), ref const(char)[] sql, ref size_t, const(void)*) @safe pure nothrow;
        AppendFunc[Args.length] funcs;
        const(void)*[Args.length] addrs;

        foreach (i, Arg; Args)
        {
            static if (is(Arg == enum))
            {
                // enums are appended through their base type
                funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(OriginalType!Arg); }();
                addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(cast(OriginalType!(Unqual!Arg))args[i]);
            }
            else
            {
                funcs[i] = () @trusted { return cast(AppendFunc)&appendNextValue!(Arg); }();
                addrs[i] = (ref x) @trusted { return cast(const void*)&x; }(args[i]);
            }
        }

        // Substitute each argument in turn; a false result is reported as
        // an argument count mismatch.
        size_t indexArg;
        foreach (i; 0..Args.length)
        {
            if (!funcs[i](sql_, sql, indexArg, addrs[i]))
                throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
        }

        // Copy the remaining statement text. A true result from
        // copyUpToNext is counted as one more expected parameter for which
        // no argument was supplied.
        if (copyUpToNext(sql_, sql))
        {
            ++indexArg;
            while (copyUpToNext(sql_, sql))
                ++indexArg;
            throw new PgSQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", argCount, indexArg));
        }

        return sql_.data;
    }
1070 
1071     Socket socket_;
1072     PgSQLHeader header_;
1073     PgSQLRow row_;
1074     char[] columns_;
1075     char[] schema_;
1076     ubyte[] in_;
1077     ubyte[] out_;
1078     ubyte seq_;
1079     Appender!(char[]) sql_;
1080 
1081     ConnectionStatus status_;
1082     ConnectionSettings settings_;
1083     ServerInfo server_;
1084 
1085     ConnectionNotice[] notices_;
1086 
1087     bool busy_;
1088     bool pooled_;
1089     DateTime releaseTime_;
1090 }
1091 
/**
 * Copies SQL text into `app` up to the next unquoted `?` placeholder.
 *
 * The text is scanned one code point at a time. A `?` counts as a placeholder
 * only while no quote is open. Quotes are opened by `'`, `"` or a backtick and
 * closed by the same character. Inside an open quote, a backslash causes the
 * following code point to be skipped, so an escaped quote character does not
 * close the quote.
 *
 * Params:
 *   app = output buffer receiving the text that precedes the placeholder
 *         (or all remaining text when no placeholder is found)
 *   sql = remaining query text; advanced past the consumed part, including
 *         the placeholder itself when one is found
 *
 * Returns: `true` if a placeholder was found and consumed, `false` if the
 *          rest of `sql` was copied without encountering one.
 *
 * Throws: whatever `decode` throws on malformed UTF-8 (no replacement
 *         character is substituted).
 */
private auto copyUpToNext(ref Appender!(char[]) app, ref const(char)[] sql)
{
    size_t cursor;
    dchar openQuote = '\0';

    while (cursor < sql.length)
    {
        immutable ch = decode!(UseReplacementDchar.no)(sql, cursor);

        if (ch == '?')
        {
            // A question mark inside a quoted section is plain text.
            if (openQuote)
                continue;

            // `cursor` is already past the one-byte '?', so drop it from the copy.
            app.put(sql[0 .. cursor - 1]);
            sql = sql[cursor .. $];
            return true;
        }

        if (ch == '\'' || ch == '\"' || ch == '`')
        {
            if (openQuote == ch)
                openQuote = '\0';
            else if (!openQuote)
                openQuote = ch;
        }
        else if (ch == '\\' && openQuote && cursor < sql.length)
        {
            // Skip the escaped code point so it cannot terminate the quote.
            decode!(UseReplacementDchar.no)(sql, cursor);
        }
    }

    // No placeholder left: flush everything that remains.
    app.put(sql[0 .. cursor]);
    sql = sql[cursor .. $];
    return false;
}
1137 
/**
 * Type-erased helper that substitutes one argument into the query text.
 *
 * `arg` is reinterpreted as a pointer to `T`. For a non-string array, every
 * element consumes its own placeholder; any other type consumes exactly one.
 * Before each value is written, the SQL text preceding the next placeholder
 * is copied into `app` via `copyUpToNext`.
 *
 * Params:
 *   app      = output buffer for the query being assembled
 *   sql      = remaining query text, advanced as placeholders are consumed
 *   indexArg = running count of placeholders filled so far; incremented once
 *              per value written
 *   arg      = address of the argument, which the caller must ensure really
 *              points to a `T`
 *
 * Returns: `true` when every value found a placeholder, `false` as soon as
 *          the query text runs out of placeholders.
 */
private bool appendNextValue(T)(ref Appender!(char[]) app, ref const(char)[] sql, ref size_t indexArg, const(void)* arg)
{
    auto value = cast(T*)arg;

    static if (isArray!T && !isSomeString!(OriginalType!T))
    {
        // Arrays expand element-wise: one placeholder per element.
        foreach (ref element; *value)
        {
            if (!copyUpToNext(app, sql))
                return false;

            appendValue(app, element);
            ++indexArg;
        }

        return true;
    }
    else
    {
        if (!copyUpToNext(app, sql))
            return false;

        appendValue(app, *value);
        ++indexArg;
        return true;
    }
}