blob: 69d9700c972c1bb53ed94b52d76928b13d135820 [file] [log] [blame]
//
// RACSignal.m
// ReactiveCocoa
//
// Created by Josh Abernathy on 3/15/12.
// Copyright (c) 2012 GitHub, Inc. All rights reserved.
//
#import "RACSignal.h"
#import "RACCompoundDisposable.h"
#import "RACDisposable.h"
#import "RACDynamicSignal.h"
#import "RACEmptySignal.h"
#import "RACErrorSignal.h"
#import "RACMulticastConnection.h"
#import "RACReplaySubject.h"
#import "RACReturnSignal.h"
#import "RACScheduler.h"
#import "RACSerialDisposable.h"
#import "RACSignal+Operations.h"
#import "RACSubject.h"
#import "RACSubscriber+Private.h"
#import "RACTuple.h"
@implementation RACSignal
#pragma mark Lifecycle
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
+ (RACSignal *)error:(NSError *)error {
return [RACErrorSignal error:error];
}
+ (RACSignal *)never {
return [[self createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
return nil;
}] setNameWithFormat:@"+never"];
}
+ (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
// Subscribe to force the lazy signal to call its block.
[[signal publish] connect];
return [signal setNameWithFormat:@"+startEagerlyWithScheduler:%@ block:", scheduler];
}
+ (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACMulticastConnection *connection = [[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
block(subscriber);
return nil;
}]
multicast:[RACReplaySubject subject]];
return [[[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
[connection.signal subscribe:subscriber];
[connection connect];
return nil;
}]
subscribeOn:scheduler]
setNameWithFormat:@"+startLazilyWithScheduler:%@ block:", scheduler];
}
#pragma mark NSObject
- (NSString *)description {
return [NSString stringWithFormat:@"<%@: %p> name: %@", self.class, self, self.name];
}
@end
@implementation RACSignal (RACStream)
+ (RACSignal *)empty {
return [RACEmptySignal empty];
}
+ (RACSignal *)return:(id)value {
return [RACReturnSignal return:value];
}
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
/*
* -bind: should:
*
* 1. Subscribe to the original signal of values.
* 2. Any time the original signal sends a value, transform it using the binding block.
* 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
* 4. If the binding block asks the bind to terminate, complete the _original_ signal.
* 5. When _all_ signals complete, send completed to the subscriber.
*
* If any signal sends an error at any point, send that to the subscriber.
*/
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;
@synchronized (signals) {
[signals removeObject:signal];
if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}
if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable.disposable = disposable;
};
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}
- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];
__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];
void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};
void (^sendNext)(void) = ^{
@synchronized (selfValues) {
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;
RACTuple *tuple = [RACTuple tupleWithObjects:selfValues[0], otherValues[0], nil];
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];
[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (selfValues) {
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (selfValues) {
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}
@end
@implementation RACSignal (Subscription)
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCAssert(NO, @"This method must be overridden by subclasses");
return nil;
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:completedBlock];
return [self subscribe:o];
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}
- (RACDisposable *)subscribeError:(void (^)(NSError *error))errorBlock {
NSCParameterAssert(errorBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:errorBlock completed:NULL];
return [self subscribe:o];
}
- (RACDisposable *)subscribeCompleted:(void (^)(void))completedBlock {
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:NULL completed:completedBlock];
return [self subscribe:o];
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:NULL];
return [self subscribe:o];
}
- (RACDisposable *)subscribeError:(void (^)(NSError *))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(completedBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:NULL error:errorBlock completed:completedBlock];
return [self subscribe:o];
}
@end
@implementation RACSignal (Debugging)
- (RACSignal *)logAll {
return [[[self logNext] logError] logCompleted];
}
- (RACSignal *)logNext {
return [[self doNext:^(id x) {
NSLog(@"%@ next: %@", self, x);
}] setNameWithFormat:@"%@", self.name];
}
- (RACSignal *)logError {
return [[self doError:^(NSError *error) {
NSLog(@"%@ error: %@", self, error);
}] setNameWithFormat:@"%@", self.name];
}
- (RACSignal *)logCompleted {
return [[self doCompleted:^{
NSLog(@"%@ completed", self);
}] setNameWithFormat:@"%@", self.name];
}
@end
@implementation RACSignal (Testing)
static const NSTimeInterval RACSignalAsynchronousWaitTimeout = 10;
- (id)asynchronousFirstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCAssert([NSThread isMainThread], @"%s should only be used from the main thread", __func__);
__block id result = defaultValue;
__block BOOL done = NO;
// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess = YES;
[[[[self
take:1]
timeout:RACSignalAsynchronousWaitTimeout onScheduler:[RACScheduler scheduler]]
deliverOn:RACScheduler.mainThreadScheduler]
subscribeNext:^(id x) {
result = x;
done = YES;
} error:^(NSError *e) {
if (!done) {
localSuccess = NO;
localError = e;
done = YES;
}
} completed:^{
done = YES;
}];
do {
[NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:0.1]];
} while (!done);
if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;
return result;
}
- (BOOL)asynchronouslyWaitUntilCompleted:(NSError **)error {
BOOL success = NO;
[[self ignoreValues] asynchronousFirstOrDefault:nil success:&success error:error];
return success;
}
@end
@implementation RACSignal (Deprecated)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-implementations"
+ (RACSignal *)startWithScheduler:(RACScheduler *)scheduler subjectBlock:(void (^)(RACSubject *subject))block {
NSCParameterAssert(block != NULL);
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"+startWithScheduler:subjectBlock:"];
[scheduler schedule:^{
block(subject);
}];
return subject;
}
+ (RACSignal *)start:(id (^)(BOOL *success, NSError **error))block {
return [[self startWithScheduler:[RACScheduler scheduler] block:block] setNameWithFormat:@"+start:"];
}
+ (RACSignal *)startWithScheduler:(RACScheduler *)scheduler block:(id (^)(BOOL *success, NSError **error))block {
return [[self startWithScheduler:scheduler subjectBlock:^(id<RACSubscriber> subscriber) {
BOOL success = YES;
NSError *error = nil;
id returned = block(&success, &error);
if (!success) {
[subscriber sendError:error];
} else {
[subscriber sendNext:returned];
[subscriber sendCompleted];
}
}] setNameWithFormat:@"+startWithScheduler:block:"];
}
#pragma clang diagnostic pop
@end