Whamcloud - gitweb
merge b_devel into HEAD (20030626 merge tag) for 0.7.1
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44 inline void oti_to_request(struct obd_trans_info *oti,
45                            struct ptlrpc_request *req)
46 {
47         int i;
48         struct oti_req_ack_lock *ack_lock;
49
50         if(oti == NULL)
51                 return;
52
53         if (req->rq_repmsg)
54                 req->rq_repmsg->transno = oti->oti_transno;
55
56         /* XXX 4 == entries in oti_ack_locks??? */
57         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
58                 if (!ack_lock->mode)
59                         break;
60                 memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
61                        sizeof(req->rq_ack_locks[i].lock));
62                 req->rq_ack_locks[i].mode = ack_lock->mode;
63         }
64         EXIT;
65 }
66
67 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
68 {
69         struct lustre_handle *conn = &req->rq_reqmsg->handle;
70         struct ost_body *body;
71         int rc, size = sizeof(*body);
72         ENTRY;
73
74         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
75                                    lustre_swab_ost_body);
76         if (body == NULL)
77                 RETURN (-EFAULT);
78
79         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
80         if (rc)
81                 RETURN(rc);
82
83         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
84         RETURN(0);
85 }
86
87 static int ost_getattr(struct ptlrpc_request *req)
88 {
89         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
90         struct ost_body *body, *repbody;
91         int rc, size = sizeof(*body);
92         ENTRY;
93
94         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
95                                    lustre_swab_ost_body);
96         if (body == NULL)
97                 RETURN (-EFAULT);
98
99         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
100         if (rc)
101                 RETURN(rc);
102
103         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
104         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
105         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
106         RETURN(0);
107 }
108
109 static int ost_statfs(struct ptlrpc_request *req)
110 {
111         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
112         struct obd_statfs *osfs;
113         int rc, size = sizeof(*osfs);
114         ENTRY;
115
116         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
117         if (rc)
118                 RETURN(rc);
119
120         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
121         memset(osfs, 0, size);
122
123         req->rq_status = obd_statfs(conn, osfs);
124         if (req->rq_status != 0)
125                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
126
127         RETURN(0);
128 }
129
130 static int ost_syncfs(struct ptlrpc_request *req)
131 {
132         struct obd_statfs *osfs;
133         int rc, size = sizeof(*osfs);
134         ENTRY;
135
136         rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
137         if (rc)
138                 RETURN(rc);
139
140         rc = obd_syncfs(req->rq_export);
141         if (rc) {
142                 CERROR("ost: syncfs failed: rc %d\n", rc);
143                 req->rq_status = rc;
144                 RETURN(rc);
145         }
146
147         RETURN(0);
148 }
149
150 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
151 {
152         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
153         struct ost_body *body, *repbody;
154         int rc, size = sizeof(*repbody);
155         ENTRY;
156
157         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
158                                    lustre_swab_ost_body);
159         if (body == NULL)
160                 return (-EFAULT);
161
162         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
163         if (rc)
164                 RETURN(rc);
165
166         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
167         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
168         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
169         RETURN(0);
170 }
171
172 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
173 {
174         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
175         struct ost_body *body, *repbody;
176         int rc, size = sizeof(*repbody);
177         ENTRY;
178
179         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
180                                    lustre_swab_ost_body);
181         if (body == NULL)
182                 RETURN (-EFAULT);
183
184         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
185         if (rc)
186                 RETURN(rc);
187
188         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
189         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
191         RETURN(0);
192 }
193
194 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
195 {
196         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197         struct ost_body *body, *repbody;
198         int rc, size = sizeof(*repbody);
199         ENTRY;
200
201         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
202                                    lustre_swab_ost_body);
203         if (body == NULL)
204                 RETURN (-EFAULT);
205
206         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
207         if (rc)
208                 RETURN(rc);
209
210         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
211         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
212         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
213         RETURN(0);
214 }
215
216 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
217 {
218         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
219         struct ost_body *body, *repbody;
220         int rc, size = sizeof(*repbody);
221         ENTRY;
222
223         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
224                                    lustre_swab_ost_body);
225         if (body == NULL)
226                 RETURN (-EFAULT);
227
228         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
229             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
230                 RETURN(-EINVAL);
231
232         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
233         if (rc)
234                 RETURN(rc);
235
236         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
237         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
238         req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
239                                    repbody->oa.o_blocks, oti);
240         RETURN(0);
241 }
242
243 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
244 {
245         struct lustre_handle *conn = &req->rq_reqmsg->handle;
246         struct ost_body *body, *repbody;
247         int rc, size = sizeof(*repbody);
248         ENTRY;
249
250         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
251                                    lustre_swab_ost_body);
252         if (body == NULL)
253                 RETURN (-EFAULT);
254
255         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
256         if (rc)
257                 RETURN(rc);
258
259         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
260         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
261
262         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
263         RETURN(0);
264 }
265
266 static int ost_bulk_timeout(void *data)
267 {
268         ENTRY;
269         /* We don't fail the connection here, because having the export
270          * killed makes the (vital) call to commitrw very sad.
271          */
272         RETURN(1);
273 }
274
275 static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
276                                  struct niobuf_remote *rnb, int nrnb,
277                                  struct niobuf_remote **pp_rnbp)
278 {
279         /* Copy a remote niobuf, splitting it into page-sized chunks
280          * and setting ioo[i].ioo_bufcnt accordingly */
281         struct niobuf_remote *pp_rnb;
282         int   i;
283         int   j;
284         int   page;
285         int   rnbidx = 0;
286         int   npages = 0;
287
288         /* first count and check the number of pages required */
289         for (i = 0; i < nioo; i++)
290                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
291                         obd_off offset = rnb[rnbidx].offset;
292                         obd_off p0 = offset >> PAGE_SHIFT;
293                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
294
295                         LASSERT (rnbidx < nrnb);
296
297                         npages += (pn + 1 - p0);
298
299                         if (rnb[rnbidx].len == 0) {
300                                 CERROR("zero len BRW: obj %d objid "LPX64
301                                        " buf %u\n", i, ioo[i].ioo_id, j);
302                                 return (-EINVAL);
303                         }
304                         if (j > 0 &&
305                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
306                                 CERROR("unordered BRW: obj %d objid "LPX64
307                                        " buf %u offset "LPX64" <= "LPX64"\n",
308                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
309                                        rnb[rnbidx].offset);
310                                 return (-EINVAL);
311                         }
312                 }
313
314         LASSERT (rnbidx == nrnb);
315
316         if (npages == nrnb) {       /* all niobufs are for single pages */
317                 *pp_rnbp = rnb;
318                 return (npages);
319         }
320
321         OBD_ALLOC (pp_rnb, sizeof (*pp_rnb) * npages);
322         if (pp_rnb == NULL)
323                 return (-ENOMEM);
324
325         /* now do the actual split */
326         page = rnbidx = 0;
327         for (i = 0; i < nioo; i++) {
328                 int  obj_pages = 0;
329
330                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
331                         obd_off off = rnb[rnbidx].offset;
332                         int     nob = rnb[rnbidx].len;
333
334                         LASSERT (rnbidx < nrnb);
335                         do {
336                                 obd_off  poff = off & (PAGE_SIZE - 1);
337                                 int      pnob = (poff + nob > PAGE_SIZE) ?
338                                                 PAGE_SIZE - poff : nob;
339
340                                 LASSERT (page < npages);
341                                 pp_rnb[page].len = pnob;
342                                 pp_rnb[page].offset = off;
343                                 pp_rnb[page].flags = rnb->flags;
344
345                                 CDEBUG (D_PAGE, "   obj %d id "LPX64
346                                         "page %d(%d) "LPX64" for %d\n",
347                                         i, ioo[i].ioo_id, obj_pages, page,
348                                         pp_rnb[page].offset, pp_rnb[page].len);
349                                 page++;
350                                 obj_pages++;
351
352                                 off += pnob;
353                                 nob -= pnob;
354                         } while (nob > 0);
355                         LASSERT (nob == 0);
356                 }
357                 ioo[i].ioo_bufcnt = obj_pages;
358         }
359         LASSERT (page == npages);
360
361         *pp_rnbp = pp_rnb;
362         return (npages);
363 }
364
365 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
366                                    struct niobuf_remote *rnb)
367 {
368         if (pp_rnb == rnb)                      /* didn't allocate above */
369                 return;
370
371         OBD_FREE (pp_rnb, sizeof (*pp_rnb) * npages);
372 }
373
374 #if CHECKSUM_BULK
375 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
376 {
377         __u64             cksum = 0;
378         struct list_head *tmp;
379         char             *ptr;
380
381         list_for_each (tmp, &desc->bd_page_list) {
382                 struct ptlrpc_bulk_page *bp;
383
384                 bp = list_entry (tmp, struct ptlrpc_bulk_page, bp_link);
385                 ptr = kmap (bp->bp_page);
386                 ost_checksum (&cksum, ptr + bp->bp_pageoffset, bp->bp_buflen);
387                 kunmap (bp->bp_page);
388         }
389 }
390 #endif
391
392 static int ost_brw_read(struct ptlrpc_request *req)
393 {
394         struct ptlrpc_bulk_desc *desc;
395         struct niobuf_remote    *remote_nb;
396         struct niobuf_remote    *pp_rnb;
397         struct niobuf_local     *local_nb;
398         struct obd_ioobj        *ioo;
399         struct ost_body         *body;
400         struct l_wait_info       lwi;
401         void                    *desc_priv = NULL;
402         int                      size[1] = { sizeof(*body) };
403         int                      comms_error = 0;
404         int                      niocount;
405         int                      npages;
406         int                      nob = 0;
407         int                      rc;
408         int                      i;
409         ENTRY;
410
411         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
412                 GOTO(out, rc = -EIO);
413
414         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
415         if (body == NULL) {
416                 CERROR ("Missing/short ost_body\n");
417                 GOTO (out, rc = -EFAULT);
418         }
419
420         ioo = lustre_swab_reqbuf (req, 1, sizeof (*ioo),
421                                   lustre_swab_obd_ioobj);
422         if (ioo == NULL) {
423                 CERROR ("Missing/short ioobj\n");
424                 GOTO (out, rc = -EFAULT);
425         }
426
427         niocount = ioo->ioo_bufcnt;
428         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
429                                        lustre_swab_niobuf_remote);
430         if (remote_nb == NULL) {
431                 CERROR ("Missing/short niobuf\n");
432                 GOTO (out, rc = -EFAULT);
433         }
434         if (lustre_msg_swabbed (req->rq_reqmsg)) { /* swab remaining niobufs */
435                 for (i = 1; i < niocount; i++)
436                         lustre_swab_niobuf_remote (&remote_nb[i]);
437         }
438
439         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
440         if (rc)
441                 GOTO(out, rc);
442
443         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
444         npages = get_per_page_niobufs (ioo, 1, remote_nb, niocount, &pp_rnb);
445         if (npages < 0)
446                 GOTO(out, rc = npages);
447
448         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
449         if (local_nb == NULL)
450                 GOTO(out_pp_rnb, rc = -ENOMEM);
451
452         desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
453         if (desc == NULL)
454                 GOTO(out_local, rc = -ENOMEM);
455
456         rc = obd_preprw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
457                         pp_rnb, local_nb, &desc_priv, NULL);
458         if (rc != 0)
459                 GOTO(out_bulk, rc);
460
461         nob = 0;
462         for (i = 0; i < npages; i++) {
463                 int page_rc = local_nb[i].rc;
464
465                 if (page_rc < 0) {              /* error */
466                         rc = page_rc;
467                         break;
468                 }
469
470                 LASSERT (page_rc <= pp_rnb[i].len);
471                 nob += page_rc;
472                 if (page_rc != 0) {             /* some data! */
473                         LASSERT (local_nb[i].page != NULL);
474                         rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
475                                                    pp_rnb[i].offset& ~PAGE_MASK,
476                                                    page_rc);
477                         if (rc != 0)
478                                 break;
479                 }
480
481                 if (page_rc != pp_rnb[i].len) { /* short read */
482                         /* All subsequent pages should be 0 */
483                         while (++i < npages)
484                                 LASSERT (local_nb[i].rc == 0);
485                         break;
486                 }
487         }
488
489         if (rc == 0) {
490                 rc = ptlrpc_bulk_put(desc);
491                 if (rc == 0) {
492                         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
493                                           desc);
494                         rc = l_wait_event(desc->bd_waitq,
495                                           ptlrpc_bulk_complete(desc), &lwi);
496                         if (rc) {
497                                 LASSERT(rc == -ETIMEDOUT);
498                                 CERROR ("timeout waiting for bulk PUT\n");
499                                 ptlrpc_abort_bulk (desc);
500                         }
501                 } else {
502                         CERROR("ptlrpc_bulk_put failed RC: %d\n", rc);
503                 }
504                 comms_error = rc != 0;
505         }
506
507         /* Must commit after prep above in all cases */
508         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
509                           local_nb, desc_priv, NULL);
510
511 #if CHECKSUM_BULK
512         if (rc == 0) {
513                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
514                 body->oa.o_rdev = ost_checksum_bulk (desc);
515                 body->oa.o_valid |= OBD_MD_FLCKSUM;
516         }
517 #endif
518
519  out_bulk:
520         ptlrpc_free_bulk (desc);
521  out_local:
522         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
523  out_pp_rnb:
524         free_per_page_niobufs (npages, pp_rnb, remote_nb);
525  out:
526         LASSERT (rc <= 0);
527         if (rc == 0) {
528                 req->rq_status = nob;
529                 ptlrpc_reply(req);
530         } else if (!comms_error) {
531                 /* only reply if comms OK */
532                 req->rq_status = rc;
533                 ptlrpc_error(req);
534         } else {
535                 if (req->rq_repmsg != NULL) {
536                         /* reply out callback would free */
537                         OBD_FREE (req->rq_repmsg, req->rq_replen);
538                 }
539                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
540                        req->rq_export->exp_client_uuid.uuid,
541                        req->rq_connection->c_remote_uuid.uuid,
542                        req->rq_connection->c_peer.peer_nid);
543                 ptlrpc_fail_export(req->rq_export);
544         }
545
546         RETURN(rc);
547 }
548
549 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
550 {
551         struct ptlrpc_bulk_desc *desc;
552         struct niobuf_remote    *remote_nb;
553         struct niobuf_remote    *pp_rnb;
554         struct niobuf_local     *local_nb;
555         struct obd_ioobj        *ioo;
556         struct ost_body         *body;
557         struct l_wait_info       lwi;
558         void                    *desc_priv = NULL;
559         __u32                   *rcs;
560         int                      size[2] = { sizeof (*body) };
561         int                      objcount, niocount, npages;
562         int                      comms_error = 0;
563         int                      rc, rc2, swab, i, j;
564         ENTRY;
565
566         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
567                 GOTO(out, rc = -EIO);
568
569         /* pause before transaction has been started */
570         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE, 
571                          obd_timeout +1);
572
573         swab = lustre_msg_swabbed (req->rq_reqmsg);
574         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
575                                    lustre_swab_ost_body);
576         if (body == NULL) {
577                 CERROR ("Missing/short ost_body\n");
578                 GOTO(out, rc = -EFAULT);
579         }
580
581         LASSERT_REQSWAB (req, 1);
582         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
583         if (objcount == 0) {
584                 CERROR ("Missing/short ioobj\n");
585                 GOTO (out, rc = -EFAULT);
586         }
587         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof (*ioo));
588         LASSERT (ioo != NULL);
589         for (niocount = i = 0; i < objcount; i++) {
590                 if (swab)
591                         lustre_swab_obd_ioobj (&ioo[i]);
592                 if (ioo[i].ioo_bufcnt == 0) {
593                         CERROR ("ioo[%d] has zero bufcnt\n", i);
594                         GOTO (out, rc = -EFAULT);
595                 }
596                 niocount += ioo[i].ioo_bufcnt;
597         }
598
599         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
600                                        lustre_swab_niobuf_remote);
601         if (remote_nb == NULL) {
602                 CERROR ("Missing/short niobuf\n");
603                 GOTO(out, rc = -EFAULT);
604         }
605         if (swab) {                             /* swab the remaining niobufs */
606                 for (i = 1; i < niocount; i++)
607                         lustre_swab_niobuf_remote (&remote_nb[i]);
608         }
609
610         size[1] = niocount * sizeof (*rcs);
611         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
612                              &req->rq_repmsg);
613         if (rc != 0)
614                 GOTO (out, rc);
615         rcs = lustre_msg_buf (req->rq_repmsg, 1, niocount * sizeof (*rcs));
616
617         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
618         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
619         if (npages < 0)
620                 GOTO (out, rc = npages);
621
622         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
623         if (local_nb == NULL)
624                 GOTO(out_pp_rnb, rc = -ENOMEM);
625
626         desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, OST_BULK_PORTAL);
627         if (desc == NULL)
628                 GOTO(out_local, rc = -ENOMEM);
629
630         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
631                         npages, pp_rnb, local_nb, &desc_priv, oti);
632         if (rc != 0)
633                 GOTO (out_bulk, rc);
634
635         /* NB Having prepped, we must commit... */
636
637         for (i = 0; i < npages; i++) {
638                 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
639                                            pp_rnb[i].offset & (PAGE_SIZE - 1),
640                                            pp_rnb[i].len);
641                 if (rc != 0)
642                         break;
643         }
644
645         if (rc == 0) {
646                 rc = ptlrpc_bulk_get(desc);
647                 if (rc == 0) {
648                         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout,
649                                           desc);
650                         rc = l_wait_event(desc->bd_waitq,
651                                           ptlrpc_bulk_complete(desc), &lwi);
652                         if (rc) {
653                                 LASSERT(rc == -ETIMEDOUT);
654                                 CERROR ("timeout waiting for bulk GET\n");
655                                 ptlrpc_abort_bulk (desc);
656                         }
657                 } else {
658                         CERROR("ptlrpc_bulk_get failed RC: %d\n", rc);
659                 }
660                 comms_error = rc != 0;
661         }
662
663 #if CHECKSUM_BULK
664         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
665                 static int cksum_counter;
666                 __u64 client_cksum = body->oa.o_rdev;
667                 __u64 cksum = ost_checksum_bulk (desc);
668
669                 if (client_cksum != cksum) {
670                         CERROR("Bad checksum: client "LPX64", server "LPX64
671                                ", client NID "LPX64"\n", client_cksum, cksum,
672                                req->rq_connection->c_peer.peer_nid);
673                         cksum_counter = 1;
674                 } else {
675                         cksum_counter++;
676                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
677                                 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
678                                         cksum_counter,
679                                         req->rq_connection->c_peer.peer_nid,
680                                         cksum);
681                 }
682         }
683 #endif
684         /* Must commit after prep above in all cases */
685         rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
686                            npages, local_nb, desc_priv, oti);
687
688         if (rc == 0) {
689                 /* set per-requested niobuf return codes */
690                 for (i = j = 0; i < niocount; i++) {
691                         int nob = remote_nb[i].len;
692
693                         rcs[i] = 0;
694                         do {
695                                 LASSERT (j < npages);
696                                 if (local_nb[j].rc < 0)
697                                         rcs[i] = local_nb[j].rc;
698                                 nob -= pp_rnb[j].len;
699                                 j++;
700                         } while (nob > 0);
701                         LASSERT (nob == 0);
702                 }
703                 LASSERT (j == npages);
704         }
705         if (rc == 0)
706                 rc = rc2;
707
708  out_bulk:
709         ptlrpc_free_bulk (desc);
710  out_local:
711         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
712  out_pp_rnb:
713         free_per_page_niobufs (npages, pp_rnb, remote_nb);
714  out:
715         if (rc == 0) {
716                 oti_to_request(oti, req);
717                 rc = ptlrpc_reply(req);
718         } else if (!comms_error) {
719                 /* Only reply if there was no comms problem with bulk */
720                 req->rq_status = rc;
721                 ptlrpc_error(req);
722         } else {
723                 if (req->rq_repmsg != NULL) {
724                         /* reply out callback would free */
725                         OBD_FREE (req->rq_repmsg, req->rq_replen);
726                 }
727                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
728                        req->rq_export->exp_client_uuid.uuid,
729                        req->rq_connection->c_remote_uuid.uuid,
730                        req->rq_connection->c_peer.peer_nid);
731                 ptlrpc_fail_export(req->rq_export);
732         }
733         RETURN(rc);
734 }
735
736 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
737 {
738         struct lustre_handle *conn = &req->rq_reqmsg->handle;
739         struct niobuf_remote *remote_nb, *res_nb;
740         struct obd_ioobj *ioo;
741         struct ost_body *body;
742         int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
743         int n;
744         int swab;
745         ENTRY;
746
747         /* XXX not set to use latest protocol */
748
749         swab = lustre_msg_swabbed (req->rq_reqmsg);
750         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
751                                    lustre_swab_ost_body);
752         if (body == NULL) {
753                 CERROR ("Missing/short ost_body\n");
754                 GOTO (out, rc = -EFAULT);
755         }
756
757         ioo = lustre_swab_reqbuf(req, 1, sizeof (*ioo),
758                                  lustre_swab_obd_ioobj);
759         if (ioo == NULL) {
760                 CERROR ("Missing/short ioobj\n");
761                 GOTO (out, rc = -EFAULT);
762         }
763         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
764         niocount = ioo[0].ioo_bufcnt;
765         for (i = 1; i < objcount; i++) {
766                 if (swab)
767                         lustre_swab_obd_ioobj (&ioo[i]);
768                 niocount += ioo[i].ioo_bufcnt;
769         }
770
771         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
772                                        lustre_swab_niobuf_remote);
773         if (remote_nb == NULL) {
774                 CERROR ("Missing/short niobuf\n");
775                 GOTO (out, rc = -EFAULT);
776         }
777         if (swab) {                             /* swab the remaining niobufs */
778                 for (i = 1; i < niocount; i++)
779                         lustre_swab_niobuf_remote (&remote_nb[i]);
780         }
781
782         for (i = n = 0; i < objcount; i++) {
783                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
784                         if (remote_nb[n].len == 0) {
785                                 CERROR("zero len BRW: objid "LPX64" buf %u\n",
786                                        ioo[i].ioo_id, j);
787                                 GOTO(out, rc = -EINVAL);
788                         }
789                         if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
790                                 CERROR("unordered BRW: objid "LPX64
791                                        " buf %u offset "LPX64" <= "LPX64"\n",
792                                        ioo[i].ioo_id, j, remote_nb[n].offset,
793                                        remote_nb[n-1].offset);
794                                 GOTO(out, rc = -EINVAL);
795                         }
796                 }
797         }
798
799         size[1] = niocount * sizeof(*remote_nb);
800         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
801         if (rc)
802                 GOTO(out, rc);
803
804         req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
805                                         niocount, remote_nb);
806
807         if (req->rq_status)
808                 GOTO (out, rc = 0);
809
810         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
811         memcpy (res_nb, remote_nb, size[1]);
812         rc = 0;
813 out:
814         if (rc) {
815                 OBD_FREE(req->rq_repmsg, req->rq_replen);
816                 req->rq_repmsg = NULL;
817                 req->rq_status = rc;
818                 ptlrpc_error(req);
819         } else
820                 ptlrpc_reply(req);
821
822         return rc;
823 }
824
825 static int filter_recovery_request(struct ptlrpc_request *req,
826                                    struct obd_device *obd, int *process)
827 {
828         switch (req->rq_reqmsg->opc) {
829         case OST_CONNECT: /* This will never get here, but for completeness. */
830         case OST_DISCONNECT:
831                *process = 1;
832                RETURN(0);
833
834         case OBD_PING:
835         case OST_CLOSE:
836         case OST_CREATE:
837         case OST_DESTROY:
838         case OST_OPEN:
839         case OST_PUNCH:
840         case OST_SETATTR: 
841         case OST_SYNCFS:
842         case OST_WRITE:
843         case LDLM_ENQUEUE:
844                 *process = target_queue_recovery_request(req, obd);
845                 RETURN(0);
846
847         default:
848                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
849                 *process = 0;
850                 /* XXX what should we set rq_status to here? */
851                 req->rq_status = -EAGAIN;
852                 RETURN(ptlrpc_error(req));
853         }
854 }
855
856
857
858 static int ost_handle(struct ptlrpc_request *req)
859 {
860         struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
861         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
862         ENTRY;
863
864         /* XXX identical to MDS */
865         if (req->rq_reqmsg->opc != OST_CONNECT) {
866                 struct obd_device *obd;
867                 int abort_recovery, recovering;
868
869                 if (req->rq_export == NULL) {
870                         CERROR("lustre_ost: operation %d on unconnected OST\n",
871                                req->rq_reqmsg->opc);
872                         req->rq_status = -ENOTCONN;
873                         GOTO(out, rc = -ENOTCONN);
874                 }
875
876                 obd = req->rq_export->exp_obd;
877
878                 /* Check for aborted recovery. */
879                 spin_lock_bh(&obd->obd_processing_task_lock);
880                 abort_recovery = obd->obd_abort_recovery;
881                 recovering = obd->obd_recovering;
882                 spin_unlock_bh(&obd->obd_processing_task_lock);
883                 if (abort_recovery) {
884                         target_abort_recovery(obd);
885                 } else if (recovering) {
886                         rc = filter_recovery_request(req, obd, &should_process);
887                         if (rc || !should_process)
888                                 RETURN(rc);
889                 }
890         } 
891
892         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
893                 GOTO(out, rc = -EINVAL);
894
895         switch (req->rq_reqmsg->opc) {
896         case OST_CONNECT:
897                 CDEBUG(D_INODE, "connect\n");
898                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
899                 rc = target_handle_connect(req, ost_handle);
900                 break;
901         case OST_DISCONNECT:
902                 CDEBUG(D_INODE, "disconnect\n");
903                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
904                 rc = target_handle_disconnect(req);
905                 break;
906         case OST_CREATE:
907                 CDEBUG(D_INODE, "create\n");
908                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
909                 rc = ost_create(req, oti);
910                 break;
911         case OST_DESTROY:
912                 CDEBUG(D_INODE, "destroy\n");
913                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
914                 rc = ost_destroy(req, oti);
915                 break;
916         case OST_GETATTR:
917                 CDEBUG(D_INODE, "getattr\n");
918                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
919                 rc = ost_getattr(req);
920                 break;
921         case OST_SETATTR:
922                 CDEBUG(D_INODE, "setattr\n");
923                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
924                 rc = ost_setattr(req, oti);
925                 break;
926         case OST_OPEN:
927                 CDEBUG(D_INODE, "open\n");
928                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
929                 rc = ost_open(req, oti);
930                 break;
931         case OST_CLOSE:
932                 CDEBUG(D_INODE, "close\n");
933                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
934                 rc = ost_close(req, oti);
935                 break;
936         case OST_WRITE:
937                 CDEBUG(D_INODE, "write\n");
938                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
939                 rc = ost_brw_write(req, oti);
940                 /* ost_brw sends its own replies */
941                 RETURN(rc);
942         case OST_READ:
943                 CDEBUG(D_INODE, "read\n");
944                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
945                 rc = ost_brw_read(req);
946                 /* ost_brw sends its own replies */
947                 RETURN(rc);
948         case OST_SAN_READ:
949                 CDEBUG(D_INODE, "san read\n");
950                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
951                 rc = ost_san_brw(req, OBD_BRW_READ);
952                 /* ost_san_brw sends its own replies */
953                 RETURN(rc);
954         case OST_SAN_WRITE:
955                 CDEBUG(D_INODE, "san write\n");
956                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
957                 rc = ost_san_brw(req, OBD_BRW_WRITE);
958                 /* ost_san_brw sends its own replies */
959                 RETURN(rc);
960         case OST_PUNCH:
961                 CDEBUG(D_INODE, "punch\n");
962                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
963                 rc = ost_punch(req, oti);
964                 break;
965         case OST_STATFS:
966                 CDEBUG(D_INODE, "statfs\n");
967                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
968                 rc = ost_statfs(req);
969                 break;
970         case OST_SYNCFS:
971                 CDEBUG(D_INODE, "sync\n");
972                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
973                 rc = ost_syncfs(req);
974                 break;
975         case OBD_PING:
976                 DEBUG_REQ(D_INODE, req, "ping");
977                 rc = target_handle_ping(req);
978                 break;
979         case LDLM_ENQUEUE:
980                 CDEBUG(D_INODE, "enqueue\n");
981                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
982                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
983                                          ldlm_server_blocking_ast);
984                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
985                 break;
986         case LDLM_CONVERT:
987                 CDEBUG(D_INODE, "convert\n");
988                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
989                 rc = ldlm_handle_convert(req);
990                 break;
991         case LDLM_CANCEL:
992                 CDEBUG(D_INODE, "cancel\n");
993                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
994                 rc = ldlm_handle_cancel(req);
995                 break;
996         case LDLM_BL_CALLBACK:
997         case LDLM_CP_CALLBACK:
998                 CDEBUG(D_INODE, "callback\n");
999                 CERROR("callbacks should not happen on OST\n");
1000                 /* fall through */
1001         default:
1002                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1003                 req->rq_status = -ENOTSUPP;
1004                 rc = ptlrpc_error(req);
1005                 RETURN(rc);
1006         }
1007
1008         EXIT;
1009         /* If we're DISCONNECTing, the export_data is already freed */
1010         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1011                 struct obd_device *obd  = req->rq_export->exp_obd;
1012                 if (!obd->obd_no_transno) {
1013                         req->rq_repmsg->last_committed =
1014                                 obd->obd_last_committed;
1015                 } else {
1016                         DEBUG_REQ(D_IOCTL, req,
1017                                   "not sending last_committed update");
1018                 }
1019                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1020                        obd->obd_last_committed, req->rq_xid);
1021         }
1022
1023 out:
1024         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1025                 struct obd_device *obd = req->rq_export->exp_obd;
1026
1027                 if (obd && obd->obd_recovering) {
1028                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1029                         return target_queue_final_reply(req, rc);
1030                 }
1031                 /* Lost a race with recovery; let the error path DTRT. */
1032                 rc = req->rq_status = -ENOTCONN;
1033         }
1034
1035         if (!rc)
1036                 oti_to_request(oti, req);
1037
1038         target_send_reply(req, rc, fail);
1039         return 0;
1040 }
1041
1042 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1043 {
1044         struct ost_obd *ost = &obddev->u.ost;
1045         int err;
1046         int i;
1047         ENTRY;
1048
1049         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1050                                            OST_BUFSIZE, OST_MAXREQSIZE,
1051                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1052                                            ost_handle, "ost", obddev);
1053         if (!ost->ost_service) {
1054                 CERROR("failed to start service\n");
1055                 GOTO(error_disc, err = -ENOMEM);
1056         }
1057
1058         for (i = 0; i < OST_NUM_THREADS; i++) {
1059                 char name[32];
1060                 sprintf(name, "ll_ost_%02d", i);
1061                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
1062                 if (err) {
1063                         CERROR("error starting thread #%d: rc %d\n", i, err);
1064                         GOTO(error_disc, err = -EINVAL);
1065                 }
1066         }
1067
1068         RETURN(0);
1069
1070 error_disc:
1071         RETURN(err);
1072 }
1073
1074 static int ost_cleanup(struct obd_device *obddev, int force, int failover)
1075 {
1076         struct ost_obd *ost = &obddev->u.ost;
1077         int err = 0;
1078         ENTRY;
1079
1080         if (obddev->obd_recovering)
1081                 target_cancel_recovery_timer(obddev);
1082
1083         ptlrpc_stop_all_threads(ost->ost_service);
1084         ptlrpc_unregister_service(ost->ost_service);
1085
1086         RETURN(err);
1087 }
1088
1089 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1090 {
1091         struct lprocfs_static_vars lvars;
1092
1093         lprocfs_init_vars(&lvars);
1094         return lprocfs_obd_attach(dev, lvars.obd_vars);
1095 }
1096
/*
 * Detach hook: passes dev to lprocfs_obd_detach().
 *
 * Returns whatever lprocfs_obd_detach() returns.
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1101
1102 /* I don't think this function is ever used, since nothing 
1103  * connects directly to this module.
1104  */
1105 static int ost_connect(struct lustre_handle *conn,
1106                        struct obd_device *obd, struct obd_uuid *cluuid)
1107 {
1108         struct obd_export *exp;
1109         int rc;
1110         ENTRY;
1111
1112         if (!conn || !obd || !cluuid)
1113                 RETURN(-EINVAL);
1114
1115         rc = class_connect(conn, obd, cluuid);
1116         if (rc)
1117                 RETURN(rc);
1118         exp = class_conn2export(conn);
1119         LASSERT(exp);
1120         class_export_put(exp);
1121
1122         RETURN(0);
1123 }
1124
1125 /* use obd ops to offer management infrastructure */
1126 static struct obd_ops ost_obd_ops = {
1127         o_owner:        THIS_MODULE,
1128         o_attach:       ost_attach,
1129         o_detach:       ost_detach,
1130         o_setup:        ost_setup,
1131         o_cleanup:      ost_cleanup,
1132         o_connect:      ost_connect,
1133 };
1134
1135 static int __init ost_init(void)
1136 {
1137         struct lprocfs_static_vars lvars;
1138         ENTRY;
1139
1140         lprocfs_init_vars(&lvars);
1141         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1142                                    LUSTRE_OST_NAME));
1143 }
1144
1145 static void __exit ost_exit(void)
1146 {
1147         class_unregister_type(LUSTRE_OST_NAME);
1148 }
1149
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
MODULE_LICENSE("GPL");

/* Entry points run when the module is loaded and unloaded. */
module_init(ost_init);
module_exit(ost_exit);