1 module database.mysql.pool;
2 
3 import core.time;
4 import core.thread;
5 import std.stdio;
6 import std.array;
7 import std.concurrency;
8 import std.datetime;
9 import std.algorithm.searching : any;
10 import std.algorithm.mutation : remove;
11 import std.exception : enforce, collectException;
12 
13 import database.mysql.connection;
14 import database.mysql.protocol;
15 
16 alias ConnectionPool = shared ConnectionProvider;
17 
18 final class ConnectionProvider
19 {
20     static ConnectionPool getInstance(string host, string user, string password, string database, ushort port = 3306,
21         uint maxConnections = 10, uint initialConnections = 3, uint incrementalConnections = 3, uint waitSeconds = 5,
22         CapabilityFlags caps = DefaultClientCaps)
23     {
24         assert(initialConnections > 0 && incrementalConnections > 0);
25 
26         if (_instance is null)
27         {
28             synchronized(ConnectionProvider.classinfo)
29             {
30                 if (_instance is null)
31                 {
32                     _instance = new ConnectionPool(host, user, password, database, port,
33                         maxConnections, initialConnections, incrementalConnections, waitSeconds, caps);
34                 }
35             }
36         }
37 
38         return _instance;
39     }
40 
41     private this(string host, string user, string password, string database, ushort port,
42         uint maxConnections, uint initialConnections, uint incrementalConnections, uint waitSeconds,
43         CapabilityFlags caps) shared
44     {
45         _pool = cast(shared Tid)spawn(new shared Pool(host, user, password, database, port,
46             maxConnections, initialConnections, incrementalConnections, waitSeconds.seconds, caps));
47         _waitSeconds = waitSeconds;
48         while (!__instantiated) Thread.sleep(0.msecs);
49     }
50 
51     ~this() shared
52     {
53         (cast(Tid)_pool).send(new shared Terminate(cast(shared Tid)thisTid));
54 
55         L_receive: try
56         {
57             receive(
58                 (shared Terminate _t)
59                 {
60                     return;
61                 }
62             );
63         }
64         catch (OwnerTerminated e)
65         {
66             if (e.tid != thisTid) goto L_receive;
67         }
68 
69         __instantiated = false;
70     }
71 
72     Connection getConnection() shared
73     {
74         (cast(Tid)_pool).send(new shared RequestConnection(cast(shared Tid)thisTid));
75         Connection conn;
76 
77         L_receive: try
78         {
79             receiveTimeout(
80                 _waitSeconds.seconds,
81                 (shared ConnenctionHolder holder)
82                 {
83                     conn = cast(Connection)holder.conn;
84                 },
85                 (immutable ConnectionBusy _m)
86                 {
87                     conn = null;
88                 }
89             );
90         }
91         catch (OwnerTerminated e)
92         {
93             if (e.tid != thisTid) goto L_receive;
94         }
95 
96         return conn;
97     }
98 
99     void releaseConnection(ref Connection conn) shared
100     {
101         enforce(conn.pooled, "This connection is not a managed connection in the pool.");
102         enforce(!conn.inTransaction, "This connection also has uncommitted or unrollbacked transaction.");
103 
104         (cast(Tid)_pool).send(new shared ConnenctionHolder(cast(shared Connection)conn));
105         conn = null;
106     }
107 
108 private:
109 
110     __gshared ConnectionPool _instance = null;
111 
112     Tid _pool;
113     int _waitSeconds;
114 }
115 
116 private:
117 
118 shared bool __instantiated;
119 
120 class Pool
121 {
122     this(string host, string user, string password, string database, ushort port,
123         uint maxConnections, uint initialConnections, uint incrementalConnections, Duration waitTime,
124         CapabilityFlags caps) shared
125     {
126         _host = host;
127         _user = user;
128         _password = password;
129         _database = database;
130         _port = port;
131         _maxConnections = maxConnections;
132         _initialConnections = initialConnections;
133         _incrementalConnections = incrementalConnections;
134         _waitTime = waitTime;
135         _caps = caps;
136 
137         createConnections(initialConnections);
138         _lastShrinkTime = cast(DateTime)Clock.currTime;
139         __instantiated = true;
140     }
141 
142     void opCall() shared
143     {
144         auto loop = true;
145 
146         while (loop)
147         {
148             try
149             {
150                 receive(
151                     (shared RequestConnection req)
152                     {
153                         getConnection(req);
154                     },
155                     (shared ConnenctionHolder holder)
156                     {
157                         releaseConnection(holder);
158                     },
159                     (shared Terminate t)
160                     {
161                         foreach (conn; _pool)
162                         {
163                             (cast(Connection)conn).close();
164                         }
165 
166                         (cast(Tid)t.tid).send(t);
167                         loop = false;
168                     }
169                 );
170             }
171             catch (OwnerTerminated e) { }
172 
173             // Shrink the pool.
174             DateTime now = cast(DateTime)Clock.currTime;
175             if (((now - _lastShrinkTime) > 60.seconds) && (_pool.length > _initialConnections))
176             {
177                 foreach (ref conn; cast(Connection[])_pool)
178                 {
179                     if ((conn is null) || conn.busy || ((now - conn.releaseTime) <= 120.seconds))
180                     {
181                         continue;
182                     }
183 
184                     collectException({ conn.close(); }());
185                     conn = null;
186                 }
187 
188                 if (_pool.any!((a) => (a is null)))
189                 {
190                     _pool = _pool.remove!((a) => (a is null));
191                 }
192 
193                 _lastShrinkTime = now;
194             }
195         }
196     }
197 
198 private:
199 
200     Connection createConnection() shared
201     {
202         try
203         {
204             Connection conn = new Connection(_host, _user, _password, _database, _port, _caps);
205             conn.pooled = true;
206             return conn;
207         }
208         catch (Exception e)
209         {
210             return null;
211         }
212     }
213 
214     void createConnections(uint num) shared
215     {
216         for (int i; i < num; i++)
217         {
218             if ((_maxConnections > 0) && (_pool.length >= _maxConnections))
219             {
220                 break;
221             }
222 
223             Connection conn = createConnection();
224 
225             if (conn !is null)
226             {
227                 _pool ~= cast(shared Connection)conn;
228             }
229         }
230     }
231 
232     void getConnection(shared RequestConnection req) shared
233     {
234         immutable start = Clock.currTime();
235 
236         while (true)
237         {
238             Connection conn = getFreeConnection();
239 
240             if (conn !is null)
241             {
242                 (cast(Tid)req.tid).send(new shared ConnenctionHolder(cast(shared Connection)conn));
243                 return;
244             }
245 
246             if ((Clock.currTime() - start) >= _waitTime)
247             {
248                 break;
249             }
250  
251             Thread.sleep(100.msecs);
252         }
253 
254         (cast(Tid)req.tid).send(new immutable ConnectionBusy);
255     }
256 
257     Connection getFreeConnection() shared
258     {
259         Connection conn = findFreeConnection();
260 
261         if (conn is null)
262         {
263             createConnections(_incrementalConnections);
264             conn = findFreeConnection();
265         }     
266 
267         return conn;
268     }
269 
270     Connection findFreeConnection() shared
271     {
272         Connection result;
273 
274         for (size_t i = 0; i < _pool.length; i++)
275         {
276             Connection conn = cast(Connection)_pool[i];
277 
278             if ((conn is null) || conn.busy)
279             {
280                 continue;
281             }
282 
283             if (!testConnection(conn))
284             {
285                 continue;
286             }
287 
288             conn.busy = true;
289             result = conn;
290             break;
291         }
292 
293         if (_pool.any!((a) => (a is null)))
294         {
295             _pool = _pool.remove!((a) => (a is null));
296         }
297 
298         return result;
299     }
300 
301     bool testConnection(Connection conn) shared
302     {
303         try
304         {
305             conn.ping();
306             return true;
307         }
308         catch (Exception e)
309         {
310             collectException({ conn.close(); }());
311             conn = null;
312 
313             return false;
314         }
315     }
316 
317     void releaseConnection(shared ConnenctionHolder holder) shared
318     {
319         if (holder.conn !is null)
320         {
321             Connection conn = cast(Connection)holder.conn;
322             conn.busy = false;
323             conn.releaseTime = cast(DateTime)Clock.currTime;
324         }
325     }
326 
327     Connection[] _pool;
328 
329     string _host;
330     string _user;
331     string _password;
332     string _database;
333     ushort _port;
334     uint _maxConnections;
335     uint _initialConnections;
336     uint _incrementalConnections;
337     Duration _waitTime;
338     CapabilityFlags _caps;
339 
340     DateTime _lastShrinkTime;
341 }
342 
343 shared class RequestConnection
344 {
345     Tid tid;
346 
347     this(shared Tid tid) shared
348     {
349         this.tid = tid;
350     }
351 }
352 
353 shared class ConnenctionHolder
354 {
355     Connection conn;
356 
357     this(shared Connection conn) shared
358     {
359         this.conn = conn;
360     }
361 }
362 
363 immutable class ConnectionBusy
364 {
365 }
366 
367 shared class Terminate
368 {
369     Tid tid;
370 
371     this(shared Tid tid) shared
372     {
373         this.tid = tid;
374     }
375 }
376 
377 // unittest
378 // {
379 //     import core.thread;
380 //     import std.stdio;
381 
382 //     ConnectionPool pool = ConnectionPool.getInstance("127.0.0.1", "root", "111111", "test", 3306);
383 
384 //     int i = 0;
385 //     while (i++ < 20)
386 //     {
387 //         Thread.sleep(100.msecs);
388 
389 //         Connection conn = pool.getConnection();
390 
391 //         if (conn !is null)
392 //         {
393 //             writeln(conn.connected());
394 //             pool.releaseConnection(conn);
395 //         }
396 //     }
397 
398 //     pool.destroy();
399 // }