Skip to content

Commit abd8991

Browse files
committed
Merge remote-tracking branch 'origin/1.6.x' into feat-logs-db
2 parents 5951640 + 859d556 commit abd8991

File tree

9 files changed

+142
-68
lines changed

9 files changed

+142
-68
lines changed

composer.json

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,6 @@
9191
"laravel/pint": "^1.14",
9292
"phpbench/phpbench": "^1.2"
9393
},
94-
"repositories": [
95-
{
96-
"type": "vcs",
97-
"url": "https://github.com/utopia-php/queue"
98-
}
99-
],
10094
"provide": {
10195
"ext-phpiredis": "*"
10296
},

composer.lock

Lines changed: 10 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Appwrite/Auth/OAuth2/Slack.php

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@ class Slack extends OAuth2
2020
* @var array
2121
*/
2222
protected array $scopes = [
23-
'identity.avatar',
24-
'identity.basic',
25-
'identity.email',
26-
'identity.team'
23+
'openid',
24+
'email',
25+
'profile'
2726
];
2827

2928
/**
@@ -35,14 +34,15 @@ public function getName(): string
3534
}
3635

3736
/**
37+
* @link https://api.slack.com/authentication/oauth-v2
38+
*
3839
* @return string
3940
*/
4041
public function getLoginURL(): string
4142
{
42-
// https://api.slack.com/docs/oauth#step_1_-_sending_users_to_authorize_and_or_install
43-
return 'https://slack.com/oauth/authorize?' . \http_build_query([
43+
return 'https://slack.com/oauth/v2/authorize?' . \http_build_query([
4444
'client_id' => $this->appID,
45-
'scope' => \implode(' ', $this->getScopes()),
45+
'user_scope' => \implode(' ', $this->getScopes()),
4646
'redirect_uri' => $this->callback,
4747
'state' => \json_encode($this->state)
4848
]);
@@ -56,16 +56,15 @@ public function getLoginURL(): string
5656
protected function getTokens(string $code): array
5757
{
5858
if (empty($this->tokens)) {
59-
// https://api.slack.com/docs/oauth#step_3_-_exchanging_a_verification_code_for_an_access_token
6059
$this->tokens = \json_decode($this->request(
6160
'GET',
62-
'https://slack.com/api/oauth.access?' . \http_build_query([
61+
'https://slack.com/api/oauth.v2.access?' . \http_build_query([
6362
'client_id' => $this->appID,
6463
'client_secret' => $this->appSecret,
6564
'code' => $code,
6665
'redirect_uri' => $this->callback
6766
])
68-
), true);
67+
), true)['authed_user'] ?? [];
6968
}
7069

7170
return $this->tokens;
@@ -80,13 +79,13 @@ public function refreshTokens(string $refreshToken): array
8079
{
8180
$this->tokens = \json_decode($this->request(
8281
'GET',
83-
'https://slack.com/api/oauth.access?' . \http_build_query([
82+
'https://slack.com/api/oauth.v2.access?' . \http_build_query([
8483
'client_id' => $this->appID,
8584
'client_secret' => $this->appSecret,
8685
'refresh_token' => $refreshToken,
8786
'grant_type' => 'refresh_token'
8887
])
89-
), true);
88+
), true)['authed_user'] ?? [];
9089

9190
if (empty($this->tokens['refresh_token'])) {
9291
$this->tokens['refresh_token'] = $refreshToken;
@@ -161,9 +160,9 @@ protected function getUser(string $accessToken): array
161160
if (empty($this->user)) {
162161
$user = $this->request(
163162
'GET',
164-
'https://slack.com/api/users.identity?token=' . \urlencode($accessToken)
163+
'https://slack.com/api/users.identity',
164+
['Authorization: Bearer ' . \urlencode($accessToken)]
165165
);
166-
167166
$this->user = \json_decode($user, true);
168167
}
169168

src/Appwrite/Platform/Tasks/ScheduleBase.php

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use Utopia\Database\Query;
1212
use Utopia\Database\Validator\Authorization;
1313
use Utopia\Platform\Action;
14-
use Utopia\Queue\Publisher;
14+
use Utopia\Pools\Group;
1515
use Utopia\System\System;
1616

1717
use function Swoole\Coroutine\run;
@@ -26,18 +26,18 @@ abstract class ScheduleBase extends Action
2626
abstract public static function getName(): string;
2727
abstract public static function getSupportedResource(): string;
2828
abstract public static function getCollectionId(): string;
29-
abstract protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void;
29+
abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void;
3030

3131
public function __construct()
3232
{
3333
$type = static::getSupportedResource();
3434

3535
$this
3636
->desc("Execute {$type}s scheduled in Appwrite")
37-
->inject('publisher')
37+
->inject('pools')
3838
->inject('dbForPlatform')
3939
->inject('getProjectDB')
40-
->callback(fn (Publisher $publisher, Database $dbForPlatform, callable $getProjectDB) => $this->action($publisher, $dbForPlatform, $getProjectDB));
40+
->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
4141
}
4242

4343
protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
@@ -56,7 +56,7 @@ protected function updateProjectAccess(Document $project, Database $dbForPlatfor
5656
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
5757
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
5858
*/
59-
public function action(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
59+
public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
6060
{
6161
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
6262
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@@ -125,15 +125,17 @@ public function action(Publisher $publisher, Database $dbForPlatform, callable $
125125
$latestDocument = \end($results);
126126
}
127127

128+
$pools->reclaim();
129+
128130
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
129131

130132
Console::success("Starting timers at " . DateTime::now());
131133

132-
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $publisher, $getProjectDB) {
134+
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
133135
/**
134136
* The timer synchronize $schedules copy with database collection.
135137
*/
136-
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) {
138+
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) {
137139
$time = DateTime::now();
138140
$timerStart = \microtime(true);
139141

@@ -182,15 +184,17 @@ public function action(Publisher $publisher, Database $dbForPlatform, callable $
182184
$lastSyncUpdate = $time;
183185
$timerEnd = \microtime(true);
184186

187+
$pools->reclaim();
188+
185189
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
186190
});
187191

188192
Timer::tick(
189193
static::ENQUEUE_TIMER * 1000,
190-
fn () => $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB)
194+
fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)
191195
);
192196

193-
$this->enqueueResources($publisher, $dbForPlatform, $getProjectDB);
197+
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
194198
});
195199
}
196200
}

src/Appwrite/Platform/Tasks/ScheduleExecutions.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use Appwrite\Event\Func;
66
use Swoole\Coroutine as Co;
77
use Utopia\Database\Database;
8-
use Utopia\Queue\Publisher;
8+
use Utopia\Pools\Group;
99

1010
class ScheduleExecutions extends ScheduleBase
1111
{
@@ -27,9 +27,11 @@ public static function getCollectionId(): string
2727
return 'executions';
2828
}
2929

30-
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
30+
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
3131
{
32-
$queueForFunctions = new Func($publisher);
32+
$queue = $pools->get('publisher')->pop();
33+
$connection = $queue->getResource();
34+
$queueForFunctions = new Func($connection);
3335
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
3436

3537
foreach ($this->schedules as $schedule) {
@@ -81,5 +83,7 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
8183

8284
unset($this->schedules[$schedule['$internalId']]);
8385
}
86+
87+
$queue->reclaim();
8488
}
8589
}

src/Appwrite/Platform/Tasks/ScheduleFunctions.php

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use Utopia\CLI\Console;
88
use Utopia\Database\Database;
99
use Utopia\Database\DateTime;
10-
use Utopia\Queue\Publisher;
10+
use Utopia\Pools\Group;
1111

1212
class ScheduleFunctions extends ScheduleBase
1313
{
@@ -31,7 +31,7 @@ public static function getCollectionId(): string
3131
return 'functions';
3232
}
3333

34-
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
34+
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
3535
{
3636
$timerStart = \microtime(true);
3737
$time = DateTime::now();
@@ -70,9 +70,12 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
7070
}
7171

7272
foreach ($delayedExecutions as $delay => $scheduleKeys) {
73-
\go(function () use ($delay, $scheduleKeys, $publisher, $dbForPlatform) {
73+
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
7474
\sleep($delay); // in seconds
7575

76+
$queue = $pools->get('publisher')->pop();
77+
$connection = $queue->getResource();
78+
7679
foreach ($scheduleKeys as $scheduleKey) {
7780
// Ensure schedule was not deleted
7881
if (!\array_key_exists($scheduleKey, $this->schedules)) {
@@ -83,7 +86,8 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
8386

8487
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
8588

86-
$queueForFunctions = new Func($publisher);
89+
$queueForFunctions = new Func($connection);
90+
8791
$queueForFunctions
8892
->setType('schedule')
8993
->setFunction($schedule['resource'])
@@ -92,6 +96,8 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
9296
->setProject($schedule['project'])
9397
->trigger();
9498
}
99+
100+
$queue->reclaim();
95101
});
96102
}
97103

src/Appwrite/Platform/Tasks/ScheduleMessages.php

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Appwrite\Event\Messaging;
66
use Utopia\Database\Database;
7-
use Utopia\Queue\Publisher;
7+
use Utopia\Pools\Group;
88

99
class ScheduleMessages extends ScheduleBase
1010
{
@@ -26,7 +26,7 @@ public static function getCollectionId(): string
2626
return 'messages';
2727
}
2828

29-
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
29+
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
3030
{
3131
foreach ($this->schedules as $schedule) {
3232
if (!$schedule['active']) {
@@ -40,9 +40,13 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
4040
continue;
4141
}
4242

43-
\go(function () use ($schedule, $publisher, $dbForPlatform) {
43+
\go(function () use ($schedule, $pools, $dbForPlatform) {
44+
$queue = $pools->get('publisher')->pop();
45+
$connection = $queue->getResource();
46+
$queueForMessaging = new Messaging($connection);
47+
4448
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
45-
$queueForMessaging = new Messaging($publisher);
49+
4650
$queueForMessaging
4751
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
4852
->setMessageId($schedule['resourceId'])
@@ -54,6 +58,8 @@ protected function enqueueResources(Publisher $publisher, Database $dbForPlatfor
5458
$schedule['$id'],
5559
);
5660

61+
$queue->reclaim();
62+
5763
unset($this->schedules[$schedule['$internalId']]);
5864
});
5965
}

0 commit comments

Comments
 (0)