Compare commits

2 Commits

Author SHA1 Message Date
Matias Fernandez
6207aee345 unit tests 2024-02-03 21:42:45 -03:00
Matias Fernandez
50df8095fd minor fixes 2024-01-05 14:33:03 -03:00
5 changed files with 116 additions and 38 deletions

View File

@@ -21,4 +21,5 @@ from . import raw
from . import zlib from . import zlib
#from . import zrle #from . import zrle
from . import tight from . import tight
#from . import hextile
from . import cursor from . import cursor

View File

@@ -21,12 +21,14 @@ class ENCODINGS:
raw = 0 raw = 0
zlib = 6 zlib = 6
tight = 7 tight = 7
hextile = 5
#zrle = 16 #zrle = 16
# supported pseudo-encodings # supported pseudo-encodings
cursor = -239 cursor = -239
encodings_priority = [ encodings_priority = [
#ENCODINGS.zrle, #ENCODINGS.zrle,
ENCODINGS.hextile,
ENCODINGS.tight, ENCODINGS.tight,
ENCODINGS.zlib, ENCODINGS.zlib,
ENCODINGS.raw ENCODINGS.raw

View File

@@ -10,6 +10,8 @@ class Encoding:
id = 7 id = 7
description = 'Tight VNC encoding' description = 'Tight VNC encoding'
enabled = True enabled = True
use_jpeg_threshold = 0 # color threshold to use jpeg compression, 0 = always use jpeg
jpeg_compression_quality = 75
def __init__(self): def __init__(self):
self.compress_obj = zlib.compressobj( self.compress_obj = zlib.compressobj(
@@ -32,9 +34,9 @@ class Encoding:
sendbuff.extend(pack("!HHHH", x, y, w, h)) sendbuff.extend(pack("!HHHH", x, y, w, h))
sendbuff.extend(pack(">i", self.id)) sendbuff.extend(pack(">i", self.id))
if self._should_use_jpeg(image, 64): if self._should_use_jpeg(image, self.use_jpeg_threshold):
self._last_compression_was_jpeg = True self._last_compression_was_jpeg = True
compressed_data = self._compress_image_jpeg(image) compressed_data = self._compress_image_jpeg(image, self.jpeg_compression_quality)
sendbuff.append(0x90) # 0x90 = 10010000 = JPEG subencoding sendbuff.append(0x90) # 0x90 = 10010000 = JPEG subencoding
else: else:
compressed_data = self._compress_image_zlib(image) compressed_data = self._compress_image_zlib(image)
@@ -51,10 +53,10 @@ class Encoding:
def _send_compact_length(self, length): def _send_compact_length(self, length):
sendbuff = bytearray() sendbuff = bytearray()
while True: while True:
# Toma los 7 bits más bajos del tamaño # Lowest 7 bits of length
byte = length & 0x7F byte = length & 0x7F
length >>= 7 length >>= 7
# Si aún hay más datos, establece el bit alto y continúa # if more length bytes are required, set the next highest bit
if length > 0: if length > 0:
byte |= 0x80 byte |= 0x80
sendbuff.append(byte) sendbuff.append(byte)
@@ -65,6 +67,9 @@ class Encoding:
def _should_use_jpeg(self, image, threshold=256): def _should_use_jpeg(self, image, threshold=256):
if image.mode == 'P': if image.mode == 'P':
return False return False
if self.use_jpeg_threshold == 0:
return True
if image.mode == 'RGB': if image.mode == 'RGB':
width, height = image.size width, height = image.size
@@ -87,7 +92,7 @@ class Encoding:
self.compress_obj = zlib.compressobj( self.compress_obj = zlib.compressobj(
zlib.Z_DEFAULT_COMPRESSION, zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, zlib.DEFLATED,
-zlib.MAX_WBITS, # El negativo omite la inclusión de un encabezado zlib. -zlib.MAX_WBITS, # suppress zlib header
zlib.DEF_MEM_LEVEL, zlib.DEF_MEM_LEVEL,
zlib.Z_DEFAULT_STRATEGY zlib.Z_DEFAULT_STRATEGY
) )

View File

@@ -50,7 +50,6 @@ class VNCServer():
unix = 129 # Unix Login Authentication unix = 129 # Unix Login Authentication
encoding_object = None encoding_object = None
last_cursor = None last_cursor = None
def __init__(self, socket, password=None, auth_type=None, pem_file='', vnc_config = None): def __init__(self, socket, password=None, auth_type=None, pem_file='', vnc_config = None):
@@ -64,6 +63,7 @@ class VNCServer():
self.pem_file = pem_file self.pem_file = pem_file
self.vnc_config = vnc_config self.vnc_config = vnc_config
self.cursor_encoding = CursorEncoding() self.cursor_encoding = CursorEncoding()
self.fbupdate_rate_limit = 0.05
log.debug("Configured auth type:", self.auth_type) log.debug("Configured auth type:", self.auth_type)
@@ -102,8 +102,8 @@ class VNCServer():
server_version = float(self.RFB_VERSION) server_version = float(self.RFB_VERSION)
try: try:
client_version = float(data[4:11]) client_version = float(data[4:11])
except: except Exception as e:
log.debug("Error parsing client version") log.debug(f"Error parsing client version: {str(e)}")
return False return False
log.debug("client, server:", client_version, server_version) log.debug("client, server:", client_version, server_version)
@@ -253,7 +253,7 @@ class VNCServer():
def handle_client(self): def handle_client(self):
self.socket.settimeout(None) # set nonblocking socket self.socket.settimeout(None) # set nonblocking socket
last_rfbu = time.time() last_fbur = time.time()
mousecontroller = mousectrl.MouseController() mousecontroller = mousectrl.MouseController()
kbdcontroller = kbdctrl.KeyboardController() kbdcontroller = kbdctrl.KeyboardController()
@@ -287,12 +287,12 @@ class VNCServer():
break break
if data[0] == 0: # client SetPixelFormat if data[0] == 0: # client SetPixelFormat
data2 = sock.recv(19, socket.MSG_WAITALL) fbur_data = sock.recv(19, socket.MSG_WAITALL)
log.debug("Client Message Type: Set Pixel Format (0)") log.debug("Client Message Type: Set Pixel Format (0)")
(self.bpp, self.depth, self.bigendian, self.truecolor, self.red_maximum, (self.bpp, self.depth, self.bigendian, self.truecolor, self.red_maximum,
self.green_maximum, self.blue_maximum, self.green_maximum, self.blue_maximum,
self.red_shift, self.green_shift, self.blue_shift self.red_shift, self.green_shift, self.blue_shift
) = unpack("!xxxBBBBHHHBBBxxx", data2) ) = unpack("!xxxBBBBHHHBBBxxx", fbur_data)
log.debug("IMG bpp, depth, endian, truecolor", self.bpp, self.depth, self.bigendian, self.truecolor) log.debug("IMG bpp, depth, endian, truecolor", self.bpp, self.depth, self.bigendian, self.truecolor)
log.debug("SHIFTS", self.red_shift, self.green_shift, self.blue_shift) log.debug("SHIFTS", self.red_shift, self.green_shift, self.blue_shift)
log.debug("MAXS", self.red_maximum, self.green_maximum, self.blue_maximum) log.debug("MAXS", self.red_maximum, self.green_maximum, self.blue_maximum)
@@ -323,13 +323,13 @@ class VNCServer():
continue continue
if data[0] == 2: # SetEncoding if data[0] == 2: # SetEncoding
data2 = sock.recv(3) fbur_data = sock.recv(3)
log.debug("Client Message Type: SetEncoding (2)") log.debug("Client Message Type: SetEncoding (2)")
(nencodings,) = unpack("!xH", data2) (nencodings,) = unpack("!xH", fbur_data)
log.debug("SetEncoding: total encodings", repr(nencodings)) log.debug("SetEncoding: total encodings", repr(nencodings))
data2 = sock.recv(4 * nencodings, socket.MSG_WAITALL) fbur_data = sock.recv(4 * nencodings, socket.MSG_WAITALL)
#log.debug("len", len(data2)) #log.debug("len", len(data2))
self.client_encodings = unpack("!%si" % nencodings, data2) self.client_encodings = unpack("!%si" % nencodings, fbur_data)
log.debug("client_encodings", repr(self.client_encodings), len(self.client_encodings)) log.debug("client_encodings", repr(self.client_encodings), len(self.client_encodings))
# cursor support? # cursor support?
@@ -342,11 +342,14 @@ class VNCServer():
# which pixel encoding to use? # which pixel encoding to use?
log.debug("encs.common.encodings_priority", encs.common.encodings_priority) log.debug("encs.common.encodings_priority", encs.common.encodings_priority)
for e in encs.common.encodings_priority: for e in encs.common.encodings_priority:
#log.debug("E", e)
if e in self.client_encodings: if e in self.client_encodings:
if self.encoding == e: if self.encoding == e:
# don't initialize same encoding again # don't initialize same encoding again
break break
# check if encoding is disabled
if not encs.common.encodings[e].enabled:
log.debug("Encoding disabled:", e)
continue
self.encoding = e self.encoding = e
#log.debug("Using %s encoding" % self.encoding) #log.debug("Using %s encoding" % self.encoding)
log.debug("Using %s encoding" % encs.common.encodings[self.encoding].name) log.debug("Using %s encoding" % encs.common.encodings[self.encoding].name)
@@ -358,19 +361,21 @@ class VNCServer():
if data[0] == 3: # FBUpdateRequest if data[0] == 3: # FBUpdateRequest
# rate limit # rate limit
data2 = sock.recv(9, socket.MSG_WAITALL) fbur_data = sock.recv(9, socket.MSG_WAITALL)
if not data2: if not fbur_data:
log.debug("connection closed?") log.debug("connection closed?")
break break
if time.time() - last_rfbu < 0.1: if time.time() - last_fbur < self.fbupdate_rate_limit:
# rate limited
try: try:
sock.sendall(pack("!BxH", 0, 0)) sock.sendall(pack("!BxH", 0, 0))
except: except Exception as e:
log.debug("connection closed?") log.debug(f"Error sending rate limited FBUpdateRequest: {str(e)}")
break break
continue continue
last_rfbu = time.time()
(incremental, x, y, w, h) = unpack("!BHHHH", data2) last_fbur = time.time()
(incremental, x, y, w, h) = unpack("!BHHHH", fbur_data)
#log.debug("RFBU:", incremental, x, y, w, h) #log.debug("RFBU:", incremental, x, y, w, h)
self.send_rectangles(sock, x, y, w, h, incremental) self.send_rectangles(sock, x, y, w, h, incremental)
if self.cursor_support: if self.cursor_support:
@@ -391,13 +396,14 @@ class VNCServer():
log.debug("ClientCutText:", text) log.debug("ClientCutText:", text)
else: else:
data2 = sock.recv(4096) fbur_data = sock.recv(4096)
log.debug("RAW Server received data:", repr(data[0]) , data+data2) log.debug("RAW Server received data:", repr(data[0]) , data+fbur_data)
def get_rectangle(self, x, y, w, h): def get_rectangle(self, x, y, w, h):
try: try:
scr = ImageGrab.grab() scr = ImageGrab.grab()
except: except Exception as ex:
log.debug("Error grabbing screen: %s" % ex)
return False return False
(scr_width, scr_height) = scr.size (scr_width, scr_height) = scr.size
@@ -427,18 +433,17 @@ class VNCServer():
raw_pixels = bitmap.get_bitmap(cursor_img) raw_pixels = bitmap.get_bitmap(cursor_img)
raw_pixels = raw_pixels.tobytes("raw", raw_pixels.mode) raw_pixels = raw_pixels.tobytes("raw", raw_pixels.mode)
# Crear la máscara de forma (bitmask) # Create bitmask
bitmask = bytearray() bitmask = bytearray()
for j in range(h): for j in range(h):
row = 0 row = 0
for i in range(w): for i in range(w):
if cursor_img.getpixel((i, j))[3]: # Verificar alfa del pixel if cursor_img.getpixel((i, j))[3]: # Verify alpha pixel
row |= (128 >> (i % 8)) row |= (128 >> (i % 8))
if (i % 8 == 7) or i == w - 1: if (i % 8 == 7) or i == w - 1:
bitmask.append(row) bitmask.append(row)
row = 0 row = 0
# Empaquetar y enviar la información del cursor
sendbuff = bytearray() sendbuff = bytearray()
sendbuff.extend(pack("!BxH", 0, 1)) # FramebufferUpdate, 1 rectangle sendbuff.extend(pack("!BxH", 0, 1)) # FramebufferUpdate, 1 rectangle
sendbuff.extend(pack("!HHHH", x, y, w, h)) # geometry sendbuff.extend(pack("!HHHH", x, y, w, h)) # geometry
@@ -449,7 +454,7 @@ class VNCServer():
try: try:
self.socket.sendall(sendbuff) self.socket.sendall(sendbuff)
except Exception as e: except Exception as e:
print(f"Error al enviar el cursor: {e}") print(f"Error sending cursor info: {e}")
return False return False
return True return True
@@ -471,14 +476,15 @@ class VNCServer():
diff = ImageChops.difference(rectangle, self.framebuffer) diff = ImageChops.difference(rectangle, self.framebuffer)
if diff.getbbox() is None: if diff.getbbox() is None:
# no changes... # no changes...
#log.debug("no changes")
rectangles = 0 rectangles = 0
sendbuff.extend(pack("!BxH", 0, rectangles)) sendbuff.extend(pack("!BxH", 0, rectangles))
# clear the incoming socket buffer # clear the incoming socket buffer
sleep(0.05) sleep(0.05)
try: try:
sock.sendall(sendbuff) sock.sendall(sendbuff)
except: except Exception as e:
log.debug("connection closed?") log.debug(f"Error sending no changes: {str(e)}")
return False return False
return return
@@ -491,9 +497,6 @@ class VNCServer():
h = rectangle.height h = rectangle.height
#log.debug(f"RFB_RES:", incremental, x, y, w, h) #log.debug(f"RFB_RES:", incremental, x, y, w, h)
#stimeout = sock.gettimeout()
#sock.settimeout(None)
if self.bpp == 32 or self.bpp == 16 or self.bpp == 8: if self.bpp == 32 or self.bpp == 16 or self.bpp == 8:
bitmap = self.rfb_bitmap bitmap = self.rfb_bitmap
bitmap.bpp = self.bpp bitmap.bpp = self.bpp
@@ -508,6 +511,7 @@ class VNCServer():
image = bitmap.get_bitmap(rectangle) image = bitmap.get_bitmap(rectangle)
# send image with client defined encoding # send image with client defined encoding
self.encoding_object.framebuffer = self.framebuffer
sendbuff.extend(self.encoding_object.send_image(x, y, w, h, image)) sendbuff.extend(self.encoding_object.send_image(x, y, w, h, image))
else: else:
log.debug("[!] Unsupported BPP: %s" % self.bpp) log.debug("[!] Unsupported BPP: %s" % self.bpp)
@@ -515,8 +519,7 @@ class VNCServer():
self.framebuffer = lastshot self.framebuffer = lastshot
try: try:
sock.sendall(sendbuff) sock.sendall(sendbuff)
except: except Exception as e:
# connection closed? # connection closed?
log.debug(f"Error sending changes: {str(e)}")
return False return False
#sock.settimeout(stimeout)
#log.debug("end SendRectangles")

67
test/test_vncserver.py Normal file
View File

@@ -0,0 +1,67 @@
import unittest
from unittest.mock import Mock, patch
from pyvncs.server import VNCServer
class TestVNCServer(unittest.TestCase):
def setUp(self):
self.socket_mock = Mock()
self.vnc_config_mock = Mock()
self.vnc_config_mock.win_title = "Test Window Title"
self.vnc_server = VNCServer(socket=self.socket_mock, password="test_password", auth_type=2, vnc_config=self.vnc_config_mock)
def test_constructor(self):
self.assertEqual(self.vnc_server.password, "test_password")
self.assertIsNotNone(self.vnc_server.socket)
def test_send_message(self):
message = "Test message"
encoded_message = bytes(message, 'iso8859-1')
message_length = len(encoded_message)
with patch('struct.pack') as mock_pack:
mock_pack.return_value = b'' # Simulate the packed message
self.vnc_server.send_message(message)
mock_pack.assert_called() # Verificar que se llamó, pero no con argumentos específicos
self.socket_mock.send.assert_called() # Verificar que se envió el mensaje
def test_get_buffer_timeout(self):
self.socket_mock.recv.side_effect = TimeoutError
result = self.vnc_server.get_buffer(30)
self.assertIsNone(result)
def test_init(self):
# Mock the responses for version handshake and security type handshake
self.socket_mock.recv.side_effect = [
b"RFB 003.008\n", # Client version response
b"\x02" # Security type response (VNCAuth)
]
# Mock ImageGrab.grab() called in server_init
with patch('pyvncs.server.ImageGrab.grab') as mock_grab:
mock_grab.return_value.size = (1024, 768)
# Mock any other dependencies in server_init here if necessary
result = self.vnc_server.init()
self.assertTrue(result)
def test_server_init(self):
with patch('pyvncs.server.ImageGrab.grab') as mock_grab:
mock_grab.return_value.size = (1024, 768)
self.vnc_server.server_init()
self.assertEqual(self.vnc_server.width, 1024)
self.assertEqual(self.vnc_server.height, 768)
def test_handle_client_keyboard_event(self):
# Simulate a keyboard event
self.socket_mock.recv.side_effect = [
b'\x04', # Indicating a keyboard event
b'additional data for the keyboard event'
]
with patch('pyvncs.server.kbdctrl.KeyboardController.process_event') as mock_kbd_event:
self.vnc_server.handle_client()
mock_kbd_event.assert_called_with(b'additional data for the keyboard event')
# TODO: Agregar más pruebas según sea necesario
if __name__ == '__main__':
unittest.main()