PythonでOPC UAサーバー、クライントを立てる

FA

最近、OPC UAというのに興味をもって、色々と調べたらこんなものを見つけた。

GitHub - FreeOpcUa/python-opcua: LGPL Pure Python OPC-UA Client and Server
LGPL Pure Python OPC-UA Client and Server. Contribute to FreeOpcUa/python-opcua development by creating an account on Gi...

どうもPythonでOPCUAを実装できるみたいで、よくわからないけど、とりあえず動かしてみる。

今回の環境は以下のようにした。

VScodeを使用して、WSL2上でDockerを使用してコンテナを立てた。
最近は、Pythonで仮想環境作るよりDockerイメージを作成したほうが、作りやすいし共有しやすい….

以下の内容で”Dockerfile”を作成する。

FROM ubuntu:20.04

RUN apt-get update && apt-get install -y sudo
RUN apt install pip && git -y sudo

ARG USERNAME="任意の値"
ARG GROUPNAME="任意の値"
ARG UID=1000
ARG GID=1000
ARG PASSWORD=user
RUN groupadd -g $GID $GROUPNAME && \
    useradd -m -s /bin/bash -u $UID -g $GID -G sudo $USERNAME && \
    echo $USERNAME:$PASSWORD | chpasswd && \
    echo "$USERNAME   ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
USER $USERNAME
WORKDIR /home/$USERNAME/

作成したDockerfileがあるフォルダに移動して以下のコマンド実行することで、”opcua“という名前のDockerイメージが作成される。

sudo docker build -t opcua:1 .

Vscode上のDockerからopcua”1”イメージを右クリックして、RUNInteractiveを実行してコンテナを立ち上げる。

コンテナに以下の表示がされる

Visual Studio Codeをアタッチするを選択

新しくウィンドウが立ち上がる

以下のコマンドを実行してOPCUAをインストールする

pip install opcua

gitをダウンロードを行う

git clone https://github.com/FreeOpcUa/python-opcua.git

exsampleのフォルダに移動する

最小限のサーバーを起動してみる

serever-minimal.pyのPythonファイルを起動して画像のようになったらサーバーが起動する。

client-minimal.pyを起動して、画像のようになったら無事通信できた。

リアルタイムで値が変わる様子を見ないとなんとも理解できない自分は、serever-minimal.pyとclient-minimal.pyを以下のように変更した。

import sys
sys.path.insert(0, "..")
import time

from opcua import ua, Server

if __name__ == "__main__":

    # setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # setup our own namespace, not really necessary but should as spec
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # get Objects node, this is where we should put our nodes
    objects = server.get_objects_node()

    # populating our address space
    myobj = objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    myvar.set_writable()    # Set MyVariable to be writable by clients

    # starting!
    server.start()
    
    try:
        count = 0
        while True:
            time.sleep(1)
            count += 0.1
            myvar.set_value(count)
            print("{:.1f}".format(count)) #カウントアップ値を表示
    finally:
        #close connection, remove subcsriptions, etc
        server.stop()
import sys
import time
sys.path.insert(0, "..")


from opcua import Client


if __name__ == "__main__":

    client = Client("opc.tcp://localhost:4840/freeopcua/server/")
    # client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
    while(1):
        try:
            client.connect()

            # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
            root = client.get_root_node()
            print("Objects node is: ", root)

            # Node objects have methods to read and write node attributes as well as browse or populate address space
            print("Children of root are: ", root.get_children())

            # get a specific node knowing its node id
            #var = client.get_node(ua.NodeId(1002, 2))
            #var = client.get_node("ns=3;i=2002")
            #print(var)
            #var.get_data_value() # get value of node as a DataValue object
            #var.get_value() # get value of node as a python builtin
            #var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
            #var.set_value(3.9) # set node value using implicit data type

            # Now getting a variable node using its browse path
            myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
            obj = root.get_child(["0:Objects", "2:MyObject"])
            print("myvar is: ", myvar)
            print("myobj is: ", obj)
            print(myvar.get_data_value())  #カウントアップ値を取得
            time.sleep(1)   #1秒ごとに取得

            # Stacked myvar access
            # print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())

        finally:
            client.disconnect()

右側がサーバーの現在値、左側がクライントでサーバから取得した値となっている。
無事に1秒毎にしゅとくすることができた。

タイトルとURLをコピーしました