Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44 inline void oti_init(struct obd_trans_info *oti,
45                            struct ptlrpc_request *req)
46 {
47         if(oti == NULL)
48                 return;
49         memset(oti, 0, sizeof *oti);
50
51         
52         if (req->rq_repmsg && req->rq_reqmsg != 0)
53                 oti->oti_transno = req->rq_repmsg->transno;
54
55         EXIT;
56 }
57
58 inline void oti_to_request(struct obd_trans_info *oti,
59                            struct ptlrpc_request *req)
60 {
61         int i;
62         struct oti_req_ack_lock *ack_lock;
63
64         if(oti == NULL)
65                 return;
66
67         if (req->rq_repmsg)
68                 req->rq_repmsg->transno = oti->oti_transno;
69
70         /* XXX 4 == entries in oti_ack_locks??? */
71         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
72                 if (!ack_lock->mode)
73                         break;
74                 memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
75                        sizeof(req->rq_ack_locks[i].lock));
76                 req->rq_ack_locks[i].mode = ack_lock->mode;
77         }
78         EXIT;
79 }
80
81 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
82 {
83         struct lustre_handle *conn = &req->rq_reqmsg->handle;
84         struct ost_body *body;
85         int rc, size = sizeof(*body);
86         ENTRY;
87
88         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
89                                    lustre_swab_ost_body);
90         if (body == NULL)
91                 RETURN (-EFAULT);
92
93         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
94         if (rc)
95                 RETURN(rc);
96
97         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
98         RETURN(0);
99 }
100
101 static int ost_getattr(struct ptlrpc_request *req)
102 {
103         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
104         struct ost_body *body, *repbody;
105         int rc, size = sizeof(*body);
106         ENTRY;
107
108         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
109                                    lustre_swab_ost_body);
110         if (body == NULL)
111                 RETURN (-EFAULT);
112
113         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
120         RETURN(0);
121 }
122
123 static int ost_statfs(struct ptlrpc_request *req)
124 {
125         struct obd_statfs *osfs;
126         int rc, size = sizeof(*osfs);
127         ENTRY;
128
129         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
130         if (rc)
131                 RETURN(rc);
132
133         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
134         memset(osfs, 0, size);
135
136         req->rq_status = obd_statfs(req->rq_export, osfs);
137         if (req->rq_status != 0)
138                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
139
140         RETURN(0);
141 }
142
143 static int ost_syncfs(struct ptlrpc_request *req)
144 {
145         struct obd_statfs *osfs;
146         int rc, size = sizeof(*osfs);
147         ENTRY;
148
149         rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
150         if (rc)
151                 RETURN(rc);
152
153         rc = obd_syncfs(req->rq_export);
154         if (rc) {
155                 CERROR("ost: syncfs failed: rc %d\n", rc);
156                 req->rq_status = rc;
157                 RETURN(rc);
158         }
159
160         RETURN(0);
161 }
162
163 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
164 {
165         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
166         struct ost_body *body, *repbody;
167         int rc, size = sizeof(*repbody);
168         ENTRY;
169
170         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
171                                    lustre_swab_ost_body);
172         if (body == NULL)
173                 return (-EFAULT);
174
175         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
176         if (rc)
177                 RETURN(rc);
178
179         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
180         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
181         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
182         RETURN(0);
183 }
184
185 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
186 {
187         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
188         struct ost_body *body, *repbody;
189         int rc, size = sizeof(*repbody);
190         ENTRY;
191
192         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
193                                    lustre_swab_ost_body);
194         if (body == NULL)
195                 RETURN (-EFAULT);
196
197         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
198         if (rc)
199                 RETURN(rc);
200
201         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
202         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
203         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
204         RETURN(0);
205 }
206
207 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
208 {
209         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
210         struct ost_body *body, *repbody;
211         int rc, size = sizeof(*repbody);
212         ENTRY;
213
214         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
215                                    lustre_swab_ost_body);
216         if (body == NULL)
217                 RETURN (-EFAULT);
218
219         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
220         if (rc)
221                 RETURN(rc);
222
223         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
224         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
225         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
226         RETURN(0);
227 }
228
229 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
230 {
231         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
232         struct ost_body *body, *repbody;
233         int rc, size = sizeof(*repbody);
234         ENTRY;
235
236         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
237                                    lustre_swab_ost_body);
238         if (body == NULL)
239                 RETURN (-EFAULT);
240
241         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
242             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
243                 RETURN(-EINVAL);
244
245         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
246         if (rc)
247                 RETURN(rc);
248
249         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
250         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
251         req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
252                                    repbody->oa.o_blocks, oti);
253         RETURN(0);
254 }
255
256 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
257 {
258         struct lustre_handle *conn = &req->rq_reqmsg->handle;
259         struct ost_body *body, *repbody;
260         int rc, size = sizeof(*repbody);
261         ENTRY;
262
263         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
264                                    lustre_swab_ost_body);
265         if (body == NULL)
266                 RETURN (-EFAULT);
267
268         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
269         if (rc)
270                 RETURN(rc);
271
272         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
273         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
274
275         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
276         RETURN(0);
277 }
278
279 static int ost_bulk_timeout(void *data)
280 {
281         ENTRY;
282         /* We don't fail the connection here, because having the export
283          * killed makes the (vital) call to commitrw very sad.
284          */
285         RETURN(1);
286 }
287
288 static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
289                                  struct niobuf_remote *rnb, int nrnb,
290                                  struct niobuf_remote **pp_rnbp)
291 {
292         /* Copy a remote niobuf, splitting it into page-sized chunks
293          * and setting ioo[i].ioo_bufcnt accordingly */
294         struct niobuf_remote *pp_rnb;
295         int   i;
296         int   j;
297         int   page;
298         int   rnbidx = 0;
299         int   npages = 0;
300
301         /* first count and check the number of pages required */
302         for (i = 0; i < nioo; i++)
303                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
304                         obd_off offset = rnb[rnbidx].offset;
305                         obd_off p0 = offset >> PAGE_SHIFT;
306                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
307
308                         LASSERT (rnbidx < nrnb);
309
310                         npages += (pn + 1 - p0);
311
312                         if (rnb[rnbidx].len == 0) {
313                                 CERROR("zero len BRW: obj %d objid "LPX64
314                                        " buf %u\n", i, ioo[i].ioo_id, j);
315                                 return (-EINVAL);
316                         }
317                         if (j > 0 &&
318                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
319                                 CERROR("unordered BRW: obj %d objid "LPX64
320                                        " buf %u offset "LPX64" <= "LPX64"\n",
321                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
322                                        rnb[rnbidx].offset);
323                                 return (-EINVAL);
324                         }
325                 }
326
327         LASSERT (rnbidx == nrnb);
328
329         if (npages == nrnb) {       /* all niobufs are for single pages */
330                 *pp_rnbp = rnb;
331                 return (npages);
332         }
333
334         OBD_ALLOC (pp_rnb, sizeof (*pp_rnb) * npages);
335         if (pp_rnb == NULL)
336                 return (-ENOMEM);
337
338         /* now do the actual split */
339         page = rnbidx = 0;
340         for (i = 0; i < nioo; i++) {
341                 int  obj_pages = 0;
342
343                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
344                         obd_off off = rnb[rnbidx].offset;
345                         int     nob = rnb[rnbidx].len;
346
347                         LASSERT (rnbidx < nrnb);
348                         do {
349                                 obd_off  poff = off & (PAGE_SIZE - 1);
350                                 int      pnob = (poff + nob > PAGE_SIZE) ?
351                                                 PAGE_SIZE - poff : nob;
352
353                                 LASSERT (page < npages);
354                                 pp_rnb[page].len = pnob;
355                                 pp_rnb[page].offset = off;
356                                 pp_rnb[page].flags = rnb->flags;
357
358                                 CDEBUG (D_PAGE, "   obj %d id "LPX64
359                                         "page %d(%d) "LPX64" for %d\n",
360                                         i, ioo[i].ioo_id, obj_pages, page,
361                                         pp_rnb[page].offset, pp_rnb[page].len);
362                                 page++;
363                                 obj_pages++;
364
365                                 off += pnob;
366                                 nob -= pnob;
367                         } while (nob > 0);
368                         LASSERT (nob == 0);
369                 }
370                 ioo[i].ioo_bufcnt = obj_pages;
371         }
372         LASSERT (page == npages);
373
374         *pp_rnbp = pp_rnb;
375         return (npages);
376 }
377
378 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
379                                    struct niobuf_remote *rnb)
380 {
381         if (pp_rnb == rnb)                      /* didn't allocate above */
382                 return;
383
384         OBD_FREE (pp_rnb, sizeof (*pp_rnb) * npages);
385 }
386
387 #if CHECKSUM_BULK
388 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
389 {
390         __u64             cksum = 0;
391         struct list_head *tmp;
392         char             *ptr;
393
394         list_for_each (tmp, &desc->bd_page_list) {
395                 struct ptlrpc_bulk_page *bp;
396
397                 bp = list_entry (tmp, struct ptlrpc_bulk_page, bp_link);
398                 ptr = kmap (bp->bp_page);
399                 ost_checksum (&cksum, ptr + bp->bp_pageoffset, bp->bp_buflen);
400                 kunmap (bp->bp_page);
401         }
402 }
403 #endif
404
405 static int ost_brw_read(struct ptlrpc_request *req)
406 {
407         struct ptlrpc_bulk_desc *desc;
408         struct niobuf_remote    *remote_nb;
409         struct niobuf_remote    *pp_rnb;
410         struct niobuf_local     *local_nb;
411         struct obd_ioobj        *ioo;
412         struct ost_body         *body;
413         struct l_wait_info       lwi;
414         void                    *desc_priv = NULL;
415         int                      size[1] = { sizeof(*body) };
416         int                      comms_error = 0;
417         int                      niocount;
418         int                      npages;
419         int                      nob = 0;
420         int                      rc;
421         int                      i;
422         ENTRY;
423
424         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
425                 GOTO(out, rc = -EIO);
426
427         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
428         if (body == NULL) {
429                 CERROR ("Missing/short ost_body\n");
430                 GOTO (out, rc = -EFAULT);
431         }
432
433         ioo = lustre_swab_reqbuf (req, 1, sizeof (*ioo),
434                                   lustre_swab_obd_ioobj);
435         if (ioo == NULL) {
436                 CERROR ("Missing/short ioobj\n");
437                 GOTO (out, rc = -EFAULT);
438         }
439
440         niocount = ioo->ioo_bufcnt;
441         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
442                                        lustre_swab_niobuf_remote);
443         if (remote_nb == NULL) {
444                 CERROR ("Missing/short niobuf\n");
445                 GOTO (out, rc = -EFAULT);
446         }
447         if (lustre_msg_swabbed (req->rq_reqmsg)) { /* swab remaining niobufs */
448                 for (i = 1; i < niocount; i++)
449                         lustre_swab_niobuf_remote (&remote_nb[i]);
450         }
451
452         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
453         if (rc)
454                 GOTO(out, rc);
455
456         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
457         npages = get_per_page_niobufs (ioo, 1, remote_nb, niocount, &pp_rnb);
458         if (npages < 0)
459                 GOTO(out, rc = npages);
460
461         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
462         if (local_nb == NULL)
463                 GOTO(out_pp_rnb, rc = -ENOMEM);
464
465         desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
466         if (desc == NULL)
467                 GOTO(out_local, rc = -ENOMEM);
468
469         rc = obd_preprw(OBD_BRW_READ, req->rq_export, NULL, 1, ioo, npages,
470                         pp_rnb, local_nb, &desc_priv, NULL);
471         if (rc != 0)
472                 GOTO(out_bulk, rc);
473
474         nob = 0;
475         for (i = 0; i < npages; i++) {
476                 int page_rc = local_nb[i].rc;
477
478                 if (page_rc < 0) {              /* error */
479                         rc = page_rc;
480                         break;
481                 }
482
483                 LASSERT (page_rc <= pp_rnb[i].len);
484                 nob += page_rc;
485                 if (page_rc != 0) {             /* some data! */
486                         LASSERT (local_nb[i].page != NULL);
487                         rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
488                                                    pp_rnb[i].offset& ~PAGE_MASK,
489                                                    page_rc);
490                         if (rc != 0)
491                                 break;
492                 }
493
494                 if (page_rc != pp_rnb[i].len) { /* short read */
495                         /* All subsequent pages should be 0 */
496                         while (++i < npages)
497                                 LASSERT (local_nb[i].rc == 0);
498                         break;
499                 }
500         }
501
502         if (rc == 0) {
503                 rc = ptlrpc_bulk_put(desc);
504                 if (rc == 0) {
505                         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
506                                           desc);
507                         rc = l_wait_event(desc->bd_waitq,
508                                           ptlrpc_bulk_complete(desc), &lwi);
509                         if (rc) {
510                                 LASSERT(rc == -ETIMEDOUT);
511                                 CERROR ("timeout waiting for bulk PUT\n");
512                                 ptlrpc_abort_bulk (desc);
513                         }
514                 } else {
515                         CERROR("ptlrpc_bulk_put failed RC: %d\n", rc);
516                 }
517                 comms_error = rc != 0;
518         }
519
520         /* Must commit after prep above in all cases */
521         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
522                           local_nb, desc_priv, NULL);
523
524 #if CHECKSUM_BULK
525         if (rc == 0) {
526                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
527                 body->oa.o_rdev = ost_checksum_bulk (desc);
528                 body->oa.o_valid |= OBD_MD_FLCKSUM;
529         }
530 #endif
531
532  out_bulk:
533         ptlrpc_free_bulk (desc);
534  out_local:
535         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
536  out_pp_rnb:
537         free_per_page_niobufs (npages, pp_rnb, remote_nb);
538  out:
539         LASSERT (rc <= 0);
540         if (rc == 0) {
541                 req->rq_status = nob;
542                 ptlrpc_reply(req);
543         } else if (!comms_error) {
544                 /* only reply if comms OK */
545                 req->rq_status = rc;
546                 ptlrpc_error(req);
547         } else {
548                 if (req->rq_repmsg != NULL) {
549                         /* reply out callback would free */
550                         OBD_FREE (req->rq_repmsg, req->rq_replen);
551                 }
552                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
553                        req->rq_export->exp_client_uuid.uuid,
554                        req->rq_connection->c_remote_uuid.uuid,
555                        req->rq_connection->c_peer.peer_nid);
556                 ptlrpc_fail_export(req->rq_export);
557         }
558
559         RETURN(rc);
560 }
561
562 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
563 {
564         struct ptlrpc_bulk_desc *desc;
565         struct niobuf_remote    *remote_nb;
566         struct niobuf_remote    *pp_rnb;
567         struct niobuf_local     *local_nb;
568         struct obd_ioobj        *ioo;
569         struct ost_body         *body;
570         struct l_wait_info       lwi;
571         void                    *desc_priv = NULL;
572         __u32                   *rcs;
573         int                      size[2] = { sizeof (*body) };
574         int                      objcount, niocount, npages;
575         int                      comms_error = 0;
576         int                      rc, rc2, swab, i, j;
577         ENTRY;
578
579         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
580                 GOTO(out, rc = -EIO);
581
582         /* pause before transaction has been started */
583         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE, 
584                          obd_timeout +1);
585
586         swab = lustre_msg_swabbed (req->rq_reqmsg);
587         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
588                                    lustre_swab_ost_body);
589         if (body == NULL) {
590                 CERROR ("Missing/short ost_body\n");
591                 GOTO(out, rc = -EFAULT);
592         }
593
594         LASSERT_REQSWAB (req, 1);
595         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
596         if (objcount == 0) {
597                 CERROR ("Missing/short ioobj\n");
598                 GOTO (out, rc = -EFAULT);
599         }
600         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof (*ioo));
601         LASSERT (ioo != NULL);
602         for (niocount = i = 0; i < objcount; i++) {
603                 if (swab)
604                         lustre_swab_obd_ioobj (&ioo[i]);
605                 if (ioo[i].ioo_bufcnt == 0) {
606                         CERROR ("ioo[%d] has zero bufcnt\n", i);
607                         GOTO (out, rc = -EFAULT);
608                 }
609                 niocount += ioo[i].ioo_bufcnt;
610         }
611
612         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
613                                        lustre_swab_niobuf_remote);
614         if (remote_nb == NULL) {
615                 CERROR ("Missing/short niobuf\n");
616                 GOTO(out, rc = -EFAULT);
617         }
618         if (swab) {                             /* swab the remaining niobufs */
619                 for (i = 1; i < niocount; i++)
620                         lustre_swab_niobuf_remote (&remote_nb[i]);
621         }
622
623         size[1] = niocount * sizeof (*rcs);
624         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
625                              &req->rq_repmsg);
626         if (rc != 0)
627                 GOTO (out, rc);
628         rcs = lustre_msg_buf (req->rq_repmsg, 1, niocount * sizeof (*rcs));
629
630         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
631         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
632         if (npages < 0)
633                 GOTO (out, rc = npages);
634
635         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
636         if (local_nb == NULL)
637                 GOTO(out_pp_rnb, rc = -ENOMEM);
638
639         desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, OST_BULK_PORTAL);
640         if (desc == NULL)
641                 GOTO(out_local, rc = -ENOMEM);
642
643         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, NULL, objcount, ioo,
644                         npages, pp_rnb, local_nb, &desc_priv, oti);
645         if (rc != 0)
646                 GOTO (out_bulk, rc);
647
648         /* NB Having prepped, we must commit... */
649
650         for (i = 0; i < npages; i++) {
651                 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
652                                            pp_rnb[i].offset & (PAGE_SIZE - 1),
653                                            pp_rnb[i].len);
654                 if (rc != 0)
655                         break;
656         }
657
658         if (rc == 0) {
659                 rc = ptlrpc_bulk_get(desc);
660                 if (rc == 0) {
661                         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
662                                           desc);
663                         rc = l_wait_event(desc->bd_waitq,
664                                           ptlrpc_bulk_complete(desc), &lwi);
665                         if (rc) {
666                                 LASSERT(rc == -ETIMEDOUT);
667                                 CERROR ("timeout waiting for bulk GET\n");
668                                 ptlrpc_abort_bulk (desc);
669                         }
670                 } else {
671                         CERROR("ptlrpc_bulk_get failed RC: %d\n", rc);
672                 }
673                 comms_error = rc != 0;
674         }
675
676 #if CHECKSUM_BULK
677         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
678                 static int cksum_counter;
679                 __u64 client_cksum = body->oa.o_rdev;
680                 __u64 cksum = ost_checksum_bulk (desc);
681
682                 if (client_cksum != cksum) {
683                         CERROR("Bad checksum: client "LPX64", server "LPX64
684                                ", client NID "LPX64"\n", client_cksum, cksum,
685                                req->rq_connection->c_peer.peer_nid);
686                         cksum_counter = 1;
687                 } else {
688                         cksum_counter++;
689                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
690                                 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
691                                         cksum_counter,
692                                         req->rq_connection->c_peer.peer_nid,
693                                         cksum);
694                 }
695         }
696 #endif
697         /* Must commit after prep above in all cases */
698         rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
699                            npages, local_nb, desc_priv, oti);
700
701         if (rc == 0) {
702                 /* set per-requested niobuf return codes */
703                 for (i = j = 0; i < niocount; i++) {
704                         int nob = remote_nb[i].len;
705
706                         rcs[i] = 0;
707                         do {
708                                 LASSERT (j < npages);
709                                 if (local_nb[j].rc < 0)
710                                         rcs[i] = local_nb[j].rc;
711                                 nob -= pp_rnb[j].len;
712                                 j++;
713                         } while (nob > 0);
714                         LASSERT (nob == 0);
715                 }
716                 LASSERT (j == npages);
717         }
718         if (rc == 0)
719                 rc = rc2;
720
721  out_bulk:
722         ptlrpc_free_bulk (desc);
723  out_local:
724         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
725  out_pp_rnb:
726         free_per_page_niobufs (npages, pp_rnb, remote_nb);
727  out:
728         if (rc == 0) {
729                 oti_to_request(oti, req);
730                 rc = ptlrpc_reply(req);
731         } else if (!comms_error) {
732                 /* Only reply if there was no comms problem with bulk */
733                 req->rq_status = rc;
734                 ptlrpc_error(req);
735         } else {
736                 if (req->rq_repmsg != NULL) {
737                         /* reply out callback would free */
738                         OBD_FREE (req->rq_repmsg, req->rq_replen);
739                 }
740                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
741                        req->rq_export->exp_client_uuid.uuid,
742                        req->rq_connection->c_remote_uuid.uuid,
743                        req->rq_connection->c_peer.peer_nid);
744                 ptlrpc_fail_export(req->rq_export);
745         }
746         RETURN(rc);
747 }
748
749 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
750 {
751         struct lustre_handle *conn = &req->rq_reqmsg->handle;
752         struct niobuf_remote *remote_nb, *res_nb;
753         struct obd_ioobj *ioo;
754         struct ost_body *body;
755         int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
756         int n;
757         int swab;
758         ENTRY;
759
760         /* XXX not set to use latest protocol */
761
762         swab = lustre_msg_swabbed (req->rq_reqmsg);
763         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
764                                    lustre_swab_ost_body);
765         if (body == NULL) {
766                 CERROR ("Missing/short ost_body\n");
767                 GOTO (out, rc = -EFAULT);
768         }
769
770         ioo = lustre_swab_reqbuf(req, 1, sizeof (*ioo),
771                                  lustre_swab_obd_ioobj);
772         if (ioo == NULL) {
773                 CERROR ("Missing/short ioobj\n");
774                 GOTO (out, rc = -EFAULT);
775         }
776         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
777         niocount = ioo[0].ioo_bufcnt;
778         for (i = 1; i < objcount; i++) {
779                 if (swab)
780                         lustre_swab_obd_ioobj (&ioo[i]);
781                 niocount += ioo[i].ioo_bufcnt;
782         }
783
784         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
785                                        lustre_swab_niobuf_remote);
786         if (remote_nb == NULL) {
787                 CERROR ("Missing/short niobuf\n");
788                 GOTO (out, rc = -EFAULT);
789         }
790         if (swab) {                             /* swab the remaining niobufs */
791                 for (i = 1; i < niocount; i++)
792                         lustre_swab_niobuf_remote (&remote_nb[i]);
793         }
794
795         for (i = n = 0; i < objcount; i++) {
796                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
797                         if (remote_nb[n].len == 0) {
798                                 CERROR("zero len BRW: objid "LPX64" buf %u\n",
799                                        ioo[i].ioo_id, j);
800                                 GOTO(out, rc = -EINVAL);
801                         }
802                         if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
803                                 CERROR("unordered BRW: objid "LPX64
804                                        " buf %u offset "LPX64" <= "LPX64"\n",
805                                        ioo[i].ioo_id, j, remote_nb[n].offset,
806                                        remote_nb[n-1].offset);
807                                 GOTO(out, rc = -EINVAL);
808                         }
809                 }
810         }
811
812         size[1] = niocount * sizeof(*remote_nb);
813         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
814         if (rc)
815                 GOTO(out, rc);
816
817         req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
818                                         niocount, remote_nb);
819
820         if (req->rq_status)
821                 GOTO (out, rc = 0);
822
823         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
824         memcpy (res_nb, remote_nb, size[1]);
825         rc = 0;
826 out:
827         if (rc) {
828                 OBD_FREE(req->rq_repmsg, req->rq_replen);
829                 req->rq_repmsg = NULL;
830                 req->rq_status = rc;
831                 ptlrpc_error(req);
832         } else
833                 ptlrpc_reply(req);
834
835         return rc;
836 }
837
838 static int filter_recovery_request(struct ptlrpc_request *req,
839                                    struct obd_device *obd, int *process)
840 {
841         switch (req->rq_reqmsg->opc) {
842         case OST_CONNECT: /* This will never get here, but for completeness. */
843         case OST_DISCONNECT:
844                *process = 1;
845                RETURN(0);
846
847         case OBD_PING:
848         case OST_CLOSE:
849         case OST_CREATE:
850         case OST_DESTROY:
851         case OST_OPEN:
852         case OST_PUNCH:
853         case OST_SETATTR: 
854         case OST_SYNCFS:
855         case OST_WRITE:
856         case LDLM_ENQUEUE:
857                 *process = target_queue_recovery_request(req, obd);
858                 RETURN(0);
859
860         default:
861                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
862                 *process = 0;
863                 /* XXX what should we set rq_status to here? */
864                 req->rq_status = -EAGAIN;
865                 RETURN(ptlrpc_error(req));
866         }
867 }
868
869
870
871 static int ost_handle(struct ptlrpc_request *req)
872 {
873         struct obd_trans_info trans_info = { 0, };
874         struct obd_trans_info *oti = &trans_info;
875         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
876         ENTRY;
877
878         /* XXX identical to MDS */
879         if (req->rq_reqmsg->opc != OST_CONNECT) {
880                 struct obd_device *obd;
881                 int abort_recovery, recovering;
882
883                 if (req->rq_export == NULL) {
884                         CERROR("lustre_ost: operation %d on unconnected OST\n",
885                                req->rq_reqmsg->opc);
886                         req->rq_status = -ENOTCONN;
887                         GOTO(out, rc = -ENOTCONN);
888                 }
889
890                 obd = req->rq_export->exp_obd;
891
892                 /* Check for aborted recovery. */
893                 spin_lock_bh(&obd->obd_processing_task_lock);
894                 abort_recovery = obd->obd_abort_recovery;
895                 recovering = obd->obd_recovering;
896                 spin_unlock_bh(&obd->obd_processing_task_lock);
897                 if (abort_recovery) {
898                         target_abort_recovery(obd);
899                 } else if (recovering) {
900                         rc = filter_recovery_request(req, obd, &should_process);
901                         if (rc || !should_process)
902                                 RETURN(rc);
903                 }
904         } 
905
906         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
907                 GOTO(out, rc = -EINVAL);
908
909         oti_init(oti, req);
910
911         switch (req->rq_reqmsg->opc) {
912         case OST_CONNECT:
913                 CDEBUG(D_INODE, "connect\n");
914                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
915                 rc = target_handle_connect(req, ost_handle);
916                 break;
917         case OST_DISCONNECT:
918                 CDEBUG(D_INODE, "disconnect\n");
919                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
920                 rc = target_handle_disconnect(req);
921                 break;
922         case OST_CREATE:
923                 CDEBUG(D_INODE, "create\n");
924                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
925                 rc = ost_create(req, oti);
926                 break;
927         case OST_DESTROY:
928                 CDEBUG(D_INODE, "destroy\n");
929                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
930                 rc = ost_destroy(req, oti);
931                 break;
932         case OST_GETATTR:
933                 CDEBUG(D_INODE, "getattr\n");
934                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
935                 rc = ost_getattr(req);
936                 break;
937         case OST_SETATTR:
938                 CDEBUG(D_INODE, "setattr\n");
939                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
940                 rc = ost_setattr(req, oti);
941                 break;
942         case OST_OPEN:
943                 CDEBUG(D_INODE, "open\n");
944                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
945                 rc = ost_open(req, oti);
946                 break;
947         case OST_CLOSE:
948                 CDEBUG(D_INODE, "close\n");
949                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
950                 rc = ost_close(req, oti);
951                 break;
952         case OST_WRITE:
953                 CDEBUG(D_INODE, "write\n");
954                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
955                 rc = ost_brw_write(req, oti);
956                 /* ost_brw sends its own replies */
957                 RETURN(rc);
958         case OST_READ:
959                 CDEBUG(D_INODE, "read\n");
960                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
961                 rc = ost_brw_read(req);
962                 /* ost_brw sends its own replies */
963                 RETURN(rc);
964         case OST_SAN_READ:
965                 CDEBUG(D_INODE, "san read\n");
966                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
967                 rc = ost_san_brw(req, OBD_BRW_READ);
968                 /* ost_san_brw sends its own replies */
969                 RETURN(rc);
970         case OST_SAN_WRITE:
971                 CDEBUG(D_INODE, "san write\n");
972                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
973                 rc = ost_san_brw(req, OBD_BRW_WRITE);
974                 /* ost_san_brw sends its own replies */
975                 RETURN(rc);
976         case OST_PUNCH:
977                 CDEBUG(D_INODE, "punch\n");
978                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
979                 rc = ost_punch(req, oti);
980                 break;
981         case OST_STATFS:
982                 CDEBUG(D_INODE, "statfs\n");
983                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
984                 rc = ost_statfs(req);
985                 break;
986         case OST_SYNCFS:
987                 CDEBUG(D_INODE, "sync\n");
988                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
989                 rc = ost_syncfs(req);
990                 break;
991         case OBD_PING:
992                 DEBUG_REQ(D_INODE, req, "ping");
993                 rc = target_handle_ping(req);
994                 break;
995         case LDLM_ENQUEUE:
996                 CDEBUG(D_INODE, "enqueue\n");
997                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
998                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
999                                          ldlm_server_blocking_ast);
1000                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1001                 break;
1002         case LDLM_CONVERT:
1003                 CDEBUG(D_INODE, "convert\n");
1004                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1005                 rc = ldlm_handle_convert(req);
1006                 break;
1007         case LDLM_CANCEL:
1008                 CDEBUG(D_INODE, "cancel\n");
1009                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1010                 rc = ldlm_handle_cancel(req);
1011                 break;
1012         case LDLM_BL_CALLBACK:
1013         case LDLM_CP_CALLBACK:
1014                 CDEBUG(D_INODE, "callback\n");
1015                 CERROR("callbacks should not happen on OST\n");
1016                 /* fall through */
1017         default:
1018                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1019                 req->rq_status = -ENOTSUPP;
1020                 rc = ptlrpc_error(req);
1021                 RETURN(rc);
1022         }
1023
1024         EXIT;
1025         /* If we're DISCONNECTing, the export_data is already freed */
1026         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1027                 struct obd_device *obd  = req->rq_export->exp_obd;
1028                 if (!obd->obd_no_transno) {
1029                         req->rq_repmsg->last_committed =
1030                                 obd->obd_last_committed;
1031                 } else {
1032                         DEBUG_REQ(D_IOCTL, req,
1033                                   "not sending last_committed update");
1034                 }
1035                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1036                        obd->obd_last_committed, req->rq_xid);
1037         }
1038
1039 out:
1040         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1041                 struct obd_device *obd = req->rq_export->exp_obd;
1042
1043                 if (obd && obd->obd_recovering) {
1044                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1045                         return target_queue_final_reply(req, rc);
1046                 }
1047                 /* Lost a race with recovery; let the error path DTRT. */
1048                 rc = req->rq_status = -ENOTCONN;
1049         }
1050
1051         if (!rc)
1052                 oti_to_request(oti, req);
1053
1054         target_send_reply(req, rc, fail);
1055         return 0;
1056 }
1057
1058 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1059 {
1060         struct ost_obd *ost = &obddev->u.ost;
1061         int err;
1062         int i;
1063         ENTRY;
1064
1065         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1066                                            OST_BUFSIZE, OST_MAXREQSIZE,
1067                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1068                                            ost_handle, "ost", obddev);
1069         if (!ost->ost_service) {
1070                 CERROR("failed to start service\n");
1071                 GOTO(error_disc, err = -ENOMEM);
1072         }
1073
1074         for (i = 0; i < OST_NUM_THREADS; i++) {
1075                 char name[32];
1076                 sprintf(name, "ll_ost_%02d", i);
1077                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
1078                 if (err) {
1079                         CERROR("error starting thread #%d: rc %d\n", i, err);
1080                         GOTO(error_disc, err = -EINVAL);
1081                 }
1082         }
1083
1084         RETURN(0);
1085
1086 error_disc:
1087         RETURN(err);
1088 }
1089
1090 static int ost_cleanup(struct obd_device *obddev, int force, int failover)
1091 {
1092         struct ost_obd *ost = &obddev->u.ost;
1093         int err = 0;
1094         ENTRY;
1095
1096         if (obddev->obd_recovering)
1097                 target_cancel_recovery_timer(obddev);
1098
1099         ptlrpc_stop_all_threads(ost->ost_service);
1100         ptlrpc_unregister_service(ost->ost_service);
1101
1102         RETURN(err);
1103 }
1104
1105 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1106 {
1107         struct lprocfs_static_vars lvars;
1108
1109         lprocfs_init_vars(&lvars);
1110         return lprocfs_obd_attach(dev, lvars.obd_vars);
1111 }
1112
/*
 * o_detach method: undo ost_attach() by detaching dev's lprocfs entries.
 * Returns whatever lprocfs_obd_detach() returns.
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1117
1118 /* I don't think this function is ever used, since nothing 
1119  * connects directly to this module.
1120  */
1121 static int ost_connect(struct lustre_handle *conn,
1122                        struct obd_device *obd, struct obd_uuid *cluuid)
1123 {
1124         struct obd_export *exp;
1125         int rc;
1126         ENTRY;
1127
1128         if (!conn || !obd || !cluuid)
1129                 RETURN(-EINVAL);
1130
1131         rc = class_connect(conn, obd, cluuid);
1132         if (rc)
1133                 RETURN(rc);
1134         exp = class_conn2export(conn);
1135         LASSERT(exp);
1136         class_export_put(exp);
1137
1138         RETURN(0);
1139 }
1140
1141 /* use obd ops to offer management infrastructure */
1142 static struct obd_ops ost_obd_ops = {
1143         o_owner:        THIS_MODULE,
1144         o_attach:       ost_attach,
1145         o_detach:       ost_detach,
1146         o_setup:        ost_setup,
1147         o_cleanup:      ost_cleanup,
1148         o_connect:      ost_connect,
1149 };
1150
1151 static int __init ost_init(void)
1152 {
1153         struct lprocfs_static_vars lvars;
1154         ENTRY;
1155
1156         lprocfs_init_vars(&lvars);
1157         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1158                                    LUSTRE_OST_NAME));
1159 }
1160
1161 static void __exit ost_exit(void)
1162 {
1163         class_unregister_type(LUSTRE_OST_NAME);
1164 }
1165
1166 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1167 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1168 MODULE_LICENSE("GPL");
1169
1170 module_init(ost_init);
1171 module_exit(ost_exit);