#| Copyright (c) 2007 All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The name of the author may not be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
|#
(in-package :nio-yarpc)

(declaim (optimize (debug 3) (speed 3) (space 0)))

;; YetAnotherRPC Client state machine
;;
;; A client that accepts jobs to be run via a threadsafe queue and then
;; submits them to the remote end for execution
;;
(defclass yarpc-client-state-machine (state-machine)
  ((job-queue :initform (nio-utils:concurrent-queue)
              :accessor job-queue
              :documentation "The queue used to hand off work from an external thread to the io thread")
   (request-map :initform (make-hash-table)
                :reader request-map
                :documentation "A map from request-id (a unique id for this request) to remote-job")))

(defclass remote-job ()
  ((callback :initarg :callback
             :accessor callback
             :documentation "A function accepting one argument to call with the result of the remote operation")
   (start-time :initform (get-universal-high-res)
               :reader start-time
               :documentation "The (floating point) start time")
   (timeout :initarg :timeout
            :reader timeout
            :documentation "The time in seconds before a timeout should occur. Obviously we don't guarantee that this will be honored exactly; it depends on other processing, but it should be close.")))

(defparameter +rpc-timeout+ 60
  "The number of seconds before a remote call is considered timedout")

(defun remote-job (callback &key (timeout +rpc-timeout+))
  "Create a REMOTE-JOB that will hand its result to CALLBACK.
TIMEOUT is in seconds and defaults to +RPC-TIMEOUT+."
  (make-instance 'remote-job :callback callback :timeout timeout))

(defun yarpc-client-state-machine ()
  "Create a fresh YARPC-CLIENT-STATE-MACHINE."
  (make-instance 'yarpc-client-state-machine))

;; Single packet factory instance returned by GET-PACKET-FACTORY for every
;; client state machine.
(defparameter yarpc-pf (yarpc-packet-factory))

(defmethod get-packet-factory ((sm yarpc-client-state-machine))
  "Return the shared packet factory held in YARPC-PF."
  yarpc-pf)

(defmethod print-object ((sm yarpc-client-state-machine) stream)
  "Print SM as #<YARPC-CLIENT-STATE-MACHINE ...> wrapping the next method's output."
  ;; The next method is given a real string stream (rather than NIL) so its
  ;; output is captured no matter how that method writes to its stream.
  (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A>"
          (with-output-to-string (parent-repr)
            (call-next-method sm parent-repr))))

(defmethod print-object ((job remote-job) stream)
  "Print JOB together with its start time and timeout."
  (format stream "#<REMOTE-JOB :start-time ~A :timeout ~A>"
          (start-time job)
          (timeout job)))

;; State identifiers; neither is referenced by the definitions in this part
;; of the file.
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SENT-REQUEST 1)

(defparameter +request-id+ 0
  "The most recently allocated request id; incremented by PROCESS-OUTGOING-PACKET.")

(defun check-timeouts (id job)
  "Return T (and log the event) when JOB has been outstanding for longer than
its timeout, otherwise return NIL.  ID is only used in the log message."
  ;; (format-log t "Checking timeout on ~A~%" job)
  (when (> (get-universal-high-res)
           (+ (start-time job) (timeout job)))
    (format-log t "Timeout detected ~A ~A~%" id job)
    t))

(defun finish-job (request-id sm result)
  "Remove the job from the request map and call the callback with the result"
  (let ((remote-job (gethash request-id (request-map sm))))
    ;; Unknown ids (e.g. a job that was already finished) are ignored.
    (when remote-job
      (remhash request-id (request-map sm))
      (funcall (callback remote-job) result))))

(defmethod process-timeout ((sm yarpc-client-state-machine))
  "Finish every outstanding job whose timeout has elapsed, passing NIL as its result."
  (let ((requests (request-map sm))
        (expired '()))
    #+nio-debug (format-log t "yarpc-client-state-machine:process-timeout called, searching for timeouts in ~A ~%" requests)
    ;; Only collect ids during the traversal: FINISH-JOB runs user supplied
    ;; callbacks, which must not execute while MAPHASH is iterating the table.
    (maphash #'(lambda (id job)
                 (when (check-timeouts id job)
                   (push id expired)))
             requests)
    (dolist (id (nreverse expired))
      (finish-job id sm nil))))

(defmethod process-outgoing-packet ((sm yarpc-client-state-machine))
  "Poll the job queue without blocking.  When a (job call-string) entry is
available, register the job under a new request id and return a
CALL-METHOD-PACKET for it; otherwise return NIL."
  #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
  (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
    (when ttd
      #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
      (destructuring-bind (job call-string) ttd
        (let ((request-id (incf +request-id+)))
          (setf (gethash request-id (request-map sm)) job)
          (make-instance 'call-method-packet
                         :call-string call-string
                         :request-id request-id))))))

(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
  "Read the result carried by RESPONSE and finish the job registered under its request id."
  #+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
  (let* ((*package* (find-package :nio-yarpc))
         ;; The response text comes from the remote peer: disable #. so that
         ;; reading it cannot evaluate arbitrary forms in this image.
         (*read-eval* nil)
         (result (read-from-string (response response)))
         (request-id (request-id response)))
    #+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet :result ~A :request-id ~A~%" result request-id)
    ;; (maphash #'(lambda (k v) (format t "~a -> ~a~%" k v)) (request-map sm))
    (finish-job request-id sm result)))

;; When true, REMOTE-EXECUTE runs calls locally instead of queueing them.
(defparameter *simulate-calls* nil)

(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
  "Execute the call-string on the remote node and call callback with the result.
When *SIMULATE-CALLS* is true the call is run locally through EXECUTE-CALL and
CALLBACK is invoked immediately; otherwise a REMOTE-JOB is added to the job queue."
  #+nio-debug (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback)
  (if *simulate-calls*
      (funcall callback (execute-call call-string))
      (nio-utils:add (job-queue sm) (list (remote-job callback) call-string))))

(defun simulate-connection ()
  "Turn on *SIMULATE-CALLS* and push a node for 127.0.0.1:9999, whose active
connection is a client state machine, onto NIO::*NODES-LIST*."
  (setf *simulate-calls* t)
  (let* ((node (nio:node "127.0.0.1" 9999)))
    (setf (nio:active-conn node)
          (nio::create-state-machine 'yarpc-client-state-machine 1 1 6))
    (push node nio::*nodes-list*)))

(defun test-timeout ()
  "Manual test: create a job with a 30 second timeout and poll it once a second
until it times out."
  (let* ((done nil)
         (job (remote-job #'(lambda (x)
                              (format-log t "~A finished~%" x)
                              (setf done t))
                          :timeout 30)))
    (format-log t "Job: ~A~%" job)
    (loop while (not done)
          do (when (check-timeouts 99 job)
               ;; Nothing else invokes the callback for this standalone job,
               ;; so report the timeout here; otherwise the loop never ends.
               (funcall (callback job) nil))
             (format-log t ".~%")
             (sleep 1))
    (format-log t "done test~%")))