blob: 7af1e12f439fb972b73dca5ddf3a90709c17d650 [file] [log] [blame] [edit]
//
// RACDynamicSignal.m
// ReactiveCocoa
//
// Created by Justin Spahr-Summers on 2013-10-10.
// Copyright (c) 2013 GitHub, Inc. All rights reserved.
//
#import "RACDynamicSignal.h"
#import "RACEXTScope.h"
#import "RACPassthroughSubscriber.h"
#import "RACScheduler+Private.h"
#import "RACSubscriber.h"
#import "RACCompoundDisposable.h"
#import <libkern/OSAtomic.h>
// Retains dynamic signals while they wait for subscriptions.
//
// This set must only be used on the main thread.
static CFMutableSetRef RACActiveSignals = nil;
// A linked list of RACDynamicSignals, used in RACActiveSignalsToCheck.
typedef struct RACSignalList {
CFTypeRef retainedSignal;
struct RACSignalList * restrict next;
} RACSignalList;
// An atomic queue of signals to check for subscribers. If any signals with zero
// subscribers are found in this queue, they are removed from RACActiveSignals.
static OSQueueHead RACActiveSignalsToCheck = OS_ATOMIC_QUEUE_INIT;
// Whether RACActiveSignalsToCheck will be enumerated on the next iteration on
// the main run loop.
static volatile uint32_t RACWillCheckActiveSignals = 0;
@interface RACDynamicSignal () {
// Contains all subscribers to the receiver.
//
// All access to this array must be synchronized using `_subscribersLock`.
NSMutableArray *_subscribers;
// Synchronizes access to `_subscribers`.
OSSpinLock _subscribersLock;
}
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end
@implementation RACDynamicSignal
#pragma mark Lifecycle
+ (void)initialize {
if (self != RACDynamicSignal.class) return;
CFSetCallBacks callbacks = kCFTypeSetCallBacks;
// Use pointer equality and hashes for membership testing.
callbacks.equal = NULL;
callbacks.hash = NULL;
RACActiveSignals = CFSetCreateMutable(NULL, 0, &callbacks);
}
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
// As soon as we're created we're already trying to be released. Such is life.
[self invalidateGlobalRefIfNoNewSubscribersShowUp];
return self;
}
static void RACCheckActiveSignals(void) {
// Clear this flag now, so another thread can re-dispatch to the main queue
// as needed.
OSAtomicAnd32Barrier(0, &RACWillCheckActiveSignals);
RACSignalList * restrict elem;
while ((elem = OSAtomicDequeue(&RACActiveSignalsToCheck, offsetof(RACSignalList, next))) != NULL) {
RACDynamicSignal *signal = CFBridgingRelease(elem->retainedSignal);
free(elem);
if (signal.hasSubscribers) {
// We want to keep the signal around until all its subscribers are done
CFSetAddValue(RACActiveSignals, (__bridge void *)signal);
} else {
CFSetRemoveValue(RACActiveSignals, (__bridge void *)signal);
}
}
}
- (void)invalidateGlobalRefIfNoNewSubscribersShowUp {
// If no one subscribes in one pass of the main run loop, then we're free to
// go. It's up to the caller to keep us alive if they still want us.
RACSignalList *elem = malloc(sizeof(*elem));
// This also serves to retain the signal until the next pass.
elem->retainedSignal = CFBridgingRetain(self);
OSAtomicEnqueue(&RACActiveSignalsToCheck, elem, offsetof(RACSignalList, next));
// Not using a barrier because duplicate scheduling isn't erroneous, just
// less optimized.
int32_t willCheck = OSAtomicOr32Orig(1, &RACWillCheckActiveSignals);
// Only schedule a check if RACWillCheckActiveSignals was 0 before.
if (willCheck == 0) {
dispatch_async(dispatch_get_main_queue(), ^{
RACCheckActiveSignals();
});
}
}
#pragma mark Managing Subscribers
- (BOOL)hasSubscribers {
OSSpinLockLock(&_subscribersLock);
BOOL hasSubscribers = _subscribers.count > 0;
OSSpinLockUnlock(&_subscribersLock);
return hasSubscribers;
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
OSSpinLockLock(&_subscribersLock);
if (_subscribers == nil) {
_subscribers = [NSMutableArray arrayWithObject:subscriber];
} else {
[_subscribers addObject:subscriber];
}
OSSpinLockUnlock(&_subscribersLock);
@weakify(self);
RACDisposable *defaultDisposable = [RACDisposable disposableWithBlock:^{
@strongify(self);
if (self == nil) return;
BOOL stillHasSubscribers = YES;
OSSpinLockLock(&_subscribersLock);
{
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [_subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) {
[_subscribers removeObjectAtIndex:index];
stillHasSubscribers = _subscribers.count > 0;
}
}
OSSpinLockUnlock(&_subscribersLock);
if (!stillHasSubscribers) {
[self invalidateGlobalRefIfNoNewSubscribersShowUp];
}
}];
[disposable addDisposable:defaultDisposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
@end