Issue with MQTT on mc60

Hi,

I am trying the MQTT protocol on the MC60C with OpenCPU. I have written code that should publish data continuously and, if sending fails several times in a row, restart the connection. The issue is that after a certain amount of time it can no longer publish data, and sometimes the subscribed messages are not received. I have used MQTT on other controllers, publishing data continuously, and sending never failed. I have also tried publishing from a SIMCOM module, and it worked well. How can I resolve this? I am attaching my code below; please let me know what modifications my code needs so that it works reliably as expected.

#ifdef MQTT_A

#include “ql_stdlib.h”
#include “ql_common.h”
#include “ql_type.h”
#include “ql_trace.h”
#include “ql_error.h”
#include “ql_uart.h”
#include “ql_gprs.h”
#include “ql_timer.h”
#include “ql_system.h”
#include “ril_network.h”
#include “ril.h”
#include “ril_util.h”
#include “ril_system.h”
#include<stdlib.h>
#include “ql_gpio.h”

/*
 * Debug output helper.
 *
 * APP_DEBUG(fmt, ...) formats into the shared DBG_BUFFER and then emits it:
 * through Ql_Debug_Trace() when DEBUG_PORT is UART_PORT2, otherwise through
 * Ql_UART_Write() on DEBUG_PORT. Set DEBUG_ENABLE to 0 to compile every
 * APP_DEBUG() call out.
 *
 * The formatted text must fit in DBG_BUF_LEN bytes (Ql_sprintf is unbounded),
 * and DBG_BUFFER is shared by every caller in this file.
 */
#define DEBUG_ENABLE 1
#if DEBUG_ENABLE > 0
#define DEBUG_PORT UART_PORT1
#define DBG_BUF_LEN 512
static char DBG_BUFFER[DBG_BUF_LEN];
#define APP_DEBUG(FORMAT,...) {\
    Ql_memset(DBG_BUFFER, 0, DBG_BUF_LEN);\
    Ql_sprintf(DBG_BUFFER,FORMAT,##__VA_ARGS__);\
    if (UART_PORT2 == (DEBUG_PORT))\
    {\
        Ql_Debug_Trace(DBG_BUFFER);\
    } else {\
        Ql_UART_Write((Enum_SerialPort)(DEBUG_PORT), (u8*)(DBG_BUFFER), Ql_strlen((const char *)(DBG_BUFFER)));\
    }\
}
#else
#define APP_DEBUG(FORMAT,...)
#endif

#define QL_RET_ERR_RIL_MQTT_FAIL -1

/*
 * Context handed to the AT response handlers through the userdata pointer.
 */
typedef struct
{
    u8  index;   /* comma-separated field number passed to QSDK_Get_Str() */
    u8 *prefix;  /* response prefix to search for (e.g. "+QMTOPEN:"); RIL_MQTT_QMTPUB stores the payload pointer here instead */
    s32 data;    /* numeric result parsed from the response (callers preset it to 255) */
    u32 length;  /* payload byte count written to the core on publish */
} MQTT_Param;

/*****************************************************************

  • timer param
    ******************************************************************/
    #define MQTT_TIMER_ID TIMER_ID_USER_START
    #define TIMEOUT_90S_TIMER_ID TIMER_ID_USER_START + 1 //timeout
    #define MQTT_TIMER_PERIOD 2000 //changing from 5 to 2 sec
    #define TIMEOUT_90S_PERIOD 90000

static s32 timeout_90S_monitor = FALSE;

/*****************************************************************

  • APN Param
    ******************************************************************/
    static u8 m_apn[MAX_GPRS_APN_LEN] = “cmnet”;
    static u8 m_userid[MAX_GPRS_USER_NAME_LEN] = “”;
    static u8 m_passwd[MAX_GPRS_PASSWORD_LEN] = “”;

/**********************************************************************************/

#define MAX_MQTT_PARAM_LENGTH 100
#define HOST_NAME “broker.uniolabs.in”
#define HOST_PORT 1883
#define CONTEXT 0
int product_key=20; //chnaged to 20 from 120
//static u8 product_key[MAX_MQTT_PARAM_LENGTH] = “120”;
//static u8 device_name[MAX_MQTT_PARAM_LENGTH] = “TEST1”;
//static u8 device_secret[MAX_MQTT_PARAM_LENGTH] = “lN9l09QesoiL4HXw7W9Z1XVJ0JzTJxhs”;

//static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “brate”;
// static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/update”;
static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “29/raw”;
static u8 topic_update1[MAX_MQTT_PARAM_LENGTH] = “srate”;
static u8 topic_error[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/update/error”;
static u8 topic_get[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/get”;

/*************************************************************************************/
int count_mqtt_error=0;
u8 test_message[50] = “allan\0”;
char pub_msg[15]="";
char pub_msg1[15]="";
u8 dest_msg[1024]={0};
u8 packet1;
bool uart_flag=TRUE;

/*
 * States of the connection state machine driven by Callback_Timer().
 * m_tcp_state starts at STATE_NW_GET_SIMSTATE.
 */
typedef enum
{
    STATE_NW_GET_SIMSTATE = 0,  /* query SIM state */
    STATE_NW_QUERY_STATE,       /* query GSM/GPRS registration */
    STATE_GPRS_CONTEXT,         /* select the GPRS context */
    STATE_GPRS_ACTIVATE,        /* set APN and open the PDP context */
    STATE_MQTT_CFG,             /* send AT+QMTCFG */
    STATE_MQTT_OPEN,            /* send AT+QMTOPEN */
    STATE_MQTT_CONN,            /* send AT+QMTCONN */
    STATE_MQTT_SUB,             /* send AT+QMTSUB */
    STATE_MQTT_PUB,             /* send AT+QMTPUB (repeats every tick) */
    STATE_MQTT_CLOSE,           /* send AT+QMTCLOSE */
    STATE_MQTT_DISC,            /* send AT+QMTDISC */
    STATE_GPRS_DEACTIVATE,      /* close the PDP context */
    STATE_TOTAL_NUM             /* number of states */
} Enum_TCPSTATE;

static u8 m_tcp_state = STATE_NW_GET_SIMSTATE;

static MQTT_Param mqtt_param;
/*****************************************************************

  • uart callback function
    ***************************************************************/
    /* Forward declarations of the UART and timer callbacks defined later in this file. */
    static void Callback_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara);
    static void CallBack_UART_Hdlr1(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara);
    static void Callback_Timer(u32 timerId, void* param);

static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handler_close(char* line, u32 len, void* userdata);
static void Hdlr_Mqtt_Recv_Data(u8* packet);
static void Hdlr_Mqtt_Recv_State(u8* packet);

s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,int* product_key);
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port );
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password);
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others);
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length);
s32 RIL_MQTT_QMTDISC(u8 connectID);
s32 RIL_MQTT_QMTCLOSE(u8 connectID);
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others);

