class Mongo::Server::PushMonitor
A monitor utilizing server-pushed ismaster requests.
When a Monitor
handshakes with a 4.4+ server, it creates an instance of PushMonitor
. PushMonitor
subsequently executes server-pushed ismaster (i.e. awaited & exhausted ismaster) to receive topology changes from the server as quickly as possible. The Monitor
still monitors the server for round-trip time calculations and to perform immediate checks as requested by the application.
@api private
Attributes
monitor[R]
@return [ Monitor
] The monitor to which this push monitor is attached.
monitoring[R]
@return [ Monitoring
] monitoring The monitoring.
options[R]
@return [ Hash ] Push monitor options.
topology_version[R]
@return [ TopologyVersion
] Most recently received topology version.
Public Class Methods
new(monitor, topology_version, monitoring, **options)
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 32 def initialize(monitor, topology_version, monitoring, **options) if topology_version.nil? raise ArgumentError, 'Topology version must be provided but it was nil' end @monitor = monitor @topology_version = topology_version @monitoring = monitoring @options = options @lock = Mutex.new end
Public Instance Methods
do_work()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 83 def do_work @lock.synchronize do return if @stop_requested end result = monitoring.publish_heartbeat(server, awaited: true) do ismaster end new_description = monitor.run_sdam_flow(result, awaited: true) # When ismaster fails due to a fail point, the response does not # include topology version. In this case we need to keep our existing # topology version so that we can resume monitoring. # The spec does not appear to directly address this case but # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-ismaster # says that topologyVersion should only be updated from successful # ismaster responses. if new_description.topology_version @topology_version = new_description.topology_version end rescue Mongo::Error => exc msg = "Error running awaited ismaster on #{server.address}" Utils.warn_bg_exception(msg, exc, logger: options[:logger], log_prefix: options[:log_prefix], bg_error_backtrace: options[:bg_error_backtrace], ) end
ismaster()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 111 def ismaster @lock.synchronize do if @connection && @connection.pid != Process.pid log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") @connection.disconnect! @connection = nil end end @lock.synchronize do unless @connection @server_pushing = false connection = PushMonitor::Connection.new(server.address, options) connection.connect! @connection = connection end end resp_msg = begin unless @server_pushing write_ismaster end read_response rescue Mongo::Error @lock.synchronize do @connection.disconnect! @connection = nil end raise end @server_pushing = resp_msg.flags.include?(:more_to_come) result = resp_msg.documents.first end
read_response()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 155 def read_response if timeout = options[:connect_timeout] if timeout < 0 raise Mongo::SocketTimeoutError, "Requested to read with a negative timeout: #{}" elsif timeout > 0 timeout += options[:heartbeat_frequency] || Monitor::DEFAULT_HEARTBEAT_INTERVAL end end # We set the timeout twice: once passed into read_socket which applies # to each individual read operation, and again around the entire read. Timeout.timeout(timeout, Error::SocketTimeoutError, "Failed to read an awaited ismaster response in #{timeout} seconds") do @lock.synchronize { @connection }.read_response(socket_timeout: timeout) end end
start!()
click to toggle source
Calls superclass method
Mongo::BackgroundThread#start!
# File lib/mongo/server/push_monitor.rb, line 58 def start! @lock.synchronize do super end end
stop!()
click to toggle source
Calls superclass method
Mongo::BackgroundThread#stop!
# File lib/mongo/server/push_monitor.rb, line 64 def stop! @lock.synchronize do @stop_requested = true if @connection # Interrupt any in-progress exhausted ismaster reads by # disconnecting the connection. @connection.send(:socket).close end end super.tap do @lock.synchronize do if @connection @connection.disconnect! @connection = nil end end end end
write_ismaster()
click to toggle source
# File lib/mongo/server/push_monitor.rb, line 145 def write_ismaster payload = Monitor::Connection::ISMASTER_OP_MSG.merge( topologyVersion: topology_version.to_doc, maxAwaitTimeMS: monitor.heartbeat_interval * 1000, ) req_msg = Protocol::Msg.new([:exhaust_allowed], {}, payload) @lock.synchronize { @connection }.write_bytes(req_msg.serialize.to_s) end