/**
 * A process-wide MySQL connection pool.
 *
 * The pool itself (`Pool`) runs in its own thread and is driven purely by
 * `std.concurrency` messages. `ConnectionProvider` is the public facade: it
 * spawns the pool thread and exchanges `RequestConnection`,
 * `ConnenctionHolder`, `ConnectionBusy` and `Terminate` messages with it.
 */
module database.mysql.pool;

import core.time;
import core.thread;
import std.stdio;
import std.array;
import std.concurrency;
import std.datetime;
import std.algorithm.searching : any;
import std.algorithm.mutation : remove;
import std.exception : enforce, collectException;

import database.mysql.connection;
import database.mysql.protocol;

/// Public name of the pool facade; always used as a `shared` object.
alias ConnectionPool = shared ConnectionProvider;

/**
 * Facade through which client threads borrow and return pooled connections.
 *
 * A single instance is created lazily by `getInstance` and kept in a
 * `__gshared` field.
 */
final class ConnectionProvider
{
    /**
     * Returns the pool instance, creating it on the first call.
     *
     * The arguments are only used by the call that actually creates the
     * instance; once `_instance` is set, later calls return it unchanged and
     * ignore whatever is passed.
     *
     * Params:
     *   host                   = server host passed to `Connection`
     *   user                   = login user
     *   password               = login password
     *   database               = default schema
     *   port                   = server port
     *   maxConnections         = upper bound on pooled connections; 0 disables the bound
     *   initialConnections     = connections opened when the pool is created (must be > 0)
     *   incrementalConnections = connections opened when no free one is found (must be > 0)
     *   waitSeconds            = how long a request waits for a free connection
     *   caps                   = client capability flags passed to `Connection`
     */
    static ConnectionPool getInstance(string host, string user, string password, string database, ushort port = 3306,
        uint maxConnections = 10, uint initialConnections = 3, uint incrementalConnections = 3, uint waitSeconds = 5,
        CapabilityFlags caps = DefaultClientCaps)
    {
        assert(initialConnections > 0 && incrementalConnections > 0);

        // Check, lock, check again: only the first caller constructs the pool.
        if (_instance is null)
        {
            synchronized(ConnectionProvider.classinfo)
            {
                if (_instance is null)
                {
                    _instance = new ConnectionPool(host, user, password, database, port,
                        maxConnections, initialConnections, incrementalConnections, waitSeconds, caps);
                }
            }
        }

        return _instance;
    }

    /**
     * Builds the `Pool`, spawns a thread for it and remembers its `Tid`.
     *
     * The `Pool` constructor opens the initial connections and sets
     * `__instantiated`; this constructor does not return before that flag is
     * observed as true.
     */
    private this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, uint waitSeconds,
        CapabilityFlags caps) shared
    {
        _pool = cast(shared Tid)spawn(new shared Pool(host, user, password, database, port,
            maxConnections, initialConnections, incrementalConnections, waitSeconds.seconds, caps));
        _waitSeconds = waitSeconds;
        while (!__instantiated) Thread.sleep(0.msecs);
    }

    /**
     * Asks the pool thread to terminate and waits for its acknowledgement.
     *
     * The pool closes its connections and echoes the `Terminate` message back
     * to the sender before leaving its loop.
     */
    ~this() shared
    {
        (cast(Tid)_pool).send(new shared Terminate(cast(shared Tid)thisTid));

        L_receive: try
        {
            receive(
                (shared Terminate _t)
                {
                    return;
                }
            );
        }
        catch (OwnerTerminated e)
        {
            // An owner-terminated notice that is not about this thread is
            // ignored and the wait for the acknowledgement is restarted.
            if (e.tid != thisTid) goto L_receive;
        }

        __instantiated = false;
    }

    /**
     * Borrows a connection from the pool.
     *
     * Sends a `RequestConnection` to the pool thread and waits up to
     * `_waitSeconds` seconds for the answer.
     *
     * Returns: the borrowed connection, or `null` when the pool answered
     *          `ConnectionBusy` or no answer arrived in time.
     *
     * NOTE(review): if the timeout fires just before the pool answers, the
     * `ConnenctionHolder` stays unread in this thread's mailbox and the
     * connection remains marked busy in the pool — confirm whether callers
     * can hit this in practice.
     */
    Connection getConnection() shared
    {
        (cast(Tid)_pool).send(new shared RequestConnection(cast(shared Tid)thisTid));
        Connection conn;

        L_receive: try
        {
            receiveTimeout(
                _waitSeconds.seconds,
                (shared ConnenctionHolder holder)
                {
                    conn = cast(Connection)holder.conn;
                },
                (immutable ConnectionBusy _m)
                {
                    conn = null;
                }
            );
        }
        catch (OwnerTerminated e)
        {
            if (e.tid != thisTid) goto L_receive;
        }

        return conn;
    }

    /**
     * Hands a borrowed connection back to the pool.
     *
     * Params:
     *   conn = the connection to return; set to `null` on success so the
     *          caller cannot keep using it.
     *
     * Throws: `Exception` (via `enforce`) when the connection is not a pooled
     *         one or still has an open transaction.
     */
    void releaseConnection(ref Connection conn) shared
    {
        enforce(conn.pooled, "This connection is not a managed connection in the pool.");
        enforce(!conn.inTransaction, "This connection also has uncommitted or unrollbacked transaction.");

        (cast(Tid)_pool).send(new shared ConnenctionHolder(cast(shared Connection)conn));
        conn = null;
    }

