Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox: Improve batch handling #1286

Merged
merged 41 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4e95a1d
Make it "work"
obenland Feb 4, 2025
7210118
Updated approach
obenland Feb 5, 2025
ae480a3
Account for in-progress batches when reprocessing
obenland Feb 6, 2025
8a3af5b
Fix tests
obenland Feb 6, 2025
d588ce9
Add/sent to inboxes actions (#1278)
pfefferle Feb 6, 2025
d3049a9
Publish post after processing interactees if they don't get sent to f…
obenland Feb 6, 2025
478a1fc
Revert unnecessary docs change
obenland Feb 6, 2025
e6a75cd
store object id as meta
pfefferle Feb 6, 2025
1a0d39a
Revert object id meta
obenland Feb 7, 2025
628f0bf
With meta now registered, it shouldn't need a fallback
obenland Feb 7, 2025
2beeb99
Stream: Only surface errors in Outbox processing (#1240)
obenland Feb 7, 2025
a7c036d
Move publish_post to calling function
obenland Feb 7, 2025
706c246
Restore fallback for tests
obenland Feb 7, 2025
d889ed3
Outbox Batch: Only pass outbox id to jobs (#1285)
obenland Feb 7, 2025
e885e81
Improve Outbox Data
pfefferle Feb 7, 2025
519c3f1
Fix tests
pfefferle Feb 7, 2025
90187c8
invalidate old unprocessed items
pfefferle Feb 7, 2025
5e7a48f
ignore type on `Delete`
pfefferle Feb 7, 2025
e4727ce
Fix phpcs
pfefferle Feb 7, 2025
39a3a1c
delete offset
pfefferle Feb 7, 2025
7976eec
remove actor id from the schedule
pfefferle Feb 7, 2025
f914b2e
Merge branch 'trunk' into improve/outbox-data
pfefferle Feb 11, 2025
7d7dfff
Merge branch 'trunk' into improve/outbox-data
pfefferle Feb 11, 2025
1a99f44
Update includes/collection/class-outbox.php
pfefferle Feb 11, 2025
53bf9c3
small phpcs changes
pfefferle Feb 11, 2025
5cfcc2b
Update includes/collection/class-outbox.php
obenland Feb 11, 2025
a352102
Add missing dispatcher class
pfefferle Feb 11, 2025
117eae6
Fix C&P issues
pfefferle Feb 11, 2025
b8fa40f
Get offset for existing item
obenland Feb 11, 2025
c4130ed
Populate title for reposts
obenland Feb 11, 2025
343a3c1
use instanceof check
obenland Feb 11, 2025
3bd359a
Also remove process_outbox events
obenland Feb 11, 2025
118d521
backslashes
obenland Feb 11, 2025
c0a2f54
Move announce scheduler, because it also announces comments
pfefferle Feb 12, 2025
9791ef4
change checks a bit
pfefferle Feb 12, 2025
181622e
add item for debugging
pfefferle Feb 12, 2025
7ea0b15
add some debug data
pfefferle Feb 12, 2025
57d3b96
invalidate announces
pfefferle Feb 12, 2025
b33699e
fix phpcs
pfefferle Feb 12, 2025
2a9f084
remove escaping
pfefferle Feb 12, 2025
5c3d8b1
remove escaping
pfefferle Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions includes/class-activitypub.php
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,17 @@ private static function register_post_types() {
)
);

\register_post_meta(
Outbox::POST_TYPE,
'_activitypub_object_id',
array(
'type' => 'string',
'single' => true,
'description' => 'The ID (ActivityPub URI) of the object that the outbox item is about.',
'sanitize_callback' => 'sanitize_url',
)
);

\register_post_meta(
Outbox::POST_TYPE,
'activitypub_content_visibility',
Expand Down
19 changes: 10 additions & 9 deletions includes/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public static function process_outbox( $id ) {
'activitypub_async_batch',
array(
self::$callback,
$actor->get__id(),
$outbox_item->ID,
self::$batch_size,
\get_post_meta( $outbox_item->ID, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
Expand All @@ -104,26 +103,27 @@ public static function process_outbox( $id ) {
} else {
// No followers to process for this update. We're done.
\wp_publish_post( $outbox_item );
\delete_post_meta( $outbox_item->ID, '_activitypub_outbox_offset' );
pfefferle marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Asynchronously runs batch processing routines.
*
* @param int $actor_id The actor ID.
* @param int $outbox_item_id The Outbox item ID.
* @param int $batch_size Optional. The batch size. Default 50.
* @param int $offset Optional. The offset. Default 0.
*
* @return array|void The next batch of followers to process, or void if done.
*/
public static function send_to_followers( $actor_id, $outbox_item_id, $batch_size = 50, $offset = 0 ) {
public static function send_to_followers( $outbox_item_id, $batch_size = 50, $offset = 0 ) {
$activity = self::get_activity( $outbox_item_id );
$actor = self::get_actor( \get_post( $outbox_item_id ) );
$json = $activity->to_json();
$inboxes = Followers::get_inboxes_for_activity( $json, $actor_id, $batch_size, $offset );
$inboxes = Followers::get_inboxes_for_activity( $json, $actor->get__id(), $batch_size, $offset );

foreach ( $inboxes as $inbox ) {
$result = safe_remote_post( $inbox, $json, $actor_id );
$result = safe_remote_post( $inbox, $json, $actor->get__id() );

/**
* Fires after an Activity has been sent to an inbox.
Expand All @@ -134,7 +134,7 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $actor_id The actor ID.
* @param int $outbox_item_id The Outbox item ID.
*/
\do_action( 'activitypub_sent_to_inbox', $result, $inbox, $json, $actor_id, $outbox_item_id );
\do_action( 'activitypub_sent_to_inbox', $result, $inbox, $json, $actor->get__id(), $outbox_item_id );
}

if ( is_countable( $inboxes ) && count( $inboxes ) < self::$batch_size ) {
Expand All @@ -150,7 +150,7 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $batch_size The batch size.
* @param int $offset The offset.
*/
\do_action( 'activitypub_outbox_processing_complete', $inboxes, $json, $actor_id, $outbox_item_id, $batch_size, $offset );
\do_action( 'activitypub_outbox_processing_complete', $inboxes, $json, $actor->get__id(), $outbox_item_id, $batch_size, $offset );

// No more followers to process for this update.
\wp_publish_post( $outbox_item_id );
Expand All @@ -167,9 +167,9 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $batch_size The batch size.
* @param int $offset The offset.
*/
\do_action( 'activitypub_outbox_processing_batch_complete', $inboxes, $json, $actor_id, $outbox_item_id, $batch_size, $offset );
\do_action( 'activitypub_outbox_processing_batch_complete', $inboxes, $json, $actor->get__id(), $outbox_item_id, $batch_size, $offset );

return array( $actor_id, $outbox_item_id, $batch_size, $offset + $batch_size );
return array( $outbox_item_id, $batch_size, $offset + $batch_size );
}
}

Expand Down Expand Up @@ -373,6 +373,7 @@ private static function get_activity( $outbox_item ) {
* Get the Actor object from the Outbox item.
*
* @param \WP_Post $outbox_item The Outbox post.
*
* @return \Activitypub\Model\User|\Activitypub\Model\Blog|\WP_Error The Actor object or WP_Error.
*/
private static function get_actor( $outbox_item ) {
Expand Down
59 changes: 58 additions & 1 deletion includes/class-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
use Activitypub\Scheduler\Post;
use Activitypub\Scheduler\Actor;
use Activitypub\Scheduler\Comment;
use Activitypub\Collection\Actors;
use Activitypub\Collection\Outbox;
use Activitypub\Collection\Followers;

use Activitypub\Transformer\Factory;
/**
* Scheduler class.
*
Expand All @@ -34,6 +35,7 @@ public static function init() {
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );

\add_action( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_outbox_activity_for_federation' ) );
\add_action( 'post_activitypub_add_to_outbox', array( self::class, 'schedule_announce_activity' ), 10, 4 );
}

/**
Expand Down Expand Up @@ -313,4 +315,59 @@ private static function next_scheduled_hook( $hook ) {

return $next;
}

/**
* Send announces.
*
* @param int $outbox_activity_id The outbox activity ID.
* @param Activity $activity_object The activity object.
* @param int $actor_id The actor ID.
* @param int $content_visibility The content visibility.
*/
public static function schedule_announce_activity( $outbox_activity_id, $activity_object, $actor_id, $content_visibility ) {
// Only if we're in both Blog and User modes.
if ( ACTIVITYPUB_ACTOR_AND_BLOG_MODE !== \get_option( 'activitypub_actor_mode', ACTIVITYPUB_ACTOR_MODE ) ) {
return;
}

// Only if this isn't the Blog Actor.
if ( Actors::BLOG_USER_ID === $actor_id ) {
return;
}

// Only if the content is public or quiet public.
if ( ACTIVITYPUB_CONTENT_VISIBILITY_PUBLIC !== $content_visibility ) {
return;
}

$activity_type = \get_post_meta( $outbox_activity_id, '_activitypub_activity_type', true );

// Only if the activity is a Create, Update or Delete.
if ( ! in_array( $activity_type, array( 'Create', 'Update', 'Delete' ), true ) ) {
return;
}

// Check if the object is an article, image, audio, video, event or document and ignore profile updates and other activities.
if ( ! in_array( $activity_object->get_type(), array( 'Note', 'Article', 'Image', 'Audio', 'Video', 'Event', 'Document' ), true ) ) {
return;
}

$transformer = Factory::get_transformer( $activity_object );
if ( ! $transformer || \is_wp_error( $transformer ) ) {
return;
}

$post = get_post( $outbox_activity_id );
$activity = $transformer->to_activity( $activity_type );
$activity->set_id( $post->guid );

$outbox_activity_id = Outbox::add( $activity, 'Announce', Actors::BLOG_USER_ID );

if ( ! $outbox_activity_id ) {
return;
}

// Schedule the outbox item for federation.
self::schedule_outbox_activity_for_federation( $outbox_activity_id );
}
}
79 changes: 78 additions & 1 deletion includes/collection/class-outbox.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

namespace Activitypub\Collection;

use Activitypub\Dispatcher;

use function Activitypub\is_activity;

/**
* ActivityPub Outbox Collection
*
Expand Down Expand Up @@ -38,14 +42,29 @@ public static function add( $activity_object, $activity_type, $user_id, $content
break;
}

$title = $activity_object->get_name() ?? $activity_object->get_content();
$activitypub_object_id = $activity_object->get_id();

if ( ! $title && is_activity( $activity_object ) && $activity_object->get_object() instanceof \Activitypub\Activity\Base_Object ) {
$title = $activity_object->get_object()->get_name() ?? $activity_object->get_object()->get_content();
$activitypub_object_id = $activity_object->get_object()->get_id();
}

$outbox_item = array(
'post_type' => self::POST_TYPE,
'post_title' => $activity_object->get_id(),
'post_title' => sprintf(
/* translators: 1. Activity type, 2. Object type, 3. Object Title or Excerpt */
__( '[%1$s] %2$s: %3$s', 'activitypub' ),
$activity_type,
$activity_object->get_type(),
\wp_trim_words( $title, 5 )
),
'post_content' => wp_slash( $activity_object->to_json() ),
// ensure that user ID is not below 0.
'post_author' => \max( $user_id, 0 ),
'post_status' => 'pending',
'meta_input' => array(
'_activitypub_object_id' => $activitypub_object_id,
'_activitypub_activity_type' => $activity_type,
'_activitypub_activity_actor' => $actor_type,
'activitypub_content_visibility' => $content_visibility,
Expand All @@ -72,6 +91,64 @@ public static function add( $activity_object, $activity_type, $user_id, $content
return false;
}

self::invalidate_existing_items( $activitypub_object_id, $activity_type, $id );

return $id;
}

/**
* Invalidate existing outbox items with the same activity type and object ID
* by setting their status to 'publish'.
*
* @param string $object_id The ID of the activity object.
* @param string $activity_type The type of the activity.
* @param int $current_id The ID of the current outbox item to exclude.
*
* @return void
*/
private static function invalidate_existing_items( $object_id, $activity_type, $current_id ) {
$meta_query = array(
array(
'key' => '_activitypub_object_id',
'value' => $object_id,
pfefferle marked this conversation as resolved.
Show resolved Hide resolved
),
);

// For non-Delete activities, only invalidate items of the same type.
if ( 'Delete' !== $activity_type ) {
$meta_query[] = array(
'key' => '_activitypub_activity_type',
'value' => $activity_type,
);
}

$existing_items = get_posts(
array(
'post_type' => self::POST_TYPE,
'post_status' => 'pending',
'exclude' => array( $current_id ),
// phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query
'meta_query' => $meta_query,
'fields' => 'ids',
)
);

foreach ( $existing_items as $existing_item_id ) {
pfefferle marked this conversation as resolved.
Show resolved Hide resolved
$event_args = array(
Dispatcher::$callback,
$existing_item_id,
Dispatcher::$batch_size,
\get_post_meta( $existing_item_id, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
);

$timestamp = \wp_next_scheduled( 'activitypub_async_batch', $event_args );
\wp_unschedule_event( $timestamp, 'activitypub_async_batch', $event_args );

$timestamp = \wp_next_scheduled( 'activitypub_process_outbox', array( $existing_item_id ) );
\wp_unschedule_event( $timestamp, 'activitypub_process_outbox', array( $existing_item_id ) );

\wp_publish_post( $existing_item_id );
\delete_post_meta( $existing_item_id, '_activitypub_outbox_offset' );
}
}
}
59 changes: 58 additions & 1 deletion includes/debug.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,61 @@ function allow_localhost( $parsed_args ) {

return $parsed_args;
}
add_filter( 'http_request_args', '\Activitypub\allow_localhost' );
\add_filter( 'http_request_args', '\Activitypub\allow_localhost' );

/**
* Debug the outbox post type.
*
* @param array $args The arguments for the post type.
* @param string $post_type The post type.
*
* @return array The arguments for the post type.
*/
function debug_outbox_post_type( $args, $post_type ) {
if ( 'ap_outbox' !== $post_type ) {
return $args;
}

$args['show_ui'] = true;
$args['menu_icon'] = 'dashicons-upload';

return $args;
}
\add_filter( 'register_post_type_args', '\Activitypub\debug_outbox_post_type', 10, 2 );

/**
* Debug the outbox post type column.
*
* @param array $columns The columns.
* @param string $post_type The post type.
*
* @return array The updated columns.
*/
function debug_outbox_post_type_column( $columns, $post_type ) {
if ( 'ap_outbox' !== $post_type ) {
return $columns;
}

$columns['ap_outbox_meta'] = 'Meta';

return $columns;
}
\add_filter( 'manage_posts_columns', '\Activitypub\debug_outbox_post_type_column', 10, 2 );

/**
* Debug the outbox post type meta.
*
* @param string $column_name The column name.
* @param int $post_id The post ID.
*
* @return void
*/
function manage_posts_custom_column( $column_name, $post_id ) {
if ( 'ap_outbox_meta' === $column_name ) {
$meta = \get_post_meta( $post_id );
foreach ( $meta as $key => $value ) {
echo \esc_attr( $key ) . ': ' . \esc_html( $value[0] ) . '<br>';
}
}
}
\add_action( 'manage_posts_custom_column', '\Activitypub\manage_posts_custom_column', 10, 2 );
Loading
Loading