~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
Linux/net/sunrpc/xprt.c

Version: ~ [ 2.4.0 ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 /*
  2  *  linux/net/sunrpc/xprt.c
  3  *
  4  *  This is a generic RPC call interface supporting congestion avoidance,
  5  *  and asynchronous calls.
  6  *
  7  *  The interface works like this:
  8  *
  9  *  -   When a process places a call, it allocates a request slot if
 10  *      one is available. Otherwise, it sleeps on the backlog queue
 11  *      (xprt_reserve).
 12  *  -   Next, the caller puts together the RPC message, stuffs it into
 13  *      the request struct, and calls xprt_call().
 14  *  -   xprt_call transmits the message and installs the caller on the
 15  *      socket's wait list. At the same time, it installs a timer that
 16  *      is run after the packet's timeout has expired.
 17  *  -   When a packet arrives, the data_ready handler walks the list of
 18  *      pending requests for that socket. If a matching XID is found, the
 19  *      caller is woken up, and the timer removed.
 20  *  -   When no reply arrives within the timeout interval, the timer is
 21  *      fired by the kernel and runs xprt_timer(). It either adjusts the
 22  *      timeout values (minor timeout) or wakes up the caller with a status
 23  *      of -ETIMEDOUT.
 24  *  -   When the caller receives a notification from RPC that a reply arrived,
 25  *      it should release the RPC slot, and process the reply.
 26  *      If the call timed out, it may choose to retry the operation by
 27  *      adjusting the initial timeout value, and simply calling rpc_call
 28  *      again.
 29  *
 30  *  Support for async RPC is done through a set of RPC-specific scheduling
 31  *  primitives that `transparently' work for processes as well as async
 32  *  tasks that rely on callbacks.
 33  *
 34  *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 35  *
 36  *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 37  *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 38  *  TCP NFS related read + write fixes
 39  *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 40  *
 41  *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 42  *  Fix behaviour when socket buffer is full.
 43  *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 44  */
 45 
 46 #define __KERNEL_SYSCALLS__
 47 
 48 #include <linux/version.h>
 49 #include <linux/types.h>
 50 #include <linux/malloc.h>
 51 #include <linux/capability.h>
 52 #include <linux/sched.h>
 53 #include <linux/errno.h>
 54 #include <linux/socket.h>
 55 #include <linux/in.h>
 56 #include <linux/net.h>
 57 #include <linux/mm.h>
 58 #include <linux/udp.h>
 59 #include <linux/unistd.h>
 60 #include <linux/sunrpc/clnt.h>
 61 #include <linux/file.h>
 62 
 63 #include <net/sock.h>
 64 #include <net/checksum.h>
 65 #include <net/udp.h>
 66 
 67 #include <asm/uaccess.h>
 68 
 69 /* Following value should be > 32k + RPC overhead */
 70 #define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)
 71 
 72 extern spinlock_t rpc_queue_lock;
 73 
 74 /*
 75  * Local variables
 76  */
 77 
 78 /* Spinlock for critical sections in the code. */
 79 spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
 80 spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
 81 
 82 #ifdef RPC_DEBUG
 83 # undef  RPC_DEBUG_DATA
 84 # define RPCDBG_FACILITY        RPCDBG_XPRT
 85 #endif
 86 
 87 #ifndef MAX
 88 # define MAX(a, b)      ((a) > (b)? (a) : (b))
 89 # define MIN(a, b)      ((a) < (b)? (a) : (b))
 90 #endif
 91 
 92 /*
 93  * Local functions
 94  */
 95 static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 96 static void     do_xprt_transmit(struct rpc_task *);
 97 static void     xprt_reserve_status(struct rpc_task *task);
 98 static void     xprt_disconnect(struct rpc_xprt *);
 99 static void     xprt_reconn_status(struct rpc_task *task);
100 static struct socket *xprt_create_socket(int, struct rpc_timeout *);
101 static int      xprt_bind_socket(struct rpc_xprt *, struct socket *);
102 static void     xprt_remove_pending(struct rpc_xprt *);
103 
104 #ifdef RPC_DEBUG_DATA
105 /*
106  * Print the buffer contents (first 128 bytes only--just enough for
107  * diropres return).
108  */
109 static void
110 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
111 {
112         u8      *buf = (u8 *) packet;
113         int     j;
114 
115         dprintk("RPC:      %s\n", msg);
116         for (j = 0; j < count && j < 128; j += 4) {
117                 if (!(j & 31)) {
118                         if (j)
119                                 dprintk("\n");
120                         dprintk("0x%04x ", j);
121                 }
122                 dprintk("%02x%02x%02x%02x ",
123                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
124         }
125         dprintk("\n");
126 }
127 #else
128 static inline void
129 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
130 {
131         /* NOP */
132 }
133 #endif
134 
135 /*
136  * Look up RPC transport given an INET socket
137  */
138 static inline struct rpc_xprt *
139 xprt_from_sock(struct sock *sk)
140 {
141         return (struct rpc_xprt *) sk->user_data;
142 }
143 
/*
 *	Adjust the iovec to move on 'amount' bytes.
 *
 *	Iovecs of msg that are consumed entirely are dropped (msg_iovlen is
 *	decremented for each).  The first partially consumed entry is
 *	rewritten into niv[0], and the remaining entries are copied after
 *	it.  msg->msg_iov is then pointed at niv.  niv must have room for
 *	the resulting msg_iovlen entries.
 *
 *	If 'amount' covers every byte in the vector, msg ends up with an
 *	empty vector (msg_iovlen == 0).  The function never reads past the
 *	last iovec.
 */
static inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
	struct iovec *iv = msg->msg_iov;
	size_t i;

	/*
	 *	Eat any sent iovecs (bounded by the number of entries so we
	 *	cannot run off the end of the array)
	 */
	while (msg->msg_iovlen > 0 && iv->iov_len <= amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	if (msg->msg_iovlen > 0) {
		/*
		 *	And chew down the partial one
		 */
		niv[0].iov_len = iv->iov_len - amount;
		niv[0].iov_base = ((unsigned char *)iv->iov_base) + amount;
		iv++;

		/*
		 *	And copy any others
		 */
		for (i = 1; i < msg->msg_iovlen; i++)
			niv[i] = *iv++;
	}

	msg->msg_iov = niv;
}
178 
/*
 * Push the not-yet-sent remainder of a request to the transport socket
 * without blocking.
 *
 * Returns the number of bytes sock_sendmsg() accepted (0 if nothing was
 * left to send), or a negative errno:
 *   -ENOTCONN  no socket attached, or a stream connection is broken
 *              (-ENOTCONN/-EPIPE from the socket);
 *   -ENOMEM    -EAGAIN while the socket has SOCK_NOSPACE set;
 *   otherwise  the socket's error, logged unless it is -ECONNREFUSED.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	int		remaining = req->rq_slen - req->rq_bytes_sent;
	struct iovec	tail[MAX_IOVEC];
	struct msghdr	hdr;
	mm_segment_t	saved_fs;
	int		status;

	if (remaining <= 0)
		return 0;
	if (sock == NULL)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
		     req->rq_svec->iov_base, req->rq_svec->iov_len);

	hdr.msg_name       = (struct sockaddr *) &xprt->addr;
	hdr.msg_namelen    = sizeof(xprt->addr);
	hdr.msg_iov        = req->rq_svec;
	hdr.msg_iovlen     = req->rq_snr;
	hdr.msg_control    = NULL;
	hdr.msg_controllen = 0;
	hdr.msg_flags      = MSG_DONTWAIT|MSG_NOSIGNAL;

	/* Skip whatever an earlier partial send already delivered */
	if (req->rq_bytes_sent != 0)
		xprt_move_iov(&hdr, tail, req->rq_bytes_sent);

	/* Let sock_sendmsg() accept our kernel-space buffers */
	saved_fs = get_fs();
	set_fs(get_ds());
	status = sock_sendmsg(sock, &hdr, remaining);
	set_fs(saved_fs);

	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", remaining, status);

	if (status >= 0)
		return status;

	if (status == -EAGAIN) {
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			status = -ENOMEM;
	} else if (status == -ENOTCONN || status == -EPIPE) {
		/* connection broken */
		if (xprt->stream)
			status = -ENOTCONN;
	} else if (status != -ECONNREFUSED) {
		/* ECONNREFUSED (e.g. ICMP port unreachable from a dead
		 * server) is passed back silently; anything else is logged. */
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -status);
	}
	return status;
}
244 
245 /*
246  * Read data from socket
247  */
248 static int
249 xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
250 {
251         struct socket   *sock = xprt->sock;
252         struct msghdr   msg;
253         mm_segment_t    oldfs;
254         struct iovec    niv[MAX_IOVEC];
255         int             result;
256 
257         if (!sock)
258                 return -ENOTCONN;
259 
260         msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
261         msg.msg_iov     = iov;
262         msg.msg_iovlen  = nr;
263         msg.msg_name    = NULL;
264         msg.msg_namelen = 0;
265         msg.msg_control = NULL;
266         msg.msg_controllen = 0;
267 
268         /* Adjust the iovec if we've already filled it */
269         if (shift)
270                 xprt_move_iov(&msg, niv, shift);
271 
272         oldfs = get_fs(); set_fs(get_ds());
273         result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
274         set_fs(oldfs);
275 
276         dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
277                                                 iov, len, result);
278         return result;
279 }
280 
281 
282 /*
283  * Adjust RPC congestion window
284  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
285  */
286 static void
287 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
288 {
289         unsigned long   cwnd;
290 
291         if (xprt->nocong)
292                 return;
293         spin_lock_bh(&xprt_sock_lock);
294         cwnd = xprt->cwnd;
295         if (result >= 0) {
296                 if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
297                         goto out;
298                 /* The (cwnd >> 1) term makes sure
299                  * the result gets rounded properly. */
300                 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
301                 if (cwnd > RPC_MAXCWND)
302                         cwnd = RPC_MAXCWND;
303                 else
304                         pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
305                 xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
306                 dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
307                         "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
308                         (xprt->congtime-jiffies)*1000/HZ);
309         } else if (result == -ETIMEDOUT) {
310                 if ((cwnd >>= 1) < RPC_CWNDSCALE)
311                         cwnd = RPC_CWNDSCALE;
312                 xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
313                 dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "
314                         "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
315                         (xprt->congtime-jiffies)*1000/HZ);
316                 pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
317         }
318 
319         xprt->cwnd = cwnd;
320  out:
321         spin_unlock_bh(&xprt_sock_lock);
322 }
323 
324 /*
325  * Adjust timeout values etc for next retransmit
326  */
327 int
328 xprt_adjust_timeout(struct rpc_timeout *to)
329 {
330         if (to->to_retries > 0) {
331                 if (to->to_exponential)
332                         to->to_current <<= 1;
333                 else
334                         to->to_current += to->to_increment;
335                 if (to->to_maxval && to->to_current >= to->to_maxval)
336                         to->to_current = to->to_maxval;
337         } else {
338                 if (to->to_exponential)
339                         to->to_initval <<= 1;
340                 else
341                         to->to_initval += to->to_increment;
342                 if (to->to_maxval && to->to_initval >= to->to_maxval)
343                         to->to_initval = to->to_maxval;
344                 to->to_current = to->to_initval;
345         }
346 
347         if (!to->to_current) {
348                 printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
349                 to->to_current = 5 * HZ;
350         }
351         pprintk("RPC: %lu %s\n", jiffies,
352                         to->to_retries? "retrans" : "timeout");
353         return to->to_retries-- > 0;
354 }
355 
356 /*
357  * Close down a transport socket
358  */
359 static void
360 xprt_close(struct rpc_xprt *xprt)
361 {
362         struct socket   *sock = xprt->sock;
363         struct sock     *sk = xprt->inet;
364 
365         if (!sk)
366                 return;
367 
368         xprt->inet = NULL;
369         xprt->sock = NULL;
370 
371         sk->user_data    = NULL;
372         sk->data_ready   = xprt->old_data_ready;
373         sk->state_change = xprt->old_state_change;
374         sk->write_space  = xprt->old_write_space;
375 
376         xprt_disconnect(xprt);
377         sk->no_check     = 0;
378 
379         sock_release(sock);
380         /*
381          *      TCP doesnt require the rpciod now - other things may
382          *      but rpciod handles that not us.
383          */
384         if(xprt->stream)
385                 rpciod_down();
386 }
387 
388 /*
389  * Mark a transport as disconnected
390  */
391 static void
392 xprt_disconnect(struct rpc_xprt *xprt)
393 {
394         dprintk("RPC:      disconnected transport %p\n", xprt);
395         xprt_clear_connected(xprt);
396         xprt_remove_pending(xprt);
397         rpc_wake_up_status(&xprt->pending, -ENOTCONN);
398 }
399 
400 /*
401  * Reconnect a broken TCP connection.
402  */
403 void
404 xprt_reconnect(struct rpc_task *task)
405 {
406         struct rpc_xprt *xprt = task->tk_xprt;
407         struct socket   *sock = xprt->sock;
408         struct sock     *inet = xprt->inet;
409         int             status;
410 
411         dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
412                                 task->tk_pid, xprt, xprt_connected(xprt));
413         if (xprt->shutdown)
414                 return;
415 
416         if (!xprt->stream)
417                 return;
418 
419         if (!xprt->addr.sin_port) {
420                 task->tk_status = -EIO;
421                 return;
422         }
423 
424         spin_lock(&xprt_lock);
425         if (xprt->connecting) {
426                 task->tk_timeout = 0;
427                 rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
428                 spin_unlock(&xprt_lock);
429                 return;
430         }
431         xprt->connecting = 1;
432         spin_unlock(&xprt_lock);
433 
434         status = -ENOTCONN;
435         if (!inet) {
436                 /* Create an unconnected socket */
437                 if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
438                         goto defer;
439                 xprt_bind_socket(xprt, sock);
440                 inet = sock->sk;
441         }
442 
443         xprt_disconnect(xprt);
444 
445         /* Reset TCP record info */
446         xprt->tcp_offset = 0;
447         xprt->tcp_copied = 0;
448         xprt->tcp_more = 0;
449 
450         /* Now connect it asynchronously. */
451         dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
452         status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
453                                 sizeof(xprt->addr), O_NONBLOCK);
454 
455         if (status < 0) {
456                 switch (status) {
457                 case -EALREADY:
458                 case -EINPROGRESS:
459                         status = 0;
460                         break;
461                 case -EISCONN:
462                 case -EPIPE:
463                         status = 0;
464                         xprt_close(xprt);
465                         goto defer;
466                 default:
467                         printk("RPC: TCP connect error %d!\n", -status);
468                         xprt_close(xprt);
469                         goto defer;
470                 }
471 
472                 dprintk("RPC: %4d connect status %d connected %d\n",
473                                 task->tk_pid, status, xprt_connected(xprt));
474 
475                 spin_lock_bh(&xprt_sock_lock);
476                 if (!xprt_connected(xprt)) {
477                         task->tk_timeout = xprt->timeout.to_maxval;
478                         rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
479                         spin_unlock_bh(&xprt_sock_lock);
480                         return;
481                 }
482                 spin_unlock_bh(&xprt_sock_lock);
483         }
484 defer:
485         spin_lock(&xprt_lock);
486         xprt->connecting = 0;
487         if (status < 0) {
488                 rpc_delay(task, 5*HZ);
489                 task->tk_status = -ENOTCONN;
490         }
491         rpc_wake_up(&xprt->reconn);
492         spin_unlock(&xprt_lock);
493 }
494 
495 /*
496  * Reconnect timeout. We just mark the transport as not being in the
497  * process of reconnecting, and leave the rest to the upper layers.
498  */
499 static void
500 xprt_reconn_status(struct rpc_task *task)
501 {
502         struct rpc_xprt *xprt = task->tk_xprt;
503 
504         dprintk("RPC: %4d xprt_reconn_timeout %d\n",
505                                 task->tk_pid, task->tk_status);
506 
507         spin_lock(&xprt_lock);
508         xprt->connecting = 0;
509         rpc_wake_up(&xprt->reconn);
510         spin_unlock(&xprt_lock);
511 }
512 
513 /*
514  * Look up the RPC request corresponding to a reply, and then lock it.
515  */
516 static inline struct rpc_rqst *
517 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
518 {
519         struct rpc_task *head, *task;
520         struct rpc_rqst *req;
521         int             safe = 0;
522 
523         spin_lock_bh(&rpc_queue_lock);
524         if ((head = xprt->pending.task) != NULL) {
525                 task = head;
526                 do {
527                         if ((req = task->tk_rqstp) && req->rq_xid == xid)
528                                 goto out;
529                         task = task->tk_next;
530                         if (++safe > 100) {
531                                 printk("xprt_lookup_rqst: loop in Q!\n");
532                                 goto out_bad;
533                         }
534                 } while (task != head);
535         }
536         dprintk("RPC:      unknown XID %08x in reply.\n", xid);
537  out_bad:
538         req = NULL;
539  out:
540         if (req && !__rpc_lock_task(req->rq_task))
541                 req = NULL;
542         spin_unlock_bh(&rpc_queue_lock);
543         return req;
544 }
545 
546 /*
547  * Complete reply received.
548  * The TCP code relies on us to remove the request from xprt->pending.
549  */
550 static inline void
551 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
552 {
553         struct rpc_task *task = req->rq_task;
554 
555         /* Adjust congestion window */
556         xprt_adjust_cwnd(xprt, copied);
557 
558 #ifdef RPC_PROFILE
559         /* Profile only reads for now */
560         if (copied > 1024) {
561                 static unsigned long    nextstat = 0;
562                 static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
563 
564                 pkt_cnt++;
565                 pkt_len += req->rq_slen + copied;
566                 pkt_rtt += jiffies - req->rq_xtime;
567                 if (time_before(nextstat, jiffies)) {
568                         printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
569                         printk("RPC: %ld %ld %ld %ld stat\n",
570                                         jiffies, pkt_cnt, pkt_len, pkt_rtt);
571                         pkt_rtt = pkt_len = pkt_cnt = 0;
572                         nextstat = jiffies + 5 * HZ;
573                 }
574         }
575 #endif
576 
577         dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
578         task->tk_status = copied;
579         req->rq_received = 1;
580 
581         /* ... and wake up the process. */
582         rpc_wake_up_task(task);
583         return;
584 }
585 
586 /*
587  * We have set things up such that we perform the checksum of the UDP
588  * packet in parallel with the copies into the RPC client iovec.  -DaveM
589  */
590 static int csum_partial_copy_to_page_cache(struct iovec *iov,
591                                            struct sk_buff *skb,
592                                            int copied)
593 {
594         __u8 *pkt_data = skb->h.raw + sizeof(struct udphdr);
595         __u8 *cur_ptr = iov->iov_base;
596         __kernel_size_t cur_len = iov->iov_len;
597         unsigned int csum = skb->csum;
598         int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
599         int slack = skb->len - copied - sizeof(struct udphdr);
600 
601         if (need_csum)
602                 csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
603         while (copied > 0) {
604                 if (cur_len) {
605                         int to_move = cur_len;
606                         if (to_move > copied)
607                                 to_move = copied;
608                         if (need_csum)
609                                 csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
610                                                                  to_move, csum);
611                         else
612                                 memcpy(cur_ptr, pkt_data, to_move);
613                         pkt_data += to_move;
614                         copied -= to_move;
615                         cur_ptr += to_move;
616                         cur_len -= to_move;
617                 }
618                 if (cur_len <= 0) {
619                         iov++;
620                         cur_len = iov->iov_len;
621                         cur_ptr = iov->iov_base;
622                 }
623         }
624         if (need_csum) {
625                 if (slack > 0)
626                         csum = csum_partial(pkt_data, slack, csum);
627                 if ((unsigned short)csum_fold(csum))
628                         return -1;
629         }
630         return 0;
631 }
632 
633 /*
634  * Input handler for RPC replies. Called from a bottom half and hence
635  * atomic.
636  */
637 static void
638 udp_data_ready(struct sock *sk, int len)
639 {
640         struct rpc_task *task;
641         struct rpc_xprt *xprt;
642         struct rpc_rqst *rovr;
643         struct sk_buff  *skb;
644         int             err, repsize, copied;
645 
646         dprintk("RPC:      udp_data_ready...\n");
647         if (!(xprt = xprt_from_sock(sk))) {
648                 printk("RPC:      udp_data_ready request not found!\n");
649                 goto out;
650         }
651 
652         dprintk("RPC:      udp_data_ready client %p\n", xprt);
653 
654         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
655                 goto out;
656 
657         if (xprt->shutdown)
658                 goto dropit;
659 
660         repsize = skb->len - sizeof(struct udphdr);
661         if (repsize < 4) {
662                 printk("RPC: impossible RPC reply size %d!\n", repsize);
663                 goto dropit;
664         }
665 
666         /* Look up and lock the request corresponding to the given XID */
667         rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
668         if (!rovr)
669                 goto dropit;
670         task = rovr->rq_task;
671 
672         dprintk("RPC: %4d received reply\n", task->tk_pid);
673         xprt_pktdump("packet data:",
674                      (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
675 
676         if ((copied = rovr->rq_rlen) > repsize)
677                 copied = repsize;
678 
679         /* Suck it into the iovec, verify checksum if not done by hw. */
680         if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
681                 goto out_unlock;
682 
683         /* Something worked... */
684         dst_confirm(skb->dst);
685 
686         xprt_complete_rqst(xprt, rovr, copied);
687 
688  out_unlock:
689         rpc_unlock_task(task);
690 
691  dropit:
692         skb_free_datagram(sk, skb);
693  out:
694         if (sk->sleep && waitqueue_active(sk->sleep))
695                 wake_up_interruptible(sk->sleep);
696 }
697 
698 /*
699  * TCP read fragment marker
700  */
701 static inline int
702 tcp_read_fraghdr(struct rpc_xprt *xprt)
703 {
704         struct iovec    riov;
705         int             want, result;
706 
707         if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
708                 xprt->tcp_offset = 0;
709                 xprt->tcp_reclen = 0;
710         }
711         if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
712                 goto done;
713 
714         want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
715         dprintk("RPC:      reading header (%d bytes)\n", want);
716         do {
717                 riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
718                 riov.iov_len  = want;
719                 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
720                 if (result < 0)
721                         return result;
722                 xprt->tcp_offset += result;
723                 want -= result;
724         } while (want);
725 
726         /* Is this another fragment in the last message */
727         if (!xprt->tcp_more)
728                 xprt->tcp_copied = 0; /* No, so we're reading a new message */
729 
730         /* Get the record length and mask out the last fragment bit */
731         xprt->tcp_reclen = ntohl(xprt->tcp_recm);
732         xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
733         xprt->tcp_reclen &= 0x7fffffff;
734 
735         dprintk("RPC:      New record reclen %d morefrags %d\n",
736                                    xprt->tcp_reclen, xprt->tcp_more);
737  done:
738         return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
739 }
740 
741 /*
742  * TCP read xid
743  */
744 static inline int
745 tcp_read_xid(struct rpc_xprt *xprt, int avail)
746 {
747         struct iovec    riov;
748         int             want, result;
749 
750         if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
751                 goto done;
752         want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
753         do {
754                 dprintk("RPC:      reading xid (%d bytes)\n", want);
755                 riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
756                 riov.iov_len  = want;
757                 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
758                 if (result < 0)
759                         return result;
760                 xprt->tcp_copied += result;
761                 xprt->tcp_offset += result;
762                 want  -= result;
763                 avail -= result;
764         } while (want);
765  done:
766         return avail;
767 }
768 
769 /*
770  * TCP read and complete request
771  */
772 static inline int
773 tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
774 {
775         int     want, result;
776 
777         if (req->rq_rlen <= xprt->tcp_copied || !avail)
778                 goto done;
779         want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
780         do {
781                 dprintk("RPC: %4d TCP receiving %d bytes\n",
782                         req->rq_task->tk_pid, want);
783 
784                 result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
785                 if (result < 0)
786                         return result;
787                 xprt->tcp_copied += result;
788                 xprt->tcp_offset += result;
789                 avail  -= result;
790                 want   -= result;
791         } while (want);
792 
793  done:
794         if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
795                 return avail;
796         dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
797         xprt_complete_rqst(xprt, req, xprt->tcp_copied);
798 
799         return avail;
800 }
801 
802 /*
803  * TCP discard extra bytes from a short read
804  */
805 static inline int
806 tcp_read_discard(struct rpc_xprt *xprt, int avail)
807 {
808         struct iovec    riov;
809         static u8       dummy[64];
810         int             want, result = 0;
811 
812         while (avail) {
813                 want = MIN(avail, sizeof(dummy));
814                 riov.iov_base = dummy;
815                 riov.iov_len  = want;
816                 dprintk("RPC:      TCP skipping %d bytes\n", want);
817                 result = xprt_recvmsg(xprt, &riov, 1, want, 0);
818                 if (result < 0)
819                         return result;
820                 xprt->tcp_offset += result;
821                 avail  -= result;
822         }
823         return avail;
824 }
825 
826 /*
827  * TCP record receive routine
828  * This is not the most efficient code since we call recvfrom thrice--
829  * first receiving the record marker, then the XID, then the data.
830  * 
831  * The optimal solution would be a RPC support in the TCP layer, which
832  * would gather all data up to the next record marker and then pass us
833  * the list of all TCP segments ready to be copied.
834  */
835 static int
836 tcp_input_record(struct rpc_xprt *xprt)
837 {
838         struct rpc_rqst *req = NULL;
839         struct rpc_task *task = NULL;
840         int             avail, result;
841 
842         dprintk("RPC:      tcp_input_record\n");
843 
844         if (xprt->shutdown)
845                 return -EIO;
846         if (!xprt_connected(xprt))
847                 return -ENOTCONN;
848 
849         /* Read in a new fragment marker if necessary */
850         /* Can we ever really expect to get completely empty fragments? */
851         if ((result = tcp_read_fraghdr(xprt)) <= 0)
852                 return result;
853         avail = result;
854 
855         /* Read in the xid if necessary */
856         if ((result = tcp_read_xid(xprt, avail)) <= 0)
857                 return result;
858         avail = result;
859 
860         /* Find and lock the request corresponding to this xid */
861         req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
862         if (req) {
863                 task = req->rq_task;
864                 /* Read in the request data */
865                 result = tcp_read_request(xprt,  req, avail);
866                 rpc_unlock_task(task);
867                 if (result < 0)
868                         return result;
869                 avail = result;
870         }
871 
872         /* Skip over any trailing bytes on short reads */
873         if ((result = tcp_read_discard(xprt, avail)) < 0)
874                 return result;
875 
876         dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
877                         xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
878         result = xprt->tcp_reclen;
879         return result;
880 }
881 
/*
 *	TCP task queue stuff
 */

/* Transports with pending TCP input, drained by the rpciod TCP dispatcher. */
LIST_HEAD(rpc_xprt_pending);

/* Wake rpciod so that it gets around to the pending transports. */
static inline
void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
892 
893 static inline
894 void xprt_append_pending(struct rpc_xprt *xprt)
895 {
896         if (!list_empty(&xprt->rx_pending))
897                 return;
898         spin_lock_bh(&rpc_queue_lock);
899         if (list_empty(&xprt->rx_pending)) {
900                 list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
901                 dprintk("RPC:     xprt queue %p\n", xprt);
902                 tcp_rpciod_queue();
903         }
904         spin_unlock_bh(&rpc_queue_lock);
905 }
906 
907 static
908 void xprt_remove_pending(struct rpc_xprt *xprt)
909 {
910         spin_lock_bh(&rpc_queue_lock);
911         if (!list_empty(&xprt->rx_pending)) {
912                 list_del(&xprt->rx_pending);
913                 INIT_LIST_HEAD(&xprt->rx_pending);
914         }
915         spin_unlock_bh(&rpc_queue_lock);
916 }
917 
918 static inline
919 struct rpc_xprt *xprt_remove_pending_next(void)
920 {
921         struct rpc_xprt *xprt = NULL;
922 
923         spin_lock_bh(&rpc_queue_lock);
924         if (!list_empty(&rpc_xprt_pending)) {
925                 xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
926                 list_del(&xprt->rx_pending);
927                 INIT_LIST_HEAD(&xprt->rx_pending);
928         }
929         spin_unlock_bh(&rpc_queue_lock);
930         return xprt;
931 }
932 
933 /*
934  *      This is protected from tcp_data_ready and the stack as its run
935  *      inside of the RPC I/O daemon
936  */
937 void
938 __rpciod_tcp_dispatcher(void)
939 {
940         struct rpc_xprt *xprt;
941         int safe_retry = 0, result;
942 
943         dprintk("rpciod_tcp_dispatcher: Queue Running\n");
944 
945         /*
946          *      Empty each pending socket
947          */
948         while ((xprt = xprt_remove_pending_next()) != NULL) {
949                 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
950 
951                 do {
952                         result = tcp_input_record(xprt);
953                 } while (result >= 0);
954 
955                 if (safe_retry++ > 200) {
956                         schedule();
957                         safe_retry = 0;
958                 }
959         }
960 }
961 
962 /*
963  *      data_ready callback for TCP. We can't just jump into the
964  *      tcp recvmsg functions inside of the network receive bh or
965  *      bad things occur. We queue it to pick up after networking
966  *      is done.
967  */
968  
969 static void tcp_data_ready(struct sock *sk, int len)
970 {
971         struct rpc_xprt *xprt;
972 
973         dprintk("RPC:      tcp_data_ready...\n");
974         if (!(xprt = xprt_from_sock(sk)))
975         {
976                 printk("Not a socket with xprt %p\n", sk);
977                 goto out;
978         }
979 
980         if (xprt->shutdown)
981                 goto out;
982 
983         xprt_append_pending(xprt);
984 
985         dprintk("RPC:      tcp_data_ready client %p\n", xprt);
986         dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
987                                 sk->state, xprt_connected(xprt),
988                                 sk->dead, sk->zapped);
989  out:
990         if (sk->sleep && waitqueue_active(sk->sleep))
991                 wake_up_interruptible(sk->sleep);
992 }
993 
994 
995 static void
996 tcp_state_change(struct sock *sk)
997 {
998         struct rpc_xprt *xprt;
999 
1000         if (!(xprt = xprt_from_sock(sk)))
1001                 goto out;
1002         dprintk("RPC:      tcp_state_change client %p...\n", xprt);
1003         dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
1004                                 sk->state, xprt_connected(xprt),
1005                                 sk->dead, sk->zapped);
1006 
1007         switch (sk->state) {
1008         case TCP_ESTABLISHED:
1009                 if (xprt_test_and_set_connected(xprt))
1010                         break;
1011                 spin_lock_bh(&xprt_sock_lock);
1012                 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1013                         rpc_wake_up_task(xprt->snd_task);
1014                 rpc_wake_up(&xprt->reconn);
1015                 spin_unlock_bh(&xprt_sock_lock);
1016                 break;
1017         case TCP_SYN_SENT:
1018         case TCP_SYN_RECV:
1019                 break;
1020         default:
1021                 xprt_disconnect(xprt);
1022                 break;
1023         }
1024  out:
1025         if (sk->sleep && waitqueue_active(sk->sleep))
1026                 wake_up_interruptible_all(sk->sleep);
1027 }
1028 
1029 /*
1030  * The following 2 routines allow a task to sleep while socket memory is
1031  * low.
1032  */
1033 static void
1034 tcp_write_space(struct sock *sk)
1035 {
1036         struct rpc_xprt *xprt;
1037         struct socket   *sock;
1038 
1039         if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
1040                 return;
1041         if (xprt->shutdown)
1042                 return;
1043 
1044         /* Wait until we have enough socket memory */
1045         if (!sock_writeable(sk))
1046                 return;
1047 
1048         if (!xprt_test_and_set_wspace(xprt)) {
1049                 spin_lock_bh(&xprt_sock_lock);
1050                 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1051                         rpc_wake_up_task(xprt->snd_task);
1052                 spin_unlock_bh(&xprt_sock_lock);
1053         }
1054 
1055         if (test_bit(SOCK_NOSPACE, &sock->flags)) {
1056                 if (sk->sleep && waitqueue_active(sk->sleep)) {
1057                         clear_bit(SOCK_NOSPACE, &sock->flags);
1058                         wake_up_interruptible(sk->sleep);
1059                 }
1060         }
1061 }
1062 
1063 static void
1064 udp_write_space(struct sock *sk)
1065 {
1066         struct rpc_xprt *xprt;
1067 
1068         if (!(xprt = xprt_from_sock(sk)))
1069                 return;
1070         if (xprt->shutdown)
1071                 return;
1072 
1073 
1074         /* Wait until we have enough socket memory */
1075         if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
1076                 return;
1077 
1078         if (!xprt_test_and_set_wspace(xprt)) {
1079                 spin_lock_bh(&xprt_sock_lock);
1080                 if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
1081                         rpc_wake_up_task(xprt->snd_task);
1082                 spin_unlock_bh(&xprt_sock_lock);
1083         }
1084 
1085         if (sk->sleep && waitqueue_active(sk->sleep))
1086                 wake_up_interruptible(sk->sleep);
1087 }
1088 
1089 /*
1090  * RPC receive timeout handler.
1091  */
1092 static void
1093 xprt_timer(struct rpc_task *task)
1094 {
1095         struct rpc_rqst *req = task->tk_rqstp;
1096 
1097         if (req)
1098                 xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
1099 
1100         dprintk("RPC: %4d xprt_timer (%s request)\n",
1101                 task->tk_pid, req ? "pending" : "backlogged");
1102 
1103         task->tk_status  = -ETIMEDOUT;
1104         task->tk_timeout = 0;
1105         rpc_wake_up_task(task);
1106 }
1107 
1108 
1109 /*
1110  * Serialize access to sockets, in order to prevent different
1111  * requests from interfering with each other.
1112  */
1113 static int
1114 xprt_down_transmit(struct rpc_task *task)
1115 {
1116         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1117         struct rpc_rqst *req = task->tk_rqstp;
1118 
1119         spin_lock(&xprt_lock);
1120         if (xprt->snd_task && xprt->snd_task != task) {
1121                 dprintk("RPC: %4d TCP write queue full (task %d)\n",
1122                         task->tk_pid, xprt->snd_task->tk_pid);
1123                 task->tk_timeout = 0;
1124                 task->tk_status = -EAGAIN;
1125                 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
1126         } else if (!xprt->snd_task) {
1127                 xprt->snd_task = task;
1128 #ifdef RPC_PROFILE
1129                 req->rq_xtime = jiffies;
1130 #endif
1131                 req->rq_bytes_sent = 0;
1132         }
1133         spin_unlock(&xprt_lock);
1134         return xprt->snd_task == task;
1135 }
1136 
1137 /*
1138  * Releases the socket for use by other requests.
1139  */
1140 static inline void
1141 xprt_up_transmit(struct rpc_task *task)
1142 {
1143         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1144 
1145         if (xprt->snd_task && xprt->snd_task == task) {
1146                 spin_lock(&xprt_lock);
1147                 xprt->snd_task = NULL;
1148                 rpc_wake_up_next(&xprt->sending);
1149                 spin_unlock(&xprt_lock);
1150         }
1151 }
1152 
1153 /*
1154  * Place the actual RPC call.
1155  * We have to copy the iovec because sendmsg fiddles with its contents.
1156  */
1157 void
1158 xprt_transmit(struct rpc_task *task)
1159 {
1160         struct rpc_rqst *req = task->tk_rqstp;
1161         struct rpc_xprt *xprt = req->rq_xprt;
1162 
1163         dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, 
1164                                 *(u32 *)(req->rq_svec[0].iov_base));
1165 
1166         if (xprt->shutdown)
1167                 task->tk_status = -EIO;
1168 
1169         if (!xprt_connected(xprt))
1170                 task->tk_status = -ENOTCONN;
1171 
1172         if (task->tk_status < 0)
1173                 return;
1174 
1175         if (task->tk_rpcwait)
1176                 rpc_remove_wait_queue(task);
1177 
1178         /* set up everything as needed. */
1179         /* Write the record marker */
1180         if (xprt->stream) {
1181                 u32     *marker = req->rq_svec[0].iov_base;
1182 
1183                 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
1184         }
1185 
1186         if (!xprt_down_transmit(task))
1187                 return;
1188 
1189         do_xprt_transmit(task);
1190 }
1191 
1192 static void
1193 do_xprt_transmit(struct rpc_task *task)
1194 {
1195         struct rpc_rqst *req = task->tk_rqstp;
1196         struct rpc_xprt *xprt = req->rq_xprt;
1197         int status, retry = 0;
1198 
1199 
1200         /* For fast networks/servers we have to put the request on
1201          * the pending list now:
1202          * Note that we don't want the task timing out during the
1203          * call to xprt_sendmsg(), so we initially disable the timeout,
1204          * and then reset it later...
1205          */
1206         xprt_receive(task);
1207 
1208         /* Continue transmitting the packet/record. We must be careful
1209          * to cope with writespace callbacks arriving _after_ we have
1210          * called xprt_sendmsg().
1211          */
1212         while (1) {
1213                 xprt_clear_wspace(xprt);
1214                 status = xprt_sendmsg(xprt, req);
1215 
1216                 if (status < 0)
1217                         break;
1218 
1219                 if (xprt->stream) {
1220                         req->rq_bytes_sent += status;
1221 
1222                         if (req->rq_bytes_sent >= req->rq_slen)
1223                                 goto out_receive;
1224                 } else {
1225                         if (status >= req->rq_slen)
1226                                 goto out_receive;
1227                         status = -ENOMEM;
1228                         break;
1229                 }
1230 
1231                 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1232                                 task->tk_pid, req->rq_slen - req->rq_bytes_sent,
1233                                 req->rq_slen);
1234 
1235                 status = -EAGAIN;
1236                 if (retry++ > 50)
1237                         break;
1238         }
1239         rpc_unlock_task(task);
1240 
1241         /* Note: at this point, task->tk_sleeping has not yet been set,
1242          *       hence there is no danger of the waking up task being put on
1243          *       schedq, and being picked up by a parallel run of rpciod().
1244          */
1245         rpc_wake_up_task(task);
1246         if (!RPC_IS_RUNNING(task))
1247                 goto out_release;
1248         if (req->rq_received)
1249                 goto out_release;
1250 
1251         task->tk_status = status;
1252 
1253         switch (status) {
1254         case -ENOMEM:
1255                 /* Protect against (udp|tcp)_write_space */
1256                 spin_lock_bh(&xprt_sock_lock);
1257                 if (!xprt_wspace(xprt)) {
1258                         task->tk_timeout = req->rq_timeout.to_current;
1259                         rpc_sleep_on(&xprt->sending, task, NULL, NULL);
1260                 }
1261                 spin_unlock_bh(&xprt_sock_lock);
1262                 return;
1263         case -EAGAIN:
1264                 /* Keep holding the socket if it is blocked */
1265                 rpc_delay(task, HZ>>4);
1266                 return;
1267         case -ECONNREFUSED:
1268         case -ENOTCONN:
1269                 if (!xprt->stream)
1270                         return;
1271         default:
1272                 goto out_release;
1273         }
1274 
1275  out_receive:
1276         dprintk("RPC: %4d xmit complete\n", task->tk_pid);
1277         /* Set the task's receive timeout value */
1278         task->tk_timeout = req->rq_timeout.to_current;
1279         rpc_add_timer(task, xprt_timer);
1280         rpc_unlock_task(task);
1281  out_release:
1282         xprt_up_transmit(task);
1283 }
1284 
1285 /*
1286  * Queue the task for a reply to our call.
1287  * When the callback is invoked, the congestion window should have
1288  * been updated already.
1289  */
1290 void
1291 xprt_receive(struct rpc_task *task)
1292 {
1293         struct rpc_rqst *req = task->tk_rqstp;
1294         struct rpc_xprt *xprt = req->rq_xprt;
1295 
1296         dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
1297 
1298         req->rq_received = 0;
1299         task->tk_timeout = 0;
1300         rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
1301 }
1302 
1303 /*
1304  * Reserve an RPC call slot.
1305  */
1306 int
1307 xprt_reserve(struct rpc_task *task)
1308 {
1309         struct rpc_xprt *xprt = task->tk_xprt;
1310 
1311         /* We already have an initialized request. */
1312         if (task->tk_rqstp)
1313                 return 0;
1314 
1315         dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1316                                 task->tk_pid, xprt->cong, xprt->cwnd);
1317         spin_lock_bh(&xprt_sock_lock);
1318         xprt_reserve_status(task);
1319         if (task->tk_rqstp) {
1320                 task->tk_timeout = 0;
1321         } else if (!task->tk_timeout) {
1322                 task->tk_status = -ENOBUFS;
1323         } else {
1324                 dprintk("RPC:      xprt_reserve waiting on backlog\n");
1325                 task->tk_status = -EAGAIN;
1326                 rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
1327         }
1328         spin_unlock_bh(&xprt_sock_lock);
1329         dprintk("RPC: %4d xprt_reserve returns %d\n",
1330                                 task->tk_pid, task->tk_status);
1331         return task->tk_status;
1332 }
1333 
1334 /*
1335  * Reservation callback
1336  */
1337 static void
1338 xprt_reserve_status(struct rpc_task *task)
1339 {
1340         struct rpc_xprt *xprt = task->tk_xprt;
1341         struct rpc_rqst *req;
1342 
1343         if (xprt->shutdown) {
1344                 task->tk_status = -EIO;
1345         } else if (task->tk_status < 0) {
1346                 /* NOP */
1347         } else if (task->tk_rqstp) {
1348                 /* We've already been given a request slot: NOP */
1349         } else {
1350                 if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
1351                         goto out_nofree;
1352                 /* OK: There's room for us. Grab a free slot and bump
1353                  * congestion value */
1354                 xprt->free     = req->rq_next;
1355                 req->rq_next   = NULL;
1356                 xprt->cong    += RPC_CWNDSCALE;
1357                 task->tk_rqstp = req;
1358                 xprt_request_init(task, xprt);
1359 
1360                 if (xprt->free)
1361                         xprt_clear_backlog(xprt);
1362         }
1363 
1364         return;
1365 
1366 out_nofree:
1367         task->tk_status = -EAGAIN;
1368 }
1369 
1370 /*
1371  * Initialize RPC request
1372  */
1373 static void
1374 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1375 {
1376         struct rpc_rqst *req = task->tk_rqstp;
1377         static u32      xid = 0;
1378 
1379         if (!xid)
1380                 xid = CURRENT_TIME << 12;
1381 
1382         dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
1383         task->tk_status = 0;
1384         req->rq_timeout = xprt->timeout;
1385         req->rq_task    = task;
1386         req->rq_xprt    = xprt;
1387         req->rq_xid     = xid++;
1388         if (!xid)
1389                 xid++;
1390 }
1391 
1392 /*
1393  * Release an RPC call slot
1394  */
1395 void
1396 xprt_release(struct rpc_task *task)
1397 {
1398         struct rpc_xprt *xprt = task->tk_xprt;
1399         struct rpc_rqst *req;
1400 
1401         xprt_up_transmit(task);
1402         if (!(req = task->tk_rqstp))
1403                 return;
1404         task->tk_rqstp = NULL;
1405         memset(req, 0, sizeof(*req));   /* mark unused */
1406 
1407         dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
1408 
1409         /* remove slot from queue of pending */
1410         if (task->tk_rpcwait) {
1411                 printk("RPC: task of released request still queued!\n");
1412                 rpc_remove_wait_queue(task);
1413         }
1414 
1415         spin_lock_bh(&xprt_sock_lock);
1416         req->rq_next = xprt->free;
1417         xprt->free   = req;
1418 
1419         /* Decrease congestion value. */
1420         xprt->cong -= RPC_CWNDSCALE;
1421 
1422         xprt_clear_backlog(xprt);
1423         spin_unlock_bh(&xprt_sock_lock);
1424 }
1425 
1426 /*
1427  * Set default timeout parameters
1428  */
1429 void
1430 xprt_default_timeout(struct rpc_timeout *to, int proto)
1431 {
1432         if (proto == IPPROTO_UDP)
1433                 xprt_set_timeout(to, 5,  5 * HZ);
1434         else
1435                 xprt_set_timeout(to, 5, 60 * HZ);
1436 }
1437 
1438 /*
1439  * Set constant timeout
1440  */
1441 void
1442 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1443 {
1444         to->to_current   = 
1445         to->to_initval   = 
1446         to->to_increment = incr;
1447         to->to_maxval    = incr * retr;
1448         to->to_resrvval  = incr * retr;
1449         to->to_retries   = retr;
1450         to->to_exponential = 0;
1451 }
1452 
1453 /*
1454  * Initialize an RPC client
1455  */
1456 static struct rpc_xprt *
1457 xprt_setup(struct socket *sock, int proto,
1458                         struct sockaddr_in *ap, struct rpc_timeout *to)
1459 {
1460         struct rpc_xprt *xprt;
1461         struct rpc_rqst *req;
1462         int             i;
1463 
1464         dprintk("RPC:      setting up %s transport...\n",
1465                                 proto == IPPROTO_UDP? "UDP" : "TCP");
1466 
1467         if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1468                 return NULL;
1469         memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1470 
1471         xprt->addr = *ap;
1472         xprt->prot = proto;
1473         xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
1474         if (xprt->stream) {
1475                 xprt->cwnd = RPC_MAXCWND;
1476                 xprt->nocong = 1;
1477         } else
1478                 xprt->cwnd = RPC_INITCWND;
1479         xprt->congtime = jiffies;
1480         init_waitqueue_head(&xprt->cong_wait);
1481 
1482         /* Set timeout parameters */
1483         if (to) {
1484                 xprt->timeout = *to;
1485                 xprt->timeout.to_current = to->to_initval;
1486                 xprt->timeout.to_resrvval = to->to_maxval << 1;
1487         } else
1488                 xprt_default_timeout(&xprt->timeout, xprt->prot);
1489 
1490         xprt->pending = RPC_INIT_WAITQ("xprt_pending");
1491         xprt->sending = RPC_INIT_WAITQ("xprt_sending");
1492         xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
1493         xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");
1494 
1495         /* initialize free list */
1496         for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
1497                 req->rq_next = req + 1;
1498         req->rq_next = NULL;
1499         xprt->free = xprt->slot;
1500 
1501         INIT_LIST_HEAD(&xprt->rx_pending);
1502 
1503         dprintk("RPC:      created transport %p\n", xprt);
1504         
1505         xprt_bind_socket(xprt, sock);
1506         return xprt;
1507 }
1508 
1509 /*
1510  * Bind to a reserved port
1511  */
1512 static inline int
1513 xprt_bindresvport(struct socket *sock)
1514 {
1515         struct sockaddr_in myaddr;
1516         int             err, port;
1517 
1518         memset(&myaddr, 0, sizeof(myaddr));
1519         myaddr.sin_family = AF_INET;
1520         port = 800;
1521         do {
1522                 myaddr.sin_port = htons(port);
1523                 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1524                                                 sizeof(myaddr));
1525         } while (err == -EADDRINUSE && --port > 0);
1526 
1527         if (err < 0)
1528                 printk("RPC: Can't bind to reserved port (%d).\n", -err);
1529 
1530         return err;
1531 }
1532 
1533 static int 
1534 xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
1535 {
1536         struct sock     *sk = sock->sk;
1537 
1538         if (xprt->inet)
1539                 return -EBUSY;
1540 
1541         sk->user_data = xprt;
1542         xprt->old_data_ready = sk->data_ready;
1543         xprt->old_state_change = sk->state_change;
1544         xprt->old_write_space = sk->write_space;
1545         if (xprt->prot == IPPROTO_UDP) {
1546                 sk->data_ready = udp_data_ready;
1547                 sk->write_space = udp_write_space;
1548                 sk->no_check = UDP_CSUM_NORCV;
1549                 xprt_set_connected(xprt);
1550         } else {
1551                 sk->data_ready = tcp_data_ready;
1552                 sk->state_change = tcp_state_change;
1553                 sk->write_space = tcp_write_space;
1554                 xprt_clear_connected(xprt);
1555         }
1556 
1557         /* Reset to new socket */
1558         xprt->sock = sock;
1559         xprt->inet = sk;
1560         /*
1561          *      TCP requires the rpc I/O daemon is present
1562          */
1563         if(xprt->stream)
1564                 rpciod_up();
1565 
1566         return 0;
1567 }
1568 
1569 /*
1570  * Create a client socket given the protocol and peer address.
1571  */
1572 static struct socket *
1573 xprt_create_socket(int proto, struct rpc_timeout *to)
1574 {
1575         struct socket   *sock;
1576         int             type, err;
1577 
1578         dprintk("RPC:      xprt_create_socket(%s %d)\n",
1579                            (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1580 
1581         type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1582 
1583         if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1584                 printk("RPC: can't create socket (%d).\n", -err);
1585                 goto failed;
1586         }
1587 
1588         /* If the caller has the capability, bind to a reserved port */
1589         if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
1590                 goto failed;
1591 
1592         return sock;
1593 
1594 failed:
1595         sock_release(sock);
1596         return NULL;
1597 }
1598 
1599 /*
1600  * Create an RPC client transport given the protocol and peer address.
1601  */
1602 struct rpc_xprt *
1603 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1604 {
1605         struct socket   *sock;
1606         struct rpc_xprt *xprt;
1607 
1608         dprintk("RPC:      xprt_create_proto called\n");
1609 
1610         if (!(sock = xprt_create_socket(proto, to)))
1611                 return NULL;
1612 
1613         if (!(xprt = xprt_setup(sock, proto, sap, to)))
1614                 sock_release(sock);
1615 
1616         return xprt;
1617 }
1618 
1619 /*
1620  * Prepare for transport shutdown.
1621  */
1622 void
1623 xprt_shutdown(struct rpc_xprt *xprt)
1624 {
1625         xprt->shutdown = 1;
1626         rpc_wake_up(&xprt->sending);
1627         rpc_wake_up(&xprt->pending);
1628         rpc_wake_up(&xprt->backlog);
1629         rpc_wake_up(&xprt->reconn);
1630         if (waitqueue_active(&xprt->cong_wait))
1631                 wake_up(&xprt->cong_wait);
1632 }
1633 
1634 /*
1635  * Clear the xprt backlog queue
1636  */
1637 int
1638 xprt_clear_backlog(struct rpc_xprt *xprt) {
1639         if (RPCXPRT_CONGESTED(xprt))
1640                 return 0;
1641         rpc_wake_up_next(&xprt->backlog);
1642         if (waitqueue_active(&xprt->cong_wait))
1643                 wake_up(&xprt->cong_wait);
1644         return 1;
1645 }
1646 
/*
 * Destroy an RPC transport: shut it down (waking all its waiters), close
 * it, and free the structure.  Always returns 0.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:      destroying transport %p\n", xprt);

	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}
1660 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.