1 module database.postgresql.pool;
2 
3 import core.time;
4 import core.thread;
5 import std.stdio;
6 import std.array;
7 import std.concurrency;
8 import std.datetime;
9 import std.algorithm.searching : any;
10 import std.algorithm.mutation : remove;
11 import std.exception : enforce, collectException;
12 
13 import database.postgresql.connection;
14 
/// Handle type used by callers: the provider is always accessed through a `shared` reference.
alias ConnectionPool = shared ConnectionProvider;

/**
 * Front end of the connection pool.
 *
 * The provider owns no connections itself. It spawns a worker thread running
 * a `Pool` object and talks to it with `std.concurrency` messages:
 * `RequestConnection` to borrow a connection, `ConnenctionHolder` to give one
 * back, and `Terminate` to shut the worker down.
 */
final class ConnectionProvider
{
    /**
     * Returns the single provider instance, creating it on the first call.
     *
     * The arguments are only used by the call that actually creates the
     * instance; later calls return the existing instance unchanged.
     *
     * Params:
     *   host                   = server host passed to `Connection`
     *   user                   = login user passed to `Connection`
     *   password               = login password passed to `Connection`
     *   database               = database name passed to `Connection`
     *   port                   = server port
     *   maxConnections         = upper bound of pooled connections (the worker treats 0 as "no limit")
     *   initialConnections     = connections opened when the pool starts; must be > 0
     *   incrementalConnections = connections opened when no free one is found; must be > 0
     *   waitSeconds            = how long the worker looks for a free connection before answering "busy"
     *   options                = options passed to every `Connection`
     */
    static ConnectionPool getInstance(string host, string user, string password, string database, ushort port = 5432,
        uint maxConnections = 10, uint initialConnections = 3, uint incrementalConnections = 3, uint waitSeconds = 5,
        ConnectionOptions options = ConnectionOptions.Default)
    {
        assert(initialConnections > 0 && incrementalConnections > 0);

        // Check, lock, check again: only the first caller constructs the instance.
        if (_instance is null)
        {
            synchronized(ConnectionProvider.classinfo)
            {
                if (_instance is null)
                {
                    _instance = new ConnectionPool(host, user, password, database, port,
                        maxConnections, initialConnections, incrementalConnections, waitSeconds, options);
                }
            }
        }

        return _instance;
    }

    /// Builds the `Pool` worker object, spawns its thread and waits until the pool reports it is set up.
    private this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, uint waitSeconds,
        ConnectionOptions options) shared
    {
        _pool = cast(shared Tid)spawn(new shared Pool(host, user, password, database, port,
            maxConnections, initialConnections, incrementalConnections, waitSeconds.seconds, options));
        _waitSeconds = waitSeconds;
        while (!__instantiated) Thread.sleep(0.msecs);
    }

    /**
     * Asks the worker to close all connections and waits for its acknowledgement.
     *
     * NOTE(review): this allocates (`new shared Terminate`) and blocks on
     * `receive`. If the destructor is ever run by the garbage collector rather
     * than explicitly, allocating here is not permitted - confirm how the
     * instance is torn down.
     */
    ~this() shared
    {
        (cast(Tid)_pool).send(new shared Terminate(cast(shared Tid)thisTid));

        for (;;)
        {
            try
            {
                // The worker echoes the Terminate message back once it is done.
                receive((shared Terminate _t) { });
                break;
            }
            catch (OwnerTerminated e)
            {
                // Unrelated owner-termination notice: keep waiting for the echo.
                if (e.tid == thisTid) break;
            }
        }

        // The worker is gone, so the pool is no longer instantiated.
        __instantiated = false;
    }

    /**
     * Borrows a connection from the pool.
     *
     * Returns: a pooled connection, or `null` when the worker answered
     *          "busy" or no answer arrived in time.
     */
    Connection getConnection() shared
    {
        auto worker = cast(Tid)_pool;

        // A previous request from this thread may have timed out just before
        // its answer arrived. Clear such leftovers first, otherwise this call
        // would consume the answer that belongs to the earlier request.
        discardStaleReplies(worker);

        worker.send(new shared RequestConnection(cast(shared Tid)thisTid));
        Connection conn;

        // The worker itself searches for up to `_waitSeconds` before replying,
        // so wait slightly longer than that to actually see its reply.
        const Duration replyTimeout = _waitSeconds.seconds + replyGrace;

        for (;;)
        {
            try
            {
                receiveTimeout(
                    replyTimeout,
                    (shared ConnenctionHolder holder)
                    {
                        conn = cast(Connection)holder.conn;
                    },
                    (immutable ConnectionBusy _m)
                    {
                        conn = null;
                    }
                );
                break;
            }
            catch (OwnerTerminated e)
            {
                // Unrelated owner-termination notice: wait again for the reply.
                if (e.tid == thisTid) break;
            }
        }

        return conn;
    }

    /**
     * Gives a borrowed connection back to the pool and clears the caller's reference.
     *
     * Params:
     *   conn = connection obtained from `getConnection`; set to `null` on return
     *
     * Throws: `Exception` (via `enforce`) when `conn` is `null`, is not a
     *         pooled connection, or still reports an open transaction.
     */
    void releaseConnection(ref Connection conn) shared
    {
        enforce(conn !is null, "Cannot release a null connection.");
        enforce(conn.pooled, "This connection is not a managed connection in the pool.");
        enforce(!conn.inTransaction, "This connection also has uncommitted or unrollbacked transaction.");

        (cast(Tid)_pool).send(new shared ConnenctionHolder(cast(shared Connection)conn));
        conn = null;
    }

