Source: Monitor.js

var _ = require('underscore'),
  SignalEmitter = require('./utility/SignalEmitter.js'),
  Filter = require('./Filter.js');

var EXTRA_ALL_EVENTS = {state : 'all', modifiedSince : -100000000 };
var REALLY_ALL_EVENTS =  EXTRA_ALL_EVENTS;
REALLY_ALL_EVENTS.fromTime = -1000000000;
REALLY_ALL_EVENTS.toTime = 10000000000;

var GETEVENT_MIN_REFRESH_RATE = 2000;

/**
 * Monitoring
 * @type {Function}
 * @constructor
 */
function Monitor(connection, filter) {
  SignalEmitter.extend(this, Messages, 'Monitor');
  this.connection = connection;
  this.id = 'M' + Monitor.serial++;

  this.filter = filter;

  this._lastUsedFilterData = filter.getData();

  if (this.filter.state) {
    throw new Error('Monitors only work for default state, not trashed or all');
  }

  this.filter.addEventListener(Filter.Messages.ON_CHANGE, this._onFilterChange.bind(this));
  this._events = null;


  // -- optimization & caching
  this.useCacheForEventsGetAllAndCompare = true;  // will look into cache before online
  this.ensureFullCache = true; // will fill the cache with ALL pryv content
  this.initWithPrefetch = 100; // prefetch some events before ensuringFullCache
}

Monitor.serial = 0;

var Messages = Monitor.Messages = {
  /** content: events **/
  ON_LOAD : 'started',
  /** content: error **/
  ON_ERROR : 'error',
  /** content: { enter: [], leave: [], change } **/
  ON_EVENT_CHANGE : 'eventsChanged',
  /** content: streams **/
  ON_STRUCTURE_CHANGE : 'streamsChanged',
  /** content: ? **/
  ON_FILTER_CHANGE : 'filterChanged'
};

// ----------- prototype  public ------------//

Monitor.prototype.start = function (done) {
  done = done || function () {};
  var batch = this.startBatch('Monitor:start');
  batch.addOnDoneListener('Monitor:startCompletion', function () {
    //TODO move this logic to ConnectionMonitors ??
    this.connection.monitors._monitors[this.id] = this;
    this.connection.monitors._startMonitoring(done);
  }.bind(this));


  this.lastSynchedST = -1000000000000;
  this._initEvents(batch);
  batch.done('Monitor:start');


};


Monitor.prototype.destroy = function () {
  //TODO move this logic to ConnectionMonitors ??
  delete this.connection.monitors._monitors[this.id];
  if (_.keys(this.connection.monitors._monitors).length === 0) {
    this.connection.monitors._stopMonitoring();
  }
};

Monitor.prototype.getEvents = function () {
  if (! this._events || ! this._events.active) {return []; }
  return _.toArray(this._events.active);
};

// ------------ private ----------//

// ----------- iOSocket ------//
Monitor.prototype._onIoConnect = function () {
  console.log('Monitor onConnect');
};
Monitor.prototype._onIoError = function (error) {
  console.log('Monitor _onIoError' + error);
};
Monitor.prototype._onIoEventsChanged = function () {
  var batch = this.startBatch('IoEventChanged');
  this._connectionEventsGetChanges(batch);
  batch.done('IoEventChanged');
};
Monitor.prototype._onIoStreamsChanged = function () {
  console.log('SOCKETIO', '_onIoStreamsChanged');
  var batch = this.startBatch('IoStreamsChanged');
  this._connectionStreamsGetChanges(batch);
  batch.done('IoStreamsChanged');
};



// -----------  filter changes ----------- //


Monitor.prototype._saveLastUsedFilter = function () {
  this._lastUsedFilterData = this.filter.getData();
};


Monitor.prototype._onFilterChange = function (signal, batch) {


  var changes = this.filter.compareToFilterData(this._lastUsedFilterData);

  var processLocalyOnly = 0;
  var foundsignal = 0;
  if (signal.signal === Filter.Messages.DATE_CHANGE) {  // only load events if date is wider
    foundsignal = 1;
    console.log('** DATE CHANGE ', changes.timeFrame);
    if (changes.timeFrame === 0) {
      return;
    }
    if (changes.timeFrame < 0) {  // new timeFrame contains more data
      processLocalyOnly = 1;
    }

  }

  if (signal.signal === Filter.Messages.STREAMS_CHANGE) {
    foundsignal = 1;
    console.log('** STREAMS_CHANGE', changes.streams);
    if (changes.streams === 0) {
      return;
    }
    if (changes.streams < 0) {  // new timeFrame contains more data
      processLocalyOnly = 1;
    }
  }

  if (signal.signal === Filter.Messages.STREAMS_CHANGE) {
    foundsignal = 1;
    console.log('** STREAMS_CHANGE', changes.streams);
    if (changes.streams === 0) {
      return;
    }
    if (changes.streams < 0) {  // new timeFrame contains more data
      processLocalyOnly = 1;
    }
  }

  if (signal.signal === Filter.Messages.STRUCTURE_CHANGE) {
    foundsignal = 1;
    // force full refresh
  }


  if (! foundsignal) {
    throw new Error('Signal not found :' + signal.signal);
  }

  this._saveLastUsedFilter();



  if (processLocalyOnly) {
    this._refilterLocaly(Messages.ON_FILTER_CHANGE, {filterInfos: signal}, batch);
  } else {
    this._connectionEventsGetAllAndCompare(Messages.ON_FILTER_CHANGE, {filterInfos: signal}, batch);
  }
};

