Compare commits

2 Commits

Author SHA1 Message Date
Matias Fernandez
6207aee345 unit tests 2024-02-03 21:42:45 -03:00
Matias Fernandez
50df8095fd minor fixes 2024-01-05 14:33:03 -03:00
5 changed files with 116 additions and 38 deletions

View File

@@ -21,4 +21,5 @@ from . import raw
from . import zlib
#from . import zrle
from . import tight
#from . import hextile
from . import cursor

View File

@@ -21,12 +21,14 @@ class ENCODINGS:
raw = 0
zlib = 6
tight = 7
hextile = 5
#zrle = 16
# supported pseudo-encodings
cursor = -239
encodings_priority = [
#ENCODINGS.zrle,
ENCODINGS.hextile,
ENCODINGS.tight,
ENCODINGS.zlib,
ENCODINGS.raw

View File

@@ -10,6 +10,8 @@ class Encoding:
id = 7
description = 'Tight VNC encoding'
enabled = True
use_jpeg_threshold = 0 # color threshold to use jpeg compression, 0 = always use jpeg
jpeg_compression_quality = 75
def __init__(self):
self.compress_obj = zlib.compressobj(
@@ -32,9 +34,9 @@ class Encoding:
sendbuff.extend(pack("!HHHH", x, y, w, h))
sendbuff.extend(pack(">i", self.id))
if self._should_use_jpeg(image, 64):
if self._should_use_jpeg(image, self.use_jpeg_threshold):
self._last_compression_was_jpeg = True
compressed_data = self._compress_image_jpeg(image)
compressed_data = self._compress_image_jpeg(image, self.jpeg_compression_quality)
sendbuff.append(0x90) # 0x90 = 10010000 = JPEG subencoding
else:
compressed_data = self._compress_image_zlib(image)
@@ -51,10 +53,10 @@ class Encoding:
def _send_compact_length(self, length):
sendbuff = bytearray()
while True:
# Toma los 7 bits más bajos del tamaño
# Lowest 7 bits of length
byte = length & 0x7F
length >>= 7
# Si aún hay más datos, establece el bit alto y continúa
# if more length bytes are required, set the next highest bit
if length > 0:
byte |= 0x80
sendbuff.append(byte)
@@ -65,6 +67,9 @@ class Encoding:
def _should_use_jpeg(self, image, threshold=256):
if image.mode == 'P':
return False
if self.use_jpeg_threshold == 0:
return True
if image.mode == 'RGB':
width, height = image.size
@@ -87,7 +92,7 @@ class Encoding:
self.compress_obj = zlib.compressobj(
zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED,
-zlib.MAX_WBITS, # El negativo omite la inclusión de un encabezado zlib.
-zlib.MAX_WBITS, # suppress zlib header
zlib.DEF_MEM_LEVEL,
zlib.Z_DEFAULT_STRATEGY
)

View File

@@ -50,7 +50,6 @@ class VNCServer():
unix = 129 # Unix Login Authentication
encoding_object = None
last_cursor = None
def __init__(self, socket, password=None, auth_type=None, pem_file='', vnc_config = None):
@@ -64,6 +63,7 @@ class VNCServer():
self.pem_file = pem_file
self.vnc_config = vnc_config
self.cursor_encoding = CursorEncoding()
self.fbupdate_rate_limit = 0.05
log.debug("Configured auth type:", self.auth_type)
@@ -102,8 +102,8 @@ class VNCServer():
server_version = float(self.RFB_VERSION)
try:
client_version = float(data[4:11])
except:
log.debug("Error parsing client version")
except Exception as e:
log.debug(f"Error parsing client version: {str(e)}")
return False
log.debug("client, server:", client_version, server_version)
@@ -253,7 +253,7 @@ class VNCServer():
def handle_client(self):
self.socket.settimeout(None) # set nonblocking socket
last_rfbu = time.time()
last_fbur = time.time()
mousecontroller = mousectrl.MouseController()
kbdcontroller = kbdctrl.KeyboardController()
@@ -287,12 +287,12 @@ class VNCServer():
break
if data[0] == 0: # client SetPixelFormat
data2 = sock.recv(19, socket.MSG_WAITALL)
fbur_data = sock.recv(19, socket.MSG_WAITALL)
log.debug("Client Message Type: Set Pixel Format (0)")
(self.bpp, self.depth, self.bigendian, self.truecolor, self.red_maximum,
self.green_maximum, self.blue_maximum,
self.red_shift, self.green_shift, self.blue_shift
) = unpack("!xxxBBBBHHHBBBxxx", data2)
) = unpack("!xxxBBBBHHHBBBxxx", fbur_data)
log.debug("IMG bpp, depth, endian, truecolor", self.bpp, self.depth, self.bigendian, self.truecolor)
log.debug("SHIFTS", self.red_shift, self.green_shift, self.blue_shift)
log.debug("MAXS", self.red_maximum, self.green_maximum, self.blue_maximum)
@@ -323,13 +323,13 @@ class VNCServer():
continue
if data[0] == 2: # SetEncoding
data2 = sock.recv(3)
fbur_data = sock.recv(3)
log.debug("Client Message Type: SetEncoding (2)")
(nencodings,) = unpack("!xH", data2)
(nencodings,) = unpack("!xH", fbur_data)
log.debug("SetEncoding: total encodings", repr(nencodings))
data2 = sock.recv(4 * nencodings, socket.MSG_WAITALL)
fbur_data = sock.recv(4 * nencodings, socket.MSG_WAITALL)
#log.debug("len", len(data2))
self.client_encodings = unpack("!%si" % nencodings, data2)
self.client_encodings = unpack("!%si" % nencodings, fbur_data)
log.debug("client_encodings", repr(self.client_encodings), len(self.client_encodings))
# cursor support?
@@ -342,11 +342,14 @@ class VNCServer():
# which pixel encoding to use?
log.debug("encs.common.encodings_priority", encs.common.encodings_priority)
for e in encs.common.encodings_priority:
#log.debug("E", e)
if e in self.client_encodings:
if self.encoding == e:
# don't initialize same encoding again
break
# check if encoding is disabled
if not encs.common.encodings[e].enabled:
log.debug("Encoding disabled:", e)
continue
self.encoding = e
#log.debug("Using %s encoding" % self.encoding)
log.debug("Using %s encoding" % encs.common.encodings[self.encoding].name)
@@ -358,19 +361,21 @@ class VNCServer():
if data[0] == 3: # FBUpdateRequest
# rate limit
data2 = sock.recv(9, socket.MSG_WAITALL)
if not data2:
fbur_data = sock.recv(9, socket.MSG_WAITALL)
if not fbur_data:
log.debug("connection closed?")
break
if time.time() - last_rfbu < 0.1:
if time.time() - last_fbur < self.fbupdate_rate_limit:
# rate limited
try:
sock.sendall(pack("!BxH", 0, 0))
except:
log.debug("connection closed?")
except Exception as e:
log.debug(f"Error sending rate limited FBUpdateRequest: {str(e)}")
break
continue
last_rfbu = time.time()
(incremental, x, y, w, h) = unpack("!BHHHH", data2)
last_fbur = time.time()
(incremental, x, y, w, h) = unpack("!BHHHH", fbur_data)
#log.debug("RFBU:", incremental, x, y, w, h)
self.send_rectangles(sock, x, y, w, h, incremental)
if self.cursor_support:
@@ -391,13 +396,14 @@ class VNCServer():
log.debug("ClientCutText:", text)
else:
data2 = sock.recv(4096)
log.debug("RAW Server received data:", repr(data[0]) , data+data2)
fbur_data = sock.recv(4096)
log.debug("RAW Server received data:", repr(data[0]) , data+fbur_data)
def get_rectangle(self, x, y, w, h):
try:
scr = ImageGrab.grab()
except:
except Exception as ex:
log.debug("Error grabbing screen: %s" % ex)
return False
(scr_width, scr_height) = scr.size
@@ -427,18 +433,17 @@ class VNCServer():
raw_pixels = bitmap.get_bitmap(cursor_img)
raw_pixels = raw_pixels.tobytes("raw", raw_pixels.mode)
# Crear la máscara de forma (bitmask)
# Create bitmask
bitmask = bytearray()
for j in range(h):
row = 0
for i in range(w):
if cursor_img.getpixel((i, j))[3]: # Verificar alfa del pixel
if cursor_img.getpixel((i, j))[3]: # Verify alpha pixel
row |= (128 >> (i % 8))
if (i % 8 == 7) or i == w - 1:
bitmask.append(row)
row = 0
# Empaquetar y enviar la información del cursor
sendbuff = bytearray()
sendbuff.extend(pack("!BxH", 0, 1)) # FramebufferUpdate, 1 rectangle
sendbuff.extend(pack("!HHHH", x, y, w, h)) # geometry
@@ -449,7 +454,7 @@ class VNCServer():
try:
self.socket.sendall(sendbuff)
except Exception as e:
print(f"Error al enviar el cursor: {e}")
print(f"Error sending cursor info: {e}")
return False
return True
@@ -471,14 +476,15 @@ class VNCServer():
diff = ImageChops.difference(rectangle, self.framebuffer)
if diff.getbbox() is None:
# no changes...
#log.debug("no changes")
rectangles = 0
sendbuff.extend(pack("!BxH", 0, rectangles))
# clear the incoming socket buffer
sleep(0.05)
try:
sock.sendall(sendbuff)
except:
log.debug("connection closed?")
except Exception as e:
log.debug(f"Error sending no changes: {str(e)}")
return False
return
@@ -491,9 +497,6 @@ class VNCServer():
h = rectangle.height
#log.debug(f"RFB_RES:", incremental, x, y, w, h)
#stimeout = sock.gettimeout()
#sock.settimeout(None)
if self.bpp == 32 or self.bpp == 16 or self.bpp == 8:
bitmap = self.rfb_bitmap
bitmap.bpp = self.bpp
@@ -508,6 +511,7 @@ class VNCServer():
image = bitmap.get_bitmap(rectangle)
# send image with client defined encoding
self.encoding_object.framebuffer = self.framebuffer
sendbuff.extend(self.encoding_object.send_image(x, y, w, h, image))
else:
log.debug("[!] Unsupported BPP: %s" % self.bpp)
@@ -515,8 +519,7 @@ class VNCServer():
self.framebuffer = lastshot
try:
sock.sendall(sendbuff)
except:
except Exception as e:
# connection closed?
log.debug(f"Error sending changes: {str(e)}")
return False
#sock.settimeout(stimeout)
#log.debug("end SendRectangles")

67
test/test_vncserver.py Normal file
View File

@@ -0,0 +1,67 @@
import unittest
from unittest.mock import Mock, patch
from pyvncs.server import VNCServer
class TestVNCServer(unittest.TestCase):
def setUp(self):
self.socket_mock = Mock()
self.vnc_config_mock = Mock()
self.vnc_config_mock.win_title = "Test Window Title"
self.vnc_server = VNCServer(socket=self.socket_mock, password="test_password", auth_type=2, vnc_config=self.vnc_config_mock)
def test_constructor(self):
self.assertEqual(self.vnc_server.password, "test_password")
self.assertIsNotNone(self.vnc_server.socket)
def test_send_message(self):
message = "Test message"
encoded_message = bytes(message, 'iso8859-1')
message_length = len(encoded_message)
with patch('struct.pack') as mock_pack:
mock_pack.return_value = b'' # Simulate the packed message
self.vnc_server.send_message(message)
mock_pack.assert_called() # Verificar que se llamó, pero no con argumentos específicos
self.socket_mock.send.assert_called() # Verificar que se envió el mensaje
def test_get_buffer_timeout(self):
self.socket_mock.recv.side_effect = TimeoutError
result = self.vnc_server.get_buffer(30)
self.assertIsNone(result)
def test_init(self):
# Mock the responses for version handshake and security type handshake
self.socket_mock.recv.side_effect = [
b"RFB 003.008\n", # Client version response
b"\x02" # Security type response (VNCAuth)
]
# Mock ImageGrab.grab() called in server_init
with patch('pyvncs.server.ImageGrab.grab') as mock_grab:
mock_grab.return_value.size = (1024, 768)
# Mock any other dependencies in server_init here if necessary
result = self.vnc_server.init()
self.assertTrue(result)
def test_server_init(self):
with patch('pyvncs.server.ImageGrab.grab') as mock_grab:
mock_grab.return_value.size = (1024, 768)
self.vnc_server.server_init()
self.assertEqual(self.vnc_server.width, 1024)
self.assertEqual(self.vnc_server.height, 768)
def test_handle_client_keyboard_event(self):
# Simulate a keyboard event
self.socket_mock.recv.side_effect = [
b'\x04', # Indicating a keyboard event
b'additional data for the keyboard event'
]
with patch('pyvncs.server.kbdctrl.KeyboardController.process_event') as mock_kbd_event:
self.vnc_server.handle_client()
mock_kbd_event.assert_called_with(b'additional data for the keyboard event')
# TODO: Agregar más pruebas según sea necesario
if __name__ == '__main__':
unittest.main()