Hi,
I am trying MQTT protocol on MC60c with open CPU. I have written code such that it should be continuously publishing the data and if it fails to send for a few times it has to restart the connection. The issue here is it is not able to publish the data after a certain amount of time, and sometimes the subscribe messages are not received. I have worked on other controllers with MQTT protocol by continuously publishing the data but it never failed to send the data. Even I have tried on the SIMCOM module to publish the data it worked good. Can I get a solution how to resolve this? I am attaching my code below, please let me know if any modifications with my code so that it would work effectively as expected.
#ifdef MQTT_A
#include “ql_stdlib.h”
#include “ql_common.h”
#include “ql_type.h”
#include “ql_trace.h”
#include “ql_error.h”
#include “ql_uart.h”
#include “ql_gprs.h”
#include “ql_timer.h”
#include “ql_system.h”
#include “ril_network.h”
#include “ril.h”
#include “ril_util.h”
#include “ril_system.h”
#include<stdlib.h>
#include “ql_gpio.h”
#define DEBUG_ENABLE 1
#if DEBUG_ENABLE > 0
#define DEBUG_PORT UART_PORT1
#define DBG_BUF_LEN 512
static char DBG_BUFFER[DBG_BUF_LEN];
#define APP_DEBUG(FORMAT,…) {
Ql_memset(DBG_BUFFER, 0, DBG_BUF_LEN);
Ql_sprintf(DBG_BUFFER,FORMAT,##VA_ARGS);
if (UART_PORT2 == (DEBUG_PORT))
{
Ql_Debug_Trace(DBG_BUFFER);
} else {
Ql_UART_Write((Enum_SerialPort)(DEBUG_PORT), (u8*)(DBG_BUFFER), Ql_strlen((const char *)(DBG_BUFFER)));
}
}
#else
#define APP_DEBUG(FORMAT,…)
#endif
#define QL_RET_ERR_RIL_MQTT_FAIL -1
typedef struct{
u8 index;
u8* prefix;
s32 data;
u32 length;
}MQTT_Param;
/*****************************************************************
- timer param
******************************************************************/
#define MQTT_TIMER_ID TIMER_ID_USER_START
#define TIMEOUT_90S_TIMER_ID TIMER_ID_USER_START + 1 //timeout
#define MQTT_TIMER_PERIOD 2000 //changing from 5 to 2 sec
#define TIMEOUT_90S_PERIOD 90000
static s32 timeout_90S_monitor = FALSE;
/*****************************************************************
- APN Param
******************************************************************/
static u8 m_apn[MAX_GPRS_APN_LEN] = “cmnet”;
static u8 m_userid[MAX_GPRS_USER_NAME_LEN] = “”;
static u8 m_passwd[MAX_GPRS_PASSWORD_LEN] = “”;
/**********************************************************************************/
#define MAX_MQTT_PARAM_LENGTH 100
#define HOST_NAME “broker.uniolabs.in”
#define HOST_PORT 1883
#define CONTEXT 0
int product_key=20; //chnaged to 20 from 120
//static u8 product_key[MAX_MQTT_PARAM_LENGTH] = “120”;
//static u8 device_name[MAX_MQTT_PARAM_LENGTH] = “TEST1”;
//static u8 device_secret[MAX_MQTT_PARAM_LENGTH] = “lN9l09QesoiL4HXw7W9Z1XVJ0JzTJxhs”;
//static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “brate”;
// static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/update”;
static u8 topic_update[MAX_MQTT_PARAM_LENGTH] = “29/raw”;
static u8 topic_update1[MAX_MQTT_PARAM_LENGTH] = “srate”;
static u8 topic_error[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/update/error”;
static u8 topic_get[MAX_MQTT_PARAM_LENGTH] = “/hDTjzV2bVeR/TEST1/get”;
/*************************************************************************************/
int count_mqtt_error=0;
u8 test_message[50] = “allan\0”;
char pub_msg[15]="";
char pub_msg1[15]="";
u8 dest_msg[1024]={0};
u8 packet1;
bool uart_flag=TRUE;
typedef enum{
STATE_NW_GET_SIMSTATE,
STATE_NW_QUERY_STATE,
STATE_GPRS_CONTEXT,
STATE_GPRS_ACTIVATE,
STATE_MQTT_CFG,
STATE_MQTT_OPEN,
STATE_MQTT_CONN,
STATE_MQTT_SUB,
STATE_MQTT_PUB,
STATE_MQTT_CLOSE,
STATE_MQTT_DISC,
STATE_GPRS_DEACTIVATE,
STATE_TOTAL_NUM
}Enum_TCPSTATE;
static u8 m_tcp_state = STATE_NW_GET_SIMSTATE;
static MQTT_Param mqtt_param;
/*****************************************************************
- uart callback function
***************************************************************/
static void CallBack_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void customizedPara);
static void CallBack_UART_Hdlr1(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void customizedPara);
static void Callback_Timer(u32 timerId, void param);
static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata);
static s32 ATResponse_mqtt_handler_close(char* line, u32 len, void* userdata);
static void Hdlr_Mqtt_Recv_Data(u8* packet);
static void Hdlr_Mqtt_Recv_State(u8* packet);
s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,int* product_key);
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port );
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password);
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others);
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length);
s32 RIL_MQTT_QMTDISC(u8 connectID);
s32 RIL_MQTT_QMTCLOSE(u8 connectID);
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others);
#define SERIAL_RX_BUFFER_LEN 2048
static u8 m_RxBuf_VirtualPort1[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort1[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort2[SERIAL_RX_BUFFER_LEN];
static u8 m_RxBuf_UartPort3[SERIAL_RX_BUFFER_LEN];
u8* pub_msg3;
u8* pub_msg2;
char pub_msg4[50];
static s32 ReadSerialPort(Enum_SerialPort port, /[out]/u8* pBuffer, /[in]/u32 bufLen)
{
s32 rdLen = 0;
s32 rdTotalLen = 0;
if (NULL == pBuffer || 0 == bufLen)
{
return -1;
}
Ql_memset(pBuffer, 0x0, bufLen);
while (1)
{
rdLen = Ql_UART_Read(port, pBuffer + rdTotalLen, bufLen - rdTotalLen);
if (rdLen <= 0) // All data is read out, or Serial Port Error!
{
break;
}
rdTotalLen += rdLen;
// Continue to read…
}
if (rdLen < 0) // Serial Port Error!
{
APP_DEBUG("<–Fail to read from port[%d]–>\r\n", port);
return -99;
}
return rdTotalLen;
}
static void Callback_UART_Hdlr(Enum_SerialPort port, Enum_UARTEventType evt, bool level, void* customizedPara)
{
s32 ret;
//APP_DEBUG("<–CallBack_UART_Hdlr: port=%d, event=%d, level=%d, p=%x–>\r\n", port, evt, level, customizedPara);
switch (evt)
{
case EVENT_UART_READY_TO_READ:
{
// if (VIRTUAL_PORT1 == port || VIRTUAL_PORT2 == port)
// {// Print URC through uart1 port
// s32 totalBytes = ReadSerialPort(port, m_RxBuf_VirtualPort1, sizeof(m_RxBuf_VirtualPort1));
// APP_DEBUG(“the data1 is %s\n”,m_RxBuf_VirtualPort1);
// s32 ret = Ql_UART_Write(UART_PORT1, m_RxBuf_VirtualPort1, totalBytes);
// if (ret < totalBytes)
// {
// APP_DEBUG("<–Only part of bytes are written, %d/%d–>\r\n", ret, totalBytes);
// // TODO: Need to handle event ‘QL_EVENT_UART_READY_TO_WRITE’
// }
// }
if (UART_PORT1 == port/* || UART_PORT2 == port || UART_PORT3 == port*/)
{// Reflect the data
APP_DEBUG(“the value of uart_flag is %d\n”,uart_flag);
// if (uart_flag==TRUE)
// {
//Ql_Sleep(3000);
u8* arrRxBuf[] = {m_RxBuf_UartPort1/*, m_RxBuf_UartPort2, m_RxBuf_UartPort3*/};
s32 totalBytes = ReadSerialPort(port, arrRxBuf[port - UART_PORT1], SERIAL_RX_BUFFER_LEN);
pub_msg2=0;
pub_msg2 =arrRxBuf[port - UART_PORT1];
pub_msg3=0;
pub_msg3=pub_msg2;
pub_msg4[0]=0;
Ql_memset(pub_msg4, 0x0, sizeof(pub_msg3));
int i;
for(i=0;i<Ql_strlen(pub_msg3);i++)
{
pub_msg4[i]=pub_msg3[i];
}
pub_msg4[i]='\0';
APP_DEBUG("the pub_msg4 is %s\n",pub_msg4);
// for(int i=0;i<13;i++)
// {
// pub_msg[i]=arrRxBuf[port - UART_PORT1];
// }
// pub_msg[SERIAL_RX_BUFFER_LEN]=arrRxBuf[port - UART_PORT1];
// APP_DEBUG("pub_msg is %s\n",pub_msg);
//pub_msg1[0]=pub_msg2; //commenting out for checking while testing
s32 ret = Ql_UART_Write(port, arrRxBuf[port - UART_PORT1], totalBytes);
if (ret < totalBytes)
{
APP_DEBUG("<--Only part of bytes are written, %d/%d-->\r\n", ret, totalBytes);
// TODO: Need to handle event 'QL_EVENT_UART_READY_TO_WRITE'
}
uart_flag=FALSE;
// }
}
break;
}
case EVENT_UART_READY_TO_WRITE:
break;
default:
break;
}
}
void proc_main_task(s32 taskId)
{
s32 ret;
ST_MSG msg;
// Register & open UART port
Ql_UART_Register(UART_PORT1, CallBack_UART_Hdlr1, NULL);
Ql_UART_Open(UART_PORT1, 115200, FC_NONE);
APP_DEBUG("<--OpenCPU: mqtt test.-->\r\n");
Ql_Timer_Register(MQTT_TIMER_ID, Callback_Timer, NULL);
Ql_Timer_Start(MQTT_TIMER_ID, MQTT_TIMER_PERIOD, TRUE);
Ql_Timer_Register(TIMEOUT_90S_TIMER_ID, Callback_Timer, NULL);
timeout_90S_monitor = FALSE;
while(TRUE)
{
Ql_OS_GetMessage(&msg);
switch(msg.message)
{
case MSG_ID_RIL_READY:
APP_DEBUG("<-- RIL is ready -->\r\n");
Ql_RIL_Initialize();
break;
case MSG_ID_URC_INDICATION:
switch (msg.param1)
{
case URC_MQTT_RECV_IND:
APP_DEBUG("test1\n");
APP_DEBUG("<-- RECV DATA:%s -->\r\n", msg.param2);
Hdlr_Mqtt_Recv_Data(msg.param2);
break;
case URC_MQTT_STAT_IND:
APP_DEBUG("test2\n");
APP_DEBUG("<-- RECV STAT:%s-->\r\n", msg.param2);
Hdlr_Mqtt_Recv_State(msg.param2);
break;
}
break;
default:
break;
}
}
}
void proc_subtask1(s32 taskId)
{
s32 ret;
ST_MSG msg;
ST_UARTDCB dcb;
Enum_SerialPort mySerialPort = UART_PORT1;
Ql_Debug_Trace("<--OpenCPU: proc_subtask1-->\r\n");
dcb.baudrate = 115200;
dcb.dataBits = DB_8BIT;
dcb.stopBits = SB_ONE;
dcb.parity = PB_NONE;
dcb.flowCtrl = FC_NONE;
ret = Ql_UART_Register(mySerialPort, Callback_UART_Hdlr, NULL);
if (ret < QL_RET_OK)
{
Ql_Debug_Trace("<--Ql_UART_Register(mySerialPort=%d)=%d-->\r\n", mySerialPort, ret);
}
ret = Ql_UART_OpenEx(mySerialPort, &dcb);
if (ret < QL_RET_OK)
{
Ql_Debug_Trace("<--Ql_UART_OpenEx(mySerialPort=%d)=%d-->\r\n", mySerialPort, ret);
}
Ql_UART_ClrRxBuffer(mySerialPort);
APP_DEBUG("proc_subtask1...");
while(TRUE)
{
Ql_OS_GetMessage(&msg);
switch(msg.message)
{
case MSG_ID_USER_START:
break;
default:
break;
}
}
}
//src_string=“GPRMC,235945.799,V,0.00,0.00,050180,N” index =1 ·µ»ØTRUE ,dest_string=“235945.799”; index =3£¬·µ»ØFALSE
char QSDK_Get_Str(char *src_string, char *dest_string, unsigned char index)
{
char SentenceCnt = 0;
char ItemSum = 0;
char ItemLen = 0, Idx = 0;
char len = 0;
unsigned int i = 0;
if (src_string ==NULL)
{
return FALSE;
}
len = Ql_strlen(src_string);
for ( i = 0; i < len; i++)
{
if (*(src_string + i) == ',')
{
ItemLen = i - ItemSum - SentenceCnt;
ItemSum += ItemLen;
if (index == SentenceCnt )
{
if (ItemLen == 0)
{
return FALSE;
}
else
{
Ql_memcpy(dest_string, src_string + Idx, ItemLen);
*(dest_string + ItemLen) = '\0';
return TRUE;
}
}
SentenceCnt++;
Idx = i + 1;
}
}
if (index == SentenceCnt && (len - Idx) != 0)
{
Ql_memcpy(dest_string, src_string + Idx, len - Idx);
*(dest_string + len) = '\0';
return TRUE;
}
else
{
return FALSE;
}
}
static void Hdlr_Mqtt_Recv_Data(u8* packet)
{
u8 mqtt_message[1024]={0};
packet1=packet;
QSDK_Get_Str(packet,mqtt_message,3);
APP_DEBUG("<-- RECV message:%s–>\r\n",mqtt_message);
Ql_GPIO_Init(PINNAME_RTS, PINDIRECTION_OUT, PINLEVEL_LOW, PINPULLSEL_PULLUP);
Ql_GPIO_SetLevel(PINNAME_RTS, PINLEVEL_LOW);
Ql_Sleep(200);
Ql_GPIO_SetLevel(PINNAME_RTS, PINLEVEL_HIGH);
APP_DEBUG(“LED bliked\n”);
}
static void Hdlr_Mqtt_Recv_State(u8* packet)
{
char strTmp[10];
Ql_memset(strTmp, 0x0, sizeof(strTmp));
QSDK_Get_Str(packet,strTmp,3);
APP_DEBUG("<-- mqtt state = %d-->\r\n",Ql_atoi(strTmp));
}
static void CallBack_UART_Hdlr1(Enum_SerialPort port, Enum_UARTEventType msg, bool level, void* customizedPara)
{
}
static void Callback_Timer(u32 timerId, void* param)
{
s32 ret ;
if (MQTT_TIMER_ID == timerId)
{
switch(m_tcp_state)
{
case STATE_NW_GET_SIMSTATE:
{
s32 simStat = 0;
RIL_SIM_GetSimState(&simStat);
if (simStat == SIM_STAT_READY)
{
m_tcp_state = STATE_NW_QUERY_STATE;
APP_DEBUG("<--SIM card status is normal!-->\r\n");
}else
{
APP_DEBUG("<--SIM card status is unnormal!-->\r\n");
}
break;
}
case STATE_NW_QUERY_STATE:
{
s32 creg = 0;
s32 cgreg = 0;
ret = RIL_NW_GetGSMState(&creg);
ret = RIL_NW_GetGPRSState(&cgreg);
APP_DEBUG("<--Network State:creg=%d,cgreg=%d-->\r\n",creg,cgreg);
if((cgreg == NW_STAT_REGISTERED)||(cgreg == NW_STAT_REGISTERED_ROAMING))
{
m_tcp_state = STATE_GPRS_CONTEXT;
}
break;
}
case STATE_GPRS_CONTEXT:
{
ret = RIL_NW_SetGPRSContext(CONTEXT);
APP_DEBUG("<-- Set GPRS context, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_GPRS_ACTIVATE;
}
break;
}
case STATE_GPRS_ACTIVATE:
{
ret = RIL_NW_SetAPN(1, m_apn, m_userid, m_passwd);
APP_DEBUG("<-- Set GPRS APN, ret=%d -->\r\n", ret);
ret = RIL_NW_OpenPDPContext();
APP_DEBUG("<-- Open PDP context, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_MQTT_CFG;
}
else
{
m_tcp_state = STATE_GPRS_DEACTIVATE;
}
break;
}
case STATE_MQTT_CFG:
{
ret = RIL_MQTT_QMTCFG_Ali( 0,product_key);
APP_DEBUG("<-- mqtt config, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_MQTT_OPEN;
}
break;
}
case STATE_MQTT_OPEN:
{
ret = RIL_MQTT_QMTOPEN( 0,HOST_NAME, HOST_PORT) ;
APP_DEBUG("<-- mqtt open, ret=%d -->\r\n",ret);
if(ret ==0)
{
m_tcp_state = STATE_MQTT_CONN;
}
else
{
m_tcp_state = STATE_MQTT_DISC;
}
break;
}
case STATE_MQTT_CONN:
{
char strImei[5];
Ql_memset(strImei, 0x0, sizeof(strImei));
int a;
a=rand();
//itoa(a,strImei,10);
Ql_sprintf(strImei,"%d",a);
// ret = RIL_GetIMEI(strImei);
APP_DEBUG("<-- IMEI:%s,ret=%d -->\r\n", strImei,ret);
ret = RIL_MQTT_QMTCONN(0,strImei,NULL,NULL);
APP_DEBUG("<-- mqtt connect,ret=%d -->\r\n", ret);
// if(ret ==0)
// {
// m_tcp_state = STATE_MQTT_SUB;
// }
if(ret ==0)
{
// m_tcp_state = STATE_MQTT_PUB;//changing for air purifier project
m_tcp_state = STATE_MQTT_SUB;
}
else
{
m_tcp_state = STATE_GPRS_DEACTIVATE;
}
break;
}
case STATE_MQTT_SUB:
{
// APP_DEBUG("the pub_msg1 is %s\n",pub_msg1);
ret = RIL_MQTT_QMTSUB(0, 1,topic_update1, 1,NULL);
APP_DEBUG("<-- mqtt subscribe, ret=%d -->\r\n",ret);
if(ret ==0)
{
m_tcp_state = STATE_MQTT_PUB;
}
else
{
m_tcp_state = STATE_MQTT_CLOSE;
}
break;
}
case STATE_MQTT_PUB:
{
char data_pub[30]="";
Ql_memset(data_pub, 0x0, sizeof(data_pub));
int b;
b=rand();
//itoa(a,strImei,10);
Ql_sprintf(data_pub,"%d",b);
// ret = RIL_GetIMEI(strImei);
APP_DEBUG("<-- data_pub:%s -->\r\n", data_pub);
u32 data_pub_length=0;
data_pub_length = Ql_strlen(data_pub);
ret = RIL_MQTT_QMTPUB(0, 1,1,0, topic_update,data_pub,data_pub_length);
// u32 length2=0;
// length2 = Ql_strlen(pub_msg4);
//
//
// //u32 length1=0;
//
//
// //length1=Ql_strlen(test_message);
// ret = RIL_MQTT_QMTPUB(0, 1,1,0, topic_update,pub_msg4,length2);
uart_flag=TRUE;
APP_DEBUG("the value of uart_flag is %d\n",uart_flag);
//ret = RIL_MQTT_QMTPUB(0, 1,1,0, topic_update,test_message,length1);
APP_DEBUG("<-- mqtt publish, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_MQTT_PUB;
count_mqtt_error =0;
// m_tcp_state = STATE_MQTT_CLOSE;
}
else
{
// m_tcp_state = STATE_MQTT_CLOSE;
m_tcp_state = STATE_MQTT_PUB;
count_mqtt_error += 1;
if (count_mqtt_error >10)
{
m_tcp_state = STATE_MQTT_CLOSE;
count_mqtt_error=0;
}
}
break;
}
case STATE_MQTT_CLOSE:
{
ret =RIL_MQTT_QMTCLOSE(0);
APP_DEBUG("<--mqtt CLOSE, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_GPRS_DEACTIVATE;
APP_DEBUG("closed the connection1 \n");
}
else
{
m_tcp_state = STATE_GPRS_DEACTIVATE;
APP_DEBUG("closed the connection2 \n");
}
break;
}
case STATE_MQTT_DISC:
{
ret =RIL_MQTT_QMTDISC(0);
APP_DEBUG("<--mqtt disconnect, ret=%d -->\r\n", ret);
if(ret ==0)
{
m_tcp_state = STATE_GPRS_DEACTIVATE;
}
else
{
m_tcp_state = STATE_MQTT_CLOSE;
}
break;
}
case STATE_GPRS_DEACTIVATE:
{
ret = RIL_NW_ClosePDPContext();
APP_DEBUG("<-- Set GPRS DEACTIVATE, ret=%d -->\r\n", ret);
if(ret ==0)
{
//m_tcp_state = STATE_TOTAL_NUM; //edited
m_tcp_state = STATE_NW_GET_SIMSTATE;
}
break;
}
}
}
}
s32 RIL_MQTT_QMTCFG_Ali( u8 connectID,int* product_key)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
Ql_sprintf(strAT, "AT+QMTCFG=\"KEEPALIVE\",%d,%d\n",connectID,product_key);
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),NULL,NULL,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if (RIL_AT_SUCCESS != ret)
{
return ret;
}
return ret;
}
static s32 ATResponse_mqtt_handler(char* line, u32 len, void* userdata)
{
MQTT_Param *mqtt_param = (MQTT_Param )userdata;
char head = Ql_RIL_FindString(line, len, mqtt_param->prefix); //continue wait
if(head)
{
char strTmp[10];
char p1 = NULL;
char p2 = NULL;
Ql_memset(strTmp, 0x0, sizeof(strTmp));
p1 = Ql_strstr(head, “:”);
if (p1)
{
QSDK_Get_Str((p1+1),strTmp, mqtt_param->index);
mqtt_param->data= Ql_atoi(strTmp);
}
return RIL_ATRSP_SUCCESS;
}
head = Ql_RIL_FindString(line, len, “OK”);
if(head)
{
return RIL_ATRSP_CONTINUE;
}
head = Ql_RIL_FindString(line, len, “ERROR”);
if(head)
{
return RIL_ATRSP_FAILED;
}
head = Ql_RIL_FindString(line, len, “+CME ERROR:”);//fail
if(head)
{
return RIL_ATRSP_FAILED;
}
head = Ql_RIL_FindString(line, len, “+CMS ERROR:”);//fail
if(head)
{
return RIL_ATRSP_FAILED;
}
return RIL_ATRSP_CONTINUE; //continue wait
}
static s32 ATResponse_mqtt_handler_pub(char* line, u32 len, void* userdata)
{
u8 *head = NULL;
u8 uCtrlZ = 0x1A;
MQTT_Param *mqtt_param = (MQTT_Param *)userdata;
head = Ql_RIL_FindString(line, len, "\r\n>");
if(head)
{
Ql_RIL_WriteDataToCore (mqtt_param->prefix,mqtt_param->length);
Ql_RIL_WriteDataToCore(&uCtrlZ,1);
return RIL_ATRSP_CONTINUE;
}
head = Ql_RIL_FindString(line, len, "ERROR");
if(head)
{
return RIL_ATRSP_FAILED;
}
head = Ql_RIL_FindString(line, len, "OK");
if(head)
{
return RIL_ATRSP_SUCCESS;
}
head = Ql_RIL_FindString(line, len, "+CMS ERROR:");//fail
if(head)
{
return RIL_ATRSP_FAILED;
}
return RIL_ATRSP_CONTINUE; //continue wait
}
s32 RIL_MQTT_QMTOPEN( u8 connectID,u8* hostName, u32 port)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
mqtt_param.prefix="+QMTOPEN:";
mqtt_param.index = 1;
mqtt_param.data = 255;
Ql_sprintf(strAT, "AT+QMTOPEN=%d,\"%s\",%d\n", connectID,hostName,port);
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if(RIL_AT_SUCCESS != ret)
{
APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
return ret;
}
else if(0 != mqtt_param.data)
{
APP_DEBUG("\r\n<--MQTT OPEN failure, =%d -->\r\n", mqtt_param.data);
return QL_RET_ERR_RIL_MQTT_FAIL;
}
return ret;
}
s32 RIL_MQTT_QMTCONN(u8 connectID, u8* clientID,u8* username,u8* password)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
mqtt_param.prefix="+QMTCONN:";
mqtt_param.index = 1;
mqtt_param.data = 255;
if(NULL != username && NULL !=password)
{
Ql_sprintf(strAT, "AT+QMTCONN=%d,\"%s\",\"%s\",\"%s\"\n", connectID,clientID,username,password);
}
else
{
Ql_sprintf(strAT, "AT+QMTCONN=%d,\"%s\"\n", connectID,clientID);
}
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if(RIL_AT_SUCCESS != ret)
{
APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
return ret;
}
else if(0 != mqtt_param.data)
{
APP_DEBUG("\r\n<--MQTT connect failure, =%d -->\r\n", mqtt_param.data);
return QL_RET_ERR_RIL_MQTT_FAIL;
}
return ret;
}
s32 RIL_MQTT_QMTSUB(u8 connectID, u8 msgId,u8* topic, u8 qos,u8* others)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
mqtt_param.prefix="+QMTSUB:";
mqtt_param.index = 2;
mqtt_param.data = 255;
if(NULL == others)
{
Ql_sprintf(strAT, "AT+QMTSUB=%d,%d,\"%s\",%d\n", connectID,msgId,topic,qos);
}
else
{
}
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if(RIL_AT_SUCCESS != ret)
{
APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
return ret;
}
else if(0 != mqtt_param.data)
{
APP_DEBUG("\r\n<--MQTT subscribe failure, =%d -->\r\n", mqtt_param.data);
return QL_RET_ERR_RIL_MQTT_FAIL;
}
return ret;
}
s32 RIL_MQTT_QMTPUB(u8 connectID, u8 msgId,u8 qos,u8 retain, u8* topic,u8* data,u8 length)
{
s32 ret = RIL_AT_SUCCESS;
// while(1)
// {
// char strAT[200];
// int count=1;
// Ql_memset(strAT, 0, sizeof(strAT));
// mqtt_param.prefix=data;
// mqtt_param.length= length;
// Ql_sprintf(strAT, "AT+QMTPUB=%d,%d,%d,%d,\"%s\"\n", connectID,msgId,qos,retain,topic);
// ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler_pub,(void* )&mqtt_param,0);
// APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
// if(RIL_AT_SUCCESS != ret)
// {
// APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
// return ret;
// }
// count++;
// }
char strAT[200];
int count=1;
Ql_memset(strAT, 0, sizeof(strAT));
mqtt_param.prefix=data;
mqtt_param.length= length;
Ql_sprintf(strAT, "AT+QMTPUB=%d,%d,%d,%d,\"%s\"\n", connectID,msgId,qos,retain,topic);
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler_pub,(void* )&mqtt_param,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if(RIL_AT_SUCCESS != ret)
{
APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
return ret;
}
return ret;
}
s32 RIL_MQTT_QMTCLOSE(u8 connectID)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
Ql_sprintf(strAT, "AT+QMTCLOSE=%d\n",connectID);
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),NULL,NULL,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if (RIL_AT_SUCCESS != ret)
{
return ret;
}
return ret;
}
s32 RIL_MQTT_QMTDISC(u8 connectID)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
Ql_sprintf(strAT, "AT+QMTDISC=%d\n",connectID);
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),NULL,NULL,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if (RIL_AT_SUCCESS != ret)
{
return ret;
}
return ret;
}
s32 RIL_MQTT_QMTUNS(u8 connectID, u8 msgId,u8* topic,u8* others)
{
s32 ret = RIL_AT_SUCCESS;
char strAT[200];
Ql_memset(strAT, 0, sizeof(strAT));
mqtt_param.prefix="+QMTUNS:";
mqtt_param.index = 2;
mqtt_param.data = 255;
if(NULL == others)
{
Ql_sprintf(strAT, "AT+QMTUNS=%d,%d,\"%s\"\n", connectID,msgId,topic);
}
else
{
}
ret = Ql_RIL_SendATCmd(strAT,Ql_strlen(strAT),ATResponse_mqtt_handler,(void* )&mqtt_param,0);
APP_DEBUG("<-- Send AT:%s, ret = %d -->\r\n",strAT, ret);
if(RIL_AT_SUCCESS != ret)
{
APP_DEBUG("\r\n<-- send AT command failure -->\r\n");
return ret;
}
else if(0 != mqtt_param.data)
{
APP_DEBUG("\r\n<--MQTT subscribe failure, =%d -->\r\n", mqtt_param.data);
return QL_RET_ERR_RIL_MQTT_FAIL;
}
return ret;
}
#endif // EXAMPLE_TCPSSL