// ----------- internal ----------------- //

/**
 * Process events locally
 */
Monitor.prototype._refilterLocaly = function (signal, extracontent, batch) {

  var result = { enter : [], leave : [] };
  _.extend(result, extracontent); // pass extracontent to receivers
  _.each(_.clone(this._events.active), function (event) {
    if (! this.filter.matchEvent(event)) {
      result.leave.push(event);
      delete this._events.active[event.id];
    }
  }.bind(this));
  this._fireEvent(signal, result, batch);
};


Monitor.prototype._initEvents = function (batch) {
  batch = this.startBatch('Monitor:initEvents', batch);
  this._events = { active : {}};


  var filterWith = this.filter.getData(true, EXTRA_ALL_EVENTS);

  if (this.initWithPrefetch) {
    filterWith.limit = this.initWithPrefetch;
  } else {
    if (this.ensureFullCache) {  filterWith = REALLY_ALL_EVENTS; }
  }


  this.connection.events.get(filterWith,
    function (error, events, extraInfos) {

      if (error) {
        this._fireEvent(Messages.ON_ERROR, error, batch);
        batch.done('Monitor:initEvents error');
        return;
      }


      if (! this.initWithPrefetch) {
        if (extraInfos && extraInfos.meta && extraInfos.meta.serverTime) {
          this.lastSynchedST = extraInfos.meta.serverTime;
        }
      }

      var result = [];

      _.each(events, function (event) {
        if (! this.ensureFullCache || this.filter.matchEvent(event)) {
          this._events.active[event.id] = event;
          result.push(event);
        }
      }.bind(this));


      this._fireEvent(Messages.ON_LOAD, result, batch);

      if (this.initWithPrefetch) {
        batch.waitForMeToFinish('delay');
        setTimeout(function () {
          this._connectionEventsGetChanges(batch);
          batch.done('delay');
        }.bind(this), 100);
      }
      batch.done('Monitor:initEvents finished');


    }.bind(this));
};





/**
 * @private
 */
Monitor.prototype._connectionEventsGetChanges = function (batch) {
  batch = this.startBatch('connectionEventsGetChanges', batch);
  if (this.eventsGetChangesInProgress) {
    this.eventsGetChangesNeeded = true;
    console.log('[WARNING] Skipping _connectionEventsGetChanges because one is in Progress');
    batch.done('connectionEventsGetChanges in Progress');
    return;
  }
  this.eventsGetChangesInProgress = true;
  this.eventsGetChangesNeeded = false;


  // var options = { modifiedSince : this.lastSynchedST};
  var options = { modifiedSince : this.lastSynchedST, state : 'all'};


  var filterWith = this.filter.getData(true, options);
  if (this.ensureFullCache) {
    filterWith = REALLY_ALL_EVENTS;
    filterWith = _.extend(filterWith, options);
  }
  //this.lastSynchedST = this.connection.getServerTime();

  var result = { created : [], trashed : [], modified: []};

  this.connection.events.get(filterWith,
    function (error, events, extraInfos) {
      if (extraInfos && extraInfos.meta && extraInfos.meta.serverTime) {
        this.lastSynchedST = extraInfos.meta.serverTime;
      }
      if (error) {
        this._fireEvent(Messages.ON_ERROR, error, batch);
        batch.done('connectionEventsGetChanges error');
        return;
      }

      _.each(events, function (event) {
        if (! this.ensureFullCache || this.filter.matchEvent(event)) {
          if (this._events.active[event.id]) {
            if (event.trashed && !this._events.active[event.id].trashed) { // trashed
              result.trashed.push(event);
              delete this._events.active[event.id];
            } else {
              result.modified.push(event);
              this._events.active[event.id] = event;
            }
          } else {
            if (this.ensureFullCache) { // can test streams  state (trashed)
              if (!event.trashed && event.stream && !event.stream.trashed) {
                result.created.push(event);
                this._events.active[event.id] = event;
              }
            } else {  // cannot test stream state
              if (!event.trashed) {
                result.created.push(event);
                this._events.active[event.id] = event;
              }
            }
          }
        }
      }.bind(this));

      this._fireEvent(Messages.ON_EVENT_CHANGE, result, batch);
      batch.done('connectionEventsGetChanges');

      // ---
      setTimeout(function () {
        this.eventsGetChangesInProgress = false;
        if (this.eventsGetChangesNeeded) {
          this._connectionEventsGetChanges();
        }
      }.bind(this), GETEVENT_MIN_REFRESH_RATE);

    }.bind(this));
};

/**
 * @private
 */
