pipe.c 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /* vim: tabstop=4 shiftwidth=4 noexpandtab
  2. * This file is part of ToaruOS and is released under the terms
  3. * of the NCSA / University of Illinois License - see LICENSE.md
  4. * Copyright (C) 2012-2014 Kevin Lange
  5. *
  6. * Buffered Pipe
  7. *
  8. */
  9. #include <kernel/system.h>
  10. #include <kernel/fs.h>
  11. #include <kernel/printf.h>
  12. #include <kernel/pipe.h>
  13. #include <kernel/logging.h>
  14. #define DEBUG_PIPES 0
  15. uint32_t read_pipe(fs_node_t *node, uint32_t offset, uint32_t size, uint8_t *buffer);
  16. uint32_t write_pipe(fs_node_t *node, uint32_t offset, uint32_t size, uint8_t *buffer);
  17. void open_pipe(fs_node_t *node, unsigned int flags);
  18. void close_pipe(fs_node_t *node);
  19. static inline size_t pipe_unread(pipe_device_t * pipe) {
  20. if (pipe->read_ptr == pipe->write_ptr) {
  21. return 0;
  22. }
  23. if (pipe->read_ptr > pipe->write_ptr) {
  24. return (pipe->size - pipe->read_ptr) + pipe->write_ptr;
  25. } else {
  26. return (pipe->write_ptr - pipe->read_ptr);
  27. }
  28. }
  29. int pipe_size(fs_node_t * node) {
  30. pipe_device_t * pipe = (pipe_device_t *)node->device;
  31. return pipe_unread(pipe);
  32. }
  33. static inline size_t pipe_available(pipe_device_t * pipe) {
  34. if (pipe->read_ptr == pipe->write_ptr) {
  35. return pipe->size - 1;
  36. }
  37. if (pipe->read_ptr > pipe->write_ptr) {
  38. return pipe->read_ptr - pipe->write_ptr - 1;
  39. } else {
  40. return (pipe->size - pipe->write_ptr) + pipe->read_ptr - 1;
  41. }
  42. }
  43. int pipe_unsize(fs_node_t * node) {
  44. pipe_device_t * pipe = (pipe_device_t *)node->device;
  45. return pipe_available(pipe);
  46. }
  47. static inline void pipe_increment_read(pipe_device_t * pipe) {
  48. pipe->read_ptr++;
  49. if (pipe->read_ptr == pipe->size) {
  50. pipe->read_ptr = 0;
  51. }
  52. }
  53. static inline void pipe_increment_write(pipe_device_t * pipe) {
  54. pipe->write_ptr++;
  55. if (pipe->write_ptr == pipe->size) {
  56. pipe->write_ptr = 0;
  57. }
  58. }
  59. static inline void pipe_increment_write_by(pipe_device_t * pipe, size_t amount) {
  60. pipe->write_ptr = (pipe->write_ptr + amount) % pipe->size;
  61. }
  62. static void pipe_alert_waiters(pipe_device_t * pipe) {
  63. if (pipe->alert_waiters) {
  64. while (pipe->alert_waiters->head) {
  65. node_t * node = list_dequeue(pipe->alert_waiters);
  66. process_t * p = node->value;
  67. process_alert_node(p, pipe);
  68. free(node);
  69. }
  70. }
  71. }
  72. uint32_t read_pipe(fs_node_t *node, uint32_t offset, uint32_t size, uint8_t *buffer) {
  73. assert(node->device != 0 && "Attempted to read from a fully-closed pipe.");
  74. /* Retreive the pipe object associated with this file node */
  75. pipe_device_t * pipe = (pipe_device_t *)node->device;
  76. #if DEBUG_PIPES
  77. if (pipe->size > 300) { /* Ignore small pipes (ie, keyboard) */
  78. debug_print(INFO, "[debug] Call to read from pipe 0x%x", node->device);
  79. debug_print(INFO, " Unread bytes: %d", pipe_unread(pipe));
  80. debug_print(INFO, " Total size: %d", pipe->size);
  81. debug_print(INFO, " Request size: %d", size);
  82. debug_print(INFO, " Write pointer: %d", pipe->write_ptr);
  83. debug_print(INFO, " Read pointer: %d", pipe->read_ptr);
  84. debug_print(INFO, " Buffer address: 0x%x", pipe->buffer);
  85. }
  86. #endif
  87. if (pipe->dead) {
  88. debug_print(WARNING, "Pipe is dead?");
  89. send_signal(getpid(), SIGPIPE);
  90. return 0;
  91. }
  92. size_t collected = 0;
  93. while (collected == 0) {
  94. spin_lock(pipe->lock_read);
  95. while (pipe_unread(pipe) > 0 && collected < size) {
  96. buffer[collected] = pipe->buffer[pipe->read_ptr];
  97. pipe_increment_read(pipe);
  98. collected++;
  99. }
  100. spin_unlock(pipe->lock_read);
  101. wakeup_queue(pipe->wait_queue_writers);
  102. /* Deschedule and switch */
  103. if (collected == 0) {
  104. sleep_on(pipe->wait_queue_readers);
  105. }
  106. }
  107. return collected;
  108. }
  109. uint32_t write_pipe(fs_node_t *node, uint32_t offset, uint32_t size, uint8_t *buffer) {
  110. assert(node->device != 0 && "Attempted to write to a fully-closed pipe.");
  111. /* Retreive the pipe object associated with this file node */
  112. pipe_device_t * pipe = (pipe_device_t *)node->device;
  113. #if DEBUG_PIPES
  114. if (pipe->size > 300) { /* Ignore small pipes (ie, keyboard) */
  115. debug_print(INFO, "[debug] Call to write to pipe 0x%x", node->device);
  116. debug_print(INFO, " Available space: %d", pipe_available(pipe));
  117. debug_print(INFO, " Total size: %d", pipe->size);
  118. debug_print(INFO, " Request size: %d", size);
  119. debug_print(INFO, " Write pointer: %d", pipe->write_ptr);
  120. debug_print(INFO, " Read pointer: %d", pipe->read_ptr);
  121. debug_print(INFO, " Buffer address: 0x%x", pipe->buffer);
  122. debug_print(INFO, " Write: %s", buffer);
  123. }
  124. #endif
  125. if (pipe->dead) {
  126. debug_print(WARNING, "Pipe is dead?");
  127. send_signal(getpid(), SIGPIPE);
  128. return 0;
  129. }
  130. size_t written = 0;
  131. while (written < size) {
  132. spin_lock(pipe->lock_write);
  133. #if 0
  134. size_t available = 0;
  135. if (pipe->read_ptr <= pipe->write_ptr) {
  136. available = pipe->size - pipe->write_ptr;
  137. } else {
  138. available = pipe->read_ptr - pipe->write_ptr - 1;
  139. }
  140. if (available) {
  141. available = min(available, size - written);
  142. memcpy(&pipe->buffer[pipe->write_ptr], buffer, available);
  143. pipe_increment_write_by(pipe, available);
  144. written += available;
  145. }
  146. #else
  147. while (pipe_available(pipe) > 0 && written < size) {
  148. pipe->buffer[pipe->write_ptr] = buffer[written];
  149. pipe_increment_write(pipe);
  150. written++;
  151. }
  152. #endif
  153. spin_unlock(pipe->lock_write);
  154. wakeup_queue(pipe->wait_queue_readers);
  155. pipe_alert_waiters(pipe);
  156. if (written < size) {
  157. sleep_on(pipe->wait_queue_writers);
  158. }
  159. }
  160. return written;
  161. }
  162. void open_pipe(fs_node_t * node, unsigned int flags) {
  163. assert(node->device != 0 && "Attempted to open a fully-closed pipe.");
  164. /* Retreive the pipe object associated with this file node */
  165. pipe_device_t * pipe = (pipe_device_t *)node->device;
  166. /* Add a reference */
  167. pipe->refcount++;
  168. return;
  169. }
  170. void close_pipe(fs_node_t * node) {
  171. assert(node->device != 0 && "Attempted to close an already fully-closed pipe.");
  172. /* Retreive the pipe object associated with this file node */
  173. pipe_device_t * pipe = (pipe_device_t *)node->device;
  174. /* Drop one reference */
  175. pipe->refcount--;
  176. /* Check the reference count number */
  177. if (pipe->refcount == 0) {
  178. #if 0
  179. /* No other references exist, free the pipe (but not its buffer) */
  180. free(pipe->buffer);
  181. list_free(pipe->wait_queue);
  182. free(pipe->wait_queue);
  183. free(pipe);
  184. /* And let the creator know there are no more references */
  185. node->device = 0;
  186. #endif
  187. }
  188. return;
  189. }
  190. static int pipe_check(fs_node_t * node) {
  191. pipe_device_t * pipe = (pipe_device_t *)node->device;
  192. if (pipe_unread(pipe) > 0) {
  193. return 0;
  194. }
  195. return 1;
  196. }
  197. static int pipe_wait(fs_node_t * node, void * process) {
  198. pipe_device_t * pipe = (pipe_device_t *)node->device;
  199. if (!pipe->alert_waiters) {
  200. pipe->alert_waiters = list_create();
  201. }
  202. if (!list_find(pipe->alert_waiters, process)) {
  203. list_insert(pipe->alert_waiters, process);
  204. }
  205. list_insert(((process_t *)process)->node_waits, pipe);
  206. return 0;
  207. }
  208. fs_node_t * make_pipe(size_t size) {
  209. fs_node_t * fnode = malloc(sizeof(fs_node_t));
  210. pipe_device_t * pipe = malloc(sizeof(pipe_device_t));
  211. memset(fnode, 0, sizeof(fs_node_t));
  212. memset(pipe, 0, sizeof(pipe_device_t));
  213. fnode->device = 0;
  214. fnode->name[0] = '\0';
  215. sprintf(fnode->name, "[pipe]");
  216. fnode->uid = 0;
  217. fnode->gid = 0;
  218. fnode->mask = 0666;
  219. fnode->flags = FS_PIPE;
  220. fnode->read = read_pipe;
  221. fnode->write = write_pipe;
  222. fnode->open = open_pipe;
  223. fnode->close = close_pipe;
  224. fnode->readdir = NULL;
  225. fnode->finddir = NULL;
  226. fnode->ioctl = NULL; /* TODO ioctls for pipes? maybe */
  227. fnode->get_size = pipe_size;
  228. fnode->selectcheck = pipe_check;
  229. fnode->selectwait = pipe_wait;
  230. fnode->atime = now();
  231. fnode->mtime = fnode->atime;
  232. fnode->ctime = fnode->atime;
  233. fnode->device = pipe;
  234. pipe->buffer = malloc(size);
  235. pipe->write_ptr = 0;
  236. pipe->read_ptr = 0;
  237. pipe->size = size;
  238. pipe->refcount = 0;
  239. pipe->dead = 0;
  240. spin_init(pipe->lock_read);
  241. spin_init(pipe->lock_write);
  242. pipe->wait_queue_writers = list_create();
  243. pipe->wait_queue_readers = list_create();
  244. return fnode;
  245. }