#define SERIAL_RX_BUFFER_LEN 2048
static u8 m_RxBuf_VirtualPort1[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort1[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort2[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort3[SERIAL_RX_BUFFER_LEN];

u8* pub_msg3;
u8* pub_msg2;
char pub_msg4[50];

static s32 ReadSerialPort(Enum_SerialPort port, /[out]/u8* pBuffer, /[in]/u32 bufLen)
{
s32 rdLen = 0;
s32 rdTotalLen = 0;
if (NULL == pBuffer || 0 == bufLen)
{
return -1;
}
Ql_memset(pBuffer, 0x0, bufLen);
while (1)
{
rdLen = Ql_UART_Read(port, pBuffer + rdTotalLen, bufLen - rdTotalLen);
if (rdLen <= 0) // All data is read out, or Serial Port Error!
{
break;
}
rdTotalLen += rdLen;
// Continue to read…
}
if (rdLen < 0) // Serial Port Error!
{
APP_DEBUG("<–Fail to read from port[%d]–>\r\n", port);
return -99;
}
return rdTotalLen;
}

/*
 * UART event callback for UART_PORT1.
 *
 * On EVENT_UART_READY_TO_READ it drains the port into m_RxBuf_UartPort1,
 * publishes the buffer through the pub_msg2/pub_msg3 globals, copies the
 * received text (truncated to fit) into pub_msg4, echoes the raw bytes back
 * on the same port and clears uart_flag. All other events are ignored.
 *
 * port           - port that raised the event
 * evt            - event type
 * level          - unused
 * customizedPara - unused
 */
static void Callback_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType evt, bool level, void* customizedPara)
{
    switch (evt)
    {
    case EVENT_UART_READY_TO_READ:
    {
        if (UART_PORT1 == port)
        {
            s32 totalBytes;
            s32 copied = 0;

            APP_DEBUG("the value of uart_flag is %d\n",uart_flag);

            totalBytes = ReadSerialPort(port, m_RxBuf_UartPort1, SERIAL_RX_BUFFER_LEN);

            pub_msg2 = m_RxBuf_UartPort1;
            pub_msg3 = pub_msg2;

            /*
             * Clear the whole destination (sizeof on the array, not on a
             * pointer) and copy at most sizeof(pub_msg4) - 1 bytes so that a
             * long UART frame cannot overrun pub_msg4. Copying stops at the
             * first NUL, as the previous strlen-based loop did.
             */
            Ql_memset(pub_msg4, 0x0, sizeof(pub_msg4));
            while (copied < totalBytes
                   && copied < (s32)sizeof(pub_msg4) - 1
                   && pub_msg3[copied] != '\0')
            {
                pub_msg4[copied] = (char)pub_msg3[copied];
                copied++;
            }
            pub_msg4[copied] = '\0';
            APP_DEBUG("the pub_msg4 is %s\n",pub_msg4);

            /* Only echo when the read succeeded; a negative length must not reach Ql_UART_Write(). */
            if (totalBytes > 0)
            {
                s32 ret = Ql_UART_Write(port, m_RxBuf_UartPort1, totalBytes);
                if (ret < totalBytes)
                {
                    APP_DEBUG("<--Only part of bytes are written, %d/%d-->\r\n", ret, totalBytes);
                    // TODO: Need to handle event 'QL_EVENT_UART_READY_TO_WRITE'
                }
            }
            uart_flag=FALSE;
        }
        break;
    }
    case EVENT_UART_READY_TO_WRITE:
        break;
    default:
        break;
    }
}

/*
 * Main task entry point.
 *
 * Registers and opens UART_PORT1, registers both timers, starts the periodic
 * MQTT timer (which drives Callback_Timer) and then loops forever on the
 * task message queue: it initialises RIL when MSG_ID_RIL_READY arrives and
 * forwards MQTT URCs to Hdlr_Mqtt_Recv_Data / Hdlr_Mqtt_Recv_State.
 *
 * taskId - unused
 */
void proc_main_task(s32 taskId)
{
    s32 ret;
    ST_MSG msg;

    // Register & open UART port
    ret = Ql_UART_Register(UART_PORT1, CallBack_UART_Hdlr1, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_UART_Register failed, ret=%d -->\r\n", ret);
    }
    ret = Ql_UART_Open(UART_PORT1, 115200, FC_NONE);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_UART_Open failed, ret=%d -->\r\n", ret);
    }

    APP_DEBUG("<--OpenCPU: mqtt test.-->\r\n");

    ret = Ql_Timer_Register(MQTT_TIMER_ID, Callback_Timer, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_Timer_Register(MQTT) failed, ret=%d -->\r\n", ret);
    }
    ret = Ql_Timer_Start(MQTT_TIMER_ID, MQTT_TIMER_PERIOD, TRUE);
    if (ret < QL_RET_OK)
    {
        /* Without this timer the state machine never runs. */
        APP_DEBUG("<-- Ql_Timer_Start(MQTT) failed, ret=%d -->\r\n", ret);
    }

    ret = Ql_Timer_Register(TIMEOUT_90S_TIMER_ID, Callback_Timer, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_Timer_Register(90S) failed, ret=%d -->\r\n", ret);
    }
    timeout_90S_monitor = FALSE;

    while (TRUE)
    {
        Ql_OS_GetMessage(&msg);
        switch (msg.message)
        {
        case MSG_ID_RIL_READY:
            APP_DEBUG("<-- RIL is ready -->\r\n");
            Ql_RIL_Initialize();
            break;
        case MSG_ID_URC_INDICATION:
            /* param2 is used as a string pointer here; cast it explicitly. */
            switch (msg.param1)
            {
            case URC_MQTT_RECV_IND:
                APP_DEBUG("test1\n");
                APP_DEBUG("<-- RECV DATA:%s -->\r\n", (char *)msg.param2);
                Hdlr_Mqtt_Recv_Data((u8 *)msg.param2);
                break;
            case URC_MQTT_STAT_IND:
                APP_DEBUG("test2\n");
                APP_DEBUG("<-- RECV STAT:%s-->\r\n", (char *)msg.param2);
                Hdlr_Mqtt_Recv_State((u8 *)msg.param2);
                break;
            default:
                break;
            }
            break;
        default:
            break;
        }
    }
}

/*
 * Sub-task entry point.
 *
 * Registers Callback_UART_Hdlr on UART_PORT1, opens the port at
 * 115200-8-N-1 without flow control, clears its RX buffer and then loops
 * forever fetching (and discarding) task messages.
 *
 * taskId - unused
 */
void proc_subtask1(s32 taskId)
{
    Enum_SerialPort uartPort = UART_PORT1;
    ST_UARTDCB portCfg;
    ST_MSG taskMsg;
    s32 status;

    Ql_Debug_Trace("<--OpenCPU: proc_subtask1-->\r\n");

    portCfg.baudrate = 115200;
    portCfg.dataBits = DB_8BIT;
    portCfg.stopBits = SB_ONE;
    portCfg.parity   = PB_NONE;
    portCfg.flowCtrl = FC_NONE;

    status = Ql_UART_Register(uartPort, Callback_UART_Hdlr, NULL);
    if (status < QL_RET_OK)
    {
        Ql_Debug_Trace("<--Ql_UART_Register(mySerialPort=%d)=%d-->\r\n", uartPort, status);
    }

    status = Ql_UART_OpenEx(uartPort, &portCfg);
    if (status < QL_RET_OK)
    {
        Ql_Debug_Trace("<--Ql_UART_OpenEx(mySerialPort=%d)=%d-->\r\n", uartPort, status);
    }

    Ql_UART_ClrRxBuffer(uartPort);
    APP_DEBUG("proc_subtask1...");

    /* No message is acted upon in this task; the queue is only drained. */
    for (;;)
    {
        Ql_OS_GetMessage(&taskMsg);
    }
}

/*
 * Extract one comma-separated field from a string.
 *
 * Example: src_string = "GPRMC,235945.799,V,0.00,0.00,050180,N", index = 1
 * returns TRUE with dest_string = "235945.799".
 *
 * src_string  - NUL-terminated input; fields are separated by ','
 * dest_string - receives the field as a NUL-terminated string; the caller
 *               must provide room for the longest possible field plus the
 *               terminator (no size is passed in)
 * index       - zero-based field number
 *
 * Returns TRUE when the field exists and is non-empty, FALSE otherwise
 * (NULL argument, index past the last field, or empty field). dest_string is
 * left untouched on FALSE.
 */
char QSDK_Get_Str(char *src_string, char *dest_string, unsigned char index)
{
    unsigned int fieldNo = 0;     /* number of the field currently being scanned */
    unsigned int fieldStart = 0;  /* offset of its first character */
    unsigned int len;
    unsigned int i;

    if (src_string == NULL || dest_string == NULL)
    {
        return FALSE;
    }
    len = Ql_strlen(src_string);

    /* i == len stands for the end of the string, which closes the last field. */
    for (i = 0; i <= len; i++)
    {
        unsigned int fieldLen;

        if (i < len && src_string[i] != ',')
        {
            continue;
        }
        if (fieldNo != index)
        {
            fieldNo++;
            fieldStart = i + 1;
            continue;
        }

        fieldLen = i - fieldStart;
        if (fieldLen == 0)
        {
            return FALSE;
        }
        Ql_memcpy(dest_string, src_string + fieldStart, fieldLen);
        /* Terminate right after the copied field (not at the source length). */
        dest_string[fieldLen] = '\0';
        return TRUE;
    }
    return FALSE;
}

/*
 * Handle a received-message URC: extract field 3 of the URC text, log it and
 * pulse PINNAME_RTS low for 200 ms.
 *
 * packet - NUL-terminated URC text
 */
static void Hdlr_Mqtt_Recv_Data(u8* packet)
{
    u8 mqtt_message[1024]={0};

    /*
     * NOTE(review): packet1 is a single u8, so this keeps only the low byte
     * of the pointer value, exactly as the original implicit conversion did.
     * It looks unintended - confirm whether packet1 is used anywhere.
     */
    packet1 = (u8)(unsigned long)packet;

    QSDK_Get_Str((char *)packet,(char *)mqtt_message,3);
    APP_DEBUG("<-- RECV message:%s–>\r\n",mqtt_message);

    Ql_GPIO_Init(PINNAME_RTS, PINDIRECTION_OUT, PINLEVEL_LOW, PINPULLSEL_PULLUP);
    Ql_GPIO_SetLevel(PINNAME_RTS, PINLEVEL_LOW);
    Ql_Sleep(200);  /* blocks the calling task for the duration of the pulse */
    Ql_GPIO_SetLevel(PINNAME_RTS, PINLEVEL_HIGH);
    APP_DEBUG("LED bliked\n");
}
/*
 * Handle a state URC: extract field 3 of the URC text and log it as an integer.
 *
 * packet - NUL-terminated URC text
 */
static void Hdlr_Mqtt_Recv_State(u8* packet)
{
    char stateField[10] = {0};

    QSDK_Get_Str((char *)packet, stateField, 3);
    APP_DEBUG("<-- mqtt state = %d-->\r\n",Ql_atoi(stateField));
}

/* UART callback registered by proc_main_task(); intentionally ignores every event. */
static void CallBack_UART_Hdlr1(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara)
{

}

/*
 * Timer callback that advances the connection state machine by one step.
 *
 * Only MQTT_TIMER_ID is handled; any other timer id is ignored. Each call
 * performs the action for the current m_tcp_state and selects the next
 * state: SIM check -> network registration -> GPRS context -> PDP
 * activation -> MQTT config -> open -> connect -> subscribe -> publish.
 * The publish state repeats on every tick; after more than 10 consecutive
 * publish failures the connection is closed, the PDP context is deactivated
 * and the sequence restarts from the SIM check.
 *
 * NOTE(review): every publish uses QoS 1 with the fixed msgId 1, and
 * ATResponse_mqtt_handler_pub() reports success as soon as "OK" is seen.
 * Whether the module accepts a new QoS 1 publish with the same msgId before
 * the previous one is acknowledged should be checked against the MQTT AT
 * command manual.
 *
 * timerId - id of the timer that fired
 * param   - unused
 */
static void Callback_Timer(u32 timerId, void* param)
{
    s32 ret = 0;

    if (MQTT_TIMER_ID != timerId)
    {
        return;
    }

    switch (m_tcp_state)
    {
    case STATE_NW_GET_SIMSTATE:
    {
        s32 simStat = 0;

        RIL_SIM_GetSimState(&simStat);
        if (simStat == SIM_STAT_READY)
        {
            m_tcp_state = STATE_NW_QUERY_STATE;
            APP_DEBUG("<--SIM card status is normal!-->\r\n");
        }
        else
        {
            APP_DEBUG("<--SIM card status is unnormal!-->\r\n");
        }
        break;
    }
    case STATE_NW_QUERY_STATE:
    {
        s32 creg = 0;
        s32 cgreg = 0;

        ret = RIL_NW_GetGSMState(&creg);
        ret = RIL_NW_GetGPRSState(&cgreg);
        APP_DEBUG("<--Network State:creg=%d,cgreg=%d-->\r\n",creg,cgreg);
        if ((cgreg == NW_STAT_REGISTERED) || (cgreg == NW_STAT_REGISTERED_ROAMING))
        {
            m_tcp_state = STATE_GPRS_CONTEXT;
        }
        break;
    }
    case STATE_GPRS_CONTEXT:
    {
        ret = RIL_NW_SetGPRSContext(CONTEXT);
        APP_DEBUG("<-- Set GPRS context, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_GPRS_ACTIVATE;
        }
        break;
    }
    case STATE_GPRS_ACTIVATE:
    {
        ret = RIL_NW_SetAPN(1, m_apn, m_userid, m_passwd);
        APP_DEBUG("<-- Set GPRS APN, ret=%d -->\r\n", ret);

        ret = RIL_NW_OpenPDPContext();
        APP_DEBUG("<-- Open PDP context, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_CFG;
        }
        else
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        break;
    }
    case STATE_MQTT_CFG:
    {
        /*
         * NOTE(review): product_key is an int, while the callee declares an
         * int* parameter and formats it with %d. The call is left as it was
         * so that the keep-alive value still reaches the AT command.
         */
        ret =  RIL_MQTT_QMTCFG_Ali( 0,product_key);
        APP_DEBUG("<-- mqtt config, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_OPEN;
        }
        break;
    }
    case STATE_MQTT_OPEN:
    {
        ret = RIL_MQTT_QMTOPEN( 0,HOST_NAME, HOST_PORT) ;
        APP_DEBUG("<-- mqtt open, ret=%d -->\r\n",ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_CONN;
        }
        else
        {
            m_tcp_state = STATE_MQTT_DISC;
        }
        break;
    }
    case STATE_MQTT_CONN:
    {
        /*
         * The client id is a random number printed in decimal. rand() may
         * return a value with up to 10 digits, so the buffer must hold the
         * digits, an optional sign and the terminator (5 bytes overflowed).
         */
        char strImei[16];
        int a;

        Ql_memset(strImei, 0x0, sizeof(strImei));
        a = rand();
        Ql_sprintf(strImei,"%d",a);
        APP_DEBUG("<-- IMEI:%s,ret=%d -->\r\n", strImei,ret);

        ret = RIL_MQTT_QMTCONN(0,(u8 *)strImei,NULL,NULL);
        APP_DEBUG("<-- mqtt connect,ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_SUB;
        }
        else
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        break;
    }
    case STATE_MQTT_SUB:
    {
        ret = RIL_MQTT_QMTSUB(0, 1,topic_update1, 1,NULL);
        APP_DEBUG("<-- mqtt subscribe, ret=%d -->\r\n",ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_PUB;
        }
        else
        {
            m_tcp_state = STATE_MQTT_CLOSE;
        }
        break;
    }
    case STATE_MQTT_PUB:
    {
        char data_pub[30]="";
        u32 data_pub_length = 0;
        int b;

        /* Test payload: a random number printed in decimal. */
        Ql_memset(data_pub, 0x0, sizeof(data_pub));
        b = rand();
        Ql_sprintf(data_pub,"%d",b);
        APP_DEBUG("<-- data_pub:%s -->\r\n", data_pub);

        data_pub_length = Ql_strlen(data_pub);
        /* The length parameter is a u8; the payload above is at most 11 characters. */
        ret = RIL_MQTT_QMTPUB(0, 1,1,0, topic_update,(u8 *)data_pub,(u8)data_pub_length);

        uart_flag=TRUE;
        APP_DEBUG("the value of uart_flag is %d\n",uart_flag);
        APP_DEBUG("<-- mqtt publish, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_PUB;
            count_mqtt_error = 0;
        }
        else
        {
            /* Stay in the publish state until more than 10 failures in a row. */
            m_tcp_state = STATE_MQTT_PUB;
            count_mqtt_error += 1;
            if (count_mqtt_error > 10)
            {
                m_tcp_state = STATE_MQTT_CLOSE;
                count_mqtt_error = 0;
            }
        }
        break;
    }
    case STATE_MQTT_CLOSE:
    {
        /* The PDP context is deactivated next whether or not the close succeeded. */
        ret = RIL_MQTT_QMTCLOSE(0);
        APP_DEBUG("<--mqtt CLOSE, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
            APP_DEBUG("closed the connection1 \n");
        }
        else
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
            APP_DEBUG("closed the connection2 \n");
        }
        break;
    }
    case STATE_MQTT_DISC:
    {
        ret = RIL_MQTT_QMTDISC(0);
        APP_DEBUG("<--mqtt disconnect, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        else
        {
            m_tcp_state = STATE_MQTT_CLOSE;
        }
        break;
    }
    case STATE_GPRS_DEACTIVATE:
    {
        ret = RIL_NW_ClosePDPContext();
        APP_DEBUG("<-- Set GPRS DEACTIVATE, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            /* Restart the whole sequence. */
            m_tcp_state = STATE_NW_GET_SIMSTATE;
        }
        break;
    }
    default:
        break;
    }
}

/*
 * Send AT+QMTCFG="KEEPALIVE",<connectID>,<product_key>.
 *
 * connectID   - MQTT connection index
 * product_key - formatted into the command with %d
 *               (NOTE(review): declared as int* but printed as a number;
 *               the visible caller passes an int value - confirm intent)
 *
 * Returns the result of Ql_RIL_SendATCmd().
 */
s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,int* product_key)
{
    char cmd[200];
    s32 status;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTCFG=\"KEEPALIVE\",%d,%d\n",connectID,product_key);

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),NULL,NULL,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);

    return status;
}

