Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import typing
- from contextlib import asynccontextmanager
- from asyncua import Client
- @asynccontextmanager
- async def connector(client: Client):
- client.session_timeout = 600000
- await client.connect()
- yield client
- await client.disconnect()
- async def get_value(hmi_opcua: Client, tag: str) -> typing.Any:
- root = hmi_opcua.get_root_node()
- tags = await root.get_child(["0:Objects", "1:HmiRuntime", "1:HMI_RT_4", "1:Tags"])
- for child in await tags.get_children():
- name = (await child.read_browse_name()).Name.split("::")[1]
- if name == tag:
- return await child.get_value()
- raise ValueError(f"Could not find {tag}")
- async def main():
- async with connector(Client(url="opc.tcp://127.0.0.1:4949")) as client:
- for _ in range(10):
- print("Tag in position:", await get_value(client, "Tag_in_pos"))
- await asyncio.sleep(0.1)
- if __name__ == "__main__":
- asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement