.. _pubsub-tsn-loopback:

Realtime Loopback Example
-------------------------

This tutorial shows publishing and subscribing information in Realtime. This example has both a Publisher and a Subscriber (used as threads, running on the same core). The Subscriber thread subscribes to the counterdata published by the Publisher thread of the pubsub_TSN_publisher.c example. The subscribed counterdata is published again and is then subscribed by the Subscriber thread of the pubsub_TSN_publisher.c example. Thus a round-trip of counterdata is achieved. The flow of this communication and the trace points are given in the diagram below.

Another thread, called the UserApplication thread, is also used in the example; it serves the functionality of the control loop. In this example, the UserApplication thread increments the counterData, which is published by the Publisher thread. It also reads the subscribed data from the Information Model and writes the updated counterdata into distinct csv files during each cycle. Buffered Network Messages are used for publishing and subscribing in the RT path. Further, each DataSetField is accessed via direct pointer access between the user interface and the Information Model.

An additional feature called the Blocking Socket is employed in the Subscriber thread. This feature is optional and can be enabled or disabled when running the application by using the command line argument "-enableBlockingSocket". When using the Blocking Socket, the Subscriber thread remains in "blocking mode" after each wake-up until a message is received. In other words, the timeout is overwritten and the thread waits continuously for the message from each wake-up time onwards. Once the message is received, the Subscriber thread updates the value in the Information Model, sleeps until the next wake-up time and then waits for the next message again. This process is repeated until the application is terminated.
To ensure realtime capabilities, Publisher uses ETF(Earliest Tx-time First) to publish information at the calculated tranmission time over Ethernet. Subscriber can be used with or without XDP(Xpress Data Processing) over Ethernet Run step of the example is as mentioned below: ./bin/examples/pubsub_TSN_loopback -interface For more options, run ./bin/examples/pubsub_TSN_loopback -help .. code-block:: c /* Trace point setup * * +--------------+ +----------------+ * T1 | OPCUA PubSub | T8 T5 | OPCUA loopback | T4 * | | Application | ^ | | Application | ^ * | +--------------+ | | +----------------+ | * User | | | | | | | | * Space | | | | | | | | * | | | | | | | | * -----------|--------------|------------------|----------------|-------- * | | Node 1 | | | | Node 2 | | * Kernel | | | | | | | | * Space | | | | | | | | * | | | | | | | | * v +--------------+ | v +----------------+ | * T2 | TX tcpdump | T7<----------T6 | RX tcpdump | T3 * | +--------------+ +----------------+ ^ * | | * ---------------------------------------------------------- */ #define _GNU_SOURCE #include #include #include #include #include #include #include /* For thread operations */ #include #include #include #include #include #include #include #include #include #include #include "ua_pubsub.h" UA_NodeId readerGroupIdentifier; UA_NodeId readerIdentifier; UA_DataSetReaderConfig readerConfig; /*to find load of each thread * ps -L -o pid,pri,%cpu -C pubsub_TSN_loopback */ /* Configurable Parameters */ /* These defines enables the publisher and subscriber of the OPCUA stack */ /* To run only publisher, enable PUBLISHER define alone (comment SUBSCRIBER) */ #define PUBLISHER /* To run only subscriber, enable SUBSCRIBER define alone (comment PUBLISHER) */ #define SUBSCRIBER /* Cycle time in milliseconds */ #define DEFAULT_CYCLE_TIME 0.25 /* Qbv offset */ #define DEFAULT_QBV_OFFSET 125 #define DEFAULT_SOCKET_PRIORITY 3 #define PUBLISHER_ID 2235 #define WRITER_GROUP_ID 100 #define DATA_SET_WRITER_ID 62541 
#define DEFAULT_PUBLISHING_MAC_ADDRESS "opc.eth://01-00-5E-00-00-01:8.3" #if defined(SUBSCRIBER) #define PUBLISHER_ID_SUB 2234 #define WRITER_GROUP_ID_SUB 101 #define DATA_SET_WRITER_ID_SUB 62541 #define DEFAULT_SUBSCRIBING_MAC_ADDRESS "opc.eth://01-00-5E-7F-00-01:8.3" #endif #define REPEATED_NODECOUNTS 2 // Default to publish 64 bytes #define PORT_NUMBER 62541 #define DEFAULT_XDP_QUEUE 2 #define PUBSUB_CONFIG_RT_INFORMATION_MODEL /* Non-Configurable Parameters */ /* Milli sec and sec conversion to nano sec */ #define MILLI_SECONDS 1000 * 1000 #define SECONDS 1000 * 1000 * 1000 #define SECONDS_SLEEP 5 #if defined(PUBLISHER) /* Publisher will sleep for 60% of cycle time and then prepares the */ /* transmission packet within 40% */ static UA_Double pubWakeupPercentage = 0.6; #endif /* Subscriber will wakeup only during start of cycle and check whether */ /* the packets are received */ static UA_Double subWakeupPercentage = 0; /* User application Pub/Sub will wakeup at the 30% of cycle time and handles the */ /* user data such as read and write in Information model */ static UA_Double userAppWakeupPercentage = 0.3; /* Priority of Publisher, subscriber, User application and server are kept */ /* after some prototyping and analyzing it */ #define DEFAULT_PUB_SCHED_PRIORITY 78 #define DEFAULT_SUB_SCHED_PRIORITY 81 #define DEFAULT_USERAPPLICATION_SCHED_PRIORITY 75 #define MAX_MEASUREMENTS 1000000 #define DEFAULT_PUB_CORE 2 #define DEFAULT_SUB_CORE 2 #define DEFAULT_USER_APP_CORE 3 #define SECONDS_INCREMENT 1 #ifndef CLOCK_TAI #define CLOCK_TAI 11 #endif #define CLOCKID CLOCK_TAI #define ETH_TRANSPORT_PROFILE "http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp" #ifdef UA_ENABLE_PUBSUB_ENCRYPTION #define UA_AES128CTR_SIGNING_KEY_LENGTH 32 #define UA_AES128CTR_KEY_LENGTH 16 #define UA_AES128CTR_KEYNONCE_LENGTH 4 #if defined(PUBLISHER) UA_Byte signingKeyPub[UA_AES128CTR_SIGNING_KEY_LENGTH] = {0}; UA_Byte encryptingKeyPub[UA_AES128CTR_KEY_LENGTH] = {0}; UA_Byte 
keyNoncePub[UA_AES128CTR_KEYNONCE_LENGTH] = {0}; #endif #if defined(SUBSCRIBER) UA_Byte signingKeySub[UA_AES128CTR_SIGNING_KEY_LENGTH] = {0}; UA_Byte encryptingKeySub[UA_AES128CTR_KEY_LENGTH] = {0}; UA_Byte keyNonceSub[UA_AES128CTR_KEYNONCE_LENGTH] = {0}; #endif #endif /* If the Hardcoded publisher/subscriber MAC addresses need to be changed, * change PUBLISHING_MAC_ADDRESS and SUBSCRIBING_MAC_ADDRESS */ /* Set server running as true */ UA_Boolean runningServer = true; char* pubMacAddress = DEFAULT_PUBLISHING_MAC_ADDRESS; char* subMacAddress = DEFAULT_SUBSCRIBING_MAC_ADDRESS; static UA_Double cycleTimeInMsec = DEFAULT_CYCLE_TIME; static UA_Int32 socketPriority = DEFAULT_SOCKET_PRIORITY; static UA_Int32 pubPriority = DEFAULT_PUB_SCHED_PRIORITY; static UA_Int32 subPriority = DEFAULT_SUB_SCHED_PRIORITY; static UA_Int32 userAppPriority = DEFAULT_USERAPPLICATION_SCHED_PRIORITY; static UA_Int32 pubCore = DEFAULT_PUB_CORE; static UA_Int32 subCore = DEFAULT_SUB_CORE; static UA_Int32 userAppCore = DEFAULT_USER_APP_CORE; static UA_Int32 qbvOffset = DEFAULT_QBV_OFFSET; static UA_UInt32 xdpQueue = DEFAULT_XDP_QUEUE; static UA_UInt32 xdpFlag = XDP_FLAGS_SKB_MODE; static UA_UInt32 xdpBindFlag = XDP_COPY; static UA_Boolean disableSoTxtime = true; static UA_Boolean enableCsvLog = false; static UA_Boolean consolePrint = false; static UA_Boolean enableBlockingSocket = false; static UA_Boolean signalTerm = false; static UA_Boolean enableXdpSubscribe = false; /* Variables corresponding to PubSub connection creation, * published data set and writer group */ UA_NodeId connectionIdent; UA_NodeId publishedDataSetIdent; UA_NodeId writerGroupIdent; UA_NodeId pubNodeID; UA_NodeId subNodeID; UA_NodeId pubRepeatedCountNodeID; UA_NodeId subRepeatedCountNodeID; UA_NodeId runningPubStatusNodeID; UA_NodeId runningSubStatusNodeID; /* Variables for counter data handling in address space */ UA_UInt64 *pubCounterData = NULL; UA_DataValue *pubDataValueRT = NULL; UA_Boolean *runningPub = NULL; 
UA_DataValue *runningPubDataValueRT = NULL; UA_UInt64 *repeatedCounterData[REPEATED_NODECOUNTS] = {NULL}; UA_DataValue *repeatedDataValueRT[REPEATED_NODECOUNTS] = {NULL}; UA_UInt64 *subCounterData = NULL; UA_DataValue *subDataValueRT = NULL; UA_Boolean *runningSub = NULL; UA_DataValue *runningSubDataValueRT = NULL; UA_UInt64 *subRepeatedCounterData[REPEATED_NODECOUNTS] = {NULL}; UA_DataValue *subRepeatedDataValueRT[REPEATED_NODECOUNTS] = {NULL}; CSV file handling ~~~~~~~~~~~~~~~~~ CSV files are written for Publisher and Subscriber thread. csv files include the counterdata that is being either Published or Subscribed along with the timestamp. These csv files can be used to compute latency for following combinations of Tracepoints, T1-T4 and T1-T8. T1-T8 - Gives the Round-trip time of a counterdata, as the value published by the Publisher thread in pubsub_TSN_publisher.c example is subscribed by the Subscriber thread in pubsub_TSN_loopback.c example and is published back to the pubsub_TSN_publisher.c example .. 
code-block:: c #if defined(PUBLISHER) /* File to store the data and timestamps for different traffic */ FILE *fpPublisher; char *filePublishedData = "publisher_T5.csv"; /* Array to store published counter data */ UA_UInt64 publishCounterValue[MAX_MEASUREMENTS]; size_t measurementsPublisher = 0; /* Array to store timestamp */ struct timespec publishTimestamp[MAX_MEASUREMENTS]; /* Thread for publisher */ pthread_t pubthreadID; struct timespec dataModificationTime; #endif #if defined(SUBSCRIBER) /* File to store the data and timestamps for different traffic */ FILE *fpSubscriber; char *fileSubscribedData = "subscriber_T4.csv"; /* Array to store subscribed counter data */ UA_UInt64 subscribeCounterValue[MAX_MEASUREMENTS]; size_t measurementsSubscriber = 0; /* Array to store timestamp */ struct timespec subscribeTimestamp[MAX_MEASUREMENTS]; /* Thread for subscriber */ pthread_t subthreadID; /* Variable for PubSub connection creation */ UA_NodeId connectionIdentSubscriber; struct timespec dataReceiveTime; #endif /* Thread for user application*/ pthread_t userApplicationThreadID; /* Base time handling for the threads */ struct timespec threadBaseTime; UA_Boolean baseTimeCalculated = false; typedef struct { UA_Server *ServerRun; } serverConfigStruct; /* Structure to define thread parameters */ typedef struct { UA_Server *server; void *data; UA_ServerCallback callback; UA_Duration interval_ms; UA_UInt64 *callbackId; } threadArg; Function calls for different threads .. 
code-block:: c /* Publisher thread routine for ETF */ void *publisherETF(void *arg); /* Subscriber thread routine */ void *subscriber(void *arg); /* User application thread routine */ void *userApplicationPubSub(void *arg); /* For adding nodes in the server information model */ static void addServerNodes(UA_Server *server); /* For deleting the nodes created */ static void removeServerNodes(UA_Server *server); /* To create multi-threads */ static pthread_t threadCreation(UA_Int16 threadPriority, size_t coreAffinity, void *(*thread)(void *), char *applicationName, void *serverConfig); /* Stop signal */ static void stopHandler(int sign) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c"); signalTerm = true; } Nanosecond field handling ~~~~~~~~~~~~~~~~~~~~~~~~~~ Nanosecond field in timespec is checked for overflowing and one second is added to seconds field and nanosecond field is set to zero. .. code-block:: c static void nanoSecondFieldConversion(struct timespec *timeSpecValue) { /* Check if ns field is greater than '1 ns less than 1sec' */ while(timeSpecValue->tv_nsec > (SECONDS -1)) { /* Move to next second and remove it from ns field */ timeSpecValue->tv_sec += SECONDS_INCREMENT; timeSpecValue->tv_nsec -= SECONDS; } } Custom callback handling ~~~~~~~~~~~~~~~~~~~~~~~~~ Custom callback thread handling overwrites the default timer based callback function with the custom (user-specified) callback interval. .. 
code-block:: c /* Add a callback for cyclic repetition */ static UA_StatusCode addPubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_ServerCallback callback, void *data, UA_Double interval_ms, UA_DateTime *baseTime, UA_TimerPolicy timerPolicy, UA_UInt64 *callbackId) { /* Initialize arguments required for the thread to run */ threadArg *threadArguments = (threadArg *) UA_malloc(sizeof(threadArg)); /* Pass the value required for the threads */ threadArguments->server = server; threadArguments->data = data; threadArguments->callback = callback; threadArguments->interval_ms = interval_ms; threadArguments->callbackId = callbackId; /* Check the writer group identifier and create the thread accordingly */ if(UA_NodeId_equal(&identifier, &writerGroupIdent)) { #if defined(PUBLISHER) /* Create the publisher thread with the required priority and core affinity */ char threadNamePub[10] = "Publisher"; *callbackId = threadCreation((UA_Int16)pubPriority, (size_t)pubCore, publisherETF, threadNamePub, threadArguments); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Publisher thread callback Id: %lu\n", (long unsigned)*callbackId); #endif } else { #if defined(SUBSCRIBER) /* Create the subscriber thread with the required priority and core affinity */ char threadNameSub[11] = "Subscriber"; *callbackId = threadCreation((UA_Int16)subPriority, (size_t)subCore, subscriber, threadNameSub, threadArguments); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Subscriber thread callback Id: %lu\n", (long unsigned)*callbackId); #endif } return UA_STATUSCODE_GOOD; } static UA_StatusCode changePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_UInt64 callbackId, UA_Double interval_ms, UA_DateTime *baseTime, UA_TimerPolicy timerPolicy) { /* Callback interval need not be modified as it is thread based implementation. 
* The thread uses nanosleep for calculating cycle time and modification in * nanosleep value changes cycle time */ return UA_STATUSCODE_GOOD; } /* Remove the callback added for cyclic repetition */ static void removePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_UInt64 callbackId) { if(callbackId && (pthread_join((pthread_t)callbackId, NULL) != 0)) UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Pthread Join Failed thread: %lu\n", (long unsigned)callbackId); } External data source handling ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If the external data source is written over the information model, the externalDataWriteCallback will be triggered. The user has to take care and assure that the write leads not to synchronization issues and race conditions. .. code-block:: c static UA_StatusCode externalDataWriteCallback(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext, const UA_NodeId *nodeId, void *nodeContext, const UA_NumericRange *range, const UA_DataValue *data){ //node values are updated by using variables in the memory //UA_Server_write is not used for updating node values. return UA_STATUSCODE_GOOD; } static UA_StatusCode externalDataReadNotificationCallback(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext, const UA_NodeId *nodeid, void *nodeContext, const UA_NumericRange *range){ //allow read without any preparation return UA_STATUSCODE_GOOD; } Subscriber ~~~~~~~~~~ Create connection, readergroup, datasetreader, subscribedvariables for the Subscriber thread. .. 
code-block:: c #if defined(SUBSCRIBER) static void addPubSubConnectionSubscriber(UA_Server *server, UA_NetworkAddressUrlDataType *networkAddressUrlSubscriber){ UA_StatusCode retval = UA_STATUSCODE_GOOD; /* Details about the connection configuration and handling are located * in the pubsub connection tutorial */ UA_PubSubConnectionConfig connectionConfig; memset(&connectionConfig, 0, sizeof(connectionConfig)); connectionConfig.name = UA_STRING("Subscriber Connection"); connectionConfig.enabled = true; UA_KeyValuePair connectionOptions[4]; connectionOptions[0].key = UA_QUALIFIEDNAME(0, "enableXdpSocket"); UA_Boolean enableXdp = enableXdpSubscribe; UA_Variant_setScalar(&connectionOptions[0].value, &enableXdp, &UA_TYPES[UA_TYPES_BOOLEAN]); connectionOptions[1].key = UA_QUALIFIEDNAME(0, "xdpflag"); UA_UInt32 flags = xdpFlag; UA_Variant_setScalar(&connectionOptions[1].value, &flags, &UA_TYPES[UA_TYPES_UINT32]); connectionOptions[2].key = UA_QUALIFIEDNAME(0, "hwreceivequeue"); UA_UInt32 rxqueue = xdpQueue; UA_Variant_setScalar(&connectionOptions[2].value, &rxqueue, &UA_TYPES[UA_TYPES_UINT32]); connectionOptions[3].key = UA_QUALIFIEDNAME(0, "xdpbindflag"); UA_UInt32 bindflags = xdpBindFlag; UA_Variant_setScalar(&connectionOptions[3].value, &bindflags, &UA_TYPES[UA_TYPES_UINT16]); connectionConfig.connectionProperties.map = connectionOptions; connectionConfig.connectionProperties.mapSize = 4; UA_NetworkAddressUrlDataType networkAddressUrlsubscribe = *networkAddressUrlSubscriber; connectionConfig.transportProfileUri = UA_STRING(ETH_TRANSPORT_PROFILE); UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrlsubscribe, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]); connectionConfig.publisherIdType = UA_PUBLISHERIDTYPE_UINT32; connectionConfig.publisherId.uint32 = UA_UInt32_random(); retval |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentSubscriber); if(retval == UA_STATUSCODE_GOOD) UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "The 
PubSub Connection was created successfully!"); } /* Add ReaderGroup to the created connection */ static void addReaderGroup(UA_Server *server) { if(server == NULL) return; UA_ReaderGroupConfig readerGroupConfig; memset(&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig)); readerGroupConfig.name = UA_STRING("ReaderGroup"); readerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE; readerGroupConfig.subscribingInterval = cycleTimeInMsec; /* Timeout is modified when blocking socket is enabled, and the default * timeout is used when blocking socket is disabled */ if(enableBlockingSocket == false) { /* As we run in 250us cycle time, modify default timeout (1ms) to 50us */ readerGroupConfig.timeout = 50; } else { readerGroupConfig.enableBlockingSocket = true; readerGroupConfig.timeout = 0; /* Blocking socket */ } #ifdef UA_ENABLE_PUBSUB_ENCRYPTION /* Encryption settings */ UA_ServerConfig *config = UA_Server_getConfig(server); readerGroupConfig.securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT; readerGroupConfig.securityPolicy = &config->pubSubConfig.securityPolicies[0]; #endif readerGroupConfig.pubsubManagerCallback.addCustomCallback = addPubSubApplicationCallback; readerGroupConfig.pubsubManagerCallback.changeCustomCallback = changePubSubApplicationCallback; readerGroupConfig.pubsubManagerCallback.removeCustomCallback = removePubSubApplicationCallback; UA_Server_addReaderGroup(server, connectionIdentSubscriber, &readerGroupConfig, &readerGroupIdentifier); #ifdef UA_ENABLE_PUBSUB_ENCRYPTION /* Add the encryption key informaton */ UA_ByteString sk = {UA_AES128CTR_SIGNING_KEY_LENGTH, signingKeySub}; UA_ByteString ek = {UA_AES128CTR_KEY_LENGTH, encryptingKeySub}; UA_ByteString kn = {UA_AES128CTR_KEYNONCE_LENGTH, keyNonceSub}; // TODO security token not necessary for readergroup (extracted from security-header) UA_Server_setReaderGroupEncryptionKeys(server, readerGroupIdentifier, 1, sk, ek, kn); #endif } /* Set SubscribedDataSet type to TargetVariables data type * Add 
subscribedvariables to the DataSetReader */ static void addSubscribedVariables(UA_Server *server) { UA_Int32 iterator = 0; UA_Int32 iteratorRepeatedCount = 0; if(server == NULL) return; UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable*) UA_calloc((REPEATED_NODECOUNTS + 2), sizeof(UA_FieldTargetVariable)); if(!targetVars) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "FieldTargetVariable - Bad out of memory"); return; } runningSub = UA_Boolean_new(); if(!runningSub) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningsub - Bad out of memory"); return; } *runningSub = true; runningSubDataValueRT = UA_DataValue_new(); if(!runningSubDataValueRT) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningsubDatavalue - Bad out of memory"); return; } UA_Variant_setScalar(&runningSubDataValueRT->value, runningSub, &UA_TYPES[UA_TYPES_BOOLEAN]); runningSubDataValueRT->hasValue = true; /* Set the value backend of the above create node to 'external value source' */ UA_ValueBackend runningSubvalueBackend; runningSubvalueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; runningSubvalueBackend.backend.external.value = &runningSubDataValueRT; runningSubvalueBackend.backend.external.callback.userWrite = externalDataWriteCallback; runningSubvalueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback; UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)30000), runningSubvalueBackend); UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable); targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; targetVars[iterator].targetVariable.targetNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)30000); iterator++; /* For creating Targetvariable */ for(iterator = 1, iteratorRepeatedCount = 0; iterator <= REPEATED_NODECOUNTS; iterator++, iteratorRepeatedCount++) { subRepeatedCounterData[iteratorRepeatedCount] = UA_UInt64_new(); if(!subRepeatedCounterData[iteratorRepeatedCount]) { 
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeRepeatedCounterData - Bad out of memory"); return; } *subRepeatedCounterData[iteratorRepeatedCount] = 0; subRepeatedDataValueRT[iteratorRepeatedCount] = UA_DataValue_new(); if(!subRepeatedDataValueRT[iteratorRepeatedCount]) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeRepeatedCounterDataValue - Bad out of memory"); return; } UA_Variant_setScalar(&subRepeatedDataValueRT[iteratorRepeatedCount]->value, subRepeatedCounterData[iteratorRepeatedCount], &UA_TYPES[UA_TYPES_UINT64]); subRepeatedDataValueRT[iteratorRepeatedCount]->hasValue = true; /* Set the value backend of the above create node to 'external value source' */ UA_ValueBackend valueBackend; valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; valueBackend.backend.external.value = &subRepeatedDataValueRT[iteratorRepeatedCount]; valueBackend.backend.external.callback.userWrite = externalDataWriteCallback; valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback; UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)iteratorRepeatedCount+50000), valueBackend); UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable); targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; targetVars[iterator].targetVariable.targetNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)iteratorRepeatedCount + 50000); } subCounterData = UA_UInt64_new(); if(!subCounterData) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeCounterData - Bad out of memory"); return; } *subCounterData = 0; subDataValueRT = UA_DataValue_new(); if(!subDataValueRT) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeDataValue - Bad out of memory"); return; } UA_Variant_setScalar(&subDataValueRT->value, subCounterData, &UA_TYPES[UA_TYPES_UINT64]); subDataValueRT->hasValue = true; /* Set the value backend of the above create node to 'external value source' */ UA_ValueBackend 
valueBackend; valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; valueBackend.backend.external.value = &subDataValueRT; valueBackend.backend.external.callback.userWrite = externalDataWriteCallback; valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback; UA_Server_setVariableNode_valueBackend(server, subNodeID, valueBackend); UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable); targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; targetVars[iterator].targetVariable.targetNodeId = subNodeID; /* Set the subscribed data to TargetVariable type */ readerConfig.subscribedDataSetType = UA_PUBSUB_SDS_TARGET; readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables = targetVars; readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize = REPEATED_NODECOUNTS + 2; } /* Add DataSetReader to the ReaderGroup */ static void addDataSetReader(UA_Server *server) { UA_Int32 iterator = 0; if(server == NULL) { return; } memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig)); readerConfig.name = UA_STRING("DataSet Reader"); UA_UInt16 publisherIdentifier = PUBLISHER_ID_SUB; readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16]; readerConfig.publisherId.data = &publisherIdentifier; readerConfig.writerGroupId = WRITER_GROUP_ID_SUB; readerConfig.dataSetWriterId = DATA_SET_WRITER_ID_SUB; readerConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED; readerConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPDATASETREADERMESSAGEDATATYPE]; UA_UadpDataSetReaderMessageDataType *dataSetReaderMessage = UA_UadpDataSetReaderMessageDataType_new(); dataSetReaderMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID | 
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER); readerConfig.messageSettings.content.decoded.data = dataSetReaderMessage; /* Setting up Meta data configuration in DataSetReader */ UA_DataSetMetaDataType *pMetaData = &readerConfig.dataSetMetaData; UA_DataSetMetaDataType_init(pMetaData); /* Static definition of number of fields size to 1 to create one * targetVariable */ pMetaData->fieldsSize = REPEATED_NODECOUNTS + 2; pMetaData->fields = (UA_FieldMetaData*) UA_Array_new(pMetaData->fieldsSize, &UA_TYPES[UA_TYPES_FIELDMETADATA]); /* Boolean DataType */ UA_FieldMetaData_init(&pMetaData->fields[iterator]); UA_NodeId_copy(&UA_TYPES[UA_TYPES_BOOLEAN].typeId, &pMetaData->fields[iterator].dataType); pMetaData->fields[iterator].builtInType = UA_NS0ID_BOOLEAN; pMetaData->fields[iterator].valueRank = -1; /* scalar */ iterator++; for(iterator = 1; iterator <= REPEATED_NODECOUNTS; iterator++) { UA_FieldMetaData_init(&pMetaData->fields[iterator]); UA_NodeId_copy(&UA_TYPES[UA_TYPES_UINT64].typeId, &pMetaData->fields[iterator].dataType); pMetaData->fields[iterator].builtInType = UA_NS0ID_UINT64; pMetaData->fields[iterator].valueRank = -1; /* scalar */ } /* Unsigned Integer DataType */ UA_FieldMetaData_init(&pMetaData->fields[iterator]); UA_NodeId_copy(&UA_TYPES[UA_TYPES_UINT64].typeId, &pMetaData->fields[iterator].dataType); pMetaData->fields[iterator].builtInType = UA_NS0ID_UINT64; pMetaData->fields[iterator].valueRank = -1; /* scalar */ /* Setup Target Variables in DSR config */ addSubscribedVariables(server); /* Setting up Meta data configuration in DataSetReader */ UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig, &readerIdentifier); UA_free(readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables); UA_free(readerConfig.dataSetMetaData.fields); UA_UadpDataSetReaderMessageDataType_delete(dataSetReaderMessage); } #endif #if defined(PUBLISHER) Publisher ~~~~~~~~~ Create connection, writergroup, datasetwriter 
and publisheddataset for Publisher thread.