/*
 * AT response callback shared by the open/connect/subscribe/unsubscribe
 * commands.
 *
 * line     - one response line
 * len      - length of line
 * userdata - MQTT_Param describing what to look for: prefix is the result
 *            prefix (e.g. "+QMTOPEN:") and index the comma-separated field
 *            after the ':' to convert; the value is stored in data
 *
 * Returns RIL_ATRSP_SUCCESS once the prefixed result line is seen,
 * RIL_ATRSP_FAILED on any error line, and RIL_ATRSP_CONTINUE otherwise
 * (including on "OK", because the prefixed result line is still awaited).
 */
static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata)
{
    MQTT_Param *mqtt_param = (MQTT_Param *)userdata;
    char *head = Ql_RIL_FindString(line, len, (char *)mqtt_param->prefix); //continue wait

    if (head)
    {
        char strTmp[10];
        char *p1 = NULL;

        Ql_memset(strTmp, 0x0, sizeof(strTmp));
        p1 = Ql_strstr(head, ":");
        if (p1)
        {
            QSDK_Get_Str((p1+1),strTmp, mqtt_param->index);
            mqtt_param->data= Ql_atoi(strTmp);
        }
        return RIL_ATRSP_SUCCESS;
    }
    head = Ql_RIL_FindString(line, len, "OK");
    if (head)
    {
        return RIL_ATRSP_CONTINUE;
    }
    head = Ql_RIL_FindString(line, len, "ERROR");
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CME ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CMS ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    return RIL_ATRSP_CONTINUE; //continue wait
}

/*
 * AT response callback for AT+QMTPUB.
 *
 * When the "\r\n>" prompt arrives, the payload (userdata->prefix,
 * userdata->length bytes) followed by Ctrl-Z (0x1A) is written to the core
 * and waiting continues.
 *
 * Returns RIL_ATRSP_FAILED on an error line, RIL_ATRSP_SUCCESS on "OK" and
 * RIL_ATRSP_CONTINUE for anything else.
 */
static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata)
{
    MQTT_Param *pub = (MQTT_Param *)userdata;
    u8 ctrlZ = 0x1A;

    if (Ql_RIL_FindString(line, len, "\r\n>"))
    {
        Ql_RIL_WriteDataToCore (pub->prefix,pub->length);
        Ql_RIL_WriteDataToCore(&ctrlZ,1);
        return RIL_ATRSP_CONTINUE;
    }
    if (Ql_RIL_FindString(line, len, "ERROR"))
    {
        return RIL_ATRSP_FAILED;
    }
    if (Ql_RIL_FindString(line, len, "OK"))
    {
        return RIL_ATRSP_SUCCESS;
    }
    if (Ql_RIL_FindString(line, len, "+CMS ERROR:"))  /* fail */
    {
        return RIL_ATRSP_FAILED;
    }
    return RIL_ATRSP_CONTINUE;  /* continue wait */
}

