/**
 * Connection pool for PostgreSQL connections.
 *
 * `ConnectionProvider` is the public, singleton front end. The connections
 * themselves are held by a `Pool` functor that is started with
 * `std.concurrency.spawn` and is driven by messages (`RequestConnection`,
 * `ConnenctionHolder`, `Terminate`).
 */
module database.postgresql.pool;

import core.time;
import core.thread;
import std.stdio;
import std.array;
import std.concurrency;
import std.datetime;
import std.algorithm.searching : any;
import std.algorithm.mutation : remove;
import std.exception : enforce, collectException;

import database.postgresql.connection;

/// Shared-qualified type under which the pool singleton is exposed.
alias ConnectionPool = shared ConnectionProvider;

/**
 * Front end of the pool. All requests are forwarded as messages to the
 * spawned `Pool` thread whose `Tid` is kept in `_pool`.
 */
final class ConnectionProvider
{
    /**
     * Returns the singleton pool, creating it on the first call.
     *
     * The arguments are only used by the call that actually creates the
     * instance; once `_instance` is set, later calls return it unchanged.
     *
     * Params:
     *   host, user, password, database, port = passed on to every `Connection` the pool creates
     *   maxConnections         = upper bound on pooled connections (0 disables the bound)
     *   initialConnections     = number of connections the pool tries to open at start-up
     *   incrementalConnections = number of connections the pool tries to add when none is free
     *   waitSeconds            = how long a request waits for a free connection
     *   options                = passed on to every `Connection` the pool creates
     */
    static ConnectionPool getInstance(string host, string user, string password, string database, ushort port = 5432,
        uint maxConnections = 10, uint initialConnections = 3, uint incrementalConnections = 3, uint waitSeconds = 5,
        ConnectionOptions options = ConnectionOptions.Default)
    {
        assert(initialConnections > 0 && incrementalConnections > 0);

        if (_instance is null)
        {
            synchronized(ConnectionProvider.classinfo)
            {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (_instance is null)
                {
                    _instance = new ConnectionPool(host, user, password, database, port,
                        maxConnections, initialConnections, incrementalConnections, waitSeconds, options);
                }
            }
        }

        return _instance;
    }

    /**
     * Builds the `Pool` functor (in the calling thread), spawns it, and waits
     * until `__instantiated` has been set by the `Pool` constructor.
     */
    private this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, uint waitSeconds,
        ConnectionOptions options) shared
    {
        _pool = cast(shared Tid)spawn(new shared Pool(host, user, password, database, port,
            maxConnections, initialConnections, incrementalConnections, waitSeconds.seconds, options));
        _waitSeconds = waitSeconds;
        while (!__instantiated) Thread.sleep(0.msecs);
    }

    /**
     * Asks the pool thread to terminate and waits for it to echo the
     * `Terminate` message back.
     */
    ~this() shared
    {
        (cast(Tid)_pool).send(new shared Terminate(cast(shared Tid)thisTid));

        L_receive: try
        {
            receive(
                (shared Terminate _t)
                {
                    return;
                }
            );
        }
        catch (OwnerTerminated e)
        {
            if (e.tid != thisTid) goto L_receive;
        }

        // The pool has been shut down, so it is no longer instantiated.
        // (The original code set this flag to `true` here.)
        __instantiated = false;
    }

    /**
     * Requests a connection from the pool thread.
     *
     * Returns: the connection handed out by the pool, or `null` when the pool
     *          answers `ConnectionBusy` or no answer arrives within
     *          `_waitSeconds` seconds.
     */
    Connection getConnection() shared
    {
        (cast(Tid)_pool).send(new shared RequestConnection(cast(shared Tid)thisTid));
        Connection conn;

        // NOTE(review): this timeout equals the pool's own wait budget, so a reply
        // sent right at the deadline can arrive after we stopped waiting and stay
        // in this thread's mailbox until the next call - confirm whether the
        // client side should wait slightly longer than the pool.
        L_receive: try
        {
            receiveTimeout(
                _waitSeconds.seconds,
                (shared ConnenctionHolder holder)
                {
                    conn = cast(Connection)holder.conn;
                },
                (immutable ConnectionBusy _m)
                {
                    conn = null;
                }
            );
        }
        catch (OwnerTerminated e)
        {
            if (e.tid != thisTid) goto L_receive;
        }

        return conn;
    }

    /**
     * Hands a connection back to the pool and clears the caller's reference.
     *
     * Throws: `Exception` (via `enforce`) when the connection is not flagged as
     *         pooled or is still inside a transaction.
     */
    void releaseConnection(ref Connection conn) shared
    {
        enforce(conn.pooled, "This connection is not a managed connection in the pool.");
        enforce(!conn.inTransaction, "This connection also has uncommitted or unrollbacked transaction.");

        (cast(Tid)_pool).send(new shared ConnenctionHolder(cast(shared Connection)conn));
        conn = null;
    }