.. code-block:: c

    /* Create the publisher PubSub connection on the given network address.
     * Socket priority and the SO_TXTIME switch are passed as connection
     * properties. A failure to create the connection is logged. */
    static void
    addPubSubConnection(UA_Server *server, UA_NetworkAddressUrlDataType *networkAddressUrlPub){
        /* Details about the connection configuration and handling are located
         * in the pubsub connection tutorial */
        UA_PubSubConnectionConfig connectionConfig;
        memset(&connectionConfig, 0, sizeof(connectionConfig));
        connectionConfig.name = UA_STRING("Publisher Connection");
        connectionConfig.enabled = true;
        UA_NetworkAddressUrlDataType networkAddressUrl = *networkAddressUrlPub;
        connectionConfig.transportProfileUri = UA_STRING(ETH_TRANSPORT_PROFILE);
        UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                             &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
        connectionConfig.publisherIdType = UA_PUBLISHERIDTYPE_UINT16;
        connectionConfig.publisherId.uint16 = PUBLISHER_ID;

        /* Connection options are given as Key/Value Pairs - Sockprio and Txtime */
        UA_KeyValuePair connectionOptions[2];
        connectionOptions[0].key = UA_QUALIFIEDNAME(0, "sockpriority");
        /* socketPriority is kept as Int32; hand a UInt32 copy to the variant
         * so that the storage matches the declared UInt32 type */
        UA_UInt32 sockPriority = (UA_UInt32)socketPriority;
        UA_Variant_setScalar(&connectionOptions[0].value, &sockPriority, &UA_TYPES[UA_TYPES_UINT32]);
        connectionOptions[1].key = UA_QUALIFIEDNAME(0, "enablesotxtime");
        /* NOTE(review): the variable named disableSoTxtime is passed as the
         * value of "enablesotxtime" - confirm the intended polarity */
        UA_Variant_setScalar(&connectionOptions[1].value, &disableSoTxtime, &UA_TYPES[UA_TYPES_BOOLEAN]);
        connectionConfig.connectionProperties.map = connectionOptions;
        connectionConfig.connectionProperties.mapSize = 2;

        UA_StatusCode retval =
            UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
        if(retval != UA_STATUSCODE_GOOD)
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                         "The Publisher PubSub Connection could not be created (0x%08x)",
                         (unsigned)retval);
    }

    /* PublishedDataset handling: adds an empty PublishedDataSet ("Demo PDS") */
    static void
    addPublishedDataSet(UA_Server *server) {
        UA_PublishedDataSetConfig publishedDataSetConfig;
        memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
        publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
        publishedDataSetConfig.name = UA_STRING("Demo PDS");
        UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
    }

    /* DataSetField handling.
     *
     * Adds the fields to the PublishedDataSet in this order: the running
     * flag, REPEATED_NODECOUNTS repeated counters and the counter. Each value
     * is allocated here, attached to its node as an external value backend
     * and published from the information model. The function returns early
     * (after logging) when an allocation fails. */
    static void
    _addDataSetField(UA_Server *server) {
        /* Add a field to the previous created PublishedDataSet */
        UA_NodeId dataSetFieldIdent1;
        UA_DataSetFieldConfig dataSetFieldConfig;
    #if defined PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS
        staticValueSource = UA_DataValue_new();
    #endif

        UA_NodeId dataSetFieldIdentRunning;
        UA_DataSetFieldConfig dsfConfigPubStatus;
        memset(&dsfConfigPubStatus, 0, sizeof(UA_DataSetFieldConfig));

        runningPub = UA_Boolean_new();
        if(!runningPub) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningPub - Bad out of memory");
            return;
        }

        *runningPub = true;
        runningPubDataValueRT = UA_DataValue_new();
        if(!runningPubDataValueRT) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningPubDataValue - Bad out of memory");
            return;
        }

        UA_Variant_setScalar(&runningPubDataValueRT->value, runningPub, &UA_TYPES[UA_TYPES_BOOLEAN]);
        runningPubDataValueRT->hasValue = true;

        /* Set the value backend of the above create node to 'external value source' */
        UA_ValueBackend runningPubvalueBackend;
        runningPubvalueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
        runningPubvalueBackend.backend.external.value = &runningPubDataValueRT;
        runningPubvalueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
        runningPubvalueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
        UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)20000), runningPubvalueBackend);

        /* setup RT DataSetField config */
        dsfConfigPubStatus.field.variable.rtValueSource.rtInformationModelNode = true;
        dsfConfigPubStatus.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, (UA_UInt32)20000);

        UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfigPubStatus, &dataSetFieldIdentRunning);

        /* The repeated counters use the node ids 10000 + index */
        for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++) {
            memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));

            repeatedCounterData[iterator] = UA_UInt64_new();
            if(!repeatedCounterData[iterator]) {
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishRepeatedCounter - Bad out of memory");
                return;
            }

            *repeatedCounterData[iterator] = 0;
            repeatedDataValueRT[iterator] = UA_DataValue_new();
            if(!repeatedDataValueRT[iterator]) {
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishRepeatedCounterDataValue - Bad out of memory");
                return;
            }

            UA_Variant_setScalar(&repeatedDataValueRT[iterator]->value, repeatedCounterData[iterator],
                                 &UA_TYPES[UA_TYPES_UINT64]);
            repeatedDataValueRT[iterator]->hasValue = true;

            /* Set the value backend of the above create node to 'external value source' */
            UA_ValueBackend valueBackend;
            valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
            valueBackend.backend.external.value = &repeatedDataValueRT[iterator];
            valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
            valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
            UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+10000), valueBackend);

            /* setup RT DataSetField config */
            dataSetFieldConfig.field.variable.rtValueSource.rtInformationModelNode = true;
            dataSetFieldConfig.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+10000);

            UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdent1);
        }

        UA_NodeId dataSetFieldIdent;
        UA_DataSetFieldConfig dsfConfig;
        memset(&dsfConfig, 0, sizeof(UA_DataSetFieldConfig));

        pubCounterData = UA_UInt64_new();
        if(!pubCounterData) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishCounter - Bad out of memory");
            return;
        }

        *pubCounterData = 0;
        pubDataValueRT = UA_DataValue_new();
        if(!pubDataValueRT) {
            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishDataValue - Bad out of memory");
            return;
        }

        UA_Variant_setScalar(&pubDataValueRT->value, pubCounterData, &UA_TYPES[UA_TYPES_UINT64]);
        pubDataValueRT->hasValue = true;

        /* Set the value backend of the above create node to 'external value source' */
        UA_ValueBackend valueBackend;
        valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
        valueBackend.backend.external.value = &pubDataValueRT;
        valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
        valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
        UA_Server_setVariableNode_valueBackend(server, pubNodeID, valueBackend);

        /* setup RT DataSetField config */
        dsfConfig.field.variable.rtValueSource.rtInformationModelNode = true;
        dsfConfig.field.variable.publishParameters.publishedVariable = pubNodeID;

        UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfig, &dataSetFieldIdent);
    }

    /* WriterGroup handling */
    static void
    addWriterGroup(UA_Server *server) {
        UA_WriterGroupConfig writerGroupConfig;
        memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
        writerGroupConfig.name = UA_STRING("Demo WriterGroup");
        writerGroupConfig.publishingInterval = cycleTimeInMsec;
        writerGroupConfig.enabled = false;
        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
        writerGroupConfig.writerGroupId = WRITER_GROUP_ID;
        writerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
writerGroupConfig.pubsubManagerCallback.addCustomCallback = addPubSubApplicationCallback; writerGroupConfig.pubsubManagerCallback.changeCustomCallback = changePubSubApplicationCallback; writerGroupConfig.pubsubManagerCallback.removeCustomCallback = removePubSubApplicationCallback; writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED; writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE]; #ifdef UA_ENABLE_PUBSUB_ENCRYPTION UA_ServerConfig *config = UA_Server_getConfig(server); writerGroupConfig.securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT; writerGroupConfig.securityPolicy = &config->pubSubConfig.securityPolicies[1]; #endif /* The configuration flags for the messages are encapsulated inside the * message- and transport settings extension objects. These extension * objects are defined by the standard. e.g. * UadpWriterGroupMessageDataType */ UA_UadpWriterGroupMessageDataType *writerGroupMessage = UA_UadpWriterGroupMessageDataType_new(); /* Change message settings of writerGroup to send PublisherId, * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader * of NetworkMessage */ writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER); writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage; UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent); UA_Server_setWriterGroupOperational(server, writerGroupIdent); UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage); #ifdef UA_ENABLE_PUBSUB_ENCRYPTION /* Add the encryption key informaton */ UA_ByteString sk = {UA_AES128CTR_SIGNING_KEY_LENGTH, signingKeyPub}; 
UA_ByteString ek = {UA_AES128CTR_KEY_LENGTH, encryptingKeyPub}; UA_ByteString kn = {UA_AES128CTR_KEYNONCE_LENGTH, keyNoncePub}; UA_Server_setWriterGroupEncryptionKeys(server, writerGroupIdent, 1, sk, ek, kn); #endif } /* DataSetWriter handling */ static void addDataSetWriter(UA_Server *server) { UA_NodeId dataSetWriterIdent; UA_DataSetWriterConfig dataSetWriterConfig; memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig)); dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter"); dataSetWriterConfig.dataSetWriterId = DATA_SET_WRITER_ID; dataSetWriterConfig.keyFrameCount = 10; UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent, &dataSetWriterConfig, &dataSetWriterIdent); } #endif Published data handling ~~~~~~~~~~~~~~~~~~~~~~~ The published data is updated in the array using this function. .. code-block:: c #if defined(PUBLISHER) static void updateMeasurementsPublisher(struct timespec start_time, UA_UInt64 counterValue) { if(measurementsPublisher >= MAX_MEASUREMENTS) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Publisher: Maximum log measurements reached - Closing the application"); signalTerm = true; return; } if(consolePrint) UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Pub:%lu,%ld.%09ld\n", (long unsigned)counterValue, start_time.tv_sec, start_time.tv_nsec); if(signalTerm != true){ publishTimestamp[measurementsPublisher] = start_time; publishCounterValue[measurementsPublisher] = counterValue; measurementsPublisher++; } } #endif #if defined(SUBSCRIBER) Subscribed data handling ~~~~~~~~~~~~~~~~~~~~~~~~ The subscribed data is updated in the array using this function Subscribed data handling. .. 
code-block:: c static void updateMeasurementsSubscriber(struct timespec receive_time, UA_UInt64 counterValue) { if(measurementsSubscriber >= MAX_MEASUREMENTS) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Subscriber: Maximum log measurements reached - Closing the application"); signalTerm = true; return; } if(consolePrint) UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Sub:%lu,%ld.%09ld\n", (long unsigned)counterValue, receive_time.tv_sec, receive_time.tv_nsec); if(signalTerm != true){ subscribeTimestamp[measurementsSubscriber] = receive_time; subscribeCounterValue[measurementsSubscriber] = counterValue; measurementsSubscriber++; } } #endif #if defined(PUBLISHER) Publisher thread routine ~~~~~~~~~~~~~~~~~~~~~~~~~ This is the Publisher thread that sleeps for 60% of the cycletime (250us) and prepares the tranmission packet within 40% of cycletime. The priority of this thread is lower than the priority of the Subscriber thread, so the subscriber thread executes first during every cycle. The data published by this thread in one cycle is subscribed by the subscriber thread of pubsub_TSN_loopback in the next cycle (two cycle timing model). The publisherETF function is the routine used by the publisher thread. .. 
code-block:: c void * publisherETF(void *arg) { struct timespec nextnanosleeptime; UA_ServerCallback pubCallback; UA_Server* server; UA_WriterGroup* currentWriterGroup; // TODO: Remove WriterGroup Usage UA_UInt64 interval_ns; UA_UInt64 transmission_time; /* Initialise value for nextnanosleeptime timespec */ nextnanosleeptime.tv_nsec = 0; threadArg *threadArgumentsPublisher = (threadArg *)arg; server = threadArgumentsPublisher->server; pubCallback = threadArgumentsPublisher->callback; currentWriterGroup = (UA_WriterGroup *)threadArgumentsPublisher->data; interval_ns = (UA_UInt64)(threadArgumentsPublisher->interval_ms * MILLI_SECONDS); /* Verify whether baseTime has already been calculated */ if(!baseTimeCalculated) { /* Get current time and compute the next nanosleeptime */ clock_gettime(CLOCKID, &threadBaseTime); /* Variable to nano Sleep until SECONDS_SLEEP second boundary */ threadBaseTime.tv_sec += SECONDS_SLEEP; threadBaseTime.tv_nsec = 0; baseTimeCalculated = true; } nextnanosleeptime.tv_sec = threadBaseTime.tv_sec; /* Modify the nanosecond field to wake up at the pubWakeUp percentage */ nextnanosleeptime.tv_nsec = threadBaseTime.tv_nsec + (__syscall_slong_t)(cycleTimeInMsec * MILLI_SECONDS * pubWakeupPercentage); nanoSecondFieldConversion(&nextnanosleeptime); /* Define Ethernet ETF transport settings */ UA_EthernetWriterGroupTransportDataType ethernettransportSettings; memset(ðernettransportSettings, 0, sizeof(UA_EthernetWriterGroupTransportDataType)); ethernettransportSettings.transmission_time = 0; /* Encapsulate ETF config in transportSettings */ UA_ExtensionObject transportSettings; memset(&transportSettings, 0, sizeof(UA_ExtensionObject)); /* TODO: transportSettings encoding and type to be defined */ transportSettings.content.decoded.data = ðernettransportSettings; currentWriterGroup->config.transportSettings = transportSettings; UA_UInt64 roundOffCycleTime = (UA_UInt64) ((cycleTimeInMsec * MILLI_SECONDS) - (cycleTimeInMsec * MILLI_SECONDS * 
pubWakeupPercentage)); while(*runningPub) { /* The Publisher threads wakes up at the configured publisher wake up * percentage (60%) of each cycle */ clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptime, NULL); /* Whenever Ctrl + C pressed, publish running boolean as false to stop * the subscriber before terminating the application */ if(signalTerm == true) *runningPub = false; /* Calculation of transmission time using the configured qbv offset by * the user - Will be handled by publishingOffset in the future */ transmission_time = ((UA_UInt64)nextnanosleeptime.tv_sec * SECONDS + (UA_UInt64)nextnanosleeptime.tv_nsec) + roundOffCycleTime + (UA_UInt64)(qbvOffset * 1000); ethernettransportSettings.transmission_time = transmission_time; /* Publish the data using the pubcallback - UA_WriterGroup_publishCallback(). * Start publishing when pubCounterData is greater than 1. */ if(*pubCounterData > 0) pubCallback(server, currentWriterGroup); /* Calculation of the next wake up time by adding the interval with the * previous wake up time */ nextnanosleeptime.tv_nsec += (__syscall_slong_t)interval_ns; nanoSecondFieldConversion(&nextnanosleeptime); } UA_free(threadArgumentsPublisher); sleep(1); runningServer = false; return NULL; } #endif #if defined(SUBSCRIBER) Subscriber thread routine ~~~~~~~~~~~~~~~~~~~~~~~~~ This Subscriber thread will wakeup during the start of cycle at 250us interval and check if the packets are received. Subscriber thread has the highest priority. This Subscriber thread subscribes to the data published by the Publisher thread of pubsub_TSN_loopback in the previous cycle. The subscriber function is the routine used by the subscriber thread. .. 
code-block:: c

    /* Subscriber thread routine.
     *
     * arg: heap-allocated threadArg carrying the server, the subscribe
     *      callback, the ReaderGroup (in `data`) and the interval in
     *      milliseconds. The structure is freed before the routine returns.
     *
     * The thread wakes up at `subWakeupPercentage` of every cycle and invokes
     * the subscribe callback. The loop runs while *runningSub is true.
     *
     * Returns NULL. */
    void *subscriber(void *arg) {
        UA_Server* server;
        void* currentReaderGroup;
        UA_ServerCallback subCallback;
        struct timespec nextnanosleeptimeSub;
        UA_UInt64 subInterval_ns;
        threadArg *threadArgumentsSubscriber = (threadArg *)arg;
        server = threadArgumentsSubscriber->server;
        subCallback = threadArgumentsSubscriber->callback;
        currentReaderGroup = threadArgumentsSubscriber->data;
        subInterval_ns = (UA_UInt64)(threadArgumentsSubscriber->interval_ms * MILLI_SECONDS);
        /* Verify whether baseTime has already been calculated */
        if(!baseTimeCalculated) {
            /* Get current time and compute the next nanosleeptime */
            clock_gettime(CLOCKID, &threadBaseTime);
            /* Variable to nano Sleep until SECONDS_SLEEP second boundary */
            threadBaseTime.tv_sec += SECONDS_SLEEP;
            threadBaseTime.tv_nsec = 0;
            baseTimeCalculated = true;
        }
        nextnanosleeptimeSub.tv_sec = threadBaseTime.tv_sec;
        /* Modify the nanosecond field to wake up at the subWakeUp percentage */
        nextnanosleeptimeSub.tv_nsec = threadBaseTime.tv_nsec + (__syscall_slong_t)(cycleTimeInMsec * MILLI_SECONDS * subWakeupPercentage);
        nanoSecondFieldConversion(&nextnanosleeptimeSub);
        while(*runningSub) {
            /* The Subscriber threads wakes up at the configured subscriber wake up
             * percentage (0%) of each cycle */
            clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptimeSub, NULL);
            /* Receive and process the incoming data */
            subCallback(server, currentReaderGroup);
            /* Calculation of the next wake up time by adding the interval with the
             * previous wake up time */
            nextnanosleeptimeSub.tv_nsec += (__syscall_slong_t)subInterval_ns;
            nanoSecondFieldConversion(&nextnanosleeptimeSub);
            /* Whenever Ctrl + C pressed, modify the runningSub boolean to false to
             * end this while loop */
            if(signalTerm == true)
                *runningSub = false;
        }
        /* The loop may also have ended because *runningSub was set to false
         * from elsewhere (e.g. Ctrl + C on the publisher side, which publishes
         * its running flag as false). In that case request termination of the
         * loopback application as well, so that the final running=false is
         * still sent back to the subscriber at T8. */
        if(*runningSub == false)
            signalTerm = true;
    #if defined(SUBSCRIBER) && !defined(PUBLISHER)
        /* Without a publisher thread, this thread is the one that clears
         * runningServer */
        runningServer = UA_FALSE;
    #endif
        UA_free(threadArgumentsSubscriber);
        return NULL;
    }
    #endif
    #if defined(PUBLISHER) || defined(SUBSCRIBER)

