- Improved efficiency and readability of the RfbBitmap configuration logic. Refactored redundant code blocks for different pixel format (bpp) configurations into a single, streamlined method. This change enhances maintainability and clarity of the bitmap configuration process.

- Added cursor pseudo encoding support.

- Added Windows support for cursor image capturing in `get_cursor_image` method. Implemented Windows-specific logic using the `win32gui`, `win32ui`, and related libraries to capture the current cursor image, enhancing the cross-platform capability of the application.

- Fixed issues in `get_bitmap` method for handling different bpp formats. Specifically addressed the processing logic for 16 bpp (BGR565) format, ensuring that the image conversion and rendering are handled correctly for VNC clients expecting this format.

- Added initial Tight encoding support.

- Updated the `send_image` method in the Tight encoding class to correctly handle JPEG and ZLIB compression. This includes proper signaling to the client about the type of compression used (JPEG or ZLIB) and ensuring that the data is formatted and sent according to the Tight encoding specifications.

- Added checks and conversions in `send_image` to handle different image modes (like RGBX and RGBA) and convert them to the appropriate format (RGB) before compression and transmission.

- Implemented a more robust and accurate method for determining when to use JPEG compression in Tight encoding based on the unique color count and image characteristics.

These updates significantly improve the functionality, stability, and compatibility of the VNC server, particularly in handling different pixel formats and encoding methods.
This commit is contained in:
Matias Fernandez
2023-12-06 09:21:26 -03:00
parent 76e309ef08
commit 2bc431f0a8
18 changed files with 650 additions and 433 deletions

View File

@@ -19,4 +19,6 @@
from . import common
from . import raw
from . import zlib
#from . import zrle
from . import tight
from . import cursor

View File

@@ -20,10 +20,14 @@ encodings = {}
class ENCODINGS:
raw = 0
zlib = 6
tight = 7
#zrle = 16
# supported pseudo-encodings
cursor = -239
encodings_priority = [
#ENCODINGS.zrle,
ENCODINGS.tight,
ENCODINGS.zlib,
ENCODINGS.raw
]

View File

@@ -15,10 +15,28 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import struct
import ctypes
import ctypes.util
import platform
from PIL import Image, ImageChops, ImageDraw, ImagePalette
import base64
from io import BytesIO
from . import common
from struct import *
from lib import log
import zlib
OS_NAME = platform.system()
if OS_NAME == 'Linux':
import lib.oshelpers.x11 as x11
Xcursor = x11.XCursor
if OS_NAME == 'Windows':
import lib.oshelpers.windows_cursor as windows_cursor
class Encoding:
name = 'Cursor'
@@ -31,6 +49,54 @@ class Encoding:
def __init__(self):
log.debug("Initialized", __name__)
self.default_cursor_data = 'iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAACc0lEQVR4nK2Wv0sbYRjHP/fDyNUGalKitBDI4FAojg5NF8M7BFIPUkvSopuQoYMgFrl/wbWbndrFuBocAroVHCIdJAaxzdJuNYXGBpIgsU8H7zSpGo3mhS/3Hvfe9/N837t7OI3zobXNhT4Nre2otZ3/7RdEbwOYSqk4MACYbdfuPDTXcKherzeUUklgyAX1BaK5ZkERkUql0lBKpYD7/YJogA94LCJi27YcHh42lVLpfkE0YBAIi4iEw2FJJpNSqVSaSqnX/YB0ACKRiEQiEZmenu4bpAMwNjZ2pnQ6fWfIhcWGYZxpd3eX+fn5wWw2+1Ep9cItxOgFcmGhaZodKhaLLCws3BrSNYGnYrHI4uKiB0n0Ark2gadSqcTS0tLg2traJxficyHaBddeE3gqlUo4juNB4m0proR4Dc4HjIjI92g0ekrWdQzDoNVqsbq6Sjgc7rix0Wg0bdt+tbW1ladLczQvS6DrOo7jsLe3Ry6XY319nUAg8GV2dvY90ASqwFfg51XG/6c4+w5isZjk83nZ39//XS6XZXJyUqampqRarR6HQqEo8Ah4CFj08AzEq8RxHCzL+jExMfGu1Wr9Gh8fp9FosL29PTA3N/cSqLkJmsDJdQnaAScAmqZ9i8fjb2u12ueVlZWcbduYpsnGxgaZTOYNp9vaterLtsgAApubmwXLsp4BIcA/PDz8vFqtHs/MzEgikZCjoyMJBoNPOG0ZPUN8lmWNct5zBoDR5eXl3MHBgezs7EihUCgDI7cBtCfxXl0duKfr+tNUKvUhk8lk/X5/DPDTQy/qVoUH8QEP3PkfoE4PPwU3iemBcI25qTnAP3ZG9LuVtmFhAAAAAElFTkSuQmCC'
self.default_cursor_img = Image.open(BytesIO(base64.b64decode(self.default_cursor_data)))
def get_cursor_image(self):
if OS_NAME == 'Windows':
return windows_cursor.get_cursor_image()
elif OS_NAME == 'Linux':
cursor = Xcursor()
return cursor.get_cursor_image()
elif OS_NAME == 'Darwin':
if self.default_cursor_img.mode != 'RGBA':
self.default_cursor_img = self.default_cursor_img.convert('RGBA')
return self.default_cursor_img
else:
return None
def send_cursor(self, x, y, cursor):
sendbuff = bytearray()
sendbuff.extend(pack("!B", 0)) # message type 0 == SetCursorPosition
sendbuff.extend(pack("!H", x))
sendbuff.extend(pack("!H", y))
self.cursor_sent = True
if cursor is not None:
w, h = cursor.size
cursor_bytes = cursor.convert("RGBA").tobytes("raw", "BGRA")
# Invert alpha channel if needed
pixels = bytearray(cursor_bytes)
for i in range(0, len(pixels), 4):
pixels[i + 3] = 255 - pixels[i + 3]
sendbuff.extend(pack("!B", 1)) # message type 1 == SetCursorShape
sendbuff.extend(pack("!H", w)) # width
sendbuff.extend(pack("!H", h)) # height
sendbuff.extend(pixels)
else:
sendbuff.extend(pack("!B", 0)) # message type 0 == SetCursorPosition
sendbuff.extend(pack("!H", x))
sendbuff.extend(pack("!H", y))
return sendbuff
common.encodings[common.ENCODINGS.cursor] = Encoding
log.debug("Loaded encoding: %s (%s)" % (__name__, Encoding.id))

