55 #define PRINTF(...) printf(__VA_ARGS__)
60 #define MS_TO_TICKS(ms) ((uint32_t)(ms * (RTIMER_SECOND / 1000.0f)))
62 #define FAT_COOP_SLOT_SIZE_TICKS MS_TO_TICKS(FAT_COOP_SLOT_SIZE_MS)
63 #define FAT_COOP_TIME_READ_BLOCK_TICKS MS_TO_TICKS(FAT_COOP_TIME_READ_BLOCK_MS)
64 #define FAT_COOP_TIME_WRITE_BLOCK_TICKS MS_TO_TICKS(FAT_COOP_TIME_WRITE_BLOCK_MS)
72 static process_event_t coop_global_event_id = 0;
73 static uint8_t next_token = 0;
75 uint8_t writeBuffer[FAT_COOP_BUFFER_SIZE];
76 uint16_t writeBuffer_start, writeBuffer_len;
78 QueueEntry queue[FAT_COOP_QUEUE_SIZE];
79 Event_OperationFinished op_results[FAT_COOP_QUEUE_SIZE];
80 uint16_t queue_start, queue_len;
82 uint8_t coop_step_allowed = 0;
84 uint8_t next_step_type = INTERNAL;
87 extern struct file_system
mounted;
93 extern struct file_desc
fat_fd_pool[FAT_FD_POOL_SIZE];
97 QueueEntry* queue_current_entry();
100 uint8_t time_left_for_step(uint8_t step_type);
101 uint8_t try_internal_operation();
102 uint8_t queue_rm_top_entry();
103 uint8_t queue_add_entry(QueueEntry *entry);
104 void pop_from_buffer(uint16_t length);
105 uint32_t time_left();
106 uint8_t time_for_step(uint8_t step_type);
108 PROCESS(fsp,
"FileSystemProcess");
113 if (coop_global_event_id == 0) {
118 return coop_global_event_id;
122 printQueueEntry(QueueEntry *entry)
124 printf(
"\nQueueEntry %u", (uint16_t) entry);
125 printf(
"\n\ttoken = %u", entry->token);
126 printf(
"\n\tsource_process = %u", (uint16_t) entry->source_process);
131 printf(
"COOP_CFS_OPEN");
132 printf(
"\n\tname = %s", entry->parameters.open.name);
133 printf(
"\n\tflags = %d", entry->parameters.open.flags);
136 printf(
"COOP_CFS_CLOSE");
137 printf(
"\n\tfd = %d", entry->parameters.generic.fd);
138 printf(
"\n\tbuffer = %u", (uint16_t) entry->parameters.generic.buffer);
139 printf(
"\n\tlength = %u", entry->parameters.generic.length);
142 printf(
"COOP_CFS_WRITE");
143 printf(
"\n\tfd = %d", entry->parameters.generic.fd);
144 printf(
"\n\tbuffer = %u", (uint16_t) entry->parameters.generic.buffer);
145 printf(
"\n\tlength = %u", entry->parameters.generic.length);
148 printf(
" COOP_CFS_READ");
149 printf(
"\n\tfd = %d", entry->parameters.generic.fd);
150 printf(
"\n\tbuffer = %u", (uint16_t) entry->parameters.generic.buffer);
151 printf(
"\n\tlength = %u", entry->parameters.generic.length);
154 printf(
" COOP_CFS_SEEK");
155 printf(
"\n\tfd = %d", entry->parameters.seek.fd);
156 printf(
"\n\toffset = %lu", (uint32_t) entry->parameters.seek.offset);
157 printf(
"\n\twhence = %d", entry->parameters.seek.whence);
159 case COOP_CFS_REMOVE:
160 printf(
"OOP_CFS_REMOVE");
161 printf(
"\n\tname = %s", entry->parameters.open.name);
163 case COOP_CFS_OPENDIR:
164 printf(
"OP_CFS_OPENDIR");
166 case COOP_CFS_READDIR:
167 printf(
"OP_CFS_READDIR");
169 case COOP_CFS_CLOSEDIR:
170 printf(
"OP_CFS_CLOSEDIR");
174 printf(
"\n\tstate = %u", entry->state);
175 printf(
"\n\tret_value = %d", entry->ret_value);
181 QueueEntry *entry = (QueueEntry *) data;
183 coop_step_allowed = 1;
185 if (entry->state != STATUS_INPROGRESS) {
186 entry->state = STATUS_INPROGRESS;
191 PRINTF(
"FSP: opening file %s\n", entry->parameters.open.name);
192 int fd = entry->ret_value;
193 entry->ret_value =
cfs_open(entry->parameters.open.name, entry->parameters.open.flags);
194 if (entry->ret_value == -1) {
199 PRINTF(
"FSP: closing file\n");
203 PRINTF(
"FSP: write to file\n");
205 entry->ret_value = cfs_write(entry->parameters.generic.fd, entry->parameters.generic.buffer,
206 entry->parameters.generic.length);
207 pop_from_buffer(entry->parameters.generic.length);
210 PRINTF(
"FSP: read from file\n");
211 entry->ret_value = cfs_read(entry->parameters.generic.fd, entry->parameters.generic.buffer,
212 entry->parameters.generic.length);
215 PRINTF(
"FSP: seek in file\n");
216 entry->ret_value =
cfs_seek(entry->parameters.seek.fd, entry->parameters.seek.offset,
217 entry->parameters.seek.whence);
219 case COOP_CFS_REMOVE:
220 PRINTF(
"FSP: removing file %s\n", entry->parameters.open.name);
221 entry->ret_value =
cfs_remove(entry->parameters.open.name);
223 case COOP_CFS_OPENDIR:
224 entry->ret_value =
cfs_opendir(entry->parameters.dir.dirp, entry->parameters.dir.u.name);
226 case COOP_CFS_READDIR:
227 entry->ret_value =
cfs_readdir(entry->parameters.dir.dirp, entry->parameters.dir.u.dirent);
229 case COOP_CFS_CLOSEDIR:
234 entry->state = STATUS_DONE;
235 op_results[queue_start].token = entry->token;
236 op_results[queue_start].ret_value = entry->ret_value;
241 static uint32_t deadline = 0;
242 static QueueEntry *entry;
243 static uint16_t operations = 0;
253 entry = queue_current_entry();
256 uint32_t my_end =
RTIMER_NOW() + time_for_step(next_step_type);
261 if (entry->state == STATUS_DONE) {
262 PRINTF(
"FSP: Operation finished\n");
264 entry = queue_current_entry();
274 try_internal_operation()
292 process_post(entry->source_process, get_coop_event_id(), &(op_results[queue_start]));
295 queue_rm_top_entry();
298 entry = queue_current_entry();
309 uint32_t time_left = 0;
312 time_left = FAT_COOP_SLOT_SIZE_TICKS;
319 time_left_etimer -= 5;
321 if( time_left_etimer < time_left ) {
322 time_left = time_left_etimer;
330 time_for_step(uint8_t step_type)
334 if (step_type == INTERNAL ) {
338 if (step_type == READ ) {
339 result = FAT_COOP_TIME_READ_BLOCK_TICKS;
342 if (step_type == WRITE ) {
343 result = FAT_COOP_TIME_WRITE_BLOCK_TICKS;
353 time_left_for_step(uint8_t step_type)
355 if( time_left() > time_for_step(step_type) ) {
363 queue_current_entry()
365 if (queue_len == 0) {
369 return &(queue[queue_start]);
372 queue_add_entry(QueueEntry *entry)
374 uint16_t pos = (queue_start + queue_len) % FAT_COOP_QUEUE_SIZE;
376 if (pos == queue_start && queue_len > 0) {
380 memcpy(&(queue[pos]), entry,
sizeof (QueueEntry));
389 if (queue_len == 0) {
393 queue_start = (queue_start + 1) % FAT_COOP_QUEUE_SIZE;
406 uint16_t pos = (writeBuffer_start + writeBuffer_len) % FAT_COOP_BUFFER_SIZE;
410 if (pos == writeBuffer_start - 1) {
415 if (pos < writeBuffer_start) {
416 free = writeBuffer_start - pos;
418 free = (FAT_COOP_BUFFER_SIZE - pos) + writeBuffer_start;
425 if (pos + length > FAT_COOP_BUFFER_SIZE) {
426 memcpy(&(writeBuffer[pos]), source, FAT_COOP_BUFFER_SIZE - pos);
427 memcpy(&(writeBuffer[0]), source, length - (FAT_COOP_BUFFER_SIZE - pos));
429 memcpy(&(writeBuffer[pos]), source, length);
432 writeBuffer_len += length;
438 pop_from_buffer(uint16_t length)
440 if (length > writeBuffer_len) {
444 writeBuffer_start = (writeBuffer_start + length) % FAT_COOP_BUFFER_SIZE;
445 writeBuffer_len -= length;
447 if (writeBuffer_len == 0) {
449 writeBuffer_start = 0;
454 get_item_from_buffer(uint8_t *start, uint16_t index)
457 uint16_t start_index = (uint16_t) (start - writeBuffer);
458 return writeBuffer[(start_index + index) % FAT_COOP_BUFFER_SIZE];
462 ccfs_open(
const char *name,
int flags, uint8_t *token)
468 for (i = 0; i < FAT_FD_POOL_SIZE; i++) {
479 if (queue_len == FAT_COOP_QUEUE_SIZE) {
488 entry.token = next_token++;
490 *token = entry.token;
493 entry.op = COOP_CFS_OPEN;
495 entry.parameters.open.name = name;
496 entry.parameters.open.flags = flags;
497 entry.ret_value = fd;
498 entry.state = STATUS_QUEUED;
502 queue_add_entry(&entry);
508 ccfs_close(
int fd, uint8_t *token)
512 if (queue_len == FAT_COOP_QUEUE_SIZE) {
521 entry.token = next_token++;
523 *token = entry.token;
526 entry.op = COOP_CFS_CLOSE;
528 entry.parameters.generic.fd = fd;
529 entry.state = STATUS_QUEUED;
531 queue_add_entry(&entry);
537 ccfs_read(
int fd, uint8_t *buf, uint16_t length, uint8_t *token)
541 if (queue_len == FAT_COOP_QUEUE_SIZE) {
550 entry.token = next_token++;
552 *token = entry.token;
555 entry.op = COOP_CFS_READ;
557 entry.parameters.generic.fd = fd;
558 entry.parameters.generic.buffer = buf;
559 entry.parameters.generic.length = length;
560 entry.state = STATUS_QUEUED;
562 queue_add_entry(&entry);
568 ccfs_write(
int fd, uint8_t *buf, uint16_t length, uint8_t *token)
572 memset(&entry, 0,
sizeof(QueueEntry));
574 if (queue_len == FAT_COOP_QUEUE_SIZE) {
575 PRINTF(
"ccfs_write: Buffer full\n");
581 PRINTF(
"ccfs_write: Event Queue full\n");
585 entry.token = next_token++;
587 *token = entry.token;
590 entry.op = COOP_CFS_WRITE;
592 entry.parameters.generic.fd = fd;
594 if (!fat_buffer_available(length)) {
595 PRINTF(
"ccfs_write: Buffer full\n");
599 entry.parameters.generic.buffer = &(writeBuffer[(writeBuffer_start + writeBuffer_len) % FAT_COOP_BUFFER_SIZE]);
605 entry.parameters.generic.length = length;
606 entry.state = STATUS_QUEUED;
608 queue_add_entry(&entry);
614 ccfs_seek(
int fd, cfs_offset_t offset,
int whence, uint8_t *token)
618 if (queue_len == FAT_COOP_QUEUE_SIZE) {
627 entry.token = next_token++;
629 *token = entry.token;
632 entry.op = COOP_CFS_SEEK;
634 entry.parameters.seek.fd = fd;
635 entry.parameters.seek.offset = offset;
636 entry.parameters.seek.whence = whence;
637 entry.state = STATUS_QUEUED;
639 queue_add_entry(&entry);
645 ccfs_remove(
const char *name, uint8_t *token)
649 if (queue_len == FAT_COOP_QUEUE_SIZE) {
658 entry.token = next_token++;
660 *token = entry.token;
663 entry.op = COOP_CFS_REMOVE;
665 entry.parameters.open.name = name;
666 entry.state = STATUS_QUEUED;
668 queue_add_entry(&entry);
674 ccfs_opendir(
struct cfs_dir *dirp,
const char *name, uint8_t *token)
678 if (queue_len == FAT_COOP_QUEUE_SIZE) {
687 entry.token = next_token++;
689 *token = entry.token;
692 entry.op = COOP_CFS_OPENDIR;
694 entry.parameters.dir.dirp = dirp;
695 entry.parameters.dir.u.name = name;
696 entry.state = STATUS_QUEUED;
698 queue_add_entry(&entry);
704 ccfs_readdir(
struct cfs_dir *dirp,
struct cfs_dirent *dirent, uint8_t *token)
708 if (queue_len == FAT_COOP_QUEUE_SIZE) {
717 entry.token = next_token++;
719 *token = entry.token;
722 entry.op = COOP_CFS_READDIR;
724 entry.parameters.dir.dirp = dirp;
725 entry.parameters.dir.u.dirent = dirent;
726 entry.state = STATUS_QUEUED;
728 queue_add_entry(&entry);
734 ccfs_closedir(
struct cfs_dir *dirp, uint8_t *token)
738 if (queue_len == FAT_COOP_QUEUE_SIZE) {
747 entry.token = next_token++;
749 *token = entry.token;
752 entry.op = COOP_CFS_CLOSEDIR;
754 entry.parameters.dir.dirp = dirp;
755 entry.state = STATUS_QUEUED;
757 queue_add_entry(&entry);
763 fat_op_status(uint8_t token)
768 for (i = 0; i < queue_len; i++) {
769 entry = &(queue[(queue_start + i) % FAT_COOP_QUEUE_SIZE]);
770 if (entry->token == token) {
779 _get_time_of_operation(Operation op, uint16_t length)
781 uint8_t op_multiplikator = (length / 512) + ((length % 512 != 0) ? 1 : 0);
784 case COOP_CFS_OPENDIR:
785 case COOP_CFS_READDIR:
788 return (FAT_COOP_TIME_READ_BLOCK_MS) * op_multiplikator;
789 case COOP_CFS_CLOSEDIR:
793 case COOP_CFS_REMOVE:
795 return (FAT_COOP_TIME_READ_BLOCK_MS + FAT_COOP_TIME_WRITE_BLOCK_MS) * op_multiplikator;
802 fat_estimate_by_token(uint8_t token)
804 uint16_t estimated_time = 0;
809 for (i = 0; i < queue_len; i++) {
810 entry = &(queue[(queue_start + i) % FAT_COOP_QUEUE_SIZE]);
813 if (entry->op == COOP_CFS_CLOSE || entry->op == COOP_CFS_READ || entry->op == COOP_CFS_WRITE) {
814 length = entry->parameters.generic.length;
817 estimated_time += _get_time_of_operation(entry->op, length);
818 if (entry->token == token) {
823 return estimated_time;
827 fat_estimate_by_parameter(Operation type, uint16_t length)
829 uint16_t estimated_time = _get_time_of_operation(type, length);
834 for (i = 0; i < queue_len; i++) {
835 entry = &(queue[(queue_start + i) % FAT_COOP_QUEUE_SIZE]);
838 if (entry->op == COOP_CFS_CLOSE || entry->op == COOP_CFS_READ || entry->op == COOP_CFS_WRITE) {
839 sublength = entry->parameters.generic.length;
842 estimated_time += _get_time_of_operation(entry->op, sublength);
844 return estimated_time;
848 fat_buffer_available(uint16_t length)
850 uint16_t pos = (writeBuffer_start + writeBuffer_len) % FAT_COOP_BUFFER_SIZE;
853 if (pos == writeBuffer_start && writeBuffer_len != 0) {
857 if (pos < writeBuffer_start) {
858 free = writeBuffer_start - pos;
860 free = (FAT_COOP_BUFFER_SIZE - pos) + writeBuffer_start;