DefaultTransaction.php
1 <?php
2 /**
3  * wCMF - wemove Content Management Framework
4  * Copyright (C) 2005-2020 wemove digital solutions GmbH
5  *
6  * Licensed under the terms of the MIT License.
7  *
8  * See the LICENSE file distributed with this work for
9  * additional information.
10  */
12 
13 use Exception;
23 
24 /**
25  * Default implementation of Transaction.
26  *
27  * @author ingo herwig <ingo@wemove.com>
28  */
29 class DefaultTransaction implements Transaction {
30 
/** Cached info log level flag, refreshed from the logger in the constructor */
private static $isInfoEnabled = false;
/** Cached debug log level flag, refreshed from the logger in the constructor */
private static $isDebugEnabled = false;
/** Logger shared by all instances, created on first construction */
private static $logger = null;

/** PersistenceFacade used to look up the known types and their mappers on commit */
private $persistenceFacade = null;
/** EventManager used to receive StateChangeEvents and to dispatch TransactionEvents */
private $eventManager = null;

/** Identifier of this transaction (class name, underscore and a dummy id) */
private $id = '';
/** True between begin() and the end of commit/rollback */
private $isActive = false;
/** True while a commit is running, used to ignore nested commit calls */
private $isInCommit = false;
/** Objects returned by getObjects(), keyed by oid string */
private $observedObjects = [];

/** Objects waiting to be inserted, keyed by oid string */
protected $newObjects = [];
/** Objects waiting to be updated, keyed by oid string */
protected $dirtyObjects = [];
/** Objects waiting to be deleted, keyed by oid string */
protected $deletedObjects = [];
/** Entries created by detach(), keyed by oid string (value may be null) */
protected $detachedObjects = [];

/**
 * Contains all loaded objects no matter which state they have
 */
protected $loadedObjects = [];
52 
/**
 * Constructor
 * @param $persistenceFacade PersistenceFacade instance
 * @param $eventManager EventManager instance
 */
public function __construct(PersistenceFacade $persistenceFacade,
        EventManager $eventManager) {
  // the logger is shared between all instances and only created once
  if (self::$logger === null) {
    self::$logger = LogManager::getLogger(__CLASS__);
  }
  $this->persistenceFacade = $persistenceFacade;
  $this->eventManager = $eventManager;

  $this->id = __CLASS__.'_'.ObjectId::getDummyId();

  // track state changes of persistent objects
  $stateListener = [$this, 'stateChanged'];
  $this->eventManager->addListener(StateChangeEvent::NAME, $stateListener);

  // cache the log levels to avoid asking the logger on each call
  self::$isInfoEnabled = self::$logger->isInfoEnabled();
  self::$isDebugEnabled = self::$logger->isDebugEnabled();
}
71 
/**
 * Destructor. Removes the StateChangeEvent listener that was
 * registered in the constructor.
 */
public function __destruct() {
  $stateListener = [$this, 'stateChanged'];
  $this->eventManager->removeListener(StateChangeEvent::NAME, $stateListener);
}
78 
/**
 * Mark the transaction as active.
 * @see Transaction::begin()
 */
public function begin() {
  if (self::$isInfoEnabled) {
    self::$logger->info("Starting transaction");
  }
  // from now on objects in any state are registered (see attach())
  $this->isActive = true;
}
88 
/**
 * Commit the recorded changes (delegates to commitImpl() without collecting).
 * @see Transaction::commit()
 */
public function commit() {
  $collect = false;
  return $this->commitImpl($collect);
}
95 
/**
 * Run the commit in collect mode: the mapper transactions are rolled
 * back after their statements have been gathered (see commitImpl()).
 * @see Transaction::commitCollect()
 */
public function commitCollect() {
  $collect = true;
  return $this->commitImpl($collect);
}
102 
/**
 * Discard all recorded changes, deactivate the transaction and notify
 * listeners with an AFTER_ROLLBACK TransactionEvent.
 * @see Transaction::rollback()
 */
public function rollback() {
  if (self::$isInfoEnabled) {
    self::$logger->info("Rollback transaction");
  }
  // forget changes
  $this->clear();
  $this->isActive = false;
  $this->isInCommit = false;
  // NOTE(review): the event arguments were missing from this listing; the
  // phase constant is restored from the AFTER_ROLLBACK description and the
  // oid lists are assumed to be optional - confirm against TransactionEvent
  $this->eventManager->dispatch(TransactionEvent::NAME, new TransactionEvent(
          TransactionEvent::AFTER_ROLLBACK));
}
117 
/**
 * Check if the transaction was started and is neither committed
 * nor rolled back yet.
 * @see Transaction::isActive()
 */
public function isActive() {
  return $this->isActive;
}
124 
/**
 * Register the object in the registry that matches its state.
 * @see Transaction::attach()
 * @param $object PersistentObject instance
 * @return The registered PersistentObject instance, the unchanged object
 *   if the transaction is inactive and the object is not clean, or null
 *   if the object's state is not handled
 */
public function attach(PersistentObject $object) {
  // an attached object is no longer treated as detached
  unset($this->detachedObjects[$object->getOID()->__toString()]);

  $state = $object->getState();
  // an inactive transaction only keeps track of clean objects
  if (!$this->isActive() && $state != PersistentObject::STATE_CLEAN) {
    return $object;
  }
  // NOTE(review): the case labels were missing from this listing. They are
  // restored from the register methods called in each branch; STATE_DIRTY
  // and STATE_DELETED are assumed names - confirm against PersistentObject
  switch ($state) {
    case PersistentObject::STATE_CLEAN:
      return $this->registerLoaded($object);

    case PersistentObject::STATE_NEW:
      return $this->registerNew($object);

    case PersistentObject::STATE_DIRTY:
      return $this->registerDirty($object);

    case PersistentObject::STATE_DELETED:
      return $this->registerDeleted($object);
  }
  return null;
}
150 
/**
 * Remove the object with the given oid from all registries and remember
 * it as detached.
 * @see Transaction::detach()
 * @param $oid ObjectId instance
 */
public function detach(ObjectId $oid) {
  $key = $oid->__toString();
  $object = null;
  // the registries are visited in this order on purpose: if the oid is
  // found in several of them, the instance of the last one is remembered
  $registries = ['newObjects', 'dirtyObjects', 'deletedObjects', 'loadedObjects'];
  foreach ($registries as $registry) {
    if (isset($this->{$registry}[$key])) {
      $object = $this->{$registry}[$key];
      unset($this->{$registry}[$key]);
    }
  }
  unset($this->observedObjects[$key]);
  // the entry is created even if no instance was registered (value null)
  $this->detachedObjects[$key] = $object;
}
176 
/**
 * Get the instance registered for the given oid, either as new or as
 * loaded object.
 * @see Transaction::getLoaded()
 * @param $oid ObjectId instance
 * @return PersistentObject instance or null, if not registered
 */
public function getLoaded(ObjectId $oid) {
  $key = $oid->__toString();
  // a new object takes precedence over a loaded one
  if (isset($this->newObjects[$key])) {
    return $this->newObjects[$key];
  }
  if (isset($this->loadedObjects[$key])) {
    return $this->loadedObjects[$key];
  }
  return null;
}
191 
/**
 * Get the objects that are currently observed by the transaction.
 * @see Transaction::getObjects()
 * @return Map of PersistentObject instances (key: oid string)
 */
public function getObjects() {
  return $this->observedObjects;
}
198 
/**
 * Register a loaded object. If an instance with the same oid is already
 * registered, the values of the given object are merged into it. The
 * returned object is the registered instance.
 * @param $object PersistentObject instance
 * @return PersistentObject instance
 */
protected function registerLoaded(PersistentObject $object) {
  $key = $object->getOID()->__toString();
  if (self::$isDebugEnabled) {
    self::$logger->debug("New Data:\n".$object->dump());
    self::$logger->debug("Registry before:\n".$this->dump());
  }

  if (!isset($this->loadedObjects[$key])) {
    // first occurrence: the given instance becomes the registered one
    if (self::$isDebugEnabled) {
      self::$logger->debug("Register loaded object: ".$key);
    }
    $this->loadedObjects[$key] = $object;
    // changes are only observed while the transaction is active
    if ($this->isActive) {
      if (self::$isDebugEnabled) {
        self::$logger->debug("Start listening to: ".$key);
      }
      $this->observedObjects[$key] = $object;
    }
    $registeredObject = $object;
  }
  else {
    // already known: keep the registered instance and merge the new values
    $registeredObject = $this->loadedObjects[$key];
    if (self::$isDebugEnabled) {
      self::$logger->debug("Merging data of ".$key);
    }
    $registeredObject->mergeValues($object);
  }

  if (self::$isDebugEnabled) {
    self::$logger->debug("Registry after:\n".$this->dump());
  }
  return $registeredObject;
}
241 
/**
 * Register a newly created object for insertion and start observing it.
 * The returned object is the registered instance.
 * @param $object PersistentObject instance
 * @return PersistentObject instance
 */
protected function registerNew(PersistentObject $object) {
  $oidStr = $object->getOID()->__toString();
  if (self::$isDebugEnabled) {
    self::$logger->debug("Register new object: ".$oidStr);
  }
  // an existing entry with the same oid is replaced in both registries
  $this->observedObjects[$oidStr] = $object;
  $this->newObjects[$oidStr] = $object;
  return $object;
}
255 
/**
 * Register a dirty object for update. The returned object is the
 * registered instance.
 * @param $object PersistentObject instance
 * @return PersistentObject instance
 */
protected function registerDirty(PersistentObject $object) {
  $oidStr = $object->getOID()->__toString();
  // an object that is already queued as new or deleted is not queued
  // for update in addition
  $isQueuedElsewhere = isset($this->newObjects[$oidStr]) ||
          isset($this->deletedObjects[$oidStr]);
  if (!$isQueuedElsewhere) {
    if (self::$isDebugEnabled) {
      self::$logger->debug("Register dirty object: ".$oidStr);
    }
    $this->dirtyObjects[$oidStr] = $object;
  }
  return $object;
}
272 
/**
 * Register a deleted object for deletion. The returned object is the
 * registered instance.
 * @param $object PersistentObject instance
 * @return PersistentObject instance
 */
protected function registerDeleted(PersistentObject $object) {
  $oidStr = $object->getOID()->__toString();
  // an object that is still queued for insertion is just dropped from
  // that queue, no delete is recorded for it
  if (isset($this->newObjects[$oidStr])) {
    unset($this->newObjects[$oidStr]);
    return $object;
  }
  // a pending update is superseded by the delete (no-op if not queued)
  unset($this->dirtyObjects[$oidStr]);
  if (self::$isDebugEnabled) {
    self::$logger->debug("Register deleted object: ".$oidStr);
  }
  $this->deletedObjects[$oidStr] = $object;
  return $object;
}
295 
/**
 * Commit the transaction. Dispatches a BEFORE_COMMIT TransactionEvent,
 * processes the insert, update and delete queues inside the mapper
 * transactions and dispatches an AFTER_COMMIT TransactionEvent. If an
 * exception occurs, all mapper transactions and this transaction are
 * rolled back and the exception is rethrown.
 * @param $collect Boolean, if true the mapper transactions are rolled back
 *   after their statements have been gathered, instead of being committed
 * @return Array of statements (empty, if the transaction is not active or
 *   a commit is already running)
 */
protected function commitImpl($collect) {
  // ignore nested calls (e.g. triggered by listeners during the commit);
  // an empty array keeps the documented return type
  if ($this->isInCommit) {
    return [];
  }
  if (self::$isInfoEnabled) {
    self::$logger->info("Commit transaction [collect=".($collect ? "true" : "false")."]");
  }
  $this->isInCommit = true;
  // NOTE(review): the event arguments were missing from this listing; the
  // phase constant is restored from the BEFORE_COMMIT description and the
  // oid lists are assumed to be optional - confirm against TransactionEvent
  $this->eventManager->dispatch(TransactionEvent::NAME, new TransactionEvent(
          TransactionEvent::BEFORE_COMMIT));
  $insertedOids = [];
  $updatedOids = [];
  $deletedOids = [];
  $statements = [];
  if ($this->isActive) {
    $knownTypes = $this->persistenceFacade->getKnownTypes();
    try {
      // start transaction for each mapper
      foreach ($knownTypes as $type) {
        $mapper = $this->persistenceFacade->getMapper($type);
        $mapper->beginTransaction();
      }
      // process the recorded object changes, since new
      // object changes may occur during the commit, we
      // loop until all queues are empty
      $commitDone = false;
      $emptyState = '0:0:0';
      while (!$commitDone) {
        // check queues before processing
        $oldState = sizeof($this->newObjects).':'.
                sizeof($this->dirtyObjects).':'.
                sizeof($this->deletedObjects);
        $insertedOids = array_merge($insertedOids, $this->processInserts());
        $updatedOids = array_merge($updatedOids, $this->processUpdates());
        $deletedOids = array_merge($deletedOids, $this->processDeletes());
        // check queues after processing
        $newState = sizeof($this->newObjects).':'.
                sizeof($this->dirtyObjects).':'.
                sizeof($this->deletedObjects);
        // prevent recursion (queue sizes didn't change)
        if ($oldState != $emptyState && $oldState == $newState) {
          throw new PersistenceException("Recursion in transaction commit");
        }
        // check if all queues are empty
        $commitDone = $newState == $emptyState;
      }
      // commit transaction for each mapper
      if (self::$isInfoEnabled) {
        self::$logger->info("Committing transaction");
      }
      foreach ($knownTypes as $type) {
        $mapper = $this->persistenceFacade->getMapper($type);
        $statements = array_merge($statements, $mapper->getStatements());
        if ($collect) {
          // collect mode: keep the statements, but undo the changes
          $mapper->rollbackTransaction();
        }
        else {
          $mapper->commitTransaction();
        }
      }
    }
    catch (Exception $ex) {
      // rollback transaction for each mapper
      self::$logger->error("Rolling back transaction. Exception: ".$ex->__toString());
      foreach ($knownTypes as $type) {
        $mapper = $this->persistenceFacade->getMapper($type);
        $mapper->rollbackTransaction();
      }
      // resets the registries and flags and notifies listeners
      $this->rollback();
      throw $ex;
    }
  }
  // forget changes
  $this->clear();
  $this->isActive = false;
  $this->isInCommit = false;
  $this->eventManager->dispatch(TransactionEvent::NAME, new TransactionEvent(
          TransactionEvent::AFTER_COMMIT, $insertedOids, $updatedOids, $deletedOids));
  return $statements;
}
381 
/**
 * Clear all internal registries. Objects contained in the new, dirty,
 * deleted and loaded registries are also removed from the observed objects.
 */
protected function clear() {
  $registries = ['newObjects', 'dirtyObjects', 'deletedObjects', 'loadedObjects'];
  foreach ($registries as $registry) {
    // the current oid of each object is used, not the registry key
    foreach ($this->{$registry} as $object) {
      unset($this->observedObjects[$object->getOID()->__toString()]);
    }
    $this->{$registry} = [];
  }

  $this->detachedObjects = [];
}
408 
/**
 * Process the new objects queue. Objects whose indispensable objects are
 * still new are not saved, but put back into the queue at the end.
 * @return Map of oids of inserted objects (key: oid string before commit, value: oid string after commit)
 */
protected function processInserts() {
  $oidMap = [];
  $postponed = [];
  // the head of the queue is looked up again in each iteration, because
  // the queue content may change while objects are saved
  while (count($this->newObjects) > 0) {
    $queuedKeys = array_keys($this->newObjects);
    $key = $queuedKeys[0];
    if (self::$isDebugEnabled) {
      self::$logger->debug("Process insert on object: ".$key);
    }
    $object = $this->newObjects[$key];

    // check if the object has required objects that are not persisted yet
    $isBlocked = false;
    foreach ($object->getIndispensableObjects() as $requiredObject) {
      if ($requiredObject->getState() == PersistentObject::STATE_NEW) {
        $isBlocked = true;
        break;
      }
    }

    if ($isBlocked) {
      // postpone insert
      if (self::$isDebugEnabled) {
        self::$logger->debug("Postpone insert of object: ".$key.". Required objects are not saved yet.");
      }
      $postponed[] = $object;
    }
    else {
      $oldOid = $object->getOID();
      $object->getMapper()->save($object);
      $oidMap[$oldOid->__toString()] = $object->getOID()->__toString();
    }

    // dequeue the object and observe it under its current oid
    unset($this->newObjects[$key]);
    unset($this->observedObjects[$key]);
    $this->observedObjects[$object->getOID()->__toString()] = $object;
  }
  // re-add pending inserts
  foreach ($postponed as $object) {
    $this->newObjects[$object->getOID()->__toString()] = $object;
  }
  return $oidMap;
}
454 
/**
 * Process the dirty objects queue by saving each object through its mapper.
 * @return Array of oid strings of updated objects
 */
protected function processUpdates() {
  $result = [];
  // the head of the queue is looked up again in each iteration, because
  // the queue content may change while objects are saved
  while (count($this->dirtyObjects) > 0) {
    $queuedKeys = array_keys($this->dirtyObjects);
    $key = $queuedKeys[0];
    if (self::$isDebugEnabled) {
      self::$logger->debug("Process update on object: ".$key);
    }
    $object = $this->dirtyObjects[$key];
    $object->getMapper()->save($object);
    unset($this->dirtyObjects[$key]);
    $result[] = $key;
  }
  return $result;
}
475 
/**
 * Process the deleted objects queue by deleting each object through its mapper.
 * @return Array of oid strings of deleted objects
 */
protected function processDeletes() {
  $result = [];
  // the head of the queue is looked up again in each iteration, because
  // the queue content may change while objects are deleted
  while (count($this->deletedObjects) > 0) {
    $queuedKeys = array_keys($this->deletedObjects);
    $key = $queuedKeys[0];
    if (self::$isDebugEnabled) {
      self::$logger->debug("Process delete on object: ".$key);
    }
    $object = $this->deletedObjects[$key];
    $object->getMapper()->delete($object);
    unset($this->deletedObjects[$key]);
    $result[] = $key;
  }
  return $result;
}
496 
/**
 * Listen to StateChangeEvents and register the changed object in the
 * registry that matches its new state. Events without an object and
 * events of detached objects are ignored.
 * @param $event StateChangeEvent instance
 */
public function stateChanged(StateChangeEvent $event) {
  $object = $event->getObject();

  // make sure the event contains an object
  if (!$object) {
    return;
  }

  // don't listen to detached object changes; the current oid of each
  // detached instance is compared strictly and the search stops at the
  // first match (entries without an instance are skipped)
  $oidStr = $object->getOID()->__toString();
  foreach ($this->detachedObjects as $detachedObject) {
    if ($detachedObject && $detachedObject->getOID()->__toString() === $oidStr) {
      return;
    }
  }

  $oldState = $event->getOldValue();
  $newState = $event->getNewValue();
  if (self::$isDebugEnabled) {
    self::$logger->debug("State changed: ".$object->getOID()." old:".$oldState." new:".$newState);
  }
  // NOTE(review): the case labels were missing from this listing. They are
  // restored from the register methods called in each branch; STATE_DIRTY
  // and STATE_DELETED are assumed names - confirm against PersistentObject
  switch ($newState) {
    case PersistentObject::STATE_NEW:
      $this->registerNew($object);
      break;

    case PersistentObject::STATE_DIRTY:
      $this->registerDirty($object);
      break;

    case PersistentObject::STATE_DELETED:
      $this->registerDeleted($object);
      break;
  }
}
530 
/**
 * Dump the registry content into a string by concatenating the dumps
 * of all loaded objects.
 * @return String
 */
protected function dump() {
  $objectDumps = array_map(function($curObject) {
    return $curObject->dump();
  }, array_values($this->loadedObjects));
  return implode('', $objectDumps);
}
542 }
543 ?>
commitImpl($collect)
Commit the transaction.
static getDummyId()
Get a dummy id ("wcmf" + unique 32 character string).
Definition: ObjectId.php:230
EventManager is responsible for dispatching events to registered listeners.
getObject()
Get the object whose state has changed.
PersistenceException signals an exception in the persistence service.
processInserts()
Process the new objects queue.
getOID()
Get the object id of the PersistentObject.
__toString()
Get a string representation of the object id.
Definition: ObjectId.php:215
const BEFORE_COMMIT
A BEFORE_COMMIT event occurs before the transaction is committed.
StateChangeEvent signals a change of the state of a PersistentObject instance.
registerNew(PersistentObject $object)
Register a newly created object.
processUpdates()
Process the dirty objects queue.
const AFTER_COMMIT
An AFTER_COMMIT event occurs after the transaction is committed.
TransactionEvent instances are fired at different phases of a transaction.
ObjectId is the unique identifier of an object.
Definition: ObjectId.php:28
registerDeleted(PersistentObject $object)
Register a deleted object.
registerLoaded(PersistentObject $object)
Register a loaded object.
PersistenceFacade defines the interface for PersistenceFacade implementations.
registerDirty(PersistentObject $object)
Register a dirty object.
processDeletes()
Process the deleted objects queue.
Default implementation of Transaction.
static getLogger($name)
Get the logger with the given name.
Definition: LogManager.php:37
__construct(PersistenceFacade $persistenceFacade, EventManager $eventManager)
Constructor.
$loadedObjects
Contains all loaded objects no matter which state they have.
dump()
Dump the registry content into a string.
PersistentObject defines the interface of all persistent objects.
Transaction implements the Unit of Work pattern as it defines the interface for maintaining a list of...
Definition: Transaction.php:21
const AFTER_ROLLBACK
An AFTER_ROLLBACK event occurs after the transaction is rolled back.
getState()
Get the object's state:
LogManager is used to retrieve Logger instances.
Definition: LogManager.php:20
dump()
Get a string representation of the values of the PersistentObject.
stateChanged(StateChangeEvent $event)
Listen to StateChangeEvents.