101
lib/encodings/tight.py Normal file
View File

@@ -0,0 +1,101 @@
from . import common
from lib import log
from struct import *
import zlib
from io import BytesIO
import numpy as np
class Encoding:
name = 'tight'
id = 7
description = 'Tight VNC encoding'
enabled = True
def __init__(self):
self.compress_obj = zlib.compressobj(
zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED,
zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
zlib.Z_DEFAULT_STRATEGY
)
self._last_compression_was_jpeg = False
def send_image(self, x, y, w, h, image):
sendbuff = bytearray()
if image.mode == 'RGBX' or image.mode == 'RGBA':
image = image.convert('RGB')
rectangles = 1
sendbuff.extend(pack("!BxH", 0, rectangles)) # FramebufferUpdate
sendbuff.extend(pack("!HHHH", x, y, w, h))
sendbuff.extend(pack(">i", self.id))
if self._should_use_jpeg(image, 64):
self._last_compression_was_jpeg = True
compressed_data = self._compress_image_jpeg(image)
sendbuff.append(0x90) # 0x90 = 10010000 = JPEG subencoding
else:
compressed_data = self._compress_image_zlib(image)
sendbuff.append(0) # control byte
# content lenght
sendbuff.extend(self._send_compact_length(len(compressed_data)))
# Tight data
sendbuff.extend(compressed_data)
return sendbuff
def _send_compact_length(self, length):
sendbuff = bytearray()
while True:
# Toma los 7 bits más bajos del tamaño
byte = length & 0x7F
length >>= 7
# Si aún hay más datos, establece el bit alto y continúa
if length > 0:
byte |= 0x80
sendbuff.append(byte)
if length == 0:
break
return sendbuff
def _should_use_jpeg(self, image, threshold=256):
if image.mode == 'P':
return False
if image.mode == 'RGB':
width, height = image.size
sample_size = min(width * height, 1000)
sample = np.array(image).reshape(-1, 3)[:sample_size]
unique_colors = np.unique(sample, axis=0)
return len(unique_colors) > threshold
return False
def _compress_image_jpeg(self, image, quality=75):
buffer = BytesIO()
image.save(buffer, format="JPEG", quality=quality)
jpeg_data = buffer.getvalue()
buffer.close()
return jpeg_data
def _compress_image_zlib(self, image):
if self.compress_obj is None or self._last_compression_was_jpeg:
self.compress_obj = zlib.compressobj(
zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED,
-zlib.MAX_WBITS, # El negativo omite la inclusión de un encabezado zlib.
zlib.DEF_MEM_LEVEL,
zlib.Z_DEFAULT_STRATEGY
)
self._last_compression_was_jpeg = False
zlib_data = self.compress_obj.compress(image.tobytes()) + self.compress_obj.flush(zlib.Z_SYNC_FLUSH)
return zlib_data
common.encodings[common.ENCODINGS.tight] = Encoding
log.debug("Loaded encoding: %s (%s)" % (__name__, Encoding.id))

View File

@@ -1,6 +1,6 @@
from . import common
from struct import *
from lib import log
from struct import *
import zlib
@@ -39,13 +39,13 @@ class Encoding:
sendbuff.extend(pack(">i", self.id))
#log.debug("Compressing...")
zlibdata = self._compressObj.compress( image.tobytes() )
zlibdata = self._compressObj.compress(image.tobytes())
zlibdata += self._compressObj.flush(zlib.Z_FULL_FLUSH)
#log.debug("LEN", len(zlibdata))
l = pack("!I", len(zlibdata) )
sendbuff.extend( l ) # send length
sendbuff.extend( zlibdata ) # send compressed data
l = pack("!I", len(zlibdata))
sendbuff.extend(l) # send length
sendbuff.extend(zlibdata) # send compressed data
return sendbuff

95
lib/encodings/zrle.py Normal file
View File

@@ -0,0 +1,95 @@
from . import common
from lib import log
import zlib
from struct import pack
from PIL import Image
class Encoding:
name = 'zrle'
id = 16
description = 'zrle VNC encoding'
enabled = True
def __init__(self):
log.debug("Initialized", __name__)
self._compressObj = zlib.compressobj()
def send_image(self, x, y, w, h, image):
sendbuff = bytearray()
rectangles = 1
sendbuff.extend(pack("!BxH", 0, rectangles)) # FramebufferUpdate
sendbuff.extend(pack("!HHHH", x, y, w, h))
sendbuff.extend(pack(">i", self.id)) # ID de encoding ZRLE
tmpbuf = bytearray()
# Dividir la imagen en tiles y comprimirlas
for tile_y in range(0, h, 64):
for tile_x in range(0, w, 64):
tile = image.crop((tile_x, tile_y, min(tile_x + 64, w), min(tile_y + 64, h)))
encoded_tile = self.tile_encode(tile)
tmpbuf.extend(encoded_tile)
compressed_data = self._compressObj.compress(tmpbuf)
compressed_data += self._compressObj.flush(zlib.Z_SYNC_FLUSH)
sendbuff.extend(pack("!I", len(compressed_data)))
sendbuff.extend(compressed_data)
log.debug("zrle send_image", x, y, w, h, image)
return sendbuff
def tile_encode(self, tile):
"""Codifica una baldosa (tile) de la imagen usando ZRLE."""
w, h = tile.size
pixels = list(tile.getdata())
rle_data = bytearray()
# Proceso RLE para la baldosa
prev_pixel = pixels[0]
count = 1
for pixel in pixels[1:]:
if pixel == prev_pixel and count < 255:
count += 1
else:
rle_data.extend(self._pack_pixel(prev_pixel, count))
prev_pixel = pixel
count = 1
rle_data.extend(self._pack_pixel(prev_pixel, count))
# Empaquetar la data RLE con el byte de subencoding
encoded_tile = bytearray()
encoded_tile.append(128) # Subencoding RLE
encoded_tile.extend(rle_data)
return encoded_tile
def _pack_pixel(self, pixel, count):
if isinstance(pixel, tuple):
# RGBA
r, g, b, a = pixel
pixel_data = bytes([r, g, b]) # Usar solo RGB para ZRLE.
else:
pixel_data = bytes([pixel, pixel, pixel])
count_data = self._encode_run_length(count)
return pixel_data + count_data
def _encode_run_length(self, length):
"""Codifica la longitud de una secuencia para RLE."""
if length == 1:
return b""
length -= 1 # La longitud se incrementa en 1 según el protocolo ZRLE
encoded_length = bytearray()
while length > 0:
byte = length % 255
encoded_length.append(byte)
length //= 255
if length > 0:
encoded_length.append(255)
return encoded_length
common.encodings[common.ENCODINGS.zrle] = Encoding
log.debug("Loaded encoding: %s (%s)" % (__name__, Encoding.id))