private:

    /// The lazily created singleton.
    __gshared ConnectionPool _instance = null;

    /// Thread id of the spawned `Pool`.
    Tid _pool;

    /// Client-side timeout, in seconds, used by `getConnection`.
    int _waitSeconds;
}

private:

/// Set by the `Pool` constructor, cleared by the `ConnectionProvider` destructor.
shared bool __instantiated;

/**
 * The pool proper: owns the connections and serves messages in `opCall`,
 * which is the entry point of the thread spawned by `ConnectionProvider`.
 */
class Pool
{
    /**
     * Stores the settings, opens `initialConnections` connections and marks
     * the pool as instantiated.
     */
    this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, Duration waitTime,
        CapabilityFlags caps) shared
    {
        _host = host;
        _user = user;
        _password = password;
        _database = database;
        _port = port;
        _maxConnections = maxConnections;
        _initialConnections = initialConnections;
        _incrementalConnections = incrementalConnections;
        _waitTime = waitTime;
        _caps = caps;

        createConnections(initialConnections);
        _lastShrinkTime = cast(DateTime)Clock.currTime;
        __instantiated = true;
    }

    /**
     * Message loop of the pool thread.
     *
     * Handles one message per iteration (request, release or terminate) and
     * then checks whether the pool should be shrunk.
     */
    void opCall() shared
    {
        auto loop = true;

        while (loop)
        {
            try
            {
                receive(
                    (shared RequestConnection req)
                    {
                        getConnection(req);
                    },
                    (shared ConnenctionHolder holder)
                    {
                        releaseConnection(holder);
                    },
                    (shared Terminate t)
                    {
                        foreach (conn; _pool)
                        {
                            (cast(Connection)conn).close();
                        }

                        // Echo the message back so the sender knows we are done.
                        (cast(Tid)t.tid).send(t);
                        loop = false;
                    }
                );
            }
            catch (OwnerTerminated e) { }

            // Shrink the pool: at most once every 60 seconds, and only while
            // it holds more than the initial number of connections, close
            // every idle connection that was released more than 120 seconds
            // ago.
            // NOTE(review): the pass does not stop at _initialConnections, so
            // it may shrink the pool below that size — confirm this is wanted.
            DateTime now = cast(DateTime)Clock.currTime;
            if (((now - _lastShrinkTime) > 60.seconds) && (_pool.length > _initialConnections))
            {
                foreach (ref conn; cast(Connection[])_pool)
                {
                    if ((conn is null) || conn.busy || ((now - conn.releaseTime) <= 120.seconds))
                    {
                        continue;
                    }

                    collectException({ conn.close(); }());
                    conn = null;
                }

                if (_pool.any!((a) => (a is null)))
                {
                    _pool = _pool.remove!((a) => (a is null));
                }

                _lastShrinkTime = now;
            }
        }
    }