private:

    /// The singleton instance returned by `getInstance`.
    __gshared ConnectionPool _instance = null;

    /// Thread id of the spawned `Pool` functor.
    Tid _pool;
    /// Timeout, in seconds, used by `getConnection` when waiting for the pool's answer.
    int _waitSeconds;
}

private:

/// Set by the `Pool` constructor; polled by the `ConnectionProvider` constructor.
shared bool __instantiated;

/**
 * Functor run in the spawned thread. It keeps the connections in `_pool` and
 * serves request, release and terminate messages in `opCall`.
 */
class Pool
{
    /**
     * Stores the connection settings, tries to open `initialConnections`
     * connections and then flags the pool as instantiated.
     */
    this(string host, string user, string password, string database, ushort port,
        uint maxConnections, uint initialConnections, uint incrementalConnections, Duration waitTime,
        ConnectionOptions options) shared
    {
        _host = host;
        _user = user;
        _password = password;
        _database = database;
        _port = port;
        _maxConnections = maxConnections;
        _initialConnections = initialConnections;
        _incrementalConnections = incrementalConnections;
        _waitTime = waitTime;
        _options = options;

        createConnections(initialConnections);
        _lastShrinkTime = cast(DateTime)Clock.currTime;
        __instantiated = true;
    }

    /**
     * Message loop of the pool thread. Runs until a `Terminate` message has
     * been handled; after every other handled message it checks whether the
     * pool should be shrunk.
     */
    void opCall() shared
    {
        auto loop = true;

        while (loop)
        {
            try
            {
                receive(
                    (shared RequestConnection req)
                    {
                        getConnection(req);
                    },
                    (shared ConnenctionHolder holder)
                    {
                        releaseConnection(holder);
                    },
                    (shared Terminate t)
                    {
                        foreach (conn; _pool)
                        {
                            (cast(Connection)conn).close();
                        }

                        // Echo the message so the requester knows shutdown is done.
                        (cast(Tid)t.tid).send(t);
                        loop = false;
                    }
                );
            }
            catch (OwnerTerminated e) { }

            // Every connection was closed by the Terminate handler; skip the
            // shrink pass instead of touching closed connections again.
            if (!loop)
            {
                break;
            }

            // Shrink the pool: at most once per 60 seconds and only while it holds
            // more than the initial number of connections. A connection is dropped
            // when it is not busy and was released more than 120 seconds ago.
            DateTime now = cast(DateTime)Clock.currTime;
            if (((now - _lastShrinkTime) > 60.seconds) && (_pool.length > _initialConnections))
            {
                foreach (ref conn; cast(Connection[])_pool)
                {
                    if ((conn is null) || conn.busy || ((now - conn.releaseTime) <= 120.seconds))
                    {
                        continue;
                    }

                    collectException({ conn.close(); }());
                    conn = null;
                }

                if (_pool.any!((a) => (a is null)))
                {
                    _pool = _pool.remove!((a) => (a is null));
                }

                _lastShrinkTime = now;
            }
        }
    }

private:

    /**
     * Opens one connection with the stored settings and flags it as pooled.
     *
     * Returns: the new connection, or `null` when creating it threw an `Exception`.
     */
    Connection createConnection() shared
    {
        try
        {
            Connection conn = new Connection(_host, _user, _password, _database, _port, _options);
            conn.pooled = true;
            return conn;
        }
        catch (Exception e)
        {
            return null;
        }
    }

    /**
     * Tries to add up to `num` connections to the pool, stopping early once
     * `_maxConnections` is reached (a value of 0 means no limit). Connections
     * that could not be created are skipped.
     */
    void createConnections(uint num) shared
    {
        foreach (i; 0 .. num)
        {
            if ((_maxConnections > 0) && (_pool.length >= _maxConnections))
            {
                break;
            }

            Connection conn = createConnection();

            if (conn !is null)
            {
                _pool ~= cast(shared Connection)conn;
            }
        }
    }

