(import networking) (import buffers) (import binary-io) (import serial-io) (import file-manipulation) (import threading) (import s2j) (define *count* 0) (define *compressed?* #t) #| (define-java-class |java.util.zip.GZIPOutputStream|) (define-java-class |java.util.zip.GZIPInputStream|) (define-java-class |java.io.FileInputStream|) (define-java-class |java.io.FileOutputStream|) (define-java-class |java.io.BufferedInputStream|) (define-java-class |java.io.File|) (define-generic-java-method java-read |read|) (define-generic-java-method java-write |write|) (define-generic-java-method java-close |close|) (define (compress-file fn) (let* ((fos (java-new (->jstring (string-append fn ".gz")))) (gzos (java-new fos)) (fin (java-new (->jstring fn))) (in (java-new fin)) (data (java-array-new 1024))) (let loop ((len (->number (java-read in data)))) (when (>= len 0) (java-write gzos data (->jint 0) (->jint len)) (loop (->number (java-read in data))))) (java-close in) (java-close gzos))) (define (uncompress-file fn) (let* ((fos (java-new (->jstring fn))) (fin (java-new (->jstring (string-append fn ".gz")))) (gzis (java-new fin)) (in (java-new gzis)) (data (java-array-new 1024))) (let loop ((len (->number (java-read in data)))) (when (>= len 0) (java-write fos data (->jint 0) (->jint len)) (loop (->number (java-read in data))))) (java-close in) (java-close fos))) |# (define-java-class |java.util.zip.GZIPOutputStream|) (define-java-class |java.util.zip.GZIPInputStream|) (define-generic-java-field-accessor :out) (define-generic-java-field-modifier :out!) (define-generic-java-field-accessor :in) (define-generic-java-field-modifier :in!) (define (serialize-compressed obj port) (:out! (java-wrap port) (java-new (:out (java-wrap port)))) (serialize obj port)) (define (deserialize-compressed port) (:in! (java-wrap port) (java-new (:in (java-wrap port)))) (deserialize port)) ;; Simple function to return a temporary file name (define (temporary-file-name) (set! *count* (+ 1 *count*)) (string-append "ktest.scm.example." (number->string *count*))) ;; Serialize the given object then read into memory. (define (serialize-to-memory obj) (let ((fn (temporary-file-name))) (call-with-serial-output-file fn (lambda (port) ((if *compressed?* serialize-compressed serialize) obj port))) (let* ((size (file-length fn)) (buffer (make-buffer size))) (call-with-binary-input-file fn (lambda (port) (block-read buffer port size))) (file-delete! fn) buffer))) ;; Deserialize an object held in a buffer (define (deserialize-from-memory buffer) (let ((fn (temporary-file-name))) (call-with-binary-output-file fn (lambda (port) (block-write buffer port (buffer-length buffer)))) (dynamic-wind (lambda () #f) (lambda () (call-with-serial-input-file fn (if *compressed?* deserialize-compressed deserialize))) (lambda () (file-delete! fn))))) (define (ch->digit ch) (case ch ((#\0) 1) ((#\1) 2) ((#\2) 3) ((#\3) 4) ((#\4) 5) ((#\5) 6) ((#\6) 7) ((#\7) 8) ((#\8) 9) ((#\9) 10))) (define (digit->ch digit) (case digit ((1) #\0) ((2) #\1) ((3) #\2) ((4) #\3) ((5) #\4) ((6) #\5) ((7) #\6) ((8) #\7) ((9) #\8) ((10) #\9))) ;; Write a number to a binary representation (define (write-number n port) (let ((s (number->string n)) (buffer (make-buffer 10))) (do ((i 0 (+ 1 i))) ((= i (buffer-length buffer)) #f) (if (< i (string-length s)) (buffer-set! buffer i (ch->digit (string-ref s i))) (buffer-set! buffer i 0))) (block-write buffer port (buffer-length buffer)))) ;; Read a number from the binary representation (define (read-number port) (let ((s (make-string 10)) (buffer (make-buffer 10))) (block-read buffer port (buffer-length buffer)) (do ((i 0 (+ 1 i))) ((= (buffer-ref buffer i) 0) (string->number (substring s 0 i))) (string-set! s i (digit->ch (buffer-ref buffer i)))))) ;; Listens on a socket for remote requests to execute a function. (define exit-function (make-parameter #f)) (define (message-listener port) (let ((listener (open-tcp-listener port))) (display "Created listener\n") (let loop ((remote (accept-tcp-socket listener))) (display "remote accept\n") (let ((port (open-binary-socket-input-port remote))) (display "opened binary port\n") (let ((len (read-number port))) (display (format "Read length: ~a~%" len)) (let ((buffer (make-buffer len))) (block-read buffer port len) (let ((deserialized (deserialize-from-memory buffer))) (display (format "Got: ~a~%" deserialized)) (thread/spawn (lambda () (call/cc (lambda (exit) (exit-function exit) (apply (car deserialized) (cdr deserialized)))))))))) (close-socket remote) (loop (accept-tcp-socket listener))))) (thread/spawn (lambda () (with/fc (lambda (m e) (print-exception (make-exception m e))) (lambda () (message-listener 9002))))) ;; Send a function to a remote message listener. It will execute in that process. (define (send host port func . args) (let ((buffer (serialize-to-memory (cons func args)))) (let ((socket (open-tcp-socket host port))) (let ((port (open-binary-socket-output-port socket #t))) (write-number (buffer-length buffer) port) (block-write buffer port (buffer-length buffer)) (flush-output-port port)) (close-socket socket)))) (define (spawn-thread-on host port func) (send host port (lambda () (call/cc (lambda (exit) (exit-function exit) (func)))))) (define (migrate-thread host port) (call/cc (lambda (k) (send host port k) ((exit-function))))) (define (ping-pong) (let loop () (display "ping\n") (sleep 1000) (migrate-thread "127.0.0.1" 9002) (display "pong\n") (sleep 1000) (migrate-thread "127.0.0.1" 9003) (loop))) #| (send "localhost" 9002 display "test") (define a #f) (define (test1) (display "one\n") (call/cc (lambda (k) (set! a k))) (display "two\n")) (define (one) (display "one")) (define (two) (one)) (two) (spawn-thread-on "127.0.0.1" 9003 ping-pong) |#