UserApplication thread routine
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The userapplication thread will wakeup at 30% of cycle time and handles the
userdata(read and write in Information Model). This thread serves the purpose
of a Control loop, which is used to increment the counterdata to be published
by the Publisher thread and read the data from Information Model for the
Subscriber thread and writes the updated counterdata in distinct csv files for
both threads.

.. code-block:: c

    /* User application (control loop) thread routine.
     *
     * arg: not used by this routine.
     *
     * In every cycle the thread wakes up at `userAppWakeupPercentage`, takes
     * the T4 timestamp, copies the subscribed counter values into the
     * publisher counter variables, takes the T5 timestamp and, when csv
     * logging or console printing is enabled, records both samples.
     *
     * Returns NULL. */
    void *userApplicationPubSub(void *arg) {
        struct timespec nextnanosleeptimeUserApplication;
        /* Verify whether baseTime has already been calculated */
        if(!baseTimeCalculated) {
            /* Get current time and compute the next nanosleeptime */
            clock_gettime(CLOCKID, &threadBaseTime);
            /* Variable to nano Sleep until SECONDS_SLEEP second boundary */
            threadBaseTime.tv_sec += SECONDS_SLEEP;
            threadBaseTime.tv_nsec = 0;
            baseTimeCalculated = true;
        }
        nextnanosleeptimeUserApplication.tv_sec = threadBaseTime.tv_sec;
        /* Modify the nanosecond field to wake up at the userAppWakeUp percentage */
        nextnanosleeptimeUserApplication.tv_nsec = threadBaseTime.tv_nsec + (__syscall_slong_t)(cycleTimeInMsec * MILLI_SECONDS * userAppWakeupPercentage);
        nanoSecondFieldConversion(&nextnanosleeptimeUserApplication);
        /* The loop header depends on the build configuration; both variants
         * share the loop body and the closing brace below */
    #if defined(PUBLISHER) && defined(SUBSCRIBER)
        while (*runningSub || *runningPub) {
    #else
        while (*runningSub) {
    #endif
            /* The User application threads wakes up at the configured userApp wake
             * up percentage (30%) of each cycle */
            clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptimeUserApplication, NULL);
    #if defined(SUBSCRIBER)
            /* Get the time - T4, time where subscribed variables are read from the
             * Information model. At this point, the packet will be already
             * subscribed and written into the Information model. As this
             * application uses FPM, we do not require explicit call of
             * UA_Server_read() to read the subscribed value from the Information
             * model. Hence, we take subscribed T4 time here */
            clock_gettime(CLOCKID, &dataReceiveTime);
    #endif
    #if defined(PUBLISHER)
            /* Pass the received subscribed values to publish variables
             * subCounterData value to pubCounter data repeatedSubCounter data
             * values to repeatedPubCounter data */
            *pubCounterData = *subCounterData;
            for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++)
                *repeatedCounterData[iterator] = *subRepeatedCounterData[iterator];
            /* Get the time - T5, time where the values of the subscribed data were
             * copied to the publisher counter variables */
            clock_gettime(CLOCKID, &dataModificationTime);
    #endif
            /* Update the T4, T5 time with the counter data in the user defined
             * publisher and subscriber arrays */
            if(enableCsvLog || consolePrint) {
    #if defined(SUBSCRIBER)
                if(*subCounterData > 0)
                    updateMeasurementsSubscriber(dataReceiveTime, *subCounterData);
    #endif
    #if defined(PUBLISHER)
                if(*pubCounterData > 0)
                    updateMeasurementsPublisher(dataModificationTime, *pubCounterData);
    #endif
            }
            /* Calculation of the next wake up time by adding the interval with the
             * previous wake up time */
            nextnanosleeptimeUserApplication.tv_nsec += (__syscall_slong_t)(cycleTimeInMsec * MILLI_SECONDS);
            nanoSecondFieldConversion(&nextnanosleeptimeUserApplication);
        }
        return NULL;
    }
    #endif