/*
 * Send AT+QMTOPEN=<connectID>,"<hostName>",<port> and wait for the
 * "+QMTOPEN:" result line.
 *
 * Returns the Ql_RIL_SendATCmd() error when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when the parsed result field is not 0, and
 * RIL_AT_SUCCESS otherwise.
 */
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port)
{
    char cmd[200];
    s32 status;

    /* Tell the response handler which result line and field to parse. */
    mqtt_param.prefix="+QMTOPEN:";
    mqtt_param.index = 1;
    mqtt_param.data = 255;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTOPEN=%d,\"%s\",%d\n", connectID,hostName,port);

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);

    if (status != RIL_AT_SUCCESS)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return status;
    }
    if (mqtt_param.data != 0)
    {
        APP_DEBUG("\r\n<--MQTT OPEN failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }
    return status;
}

/*
 * Send AT+QMTCONN for the given connection and wait for the "+QMTCONN:"
 * result line. User name and password are included only when both are
 * non-NULL.
 *
 * Returns the Ql_RIL_SendATCmd() error when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when the parsed result field is not 0, and
 * RIL_AT_SUCCESS otherwise.
 */
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password)
{
    char cmd[200];
    s32 status;

    /* Tell the response handler which result line and field to parse. */
    mqtt_param.prefix="+QMTCONN:";
    mqtt_param.index = 1;
    mqtt_param.data = 255;

    Ql_memset(cmd, 0, sizeof(cmd));
    if (NULL == username || NULL == password)
    {
        Ql_sprintf(cmd, "AT+QMTCONN=%d,\"%s\"\n", connectID,clientID);
    }
    else
    {
        Ql_sprintf(cmd, "AT+QMTCONN=%d,\"%s\",\"%s\",\"%s\"\n", connectID,clientID,username,password);
    }

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);

    if (status != RIL_AT_SUCCESS)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return status;
    }
    if (mqtt_param.data != 0)
    {
        APP_DEBUG("\r\n<--MQTT connect failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }
    return status;
}

/*
 * Send AT+QMTSUB=<connectID>,<msgId>,"<topic>",<qos> and wait for the
 * "+QMTSUB:" result line.
 *
 * others - must be NULL; additional topic/qos pairs are not implemented.
 *          A non-NULL value is rejected with QL_RET_ERR_RIL_MQTT_FAIL
 *          instead of sending an empty command to the modem.
 *
 * Returns the Ql_RIL_SendATCmd() error when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when the parsed result field is not 0, and
 * RIL_AT_SUCCESS otherwise.
 */
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    if (NULL != others)
    {
        APP_DEBUG("\r\n<--MQTT subscribe: extra topics are not supported -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    Ql_memset(strAT, 0, sizeof(strAT));
    /* Tell the response handler which result line and field to parse. */
    mqtt_param.prefix="+QMTSUB:";
    mqtt_param.index = 2;
    mqtt_param.data = 255;

    Ql_sprintf(strAT, "AT+QMTSUB=%d,%d,\"%s\",%d\n", connectID,msgId,topic,qos);

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return ret;
    }
    else if (0 != mqtt_param.data)
    {
        APP_DEBUG("\r\n<--MQTT subscribe failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return ret;
}

/*
 * Send AT+QMTPUB=<connectID>,<msgId>,<qos>,<retain>,"<topic>".
 *
 * The payload pointer and length are stored in the shared mqtt_param
 * (prefix/length) so that ATResponse_mqtt_handler_pub() can write them to
 * the core when the input prompt arrives.
 *
 * length - payload size in bytes (u8, so at most 255)
 *
 * Returns the result of Ql_RIL_SendATCmd().
 */
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length)
{
    char cmd[200];
    s32 status;

    mqtt_param.prefix=data;
    mqtt_param.length= length;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTPUB=%d,%d,%d,%d,\"%s\"\n", connectID,msgId,qos,retain,topic);

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),ATResponse_mqtt_handler_pub,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);
    if (status != RIL_AT_SUCCESS)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
    }
    return status;
}

/*
 * Send AT+QMTCLOSE=<connectID>.
 *
 * Returns the result of Ql_RIL_SendATCmd().
 */
s32 RIL_MQTT_QMTCLOSE(u8 connectID)
{
    char cmd[200];
    s32 status;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTCLOSE=%d\n",connectID);

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),NULL,NULL,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);

    return status;
}

/*
 * Send AT+QMTDISC=<connectID>.
 *
 * Returns the result of Ql_RIL_SendATCmd().
 */
s32 RIL_MQTT_QMTDISC(u8 connectID)
{
    char cmd[200];
    s32 status;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTDISC=%d\n",connectID);

    status = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),NULL,NULL,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, status);

    return status;
}

/*
 * Send AT+QMTUNS=<connectID>,<msgId>,"<topic>" and wait for the "+QMTUNS:"
 * result line.
 *
 * others - must be NULL; additional topics are not implemented. A non-NULL
 *          value is rejected with QL_RET_ERR_RIL_MQTT_FAIL instead of
 *          sending an empty command to the modem.
 *
 * Returns the Ql_RIL_SendATCmd() error when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when the parsed result field is not 0, and
 * RIL_AT_SUCCESS otherwise.
 */
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    if (NULL != others)
    {
        APP_DEBUG("\r\n<--MQTT unsubscribe: extra topics are not supported -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    Ql_memset(strAT, 0, sizeof(strAT));
    /* Tell the response handler which result line and field to parse. */
    mqtt_param.prefix="+QMTUNS:";
    mqtt_param.index = 2;
    mqtt_param.data = 255;

    Ql_sprintf(strAT, "AT+QMTUNS=%d,%d,\"%s\"\n", connectID,msgId,topic);

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return ret;
    }
    else if (0 != mqtt_param.data)
    {
        /* The log previously said "subscribe" for an unsubscribe failure. */
        APP_DEBUG("\r\n<--MQTT unsubscribe failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return ret;
}

#endif // EXAMPLE_TCPSSL

1 Like

Dear Sir,

Thanks for contacting Quectel. Since your question is specific to your own situation, the solution might not be applicable to other customers, so we would like to move our discussion off this forum.

Kindly share the following details with our support mailbox (support@quectel.com), and we will assign a local FAE to support you further.

  • Company name / Phone Number:
  • R&D location:
  • Project name (so we can refer to it in the future):
  • Application type:
  • Estimated Annual Units (for series production):
  • Project timeline:
  • Current status:
  • From which distributor do you buy Quectel modules & EVB kits?
  • Do you have EVB kit for this application?
  • Problem description:

I had a similar problem…
Maybe you lost your network registration after a few minutes. In this case you need to explicitly disable Huawei’s IoT Platform.

“If there is no need for Huawei’s IoT platform, the registration has to be disabled with AT+QREGSWT=2 that will take effect only after rebooting the UE with command AT+NRB, or it will detach the UE from network, which may cause failure of related services (e.g. TCP/UDP).”

I have got MQTT publish working, with the data to publish taken from input on UART_PORT1…

/*****************************************************************************

  • Copyright Statement:

  • This software is protected by Copyright and the information contained
  • herein is confidential. The software may not be copied and the information
  • contained herein may not be used or disclosed except with the written
  • permission of Quectel Co., Ltd. 2013

/
/

*

  • Filename:

  • example_tcpssl.c
  • Project:

  • OpenCPU
  • Description:

  • This example demonstrates how to use tcp ssl function with APIs in OpenCPU.
  • Usage:

  • Compile & Run:
  • Set "C_PREDEF=-D __EXAMPLE_TCPSSL__" in gcc_makefile file. And compile the
    
  • app using "make clean/new".
    
  • Download image bin to module to run.
    
  • Operation:
  • Author:


*============================================================================

  •         HISTORY
    

*----------------------------------------------------------------------------
*
****************************************************************************/

//#if def EXAMPLE_MQTT
#include “ql_stdlib.h”
#include “ql_common.h”
#include “ql_type.h”
#include “ql_trace.h”
#include “ql_error.h”
#include “ql_uart.h”
#include “ql_gprs.h”
#include “ql_timer.h”
#include “ril_network.h”
#include “ril.h”
#include “ril_util.h”
#include “ril_system.h”

/*
 * Debug output helper.
 *
 * APP_DEBUG(fmt, ...) formats into the shared DBG_BUFFER and then emits it:
 * through Ql_Debug_Trace() when DEBUG_PORT is UART_PORT2, otherwise through
 * Ql_UART_Write() on DEBUG_PORT. Set DEBUG_ENABLE to 0 to compile every
 * APP_DEBUG() call out.
 *
 * The formatted text must fit in DBG_BUF_LEN bytes (Ql_sprintf is unbounded),
 * and DBG_BUFFER is shared by every caller in this file.
 */
#define DEBUG_ENABLE 1
#if DEBUG_ENABLE > 0
#define DEBUG_PORT UART_PORT1
#define DBG_BUF_LEN 512
static char DBG_BUFFER[DBG_BUF_LEN];
#define APP_DEBUG(FORMAT,...) {\
    Ql_memset(DBG_BUFFER, 0, DBG_BUF_LEN);\
    Ql_sprintf(DBG_BUFFER,FORMAT,##__VA_ARGS__);\
    if (UART_PORT2 == (DEBUG_PORT))\
    {\
        Ql_Debug_Trace(DBG_BUFFER);\
    } else {\
        Ql_UART_Write((Enum_SerialPort)(DEBUG_PORT), (u8*)(DBG_BUFFER), Ql_strlen((const char *)(DBG_BUFFER)));\
    }\
}
#else
#define APP_DEBUG(FORMAT,...)
#endif

/*
 * Context handed to the AT response handlers through their userdata pointer.
 * NOTE(review): the handlers are only declared, not defined, in the visible
 * part of this listing, so the field roles are not documented here.
 */
typedef struct
{
    u8  index;
    u8 *prefix;
    s32 data;
    u32 length;
} MQTT_Param;

/*****************************************************************

  • timer param
    ******************************************************************/
    #define MQTT_TIMER_ID TIMER_ID_USER_START
    #define TIMEOUT_90S_TIMER_ID TIMER_ID_USER_START + 1 //timeout
    #define MQTT_TIMER_PERIOD 1000
    #define TIMEOUT_90S_PERIOD 90000

static s32 timeout_90S_monitor = FALSE;

#define SERIAL_RX_BUFFER_LEN 2048
static Enum_SerialPort m_myUartPort = UART_PORT1;
static u8 m_RxBuf_Uart1[SERIAL_RX_BUFFER_LEN];
/*****************************************************************

  • APN Param
    ******************************************************************/
    static u8 m_apn[MAX_GPRS_APN_LEN] = “airtelgprs.com”;
    static u8 m_userid[MAX_GPRS_USER_NAME_LEN] = “”;
    static u8 m_passwd[MAX_GPRS_PASSWORD_LEN] = “”;

/**********************************************************************************/

#define MAX_MQTT_PARAM_LENGTH 100
#define HOST_NAME “xxxxxx”//“iot-as-mqtt.cn-shanghai.aliyuncs.com
#define HOST_PORT 1883
#define CONTEXT 0
#define CLIENT_ID “ABCD”
#define USER_NAME “xxxxxxxx”
#define PASSWORD “xxxxxxxxxx”

static u8 product_key[MAX_MQTT_PARAM_LENGTH] = “\0”;
static u8 device_name[MAX_MQTT_PARAM_LENGTH] = “TEST1”;
static u8 device_secret[MAX_MQTT_PARAM_LENGTH] = “lN9l09QesoiL4HXw7W9Z1XVJ0JzTJxhs”;

static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “Deccan”;
static u8 topic_error[MAX_MQTT_PARAM_LENGTH] = “/EXAMPLE/TEST1/update/error”;
static u8 topic_get[MAX_MQTT_PARAM_LENGTH] = “/EXAMPLE/TEST1/get”;

static u8 topic_update1[MAX_MQTT_PARAM_LENGTH] = “Deccan_ack”;
static u8 TcpId = 2;
static u8 WillFlag = 0;
static u8 WillQos = 0;
static u8 WillRetain = 0;
static u8 WillTopic[4] = “HI\0”;
static u8 WillMsg[4] = “JH\0”;

/*************************************************************************************/

u8 test_message[2048]="\0";

/*
 * State identifiers for the connection state machine.
 * m_tcp_state starts at STATE_NW_GET_SIMSTATE.
 */
typedef enum
{
    STATE_NW_GET_SIMSTATE = 0,
    STATE_NW_QUERY_STATE,
    STATE_GPRS_CONTEXT,
    STATE_GPRS_ACTIVATE,
    STATE_MQTT_CFG,
    STATE_MQTT_CFGTIMEOUT,
    STATE_MQTT_CFGSESSION,
    STATE_MQTT_CFGKEEPALIVE,
    STATE_MQTT_CFGVERSION,
    STATE_MQTT_OPEN,
    STATE_MQTT_CONN,
    STATE_MQTT_SUB,
    STATE_MQTT_PUB,
    STATE_MQTT_CLOSE,
    STATE_GPRS_DEACTIVATE,
    STATE_TOTAL_NUM             /* number of states */
} Enum_TCPSTATE;

static u8 m_tcp_state = STATE_NW_GET_SIMSTATE;
static u8 timingofuart = 0;

static MQTT_Param mqtt_param;
/*****************************************************************

  • uart callback function
    ****************************************************************/
    /* Forward declarations of the UART and timer callbacks. */
    static void CallBack_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara);
    static void Callback_Timer(u32 timerId, void* param);

static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handlercheck(char* line, u32 len, void* userdata);

static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handler_close(char* line, u32 len, void* userdata);
static void Hdlr_Mqtt_Recv_Data(u8* packet);
static void Hdlr_Mqtt_Recv_State(u8* packet);

s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,u8* product_key,u8* device_name,u8* device_secret);
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port );
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password);
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others);
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length);
s32 RIL_MQTT_QMTDISC(u8 connectID);
s32 RIL_MQTT_QMTCLOSE(u8 connectID);
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others);

