package MangoX::Queue;

use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use DateTime;
use DateTime::Duration;
use Mojo::Log;
use Mango::BSON ':bson';
use MangoX::Queue::Delay;

no warnings 'experimental::smartmatch';

our $VERSION = '0.04';

# Logger for diagnostics; a fresh Mojo::Log limited to the 'error' level
has log => sub { Mojo::Log->new->level('error') };

# The Mango::Collection that backs the queue (checked by the constructor)
has 'collection';

# MangoX::Queue::Delay instance used to pause between queue polls
has delay => sub { MangoX::Queue::Delay->new };

# Seconds a job may stay in the processing status before it is picked up
# again; taken from MANGOX_QUEUE_JOB_TIMEOUT when defined, otherwise 60
has timeout => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 };

# Number of attempts allowed before a job is marked as failed; taken from
# MANGOX_QUEUE_JOB_RETRIES when defined, otherwise 5
has retries => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 };

# Maps consumer IDs to the Mojo::IOLoop timer IDs that drive them
has consumers => sub { {} };

# Maps plugin class names to the plugin instances loaded into this queue
has plugins => sub { {} };

# Status (scalar) or statuses (array ref) that mark a job as available
has pending_status => 'Pending';

# Status written to a job when it is taken from the queue
has processing_status => 'Processing';

# Status written to a job that has exceeded its retries
has failed_status => 'Failed';

# Constructor.
#
# Accepts the same arguments as the parent constructor and requires the
# 'collection' attribute to be a Mango::Collection object (or an instance
# of a subclass of it).
#
# Croaks when no suitable collection has been provided.
sub new {
    my $self = shift->SUPER::new(@_);

    # Use blessed/isa rather than comparing ref() to a class name, so that
    # subclasses of Mango::Collection are accepted as well.
    require Scalar::Util;
    my $collection = $self->collection;
    croak qq{No Mango::Collection provided to constructor}
        unless Scalar::Util::blessed($collection) && $collection->isa('Mango::Collection');

    return $self;
}

# Load a plugin class, instantiate it and register it with this queue.
#
# Parameters:
#   $name    - plugin class name, e.g. 'MangoX::Queue::Plugin::Statsd'
#   $options - optional hash ref of arguments for the plugin constructor
#
# Returns the plugin instance, which is also stored in 'plugins' under $name.
#
# Croaks when the name is not a valid class name, when the plugin is already
# loaded, when the class cannot be loaded, or when its constructor or its
# register method dies.
sub plugin {
    my ($self, $name, $options) = @_;

    # The name is interpolated into a string eval below, so only accept
    # something that looks like a package name.
    croak qq{Invalid plugin name}
        unless defined $name
            && $name =~ m{\A [A-Za-z_][A-Za-z0-9_]* (?: :: [A-Za-z0-9_]+ )* \z}x;

    croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};

    # Only require the module if the class is not already available
    unless($name->can('new')) {
        eval "require $name; 1" or croak qq{Failed to load plugin $name: $@};
    }

    my $plugin;
    eval {
        # Options are optional; fall back to an empty list
        $plugin = $name->new(%{ $options // {} });
        1;
    } or croak qq{Error calling constructor for plugin $name: $@};

    $self->plugins->{$name} = $plugin;

    eval {
        $plugin->register($self);
        1;
    } or do {
        my $error = $@;
        # Do not leave a half-registered plugin behind, otherwise a retry
        # would fail with "already loaded"
        delete $self->plugins->{$name};
        croak qq{Error calling register for plugin $name: $error};
    };

    return $plugin;
}

# Build the options hash passed to find_and_modify when taking a job.
#
# The query selects jobs whose attempt count is at most retries + 1 and that
# are either in a pending status with no processing timestamp, or in the
# processing status with a processing timestamp older than 'timeout' seconds.
# The update stamps the job with the current time, sets the processing status
# and increments the attempt counter. 'new => 0' asks for the document as it
# was before the update.
sub get_options {
    my ($self) = @_;

    # pending_status may be a single status or an array ref of statuses
    my $pending = $self->pending_status;
    my $pending_list = ref($pending) eq 'ARRAY' ? $pending : [ $pending ];

    # Jobs waiting to be picked up for the first time
    my %unclaimed = (
        status => { '$in' => $pending_list },
        '$or' => [ { processing => 0 }, { processing => undef } ],
    );

    # Jobs that were picked up but have been processing for too long
    my $claimed_status = $self->processing_status;
    my $allowance = DateTime::Duration->new(seconds => $self->timeout);
    my %timed_out = (
        status => $claimed_status,
        processing => { '$lt' => DateTime->now->subtract_duration($allowance) },
    );

    my %query = (
        '$or' => [ \%unclaimed, \%timed_out ],
        attempt => { '$lte' => $self->retries + 1 },
    );

    my %update = (
        '$set' => {
            processing => DateTime->now,
            status => $self->processing_status,
        },
        '$inc' => { attempt => 1 },
    );

    # NOTE(review): 'created => -1' is a descending sort, which looks like
    # newest-first rather than "in order of creation" - confirm the intent.
    my $sort = bson_doc(
        'priority' => 1,
        'created' => -1,
    );

    return {
        query => \%query,
        update => \%update,
        sort => $sort,
        new => 0, # Get the original object (so we can see status etc)
    };
}

# Add a job to the queue.
#
# Accepted argument forms:
#   - 'job_name'
#   - foo => bar, 'job_name'
#   - 'job_name', $callback
#   - foo => bar, 'job_name', $callback
#
# The key/value pairs before the job may set 'priority' (default 1),
# 'created' (default DateTime->now) and 'status'. When no status is given the
# job gets the queue's pending_status (its first entry when that is an array
# ref), so that the job is always visible to this queue's consumers.
#
# Blocking mode (no callback): inserts the job, emits 'enqueued' and returns
# the job hash (including its _id). Croaks if the insert fails.
#
# Non-blocking mode: returns whatever the collection's insert returns; once
# the insert has completed, 'enqueued' is emitted and the callback is called
# with the job hash. Insert and callback errors are emitted as 'error' events.
sub enqueue {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my $job = pop @args;
    my %args;
    %args = (@args) if scalar @args;

    # Follow the configured pending status rather than a hard-coded
    # 'Pending', consistent with requeue
    my $pending = $self->pending_status;
    my $default_status = ref($pending) eq 'ARRAY' ? $pending->[0] : $pending;

    my $db_job = {
        priority => $args{priority} // 1,
        created => $args{created} // DateTime->now,
        data => $job,
        status => $args{status} // $default_status,
        attempt => 1,
    };

    if($callback) {
        return $self->collection->insert($db_job => sub {
            my ($collection, $error, $oid) = @_;
            if($error) {
                $self->emit_safe(error => qq{Error inserting job into collection: $error});
                return;
            }
            $db_job->{_id} = $oid;
            $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
            # A dying callback must not escape into the event loop
            eval {
                $callback->($db_job);
                return 1;
            } or $self->emit_safe(error => qq{Error in callback: $@});
        });
    } else {
        eval {
            $db_job->{_id} = $self->collection->insert($db_job);
            return 1;
        } or croak qq{Error inserting job into collection: $@};
        $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
        return $db_job;
    }
}

# Wait for a job to reach a status.
#
#   watch $queue $id_or_job, 'Status' => $callback
#
# $id_or_job is either a job reference (its _id is used) or a job ID; $status
# defaults to 'Complete'. With a callback a zero-delay Mojo::IOLoop timer is
# scheduled to start polling and the timer ID is returned. Without a callback
# the call blocks and returns the result of _watch_blocking.
sub watch {
    my ($self, $id_or_job, $status, $callback) = @_;

    my $id = $id_or_job;
    $id = $id_or_job->{_id} if ref($id_or_job);

    $status = 'Complete' unless defined $status;

    unless($callback) {
        # Blocking
        $self->log->debug("Waiting for $id on status $status in blocking mode");
        return $self->_watch_blocking($id, $status);
    }

    # Non-blocking
    $self->log->debug("Waiting for $id on status $status in non-blocking mode");
    my $poll = sub { $self->_watch_nonblocking($id, $status, $callback) };
    return Mojo::IOLoop->timer(0 => $poll);
}

# Poll the collection until the job with the given ID has one of the wanted
# statuses, pausing via the delay object between polls.
#
# Parameters:
#   $id     - value matched against the document's _id
#   $status - a single status string or an array ref of acceptable statuses
#
# Returns 1 once the job has a matching status. Does not return while the job
# is missing or its status does not match.
sub _watch_blocking {
    my ($self, $id, $status) = @_;

    # Normalise to a list; any other kind of reference never matches. A plain
    # string comparison replaces the deprecated smartmatch operator.
    my @wanted = ref($status) eq 'ARRAY' ? @$status
               : ref($status)            ? ()
               :                           ($status);

    while(1) {
        my $doc = $self->collection->find_one({'_id' => $id});
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        if($doc && defined $doc->{status}) {
            my $current = $doc->{status};
            return 1 if grep { defined($_) && $_ eq $current } @wanted;
        }

        $self->delay->wait;
    }
}

# Perform one non-blocking check of a job's status and re-schedule itself
# until the job has one of the wanted statuses.
#
# Parameters:
#   $id       - value matched against the document's _id
#   $status   - a single status string or an array ref of acceptable statuses
#   $callback - called with the job document once its status matches
#
# When the status matches, the delay is reset and the callback is invoked.
# Otherwise the delay object is asked to wait and, if the IOLoop is still
# running afterwards, another check is scheduled. Lookup errors are logged
# and treated like a job that was not found.
sub _watch_nonblocking {
    my ($self, $id, $status, $callback) = @_;

    # Normalise to a list; any other kind of reference never matches. A plain
    # string comparison replaces the deprecated smartmatch operator.
    my @wanted = ref($status) eq 'ARRAY' ? @$status
               : ref($status)            ? ()
               :                           ($status);

    $self->collection->find_one({'_id' => $id} => sub {
        my ($cursor, $err, $doc) = @_;
        $self->log->error("Error finding job $id: $err") if $err;
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        my $current = $doc ? $doc->{status} : undef;

        if(defined($current) && grep { defined($_) && $_ eq $current } @wanted) {
            # Log the status that actually matched ($status may be an array ref)
            $self->log->debug("Status is $current");
            $self->delay->reset;
            $callback->($doc);
        } else {
            $self->log->debug("Job not found or status doesn't match");
            $self->delay->wait(sub {
                return unless Mojo::IOLoop->is_running;
                Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
            });
            return;
        }
    });
}

# Put a job back on the queue.
#
# Sets the job's status to the pending status (the first entry when
# pending_status is an array ref) and hands the job, together with the
# optional callback, to update. Returns whatever update returns.
sub requeue {
    my ($self, $job, $callback) = @_;

    my $pending = $self->pending_status;
    $pending = $pending->[0] if ref($pending) eq 'ARRAY';
    $job->{status} = $pending;

    return $self->update($job, $callback);
}

# Remove a job from the queue collection.
#
# $id_or_job is either a job reference (its _id is used) or a job ID. With a
# callback the removal is requested in non-blocking mode; once it completes
# the callback is invoked without arguments and then 'dequeued' is emitted.
# Without a callback the job is removed synchronously and 'dequeued' is
# emitted afterwards. The event is only emitted when it has subscribers and
# carries the original $id_or_job argument.
sub dequeue {
    my ($self, $id_or_job, $callback) = @_;

    # TODO option to not remove on dequeue?

    my $id = $id_or_job;
    $id = $id_or_job->{_id} if ref($id_or_job);

    my $selector = {'_id' => $id};

    if($callback) {
        $self->collection->remove($selector => sub {
            $callback->();
            $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
        });
    } else {
        $self->collection->remove($selector);
        $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
    }
}

# Look up a job by ID without changing it.
#
# $id_or_job is either a job reference (its _id is used) or a job ID. Without
# a callback the result of the collection's find_one is returned directly.
# With a callback the lookup is non-blocking: the callback receives the
# document and the return value is whatever find_one returns for the request.
sub get {
    my ($self, $id_or_job, $callback) = @_;

    my $id = $id_or_job;
    $id = $id_or_job->{_id} if ref($id_or_job);

    my $query = {'_id' => $id};

    return $self->collection->find_one($query) unless $callback;

    return $self->collection->find_one($query => sub {
        my (undef, undef, $found) = @_;
        $callback->($found);
    });
}

# Write a job back to the collection.
#
# The document whose _id equals the job's _id is replaced by $job (upserting
# if it does not exist). Without a callback the update is performed in
# blocking mode and its result is returned. With a callback the update is
# non-blocking: the callback receives the result document once the update
# has completed, and update errors are logged.
sub update {
    my ($self, $job, $callback) = @_;

    my @request = ({'_id' => $job->{_id}}, $job, {upsert => 1});

    return $self->collection->update(@request) unless $callback;

    # The non-blocking branch must perform the same update as the blocking
    # one; a plain find_one would leave the stored job unchanged.
    return $self->collection->update(@request => sub {
        my ($collection, $error, $doc) = @_;
        $self->log->error("Error updating job: $error") if $error;
        $callback->($doc);
    });
}

# Fetch a single job from the queue.
#
#   fetch $queue status => 'Complete', sub { my $job = shift; }
#
# Optional key/value arguments are passed on as the query to use instead of
# the default one; a trailing code ref selects non-blocking mode.
#
# Non-blocking mode schedules a zero-delay Mojo::IOLoop timer that performs
# the fetch, records the timer under a new consumer ID and returns that ID.
# Blocking mode returns the result of _consume_blocking.
sub fetch {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In fetch");

    if($callback) {
        $self->log->debug("Fetching in non-blocking mode");

        # Start from count + 1 but skip IDs that are still in use: after a
        # consumer has been released, the count alone can point at the ID of
        # a live consumer and would overwrite its timer.
        my $consumers = $self->consumers;
        my $consumer_id = (scalar keys %$consumers) + 1;
        $consumer_id++ while exists $consumers->{$consumer_id};

        $consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
        return $consumer_id;
    } else {
        $self->log->debug("Fetching in blocking mode");
        return $self->_consume_blocking(\%args, 1);
    }
}

# Consume jobs from the queue.
#
#   consume $queue status => 'Failed', sub { my $job = shift; }
#
# Optional key/value arguments are passed on as the query to use instead of
# the default one; a trailing code ref selects non-blocking mode.
#
# Non-blocking mode schedules a zero-delay Mojo::IOLoop timer that starts the
# consumer, records the timer under a new consumer ID and returns that ID
# (which can be handed to release). Blocking mode returns the result of
# _consume_blocking.
sub consume {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In consume");

    if($callback) {
        $self->log->debug("consuming in non-blocking mode");

        # Start from count + 1 but skip IDs that are still in use: after a
        # consumer has been released, the count alone can point at the ID of
        # a live consumer and would overwrite its timer.
        my $consumers = $self->consumers;
        my $consumer_id = (scalar keys %$consumers) + 1;
        $consumer_id++ while exists $consumers->{$consumer_id};

        $consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
        $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $consumers->{$consumer_id});
        return $consumer_id;
    } else {
        $self->log->debug("consuming in blocking mode");
        return $self->_consume_blocking(\%args, 0);
    }
}

# Stop a non-blocking consumer.
#
# Removes the Mojo::IOLoop timer recorded for $consumer_id and forgets the
# consumer. Releasing an unknown or already released consumer is a no-op.
# Always returns nothing.
sub release {
    my ($self, $consumer_id) = @_;

    my $consumers = $self->consumers;

    # Nothing to do (and no timer ID to remove) for an unknown consumer
    unless(defined $consumer_id && exists $consumers->{$consumer_id}) {
        $self->log->debug("Ignoring release of unknown consumer");
        return;
    }

    my $timer_id = delete $consumers->{$consumer_id};
    $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $timer_id);

    Mojo::IOLoop->remove($timer_id);

    return;
}

# Take a job from the queue in blocking mode.
#
# Parameters:
#   $args  - hash ref; when non-empty it replaces the default query
#   $fetch - true to give up after one attempt, false to keep polling
#
# Each attempt runs find_and_modify with the options from get_options. A job
# whose attempt count exceeds 'retries' is marked with the failed status,
# written back and skipped. When a job is obtained, 'consumed' is emitted (if
# it has subscribers) and the job is returned. When no job is obtained, fetch
# mode returns nothing (undef in scalar context); otherwise the delay object
# is asked to wait and the loop tries again.
sub _consume_blocking {
    my ($self, $args, $fetch) = @_;

    while(1) {
        my $opts = $self->get_options;
        $opts->{query} = $args if scalar keys %$args;

        my $doc = $self->collection->find_and_modify($opts);
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        if($doc && $doc->{attempt} > $self->retries) {
            $doc->{status} = $self->failed_status;
            $self->update($doc);
            $doc = undef;
            $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
        }

        if($doc) {
            $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
            return $doc;
        }

        # Return explicitly: the value of a sub that falls off the end of a
        # loop is unspecified
        return if $fetch;

        $self->delay->wait;
    }
}

# Perform one non-blocking attempt to take a job from the queue and, unless
# in fetch mode, keep the consumer going by re-scheduling itself.
#
# Parameters:
#   $args        - hash ref; when non-empty it replaces the default query
#   $consumer_id - key of this consumer in 'consumers'
#   $callback    - called with each job that is obtained
#   $fetch       - true for a one-shot fetch, false for a continuous consumer
#
# A job whose attempt count exceeds 'retries' is marked with the failed
# status, written back and skipped. When a job is obtained the delay is
# reset, 'consumed' is emitted (if it has subscribers) and the callback is
# invoked; a dying callback is reported as an 'error' event. A continuous
# consumer is re-scheduled immediately after a job, or after the delay has
# waited when there was none, as long as the IOLoop is running and the
# consumer has not been released. Errors from find_and_modify are logged.
#
# NOTE(review): in fetch mode the callback is not invoked when no job is
# available - confirm whether callers expect a call with undef instead.
sub _consume_nonblocking {
    my ($self, $args, $consumer_id, $callback, $fetch) = @_;

    my $opts = $self->get_options;
    $opts->{query} = $args if scalar keys %$args;

    $self->collection->find_and_modify($opts => sub {
        my ($cursor, $err, $doc) = @_;
        $self->log->error("Error fetching job from collection: $err") if $err;
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        # A fetch is one-shot: its timer has fired and it is never
        # re-scheduled, so drop its entry instead of keeping it forever
        delete $self->consumers->{$consumer_id} if $fetch;

        if($doc && $doc->{attempt} > $self->retries) {
            $doc->{status} = $self->failed_status;
            $self->update($doc);
            $doc = undef;
            $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
        }

        if($doc) {
            $self->delay->reset;
            $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
            eval {
                $callback->($doc);
                return 1;
            } or $self->emit_safe(error => "Error in callback: $@");
            return unless Mojo::IOLoop->is_running;
            return if $fetch;
            # Stop here if the consumer has been released in the meantime
            return unless exists $self->consumers->{$consumer_id};
            $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
            $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
        } else {
            return unless Mojo::IOLoop->is_running;
            return if $fetch;
            $self->delay->wait(sub {
                return unless exists $self->consumers->{$consumer_id};
                $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
                $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
            });
            return;
        }
    });
}

1;

=encoding utf8

=head1 NAME

MangoX::Queue - A MongoDB queue implementation using Mango

=head1 DESCRIPTION

L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.

L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.

For an introduction to L<MangoX::Queue>, see L<MangoX::Queue::Tutorial>.

=head1 SYNOPSIS

=head2 Non-blocking

Non-blocking mode requires a running L<Mojo::IOLoop>.

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    enqueue $queue 'test' => sub { my $id = shift; };

    # To set options
    enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; };

    # To watch for a specific job status
    watch $queue $id, 'Complete' => sub {
        # Job status is 'Complete'
    };

    # To fetch a job
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

    # To get a job by id
    get $queue $id => sub { my $job = shift; };

    # To requeue a job
    requeue $queue $job => sub { my $id = shift; };

    # To dequeue a job
    dequeue $queue $id => sub { };

    # To consume a queue
    my $consumer = consume $queue sub {
        my ($job) = @_;
        # ...
    };

    # To stop consuming a queue
    release $queue $consumer;

    # To listen for errors
    on $queue error => sub { my ($queue, $error) = @_; };

=head2 Blocking

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    my $id = enqueue $queue 'test';

    # To set options
    my $id = enqueue $queue priority => 1, created => DateTime->now, 'test';

    # To watch for a specific job status
    watch $queue $id;

    # To fetch a job
    my $job = fetch $queue;

    # To get a job by id
    my $job = get $queue $id;

    # To requeue a job
    my $id = requeue $queue $job;

    # To dequeue a job
    dequeue $queue $id;

    # To consume a queue
    while(my $job = consume $queue) {
        # ...
    }

=head2 Other

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To listen for events
    on $queue enqueued => sub { my ($queue, $job) = @_; };
    on $queue dequeued => sub { my ($queue, $job) = @_; };
    on $queue consumed => sub { my ($queue, $job) = @_; };

    # To register a plugin
    plugin $queue 'MangoX::Queue::Plugin::Statsd';

=head1 ATTRIBUTES

L<MangoX::Queue> implements the following attributes.

=head2 collection

    my $collection = $queue->collection;
    $queue->collection($mango->db('foo')->collection('bar'));

    my $queue = MangoX::Queue->new(collection => $collection);

The L<Mango::Collection> representing the MongoDB queue collection.

=head2 delay

    my $delay = $queue->delay;
    $queue->delay(MangoX::Queue::Delay->new);

The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.

=head2 failed_status

    my $status = $queue->failed_status;

    $queue->failed_status('Failed');
    
The status to set failed jobs to (when exceeding retries).

=head2 pending_status

    my $status = $queue->pending_status;

    $queue->pending_status('Pending');
    $queue->pending_status(['Pending', 'Queued']);

The pending status used to find new jobs on the queue.

=head2 plugins

    my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

=head2 processing_status

    my $status = $queue->processing_status;

    $queue->processing_status('Processing');
    
The status to set jobs to when consumed from the queue.

=head2 retries

    my $retries = $queue->retries;
    $queue->retries(5);

The number of times a job will be picked up from the queue before it is
marked as failed.

=head2 timeout

    my $timeout = $queue->timeout;
    $queue->timeout(10);

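The time (in seconds) a job is allowed to stay in the processing state (see
L</processing_status>) before it becomes available to be picked up from the
queue again. Defaults to 60 seconds, or to the value of the
C<MANGOX_QUEUE_JOB_TIMEOUT> environment variable when it is set.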

=head1 EVENTS

L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events.

Events are emitted only for actions on the current queue object, not the entire queue.

=head2 consumed

    on $queue consumed => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is consumed (either via consume or fetch)

=head2 dequeued

    on $queue dequeued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is dequeued

=head2 enqueued

    on $queue enqueued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is enqueued

=head1 METHODS

L<MangoX::Queue> implements the following methods.

=head2 consume

    # In blocking mode
    while(my $job = consume $queue) {
        # ...
    }

    # In non-blocking mode
    consume $queue sub {
        my ($job) = @_;
        # ...
    };

Waits for jobs to arrive on the queue, sleeping between queue checks using L<MangoX::Queue::Delay> or L<Mojo::IOLoop>.

Sets the job status to L</processing_status> ('Processing' by default) before returning the job.

=head2 dequeue

    my $job = fetch $queue;
    dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

=head2 enqueue

    my $id = enqueue $queue 'job name';
    my $id = enqueue $queue [ 'some', 'data' ];
    my $id = enqueue $queue +{ foo => 'bar' };

Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.

You can set queue options including priority, created and status.

    my $id = enqueue $queue
        priority => 1,
        created => DateTime->now,
        status => 'Pending',
        +{
            foo => 'bar'
        };

For non-blocking mode, pass in a coderef as the final argument.

    my $id = enqueue $queue 'job_name' => sub {
        # ...
    };

    my $id = enqueue $queue priority => 1, +{
        foo => 'bar',
    } => sub {
        # ...
    };

Sets the status to 'Pending' by default.

=head2 fetch

    # In blocking mode
    my $job = fetch $queue;

    # In non-blocking mode
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

Fetch a single job from the queue, returning undef if no jobs are available.

Sets the job status to L</processing_status> ('Processing' by default).

=head2 get

    # In non-blocking mode
    get $queue $id => sub {
        my ($job) = @_;
        # ...
    };

    # In blocking mode
    my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

You can also pass in a job instead of an ID.

    $job = get $queue $job;

=head2 get_options

    my $options = $queue->get_options;

Returns the L<Mango::Collection> options hash used by find_and_modify to
identify and update available queue items.

=head2 release

    my $consumer = consume $queue sub {
        # ...
    };
    release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

=head2 requeue

    my $job = fetch $queue;
    requeue $queue $job;

Requeues a job. Sets the job status to L</pending_status> (the first entry when it is an array reference; 'Pending' by default).

=head2 update

    my $job = fetch $queue;
    $job->{status} = 'Failed';
    update $queue $job;

Updates a job in the queue.

=head2 watch

Wait for a job to enter a certain status.

    # In blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete'; # blocks until job is complete

    # In non-blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete' => sub {
        # ...
    };

=head1 SEE ALSO

L<MangoX::Queue::Tutorial>, L<Mojolicious>, L<Mango>

=cut