Thread creation
~~~~~~~~~~~~~~~
The threadcreation functionality creates thread with given threadpriority,
coreaffinity. The function returns the threadID of the newly created thread.

..
code-block:: c

    /* Starts `thread` with `serverConfig` as its argument.
     *
     * Before the new thread is created, the SCHED_FIFO priority
     * `threadPriority` and the CPU affinity `coreAffinity` are applied to the
     * calling thread (pthread_self()); presumably the new thread is meant to
     * inherit them - confirm against the pthread attribute defaults in use.
     *
     * threadPriority:  SCHED_FIFO priority to set.
     * coreAffinity:    index of the CPU core to pin to.
     * thread:          start routine of the new thread.
     * applicationName: name used in the log output.
     * serverConfig:    argument handed to the start routine.
     *
     * Returns the thread handle written by pthread_create(). The process exits
     * when the priority or the affinity cannot be set; a failing
     * pthread_create() is only logged as a warning. */
    static pthread_t
    threadCreation(UA_Int16 threadPriority, size_t coreAffinity, void *(*thread)(void *),
                   char *applicationName, void *serverConfig) {
        /* Start out with the handle of the calling thread */
        pthread_t handle = pthread_self();

        /* Scheduling policy and priority */
        struct sched_param fifoParam;
        fifoParam.sched_priority = threadPriority;
        if(pthread_setschedparam(handle, SCHED_FIFO, &fifoParam) != 0) {
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "pthread_setschedparam: failed\n");
            exit(1);
        }

        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                    "\npthread_setschedparam:%s Thread priority is %d \n",
                    applicationName, fifoParam.sched_priority);

        /* Core affinity */
        cpu_set_t coreMask;
        CPU_ZERO(&coreMask);
        CPU_SET(coreAffinity, &coreMask);
        UA_Int32 affinityError = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &coreMask);
        if(affinityError != 0) {
            fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(affinityError));
            exit(1);
        }

        /* Create the thread; from here on `handle` identifies the new thread */
        UA_Int32 createResult = pthread_create(&handle, NULL, thread, serverConfig);
        if(createResult != 0) {
            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                           ":%s Cannot create thread\n", applicationName);
        }

        if(CPU_ISSET(coreAffinity, &coreMask)) {
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "%s CPU CORE: %lu\n",
                        applicationName, (long unsigned)coreAffinity);
        }

        return handle;
    }