s32 RIL_MQTT_QMTCFG_Will(u8 connectID,u8 will_fg,u8 will_qos,u8 will_retain,u8* will_topic,u8* will_msg);
s32 RIL_MQTT_QMTCFG(u8* CnfgString,u8 Para1,u8 Para2,u8 Para3);

static u8 uart_out=0;

/*
 * Main task entry point.
 *
 * Registers both timers and starts the periodic MQTT timer (which drives
 * Callback_Timer), registers and opens UART_PORT1, then loops forever on the
 * task message queue: it initialises RIL when MSG_ID_RIL_READY arrives and
 * forwards MQTT URCs to Hdlr_Mqtt_Recv_Data / Hdlr_Mqtt_Recv_State.
 *
 * The message loop never exits, so the UART buffer-clear calls that used to
 * follow it could never run and have been removed.
 *
 * taskId - unused
 */
void proc_main_task(s32 taskId)
{
    s32 ret;
    ST_MSG msg;

    ret = Ql_Timer_Register(MQTT_TIMER_ID, Callback_Timer, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_Timer_Register(MQTT) failed, ret=%d -->\r\n", ret);
    }
    ret = Ql_Timer_Start(MQTT_TIMER_ID, MQTT_TIMER_PERIOD, TRUE);
    if (ret < QL_RET_OK)
    {
        /* Without this timer the state machine never runs. */
        APP_DEBUG("<-- Ql_Timer_Start(MQTT) failed, ret=%d -->\r\n", ret);
    }

    ret = Ql_Timer_Register(TIMEOUT_90S_TIMER_ID, Callback_Timer, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_Timer_Register(90S) failed, ret=%d -->\r\n", ret);
    }

    // Register & open UART port
    ret = Ql_UART_Register(UART_PORT1, CallBack_UART_Hdlr, NULL);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_UART_Register failed, ret=%d -->\r\n", ret);
    }
    ret = Ql_UART_Open(UART_PORT1, 115200, FC_NONE);
    if (ret < QL_RET_OK)
    {
        APP_DEBUG("<-- Ql_UART_Open failed, ret=%d -->\r\n", ret);
    }

    APP_DEBUG("<--OpenCPU: mqtt test.-->\r\n");

    timeout_90S_monitor = FALSE;

    while (TRUE)
    {
        Ql_OS_GetMessage(&msg);
        switch (msg.message)
        {
        case MSG_ID_RIL_READY:
            APP_DEBUG("<-- RIL is ready -->\r\n");
            Ql_RIL_Initialize();
            break;
        case MSG_ID_URC_INDICATION:
            /* param2 is used as a string pointer here; cast it explicitly. */
            switch (msg.param1)
            {
            case URC_MQTT_RECV_IND:
                APP_DEBUG("<-- RECV DATA:%s -->\r\n", (char *)msg.param2);
                Hdlr_Mqtt_Recv_Data((u8 *)msg.param2);
                break;
            case URC_MQTT_STAT_IND:
                APP_DEBUG("<-- RECV STAT:%s–>\r\n", (char *)msg.param2);
                Hdlr_Mqtt_Recv_State((u8 *)msg.param2);
                break;
            default:
                break;
            }
            break;
        default:
            break;
        }
    }
}

/*
 * Extract one comma-separated field from a string.
 *
 * Example: src_string = "GPRMC,235945.799,V,0.00,0.00,050180,N", index = 1
 * returns TRUE with dest_string = "235945.799".
 *
 * src_string  - NUL-terminated input; fields are separated by ','
 * dest_string - receives the field as a NUL-terminated string; the caller
 *               must provide room for the longest possible field plus the
 *               terminator (no size is passed in)
 * index       - zero-based field number
 *
 * Returns TRUE when the field exists and is non-empty, FALSE otherwise
 * (NULL argument, index past the last field, or empty field). dest_string is
 * left untouched on FALSE.
 */
char QSDK_Get_Str(char *src_string, char *dest_string, unsigned char index)
{
    unsigned int fieldNo = 0;     /* number of the field currently being scanned */
    unsigned int fieldStart = 0;  /* offset of its first character */
    unsigned int len;
    unsigned int i;

    if (src_string == NULL || dest_string == NULL)
    {
        return FALSE;
    }
    len = Ql_strlen(src_string);

    /* i == len stands for the end of the string, which closes the last field. */
    for (i = 0; i <= len; i++)
    {
        unsigned int fieldLen;

        if (i < len && src_string[i] != ',')
        {
            continue;
        }
        if (fieldNo != index)
        {
            fieldNo++;
            fieldStart = i + 1;
            continue;
        }

        fieldLen = i - fieldStart;
        if (fieldLen == 0)
        {
            return FALSE;
        }
        Ql_memcpy(dest_string, src_string + fieldStart, fieldLen);
        /* Terminate right after the copied field (not at the source length). */
        dest_string[fieldLen] = '\0';
        return TRUE;
    }
    return FALSE;
}

/*
 * Handle an MQTT receive URC: extract comma-separated field 3 of the URC text
 * and write it to the debug port.
 *
 * packet: NUL-terminated URC text handed over by the message loop.
 */