    /**
     * Serves one connection request: replies with a `ConnenctionHolder` as soon
     * as a free connection is available, or with `ConnectionBusy` once
     * `_waitTime` has elapsed.
     */
    void getConnection(shared RequestConnection req) shared
    {
        immutable start = Clock.currTime();

        while (true)
        {
            Connection conn = getFreeConnection();

            if (conn !is null)
            {
                (cast(Tid)req.tid).send(new shared ConnenctionHolder(cast(shared Connection)conn));
                return;
            }

            if ((Clock.currTime() - start) >= _waitTime)
            {
                break;
            }

            // Releases are delivered to this very thread, so a plain sleep here
            // would keep them queued and no connection could ever become free
            // while we wait. Wait on the mailbox instead and handle release
            // messages only; any other message is left queued for opCall.
            try
            {
                receiveTimeout(
                    100.msecs,
                    (shared ConnenctionHolder holder)
                    {
                        releaseConnection(holder);
                    }
                );
            }
            catch (OwnerTerminated e) { }
        }

        (cast(Tid)req.tid).send(new immutable ConnectionBusy);
    }

    /**
     * Returns a free connection, trying to grow the pool by
     * `_incrementalConnections` when none is free; `null` if that fails too.
     */
    Connection getFreeConnection() shared
    {
        Connection conn = findFreeConnection();

        if (conn is null)
        {
            createConnections(_incrementalConnections);
            conn = findFreeConnection();
        }

        return conn;
    }

    /**
     * Scans the pool for the first connection that is not busy and answers a
     * ping, marks it busy and returns it. Connections that fail the ping are
     * removed from the pool.
     *
     * Returns: the reserved connection, or `null` when none is available.
     */
    Connection findFreeConnection() shared
    {
        Connection result;

        for (size_t i = 0; i < _pool.length; i++)
        {
            Connection conn = cast(Connection)_pool[i];

            if ((conn is null) || conn.busy)
            {
                continue;
            }

            if (!testConnection(conn))
            {
                // testConnection already closed it. Clear the slot so the
                // compaction below drops it; otherwise the dead connection would
                // stay in the pool and keep counting toward _maxConnections.
                _pool[i] = null;
                continue;
            }

            conn.busy = true;
            result = conn;
            break;
        }

        if (_pool.any!((a) => (a is null)))
        {
            _pool = _pool.remove!((a) => (a is null));
        }

        return result;
    }

    /**
     * Pings the connection.
     *
     * Returns: `true` when the ping succeeded; `false` when it threw an
     *          `Exception`, in which case the connection has been closed
     *          (errors from closing are ignored).
     */
    bool testConnection(Connection conn) shared
    {
        try
        {
            conn.ping();
            return true;
        }
        catch (Exception e)
        {
            collectException({ conn.close(); }());
            return false;
        }
    }

    /// Marks a returned connection as free and records the time of its release.
    void releaseConnection(shared ConnenctionHolder holder) shared
    {
        if (holder.conn !is null)
        {
            Connection conn = cast(Connection)holder.conn;
            conn.busy = false;
            conn.releaseTime = cast(DateTime)Clock.currTime;
        }
    }

    /// The pooled connections.
    Connection[] _pool;

    // Settings used for every connection this pool creates.
    string _host;
    string _user;
    string _password;
    string _database;
    ushort _port;
    uint _maxConnections;
    uint _initialConnections;
    uint _incrementalConnections;
    Duration _waitTime;
    ConnectionOptions _options;

    /// Time of the last shrink pass (initialised in the constructor).
    DateTime _lastShrinkTime;
}

/// Message: the thread `tid` asks for a connection.
shared class RequestConnection
{
    Tid tid;

    this(shared Tid tid) shared
    {
        this.tid = tid;
    }
}

/// Message carrying a connection: pool -> client when handing out, client -> pool when releasing.
shared class ConnenctionHolder
{
    Connection conn;

    this(shared Connection conn) shared
    {
        this.conn = conn;
    }
}

/// Message: no connection became available within the wait time.
immutable class ConnectionBusy
{
}

/// Message: shut the pool down; the pool echoes it back to `tid` when done.
shared class Terminate
{
    Tid tid;

    this(shared Tid tid) shared
    {
        this.tid = tid;
    }
}