private:

    /**
     * Opens one new pooled connection.
     *
     * Returns: the connection, or `null` when constructing it threw.
     */
    Connection createConnection() shared
    {
        try
        {
            Connection conn = new Connection(_host, _user, _password, _database, _port, _caps);
            conn.pooled = true;
            return conn;
        }
        catch (Exception e)
        {
            return null;
        }
    }

    /**
     * Tries to open up to `num` connections and appends the successful ones
     * to the pool. Stops early once `_maxConnections` is reached (a value of
     * 0 means "no limit").
     */
    void createConnections(uint num) shared
    {
        foreach (i; 0 .. num)
        {
            if ((_maxConnections > 0) && (_pool.length >= _maxConnections))
            {
                break;
            }

            Connection conn = createConnection();

            if (conn !is null)
            {
                _pool ~= cast(shared Connection)conn;
            }
        }
    }

    /**
     * Answers a `RequestConnection`: replies with a `ConnenctionHolder` as
     * soon as a free connection is found, or with `ConnectionBusy` once
     * `_waitTime` has elapsed without one.
     *
     * NOTE(review): this runs inside the `receive` handler of `opCall`, so
     * while it sleeps no release message is processed. A retry can therefore
     * only succeed by opening new connections, never by another client
     * returning one.
     */
    void getConnection(shared RequestConnection req) shared
    {
        immutable start = Clock.currTime();

        while (true)
        {
            Connection conn = getFreeConnection();

            if (conn !is null)
            {
                (cast(Tid)req.tid).send(new shared ConnenctionHolder(cast(shared Connection)conn));
                return;
            }

            if ((Clock.currTime() - start) >= _waitTime)
            {
                break;
            }

            Thread.sleep(100.msecs);
        }

        (cast(Tid)req.tid).send(new immutable ConnectionBusy);
    }

    /**
     * Returns a free connection, growing the pool by
     * `_incrementalConnections` when none is available.
     *
     * Returns: a connection already marked busy, or `null`.
     */
    Connection getFreeConnection() shared
    {
        Connection conn = findFreeConnection();

        if (conn is null)
        {
            createConnections(_incrementalConnections);
            conn = findFreeConnection();
        }

        return conn;
    }

    /**
     * Scans the pool for the first connection that is not busy and still
     * answers a ping, marks it busy and returns it.
     *
     * A connection whose ping fails has been closed by `testConnection`; its
     * slot is set to `null` here so that the compaction below drops it from
     * the pool. Without that the closed connection would stay in `_pool`,
     * keep counting against `_maxConnections` and be pinged again on every
     * request.
     *
     * Returns: the chosen connection, or `null` when none is usable.
     */
    Connection findFreeConnection() shared
    {
        Connection result;

        for (size_t i = 0; i < _pool.length; i++)
        {
            Connection conn = cast(Connection)_pool[i];

            if ((conn is null) || conn.busy)
            {
                continue;
            }

            if (!testConnection(conn))
            {
                // The connection is dead and already closed: free its slot.
                _pool[i] = null;
                continue;
            }

            conn.busy = true;
            result = conn;
            break;
        }

        // Compact the pool by dropping every emptied slot.
        if (_pool.any!((a) => (a is null)))
        {
            _pool = _pool.remove!((a) => (a is null));
        }

        return result;
    }

    /**
     * Pings the connection.
     *
     * Returns: `true` when the ping succeeded; `false` when it threw, in
     *          which case the connection has been closed (best effort) and
     *          the caller must drop its reference to it.
     */
    bool testConnection(Connection conn) shared
    {
        try
        {
            conn.ping();
            return true;
        }
        catch (Exception e)
        {
            collectException({ conn.close(); }());

            return false;
        }
    }

    /**
     * Handles a returned connection: clears its busy flag and stamps the
     * release time that the shrink pass in `opCall` compares against.
     */
    void releaseConnection(shared ConnenctionHolder holder) shared
    {
        if (holder.conn !is null)
        {
            Connection conn = cast(Connection)holder.conn;
            conn.busy = false;
            conn.releaseTime = cast(DateTime)Clock.currTime;
        }
    }

    /// The pooled connections, both free and busy.
    Connection[] _pool;

    string _host;
    string _user;
    string _password;
    string _database;
    ushort _port;
    uint _maxConnections;
    uint _initialConnections;
    uint _incrementalConnections;
    Duration _waitTime;
    CapabilityFlags _caps;

    /// Time of the last shrink pass (or of construction).
    DateTime _lastShrinkTime;
}

/// Message: the thread `tid` asks for a connection.
shared class RequestConnection
{
    Tid tid;

    this(shared Tid tid) shared
    {
        this.tid = tid;
    }
}

/// Message carrying a connection, used both to lend it out and to return it.
shared class ConnenctionHolder
{
    Connection conn;

    this(shared Connection conn) shared
    {
        this.conn = conn;
    }
}

/// Message: no connection became available within the wait time.
immutable class ConnectionBusy
{
}

/// Message: shut the pool down; echoed back to `tid` as acknowledgement.
shared class Terminate
{
    Tid tid;

    this(shared Tid tid) shared
    {
        this.tid = tid;
    }
}

// unittest
// {
//     import core.thread;
//     import std.stdio;

//     ConnectionPool pool = ConnectionPool.getInstance("127.0.0.1", "root", "111111", "test", 3306);

//     int i = 0;
//     while (i++ < 20)
//     {
//         Thread.sleep(100.msecs);

//         Connection conn = pool.getConnection();

//         if (conn !is null)
//         {
//             writeln(conn.connected());
//             pool.releaseConnection(conn);
//         }
//     }

//     pool.destroy();
// }