Skip to content

Commit de40b26

Browse files
authored
chore: add cdc pattern example (#369)
* chore: add mutations table * refactor: use codely for commands instead of codelytv * chore: remove symfony deprecations * chore: add command to publish mutations as domain events * fix: no finalize kernel classes
1 parent ecf8d27 commit de40b26

16 files changed

Lines changed: 297 additions & 70 deletions

File tree

apps/backoffice/backend/src/BackofficeBackendKernel.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function getProjectDir(): string
3636
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
3737
{
3838
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
39-
$container->setParameter('container.dumper.inline_class_loader', true);
39+
$container->setParameter('.container.dumper.inline_class_loader', true);
4040
$confDir = $this->getProjectDir() . '/config';
4141

4242
$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');

apps/backoffice/frontend/src/BackofficeFrontendKernel.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function getProjectDir(): string
3636
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
3737
{
3838
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
39-
$container->setParameter('container.dumper.inline_class_loader', true);
39+
$container->setParameter('.container.dumper.inline_class_loader', true);
4040
$confDir = $this->getProjectDir() . '/config';
4141

4242
$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');

apps/mooc/backend/src/Command/DomainEvents/MySql/ConsumeMySqlDomainEventsCommand.php

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
99
use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineDomainEventsConsumer;
1010
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
11+
use Symfony\Component\Console\Attribute\AsCommand;
1112
use Symfony\Component\Console\Command\Command;
1213
use Symfony\Component\Console\Input\InputArgument;
1314
use Symfony\Component\Console\Input\InputInterface;
1415
use Symfony\Component\Console\Output\OutputInterface;
1516

1617
use function Lambdish\Phunctional\pipe;
1718

19+
#[AsCommand(name: 'codely:domain-events:mysql:consume', description: 'Consume domain events from MySql',)]
1820
final class ConsumeMySqlDomainEventsCommand extends Command
1921
{
20-
protected static $defaultName = 'codelytv:domain-events:mysql:consume';
21-
2222
public function __construct(
2323
private readonly MySqlDoctrineDomainEventsConsumer $consumer,
2424
private readonly DatabaseConnections $connections,
@@ -29,9 +29,7 @@ public function __construct(
2929

3030
protected function configure(): void
3131
{
32-
$this
33-
->setDescription('Consume domain events from MySql')
34-
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
32+
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
3533
}
3634

3735
protected function execute(InputInterface $input, OutputInterface $output): int
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents;
6+
7+
use CodelyTv\Mooc\Courses\Infrastructure\Cdc\DatabaseMutationToCourseCreatedDomainEvent;
8+
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
9+
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationAction;
10+
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent;
11+
use Doctrine\ORM\EntityManager;
12+
use RuntimeException;
13+
use Symfony\Component\Console\Attribute\AsCommand;
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Input\InputArgument;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Output\OutputInterface;
18+
19+
#[AsCommand(
20+
name: 'codely:domain-events:generate-from-mutations',
21+
description: 'Publish domain events from mutations',
22+
)]
23+
final class PublishDomainEventsFromMutationsCommand extends Command
24+
{
25+
private array $transformers;
26+
27+
public function __construct(
28+
private readonly EntityManager $entityManager,
29+
private readonly EventBus $eventBus
30+
) {
31+
parent::__construct();
32+
33+
$this->transformers = [
34+
'courses' => [
35+
DatabaseMutationAction::INSERT->value => DatabaseMutationToCourseCreatedDomainEvent::class,
36+
DatabaseMutationAction::UPDATE->value => null,
37+
DatabaseMutationAction::DELETE->value => null,
38+
],
39+
];
40+
}
41+
42+
protected function configure(): void
43+
{
44+
$this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of mutations to process');
45+
}
46+
47+
protected function execute(InputInterface $input, OutputInterface $output): int
48+
{
49+
$totalMutations = (int) $input->getArgument('quantity');
50+
51+
$this->entityManager->wrapInTransaction(function (EntityManager $entityManager) use ($totalMutations) {
52+
$mutations = $entityManager->getConnection()
53+
->executeQuery("SELECT * FROM mutations ORDER BY id ASC LIMIT $totalMutations FOR UPDATE")
54+
->fetchAllAssociative();
55+
56+
foreach ($mutations as $mutation) {
57+
$transformer = $this->findTransformer($mutation['table_name'], $mutation['operation']);
58+
59+
if ($transformer === null) {
60+
echo sprintf("Ignoring %s %s\n", $mutation['table_name'], $mutation['operation']);
61+
continue;
62+
}
63+
64+
$domainEvents = $transformer->transform($mutation);
65+
66+
$this->eventBus->publish(...$domainEvents);
67+
}
68+
69+
$entityManager->getConnection()->executeStatement(
70+
sprintf('DELETE FROM mutations WHERE id IN (%s)', implode(',', array_column($mutations, 'id')))
71+
);
72+
});
73+
74+
return 0;
75+
}
76+
77+
private function findTransformer(string $tableName, string $operation): ?DatabaseMutationToDomainEvent
78+
{
79+
if (!array_key_exists($tableName, $this->transformers) && array_key_exists(
80+
$operation,
81+
$this->transformers[$tableName]
82+
)) {
83+
throw new RuntimeException("Transformer not found for table $tableName and operation $operation");
84+
}
85+
86+
/** @var class-string<DatabaseMutationToDomainEvent>|null $class */
87+
$class = $this->transformers[$tableName][$operation];
88+
89+
return $class ? new $class() : null;
90+
}
91+
}

apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConfigureRabbitMqCommand.php

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@
55
namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents\RabbitMq;
66

77
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqConfigurer;
8+
use Symfony\Component\Console\Attribute\AsCommand;
89
use Symfony\Component\Console\Command\Command;
910
use Symfony\Component\Console\Input\InputInterface;
1011
use Symfony\Component\Console\Output\OutputInterface;
1112
use Traversable;
1213

14+
#[AsCommand(
15+
name: 'codely:domain-events:rabbitmq:configure',
16+
description: 'Configure the RabbitMQ to allow publish & consume domain events',
17+
)]
1318
final class ConfigureRabbitMqCommand extends Command
1419
{
15-
protected static $defaultName = 'codelytv:domain-events:rabbitmq:configure';
16-
1720
public function __construct(
1821
private readonly RabbitMqConfigurer $configurer,
1922
private readonly string $exchangeName,
@@ -22,11 +25,6 @@ public function __construct(
2225
parent::__construct();
2326
}
2427

25-
protected function configure(): void
26-
{
27-
$this->setDescription('Configure the RabbitMQ to allow publish & consume domain events');
28-
}
29-
3028
protected function execute(InputInterface $input, OutputInterface $output): int
3129
{
3230
$this->configurer->configure($this->exchangeName, ...iterator_to_array($this->subscribers));

apps/mooc/backend/src/Command/DomainEvents/RabbitMq/ConsumeRabbitMqDomainEventsCommand.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@
77
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
88
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqDomainEventsConsumer;
99
use CodelyTv\Shared\Infrastructure\Doctrine\DatabaseConnections;
10+
use Symfony\Component\Console\Attribute\AsCommand;
1011
use Symfony\Component\Console\Command\Command;
1112
use Symfony\Component\Console\Input\InputArgument;
1213
use Symfony\Component\Console\Input\InputInterface;
1314
use Symfony\Component\Console\Output\OutputInterface;
1415

1516
use function Lambdish\Phunctional\repeat;
1617

18+
#[AsCommand(
19+
name: 'codely:domain-events:rabbitmq:consume',
20+
description: 'Consume domain events from the RabbitMQ',
21+
)]
1722
final class ConsumeRabbitMqDomainEventsCommand extends Command
1823
{
19-
protected static $defaultName = 'codelytv:domain-events:rabbitmq:consume';
20-
2124
public function __construct(
2225
private readonly RabbitMqDomainEventsConsumer $consumer,
2326
private readonly DatabaseConnections $connections,
@@ -29,7 +32,6 @@ public function __construct(
2932
protected function configure(): void
3033
{
3134
$this
32-
->setDescription('Consume domain events from the RabbitMQ')
3335
->addArgument('queue', InputArgument::REQUIRED, 'Queue name')
3436
->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of events to process');
3537
}

apps/mooc/backend/src/Command/DomainEvents/RabbitMq/GenerateSupervisorRabbitMqConsumerFilesCommand.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,23 @@
77
use CodelyTv\Shared\Domain\Bus\Event\DomainEventSubscriber;
88
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventSubscriberLocator;
99
use CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq\RabbitMqQueueNameFormatter;
10+
use Symfony\Component\Console\Attribute\AsCommand;
1011
use Symfony\Component\Console\Command\Command;
1112
use Symfony\Component\Console\Input\InputArgument;
1213
use Symfony\Component\Console\Input\InputInterface;
1314
use Symfony\Component\Console\Output\OutputInterface;
1415

1516
use function Lambdish\Phunctional\each;
1617

18+
#[AsCommand(
19+
name: 'codely:domain-events:rabbitmq:generate-supervisor-files',
20+
description: 'Generate the supervisor configuration for every RabbitMQ subscriber',
21+
)]
1722
final class GenerateSupervisorRabbitMqConsumerFilesCommand extends Command
1823
{
1924
private const EVENTS_TO_PROCESS_AT_TIME = 200;
2025
private const NUMBERS_OF_PROCESSES_PER_SUBSCRIBER = 1;
2126
private const SUPERVISOR_PATH = __DIR__ . '/../../../../build/supervisor';
22-
protected static $defaultName = 'codelytv:domain-events:rabbitmq:generate-supervisor-files';
2327

2428
public function __construct(private readonly DomainEventSubscriberLocator $locator)
2529
{
@@ -28,9 +32,7 @@ public function __construct(private readonly DomainEventSubscriberLocator $locat
2832

2933
protected function configure(): void
3034
{
31-
$this
32-
->setDescription('Generate the supervisor configuration for every RabbitMQ subscriber')
33-
->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
35+
$this->addArgument('command-path', InputArgument::OPTIONAL, 'Path on this is gonna be deployed', '/var/www');
3436
}
3537

3638
protected function execute(InputInterface $input, OutputInterface $output): int
@@ -68,7 +70,7 @@ private function template(): string
6870
{
6971
return <<<EOF
7072
[program:codelytv_{queue_name}]
71-
command = {path}/apps/mooc/backend/bin/console codelytv:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
73+
command = {path}/apps/mooc/backend/bin/console codely:domain-events:rabbitmq:consume --env=prod {queue_name} {events_to_process}
7274
process_name = %(program_name)s_%(process_num)02d
7375
numprocs = {processes}
7476
startsecs = 1

apps/mooc/backend/src/MoocBackendKernel.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function getProjectDir(): string
3636
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
3737
{
3838
$container->addResource(new FileResource($this->getProjectDir() . '/config/bundles.php'));
39-
$container->setParameter('container.dumper.inline_class_loader', true);
39+
$container->setParameter('.container.dumper.inline_class_loader', true);
4040
$confDir = $this->getProjectDir() . '/config';
4141

4242
$loader->load($confDir . '/services' . self::CONFIG_EXTS, 'glob');

ecs.php

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,20 @@
77
use Symplify\EasyCodingStandard\Config\ECSConfig;
88

99
return function (ECSConfig $ecsConfig): void {
10-
$ecsConfig->paths([
11-
__DIR__ . '/apps',
12-
__DIR__ . '/src',
13-
__DIR__ . '/tests',
14-
]);
10+
$ecsConfig->paths([__DIR__ . '/apps', __DIR__ . '/src', __DIR__ . '/tests', ]);
1511

16-
$ecsConfig->sets([CodingStyle::DEFAULT]);
12+
$ecsConfig->sets([CodingStyle::DEFAULT]);
1713

18-
$ecsConfig->skip([
19-
FinalClassFixer::class => [
20-
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
21-
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
22-
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
23-
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
24-
],
25-
__DIR__ . '/apps/backoffice/backend/var',
26-
__DIR__ . '/apps/backoffice/frontend/var',
27-
__DIR__ . '/apps/mooc/backend/var',
28-
__DIR__ . '/apps/mooc/frontend/var',
29-
]);
14+
$ecsConfig->skip([
15+
FinalClassFixer::class => [
16+
__DIR__ . '/apps/backoffice/backend/src/BackofficeBackendKernel.php',
17+
__DIR__ . '/apps/backoffice/frontend/src/BackofficeFrontendKernel.php',
18+
__DIR__ . '/apps/mooc/backend/src/MoocBackendKernel.php',
19+
__DIR__ . '/src/Shared/Infrastructure/Bus/Event/InMemory/InMemorySymfonyEventBus.php',
20+
],
21+
__DIR__ . '/apps/backoffice/backend/var',
22+
__DIR__ . '/apps/backoffice/frontend/var',
23+
__DIR__ . '/apps/mooc/backend/var',
24+
__DIR__ . '/apps/mooc/frontend/var',
25+
]);
3026
};

0 commit comments

Comments
 (0)