opc ua
是一种应用层协议,基于tcp之上,其url通常为opc.tcp://127.0.0.1:4840/abc
,在opc ua中常被称为endpoint
两种模式
opc ua支持c/s模式,同时也支持类似mqtt的发布订阅模式,通常各种设备作为opc ua的服务端提供各种服务。
信息模型
opc ua采用面向对象的设计思路, 使用了对象(objects)作为过程系统表示数据和活动的基础。对象包含了变量
,事件
和方法
,它们通过引用(reference)来互相连接。
OPC UA 信息模型是节点的网络(Network of Node,),或者称为结构化图(graph),由节点
node)和引用
References)组成,这种结构图称之为OPC UA 的地址空间。这种图形结构可以描述各种各样的结构化信息对象)。
注意⚠️:opc ua中所说的节点是在一个opc ua服务器中,不要理解为一个服务器对应一个node
节点
opc ua定义了8种类型的节点
对象Object)
对象类型Object Type)
变量Variable)
变量类型Variable Type)
方法Method)
视图View)
引用Reference)
数据类型Data Type)
每种节点都包含一些公共属性,如下:
属性 | 数据类型 | 说明 |
---|---|---|
NodeId | NodeId | 在OPC UA服务器内唯一确定的一个节点,并且在OPC UA服务器中定位该节点 |
NodeClass | Int32 | 该节点的类型(上面列出的8种之一) |
BrowseName | QualifiedName | 浏览OPC UA服务器事定义的节点。它是非本地化的 |
DisplayName | LocalizedText | 包含节点的名字,用来在用户接口中显示名字,本地化 |
Description | LocalizedText | 本地化的描述(可选) |
WriteMask | Uint32 | 指示哪个节点属性是可写的,即可被OPC UA客户端修改(可选) |
UserWriteMask | Uint32 | 指示哪个节点属性可以被当前连接到服务器上的用户修改(可选) |
除了数据类型
节点之外,其他各个节点都有额外的专属属性
引用
引用描述了两个节点之间的关系,用来连接多个节点。OPC UA预定义了多种引用,常见的引用有:
hasTypeDefinition
描述对象、变量和类型之间的关系
ObjectNode的hasTypeDefinition引用,指向了一个ObjectTypeNode,表示该ObjectNode的类型;
VariableNode的hasTypeDefinition引用,指向一个VariableTypeNode,表示该 VariableNode的类型。
hasSubType
描述对象的挤成关系,当子类从父类继承后,子类拥有一个hasSubType引用指向父类。
hasComponents
描述一种组合关系
ObjectNode一般都由多个VariableNode组成,ObjectNode包含某个VariableNode时,ObjectNode拥有一个hasComponents引用,指向该VariableNode;
VariableNode也可以包含子VariableNode,此时也用hasComponents描述它们的关系。
Organizes
指明两个节点的层次结构,通过organizes可以把多个节点组织到同一个父节点下。
完整引用如下
服务
服务可以看成是OPC UA服务器提供的API集合,OPC UA与定义了37个标准服务,常用的服务有:
读写服务
可以获取和修改服务器指定节点指定属性的值
调用服务
执行服务器上指定节点的方法
订阅数据变化和订阅事件
可以监控服务器数据的变化
opc ua编程
Sdk
python(支持客户端和服务端)
https://github.com/FreeOpcUa/python-opcua
golang(支持客户端,服务端尚不完善)
https://github.com/gopcua/opcua
客户端
使用python(pyqt5)开发使用pip可以安装,跨平台
sudo pip3 install pyqt5 -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install numpy -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install pyqtgraph -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install cryptography -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install opcua-client -i https://pypi.mirrors.ustc.edu.cn/simple/
模拟设备
可利用sdk自己开发 见下面的python demo
golang Demo
读取服务器数据
package main
import
"context"
"log"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
)
func main) {
endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
nodeID := "ns=2;s=Dynamic/RandomFloat"
ctx := context.Background)
c := opcua.NewClientendpoint, opcua.SecurityModeua.MessageSecurityModeNone))
if err := c.Connectctx); err != nil {
log.Fatalerr)
}
defer c.Close)
id, err := ua.ParseNodeIDnodeID)
if err != nil {
log.Fatalf"invalid node id: %v", err)
}
req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: []*ua.ReadValueID{{NodeID: id}},
TimestampsToReturn: ua.TimestampsToReturnBoth,
}
resp, err := c.Readreq)
if err != nil {
log.Fatalf"Read failed: %s", err)
}
if resp.Results[0].Status != ua.StatusOK {
log.Fatalf"Status not OK: %v", resp.Results[0].Status)
}
log.Printf"%#v", resp.Results[0].Value.Value))
}
向服务器写数据
package main
import
"context"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"log"
)
func main) {
endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
nodeID := "ns=2;s=Dynamic/RandomFloat"
ctx := context.Background)
c := opcua.NewClientendpoint, opcua.SecurityModeua.MessageSecurityModeNone))
if err := c.Connectctx); err != nil {
log.Fatalerr)
}
defer c.Close)
id, err := ua.ParseNodeIDnodeID)
if err != nil {
log.Fatalf"invalid node id: %v", err)
}
v, err := ua.NewVariant10.0)
if err != nil {
log.Fatalf"invalid value: %v", err)
}
req := &ua.WriteRequest{
NodesToWrite: []*ua.WriteValue{
{
NodeID: id,
AttributeID: ua.AttributeIDValue,
Value: &ua.DataValue{
EncodingMask: ua.DataValueValue,
Value: v,
},
},
},
}
resp, err := c.Writereq)
if err != nil {
log.Fatalf"Read failed: %s", err)
}
log.Printf"%v", resp.Results[0])
}
监听服务器数据变化
package main
import
"context"
"github.com/gopcua/opcua/monitor"
"log"
"os"
"os/signal"
"sync"
"time"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
)
func cleanupsub *monitor.Subscription, wg *sync.WaitGroup) {
log.Printf"stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID), sub.Delivered), sub.Dropped))
sub.Unsubscribe)
wg.Done)
}
func startCallbackSubctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
sub, err := m.Subscribe
ctx,
&opcua.SubscriptionParameters{
Interval: interval,
},
funcs *monitor.Subscription, msg *monitor.DataChangeMessage) {
if msg.Error != nil {
log.Printf"[callback] error=%s", msg.Error)
} else {
log.Printf"[callback] node=%s value=%v", msg.NodeID, msg.Value.Value))
}
time.Sleeplag)
},
nodes...)
if err != nil {
log.Fatalerr)
}
defer cleanupsub, wg)
<-ctx.Done)
}
func main) {
endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
nodeID := "ns=2;s=Dynamic/RandomFloat"
signalCh := makechan os.Signal, 1)
signal.NotifysignalCh, os.Interrupt)
ctx, cancel := context.WithCancelcontext.Background))
defer cancel)
go func) {
<-signalCh
println)
cancel)
})
c := opcua.NewClientendpoint, opcua.SecurityModeua.MessageSecurityModeNone))
if err := c.Connectctx); err != nil {
log.Fatalerr)
}
defer c.Close)
m, err := monitor.NewNodeMonitorc)
if err != nil {
log.Fatalerr)
}
m.SetErrorHandlerfunc_ *opcua.Client, sub *monitor.Subscription, err error) {
log.Printf"error: sub=%d err=%s", sub.SubscriptionID), err.Error))
})
wg := &sync.WaitGroup{}
// start callback-based subscription
wg.Add1)
go startCallbackSubctx, m, time.Second, 0, wg, nodeID)
<-ctx.Done)
wg.Wait)
}
python opcua server demo
#!/usr/bin/env python3
from threading import Thread
import random
import time
from opcua import ua, uamethod, Server
@uamethod
def set_temperatureparent, variant):
printf"set_temperature {variant.Value}")
temperature_thread.temperature.set_valuevariant.Value)
@uamethod
def set_onoffparent, variant):
printf"set_onoff {variant.Value}")
temperature_thread.temperature.set_valuevariant.Value)
# 这个类用于后台定时随机修改值
class TemperatureThread):
def __init__self, temperature, onoff):
Thread.__init__self)
self._stop = False
self.temperature = temperature
self.onoff = onoff
def stopself):
self._stop = True
def runself):
count = 1
while not self._stop:
value = random.randint-20, 100)
self.temperature.set_valuevalue)
printf"random set temperature {value}")
value = boolrandom.randint0, 1))
self.onoff.set_valuevalue)
printf"random set onoff {value}")
led_event.event.Message = ua.LocalizedText"high_temperature %d" % count)
led_event.event.Severity = count
#led_event.event.temperature = random.randint60, 100)
led_event.event.onoff = boolrandom.randint0, 1))
led_event.trigger)
count += 1
time.sleep10)
if __name__ == "__main__":
# now setup our server
server = Server)
server.set_endpoint"opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
server.set_server_name"TuyaOpcUa Driver Demo Device")
# set all possible endpoint policies for clients to connect through
server.set_security_policy[
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
ua.SecurityPolicyType.Basic128Rsa15_Sign,
ua.SecurityPolicyType.Basic256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256_Sign])
# setup our own namespace
uri = "http://tuya.com"
idx = server.register_namespaceuri)
# 添加一个 `空调` 对象
air_conditioner = server.nodes.objects.add_objectidx, "AirConditioner")
temperature = air_conditioner.add_variableidx, "temperature", 20)
temperature.set_writable)
onoff = air_conditioner.add_variableidx, "onoff", True)
onoff.set_writable)
air_conditioner.add_methodidx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
air_conditioner.add_methodidx, "set_onoff", set_onoff, [ua.VariantType.Boolean])
# creating a default event object, the event object automatically will have members for all events properties
led_event_type = server.create_custom_event_typeidx,
'high_temperature',
ua.ObjectIds.BaseEventType,
['temperature', ua.VariantType.UInt32), 'onoff', ua.VariantType.Boolean)])
led_event = server.get_event_generatorled_event_type, air_conditioner)
led_event.event.Severity = 300
# start opcua server
server.start)
print"Start opcua server...")
temperature_thread = Temperaturetemperature, onoff)
temperature_thread.start)
try:
led_event.triggermessage="This is BaseEvent")
while True:
time.sleep5)
finally:
print"Exit opcua server...")
temperature_thread.stop)
server.stop)