static void Hdlr_Mqtt_Recv_Data(u8* packet)
{
    u8 mqtt_message[1024]={0};
    // APP_DEBUG formats into a 512-byte buffer; keep the logged text well below it.
    const u32 log_limit = 400;

    // Any single field is shorter than the whole packet, so this also bounds
    // the copy that QSDK_Get_Str performs into mqtt_message.
    if (packet == NULL || Ql_strlen((char*)packet) >= sizeof(mqtt_message))
    {
        APP_DEBUG("<-- RECV message: missing or too long, dropped -->\r\n");
        return;
    }

    if (!QSDK_Get_Str((char*)packet,(char*)mqtt_message,3))
    {
        APP_DEBUG("<-- RECV message: payload field not found -->\r\n");
        return;
    }

    // The copy is only used for logging, so it is safe to cut it for the log.
    if (Ql_strlen((char*)mqtt_message) > log_limit)
    {
        mqtt_message[log_limit] = '\0';
    }
    APP_DEBUG("<-- RECV message:%s–>\r\n",mqtt_message);
}
/*
 * Handle an MQTT state URC: extract comma-separated field 3 of the URC text,
 * convert it with Ql_atoi and log it.
 *
 * packet: NUL-terminated URC text handed over by the message loop.
 *
 * NOTE(review): this handler only logs. If the state URC is meant to signal a
 * dropped connection, the state machine is never restarted from here - confirm
 * whether m_tcp_state should be reset at this point.
 */
static void Hdlr_Mqtt_Recv_State(u8* packet)
{
    // NOTE(review): QSDK_Get_Str does not know this buffer's size; a field
    // longer than 9 characters would overrun it.
    char strTmp[10];

    Ql_memset(strTmp, 0x0, sizeof(strTmp));

    // Without this check a missing field was reported as "state = 0".
    if (packet == NULL || !QSDK_Get_Str((char*)packet,strTmp,3))
    {
        APP_DEBUG("<-- mqtt state: field not found -->\r\n");
        return;
    }
    APP_DEBUG("<-- mqtt state = %d–>\r\n",Ql_atoi(strTmp));
}

static s32 ReadSerialPort(Enum_SerialPort port, /[out]/u8* pBuffer, /[in]/u32 bufLen)
{
s32 rdLen = 0;
s32 rdTotalLen = 0;
if (NULL == pBuffer || 0 == bufLen)
{
return -1;
}
Ql_memset(pBuffer, 0x0, bufLen);
// test_message = pBuffer;
while (1)
{
rdLen = Ql_UART_Read(port, pBuffer + rdTotalLen, bufLen - rdTotalLen);
// APP_DEBUG(“hi to read from port[%d]\r\n”, pBuffer);
if (rdLen <= 0) // All data is read out, or Serial Port Error!
{
APP_DEBUG(“hi to read from port[%d]\r\n”, pBuffer);
//proc_main_task(1);
break;
}
rdTotalLen += rdLen;
// Continue to read…
}
if (rdLen < 0) // Serial Port Error!
{
APP_DEBUG(“Fail to read from port[%d]\r\n”, port);
return -99;
}
return rdTotalLen;
}

/*
 * UART event callback.
 *
 * On EVENT_UART_READY_TO_READ for m_myUartPort the pending bytes are read into
 * m_RxBuf_Uart1, copied into test_message, published on topic_update and
 * echoed back on the UART. All other events are ignored.
 *
 * port:           port that raised the event.
 * msg:            event type.
 * level:          only logged.
 * customizedPara: only logged.
 */
static void CallBack_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara)
{
    APP_DEBUG("CallBack_UART_Hdlr: port=%d, event=%d, level=%d, p=%x\r\n", port, msg, level, customizedPara);
    uart_out++;

    switch (msg)
    {
    case EVENT_UART_READY_TO_READ:
    {
        s32 totalBytes;
        u32 index; // was u8: wrapped around (endless loop) for more than 255 bytes

        if (m_myUartPort != port)
        {
            break;
        }

        totalBytes = ReadSerialPort(port, m_RxBuf_Uart1, sizeof(m_RxBuf_Uart1));
        if (totalBytes <= 0)
        {
            APP_DEBUG("<-- No data in UART buffer! -->\r\n");
            return;
        }

        // NOTE(review): the capacity of test_message is not visible here; it
        // must hold at least sizeof(m_RxBuf_Uart1) + 1 bytes - confirm.
        for (index = 0; index < (u32)totalBytes; index++)
        {
            test_message[index] = m_RxBuf_Uart1[index];
        }
        test_message[index] = '\0';

        // Publish exactly the bytes just received. The length used to be taken
        // with Ql_strlen() before the terminator was written, which picked up
        // stale bytes left over from an earlier, longer message.
        // NOTE(review): RIL_MQTT_QMTPUB takes the length as u8, so more than
        // 255 bytes cannot be published in one call.
        RIL_MQTT_QMTPUB(TcpId,0,0,0, topic_update,test_message,totalBytes);

        // Echo
        Ql_UART_Write(m_myUartPort, m_RxBuf_Uart1, totalBytes);
        break;
    }

    case EVENT_UART_READY_TO_WRITE:
        break;

    default:
        break;
    }
}

/*
 * Timer callback that advances the connection state machine by one step per
 * expiry of MQTT_TIMER_ID. Other timer ids are ignored.
 *
 * Order of states on success:
 *   SIM ready -> network registered -> GPRS context -> APN + PDP activation ->
 *   MQTT will / timeout / session / keep-alive / version configuration ->
 *   open -> connect -> subscribe -> publish.
 * A failed open or connect goes to STATE_GPRS_DEACTIVATE; a failed subscribe
 * or publish goes to STATE_MQTT_CLOSE and then STATE_GPRS_DEACTIVATE.
 * A state whose step fails without an explicit error transition is simply
 * retried on the next expiry.
 *
 * NOTE(review): after a successful publish and after GPRS deactivation the
 * state becomes STATE_TOTAL_NUM, which has no case below, so the machine then
 * idles and never reconnects on its own. If automatic reconnection is wanted,
 * STATE_GPRS_DEACTIVATE should hand over to STATE_NW_GET_SIMSTATE - confirm.
 *
 * timerId: id of the timer that expired.
 * param:   not used.
 */
static void Callback_Timer(u32 timerId, void* param)
{
    s32 ret;

    if (MQTT_TIMER_ID != timerId)
    {
        return;
    }

    switch (m_tcp_state)
    {
    case STATE_NW_GET_SIMSTATE:
    {
        s32 simStat = 0;
        RIL_SIM_GetSimState(&simStat);
        if (simStat == SIM_STAT_READY)
        {
            m_tcp_state = STATE_NW_QUERY_STATE;
            APP_DEBUG("<–SIM card status is normal!–>\r\n");
        }
        else
        {
            APP_DEBUG("<–SIM card status is unnormal!–>\r\n");
        }
        break;
    }

    case STATE_NW_QUERY_STATE:
    {
        s32 creg = 0;
        s32 cgreg = 0;
        ret = RIL_NW_GetGSMState(&creg);
        ret = RIL_NW_GetGPRSState(&cgreg);
        APP_DEBUG("<–Network State:creg=%d,cgreg=%d–>\r\n",creg,cgreg);
        // Only the GPRS registration decides whether to move on.
        if ((cgreg == NW_STAT_REGISTERED) || (cgreg == NW_STAT_REGISTERED_ROAMING))
        {
            m_tcp_state = STATE_GPRS_CONTEXT;
        }
        break;
    }

    case STATE_GPRS_CONTEXT:
    {
        ret = RIL_NW_SetGPRSContext(CONTEXT);
        APP_DEBUG("<-- Set GPRS context, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_GPRS_ACTIVATE;
        }
        break;
    }

    case STATE_GPRS_ACTIVATE:
    {
        ret = RIL_NW_SetAPN(1, m_apn, m_userid, m_passwd);
        APP_DEBUG("<-- Set GPRS APN, ret=%d -->\r\n", ret);

        // Only the result of the PDP activation decides the transition.
        ret = RIL_NW_OpenPDPContext();
        APP_DEBUG("<-- Open PDP context, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_CFG;
        }
        break;
    }

    case STATE_MQTT_CFG:
    {
        ret = RIL_MQTT_QMTCFG_Will(TcpId, WillFlag, WillQos, WillRetain, WillTopic, WillMsg);
        APP_DEBUG("<-- mqtt config, ret=%d -->\r\n", ret);
        if (ret == RIL_AT_SUCCESS)
        {
            m_tcp_state = STATE_MQTT_CFGTIMEOUT;
        }
        break;
    }

    case STATE_MQTT_CFGTIMEOUT:
    {
        ret = RIL_MQTT_QMTCFG("TIMEOUT", TcpId, 10, 5);
        APP_DEBUG("<-- mqtt TIMEOUT, ret=%d -->\r\n", ret);
        if (ret == RIL_AT_SUCCESS)
        {
            m_tcp_state = STATE_MQTT_CFGSESSION;
        }
        break;
    }

    case STATE_MQTT_CFGSESSION:
    {
        // The last argument is a u8 that is ignored for this option; pass 0
        // rather than NULL.
        ret = RIL_MQTT_QMTCFG("SESSION", TcpId, 0, 0);
        APP_DEBUG("<-- mqtt SESSION, ret=%d -->\r\n", ret);
        if (ret == RIL_AT_SUCCESS)
        {
            m_tcp_state = STATE_MQTT_CFGKEEPALIVE;
        }
        break;
    }

    case STATE_MQTT_CFGKEEPALIVE:
    {
        ret = RIL_MQTT_QMTCFG("KEEPALIVE", TcpId, 120, 0);
        APP_DEBUG("<-- mqtt KEEPALIVE, ret=%d -->\r\n", ret);
        if (ret == RIL_AT_SUCCESS)
        {
            m_tcp_state = STATE_MQTT_CFGVERSION;
        }
        break;
    }

    case STATE_MQTT_CFGVERSION:
    {
        ret = RIL_MQTT_QMTCFG("VERSION", TcpId, 1, 0);
        APP_DEBUG("<-- mqtt VERSION, ret=%d -->\r\n", ret);
        if (ret == RIL_AT_SUCCESS)
        {
            m_tcp_state = STATE_MQTT_OPEN;
        }
        break;
    }

    case STATE_MQTT_OPEN:
    {
        ret = RIL_MQTT_QMTOPEN(TcpId, HOST_NAME, HOST_PORT);
        APP_DEBUG("<-- mqtt open, ret=%d -->\r\n",ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_CONN;
        }
        else
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        break;
    }

    case STATE_MQTT_CONN:
    {
        char strImei[30];
        Ql_memset(strImei, 0x0, sizeof(strImei));

        // The IMEI is only read for the log line below.
        ret = RIL_GetIMEI(strImei);
        APP_DEBUG("<-- IMEI:%s,ret=%d -->\r\n", strImei,ret);

        ret = RIL_MQTT_QMTCONN(TcpId, CLIENT_ID, USER_NAME, PASSWORD);
        APP_DEBUG("<-- mqtt connect,ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_SUB;
        }
        else
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        break;
    }

    case STATE_MQTT_SUB:
    {
        ret = RIL_MQTT_QMTSUB(TcpId, 1, topic_update1, 0, NULL);
        APP_DEBUG("<-- mqtt subscribe, ret=%d -->\r\n",ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_MQTT_PUB;
        }
        else
        {
            m_tcp_state = STATE_MQTT_CLOSE;
        }
        break;
    }

    case STATE_MQTT_PUB:
    {
        // Publish the current content of test_message. The previous code first
        // copied it into a 2 KB stack buffer with a u8 index (endless loop for
        // more than 255 bytes) and then compared the two array addresses,
        // which is always "different" - the copy had no effect and is gone.
        u32 length = Ql_strlen(test_message);

        ret = RIL_MQTT_QMTPUB(TcpId, 0, 0, 0, topic_update, test_message, length);
        APP_DEBUG("<-- mqtt publish, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_TOTAL_NUM;
        }
        else
        {
            m_tcp_state = STATE_MQTT_CLOSE;
        }
        break;
    }

    case STATE_MQTT_CLOSE:
    {
        ret = RIL_MQTT_QMTCLOSE(TcpId);
        APP_DEBUG("<–mqtt CLOSE, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_GPRS_DEACTIVATE;
        }
        break;
    }

    case STATE_GPRS_DEACTIVATE:
    {
        ret = RIL_NW_ClosePDPContext();
        APP_DEBUG("<-- Set GPRS DEACTIVATE, ret=%d -->\r\n", ret);
        if (ret == 0)
        {
            m_tcp_state = STATE_TOTAL_NUM;
        }
        break;
    }

    default:
        // STATE_TOTAL_NUM and any unknown state: nothing to do.
        break;
    }
}

