31 ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
34 if(errno==EPIPE || errno==EBADF)
43 if(sent!=(ssize_t)sendBlock.size)
56 if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
58 chunks.push_back(
Chunk());
61 frames.push(
Frame(size, kill,
id));
67 using namespace Protocol;
69 bool transmitEmpty=transmit();
71 int retVal=poll(&pollFds.front(), pollFds.size(), 0);
74 if(transmitEmpty)
return true;
79 std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(),
reventsZero);
81 if(pollFd->revents & (POLLHUP|POLLERR|POLLNVAL) )
83 fdBuffers.erase(pollFd->fd);
84 pollFds.erase(pollFd);
92 socklen_t addrlen=
sizeof(sockaddr_un);
93 fd=accept(fd, (sockaddr*)&addr, &addrlen);
94 fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
96 pollFds.push_back(pollfd());
97 pollFds.back().fd = fd;
98 pollFds.back().events = POLLIN|POLLHUP|POLLERR|POLLNVAL;
100 Message& messageBuffer=fdBuffers[fd].messageBuffer;
101 messageBuffer.
size=0;
102 messageBuffer.
type=0;
104 else if(fd==wakeUpFdIn)
107 read(wakeUpFdIn, &x, 1);
111 Message& messageBuffer=fdBuffers[fd].messageBuffer;
112 Header& headerBuffer=fdBuffers[fd].headerBuffer;
116 if(!messageBuffer.data)
119 actual=read(fd, (
char*)&headerBuffer+messageBuffer.size,
sizeof(Header)-messageBuffer.size);
121 if(actual>0) messageBuffer.size+=actual;
125 fdBuffers.erase( pollFd->fd );
126 pollFds.erase( pollFd );
130 if(messageBuffer.size!=
sizeof(Header))
132 if(transmitEmpty)
return true;
136 messageBuffer.data.reset(
new char[
sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
137 memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer),
sizeof(Header));
140 const Header& header=*(
const Header*)messageBuffer.data.get();
141 size_t needed=header.getContentLength()+header.getPaddingLength()+
sizeof(Header)-messageBuffer.size;
142 actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
144 if(actual>0) messageBuffer.size+=actual;
147 if(actual==(ssize_t)needed)
149 sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
150 messageBuffer.size=0;
151 messageBuffer.data.reset();
154 if(transmitEmpty)
return true;
161 if(pRead>=chunks.begin()->end)
163 if(writeIt==chunks.begin())
165 pRead=writeIt->data.get();
170 if(writeIt==--chunks.end())
172 chunks.begin()->end=chunks.begin()->data.get();
173 chunks.splice(chunks.end(), chunks, chunks.begin());
177 pRead=chunks.begin()->data.get();
180 if((frames.front().size-=size)==0)
182 if(frames.front().closeFd)
183 freeFd(frames.front().id.fd);
192 write(wakeUpFdOut, &x, 1);
196 :buffer(pollFds, fdBuffers), sendMessage(sendMessage_), pollFds(2), socket(fd_)
202 socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
207 fcntl(
socket, F_SETFL, (fcntl(
socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
208 pollFds[0].events = POLLIN|POLLHUP;
210 pollFds[1].events = POLLIN|POLLHUP;
219 msg =
"The file descriptor has been marked non-blocking (O_NONBLOCK) and the write would block.";
223 msg =
"The file descriptor is not a valid file descriptor or is not open for writing.";
227 msg =
"The buffer is outside your accessible address space.";
231 msg =
"An attempt was made to write a file that exceeds the implementation-defined maximum file size or the process’s file size limit, or to write at a position past the maximum allowed offset.";
235 msg =
"The call was interrupted by a signal before any data was written; see signal(7).";
239 msg =
"The file descriptor is attached to an object which is unsuitable for writing; or the file was opened with the O_DIRECT flag, and either the address specified for the buffer, the value specified in count, or the current file offset is not suitably aligned.";
243 msg =
"A low-level I/O error occurred while modifying the inode.";
247 msg =
"The device containing the file referred to by the file descriptor has no room for the data.";
251 msg =
"The file descriptor is connected to a pipe or socket whose reading end is closed. When this happens the writing process will also receive a SIGPIPE signal. (Thus, the write return value is seen only if the program catches, blocks or ignores this signal.)";
261 msg =
"Non-blocking I/O has been selected using O_NONBLOCK and no data was immediately available for reading.";
265 msg =
"The file descriptor is not valid or is not open for reading.";
269 msg =
"The buffer is outside your accessible address space.";
273 msg =
"The call was interrupted by a signal before any data was written; see signal(7).";
277 msg =
"The file descriptor is attached to an object which is unsuitable for reading; or the file was opened with the O_DIRECT flag, and either the address specified in buf, the value specified in count, or the current file offset is not suitably aligned.";
281 msg =
"I/O error. This will happen for example when the process is in a background process group, tries to read from its controlling tty, and either it is ignoring or blocking SIGTTIN or its process group is orphaned. It may also occur when there is a low-level I/O error while reading from a disk or tape.";
285 msg =
"The file descriptor refers to a directory.";
295 msg =
"An invalid file descriptor was given in one of the sets.";
299 msg =
"The array given as argument was not contained in the calling program’s address space.";
303 msg =
"A signal occurred before any requested event; see signal(7).";
307 msg =
"The nfds value exceeds the RLIMIT_NOFILE value.";
311 msg =
"There was no space to allocate file descriptor tables.";
318 std::vector<pollfd>::iterator it=std::find_if(pollFds.begin(), pollFds.end(),
equalsFd(fd));
319 if(it != pollFds.end())