Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc479b8e

Browse files
committed
Adding packet re-assemlby
There appears to still be a bug in either splitting or re-assembly
1 parent50638ea commitc479b8e

File tree

3 files changed

+116
-30
lines changed

3 files changed

+116
-30
lines changed

‎MeshBase.cpp

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include"MeshBase.h"
44

55
#defineMAX_PACKET_SIZE32
6-
#defineMAX_PAYLOAD_SIZE (MAX_PACKET_SIZE -sizeof(Message))
6+
#defineMAX_PAYLOAD_SIZE (MAX_PACKET_SIZE -sizeof(MeshBase::MessageHeader))
77

88
// -- Broadcast addresses --
99
#definePEER_DISCOVERY1
@@ -28,7 +28,6 @@ void MeshBase::Begin()
2828
radio.begin();
2929
radio.enableDynamicPayloads();
3030
radio.setRetries(2,1);
31-
//radio.openReadingPipe(0, TO_ADDRESS(address));
3231
radio.openReadingPipe(1,TO_BROADCAST(PEER_DISCOVERY));
3332
radio.setAutoAck(0,true);
3433
radio.setAutoAck(1,false);
@@ -46,8 +45,7 @@ void MeshBase::Update()
4645
}
4746

4847
// Recieve
49-
uint8_t pipe_num;
50-
if (radio.available(&pipe_num))
48+
if (radio.available())
5149
{
5250
bool done =false;
5351
do {
@@ -78,31 +76,93 @@ void MeshBase::Update()
7876
}
7977
}
8078