/*
 * Send AT+QMTCFG="ALIAUTH" with the given device credentials.
 *
 * connectID:     MQTT connection id placed in the command.
 * product_key:   NUL-terminated product key.
 * device_name:   NUL-terminated device name.
 * device_secret: NUL-terminated device secret.
 *
 * Returns the result of Ql_RIL_SendATCmd.
 *
 * NOTE(review): the command is formatted into a 200-byte buffer without a
 * length check; the three strings together must stay well below that.
 */
s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,u8* product_key,u8* device_name,u8* device_secret)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    Ql_memset(strAT, 0, sizeof(strAT));
    // A second Ql_sprintf used to overwrite this command with a malformed
    // "WILL" configuration, so ALIAUTH was never actually sent.
    Ql_sprintf(strAT, "AT+QMTCFG=\"ALIAUTH\",%d,\"%s\",\"%s\",\"%s\"\n",connectID,product_key,device_name,device_secret);

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),NULL,NULL,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    return ret;
}

/*
 * Send AT+QMTCFG="WILL" to configure the last-will message of a connection.
 *
 * connectID:   MQTT connection id placed in the command.
 * will_fg:     will flag value.
 * will_qos:    will QoS value.
 * will_retain: will retain value.
 * will_topic:  NUL-terminated will topic.
 * will_msg:    NUL-terminated will message.
 *
 * Returns the result of Ql_RIL_SendATCmd.
 */
s32 RIL_MQTT_QMTCFG_Will(u8 connectID,u8 will_fg,u8 will_qos,u8 will_retain,u8* will_topic,u8* will_msg)
{
    char cmd[200];
    s32 result;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd,
               "AT+QMTCFG=\"WILL\",%d,%d,%d,%d,\"%s\",\"%s\"\n",
               connectID, will_fg, will_qos, will_retain, will_topic, will_msg);

    result = Ql_RIL_SendATCmd(cmd, Ql_strlen(cmd), NULL, NULL, 0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, result);

    // Success and failure are both reported through the same value.
    return result;
}

/*
 * Send a numeric AT+QMTCFG option.
 *
 * CnfgString: NUL-terminated option name placed in quotes in the command.
 * Para1:      first numeric argument (callers in this file pass the connection id).
 * Para2:      second numeric argument.
 * Para3:      third numeric argument; only sent when CnfgString contains "TIMEOUT".
 *
 * Returns the result of Ql_RIL_SendATCmd.
 */
s32 RIL_MQTT_QMTCFG(u8* CnfgString,u8 Para1,u8 Para2,u8 Para3)
{
    s32 ret = RIL_AT_SUCCESS;
    char *p = NULL;
    char strAT[200];

    Ql_memset(strAT, 0, sizeof(strAT));

    p = Ql_strstr((char*)CnfgString, "TIMEOUT");
    if (p)
    {
        // The timeout option carries one more argument than the others.
        Ql_sprintf(strAT, "AT+QMTCFG=\"%s\",%d,%d,%d\n",CnfgString,Para1,Para2,Para3);
    }
    else
    {
        Ql_sprintf(strAT, "AT+QMTCFG=\"%s\",%d,%d\n",CnfgString,Para1,Para2);
    }

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),NULL,NULL,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    return ret;
}

/*
 * AT response callback shared by the open / connect / subscribe / unsubscribe
 * wrappers.
 *
 * line, len: one response line from the module.
 * userdata:  MQTT_Param* - `prefix` is the result-line prefix to wait for,
 *            `index` selects the comma-separated field after the ':' that is
 *            converted with Ql_atoi and stored in `data`.
 *
 * Returns RIL_ATRSP_SUCCESS once the prefixed line has been seen,
 * RIL_ATRSP_FAILED on an error line, and RIL_ATRSP_CONTINUE otherwise
 * (including on "OK", because the prefixed result line is still awaited).
 */
static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata)
{
    MQTT_Param *mqtt_param = (MQTT_Param *)userdata;
    char *head = Ql_RIL_FindString(line, len, (char*)mqtt_param->prefix); //continue wait
    if (head)
    {
        char strTmp[10];
        char *p1 = NULL;

        Ql_memset(strTmp, 0x0, sizeof(strTmp));
        p1 = Ql_strstr(head, ":");
        if (p1)
        {
            // If the field is missing strTmp stays empty; `data` then holds
            // whatever Ql_atoi yields for an empty string.
            QSDK_Get_Str((p1+1),strTmp, mqtt_param->index);
            mqtt_param->data= Ql_atoi(strTmp);
        }
        return RIL_ATRSP_SUCCESS;
    }
    head = Ql_RIL_FindString(line, len, "OK");
    if (head)
    {
        return RIL_ATRSP_CONTINUE;
    }
    head = Ql_RIL_FindString(line, len, "ERROR");
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CME ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CMS ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    return RIL_ATRSP_CONTINUE; //continue wait
}

/*
 * AT response callback for a "+QMTOPEN:" query: logs the matching response
 * text and finishes the command.
 *
 * line, len: one response line from the module.
 * userdata:  not used.
 *
 * Returns RIL_ATRSP_SUCCESS once a "+QMTOPEN:" line has been seen,
 * RIL_ATRSP_FAILED on an error line, and RIL_ATRSP_CONTINUE otherwise.
 */
static s32 ATResponse_mqtt_handlercheck(char* line, u32 len, void* userdata)
{
    char *head = Ql_RIL_FindString(line, len, "+QMTOPEN:"); //continue wait
    if (head)
    {
        // Log the response text itself; the previous code passed a pointer
        // difference to a %s conversion.
        APP_DEBUG("<--MQTT OPEN %s\r\n", head);
        return RIL_ATRSP_SUCCESS;
    }
    head = Ql_RIL_FindString(line, len, "OK");
    if (head)
    {
        return RIL_ATRSP_CONTINUE;
    }
    head = Ql_RIL_FindString(line, len, "ERROR");
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CME ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "+CMS ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    return RIL_ATRSP_CONTINUE; //continue wait
}

/*
 * AT response callback for AT+QMTPUB.
 *
 * When the "\r\n>" prompt arrives, the payload (`prefix`, `length` bytes of
 * the MQTT_Param in userdata) is written to the module followed by Ctrl-Z.
 *
 * line, len: one response line from the module.
 * userdata:  MQTT_Param* - `prefix` points to the payload, `length` is its size.
 *
 * Returns RIL_ATRSP_SUCCESS on "OK", RIL_ATRSP_FAILED on an error line and
 * RIL_ATRSP_CONTINUE otherwise (including right after the payload was sent).
 */
