.. _pubsub-nodeset-subscriber-tutorial: Subscriber Realtime example using custom nodes ---------------------------------------------- The purpose of this example file is to use the custom nodes of the XML file(subDataModel.xml) for subscriber. This Subscriber example uses the two custom nodes (SubscriberCounterVariable and Pressure) created using the XML file(subDataModel.xml) for subscribing the packet. The subDataModel.csv will contain the nodeids of custom nodes(object and variables) and the nodeids of the custom nodes are harcoded inside the addSubscribedVariables API This example uses two threads namely the Subscriber and UserApplication. The Subscriber thread is used to subscribe to data at every cycle. The UserApplication thread serves the functionality of the Control loop, which reads the Information Model of the Subscriber and the new counterdata will be written in the csv along with received timestamp. Run steps of the Subscriber application as mentioned below: ``./bin/examples/pubsub_nodeset_rt_subscriber -i `` For more information run ``./bin/examples/pubsub_nodeset_rt_subscriber -h``. .. code-block:: c #define _GNU_SOURCE /* For thread operations */ #include #include #include #include #include #include #include "ua_pubsub.h" #include "open62541/namespace_example_subscriber_generated.h" UA_NodeId readerGroupIdentifier; UA_NodeId readerIdentifier; UA_DataSetReaderConfig readerConfig; /* to find load of each thread * ps -L -o pid,pri,%cpu -C pubsub_nodeset_rt_subscriber*/ /* Configurable Parameters */ /* Cycle time in milliseconds */ #define DEFAULT_CYCLE_TIME 0.25 /* Qbv offset */ #define QBV_OFFSET 25 * 1000 #define DEFAULT_SOCKET_PRIORITY 3 #define PUBLISHER_ID_SUB 2234 #define WRITER_GROUP_ID_SUB 101 #define DATA_SET_WRITER_ID_SUB 62541 #define SUBSCRIBING_MAC_ADDRESS "opc.eth://01-00-5E-7F-00-01:8.3" #define PORT_NUMBER 62541 /* Non-Configurable Parameters */ /* Milli sec and sec conversion to nano sec */ #define MILLI_SECONDS 1000 * 1000 #define SECONDS 1000 * 1000 * 1000 #define SECONDS_SLEEP 5 #define DEFAULT_SUB_SCHED_PRIORITY 81 #define MAX_MEASUREMENTS 30000000 #define DEFAULT_PUBSUB_CORE 2 #define DEFAULT_USER_APP_CORE 3 #define SECONDS_INCREMENT 1 #define CLOCKID CLOCK_TAI #define ETH_TRANSPORT_PROFILE "http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp" #define DEFAULT_USERAPPLICATION_SCHED_PRIORITY 75 /* Below mentioned parameters can be provided as input using command line arguments * If user did not provide the below mentioned parameters as input through command line * argument then default value will be used */ static UA_Double cycleTimeMsec = DEFAULT_CYCLE_TIME; static UA_Boolean consolePrint = UA_FALSE; static UA_Int32 subPriority = DEFAULT_SUB_SCHED_PRIORITY; static UA_Int32 userAppPriority = DEFAULT_USERAPPLICATION_SCHED_PRIORITY; static UA_Int32 pubSubCore = DEFAULT_PUBSUB_CORE; static UA_Int32 userAppCore = DEFAULT_USER_APP_CORE; /* User application Pub will wakeup at the 30% of cycle time and handles the */ /* user data write in Information model */ /* After 60% is left for publisher */ static UA_Double userAppWakeupPercentage = 0.3; /* Subscriber will wake up at the start of cycle time and then receives the packet */ static UA_Double subWakeupPercentage = 0; static UA_Boolean fileWrite = UA_FALSE; /* Set server running as true */ UA_Boolean running = UA_TRUE; UA_UInt16 nsIdx = 0; UA_UInt64 *subCounterData; UA_DataValue *subDataValueRT; UA_Double *pressureData; UA_DataValue *pressureValueRT; /* File to store the data and timestamps for different traffic */ FILE *fpSubscriber; char *fileName = "subscriber_T4.csv"; /* Array to store subscribed counter data */ UA_UInt64 subscribeCounterValue[MAX_MEASUREMENTS]; UA_Double pressureValues[MAX_MEASUREMENTS]; size_t measurementsSubscriber = 0; /* Array to store timestamp */ struct timespec subscribeTimestamp[MAX_MEASUREMENTS]; /* Thread for subscriber */ pthread_t subthreadID; /* Variable for PubSub connection creation */ UA_NodeId connectionIdentSubscriber; struct timespec dataReceiveTime; /* Thread for user application*/ pthread_t userApplicationThreadID; typedef struct { UA_Server* ServerRun; } serverConfigStruct; /* Structure to define thread parameters */ typedef struct { UA_Server* server; void* data; UA_ServerCallback callback; UA_Duration interval_ms; UA_UInt64* callbackId; } threadArg; /* Subscriber thread routine */ void *subscriber(void *arg); /* User application thread routine */ void *userApplicationSub(void *arg); /* To create multi-threads */ static pthread_t threadCreation(UA_Int32 threadPriority, UA_Int32 coreAffinity, void *(*thread) (void *), char *applicationName, void *serverConfig); /* Stop signal */ static void stopHandler(int sign) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c"); running = UA_FALSE; } **Nanosecond field handling** Nanosecond field in timespec is checked for overflowing and one second is added to seconds field and nanosecond field is set to zero .. code-block:: c while (timeSpecValue->tv_nsec > (SECONDS -1)) { /* Move to next second and remove it from ns field */ timeSpecValue->tv_sec += SECONDS_INCREMENT; timeSpecValue->tv_nsec -= SECONDS; } } **Custom callback handling** Custom callback thread handling overwrites the default timer based callback function with the custom (user-specified) callback interval. .. code-block:: c /* Add a callback for cyclic repetition */ static UA_StatusCode addPubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_ServerCallback callback, void *data, UA_Double interval_ms, UA_DateTime *baseTime, UA_TimerPolicy timerPolicy, UA_UInt64 *callbackId) { /* Initialize arguments required for the thread to run */ threadArg *threadArguments = (threadArg *) UA_malloc(sizeof(threadArg)); /* Pass the value required for the threads */ threadArguments->server = server; threadArguments->data = data; threadArguments->callback = callback; threadArguments->interval_ms = interval_ms; threadArguments->callbackId = callbackId; /* Create the subscriber thread with the required priority and core affinity */ char threadNameSub[11] = "Subscriber"; subthreadID = threadCreation(subPriority, pubSubCore, subscriber, threadNameSub, threadArguments); return UA_STATUSCODE_GOOD; } static UA_StatusCode changePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_UInt64 callbackId, UA_Double interval_ms, UA_DateTime *baseTime, UA_TimerPolicy timerPolicy) { /* Callback interval need not be modified as it is thread based implementation. * The thread uses nanosleep for calculating cycle time and modification in * nanosleep value changes cycle time */ return UA_STATUSCODE_GOOD; } /* Remove the callback added for cyclic repetition */ static void removePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier, UA_UInt64 callbackId) { if(callbackId && (pthread_join(callbackId, NULL) != 0)) UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Pthread Join Failed thread: %ld\n", callbackId); } **External data source handling** If the external data source is written over the information model, the externalDataWriteCallback will be triggered. The user has to take care and assure that the write leads not to synchronization issues and race conditions. .. code-block:: c static UA_StatusCode externalDataWriteCallback(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext, const UA_NodeId *nodeId, void *nodeContext, const UA_NumericRange *range, const UA_DataValue *data){ //node values are updated by using variables in the memory //UA_Server_write is not used for updating node values. return UA_STATUSCODE_GOOD; } static UA_StatusCode externalDataReadNotificationCallback(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext, const UA_NodeId *nodeid, void *nodeContext, const UA_NumericRange *range){ //allow read without any preparation return UA_STATUSCODE_GOOD; } **Subscriber Connection Creation** Create Subscriber connection for the Subscriber thread .. code-block:: c static void addPubSubConnectionSubscriber(UA_Server *server, UA_NetworkAddressUrlDataType *networkAddressUrlSubscriber){ UA_StatusCode retval = UA_STATUSCODE_GOOD; /* Details about the connection configuration and handling are located * in the pubsub connection tutorial */ UA_PubSubConnectionConfig connectionConfig; memset(&connectionConfig, 0, sizeof(connectionConfig)); connectionConfig.name = UA_STRING("Subscriber Connection"); connectionConfig.enabled = UA_TRUE; UA_NetworkAddressUrlDataType networkAddressUrlsubscribe = *networkAddressUrlSubscriber; connectionConfig.transportProfileUri = UA_STRING(ETH_TRANSPORT_PROFILE); UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrlsubscribe, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]); connectionConfig.publisherId.numeric = UA_UInt32_random(); retval |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentSubscriber); if (retval == UA_STATUSCODE_GOOD) UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,"The PubSub Connection was created successfully!"); } **ReaderGroup** ReaderGroup is used to group a list of DataSetReaders. All ReaderGroups are created within a PubSubConnection and automatically deleted if the connection is removed. .. code-block:: c /* Add ReaderGroup to the created connection */ static void addReaderGroup(UA_Server *server) { if (server == NULL) { return; } UA_ReaderGroupConfig readerGroupConfig; memset (&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig)); readerGroupConfig.name = UA_STRING("ReaderGroup1"); readerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE; readerGroupConfig.pubsubManagerCallback.addCustomCallback = addPubSubApplicationCallback; readerGroupConfig.pubsubManagerCallback.changeCustomCallback = changePubSubApplicationCallback; readerGroupConfig.pubsubManagerCallback.removeCustomCallback = removePubSubApplicationCallback; UA_Server_addReaderGroup(server, connectionIdentSubscriber, &readerGroupConfig, &readerGroupIdentifier); } **SubscribedDataSet** Set SubscribedDataSet type to TargetVariables data type Add SubscriberCounter variable to the DataSetReader .. code-block:: c static void addSubscribedVariables (UA_Server *server) { if (server == NULL) { return; } UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable*) UA_calloc(2, sizeof(UA_FieldTargetVariable)); subCounterData = UA_UInt64_new(); *subCounterData = 0; subDataValueRT = UA_DataValue_new(); UA_Variant_setScalar(&subDataValueRT->value, subCounterData, &UA_TYPES[UA_TYPES_UINT64]); subDataValueRT->hasValue = UA_TRUE; /* Set the value backend of the above create node to 'external value source' */ UA_ValueBackend valueBackend; valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; valueBackend.backend.external.value = &subDataValueRT; valueBackend.backend.external.callback.userWrite = externalDataWriteCallback; valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback; /* If user need to change the nodeid of the custom nodes in the application then it must be * changed inside the xml and .csv file inside examples\pubsub_realtime\nodeset\*/ /* The nodeid of the Custom node SubscriberCounterVariable is 2005 which is used below */ UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(nsIdx, 2005), valueBackend); UA_FieldTargetDataType_init(&targetVars[0].targetVariable); targetVars[0].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; targetVars[0].targetVariable.targetNodeId = UA_NODEID_NUMERIC(nsIdx, 2005); pressureData = UA_Double_new(); *pressureData = 0; pressureValueRT = UA_DataValue_new(); UA_Variant_setScalar(&pressureValueRT->value, pressureData, &UA_TYPES[UA_TYPES_DOUBLE]); pressureValueRT->hasValue = UA_TRUE; /* Set the value backend of the above create node to 'external value source' */ UA_ValueBackend valueBackend1; valueBackend1.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; valueBackend1.backend.external.value = &pressureValueRT; valueBackend1.backend.external.callback.userWrite = externalDataWriteCallback; valueBackend1.backend.external.callback.notificationRead = externalDataReadNotificationCallback; /* The nodeid of the Custom node Pressure is 2006 which is used below */ UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(nsIdx, 2006), valueBackend1); UA_FieldTargetDataType_init(&targetVars[1].targetVariable); targetVars[1].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; targetVars[1].targetVariable.targetNodeId = UA_NODEID_NUMERIC(nsIdx, 2006); /* Set the subscribed data to TargetVariable type */ readerConfig.subscribedDataSetType = UA_PUBSUB_SDS_TARGET; readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables = targetVars; readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize = 2; } **DataSetReader** DataSetReader can receive NetworkMessages with the DataSetMessage of interest sent by the Publisher. DataSetReader provides the configuration necessary to receive and process DataSetMessages on the Subscriber side. DataSetReader must be linked with a SubscribedDataSet and be contained within a ReaderGroup. .. code-block:: c /* Add DataSetReader to the ReaderGroup */ static void addDataSetReader(UA_Server *server) { if (server == NULL) { return; } memset (&readerConfig, 0, sizeof(UA_DataSetReaderConfig)); readerConfig.name = UA_STRING("DataSet Reader 1"); UA_UInt16 publisherIdentifier = PUBLISHER_ID_SUB; readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16]; readerConfig.publisherId.data = &publisherIdentifier; readerConfig.writerGroupId = WRITER_GROUP_ID_SUB; readerConfig.dataSetWriterId = DATA_SET_WRITER_ID_SUB; readerConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED; readerConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPDATASETREADERMESSAGEDATATYPE]; UA_UadpDataSetReaderMessageDataType *dataSetReaderMessage = UA_UadpDataSetReaderMessageDataType_new(); dataSetReaderMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID | (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER); readerConfig.messageSettings.content.decoded.data = dataSetReaderMessage; /* Setting up Meta data configuration in DataSetReader */ UA_DataSetMetaDataType *pMetaData = &readerConfig.dataSetMetaData; /* FilltestMetadata function in subscriber implementation */ UA_DataSetMetaDataType_init(pMetaData); pMetaData->name = UA_STRING ("DataSet Test"); /* Static definition of number of fields size to 1 to create one targetVariable */ pMetaData->fieldsSize = 2; pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize, &UA_TYPES[UA_TYPES_FIELDMETADATA]); /* Unsigned Integer DataType */ UA_FieldMetaData_init (&pMetaData->fields[0]); UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT64].typeId, &pMetaData->fields[0].dataType); pMetaData->fields[0].builtInType = UA_NS0ID_UINT64; pMetaData->fields[0].valueRank = -1; /* scalar */ /* Double DataType */ UA_FieldMetaData_init (&pMetaData->fields[1]); UA_NodeId_copy (&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[1].dataType); pMetaData->fields[1].builtInType = UA_NS0ID_DOUBLE; pMetaData->fields[1].valueRank = -1; /* scalar */ /* Setup Target Variables in DSR config */ addSubscribedVariables(server); /* Setting up Meta data configuration in DataSetReader */ UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig, &readerIdentifier); UA_free(readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables); UA_free(readerConfig.dataSetMetaData.fields); UA_UadpDataSetReaderMessageDataType_delete(dataSetReaderMessage); } **Subscribed data handling** The subscribed data is updated in the array using this function Subscribed data handling** .. code-block:: c static void updateMeasurementsSubscriber(struct timespec receive_time, UA_UInt64 counterValue, UA_Double pressureValue) { subscribeTimestamp[measurementsSubscriber] = receive_time; subscribeCounterValue[measurementsSubscriber] = counterValue; pressureValues[measurementsSubscriber] = pressureValue; measurementsSubscriber++; } **Subscriber thread routine** Subscriber thread will wakeup during the start of cycle at 250us interval and check if the packets are received. The subscriber function is the routine used by the subscriber thread. .. code-block:: c void *subscriber(void *arg) { UA_Server* server; UA_ReaderGroup* currentReaderGroup; UA_ServerCallback subCallback; struct timespec nextnanosleeptimeSub; threadArg *threadArgumentsSubscriber = (threadArg *)arg; server = threadArgumentsSubscriber->server; subCallback = threadArgumentsSubscriber->callback; currentReaderGroup = (UA_ReaderGroup *)threadArgumentsSubscriber->data; /* Get current time and compute the next nanosleeptime */ clock_gettime(CLOCKID, &nextnanosleeptimeSub); /* Variable to nano Sleep until 1ms before a 1 second boundary */ nextnanosleeptimeSub.tv_sec += SECONDS_SLEEP; nextnanosleeptimeSub.tv_nsec = (__syscall_slong_t)(cycleTimeMsec * subWakeupPercentage * MILLI_SECONDS); nanoSecondFieldConversion(&nextnanosleeptimeSub); while (running) { clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptimeSub, NULL); /* Read subscribed data from the SubscriberCounter variable */ subCallback(server, currentReaderGroup); nextnanosleeptimeSub.tv_nsec += (__syscall_slong_t)(cycleTimeMsec * MILLI_SECONDS); nanoSecondFieldConversion(&nextnanosleeptimeSub); } UA_free(threadArgumentsSubscriber); return (void*)NULL; } **UserApplication thread routine** The userapplication thread will wakeup at 30% of cycle time and handles the userdata in the Information Model. This thread is used to write the counterdata that is subscribed by the Subscriber thread in a csv. .. code-block:: c void *userApplicationSub(void *arg) { struct timespec nextnanosleeptimeUserApplication; /* Get current time and compute the next nanosleeptime */ clock_gettime(CLOCKID, &nextnanosleeptimeUserApplication); /* Variable to nano Sleep until 1ms before a 1 second boundary */ nextnanosleeptimeUserApplication.tv_sec += SECONDS_SLEEP; nextnanosleeptimeUserApplication.tv_nsec = (__syscall_slong_t)(cycleTimeMsec * userAppWakeupPercentage * MILLI_SECONDS); nanoSecondFieldConversion(&nextnanosleeptimeUserApplication); while (running) { clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptimeUserApplication, NULL); clock_gettime(CLOCKID, &dataReceiveTime); if ((fileWrite == UA_TRUE) || (consolePrint == UA_TRUE)) { if (*subCounterData > 0) updateMeasurementsSubscriber(dataReceiveTime, *subCounterData, *pressureData); } nextnanosleeptimeUserApplication.tv_nsec += (__syscall_slong_t)(cycleTimeMsec * MILLI_SECONDS); nanoSecondFieldConversion(&nextnanosleeptimeUserApplication); } return (void*)NULL; } **Thread creation** The threadcreation functionality creates thread with given threadpriority, coreaffinity. The function returns the threadID of the newly created thread. .. code-block:: c static pthread_t threadCreation(UA_Int32 threadPriority, UA_Int32 coreAffinity, void *(*thread) (void *), \ char *applicationName, void *serverConfig){ /* Core affinity set */ cpu_set_t cpuset; pthread_t threadID; struct sched_param schedParam; UA_Int32 returnValue = 0; UA_Int32 errorSetAffinity = 0; /* Return the ID for thread */ threadID = pthread_self(); schedParam.sched_priority = threadPriority; returnValue = pthread_setschedparam(threadID, SCHED_FIFO, &schedParam); if (returnValue != 0) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"pthread_setschedparam: failed\n"); exit(1); } UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,\ "\npthread_setschedparam:%s Thread priority is %d \n", \ applicationName, schedParam.sched_priority); CPU_ZERO(&cpuset); CPU_SET((size_t)coreAffinity, &cpuset); errorSetAffinity = pthread_setaffinity_np(threadID, sizeof(cpu_set_t), &cpuset); if (errorSetAffinity) { fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(errorSetAffinity)); exit(1); } returnValue = pthread_create(&threadID, NULL, thread, serverConfig); if (returnValue != 0) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,":%s Cannot create thread\n", applicationName); } if (CPU_ISSET((size_t)coreAffinity, &cpuset)) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"%s CPU CORE: %d\n", applicationName, coreAffinity); } return threadID; } **Usage function** The usage function gives the list of options that can be configured in the application. ./bin/examples/pubsub_nodeset_rt_subscriber -h gives the list of options for running the application. .. code-block:: c static void usage(char *appname) { fprintf(stderr, "\n" "usage: %s [options]\n" "\n" " -i [name] use network interface 'name'\n" " -C [num] cycle time in milli seconds (default %lf)\n" " -p Do you need to print the data in console output\n" " -P [num] Publisher priority value (default %d)\n" " -U [num] User application priority value (default %d)\n" " -c [num] run on CPU for publisher'num'(default %d)\n" " -u [num] run on CPU for userApplication'num'(default %d)\n" " -m [mac_addr] ToDO:dst MAC address\n" " -h prints this message and exits\n" "\n", appname, DEFAULT_CYCLE_TIME, DEFAULT_SUB_SCHED_PRIORITY, \ DEFAULT_USERAPPLICATION_SCHED_PRIORITY, DEFAULT_PUBSUB_CORE, DEFAULT_USER_APP_CORE); } **Main Server code** The main function contains subscriber threads running .. code-block:: c int main(int argc, char **argv) { signal(SIGINT, stopHandler); signal(SIGTERM, stopHandler); UA_Int32 returnValue = 0; char *interface = NULL; char *progname; UA_Int32 argInputs = -1; UA_StatusCode retval = UA_STATUSCODE_GOOD; UA_Server *server = UA_Server_new(); UA_ServerConfig *config = UA_Server_getConfig(server); pthread_t userThreadID; UA_ServerConfig_setMinimal(config, PORT_NUMBER, NULL); /* Files namespace_example_subscriber_generated.h and namespace_example_subscriber_generated.c are created from * subDataModel.xml in the /src_generated directory by CMake */ /* Loading the user created variables into the information model from the generated .c and .h files */ if(namespace_example_subscriber_generated(server) != UA_STATUSCODE_GOOD) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Could not add the example nodeset. " "Check previous output for any error."); } else { nsIdx = UA_Server_addNamespace(server, "http://yourorganisation.org/test/"); } UA_NetworkAddressUrlDataType networkAddressUrlSub; /* For more information run ./bin/examples/pubsub_nodeset_rt_subscriber -h */ /* Process the command line arguments */ progname = strrchr(argv[0], '/'); progname = progname ? 1 + progname : argv[0]; while (EOF != (argInputs = getopt(argc, argv, "i:C:f:ps:P:U:c:u:tm:h:"))) { switch (argInputs) { case 'i': interface = optarg; break; case 'C': cycleTimeMsec = atof(optarg); break; case 'f': fileName = optarg; fileWrite = UA_TRUE; fpSubscriber = fopen(fileName, "w"); break; case 'p': consolePrint = UA_TRUE; break; case 'P': subPriority = atoi(optarg); break; case 'U': userAppPriority = atoi(optarg); break; case 'c': pubSubCore = atoi(optarg); break; case 'u': userAppCore = atoi(optarg); break; case 'm': /*ToDo:Need to handle for mac address*/ break; case 'h': usage(progname); return -1; case '?': usage(progname); return -1; } } if (cycleTimeMsec < 0.125) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%f Bad cycle time", cycleTimeMsec); usage(progname); return -1; } if (!interface) { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Need a network interface to run"); usage(progname); return -1; } networkAddressUrlSub.networkInterface = UA_STRING(interface); networkAddressUrlSub.url = UA_STRING(SUBSCRIBING_MAC_ADDRESS); UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerEthernet()); addPubSubConnectionSubscriber(server, &networkAddressUrlSub); addReaderGroup(server); addDataSetReader(server); UA_Server_freezeReaderGroupConfiguration(server, readerGroupIdentifier); UA_Server_setReaderGroupOperational(server, readerGroupIdentifier); serverConfigStruct *serverConfig; serverConfig = (serverConfigStruct*)UA_malloc(sizeof(serverConfigStruct)); serverConfig->ServerRun = server; char threadNameUserApplication[22] = "UserApplicationSub"; userThreadID = threadCreation(userAppPriority, userAppCore, userApplicationSub, threadNameUserApplication, serverConfig); retval |= UA_Server_run(server, &running); UA_Server_unfreezeReaderGroupConfiguration(server, readerGroupIdentifier); returnValue = pthread_join(subthreadID, NULL); if (returnValue != 0) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"\nPthread Join Failed for subscriber thread:%d\n", returnValue); } returnValue = pthread_join(userThreadID, NULL); if (returnValue != 0) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"\nPthread Join Failed for User thread:%d\n", returnValue); } if (fileWrite == UA_TRUE) { /* Write the subscribed data in the file */ size_t subLoopVariable = 0; for (subLoopVariable = 0; subLoopVariable < measurementsSubscriber; subLoopVariable++) { fprintf(fpSubscriber, "%ld,%ld.%09ld,%lf\n", subscribeCounterValue[subLoopVariable], subscribeTimestamp[subLoopVariable].tv_sec, subscribeTimestamp[subLoopVariable].tv_nsec, pressureValues[subLoopVariable]); } fclose(fpSubscriber); } if (consolePrint == UA_TRUE) { size_t subLoopVariable = 0; for (subLoopVariable = 0; subLoopVariable < measurementsSubscriber; subLoopVariable++) { fprintf(fpSubscriber, "%ld,%ld.%09ld,%lf\n", subscribeCounterValue[subLoopVariable], subscribeTimestamp[subLoopVariable].tv_sec, subscribeTimestamp[subLoopVariable].tv_nsec, pressureValues[subLoopVariable]); } } UA_Server_delete(server); UA_free(serverConfig); UA_free(subCounterData); /* Free external data source */ UA_free(subDataValueRT); UA_free(pressureData); /* Free external data source */ UA_free(pressureValueRT); return (int)retval; }