Blame view

RIOT/examples/emcute/main.c 7.07 KB
a752c7ab   elopes   add first test an...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
  /*
   * Copyright (C) 2015 Freie Universitรคt Berlin
   *
   * This file is subject to the terms and conditions of the GNU Lesser
   * General Public License v2.1. See the file LICENSE in the top level
   * directory for more details.
   */
  
  /**
   * @ingroup     examples
   * @{
   *
   * @file
   * @brief       Example application for demonstrating the RIOT network stack
   *
   * @author      Hauke Petersen <hauke.petersen@fu-berlin.de>
   *
   * @}
   */
  
  #include <stdio.h>
  #include <string.h>
  #include <stdlib.h>
  
  #include "shell.h"
  #include "msg.h"
  #include "net/emcute.h"
  #include "net/ipv6/addr.h"
  
  #define EMCUTE_PORT         (1883U)
  #define EMCUTE_ID           ("gertrud")
  #define EMCUTE_PRIO         (THREAD_PRIORITY_MAIN - 1)
  
  #define NUMOFSUBS           (16U)
  #define TOPIC_MAXLEN        (64U)
  
  static char stack[THREAD_STACKSIZE_DEFAULT];
  static msg_t queue[8];
  
  static emcute_sub_t subscriptions[NUMOFSUBS];
  static char topics[NUMOFSUBS][TOPIC_MAXLEN];
  
  static void *emcute_thread(void *arg)
  {
      (void)arg;
      emcute_run(EMCUTE_PORT, EMCUTE_ID);
      return NULL;    /* should never be reached */
  }
  
  static void on_pub(const emcute_topic_t *topic, void *data, size_t len)
  {
      char *in = (char *)data;
  
      printf("### got publication for topic '%s' [%i] ###\n",
             topic->name, (int)topic->id);
      for (size_t i = 0; i < len; i++) {
          printf("%c", in[i]);
      }
      puts("");
  }
  
  static unsigned get_qos(const char *str)
  {
      int qos = atoi(str);
      switch (qos) {
          case 1:     return EMCUTE_QOS_1;
          case 2:     return EMCUTE_QOS_2;
          default:    return EMCUTE_QOS_0;
      }
  }
  
  static int cmd_con(int argc, char **argv)
  {
      sock_udp_ep_t gw = { .family = AF_INET6, .port = EMCUTE_PORT };
      char *topic = NULL;
      char *message = NULL;
      size_t len = 0;
  
      if (argc < 2) {
          printf("usage: %s <ipv6 addr> [port] [<will topic> <will message>]\n",
                  argv[0]);
          return 1;
      }
  
      /* parse address */
      if (ipv6_addr_from_str((ipv6_addr_t *)&gw.addr.ipv6, argv[1]) == NULL) {
          printf("error parsing IPv6 address\n");
          return 1;
      }
  
      if (argc >= 3) {
          gw.port = atoi(argv[2]);
      }
      if (argc >= 5) {
          topic = argv[3];
          message = argv[4];
          len = strlen(message);
      }
  
      if (emcute_con(&gw, true, topic, message, len, 0) != EMCUTE_OK) {
          printf("error: unable to connect to [%s]:%i\n", argv[1], (int)gw.port);
      }
      printf("Successfully connected to gateway at [%s]:%i\n",
             argv[1], (int)gw.port);
  
      return 0;
  }
  
  static int cmd_discon(int argc, char **argv)
  {
      (void)argc;
      (void)argv;
  
      int res = emcute_discon();
      if (res == EMCUTE_NOGW) {
          puts("error: not connected to any broker");
          return 1;
      }
      else if (res != EMCUTE_OK) {
          puts("error: unable to disconnect");
          return 1;
      }
      puts("Disconnect successful");
      return 0;
  }
  
  static int cmd_pub(int argc, char **argv)
  {
      emcute_topic_t t;
      unsigned flags = EMCUTE_QOS_0;
  
      if (argc < 3) {
          printf("usage: %s <topic name> <data> [QoS level]\n", argv[0]);
          return 1;
      }
  
      /* parse QoS level */
      if (argc >= 4) {
          flags |= get_qos(argv[3]);
      }
  
      printf("pub with topic: %s and name %s and flags 0x%02x\n", argv[1], argv[2], (int)flags);
  
      /* step 1: get topic id */
      t.name = argv[1];
      if (emcute_reg(&t) != EMCUTE_OK) {
          puts("error: unable to obtain topic ID");
          return 1;
      }
  
      /* step 2: publish data */
      if (emcute_pub(&t, argv[2], strlen(argv[2]), flags) != EMCUTE_OK) {
          printf("error: unable to publish data to topic '%s [%i]'\n",
                  t.name, (int)t.id);
          return 1;
      }
  
      printf("Published %i bytes to topic '%s [%i]'\n",
              (int)strlen(argv[2]), t.name, t.id);
  
      return 0;
  }
  
  static int cmd_sub(int argc, char **argv)
  {
      unsigned flags = EMCUTE_QOS_0;
  
      if (argc < 2) {
          printf("usage: %s <topic name> [QoS level]\n", argv[0]);
          return 1;
      }
  
      if (strlen(argv[1]) > TOPIC_MAXLEN) {
          puts("error: topic name exceeds maximum possible size");
          return 1;
      }
      if (argc >= 3) {
          flags |= get_qos(argv[2]);
      }
  
      /* find empty subscription slot */
      unsigned i = 0;
      for (; (i < NUMOFSUBS) && (subscriptions[i].topic.id != 0); i++) {}
      if (i == NUMOFSUBS) {
          puts("error: no memory to store new subscriptions");
          return 1;
      }
  
      subscriptions[i].cb = on_pub;
      strcpy(topics[i], argv[1]);
      subscriptions[i].topic.name = topics[i];
      if (emcute_sub(&subscriptions[i], flags) != EMCUTE_OK) {
          printf("error: unable to subscribe to %s\n", argv[1]);
          return 1;
      }
  
      printf("Now subscribed to %s\n", argv[1]);
      return 0;
  }
  
  static int cmd_unsub(int argc, char **argv)
  {
      if (argc < 2) {
          printf("usage %s <topic name>\n", argv[0]);
          return 1;
      }
  
      /* find subscriptions entry */
      for (unsigned i = 0; i < NUMOFSUBS; i++) {
          if (subscriptions[i].topic.name &&
              (strcmp(subscriptions[i].topic.name, argv[1]) == 0)) {
              if (emcute_unsub(&subscriptions[i]) == EMCUTE_OK) {
                  memset(&subscriptions[i], 0, sizeof(emcute_sub_t));
                  printf("Unsubscribed from '%s'\n", argv[1]);
              }
              else {
                  printf("Unsubscription form '%s' failed\n", argv[1]);
              }
              return 0;
          }
      }
  
      printf("error: no subscription for topic '%s' found\n", argv[1]);
      return 1;
  }
  
  static int cmd_will(int argc, char **argv)
  {
      if (argc < 3) {
          printf("usage %s <will topic name> <will message content>\n", argv[0]);
          return 1;
      }
  
      if (emcute_willupd_topic(argv[1], 0) != EMCUTE_OK) {
          puts("error: unable to update the last will topic");
          return 1;
      }
      if (emcute_willupd_msg(argv[2], strlen(argv[2])) != EMCUTE_OK) {
          puts("error: unable to update the last will message");
          return 1;
      }
  
      puts("Successfully updated last will topic and message");
      return 0;
  }
  
  static const shell_command_t shell_commands[] = {
      { "con", "connect to MQTT broker", cmd_con },
      { "discon", "disconnect from the current broker", cmd_discon },
      { "pub", "publish something", cmd_pub },
      { "sub", "subscribe topic", cmd_sub },
      { "unsub", "unsubscribe from topic", cmd_unsub },
      { "will", "register a last will", cmd_will },
      { NULL, NULL, NULL }
  };
  
  int main(void)
  {
      puts("MQTT-SN example application\n");
      puts("Type 'help' to get started. Have a look at the README.md for more"
           "information.");
  
      /* the main thread needs a msg queue to be able to run `ping6`*/
      msg_init_queue(queue, (sizeof(queue) / sizeof(msg_t)));
  
      /* initialize our subscription buffers */
      memset(subscriptions, 0, (NUMOFSUBS * sizeof(emcute_sub_t)));
  
      /* start the emcute thread */
      thread_create(stack, sizeof(stack), EMCUTE_PRIO, 0,
                    emcute_thread, NULL, "emcute");
  
      /* start shell */
      char line_buf[SHELL_DEFAULT_BUFSIZE];
      shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE);
  
      /* should be never reached */
      return 0;
  }