Whamcloud - gitweb
b=4834
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/lustre_debug.h>
44 #include <linux/init.h>
45 #include <linux/lprocfs_status.h>
46 #include <linux/lustre_commit_confd.h>
47 #include <libcfs/list.h>
48 #include "ost_internal.h"
49
50 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
51 {
52         if (oti == NULL)
53                 return;
54         memset(oti, 0, sizeof *oti);
55
56         if (req->rq_repmsg && req->rq_reqmsg != 0)
57                 oti->oti_transno = req->rq_repmsg->transno;
58 }
59
60 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
61 {
62         struct oti_req_ack_lock *ack_lock;
63         int i;
64
65         if (oti == NULL)
66                 return;
67
68         if (req->rq_repmsg)
69                 req->rq_repmsg->transno = oti->oti_transno;
70
71         /* XXX 4 == entries in oti_ack_locks??? */
72         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
73                 if (!ack_lock->mode)
74                         break;
75                 /* XXX not even calling target_send_reply in some cases... */
76                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
77         }
78 }
79
80 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
81                        struct obd_trans_info *oti)
82 {
83         struct ost_body *body, *repbody;
84         int rc, size = sizeof(*body);
85         ENTRY;
86
87         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
88         if (body == NULL)
89                 RETURN(-EFAULT);
90
91         rc = lustre_pack_reply(req, 1, &size, NULL);
92         if (rc)
93                 RETURN(rc);
94
95         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96                 oti->oti_logcookies = obdo_logcookie(&body->oa);
97         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
98         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
99         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
100         RETURN(0);
101 }
102
103 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
104 {
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
110         if (body == NULL)
111                 RETURN(-EFAULT);
112
113         rc = lustre_pack_reply(req, 1, &size, NULL);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
120         RETURN(0);
121 }
122
123 static int ost_statfs(struct ptlrpc_request *req)
124 {
125         struct obd_statfs *osfs;
126         int rc, size = sizeof(*osfs);
127         ENTRY;
128
129         rc = lustre_pack_reply(req, 1, &size, NULL);
130         if (rc)
131                 RETURN(rc);
132
133         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
134
135         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
136         if (req->rq_status != 0)
137                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
138
139         RETURN(0);
140 }
141
142 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
143                       struct obd_trans_info *oti)
144 {
145         struct ost_body *body, *repbody;
146         int rc, size = sizeof(*repbody);
147         ENTRY;
148
149         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
150         if (body == NULL)
151                 RETURN(-EFAULT);
152
153         rc = lustre_pack_reply(req, 1, &size, NULL);
154         if (rc)
155                 RETURN(rc);
156
157         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
158         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
160         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
161         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
162         RETURN(0);
163 }
164
165 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
166                      struct obd_trans_info *oti)
167 {
168         struct ost_body *body, *repbody;
169         int rc, size = sizeof(*repbody);
170         ENTRY;
171
172         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
173         if (body == NULL)
174                 RETURN(-EFAULT);
175
176         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
177             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
178                 RETURN(-EINVAL);
179
180         rc = lustre_pack_reply(req, 1, &size, NULL);
181         if (rc)
182                 RETURN(rc);
183
184         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
185         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
186         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
187                                    repbody->oa.o_blocks, oti);
188         RETURN(0);
189 }
190
191 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
192 {
193         struct ost_body *body, *repbody;
194         int rc, size = sizeof(*repbody);
195         ENTRY;
196
197         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
198         if (body == NULL)
199                 RETURN(-EFAULT);
200
201         rc = lustre_pack_reply(req, 1, &size, NULL);
202         if (rc)
203                 RETURN(rc);
204
205         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
206         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
207         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
208                                   repbody->oa.o_blocks);
209         RETURN(0);
210 }
211
212 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
213                        struct obd_trans_info *oti)
214 {
215         struct ost_body *body, *repbody;
216         int rc, size = sizeof(*repbody);
217         ENTRY;
218
219         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
220         if (body == NULL)
221                 RETURN(-EFAULT);
222
223         rc = lustre_pack_reply(req, 1, &size, NULL);
224         if (rc)
225                 RETURN(rc);
226
227         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
228         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
229
230         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
231         RETURN(0);
232 }
233
234 static int ost_bulk_timeout(void *data)
235 {
236         ENTRY;
237         /* We don't fail the connection here, because having the export
238          * killed makes the (vital) call to commitrw very sad.
239          */
240         RETURN(1);
241 }
242
243 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
244                                 struct niobuf_remote *rnb, int nrnb,
245                                 struct niobuf_remote **pp_rnbp)
246 {
247         /* Copy a remote niobuf, splitting it into page-sized chunks
248          * and setting ioo[i].ioo_bufcnt accordingly */
249         struct niobuf_remote *pp_rnb;
250         int   i;
251         int   j;
252         int   page;
253         int   rnbidx = 0;
254         int   npages = 0;
255
256         /* first count and check the number of pages required */
257         for (i = 0; i < nioo; i++)
258                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
259                         obd_off offset = rnb[rnbidx].offset;
260                         obd_off p0 = offset >> PAGE_SHIFT;
261                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
262
263                         LASSERT(rnbidx < nrnb);
264
265                         npages += (pn + 1 - p0);
266
267                         if (rnb[rnbidx].len == 0) {
268                                 CERROR("zero len BRW: obj %d objid "LPX64
269                                        " buf %u\n", i, ioo[i].ioo_id, j);
270                                 return -EINVAL;
271                         }
272                         if (j > 0 &&
273                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
274                                 CERROR("unordered BRW: obj %d objid "LPX64
275                                        " buf %u offset "LPX64" <= "LPX64"\n",
276                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
277                                        rnb[rnbidx].offset);
278                                 return -EINVAL;
279                         }
280                 }
281
282         LASSERT(rnbidx == nrnb);
283
284         if (npages == nrnb) {       /* all niobufs are for single pages */
285                 *pp_rnbp = rnb;
286                 return npages;
287         }
288
289         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
290         if (pp_rnb == NULL)
291                 return -ENOMEM;
292
293         /* now do the actual split */
294         page = rnbidx = 0;
295         for (i = 0; i < nioo; i++) {
296                 int  obj_pages = 0;
297
298                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
299                         obd_off off = rnb[rnbidx].offset;
300                         int     nob = rnb[rnbidx].len;
301
302                         LASSERT(rnbidx < nrnb);
303                         do {
304                                 obd_off  poff = off & (PAGE_SIZE - 1);
305                                 int      pnob = (poff + nob > PAGE_SIZE) ?
306                                                 PAGE_SIZE - poff : nob;
307
308                                 LASSERT(page < npages);
309                                 pp_rnb[page].len = pnob;
310                                 pp_rnb[page].offset = off;
311                                 pp_rnb[page].flags = rnb[rnbidx].flags;
312
313                                 CDEBUG(0, "   obj %d id "LPX64
314                                        "page %d(%d) "LPX64" for %d, flg %x\n",
315                                        i, ioo[i].ioo_id, obj_pages, page,
316                                        pp_rnb[page].offset, pp_rnb[page].len,
317                                        pp_rnb[page].flags);
318                                 page++;
319                                 obj_pages++;
320
321                                 off += pnob;
322                                 nob -= pnob;
323                         } while (nob > 0);
324                         LASSERT(nob == 0);
325                 }
326                 ioo[i].ioo_bufcnt = obj_pages;
327         }
328         LASSERT(page == npages);
329
330         *pp_rnbp = pp_rnb;
331         return npages;
332 }
333
334 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
335                                    struct niobuf_remote *rnb)
336 {
337         if (pp_rnb == rnb)                      /* didn't allocate above */
338                 return;
339
340         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
341 }
342
#if CHECKSUM_BULK
/*
 * Compute a checksum over every page fragment attached to @desc.
 *
 * Each fragment is mapped with kmap(), fed to ost_checksum() in pieces of
 * at most CHECKSUM_CHUNK bytes, and unmapped again.  Returns the
 * accumulated checksum.
 */
obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
{
        obd_count total = 0;
        int idx;

        for (idx = 0; idx < desc->bd_iov_count; idx++) {
                struct page *pg = desc->bd_iov[idx].kiov_page;
                char *base = kmap(pg);
                int psum;
                int pos = desc->bd_iov[idx].kiov_offset & ~PAGE_MASK;
                int remaining = desc->bd_iov[idx].kiov_len;

                for (; remaining > 0;
                     pos += CHECKSUM_CHUNK, remaining -= CHECKSUM_CHUNK) {
                        ost_checksum(&total, &psum, base + pos,
                                     remaining > CHECKSUM_CHUNK ?
                                     CHECKSUM_CHUNK : remaining);
                        LL_CDEBUG_PAGE(D_PAGE, pg, "off %d checksum %x\n",
                                       pos, psum);
                }
                kunmap(pg);
        }

        return total;
}
#endif
370
371 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_bulk_desc *desc;
374         struct niobuf_remote    *remote_nb;
375         struct niobuf_remote    *pp_rnb;
376         struct niobuf_local     *local_nb;
377         struct obd_ioobj        *ioo;
378         struct ost_body         *body, *repbody;
379         struct l_wait_info       lwi;
380         int                      size[1] = { sizeof(*body) };
381         int                      comms_error = 0;
382         int                      niocount;
383         int                      npages;
384         int                      nob = 0;
385         int                      rc;
386         int                      i;
387         ENTRY;
388
389         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
390                 GOTO(out, rc = -EIO);
391
392         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
393                          (obd_timeout + 1) / 4);
394
395         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
396         if (body == NULL) {
397                 CERROR("Missing/short ost_body\n");
398                 GOTO(out, rc = -EFAULT);
399         }
400
401         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
402         if (ioo == NULL) {
403                 CERROR("Missing/short ioobj\n");
404                 GOTO(out, rc = -EFAULT);
405         }
406
407         niocount = ioo->ioo_bufcnt;
408         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
409                                        lustre_swab_niobuf_remote);
410         if (remote_nb == NULL) {
411                 CERROR("Missing/short niobuf\n");
412                 GOTO(out, rc = -EFAULT);
413         }
414         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
415                 for (i = 1; i < niocount; i++)
416                         lustre_swab_niobuf_remote (&remote_nb[i]);
417         }
418
419         rc = lustre_pack_reply(req, 1, size, NULL);
420         if (rc)
421                 GOTO(out, rc);
422
423         /* FIXME all niobuf splitting should be done in obdfilter if needed */
424         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
425         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
426         if (npages < 0)
427                 GOTO(out, rc = npages);
428
429         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
430         if (local_nb == NULL)
431                 GOTO(out_pp_rnb, rc = -ENOMEM);
432
433         desc = ptlrpc_prep_bulk_exp (req, npages, 
434                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
435         if (desc == NULL)
436                 GOTO(out_local, rc = -ENOMEM);
437
438         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
439                         ioo, npages, pp_rnb, local_nb, oti);
440         if (rc != 0)
441                 GOTO(out_bulk, rc);
442
443         /* We're finishing using body->oa as an input variable */
444         body->oa.o_valid = 0;
445
446         nob = 0;
447         for (i = 0; i < npages; i++) {
448                 int page_rc = local_nb[i].rc;
449
450                 if (page_rc < 0) {              /* error */
451                         rc = page_rc;
452                         break;
453                 }
454
455                 LASSERT(page_rc <= pp_rnb[i].len);
456                 nob += page_rc;
457                 if (page_rc != 0) {             /* some data! */
458                         LASSERT (local_nb[i].page != NULL);
459                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
460                                               pp_rnb[i].offset & (PAGE_SIZE-1),
461                                               page_rc);
462                 }
463
464                 if (page_rc != pp_rnb[i].len) { /* short read */
465                         /* All subsequent pages should be 0 */
466                         while(++i < npages)
467                                 LASSERT(local_nb[i].rc == 0);
468                         break;
469                 }
470         }
471
472         if (rc == 0) {
473                 rc = ptlrpc_start_bulk_transfer(desc);
474                 if (rc == 0) {
475                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
476                                           ost_bulk_timeout, desc);
477                         rc = l_wait_event(desc->bd_waitq,
478                                           !ptlrpc_bulk_active(desc), &lwi);
479                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
480                         if (rc == -ETIMEDOUT) {
481                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
482                                 ptlrpc_abort_bulk(desc);
483                         } else if (!desc->bd_success ||
484                                    desc->bd_nob_transferred != desc->bd_nob) {
485                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
486                                           desc->bd_success ?
487                                           "truncated" : "network error on",
488                                           desc->bd_nob_transferred,
489                                           desc->bd_nob);
490                                 /* XXX should this be a different errno? */
491                                 rc = -ETIMEDOUT;
492                         }
493                 } else {
494                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
495                 }
496                 comms_error = rc != 0;
497         }
498
499         /* Must commit after prep above in all cases */
500         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
501                           ioo, npages, local_nb, oti, rc);
502
503         if (rc == 0) {
504                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
505                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
506
507 #if CHECKSUM_BULK
508                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
509                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
510 #endif
511         }
512
513  out_bulk:
514         ptlrpc_free_bulk(desc);
515  out_local:
516         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
517  out_pp_rnb:
518         free_per_page_niobufs(npages, pp_rnb, remote_nb);
519  out:
520         LASSERT(rc <= 0);
521         if (rc == 0) {
522                 req->rq_status = nob;
523                 target_committed_to_req(req);
524                 ptlrpc_reply(req);
525         } else if (!comms_error) {
526                 /* only reply if comms OK */
527                 target_committed_to_req(req);
528                 req->rq_status = rc;
529                 ptlrpc_error(req);
530         } else {
531                 if (req->rq_reply_state != NULL) {
532                         /* reply out callback would free */
533                         ptlrpc_rs_decref(req->rq_reply_state);
534                         req->rq_reply_state = NULL;
535                 }
536                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
537                         CERROR("bulk IO comms error: "
538                                "evicting %s@%s id %s\n",
539                                req->rq_export->exp_client_uuid.uuid,
540                                req->rq_export->exp_connection->c_remote_uuid.uuid,
541                                req->rq_peerstr);
542                         ptlrpc_fail_export(req->rq_export);
543                 } else {
544                         CERROR("ignoring bulk IO comms error: "
545                                "client reconnected %s@%s id %s\n",
546                                req->rq_export->exp_client_uuid.uuid,
547                                req->rq_export->exp_connection->c_remote_uuid.uuid,
548                                req->rq_peerstr);
549                 }
550         }
551
552         RETURN(rc);
553 }
554
555 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
556 {
557         struct ptlrpc_bulk_desc *desc;
558         struct niobuf_remote    *remote_nb;
559         struct niobuf_remote    *pp_rnb;
560         struct niobuf_local     *local_nb;
561         struct obd_ioobj        *ioo;
562         struct ost_body         *body, *repbody;
563         struct l_wait_info       lwi;
564         __u32                   *rcs;
565         int                      size[2] = { sizeof(*body) };
566         int                      objcount, niocount, npages;
567         int                      comms_error = 0;
568         int                      rc, swab, i, j;
569         ENTRY;
570
571         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
572                 GOTO(out, rc = -EIO);
573
574         /* pause before transaction has been started */
575         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
576                          (obd_timeout + 1) / 4);
577
578         swab = lustre_msg_swabbed(req->rq_reqmsg);
579         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
580         if (body == NULL) {
581                 CERROR("Missing/short ost_body\n");
582                 GOTO(out, rc = -EFAULT);
583         }
584
585         LASSERT_REQSWAB(req, 1);
586         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
587         if (objcount == 0) {
588                 CERROR("Missing/short ioobj\n");
589                 GOTO(out, rc = -EFAULT);
590         }
591         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
592         LASSERT (ioo != NULL);
593         for (niocount = i = 0; i < objcount; i++) {
594                 if (swab)
595                         lustre_swab_obd_ioobj (&ioo[i]);
596                 if (ioo[i].ioo_bufcnt == 0) {
597                         CERROR("ioo[%d] has zero bufcnt\n", i);
598                         GOTO(out, rc = -EFAULT);
599                 }
600                 niocount += ioo[i].ioo_bufcnt;
601         }
602
603         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
604                                        lustre_swab_niobuf_remote);
605         if (remote_nb == NULL) {
606                 CERROR("Missing/short niobuf\n");
607                 GOTO(out, rc = -EFAULT);
608         }
609         if (swab) {                             /* swab the remaining niobufs */
610                 for (i = 1; i < niocount; i++)
611                         lustre_swab_niobuf_remote (&remote_nb[i]);
612         }
613
614         size[1] = niocount * sizeof(*rcs);
615         rc = lustre_pack_reply(req, 2, size, NULL);
616         if (rc != 0)
617                 GOTO(out, rc);
618         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
619
620         /* FIXME all niobuf splitting should be done in obdfilter if needed */
621         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
622         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
623         if (npages < 0)
624                 GOTO(out, rc = npages);
625
626         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
627         if (local_nb == NULL)
628                 GOTO(out_pp_rnb, rc = -ENOMEM);
629
630         desc = ptlrpc_prep_bulk_exp (req, npages, 
631                                      BULK_GET_SINK, OST_BULK_PORTAL);
632         if (desc == NULL)
633                 GOTO(out_local, rc = -ENOMEM);
634
635         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
636                         ioo, npages, pp_rnb, local_nb, oti);
637         if (rc != 0)
638                 GOTO(out_bulk, rc);
639
640         /* NB Having prepped, we must commit... */
641
642         for (i = 0; i < npages; i++)
643                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
644                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
645                                       pp_rnb[i].len);
646
647         rc = ptlrpc_start_bulk_transfer (desc);
648         if (rc == 0) {
649                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
650                                   ost_bulk_timeout, desc);
651                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
652                                   &lwi);
653                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
654                 if (rc == -ETIMEDOUT) {
655                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
656                         ptlrpc_abort_bulk(desc);
657                 } else if (!desc->bd_success ||
658                            desc->bd_nob_transferred != desc->bd_nob) {
659                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
660                                   desc->bd_success ? 
661                                   "truncated" : "network error on",
662                                   desc->bd_nob_transferred, desc->bd_nob);
663                         /* XXX should this be a different errno? */
664                         rc = -ETIMEDOUT;
665                 }
666         } else {
667                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
668         }
669         comms_error = rc != 0;
670
671         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
672         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
673
674 #if CHECKSUM_BULK
675         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
676                 static int cksum_counter;
677                 obd_count client_cksum = body->oa.o_cksum;
678                 obd_count cksum = ost_checksum_bulk(desc);
679
680                 if (client_cksum != cksum) {
681                         CERROR("Bad checksum: client %x, server %x id %s\n",
682                                client_cksum, cksum,
683                                req->rq_peerstr);
684                         cksum_counter = 1;
685                         repbody->oa.o_cksum = cksum;
686                 } else {
687                         cksum_counter++;
688                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
689                                 CWARN("Checksum %u from %s: %x OK\n",
690                                       cksum_counter,
691                                       req->rq_peerstr,
692                                       cksum);
693                 }
694         }
695 #endif
696         /* Must commit after prep above in all cases */
697         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
698                            objcount, ioo, npages, local_nb, oti, rc);
699
700         if (rc == 0) {
701                 /* set per-requested niobuf return codes */
702                 for (i = j = 0; i < niocount; i++) {
703                         int nob = remote_nb[i].len;
704
705                         rcs[i] = 0;
706                         do {
707                                 LASSERT(j < npages);
708                                 if (local_nb[j].rc < 0)
709                                         rcs[i] = local_nb[j].rc;
710                                 nob -= pp_rnb[j].len;
711                                 j++;
712                         } while (nob > 0);
713                         LASSERT(nob == 0);
714                 }
715                 LASSERT(j == npages);
716         }
717
718  out_bulk:
719         ptlrpc_free_bulk(desc);
720  out_local:
721         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
722  out_pp_rnb:
723         free_per_page_niobufs(npages, pp_rnb, remote_nb);
724  out:
725         if (rc == 0) {
726                 oti_to_request(oti, req);
727                 target_committed_to_req(req);
728                 rc = ptlrpc_reply(req);
729         } else if (!comms_error) {
730                 /* Only reply if there was no comms problem with bulk */
731                 target_committed_to_req(req);
732                 req->rq_status = rc;
733                 ptlrpc_error(req);
734         } else {
735                 if (req->rq_reply_state != NULL) {
736                         /* reply out callback would free */
737                         ptlrpc_rs_decref(req->rq_reply_state);
738                         req->rq_reply_state = NULL;
739                 }
740                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
741                         CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
742                                req->rq_export->exp_obd->obd_name,
743                                req->rq_export->exp_client_uuid.uuid,
744                                req->rq_export->exp_connection->c_remote_uuid.uuid,
745                                req->rq_peerstr);
746                         ptlrpc_fail_export(req->rq_export);
747                 } else {
748                         CERROR("ignoring bulk IO comms error: "
749                                "client reconnected %s@%s id %s\n",
750                                req->rq_export->exp_client_uuid.uuid,
751                                req->rq_export->exp_connection->c_remote_uuid.uuid,
752                                req->rq_peerstr);
753                 }
754         }
755         RETURN(rc);
756 }
757
758 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
759 {
760         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
761         struct obd_ioobj *ioo;
762         struct ost_body *body, *repbody;
763         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
764         int swab;
765         ENTRY;
766
767         /* XXX not set to use latest protocol */
768
769         swab = lustre_msg_swabbed(req->rq_reqmsg);
770         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
771         if (body == NULL) {
772                 CERROR("Missing/short ost_body\n");
773                 GOTO(out, rc = -EFAULT);
774         }
775
776         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
777         if (ioo == NULL) {
778                 CERROR("Missing/short ioobj\n");
779                 GOTO(out, rc = -EFAULT);
780         }
781         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
782         niocount = ioo[0].ioo_bufcnt;
783         for (i = 1; i < objcount; i++) {
784                 if (swab)
785                         lustre_swab_obd_ioobj (&ioo[i]);
786                 niocount += ioo[i].ioo_bufcnt;
787         }
788
789         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
790                                        lustre_swab_niobuf_remote);
791         if (remote_nb == NULL) {
792                 CERROR("Missing/short niobuf\n");
793                 GOTO(out, rc = -EFAULT);
794         }
795         if (swab) {                             /* swab the remaining niobufs */
796                 for (i = 1; i < niocount; i++)
797                         lustre_swab_niobuf_remote (&remote_nb[i]);
798         }
799
800         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
801         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
802         if (npages < 0)
803                 GOTO (out, rc = npages);
804
805         size[1] = npages * sizeof(*pp_rnb);
806         rc = lustre_pack_reply(req, 2, size, NULL);
807         if (rc)
808                 GOTO(out_pp_rnb, rc);
809
810         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
811                                         objcount, ioo, npages, pp_rnb);
812
813         if (req->rq_status)
814                 GOTO(out_pp_rnb, rc = 0);
815
816         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
817         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
818
819         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
820         memcpy(res_nb, remote_nb, size[1]);
821         rc = 0;
822 out_pp_rnb:
823         free_per_page_niobufs(npages, pp_rnb, remote_nb);
824 out:
825         target_committed_to_req(req);
826         if (rc) {
827                 req->rq_status = rc;
828                 ptlrpc_error(req);
829         } else {
830                 ptlrpc_reply(req);
831         }
832
833         return rc;
834 }
835
836
837 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
838 {
839         char *key;
840         int keylen, rc = 0;
841         ENTRY;
842
843         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
844         if (key == NULL) {
845                 DEBUG_REQ(D_HA, req, "no set_info key");
846                 RETURN(-EFAULT);
847         }
848         keylen = req->rq_reqmsg->buflens[0];
849
850         rc = lustre_pack_reply(req, 0, NULL, NULL);
851         if (rc)
852                 RETURN(rc);
853
854         rc = obd_set_info(exp, keylen, key, 0, NULL);
855         req->rq_repmsg->status = 0;
856         RETURN(rc);
857 }
858
859 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
860 {
861         char *key;
862         int keylen, rc = 0, size = sizeof(obd_id);
863         obd_id *reply;
864         ENTRY;
865
866         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
867         if (key == NULL) {
868                 DEBUG_REQ(D_HA, req, "no get_info key");
869                 RETURN(-EFAULT);
870         }
871         keylen = req->rq_reqmsg->buflens[0];
872
873         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
874                 RETURN(-EPROTO);
875
876         rc = lustre_pack_reply(req, 1, &size, NULL);
877         if (rc)
878                 RETURN(rc);
879
880         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
881         rc = obd_get_info(exp, keylen, key, &size, reply);
882         req->rq_repmsg->status = 0;
883         RETURN(rc);
884 }
885
886 static int ost_filter_recovery_request(struct ptlrpc_request *req,
887                                        struct obd_device *obd, int *process)
888 {
889         switch (req->rq_reqmsg->opc) {
890         case OST_CONNECT: /* This will never get here, but for completeness. */
891         case OST_DISCONNECT:
892                *process = 1;
893                RETURN(0);
894
895         case OBD_PING:
896         case OST_CREATE:
897         case OST_DESTROY:
898         case OST_PUNCH:
899         case OST_SETATTR:
900         case OST_SYNC:
901         case OST_WRITE:
902         case OBD_LOG_CANCEL:
903         case LDLM_ENQUEUE:
904                 *process = target_queue_recovery_request(req, obd);
905                 RETURN(0);
906
907         default:
908                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
909                 *process = 0;
910                 /* XXX what should we set rq_status to here? */
911                 req->rq_status = -EAGAIN;
912                 RETURN(ptlrpc_error(req));
913         }
914 }
915
916 static int ost_handle(struct ptlrpc_request *req)
917 {
918         struct obd_trans_info trans_info = { 0, };
919         struct obd_trans_info *oti = &trans_info;
920         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
921         struct obd_device *obd = NULL;
922         ENTRY;
923
924         LASSERT(current->journal_info == NULL);
925         /* XXX identical to MDS */
926         if (req->rq_reqmsg->opc != OST_CONNECT) {
927                 int abort_recovery, recovering;
928
929                 if (req->rq_export == NULL) {
930                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
931                                req->rq_reqmsg->opc, req->rq_peerstr);
932                         req->rq_status = -ENOTCONN;
933                         GOTO(out, rc = -ENOTCONN);
934                 }
935
936                 obd = req->rq_export->exp_obd;
937
938                 /* Check for aborted recovery. */
939                 spin_lock_bh(&obd->obd_processing_task_lock);
940                 abort_recovery = obd->obd_abort_recovery;
941                 recovering = obd->obd_recovering;
942                 spin_unlock_bh(&obd->obd_processing_task_lock);
943                 if (abort_recovery) {
944                         target_abort_recovery(obd);
945                 } else if (recovering) {
946                         rc = ost_filter_recovery_request(req, obd,
947                                                          &should_process);
948                         if (rc || !should_process)
949                                 RETURN(rc);
950                 }
951         }
952
953         oti_init(oti, req);
954
955         switch (req->rq_reqmsg->opc) {
956         case OST_CONNECT: {
957                 CDEBUG(D_INODE, "connect\n");
958                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
959                 rc = target_handle_connect(req, ost_handle);
960                 if (!rc)
961                         obd = req->rq_export->exp_obd;
962                 break;
963         }
964         case OST_DISCONNECT:
965                 CDEBUG(D_INODE, "disconnect\n");
966                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
967                 rc = target_handle_disconnect(req);
968                 break;
969         case OST_CREATE:
970                 CDEBUG(D_INODE, "create\n");
971                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
972                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
973                         GOTO(out, rc = -ENOSPC);
974                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
975                         GOTO(out, rc = -EROFS);
976                 rc = ost_create(req->rq_export, req, oti);
977                 break;
978         case OST_DESTROY:
979                 CDEBUG(D_INODE, "destroy\n");
980                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
981                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
982                         GOTO(out, rc = -EROFS);
983                 rc = ost_destroy(req->rq_export, req, oti);
984                 break;
985         case OST_GETATTR:
986                 CDEBUG(D_INODE, "getattr\n");
987                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
988                 rc = ost_getattr(req->rq_export, req);
989                 break;
990         case OST_SETATTR:
991                 CDEBUG(D_INODE, "setattr\n");
992                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
993                 rc = ost_setattr(req->rq_export, req, oti);
994                 break;
995         case OST_WRITE:
996                 CDEBUG(D_INODE, "write\n");
997                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
998                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
999                         GOTO(out, rc = -ENOSPC);
1000                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1001                         GOTO(out, rc = -EROFS);
1002                 rc = ost_brw_write(req, oti);
1003                 LASSERT(current->journal_info == NULL);
1004                 /* ost_brw_write sends its own replies */
1005                 RETURN(rc);
1006         case OST_READ:
1007                 CDEBUG(D_INODE, "read\n");
1008                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1009                 rc = ost_brw_read(req, oti);
1010                 LASSERT(current->journal_info == NULL);
1011                 /* ost_brw_read sends its own replies */
1012                 RETURN(rc);
1013         case OST_SAN_READ:
1014                 CDEBUG(D_INODE, "san read\n");
1015                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1016                 rc = ost_san_brw(req, OBD_BRW_READ);
1017                 /* ost_san_brw sends its own replies */
1018                 RETURN(rc);
1019         case OST_SAN_WRITE:
1020                 CDEBUG(D_INODE, "san write\n");
1021                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1022                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1023                 /* ost_san_brw sends its own replies */
1024                 RETURN(rc);
1025         case OST_PUNCH:
1026                 CDEBUG(D_INODE, "punch\n");
1027                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1028                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1029                         GOTO(out, rc = -EROFS);
1030                 rc = ost_punch(req->rq_export, req, oti);
1031                 break;
1032         case OST_STATFS:
1033                 CDEBUG(D_INODE, "statfs\n");
1034                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1035                 rc = ost_statfs(req);
1036                 break;
1037         case OST_SYNC:
1038                 CDEBUG(D_INODE, "sync\n");
1039                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1040                 rc = ost_sync(req->rq_export, req);
1041                 break;
1042         case OST_SET_INFO:
1043                 DEBUG_REQ(D_INODE, req, "set_info");
1044                 rc = ost_set_info(req->rq_export, req);
1045                 break;
1046         case OST_GET_INFO:
1047                 DEBUG_REQ(D_INODE, req, "get_info");
1048                 rc = ost_get_info(req->rq_export, req);
1049                 break;
1050         case OBD_PING:
1051                 DEBUG_REQ(D_INODE, req, "ping");
1052                 rc = target_handle_ping(req);
1053                 break;
1054         /* FIXME - just reply status */
1055         case LLOG_ORIGIN_CONNECT:
1056                 DEBUG_REQ(D_INODE, req, "log connect\n");
1057                 rc = llog_handle_connect(req); 
1058                 req->rq_status = rc;
1059                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1060                 if (rc)
1061                         RETURN(rc);
1062                 RETURN(ptlrpc_reply(req));
1063         case OBD_LOG_CANCEL:
1064                 CDEBUG(D_INODE, "log cancel\n");
1065                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1066                 rc = llog_origin_handle_cancel(req);
1067                 req->rq_status = rc;
1068                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1069                 if (rc)
1070                         RETURN(rc);
1071                 RETURN(ptlrpc_reply(req));
1072         case LDLM_ENQUEUE:
1073                 CDEBUG(D_INODE, "enqueue\n");
1074                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1075                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1076                                          ldlm_server_blocking_ast,
1077                                          ldlm_server_glimpse_ast);
1078                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1079                 break;
1080         case LDLM_CONVERT:
1081                 CDEBUG(D_INODE, "convert\n");
1082                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1083                 rc = ldlm_handle_convert(req);
1084                 break;
1085         case LDLM_CANCEL:
1086                 CDEBUG(D_INODE, "cancel\n");
1087                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1088                 rc = ldlm_handle_cancel(req);
1089                 break;
1090         case LDLM_BL_CALLBACK:
1091         case LDLM_CP_CALLBACK:
1092                 CDEBUG(D_INODE, "callback\n");
1093                 CERROR("callbacks should not happen on OST\n");
1094                 /* fall through */
1095         default:
1096                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1097                 req->rq_status = -ENOTSUPP;
1098                 rc = ptlrpc_error(req);
1099                 RETURN(rc);
1100         }
1101
1102         LASSERT(current->journal_info == NULL);
1103
1104         EXIT;
1105         /* If we're DISCONNECTing, the export_data is already freed */
1106         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT)
1107                 target_committed_to_req(req);
1108
1109 out:
1110         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1111                 if (obd && obd->obd_recovering) {
1112                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1113                         return target_queue_final_reply(req, rc);
1114                 }
1115                 /* Lost a race with recovery; let the error path DTRT. */
1116                 rc = req->rq_status = -ENOTCONN;
1117         }
1118
1119         if (!rc)
1120                 oti_to_request(oti, req);
1121
1122         target_send_reply(req, rc, fail);
1123         return 0;
1124 }
1125
1126 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1127 {
1128         struct ost_obd *ost = &obd->u.ost;
1129         struct lprocfs_static_vars lvars;
1130         int rc;
1131         ENTRY;
1132
1133         rc = cleanup_group_info();
1134         if (rc)
1135                 RETURN(rc);
1136
1137         rc = llog_start_commit_thread();
1138         if (rc < 0)
1139                 RETURN(rc);
1140
1141         lprocfs_init_vars(ost, &lvars);
1142         lprocfs_obd_setup(obd, lvars.obd_vars);
1143
1144         ost->ost_service =
1145                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1146                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1147                                 obd_timeout * 1000, ost_handle, "ost",
1148                                 obd->obd_proc_entry, ost_print_req);
1149         if (ost->ost_service == NULL) {
1150                 CERROR("failed to start service\n");
1151                 GOTO(out_lprocfs, rc = -ENOMEM);
1152         }
1153
1154         rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1155                                     "ll_ost");
1156         if (rc)
1157                 GOTO(out_service, rc = -EINVAL);
1158
1159         ost->ost_create_service =
1160                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1161                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL,
1162                                 obd_timeout * 1000, ost_handle, "ost_create",
1163                                 obd->obd_proc_entry, ost_print_req);
1164         if (ost->ost_create_service == NULL) {
1165                 CERROR("failed to start OST create service\n");
1166                 GOTO(out_service, rc = -ENOMEM);
1167         }
1168
1169         rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1170                                     "ll_ost_creat");
1171         if (rc)
1172                 GOTO(out_create, rc = -EINVAL);
1173
1174         RETURN(0);
1175
1176 out_create:
1177         ptlrpc_unregister_service(ost->ost_create_service);
1178 out_service:
1179         ptlrpc_unregister_service(ost->ost_service);
1180 out_lprocfs:
1181         lprocfs_obd_cleanup(obd);
1182         RETURN(rc);
1183 }
1184
1185 static int ost_cleanup(struct obd_device *obd)
1186 {
1187         struct ost_obd *ost = &obd->u.ost;
1188         int err = 0;
1189         ENTRY;
1190
1191         spin_lock_bh(&obd->obd_processing_task_lock);
1192         if (obd->obd_recovering) {
1193                 target_cancel_recovery_timer(obd);
1194                 obd->obd_recovering = 0;
1195         }
1196         spin_unlock_bh(&obd->obd_processing_task_lock);
1197
1198         ptlrpc_unregister_service(ost->ost_service);
1199         ptlrpc_unregister_service(ost->ost_create_service);
1200
1201         lprocfs_obd_cleanup(obd);
1202
1203         RETURN(err);
1204 }
1205
1206 /* use obd ops to offer management infrastructure */
1207 static struct obd_ops ost_obd_ops = {
1208         .o_owner        = THIS_MODULE,
1209         .o_setup        = ost_setup,
1210         .o_cleanup      = ost_cleanup,
1211 };
1212
1213 static int __init ost_init(void)
1214 {
1215         struct lprocfs_static_vars lvars;
1216         ENTRY;
1217
1218         lprocfs_init_vars(ost,&lvars);
1219         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1220                                    LUSTRE_OST_NAME));
1221 }
1222
1223 static void /*__exit*/ ost_exit(void)
1224 {
1225         class_unregister_type(LUSTRE_OST_NAME);
1226 }
1227
1228 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1229 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1230 MODULE_LICENSE("GPL");
1231
1232 module_init(ost_init);
1233 module_exit(ost_exit);