Creation of nodes
~~~~~~~~~~~~~~~~~~
The addServerNodes function is used to create the publisher and subscriber
nodes.

..
code-block:: c static void addServerNodes(UA_Server *server) { UA_NodeId objectId; UA_NodeId newNodeId; UA_ObjectAttributes object = UA_ObjectAttributes_default; object.displayName = UA_LOCALIZEDTEXT("en-US", "Counter Object"); UA_Server_addObjectNode(server, UA_NODEID_NULL, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), UA_QUALIFIEDNAME(1, "Counter Object"), UA_NODEID_NULL, object, NULL, &objectId); UA_VariableAttributes publisherAttr = UA_VariableAttributes_default; UA_UInt64 publishValue = 0; publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; publisherAttr.dataType = UA_TYPES[UA_TYPES_UINT64].typeId; UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]); publisherAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher Counter"); newNodeId = UA_NODEID_STRING(1, "PublisherCounter"); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Publisher Counter"), UA_NODEID_NULL, publisherAttr, NULL, &pubNodeID); UA_VariableAttributes subscriberAttr = UA_VariableAttributes_default; UA_UInt64 subscribeValue = 0; subscriberAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; subscriberAttr.dataType = UA_TYPES[UA_TYPES_UINT64].typeId; UA_Variant_setScalar(&subscriberAttr.value, &subscribeValue, &UA_TYPES[UA_TYPES_UINT64]); subscriberAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Subscriber Counter"); newNodeId = UA_NODEID_STRING(1, "SubscriberCounter"); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Subscriber Counter"), UA_NODEID_NULL, subscriberAttr, NULL, &subNodeID); for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++) { UA_VariableAttributes repeatedNodePub = UA_VariableAttributes_default; UA_UInt64 repeatedPublishValue = 0; repeatedNodePub.accessLevel = UA_ACCESSLEVELMASK_READ | 
UA_ACCESSLEVELMASK_WRITE; repeatedNodePub.dataType = UA_TYPES[UA_TYPES_UINT64].typeId; UA_Variant_setScalar(&repeatedNodePub.value, &repeatedPublishValue, &UA_TYPES[UA_TYPES_UINT64]); repeatedNodePub.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher RepeatedCounter"); newNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+10000); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Publisher RepeatedCounter"), UA_NODEID_NULL, repeatedNodePub, NULL, &pubRepeatedCountNodeID); } UA_VariableAttributes runningStatusPub = UA_VariableAttributes_default; UA_Boolean runningPubStatus = 0; runningStatusPub.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; UA_Variant_setScalar(&runningStatusPub.value, &runningPubStatus, &UA_TYPES[UA_TYPES_BOOLEAN]); runningStatusPub.displayName = UA_LOCALIZEDTEXT("en-US", "RunningStatus Pub"); runningStatusPub.dataType = UA_TYPES[UA_TYPES_BOOLEAN].typeId; newNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)20000); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "RunningStatus Pub"), UA_NODEID_NULL, runningStatusPub, NULL, &runningPubStatusNodeID); for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++) { UA_VariableAttributes repeatedNodeSub = UA_VariableAttributes_default; UA_DateTime repeatedSubscribeValue; UA_Variant_setScalar(&repeatedNodeSub.value, &repeatedSubscribeValue, &UA_TYPES[UA_TYPES_UINT64]); repeatedNodeSub.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; repeatedNodeSub.dataType = UA_TYPES[UA_TYPES_UINT64].typeId; repeatedNodeSub.displayName = UA_LOCALIZEDTEXT("en-US", "Subscriber RepeatedCounter"); newNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+50000); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Subscriber RepeatedCounter"), UA_NODEID_NULL, repeatedNodeSub, NULL, 
&subRepeatedCountNodeID); } UA_VariableAttributes runningStatusSubscriber = UA_VariableAttributes_default; UA_Boolean runningSubStatusValue = 0; runningStatusSubscriber.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; UA_Variant_setScalar(&runningStatusSubscriber.value, &runningSubStatusValue, &UA_TYPES[UA_TYPES_BOOLEAN]); runningStatusSubscriber.displayName = UA_LOCALIZEDTEXT("en-US", "RunningStatus Sub"); runningStatusSubscriber.dataType = UA_TYPES[UA_TYPES_BOOLEAN].typeId; newNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)30000); UA_Server_addVariableNode(server, newNodeId, objectId, UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "RunningStatus Sub"), UA_NODEID_NULL, runningStatusSubscriber, NULL, &runningSubStatusNodeID); } Deletion of nodes ~~~~~~~~~~~~~~~~~ The removeServerNodes function is used to delete the publisher and subscriber nodes. .. code-block:: c static void removeServerNodes(UA_Server *server) { /* Delete the Publisher Counter Node*/ UA_Server_deleteNode(server, pubNodeID, true); UA_NodeId_clear(&pubNodeID); for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++) { UA_Server_deleteNode(server, pubRepeatedCountNodeID, true); UA_NodeId_clear(&pubRepeatedCountNodeID); } UA_Server_deleteNode(server, runningPubStatusNodeID, true); UA_NodeId_clear(&runningPubStatusNodeID); UA_Server_deleteNode(server, subNodeID, true); UA_NodeId_clear(&subNodeID); for(UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++) { UA_Server_deleteNode(server, subRepeatedCountNodeID, true); UA_NodeId_clear(&subRepeatedCountNodeID); } UA_Server_deleteNode(server, runningSubStatusNodeID, true); UA_NodeId_clear(&runningSubStatusNodeID); } Usage function ~~~~~~~~~~~~~~ The usage function gives the information to run the application. ``./bin/examples/pubsub_TSN_loopback -interface runs the application.`` For more options, use ./bin/examples/pubsub_TSN_loopback -h. .. 
code-block:: c

    /* Prints the command line help to stderr.
     *
     * appname: program name inserted into the "usage:" line.
     *
     * The default values shown in the text are taken from the DEFAULT_*
     * macros passed as format arguments. */
    static void usage(char *appname) {
        fprintf(stderr,
                "\n"
                "usage: %s [options]\n"
                "\n"
                " -interface [name] Use network interface 'name'\n"
                " -cycleTimeInMsec [num] Cycle time in milli seconds (default %lf)\n"
                " -socketPriority [num] Set publisher SO_PRIORITY to (default %d)\n"
                " -pubPriority [num] Publisher thread priority value (default %d)\n"
                " -subPriority [num] Subscriber thread priority value (default %d)\n"
                " -userAppPriority [num] User application thread priority value (default %d)\n"
                " -pubCore [num] Run on CPU for publisher (default %d)\n"
                " -subCore [num] Run on CPU for subscriber (default %d)\n"
                " -userAppCore [num] Run on CPU for userApplication (default %d)\n"
                " -pubMacAddress [name] Publisher Mac address (default %s - where 8 is the VLAN ID and 3 is the PCP)\n"
                " -subMacAddress [name] Subscriber Mac address (default %s - where 8 is the VLAN ID and 3 is the PCP)\n"
                " -qbvOffset [num] QBV offset value (default %d)\n"
                " -disableSoTxtime Do not use SO_TXTIME\n"
                " -enableCsvLog Experimental: To log the data in csv files. Support up to 1 million samples\n"
                " -enableconsolePrint Experimental: To print the data in console output. Support for higher cycle time\n"
                " -enableBlockingSocket Run application with blocking socket option. While using blocking socket option need to\n"
                " run both the Publisher and Loopback application. Otherwise application will not terminate.\n"
                " -enableXdpSubscribe Enable XDP feature for subscriber. XDP_COPY and XDP_FLAGS_SKB_MODE is used by default. Not recommended to be enabled along with blocking socket.\n"
                " -xdpQueue [num] XDP queue value (default %d)\n"
                " -xdpFlagDrvMode Use XDP in DRV mode\n"
                " -xdpBindFlagZeroCopy Use Zero-Copy mode in XDP\n"
                "\n",
                /* Arguments in the order of the conversions above */
                appname, DEFAULT_CYCLE_TIME, DEFAULT_SOCKET_PRIORITY, DEFAULT_PUB_SCHED_PRIORITY, \
                DEFAULT_SUB_SCHED_PRIORITY, DEFAULT_USERAPPLICATION_SCHED_PRIORITY, \
                DEFAULT_PUB_CORE, DEFAULT_SUB_CORE, DEFAULT_USER_APP_CORE, \
                DEFAULT_PUBLISHING_MAC_ADDRESS, DEFAULT_SUBSCRIBING_MAC_ADDRESS, DEFAULT_QBV_OFFSET, DEFAULT_XDP_QUEUE);
    }