private:

    /// Extra time granted to the worker's reply on top of its own search time.
    enum Duration replyGrace = dur!"seconds"(1);

    /**
     * Empties the calling thread's mailbox of pool replies that nobody is
     * waiting for any more. Connections found this way are handed straight
     * back to the worker so they do not stay marked as busy.
     */
    static void discardStaleReplies(Tid worker)
    {
        bool found = true;

        while (found)
        {
            try
            {
                // A negative timeout makes receiveTimeout return without waiting.
                found = receiveTimeout(
                    dur!"msecs"(-1),
                    (shared ConnenctionHolder stale)
                    {
                        worker.send(stale);
                    },
                    (immutable ConnectionBusy _m) { }
                );
            }
            catch (OwnerTerminated e)
            {
                // Not a pool reply; continue draining.
            }
        }
    }

    __gshared ConnectionPool _instance = null;

    Tid _pool;          // thread id of the spawned Pool worker
    int _waitSeconds;   // worker-side search time, in seconds
}
114 
115 private:
116 
/// Set by the `Pool` constructor once the initial connections have been created.
shared bool __instantiated;

/**
 * Worker side of the connection pool.
 *
 * An instance is spawned as a thread (its `opCall` is the thread body) and
 * owns the array of connections. All access to the connections happens in
 * response to messages handled in `opCall`.
 */
class Pool
{
    /// Stores the settings, opens `initialConnections` connections and flags the pool as instantiated.
    this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, Duration waitTime,
        ConnectionOptions options) shared
    {
        _host = host;
        _user = user;
        _password = password;
        _database = database;
        _port = port;
        _maxConnections = maxConnections;
        _initialConnections = initialConnections;
        _incrementalConnections = incrementalConnections;
        _waitTime = waitTime;
        _options = options;

        createConnections(initialConnections);
        _lastShrinkTime = cast(DateTime)Clock.currTime;
        __instantiated = true;
    }

    /**
     * Message loop of the worker thread.
     *
     * Handles borrow requests, releases and the terminate request; after each
     * handled message it gives the pool a chance to shrink.
     */
    void opCall() shared
    {
        auto loop = true;

        while (loop)
        {
            try
            {
                receive(
                    (shared RequestConnection req)
                    {
                        getConnection(req);
                    },
                    (shared ConnenctionHolder holder)
                    {
                        releaseConnection(holder);
                    },
                    (shared Terminate t)
                    {
                        foreach (conn; _pool)
                        {
                            if (conn !is null)
                            {
                                // Best effort: a failing close must not keep the
                                // acknowledgement below from being sent.
                                collectException({ (cast(Connection)conn).close(); }());
                            }
                        }

                        (cast(Tid)t.tid).send(t);
                        loop = false;
                    }
                );
            }
            catch (OwnerTerminated e) { }

            // After a terminate request every connection is closed already.
            if (!loop)
            {
                break;
            }

            shrinkIdle();
        }
    }