79+
boolFindStream(const MeshBase::Message* current,const MeshBase::MessageHeader* find)
80+
{
81+
if (current->header.address_from != find->address_from)
82+
returnfalse;
83+
if (current->header.msg_id != find->msg_id)
84+
returnfalse;
85+
returntrue;
86+
}
87+
88+
voidMeshBase::Message::AddPart(constvoid* payload,uint8_t len,uint8_t part_num,bool more_parts)
89+
{
90+
uint8_t start_pos = part_num * MAX_PAYLOAD_SIZE;
91+
uint8_t end_pos = len + (part_num * MAX_PAYLOAD_SIZE);
92+
Serial.print(" R AddPart() : Adding part. start_pos=");
93+
Serial.print(start_pos);
94+
Serial.print(" end_pos=");
95+
Serial.print(end_pos);
96+
Serial.print(" len=");
97+
Serial.print(len);
98+
Serial.print(" part_num=");
99+
Serial.print(part_num);
100+
Serial.print(" more_parts=");
101+
Serial.println(more_parts);
102+
if (data ==NULL)
103+
data =malloc(end_pos);
104+
if (end_pos > data_used)
105+
data =realloc(data, end_pos);
106+
memcpy(&static_cast<byte*>(data)[start_pos], payload, len);
107+
if (end_pos > data_used)
108+
data_used = end_pos;
109+
blocks_recieved +=1;
110+
if (!more_parts) {
111+
header.split_more =false;
112+
header.split_part = part_num;
113+
}
114+
}
115+
116+
boolMeshBase::Message::IsDone()const
117+
{
118+
// We set the split_more to false if we recieved the last packet
119+
// in the stream, and split_part to total number of blocks in the stream.
120+
// So if split_more is false, and we have the right number of blocks_recieved
121+
// we are good to go.
122+
Serial.print(" R IsDone() : split_more=");
123+
Serial.print(header.split_more);
124+
Serial.print(" split_part=");
125+
Serial.print(header.split_part);
126+
Serial.print(" blocks_recieved=");
127+
Serial.println(blocks_recieved);
128+
if (!header.split_more && blocks_recieved >= header.split_part)
129+
returntrue;
130+
Serial.println(" R IsDone() : False");
131+
returnfalse;
132+
}
133+
134+
MeshBase::Message::~Message() {
135+
free(data);
136+
}
137+
81138
voidMeshBase::HandlePacket(const byte* data,uint8_t len)
82139
{
83-
if (len <sizeof(Message))
140+
if (len <sizeof(MessageHeader))
84141
return;
85-
const MeshBase::Message* msg = (structMeshBase::Message*)data;
86-
uint8_t payload_length = len -sizeof(Message);
87-
const byte* payload = data +sizeof(Message);
88-
if (msg->split_more || msg->split_part !=0)
89-
{
90-
// Re-assembly needed
91-
// TODO: Re-assemble packets
92-
}else {
93-
switch(msg->type) {
142+
const MeshBase::MessageHeader* header = (structMeshBase::MessageHeader*)data;
143+
uint8_t payload_length = len -sizeof(MessageHeader);
144+
const byte* payload = data +sizeof(MessageHeader);
145+
Message* s = assembly_list.Find<const MessageHeader*>(header, &FindStream);
146+
if (s ==NULL) {
147+
s =newMessage(*header);
148+
assembly_list.Add(s);
149+
}
150+
s->AddPart(payload, payload_length, header->split_part, header->split_more);
151+
if (s->IsDone()) {
152+
Serial.println(" R IsDone() : true!!");
153+
switch(header->type) {
94154
case type_peer_discovery:
95-
HandlePeerDiscovery(msg, payload, payload_length);
155+
HandlePeerDiscovery(&(s->header), s->data, s->data_used);
96156
break;
97157
default:
98-
OnMessage(msg, payload, payload_length);
158+
OnMessage(&(s->header), s->data, s->data_used);
99159
break;
100160
}
101-
delete data;
161+
assembly_list.Remove(s);
102162
}
103163
}
104164

105-
voidMeshBase::HandlePeerDiscovery(const MeshBase::Message* msg,constvoid* buff,uint8_t length)
165+
voidMeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg,constvoid* buff,uint8_t length)
106166
{
107167
if (length !=sizeof(PeerDiscoveryMessage))
108168
return;
@@ -143,28 +203,36 @@ void MeshBase::SendPeerDiscovery()
143203

144204
voidMeshBase::SendMessage(uint32_t to,uint8_t type,constvoid* data,uint8_t length,bool is_broadcast)
145205
{
206+
staticuint8_t current_msg_id =0;
146207
byte buff[MAX_PACKET_SIZE];
147-
Message* msg = (structMessage*)buff;
208+
MessageHeader* msg = (structMessageHeader*)buff;
148209
msg->protocol_version =1;
149210
msg->ttl =0;
150211
msg->type = type;
151212
msg->address_from = address;
213+
msg->msg_id = current_msg_id++;
152214

153215
uint8_t num_pkts = (length / MAX_PAYLOAD_SIZE) +1;
154216
for (uint8_t num =0; num < num_pkts; ++num)
155217
{
156218
uint8_t remaining_length = length - (num * MAX_PAYLOAD_SIZE);
157219
msg->split_part = num;
158220
msg->split_more = remaining_length > MAX_PAYLOAD_SIZE;
159-
memcpy(buff +sizeof(Message), (const byte*)data + (num * MAX_PAYLOAD_SIZE),min(remaining_length, MAX_PAYLOAD_SIZE));
221+
memcpy(buff +sizeof(MessageHeader), (const byte*)data + (num * MAX_PAYLOAD_SIZE),min(remaining_length, MAX_PAYLOAD_SIZE));
160222

161223
radio.stopListening();
162224
if (is_broadcast)
163225
radio.openWritingPipe(TO_BROADCAST(to));
164226
else
165227
radio.openWritingPipe(TO_ADDRESS(to));
166-
radio.write(buff,min(remaining_length, MAX_PAYLOAD_SIZE));
228+
radio.write(buff,min(remaining_length +sizeof(MessageHeader), MAX_PAYLOAD_SIZE));
167229
radio.startListening();
230+
Serial.print(" T Sending pkt split_part=");
231+
Serial.print(msg->split_part);
232+
Serial.print(" split_more=");
233+
Serial.print(msg->split_more);
234+
Serial.print(" length=");
235+
Serial.println(min(remaining_length, MAX_PAYLOAD_SIZE));
168236
}
169237
}
170238

‎MeshBase.h

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ class MeshBase
1717
uint16_ttime;
1818
Peer(uint32_t address) : address(address), time(0) {}
1919
};
20-
21-
structMessage
20+
21+
structMessageHeader
2222
{
2323
uint8_tprotocol_version :4;
2424
uint8_tttl :4;
@@ -29,6 +29,20 @@ class MeshBase
2929
uint32_taddress_from;
3030
} PACKED;
3131

32+
structMessage {
33+
Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(0), prev(0) {}
34+
~Message();
35+
MessageHeader header;
36+
void* data;
37+
uint8_t data_used;
38+
uint8_t blocks_recieved;
39+
Message* next;
40+
Message* prev;
41+
42+
voidAddPart(constvoid* data,uint8_t len,uint8_t part_num,bool more_parts);
43+
boolIsDone()const;
44+
};
45+
3246
// -- Message types --
3347
enum message_type {
3448
type_peer_discovery,
@@ -42,7 +56,7 @@ class MeshBase
4256
uint32_tGetAddress()const {return address; }
4357
boolIsReady()const {return address !=0; }
4458
protected:
45-
virtualvoidOnMessage(constMeshBase::Message* meta,constvoid* data,uint8_t length) =0;
59+
virtualvoidOnMessage(constMessageHeader* meta,constvoid* data,uint8_t length) =0;
4660
virtualvoidOnNewPeer(Peer*) {}
4761
virtualvoidOnLostPeer(Peer*) {}
4862
private:
@@ -53,14 +67,15 @@ class MeshBase
5367

5468
voidSendPeerDiscovery();
5569
voidSendMessage(uint32_t address,uint8_t type,constvoid* data,uint8_t length,bool is_broadcast);
56-
voidHandlePeerDiscovery(constMessage* msg,constvoid* buff,uint8_t length);
70+
voidHandlePeerDiscovery(constMessageHeader* msg,constvoid* buff,uint8_t length);
5771
voidHandlePacket(const byte* data,uint8_t length);
5872
voidChooseAddress();
59-
73+
6074
LinkedList<Peer>peers;
61-
75+
LinkedList2<Message>assembly_list;
76+
6277
Peer*GetPeer(uint32_t address);
63-
78+
6479
structPeerDiscoveryMessage
6580
{
6681
uint8_tprotocol_version;

‎RF_test.ino

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ class App : public MeshBase
77
public:
88
App() : MeshBase(9,10) {}
99
protected:
10-
virtualvoidOnMessage(const MeshBase::Message* meta,constvoid* data,uint8_t length)
10+
virtualvoidOnMessage(const MeshBase::MessageHeader* meta,constvoid* data,uint8_t length)
1111
{
1212
Serial.print(meta->address_from, DEC);
1313
Serial.print(" :");
1414
Serial.println((constchar*)data);
15+
Serial.print("split_part =");
16+
Serial.println(meta->split_part, DEC);
1517
}
1618
virtualvoidOnNewPeer(Peer* p)
1719
{
1820
if (!IsReady())return;
1921
char buff[255];
20-
int len =snprintf(buff,255,"Hello %u", p->address);
22+
int len =snprintf(buff,255,"Hello %u. This is a super long message for some reason. Please keep adding to this message for great good and other things. I would very much like it if this could be put together.", p->address);
2123
Serial.print("Me :");
2224
Serial.println(buff);
2325
SendMessage(p->address, type_user, buff, len +1);
@@ -39,3 +41,4 @@ void loop()
3941
app.Update();
4042
delay(100);
4143
}
44+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp