Terminal OPC UA Browser Example¶
Source example: examples/highlevel/opcua_browser.py
This example builds a small terminal browser with curses. It browses references on the left and reads attributes for the selected node on the right.
Explanation¶
The browser uses the newer attribute-based API when it needs node metadata. Instead of convenience properties, it reads values explicitly with AttributeId.
node(attr=o6.AttributeId.DISPLAYNAME)
client.read(node._nodeid, attribute_id=o6.AttributeId.BROWSENAME)
The main loop combines client.browse(...), fuzzy filtering, and keyboard navigation to keep both panes updated.
Full source¶
import o6
from o6 import Client, StatusCodeError, types
import socket
import curses
import os
os.environ.setdefault("ESCDELAY", "25") # reduce ESC key delay from ~1000ms to 25ms
localhost = socket.gethostname()
endpoint_url = f"opc.tcp://{localhost}:4840"
def node_detail_lines(node, max_w):
"""Return display lines for all available attributes of a node."""
from o6.client_nodes import (
ObjectNode,
VariableNode,
MethodNode,
ObjectTypeNode,
VariableTypeNode,
ReferenceTypeNode,
DataTypeNode,
ViewNode,
)
def tryread(label, fn):
try:
return (label, str(fn()))
except Exception:
return None
pairs = [
("NodeId", str(node._nodeid)),
("Class", node._node_class.name),
]
for pair in [
tryread("DisplayName", lambda: node(attr=o6.AttributeId.DISPLAYNAME)),
tryread("BrowseName", lambda: node(attr=o6.AttributeId.BROWSENAME)),
tryread("Description", lambda: node(attr=o6.AttributeId.DESCRIPTION)),
tryread("WriteMask", lambda: node(attr=o6.AttributeId.WRITEMASK)),
tryread("UserWriteMask", lambda: node(attr=o6.AttributeId.USERWRITEMASK)),
]:
if pair:
pairs.append(pair)
if isinstance(node, VariableNode):
for pair in [
tryread("Value", lambda: node(attr=o6.AttributeId.VALUE)),
tryread("DataType", lambda: node(attr=o6.AttributeId.DATATYPE)),
tryread("ValueRank", lambda: node(attr=o6.AttributeId.VALUERANK)),
tryread("ArrayDimensions", lambda: node(attr=o6.AttributeId.ARRAYDIMENSIONS)),
tryread("AccessLevel", lambda: node(attr=o6.AttributeId.ACCESSLEVEL)),
tryread("UserAccessLevel", lambda: node(attr=o6.AttributeId.USERACCESSLEVEL)),
tryread("MinSamplingInterval", lambda: node(attr=o6.AttributeId.MINIMUMSAMPLINGINTERVAL)),
tryread("Historizing", lambda: node(attr=o6.AttributeId.HISTORIZING)),
]:
if pair:
pairs.append(pair)
elif isinstance(node, ObjectNode):
pair = tryread("EventNotifier", lambda: node(attr=o6.AttributeId.EVENTNOTIFIER))
if pair:
pairs.append(pair)
elif isinstance(node, MethodNode):
for pair in [
tryread("Executable", lambda: node(attr=o6.AttributeId.EXECUTABLE)),
tryread("UserExecutable", lambda: node(attr=o6.AttributeId.USEREXECUTABLE)),
]:
if pair:
pairs.append(pair)
elif isinstance(node, ObjectTypeNode):
pair = tryread("IsAbstract", lambda: node(attr=o6.AttributeId.ISABSTRACT))
if pair:
pairs.append(pair)
elif isinstance(node, VariableTypeNode):
for pair in [
tryread("IsAbstract", lambda: node(attr=o6.AttributeId.ISABSTRACT)),
tryread("Value", lambda: node(attr=o6.AttributeId.VALUE)),
tryread("DataType", lambda: node(attr=o6.AttributeId.DATATYPE)),
tryread("ValueRank", lambda: node(attr=o6.AttributeId.VALUERANK)),
tryread("ArrayDimensions", lambda: node(attr=o6.AttributeId.ARRAYDIMENSIONS)),
]:
if pair:
pairs.append(pair)
elif isinstance(node, ReferenceTypeNode):
for pair in [
tryread("IsAbstract", lambda: node(attr=o6.AttributeId.ISABSTRACT)),
tryread("Symmetric", lambda: node(attr=o6.AttributeId.SYMMETRIC)),
tryread("InverseName", lambda: node(attr=o6.AttributeId.INVERSENAME)),
]:
if pair:
pairs.append(pair)
elif isinstance(node, DataTypeNode):
for pair in [
tryread("IsAbstract", lambda: node(attr=o6.AttributeId.ISABSTRACT)),
tryread("DataTypeDefinition", lambda: node(attr=o6.AttributeId.DATATYPEDEFINITION)),
]:
if pair:
pairs.append(pair)
elif isinstance(node, ViewNode):
for pair in [
tryread("ContainsNoLoops", lambda: node(attr=o6.AttributeId.CONTAINSNOLOOPS)),
tryread("EventNotifier", lambda: node(attr=o6.AttributeId.EVENTNOTIFIER)),
]:
if pair:
pairs.append(pair)
# Align values after the colon using the longest label width
col = max((len(label) for label, _ in pairs), default=0)
lines = []
for label, val in pairs:
prefix = f" {label.ljust(col)} : "
avail = max_w - len(prefix) - 1
if len(val) > avail:
val = val[: avail - 3] + "..."
lines.append(prefix + val)
return lines
def fuzzy_match(query, text):
"""Subsequence fuzzy match. Returns (matched, [matched_indices_in_text])."""
if not query:
return True, []
q = query.lower()
t = text.lower()
positions = []
qi = 0
for ti, ch in enumerate(t):
if qi < len(q) and ch == q[qi]:
positions.append(ti)
qi += 1
return qi == len(q), positions
def main(stdscr):
curses.curs_set(0)
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN) # selected row
curses.init_pair(2, curses.COLOR_CYAN, -1) # header/title
curses.init_pair(3, curses.COLOR_YELLOW, -1) # node info
curses.init_pair(4, curses.COLOR_GREEN, -1) # status bar
with Client(endpoint_url) as client:
node = client.root
history = []
path_segments = [str(client.read(node._nodeid, attribute_id=o6.AttributeId.BROWSENAME))]
selected = 0
message = ""
filter_query = ""
filter_mode = False
while True:
stdscr.clear()
height, width = stdscr.getmaxyx()
# Layout rows:
# 0 title bar
# 1 browse path (full width, with label)
# 2 horizontal separator
# 3 col headers (left) | "Node Details" (right)
# 4.. ref list (left) | node attrs (right)
# H-4 left sep row for ref detail panel
# H-3 ref detail (left, 1 line)
# H-2 horizontal separator (full width)
# H-1 key legend
left_w = width // 2
div_x = left_w
right_x = div_x + 1
right_w = width - right_x
REF_DETAIL_ROWS = 1
list_start = 4
# rows used below list: left-sep(1) + refdetail(1) + full-sep(1) + legend(1) = 4
list_height = height - list_start - REF_DETAIL_ROWS - 4
# --- Title bar ---
stdscr.attron(curses.color_pair(2) | curses.A_BOLD)
stdscr.addnstr(0, 0, " OPC UA Browser ".center(width), width)
stdscr.attroff(curses.color_pair(2) | curses.A_BOLD)
# --- Browse path row (row 1) ---
browse_path = "/" + "/".join(path_segments)
avail = width - len(" Browse Path: ") - 1
if len(browse_path) > avail:
browse_path = "..." + browse_path[-avail + 3 :]
stdscr.attron(curses.A_BOLD)
stdscr.addstr(1, 0, " Browse Path: ")
stdscr.attroff(curses.A_BOLD)
stdscr.addnstr(
1,
len(" Browse Path: "),
browse_path.ljust(width),
width - len(" Browse Path: ") - 1,
)
# --- Horizontal separator after browse path (row 2) ---
try:
stdscr.addnstr(2, 0, "─" * width, width - 1)
except curses.error:
pass
# --- Vertical divider (rows 3..H-3) ---
for row in range(3, height - 2):
try:
stdscr.addch(row, div_x, curses.ACS_VLINE)
except curses.error:
pass
# --- Left pane: reference list ---
refs = client.browse(
node._nodeid,
result_mask=(
o6.BrowseResultMask.BrowseName
| o6.BrowseResultMask.NodeClass
| o6.BrowseResultMask.ReferenceTypeId
),
)
# Fuzzy filter
filtered_refs = []
match_positions_map = {}
for ref in refs:
bn = str(ref.browse_name)
matched, positions = fuzzy_match(filter_query, bn)
if matched:
filtered_refs.append(ref)
match_positions_map[id(ref)] = positions
if selected < 0:
selected = 0
if selected >= len(filtered_refs):
selected = max(len(filtered_refs) - 1, 0)
scroll = max(0, selected - list_height + 1)
# Column header (row 3): show active filter query if set
if filter_query:
hdr = f" /{filter_query} ({len(filtered_refs)}/{len(refs)})"
stdscr.attron(curses.color_pair(4))
stdscr.addnstr(3, 0, hdr[:left_w].ljust(left_w), left_w)
stdscr.attroff(curses.color_pair(4))
else:
stdscr.attron(curses.A_BOLD)
stdscr.addnstr(
3, 0, f" {'#':>3} browse_name"[:left_w].ljust(left_w), left_w
)
stdscr.attroff(curses.A_BOLD)
for list_idx in range(
scroll, min(scroll + list_height, len(filtered_refs))
):
ref = filtered_refs[list_idx]
row_y = list_start + (list_idx - scroll)
if row_y >= list_start + list_height:
break
bn = str(ref.browse_name)
prefix = f" {list_idx:>3} "
is_selected = list_idx == selected
base_attr = curses.color_pair(1) if is_selected else 0
line = (prefix + bn)[:left_w].ljust(left_w)
stdscr.addnstr(row_y, 0, line, left_w, base_attr)
# Overdraw matched chars with bold highlight
if filter_query:
positions = set(match_positions_map.get(id(ref), []))
for i, ch in enumerate(bn):
col = len(prefix) + i
if col >= left_w:
break
if i in positions:
try:
stdscr.addch(row_y, col, ch, base_attr | curses.A_BOLD)
except curses.error:
pass
# --- Left pane bottom: selected ref details ---
sep_y = height - REF_DETAIL_ROWS - 3
stdscr.attron(curses.A_DIM)
stdscr.addnstr(sep_y, 0, ("─" * left_w)[:left_w], left_w)
stdscr.attroff(curses.A_DIM)
if filtered_refs:
sel_ref = filtered_refs[selected]
ref_detail = (
f" {sel_ref.browse_name} "
f"id={sel_ref.nodeid} "
f"type={sel_ref.reference_type_id}"
)
stdscr.addnstr(sep_y + 1, 0, ref_detail[:left_w].ljust(left_w), left_w)
# --- Right pane: current node details ---
stdscr.attron(curses.A_BOLD)
stdscr.addnstr(
3, right_x, " Node Details"[:right_w].ljust(right_w), right_w
)
stdscr.attroff(curses.A_BOLD)
detail_lines = node_detail_lines(node, right_w)
for i, line in enumerate(detail_lines):
row_y = 4 + i
if row_y >= height - 2:
break
stdscr.attron(curses.color_pair(3))
stdscr.addnstr(row_y, right_x, line[:right_w].ljust(right_w), right_w)
stdscr.attroff(curses.color_pair(3))
# --- Horizontal separator before legend (row H-2) ---
try:
stdscr.addnstr(height - 2, 0, "─" * width, width - 1)
except curses.error:
pass
# --- Key legend / filter input (row H-1) ---
if filter_mode:
filter_display = f" Filter: {filter_query}▌"
stdscr.attron(curses.color_pair(4) | curses.A_BOLD)
stdscr.addnstr(height - 1, 0, filter_display.ljust(width), width - 1)
stdscr.attroff(curses.color_pair(4) | curses.A_BOLD)
else:
legend = " jk/↑↓:select l/→/Enter:open h/←/Bksp:back /:filter g:goto q:quit"
if message:
legend = f" {message} |{legend}"
stdscr.attron(curses.color_pair(4))
stdscr.addnstr(height - 1, 0, legend.ljust(width), width - 1)
stdscr.attroff(curses.color_pair(4))
stdscr.refresh()
message = ""
# --- Input handling ---
key = stdscr.getch()
if filter_mode:
if key == 27: # ESC — clear and exit
filter_query = ""
filter_mode = False
selected = 0
elif key in (10, 13): # Enter — keep filter, exit mode
filter_mode = False
elif key in (curses.KEY_BACKSPACE, 127, 263):
filter_query = filter_query[:-1]
selected = 0
elif 32 <= key < 256:
filter_query += chr(key)
selected = 0
else:
if key == ord("q") or key == ord("Q"):
break
elif key == 27: # ESC — clear filter
filter_query = ""
selected = 0
elif key == ord("/"):
filter_mode = True
filter_query = ""
selected = 0
elif key == curses.KEY_UP or key == ord("k"):
selected = max(0, selected - 1)
elif key == curses.KEY_DOWN or key == ord("j"):
selected = min(len(filtered_refs) - 1, selected + 1)
elif key == curses.KEY_PPAGE:
selected = max(0, selected - list_height)
elif key == curses.KEY_NPAGE:
selected = min(len(filtered_refs) - 1, selected + list_height)
elif key == curses.KEY_HOME:
selected = 0
elif key == curses.KEY_END:
selected = len(filtered_refs) - 1
elif key in (curses.KEY_ENTER, 10, 13, curses.KEY_RIGHT, ord("l")):
if filtered_refs:
ref = filtered_refs[selected]
try:
new_node = client[types.NodeId(str(ref.nodeid))]
history.append((node, selected, list(path_segments)))
path_segments.append(str(ref.browse_name))
node = new_node
selected = 0
filter_query = ""
filter_mode = False
except Exception as e:
message = f"Error: {e}"
elif key in (curses.KEY_BACKSPACE, 127, 263, curses.KEY_LEFT, ord("h")):
if history:
node, selected, saved_path = history.pop()
path_segments.clear()
path_segments.extend(saved_path)
filter_query = ""
filter_mode = False
else:
message = "Already at root"
elif key == ord("g"):
# --- Goto: manual input loop with ESC to cancel ---
curses.curs_set(1)
prompt = " Goto: "
input_buf = []
while True:
display = prompt + "".join(input_buf)
stdscr.attron(curses.color_pair(4))
stdscr.addnstr(height - 1, 0, display.ljust(width), width - 1)
stdscr.attroff(curses.color_pair(4))
stdscr.move(height - 1, min(len(display), width - 1))
stdscr.refresh()
ch = stdscr.getch()
if ch == 27: # ESC — cancel
input_buf = None
break
elif ch in (10, 13): # Enter — confirm
break
elif ch in (curses.KEY_BACKSPACE, 127, 263):
if input_buf:
input_buf.pop()
elif 32 <= ch < 256:
input_buf.append(chr(ch))
curses.curs_set(0)
if input_buf is not None:
path_input = "".join(input_buf).strip()
if path_input:
segments = [
s for s in path_input.strip("/").split("/") if s
]
try:
cur = client.root
new_segments = [str(client.read(cur._nodeid, attribute_id=o6.AttributeId.BROWSENAME))]
for seg in segments:
cur = getattr(cur, seg)
new_segments.append(seg)
# Push current state so back returns here
history.append((node, selected, list(path_segments)))
path_segments.clear()
path_segments.extend(new_segments)
node = cur
selected = 0
except KeyError as e:
message = f"Not found: {e}"
except Exception as e:
message = f"Error: {e}"
curses.wrapper(main)