static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata)
{
    char *head = NULL;
    u8 uCtrlZ = 0x1A;

    MQTT_Param *mqtt_param = (MQTT_Param *)userdata;

    head = Ql_RIL_FindString(line, len, "\r\n>");
    if (head)
    {
        Ql_RIL_WriteDataToCore(mqtt_param->prefix,mqtt_param->length);
        Ql_RIL_WriteDataToCore(&uCtrlZ,1); // Ctrl-Z ends the payload

        return RIL_ATRSP_CONTINUE;
    }
    head = Ql_RIL_FindString(line, len, "ERROR");
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    head = Ql_RIL_FindString(line, len, "OK");
    if (head)
    {
        return RIL_ATRSP_SUCCESS;
    }
    head = Ql_RIL_FindString(line, len, "+CMS ERROR:");//fail
    if (head)
    {
        return RIL_ATRSP_FAILED;
    }
    return RIL_ATRSP_CONTINUE; //continue wait
}

/*
 * Send AT+QMTOPEN and wait for the "+QMTOPEN:" result line.
 *
 * connectID: MQTT connection id placed in the command.
 * hostName:  NUL-terminated host name; the global product_key is written
 *            directly in front of it inside the quotes.
 * port:      server port.
 *
 * Returns the Ql_RIL_SendATCmd result when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when field 1 of the result line is not 0,
 * and the (successful) send result otherwise.
 */
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port)
{
    char cmd[200];
    s32 result;

    // Tell the response handler which line to wait for and which field to
    // parse; 255 stays in `data` when no result line is parsed.
    mqtt_param.data = 255;
    mqtt_param.index = 1;
    mqtt_param.prefix="+QMTOPEN:";

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTOPEN=%d,\"%s%s\",%d\n", connectID,product_key,hostName,port);

    result = Ql_RIL_SendATCmd(cmd,Ql_strlen(cmd),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, result);

    if (result != RIL_AT_SUCCESS)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return result;
    }

    if (mqtt_param.data != 0)
    {
        APP_DEBUG("\r\n<--MQTT OPEN failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return result;
}

/*
 * Send AT+QMTCONN and wait for the "+QMTCONN:" result line.
 *
 * connectID: MQTT connection id placed in the command.
 * clientID:  NUL-terminated client id (required).
 * username:  NUL-terminated user name, or NULL.
 * password:  NUL-terminated password, or NULL. Credentials are only sent when
 *            both username and password are non-NULL.
 *
 * Returns the Ql_RIL_SendATCmd result when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when the arguments do not fit the command buffer,
 * when clientID is NULL or when field 1 of the result line is not 0,
 * and the (successful) send result otherwise.
 */
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];
    u32 textLen;

    if (NULL == clientID)
    {
        APP_DEBUG("\r\n<-- MQTT connect: client id missing -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    // Ql_sprintf is unbounded: make sure the caller-supplied strings plus the
    // fixed command text (kept below 32 bytes) fit into strAT.
    textLen = Ql_strlen((char*)clientID);
    if (NULL != username && NULL != password)
    {
        textLen += Ql_strlen((char*)username) + Ql_strlen((char*)password);
    }
    if (textLen + 32 > sizeof(strAT))
    {
        APP_DEBUG("\r\n<-- MQTT connect: credentials too long -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    Ql_memset(strAT, 0, sizeof(strAT));
    mqtt_param.prefix="+QMTCONN:";

    mqtt_param.index = 1;
    mqtt_param.data = 255; // stays 255 when no result line is parsed

    if (NULL != username && NULL != password)
    {
        Ql_sprintf(strAT, "AT+QMTCONN=%d,\"%s\",\"%s\",\"%s\"\n", connectID,clientID,username,password);
    }
    else
    {
        Ql_sprintf(strAT, "AT+QMTCONN=%d,\"%s\"\n", connectID,clientID);
    }
    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return ret;
    }
    else if (0 != mqtt_param.data)
    {
        APP_DEBUG("\r\n<–MQTT connect failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return ret;
}

/*
 * Send AT+QMTSUB for a single topic and wait for the "+QMTSUB:" result line.
 *
 * connectID: MQTT connection id placed in the command.
 * msgId:     message id placed in the command.
 * topic:     NUL-terminated topic filter.
 * qos:       requested QoS.
 * others:    must be NULL; no command format is implemented for it.
 *
 * Returns the Ql_RIL_SendATCmd result when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when `others` is non-NULL or when field 2 of the
 * result line is not 0, and the (successful) send result otherwise.
 */
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    if (NULL != others)
    {
        // This case used to send an empty AT command.
        APP_DEBUG("\r\n<-- MQTT subscribe: extra topics are not supported -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    Ql_memset(strAT, 0, sizeof(strAT));
    mqtt_param.prefix="+QMTSUB:";

    mqtt_param.index = 2;
    mqtt_param.data = 255; // stays 255 when no result line is parsed

    Ql_sprintf(strAT, "AT+QMTSUB=%d,%d,\"%s\",%d\n", connectID,msgId,topic,qos);

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return ret;
    }
    else if (0 != mqtt_param.data)
    {
        APP_DEBUG("\r\n<–MQTT subscribe failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return ret;
}

/*
 * Send AT+QMTPUB; the payload itself is written by
 * ATResponse_mqtt_handler_pub when the module shows its input prompt.
 *
 * connectID: MQTT connection id placed in the command.
 * msgId:     message id placed in the command.
 * qos:       QoS placed in the command.
 * retain:    retain flag placed in the command.
 * topic:     NUL-terminated topic.
 * data:      payload bytes.
 * length:    number of payload bytes to send.
 *
 * Returns the result of Ql_RIL_SendATCmd.
 *
 * NOTE(review): `length` is a u8, so a caller passing a larger length gets it
 * truncated before it reaches this function and only part of the payload is
 * sent. Widening it requires changing the prototype at the top of the file
 * together with this definition.
 */
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    Ql_memset(strAT, 0, sizeof(strAT));

    // For publishing, `prefix` carries the payload pointer instead of a
    // response prefix.
    mqtt_param.prefix=data;
    mqtt_param.length= length;

    Ql_sprintf(strAT, "AT+QMTPUB=%d,%d,%d,%d,\"%s\"\n", connectID,msgId,qos,retain,topic);
    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler_pub,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
    }
    return ret;
}

/*
 * Send AT+QMTCLOSE for the given connection.
 *
 * connectID: MQTT connection id placed in the command.
 *
 * Returns the result of Ql_RIL_SendATCmd.
 */
s32 RIL_MQTT_QMTCLOSE(u8 connectID)
{
    char cmd[200];
    s32 result;

    Ql_memset(cmd, 0, sizeof(cmd));
    Ql_sprintf(cmd, "AT+QMTCLOSE=%d\n",connectID);

    result = Ql_RIL_SendATCmd(cmd, Ql_strlen(cmd), NULL, NULL, 0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",cmd, result);

    // Success and failure are both reported through the same value.
    return result;
}

/*
 * Send AT+QMTUNS for a single topic and wait for the "+QMTUNS:" result line.
 *
 * connectID: MQTT connection id placed in the command.
 * msgId:     message id placed in the command.
 * topic:     NUL-terminated topic filter.
 * others:    must be NULL; no command format is implemented for it.
 *
 * Returns the Ql_RIL_SendATCmd result when sending fails,
 * QL_RET_ERR_RIL_MQTT_FAIL when `others` is non-NULL or when field 2 of the
 * result line is not 0, and the (successful) send result otherwise.
 */
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others)
{
    s32 ret = RIL_AT_SUCCESS;
    char strAT[200];

    if (NULL != others)
    {
        // This case used to send an empty AT command.
        APP_DEBUG("\r\n<-- MQTT unsubscribe: extra topics are not supported -->\r\n");
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    Ql_memset(strAT, 0, sizeof(strAT));
    mqtt_param.prefix="+QMTUNS:";

    mqtt_param.index = 2;
    mqtt_param.data = 255; // stays 255 when no result line is parsed

    Ql_sprintf(strAT, "AT+QMTUNS=%d,%d,\"%s\"\n", connectID,msgId,topic);

    ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
    APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
    if (RIL_AT_SUCCESS != ret)
    {
        APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
        return ret;
    }
    else if (0 != mqtt_param.data)
    {
        // The message used to say "subscribe" (copied from RIL_MQTT_QMTSUB).
        APP_DEBUG("\r\n<–MQTT unsubscribe failure, =%d -->\r\n", mqtt_param.data);
        return QL_RET_ERR_RIL_MQTT_FAIL;
    }

    return ret;
}

//#endif // EXAMPLE_TCPSSL

I need the MQTT subscribe code.
I am still not getting it to work; please give me some ideas.

You have already written the MQTT subscribe code.

I did not receive any subscribed messages with this code.

Hi guys,
I have completed the MQTT publish and subscribe code. If you explain your issues, maybe I can help you with your next steps.

Hello @kannannatesh,
I am publishing data every 20 seconds. My server sends an acknowledgement for every published message, but I am not able to receive that response. After a successful MQTT connection I subscribed to the topic, but no data arrives on that topic.
Please suggest what the issue could be.
Thank you in advance!

Hi @Sana_Shaikh,
if you can share your code, I will help point out the mistakes.

Hi,

I am using the EC200U. Could you please help me with this?