Main Server
~~~~~~~~~~~
The main function contains publisher and subscriber threads running in
parallel.

.. code-block:: c

    int main(int argc, char **argv) {
        signal(SIGINT, stopHandler);
        signal(SIGTERM, stopHandler);
        UA_Int32 returnValue = 0;
        UA_StatusCode retval = UA_STATUSCODE_GOOD;
        UA_Server *server = UA_Server_new();
        UA_ServerConfig *config = UA_Server_getConfig(server);
        char *interface = NULL;
        UA_Int32 argInputs = 0;
        UA_Int32 long_index = 0;
        char *progname;
        pthread_t userThreadID;
        /* Process the command line arguments */
        progname = strrchr(argv[0], '/');
        progname = progname ?
1 + progname : argv[0]; static struct option long_options[] = { {"interface", required_argument, 0, 'a'}, {"cycleTimeInMsec", required_argument, 0, 'b'}, {"socketPriority", required_argument, 0, 'c'}, {"pubPriority", required_argument, 0, 'd'}, {"subPriority", required_argument, 0, 'e'}, {"userAppPriority", required_argument, 0, 'f'}, {"pubCore", required_argument, 0, 'g'}, {"subCore", required_argument, 0, 'h'}, {"userAppCore", required_argument, 0, 'i'}, {"pubMacAddress", required_argument, 0, 'j'}, {"subMacAddress", required_argument, 0, 'k'}, {"qbvOffset", required_argument, 0, 'l'}, {"disableSoTxtime", no_argument, 0, 'm'}, {"enableCsvLog", no_argument, 0, 'n'}, {"enableconsolePrint", no_argument, 0, 'o'}, {"enableBlockingSocket", no_argument, 0, 'p'}, {"xdpQueue", required_argument, 0, 'q'}, {"xdpFlagDrvMode", no_argument, 0, 'r'}, {"xdpBindFlagZeroCopy", no_argument, 0, 's'}, {"enableXdpSubscribe", no_argument, 0, 't'}, {"help", no_argument, 0, 'u'}, {0, 0, 0, 0 } }; while((argInputs = getopt_long_only(argc, argv,"", long_options, &long_index)) != -1) { switch(argInputs) { case 'a': interface = optarg; break; case 'b': cycleTimeInMsec = atof(optarg); break; case 'c': socketPriority = atoi(optarg); break; case 'd': pubPriority = atoi(optarg); break; case 'e': subPriority = atoi(optarg); break; case 'f': userAppPriority = atoi(optarg); break; case 'g': pubCore = atoi(optarg); break; case 'h': subCore = atoi(optarg); break; case 'i': userAppCore = atoi(optarg); break; case 'j': pubMacAddress = optarg; break; case 'k': subMacAddress = optarg; break; case 'l': qbvOffset = atoi(optarg); break; case 'm': disableSoTxtime = false; break; case 'n': enableCsvLog = true; break; case 'o': consolePrint = true; break; case 'p': /* TODO: Application need to be exited independently */ enableBlockingSocket = true; break; case 'q': xdpQueue = (UA_UInt32)atoi(optarg); break; case 'r': xdpFlag = XDP_FLAGS_DRV_MODE; break; case 's': xdpBindFlag = XDP_ZEROCOPY; break; case 't': 
enableXdpSubscribe = true; break; case 'u': usage(progname); return -1; case '?': usage(progname); return -1; } } if(!interface) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Need a network interface to run"); usage(progname); UA_Server_delete(server); return 0; } if(cycleTimeInMsec < 0.125) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%f Bad cycle time", cycleTimeInMsec); usage(progname); return -1; } if(enableBlockingSocket == true) { if(enableXdpSubscribe == true) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Cannot enable blocking socket and xdp at the same time"); usage(progname); return -1; } } if(xdpFlag == XDP_FLAGS_DRV_MODE || xdpBindFlag == XDP_ZEROCOPY) { if(enableXdpSubscribe == false) UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Flag enableXdpSubscribe is false, running application without XDP"); } UA_ServerConfig_setMinimal(config, PORT_NUMBER, NULL);