Monitor.prototype._connectionStreamsGetChanges = function (batch) {
  batch = this.startBatch('connectionStreamsGetChanges', batch);
  var previousStreamsData = {};
  var previousStreamsMap = {}; // !! only used to get back deleted streams..
  var created = [], modified = [], modifiedPreviousProperties = {}, trashed = [], deleted = [];

  var isStreamChanged = function (streamA, streamB) {
    return !_.isEqual(streamA, streamB);
  };


  // check if the stream has changed it.. and save it in the right message box
  var checkChangedStatus = function (stream) {


    if (! previousStreamsData[stream.id]) { // new stream
      created.push(stream);
    } else if (isStreamChanged(previousStreamsData[stream.id], stream.getData())) {

      if (previousStreamsData[stream.id].trashed !== stream.trashed) {
        if (!stream.trashed) {
          created.push(stream);
        } else {
          trashed.push(stream);
        }
      } else {
        modified.push(stream);
        modifiedPreviousProperties[stream.id] = previousStreamsData[stream.id];
      }
    }

    _.each(stream.children, function (child) {
      checkChangedStatus(child);
    });
    delete previousStreamsData[stream.id];
  };

  //-- get all current streams before matching with new ones --//
  var getFlatTree = function (stream) {
    previousStreamsData[stream.id] = stream.getData();
    previousStreamsMap[stream.id] = stream;

    _.each(stream.children, function (child) {
      getFlatTree(child);
    });
  };
  _.each(this.connection.datastore.getStreams(true), function (rootStream) {
    getFlatTree(rootStream);
  });

  this.connection.fetchStructure(function (error, result) {
    if (error) {
      batch.done('connectionStreamsGetChanges fetchStructure error');
      return;
    }
    _.each(result, function (rootStream) {
      checkChangedStatus(rootStream);
    });
    // each stream remaining in streams[] are deleted streams;
    _.each(previousStreamsData, function (streamData, streamId) {
      deleted.push(previousStreamsMap[streamId]);
    });

    this._fireEvent(Messages.ON_STRUCTURE_CHANGE,
      { created : created, trashed : trashed, modified: modified, deleted: deleted,
        modifiedPreviousProperties: modifiedPreviousProperties}, batch);

    this._onFilterChange({signal : Filter.Messages.STRUCTURE_CHANGE}, batch);
    batch.done('connectionStreamsGetChanges');
  }.bind(this));
};

/**
 * @private
 */
Monitor.prototype._connectionEventsGetAllAndCompare = function (signal, extracontent, batch) {
  //this.lastSynchedST = this.connection.getServerTime();


  if (this.useCacheForEventsGetAllAndCompare) {



    // POC code to look into in-memory events for matching events..
    // do not activate until cache handles DELETE
    var result1 = { enter : [], leave : []};
    _.extend(result1, extracontent);


    // first cleanup same as : this._refilterLocaly(signal, extracontent, batch);
    if (! this._events) {
      throw new Error('Not yet started!!!');
    }
    _.each(_.clone(this._events.active), function (event) {
      if (! this.filter.matchEvent(event)) {
        result1.leave.push(event);
        delete this._events.active[event.id];
      }
    }.bind(this));



    var cachedEvents = this.connection.datastore.getEventsMatchingFilter(this.filter);
    _.each(cachedEvents, function (event) {
      if (! this._events.active[event.id]) {  // we don't care for already known event
        this._events.active[event.id] = event; // store it
        result1.enter.push(event);
      }
    }.bind(this));



    this._fireEvent(signal, result1, batch);

    // remove all events not matching filter


  }

  // look online
  if (! this.ensureFullCache)  { // not needed when full cache is enabled
    var result = { enter : [] };
    _.extend(result, extracontent); // pass extracontent to receivers

    var toremove = _.clone(this._events.active);

    batch = this.startBatch('connectionEventsGetAllAndCompare:online', batch);
    this.connection.events.get(this.filter.getData(true, EXTRA_ALL_EVENTS),
      function (error, events, extraInfos) {
        if (extraInfos && extraInfos.meta && extraInfos.meta.serverTime) {
          this.lastSynchedST = extraInfos.meta.serverTime;
        }
        if (error) {
          this._fireEvent(Messages.ON_ERROR, error, batch);
          batch.done('connectionEventsGetAllAndCompare:online error');
          return;
        }
        _.each(events, function (event) {
          if (this._events.active[event.id]) {  // already known event we don't care
            delete toremove[event.id];
          } else {
            this._events.active[event.id] = event;
            result.enter.push(event);
          }
        }.bind(this));
        _.each(_.keys(toremove), function (streamid) {
          delete this._events.active[streamid]; // cleanup not found streams
        }.bind(this));
        result.leave = _.values(toremove); // unmatched events are to be removed
        this._fireEvent(signal, result, batch);
        batch.done('connectionEventsGetAllAndCompare:online');
      }.bind(this));
  }
};


/**
 * TODO write doc
 * return informations on events
 */
Monitor.prototype.stats = function (force, callback) {
  this.connection.profile.getTimeLimits(force, callback);
};

module.exports = Monitor;