private:

    /**
     * Closes connections that have been idle for more than 120 seconds.
     *
     * Runs at most once every 60 seconds and only while the pool holds more
     * than `_initialConnections` entries; it never closes more than that
     * surplus, so the pool does not drop below its initial size here.
     */
    void shrinkIdle() shared
    {
        DateTime now = cast(DateTime)Clock.currTime;

        if (((now - _lastShrinkTime) <= 60.seconds) || (_pool.length <= _initialConnections))
        {
            return;
        }

        size_t surplus = _pool.length - _initialConnections;

        foreach (ref conn; cast(Connection[])_pool)
        {
            if (surplus == 0)
            {
                break;
            }

            if ((conn is null) || conn.busy || ((now - conn.releaseTime) <= 120.seconds))
            {
                continue;
            }

            collectException({ conn.close(); }());
            conn = null;
            --surplus;
        }

        if (_pool.any!((a) => (a is null)))
        {
            _pool = _pool.remove!((a) => (a is null));
        }

        _lastShrinkTime = now;
    }

    /// Opens one pooled connection; returns `null` when the attempt throws.
    Connection createConnection() shared
    {
        try
        {
            Connection conn = new Connection(_host, _user, _password, _database, _port, _options);
            conn.pooled = true;
            return conn;
        }
        catch (Exception e)
        {
            return null;
        }
    }

    /**
     * Tries to open up to `num` additional connections.
     *
     * Stops early once `_maxConnections` is reached (a limit of 0 disables
     * the check). Failed attempts are skipped silently.
     */
    void createConnections(uint num) shared
    {
        foreach (_; 0 .. num)
        {
            if ((_maxConnections > 0) && (_pool.length >= _maxConnections))
            {
                break;
            }

            Connection conn = createConnection();

            if (conn !is null)
            {
                _pool ~= cast(shared Connection)conn;
            }
        }
    }

    /**
     * Answers a borrow request.
     *
     * Sends a `ConnenctionHolder` to the requester as soon as a free
     * connection is found, or a `ConnectionBusy` once `_waitTime` has elapsed
     * without success.
     */
    void getConnection(shared RequestConnection req) shared
    {
        immutable start = Clock.currTime();

        while (true)
        {
            Connection conn = getFreeConnection();

            if (conn !is null)
            {
                (cast(Tid)req.tid).send(new shared ConnenctionHolder(cast(shared Connection)conn));
                return;
            }

            if ((Clock.currTime() - start) >= _waitTime)
            {
                break;
            }

            // Connections only become free when a release message is handled,
            // and this thread is the one handling them. So instead of merely
            // sleeping, process releases that arrive during the pause. Other
            // message types are not matched here and stay queued for opCall.
            try
            {
                receiveTimeout(
                    100.msecs,
                    (shared ConnenctionHolder holder)
                    {
                        releaseConnection(holder);
                    }
                );
            }
            catch (OwnerTerminated e) { }
        }

        (cast(Tid)req.tid).send(new immutable ConnectionBusy);
    }

    /// Returns a free connection, growing the pool by `_incrementalConnections` if none is available.
    Connection getFreeConnection() shared
    {
        Connection conn = findFreeConnection();

        if (conn is null)
        {
            createConnections(_incrementalConnections);
            conn = findFreeConnection();
        }

        return conn;
    }

    /**
     * Scans the pool for the first connection that is not busy and still
     * answers a ping, marks it busy and returns it.
     *
     * Connections that fail the ping are closed by `testConnection` and
     * dropped from the pool. Returns `null` when nothing usable is found.
     */
    Connection findFreeConnection() shared
    {
        Connection result;

        for (size_t i = 0; i < _pool.length; i++)
        {
            Connection conn = cast(Connection)_pool[i];

            if ((conn is null) || conn.busy)
            {
                continue;
            }

            if (!testConnection(conn))
            {
                // Clear the slot so the dead connection is removed below.
                _pool[i] = null;
                continue;
            }

            conn.busy = true;
            result = conn;
            break;
        }

        if (_pool.any!((a) => (a is null)))
        {
            _pool = _pool.remove!((a) => (a is null));
        }

        return result;
    }

    /// Pings `conn`; on failure closes it (ignoring close errors) and returns `false`.
    bool testConnection(Connection conn) shared
    {
        try
        {
            conn.ping();
            return true;
        }
        catch (Exception e)
        {
            collectException({ conn.close(); }());
            return false;
        }
    }

    /// Marks the returned connection as free and records the release time.
    void releaseConnection(shared ConnenctionHolder holder) shared
    {
        if (holder.conn !is null)
        {
            Connection conn = cast(Connection)holder.conn;
            conn.busy = false;
            conn.releaseTime = cast(DateTime)Clock.currTime;
        }
    }

    Connection[] _pool;

    string _host;
    string _user;
    string _password;
    string _database;
    ushort _port;
    uint _maxConnections;
    uint _initialConnections;
    uint _incrementalConnections;
    Duration _waitTime;
    ConnectionOptions _options;

    DateTime _lastShrinkTime;
}
341 
/// Message sent to the pool worker to borrow a connection.
shared class RequestConnection
{
    /// Thread that issued the request and will receive the reply.
    Tid tid;

    /// Remembers the requesting thread.
    this(shared Tid tid) shared { this.tid = tid; }
}
351 
/// Message that carries a connection between the pool worker and a client thread (in either direction).
shared class ConnenctionHolder
{
    /// The connection being handed over.
    Connection conn;

    /// Wraps `conn` for sending.
    this(shared Connection conn) shared { this.conn = conn; }
}
361 
/// Payload-free reply sent by the pool worker when no connection could be provided in time.
immutable class ConnectionBusy {}
365 
/// Message asking the pool worker to shut down; the worker sends it back as acknowledgement.
shared class Terminate
{
    /// Thread that requested the shutdown and receives the acknowledgement.
    Tid tid;

    /// Remembers the requesting thread.
    this(shared Tid tid) shared { this.tid = tid; }
}