Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <portals/list.h>
47
48 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
49 {
50         if (oti == NULL)
51                 return;
52         memset(oti, 0, sizeof *oti);
53
54         if (req->rq_repmsg && req->rq_reqmsg != 0)
55                 oti->oti_transno = req->rq_repmsg->transno;
56 }
57
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
59 {
60         struct oti_req_ack_lock *ack_lock;
61         int i;
62
63         if (oti == NULL)
64                 return;
65
66         if (req->rq_repmsg)
67                 req->rq_repmsg->transno = oti->oti_transno;
68
69         /* XXX 4 == entries in oti_ack_locks??? */
70         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
71                 if (!ack_lock->mode)
72                         break;
73                 /* XXX not even calling target_send_reply in some cases... */
74                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
75         }
76 }
77
78 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
79                        struct obd_trans_info *oti)
80 {
81         struct ost_body *body, *repbody;
82         int rc, size = sizeof(*body);
83         ENTRY;
84
85         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
86         if (body == NULL)
87                 RETURN(-EFAULT);
88
89         rc = lustre_pack_reply(req, 1, &size, NULL);
90         if (rc)
91                 RETURN(rc);
92
93         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
94                 oti->oti_logcookies = obdo_logcookie(&body->oa);
95         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
96         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
97         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
98         RETURN(0);
99 }
100
101 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
102 {
103         struct ost_body *body, *repbody;
104         int rc, size = sizeof(*body);
105         ENTRY;
106
107         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
108         if (body == NULL)
109                 RETURN(-EFAULT);
110
111         rc = lustre_pack_reply(req, 1, &size, NULL);
112         if (rc)
113                 RETURN(rc);
114
115         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
116         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
117         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
118         RETURN(0);
119 }
120
121 static int ost_statfs(struct ptlrpc_request *req)
122 {
123         struct obd_statfs *osfs;
124         int rc, size = sizeof(*osfs);
125         ENTRY;
126
127         rc = lustre_pack_reply(req, 1, &size, NULL);
128         if (rc)
129                 RETURN(rc);
130
131         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
132
133         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
134         if (req->rq_status != 0)
135                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
136
137         RETURN(0);
138 }
139
140 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
141                       struct obd_trans_info *oti)
142 {
143         struct ost_body *body, *repbody;
144         int rc, size = sizeof(*repbody);
145         ENTRY;
146
147         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
148         if (body == NULL)
149                 RETURN(-EFAULT);
150
151         rc = lustre_pack_reply(req, 1, &size, NULL);
152         if (rc)
153                 RETURN(rc);
154
155         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
156         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
157         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
158         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
159         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
160         RETURN(0);
161 }
162
163 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
164                      struct obd_trans_info *oti)
165 {
166         struct ost_body *body, *repbody;
167         int rc, size = sizeof(*repbody);
168         ENTRY;
169
170         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
171         if (body == NULL)
172                 RETURN(-EFAULT);
173
174         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
175             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
176                 RETURN(-EINVAL);
177
178         rc = lustre_pack_reply(req, 1, &size, NULL);
179         if (rc)
180                 RETURN(rc);
181
182         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
183         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
184         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
185                                    repbody->oa.o_blocks, oti);
186         RETURN(0);
187 }
188
189 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
190 {
191         struct ost_body *body, *repbody;
192         int rc, size = sizeof(*repbody);
193         ENTRY;
194
195         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
196         if (body == NULL)
197                 RETURN(-EFAULT);
198
199         rc = lustre_pack_reply(req, 1, &size, NULL);
200         if (rc)
201                 RETURN(rc);
202
203         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
204         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
205         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
206                                   repbody->oa.o_blocks);
207         RETURN(0);
208 }
209
210 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
211                        struct obd_trans_info *oti)
212 {
213         struct ost_body *body, *repbody;
214         int rc, size = sizeof(*repbody);
215         ENTRY;
216
217         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
218         if (body == NULL)
219                 RETURN(-EFAULT);
220
221         rc = lustre_pack_reply(req, 1, &size, NULL);
222         if (rc)
223                 RETURN(rc);
224
225         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
226         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
227
228         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
229         RETURN(0);
230 }
231
232 static int ost_bulk_timeout(void *data)
233 {
234         ENTRY;
235         /* We don't fail the connection here, because having the export
236          * killed makes the (vital) call to commitrw very sad.
237          */
238         RETURN(1);
239 }
240
241 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
242                                 struct niobuf_remote *rnb, int nrnb,
243                                 struct niobuf_remote **pp_rnbp)
244 {
245         /* Copy a remote niobuf, splitting it into page-sized chunks
246          * and setting ioo[i].ioo_bufcnt accordingly */
247         struct niobuf_remote *pp_rnb;
248         int   i;
249         int   j;
250         int   page;
251         int   rnbidx = 0;
252         int   npages = 0;
253
254         /* first count and check the number of pages required */
255         for (i = 0; i < nioo; i++)
256                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
257                         obd_off offset = rnb[rnbidx].offset;
258                         obd_off p0 = offset >> PAGE_SHIFT;
259                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
260
261                         LASSERT(rnbidx < nrnb);
262
263                         npages += (pn + 1 - p0);
264
265                         if (rnb[rnbidx].len == 0) {
266                                 CERROR("zero len BRW: obj %d objid "LPX64
267                                        " buf %u\n", i, ioo[i].ioo_id, j);
268                                 return -EINVAL;
269                         }
270                         if (j > 0 &&
271                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
272                                 CERROR("unordered BRW: obj %d objid "LPX64
273                                        " buf %u offset "LPX64" <= "LPX64"\n",
274                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
275                                        rnb[rnbidx].offset);
276                                 return -EINVAL;
277                         }
278                 }
279
280         LASSERT(rnbidx == nrnb);
281
282         if (npages == nrnb) {       /* all niobufs are for single pages */
283                 *pp_rnbp = rnb;
284                 return npages;
285         }
286
287         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
288         if (pp_rnb == NULL)
289                 return -ENOMEM;
290
291         /* now do the actual split */
292         page = rnbidx = 0;
293         for (i = 0; i < nioo; i++) {
294                 int  obj_pages = 0;
295
296                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
297                         obd_off off = rnb[rnbidx].offset;
298                         int     nob = rnb[rnbidx].len;
299
300                         LASSERT(rnbidx < nrnb);
301                         do {
302                                 obd_off  poff = off & (PAGE_SIZE - 1);
303                                 int      pnob = (poff + nob > PAGE_SIZE) ?
304                                                 PAGE_SIZE - poff : nob;
305
306                                 LASSERT(page < npages);
307                                 pp_rnb[page].len = pnob;
308                                 pp_rnb[page].offset = off;
309                                 pp_rnb[page].flags = rnb[rnbidx].flags;
310
311                                 CDEBUG(0, "   obj %d id "LPX64
312                                        "page %d(%d) "LPX64" for %d, flg %x\n",
313                                        i, ioo[i].ioo_id, obj_pages, page,
314                                        pp_rnb[page].offset, pp_rnb[page].len,
315                                        pp_rnb[page].flags);
316                                 page++;
317                                 obj_pages++;
318
319                                 off += pnob;
320                                 nob -= pnob;
321                         } while (nob > 0);
322                         LASSERT(nob == 0);
323                 }
324                 ioo[i].ioo_bufcnt = obj_pages;
325         }
326         LASSERT(page == npages);
327
328         *pp_rnbp = pp_rnb;
329         return npages;
330 }
331
332 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
333                                    struct niobuf_remote *rnb)
334 {
335         if (pp_rnb == rnb)                      /* didn't allocate above */
336                 return;
337
338         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
339 }
340
341 #if CHECKSUM_BULK
342 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
343 {
344         obd_count cksum = 0;
345         struct ptlrpc_bulk_page *bp;
346
347         list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
348                 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
349                              bp->bp_buflen);
350                 kunmap(bp->bp_page);
351         }
352
353         return cksum;
354 }
355 #endif
356
357 static int ost_brw_read(struct ptlrpc_request *req)
358 {
359         struct ptlrpc_bulk_desc *desc;
360         struct niobuf_remote    *remote_nb;
361         struct niobuf_remote    *pp_rnb;
362         struct niobuf_local     *local_nb;
363         struct obd_ioobj        *ioo;
364         struct ost_body         *body, *repbody;
365         struct l_wait_info       lwi;
366         struct obd_trans_info    oti = { 0 };
367         char                     str[PTL_NALFMT_SIZE];
368         int                      size[1] = { sizeof(*body) };
369         int                      comms_error = 0;
370         int                      niocount;
371         int                      npages;
372         int                      nob = 0;
373         int                      rc;
374         int                      i;
375         ENTRY;
376
377         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
378                 GOTO(out, rc = -EIO);
379
380         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
381                          (obd_timeout + 1) / 4);
382
383         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
384         if (body == NULL) {
385                 CERROR("Missing/short ost_body\n");
386                 GOTO(out, rc = -EFAULT);
387         }
388
389         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
390         if (ioo == NULL) {
391                 CERROR("Missing/short ioobj\n");
392                 GOTO(out, rc = -EFAULT);
393         }
394
395         niocount = ioo->ioo_bufcnt;
396         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
397                                        lustre_swab_niobuf_remote);
398         if (remote_nb == NULL) {
399                 CERROR("Missing/short niobuf\n");
400                 GOTO(out, rc = -EFAULT);
401         }
402         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
403                 for (i = 1; i < niocount; i++)
404                         lustre_swab_niobuf_remote (&remote_nb[i]);
405         }
406
407         rc = lustre_pack_reply(req, 1, size, NULL);
408         if (rc)
409                 GOTO(out, rc);
410
411         /* FIXME all niobuf splitting should be done in obdfilter if needed */
412         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
413         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
414         if (npages < 0)
415                 GOTO(out, rc = npages);
416
417         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
418         if (local_nb == NULL)
419                 GOTO(out_pp_rnb, rc = -ENOMEM);
420
421         desc = ptlrpc_prep_bulk_exp (req, npages, 
422                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
423         if (desc == NULL)
424                 GOTO(out_local, rc = -ENOMEM);
425
426         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
427                         ioo, npages, pp_rnb, local_nb, &oti);
428         if (rc != 0)
429                 GOTO(out_bulk, rc);
430
431         nob = 0;
432         for (i = 0; i < npages; i++) {
433                 int page_rc = local_nb[i].rc;
434
435                 if (page_rc < 0) {              /* error */
436                         rc = page_rc;
437                         break;
438                 }
439
440                 LASSERT(page_rc <= pp_rnb[i].len);
441                 nob += page_rc;
442                 if (page_rc != 0) {             /* some data! */
443                         LASSERT (local_nb[i].page != NULL);
444                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
445                                               pp_rnb[i].offset & (PAGE_SIZE - 1),
446                                               page_rc);
447                 }
448
449                 if (page_rc != pp_rnb[i].len) { /* short read */
450                         /* All subsequent pages should be 0 */
451                         while(++i < npages)
452                                 LASSERT(local_nb[i].rc == 0);
453                         break;
454                 }
455         }
456
457         if (rc == 0) {
458                 rc = ptlrpc_start_bulk_transfer(desc);
459                 if (rc == 0) {
460                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
461                                           ost_bulk_timeout, desc);
462                         rc = l_wait_event(desc->bd_waitq,
463                                           !ptlrpc_bulk_active(desc), &lwi);
464                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
465                         if (rc == -ETIMEDOUT) {
466                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
467                                 ptlrpc_abort_bulk(desc);
468                         } else if (!desc->bd_success ||
469                                    desc->bd_nob_transferred != desc->bd_nob) {
470                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
471                                           desc->bd_success ? 
472                                           "truncated" : "network error on",
473                                           desc->bd_nob_transferred, 
474                                           desc->bd_nob);
475                                 /* XXX should this be a different errno? */
476                                 rc = -ETIMEDOUT;
477                         }
478                 } else {
479                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
480                 }
481                 comms_error = rc != 0;
482         }
483
484         /* Must commit after prep above in all cases */
485         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
486                           ioo, npages, local_nb, &oti);
487
488         if (rc == 0) {
489                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
490                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
491
492 #if CHECKSUM_BULK
493                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
494                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
495 #endif
496         }
497
498  out_bulk:
499         ptlrpc_free_bulk(desc);
500  out_local:
501         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
502  out_pp_rnb:
503         free_per_page_niobufs(npages, pp_rnb, remote_nb);
504  out:
505         LASSERT(rc <= 0);
506         if (rc == 0) {
507                 req->rq_status = nob;
508                 ptlrpc_reply(req);
509         } else if (!comms_error) {
510                 /* only reply if comms OK */
511                 req->rq_status = rc;
512                 ptlrpc_error(req);
513         } else {
514                 if (req->rq_reply_state != NULL) {
515                         /* reply out callback would free */
516                         lustre_free_reply_state (req->rq_reply_state);
517                 }
518                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
519                         CERROR("bulk IO comms error: "
520                                "evicting %s@%s nid "LPX64" (%s)\n",
521                                req->rq_export->exp_client_uuid.uuid,
522                                req->rq_export->exp_connection->c_remote_uuid.uuid,
523                                req->rq_peer.peer_nid,
524                                portals_nid2str(req->rq_peer.peer_ni->pni_number,
525                                                req->rq_peer.peer_nid,
526                                                str));
527                         ptlrpc_fail_export(req->rq_export);
528                 } else {
529                         CERROR("ignoring bulk IO comms error: "
530                                "client reconnected %s@%s nid "LPX64" (%s)\n",  
531                                req->rq_export->exp_client_uuid.uuid,
532                                req->rq_export->exp_connection->c_remote_uuid.uuid,
533                                req->rq_peer.peer_nid,
534                                portals_nid2str(req->rq_peer.peer_ni->pni_number,
535                                                req->rq_peer.peer_nid,
536                                                str));
537                 }
538         }
539
540         RETURN(rc);
541 }
542
543 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
544 {
545         struct ptlrpc_bulk_desc *desc;
546         struct niobuf_remote    *remote_nb;
547         struct niobuf_remote    *pp_rnb;
548         struct niobuf_local     *local_nb;
549         struct obd_ioobj        *ioo;
550         struct ost_body         *body, *repbody;
551         struct l_wait_info       lwi;
552         __u32                   *rcs;
553         int                      size[2] = { sizeof(*body) };
554         int                      objcount, niocount, npages;
555         int                      comms_error = 0;
556         int                      rc, rc2, swab, i, j;
557         char                     str[PTL_NALFMT_SIZE];
558         ENTRY;
559
560         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
561                 GOTO(out, rc = -EIO);
562
563         /* pause before transaction has been started */
564         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
565                          (obd_timeout + 1) / 4);
566
567         swab = lustre_msg_swabbed(req->rq_reqmsg);
568         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
569         if (body == NULL) {
570                 CERROR("Missing/short ost_body\n");
571                 GOTO(out, rc = -EFAULT);
572         }
573
574         LASSERT_REQSWAB(req, 1);
575         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
576         if (objcount == 0) {
577                 CERROR("Missing/short ioobj\n");
578                 GOTO(out, rc = -EFAULT);
579         }
580         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
581         LASSERT (ioo != NULL);
582         for (niocount = i = 0; i < objcount; i++) {
583                 if (swab)
584                         lustre_swab_obd_ioobj (&ioo[i]);
585                 if (ioo[i].ioo_bufcnt == 0) {
586                         CERROR("ioo[%d] has zero bufcnt\n", i);
587                         GOTO(out, rc = -EFAULT);
588                 }
589                 niocount += ioo[i].ioo_bufcnt;
590         }
591
592         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
593                                        lustre_swab_niobuf_remote);
594         if (remote_nb == NULL) {
595                 CERROR("Missing/short niobuf\n");
596                 GOTO(out, rc = -EFAULT);
597         }
598         if (swab) {                             /* swab the remaining niobufs */
599                 for (i = 1; i < niocount; i++)
600                         lustre_swab_niobuf_remote (&remote_nb[i]);
601         }
602
603         size[1] = niocount * sizeof(*rcs);
604         rc = lustre_pack_reply(req, 2, size, NULL);
605         if (rc != 0)
606                 GOTO(out, rc);
607         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
608
609         /* FIXME all niobuf splitting should be done in obdfilter if needed */
610         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
611         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
612         if (npages < 0)
613                 GOTO(out, rc = npages);
614
615         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
616         if (local_nb == NULL)
617                 GOTO(out_pp_rnb, rc = -ENOMEM);
618
619         desc = ptlrpc_prep_bulk_exp (req, npages, 
620                                      BULK_GET_SINK, OST_BULK_PORTAL);
621         if (desc == NULL)
622                 GOTO(out_local, rc = -ENOMEM);
623
624         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
625                         ioo, npages, pp_rnb, local_nb, oti);
626         if (rc != 0)
627                 GOTO(out_bulk, rc);
628
629         /* NB Having prepped, we must commit... */
630
631         for (i = 0; i < npages; i++)
632                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
633                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
634                                       pp_rnb[i].len);
635
636         rc = ptlrpc_start_bulk_transfer (desc);
637         if (rc == 0) {
638                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
639                                   ost_bulk_timeout, desc);
640                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
641                                   &lwi);
642                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
643                 if (rc == -ETIMEDOUT) {
644                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
645                         ptlrpc_abort_bulk(desc);
646                 } else if (!desc->bd_success ||
647                            desc->bd_nob_transferred != desc->bd_nob) {
648                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
649                                   desc->bd_success ? 
650                                   "truncated" : "network error on",
651                                   desc->bd_nob_transferred, desc->bd_nob);
652                         /* XXX should this be a different errno? */
653                         rc = -ETIMEDOUT;
654                 }
655         } else {
656                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
657         }
658         comms_error = rc != 0;
659
660         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
661         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
662
663 #if CHECKSUM_BULK
664         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
665                 static int cksum_counter;
666                 obd_count client_cksum = body->oa.o_cksum;
667                 obd_count cksum = ost_checksum_bulk(desc);
668
669                 portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
670                                 req->rq_connection->c_peer.peer_nid, str);
671                 if (client_cksum != cksum) {
672                         CERROR("Bad checksum: client %x, server %x, client NID "
673                                LPX64" (%s)\n", client_cksum, cksum,
674                                req->rq_connection->c_peer.peer_nid, str);
675                         cksum_counter = 1;
676                         repbody->oa.o_cksum = cksum;
677                 } else {
678                         cksum_counter++;
679                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
680                                 CWARN("Checksum %u from "LPX64": %x OK\n",
681                                       cksum_counter,
682                                       req->rq_connection->c_peer.peer_nid,
683                                       cksum);
684                 }
685         }
686 #endif
687         /* Must commit after prep above in all cases */
688         rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
689                            objcount, ioo, npages, local_nb, oti);
690
691         if (rc == 0) {
692                 /* set per-requested niobuf return codes */
693                 for (i = j = 0; i < niocount; i++) {
694                         int nob = remote_nb[i].len;
695
696                         rcs[i] = 0;
697                         do {
698                                 LASSERT(j < npages);
699                                 if (local_nb[j].rc < 0)
700                                         rcs[i] = local_nb[j].rc;
701                                 nob -= pp_rnb[j].len;
702                                 j++;
703                         } while (nob > 0);
704                         LASSERT(nob == 0);
705                 }
706                 LASSERT(j == npages);
707         }
708         if (rc == 0)
709                 rc = rc2;
710
711  out_bulk:
712         ptlrpc_free_bulk(desc);
713  out_local:
714         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
715  out_pp_rnb:
716         free_per_page_niobufs(npages, pp_rnb, remote_nb);
717  out:
718         if (rc == 0) {
719                 oti_to_request(oti, req);
720                 rc = ptlrpc_reply(req);
721         } else if (!comms_error) {
722                 /* Only reply if there was no comms problem with bulk */
723                 req->rq_status = rc;
724                 ptlrpc_error(req);
725         } else {
726                 if (req->rq_reply_state != NULL) {
727                         /* reply out callback would free */
728                         lustre_free_reply_state (req->rq_reply_state);
729                 }
730                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
731                         CERROR("bulk IO comms error: "
732                                "evicting %s@%s nid "LPX64" (%s)\n",
733                                req->rq_export->exp_client_uuid.uuid,
734                                req->rq_export->exp_connection->c_remote_uuid.uuid,
735                                req->rq_peer.peer_nid,
736                                portals_nid2str(req->rq_peer.peer_ni->pni_number,
737                                                req->rq_peer.peer_nid,
738                                                str));
739                         ptlrpc_fail_export(req->rq_export);
740                 } else {
741                         CERROR("ignoring bulk IO comms error: "
742                                "client reconnected %s@%s nid "LPX64" (%s)\n",
743                                req->rq_export->exp_client_uuid.uuid,
744                                req->rq_export->exp_connection->c_remote_uuid.uuid,
745                                req->rq_peer.peer_nid,
746                                portals_nid2str(req->rq_peer.peer_ni->pni_number,
747                                                req->rq_peer.peer_nid,
748                                                str));
749                 }        
750         }
751         RETURN(rc);
752 }
753
754 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
755 {
756         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
757         struct obd_ioobj *ioo;
758         struct ost_body *body, *repbody;
759         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
760         int swab;
761         ENTRY;
762
763         /* XXX not set to use latest protocol */
764
765         swab = lustre_msg_swabbed(req->rq_reqmsg);
766         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
767         if (body == NULL) {
768                 CERROR("Missing/short ost_body\n");
769                 GOTO(out, rc = -EFAULT);
770         }
771
772         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
773         if (ioo == NULL) {
774                 CERROR("Missing/short ioobj\n");
775                 GOTO(out, rc = -EFAULT);
776         }
777         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
778         niocount = ioo[0].ioo_bufcnt;
779         for (i = 1; i < objcount; i++) {
780                 if (swab)
781                         lustre_swab_obd_ioobj (&ioo[i]);
782                 niocount += ioo[i].ioo_bufcnt;
783         }
784
785         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
786                                        lustre_swab_niobuf_remote);
787         if (remote_nb == NULL) {
788                 CERROR("Missing/short niobuf\n");
789                 GOTO(out, rc = -EFAULT);
790         }
791         if (swab) {                             /* swab the remaining niobufs */
792                 for (i = 1; i < niocount; i++)
793                         lustre_swab_niobuf_remote (&remote_nb[i]);
794         }
795
796         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
797         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
798         if (npages < 0)
799                 GOTO (out, rc = npages);
800  
801         size[1] = npages * sizeof(*pp_rnb);
802         rc = lustre_pack_reply(req, 2, size, NULL);
803         if (rc)
804                 GOTO(out_pp_rnb, rc);
805
806         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
807                                         objcount, ioo, npages, pp_rnb);
808
809         if (req->rq_status)
810                 GOTO(out_pp_rnb, rc = 0);
811
812         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
813         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
814
815         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
816         memcpy(res_nb, remote_nb, size[1]);
817         rc = 0;
818 out_pp_rnb:
819         free_per_page_niobufs(npages, pp_rnb, remote_nb);
820 out:
821         if (rc) {
822                 req->rq_status = rc;
823                 ptlrpc_error(req);
824         } else
825                 ptlrpc_reply(req);
826
827         return rc;
828 }
829
830
831 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
832 {
833         char *key;
834         int keylen, rc = 0;
835         ENTRY;
836
837         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
838         if (key == NULL) {
839                 DEBUG_REQ(D_HA, req, "no set_info key");
840                 RETURN(-EFAULT);
841         }
842         keylen = req->rq_reqmsg->buflens[0];
843
844         rc = lustre_pack_reply(req, 0, NULL, NULL);
845         if (rc)
846                 RETURN(rc);
847
848         rc = obd_set_info(exp, keylen, key, 0, NULL);
849         req->rq_repmsg->status = 0;
850         RETURN(rc);
851 }
852
853 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
854 {
855         char *key;
856         int keylen, rc = 0, size = sizeof(obd_id);
857         obd_id *reply;
858         ENTRY;
859
860         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
861         if (key == NULL) {
862                 DEBUG_REQ(D_HA, req, "no get_info key");
863                 RETURN(-EFAULT);
864         }
865         keylen = req->rq_reqmsg->buflens[0];
866
867         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
868                 RETURN(-EPROTO);
869
870         rc = lustre_pack_reply(req, 1, &size, NULL);
871         if (rc)
872                 RETURN(rc);
873
874         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
875         rc = obd_get_info(exp, keylen, key, &size, reply);
876         req->rq_repmsg->status = 0;
877         RETURN(rc);
878 }
879
880 static int ost_filter_recovery_request(struct ptlrpc_request *req,
881                                        struct obd_device *obd, int *process)
882 {
883         switch (req->rq_reqmsg->opc) {
884         case OST_CONNECT: /* This will never get here, but for completeness. */
885         case OST_DISCONNECT:
886                *process = 1;
887                RETURN(0);
888
889         case OBD_PING:
890         case OST_CREATE:
891         case OST_DESTROY:
892         case OST_PUNCH:
893         case OST_SETATTR:
894         case OST_SYNC:
895         case OST_WRITE:
896         case OBD_LOG_CANCEL:
897         case LDLM_ENQUEUE:
898                 *process = target_queue_recovery_request(req, obd);
899                 RETURN(0);
900
901         default:
902                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
903                 *process = 0;
904                 /* XXX what should we set rq_status to here? */
905                 req->rq_status = -EAGAIN;
906                 RETURN(ptlrpc_error(req));
907         }
908 }
909
910
911
912 static int ost_handle(struct ptlrpc_request *req)
913 {
914         struct obd_trans_info trans_info = { 0, };
915         struct obd_trans_info *oti = &trans_info;
916         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
917         struct obd_export *exp = NULL;
918         ENTRY;
919
920         LASSERT(current->journal_info == NULL);
921         /* XXX identical to MDS */
922         if (req->rq_reqmsg->opc != OST_CONNECT) {
923                 struct obd_device *obd;
924                 int abort_recovery, recovering;
925
926                 exp = req->rq_export;
927
928                 if (exp == NULL) {
929                         CDEBUG(D_HA, "operation %d on unconnected OST\n",
930                                req->rq_reqmsg->opc);
931                         req->rq_status = -ENOTCONN;
932                         GOTO(out, rc = -ENOTCONN);
933                 }
934
935                 obd = exp->exp_obd;
936
937                 /* Check for aborted recovery. */
938                 spin_lock_bh(&obd->obd_processing_task_lock);
939                 abort_recovery = obd->obd_abort_recovery;
940                 recovering = obd->obd_recovering;
941                 spin_unlock_bh(&obd->obd_processing_task_lock);
942                 if (abort_recovery) {
943                         target_abort_recovery(obd);
944                 } else if (recovering) {
945                         rc = ost_filter_recovery_request(req, obd,
946                                                          &should_process);
947                         if (rc || !should_process)
948                                 RETURN(rc);
949                 }
950         }
951
952         oti_init(oti, req);
953
954         switch (req->rq_reqmsg->opc) {
955         case OST_CONNECT: {
956                 CDEBUG(D_INODE, "connect\n");
957                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
958                 rc = target_handle_connect(req, ost_handle);
959                 break;
960         }
961         case OST_DISCONNECT:
962                 CDEBUG(D_INODE, "disconnect\n");
963                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
964                 rc = target_handle_disconnect(req);
965                 break;
966         case OST_CREATE:
967                 CDEBUG(D_INODE, "create\n");
968                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
969                 rc = ost_create(exp, req, oti);
970                 break;
971         case OST_DESTROY:
972                 CDEBUG(D_INODE, "destroy\n");
973                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
974                 rc = ost_destroy(exp, req, oti);
975                 break;
976         case OST_GETATTR:
977                 CDEBUG(D_INODE, "getattr\n");
978                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
979                 rc = ost_getattr(exp, req);
980                 break;
981         case OST_SETATTR:
982                 CDEBUG(D_INODE, "setattr\n");
983                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
984                 rc = ost_setattr(exp, req, oti);
985                 break;
986         case OST_WRITE:
987                 CDEBUG(D_INODE, "write\n");
988                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
989                 rc = ost_brw_write(req, oti);
990                 LASSERT(current->journal_info == NULL);
991                 /* ost_brw sends its own replies */
992                 RETURN(rc);
993         case OST_READ:
994                 CDEBUG(D_INODE, "read\n");
995                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
996                 rc = ost_brw_read(req);
997                 LASSERT(current->journal_info == NULL);
998                 /* ost_brw sends its own replies */
999                 RETURN(rc);
1000         case OST_SAN_READ:
1001                 CDEBUG(D_INODE, "san read\n");
1002                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1003                 rc = ost_san_brw(req, OBD_BRW_READ);
1004                 /* ost_san_brw sends its own replies */
1005                 RETURN(rc);
1006         case OST_SAN_WRITE:
1007                 CDEBUG(D_INODE, "san write\n");
1008                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1009                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1010                 /* ost_san_brw sends its own replies */
1011                 RETURN(rc);
1012         case OST_PUNCH:
1013                 CDEBUG(D_INODE, "punch\n");
1014                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1015                 rc = ost_punch(exp, req, oti);
1016                 break;
1017         case OST_STATFS:
1018                 CDEBUG(D_INODE, "statfs\n");
1019                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1020                 rc = ost_statfs(req);
1021                 break;
1022         case OST_SYNC:
1023                 CDEBUG(D_INODE, "sync\n");
1024                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1025                 rc = ost_sync(exp, req);
1026                 break;
1027         case OST_SET_INFO:
1028                 DEBUG_REQ(D_INODE, req, "set_info");
1029                 rc = ost_set_info(exp, req);
1030                 break;
1031         case OST_GET_INFO:
1032                 DEBUG_REQ(D_INODE, req, "get_info");
1033                 rc = ost_get_info(exp, req);
1034                 break;
1035         case OBD_PING:
1036                 DEBUG_REQ(D_INODE, req, "ping");
1037                 rc = target_handle_ping(req);
1038                 break;
1039         /* FIXME - just reply status */
1040         case LLOG_ORIGIN_CONNECT:
1041                 DEBUG_REQ(D_INODE, req, "log connect\n");
1042                 rc = llog_handle_connect(req); 
1043                 req->rq_status = rc;
1044                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1045                 if (rc)
1046                         RETURN(rc);
1047                 RETURN(ptlrpc_reply(req));
1048         case OBD_LOG_CANCEL:
1049                 CDEBUG(D_INODE, "log cancel\n");
1050                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1051                 rc = llog_origin_handle_cancel(req);
1052                 req->rq_status = rc;
1053                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1054                 if (rc)
1055                         RETURN(rc);
1056                 RETURN(ptlrpc_reply(req));
1057         case LDLM_ENQUEUE:
1058                 CDEBUG(D_INODE, "enqueue\n");
1059                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1060                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1061                                          ldlm_server_blocking_ast);
1062                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1063                 break;
1064         case LDLM_CONVERT:
1065                 CDEBUG(D_INODE, "convert\n");
1066                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1067                 rc = ldlm_handle_convert(req);
1068                 break;
1069         case LDLM_CANCEL:
1070                 CDEBUG(D_INODE, "cancel\n");
1071                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1072                 rc = ldlm_handle_cancel(req);
1073                 break;
1074         case LDLM_BL_CALLBACK:
1075         case LDLM_CP_CALLBACK:
1076                 CDEBUG(D_INODE, "callback\n");
1077                 CERROR("callbacks should not happen on OST\n");
1078                 /* fall through */
1079         default:
1080                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1081                 req->rq_status = -ENOTSUPP;
1082                 rc = ptlrpc_error(req);
1083                 RETURN(rc);
1084         }
1085
1086         LASSERT(current->journal_info == NULL);
1087
1088         EXIT;
1089         /* If we're DISCONNECTing, the export_data is already freed */
1090         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1091                 struct obd_device *obd  = req->rq_export->exp_obd;
1092                 if (!obd->obd_no_transno) {
1093                         req->rq_repmsg->last_committed =
1094                                 obd->obd_last_committed;
1095                 } else {
1096                         DEBUG_REQ(D_IOCTL, req,
1097                                   "not sending last_committed update");
1098                 }
1099                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1100                        obd->obd_last_committed, req->rq_xid);
1101         }
1102
1103 out:
1104         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1105                 struct obd_device *obd = req->rq_export->exp_obd;
1106
1107                 if (obd && obd->obd_recovering) {
1108                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1109                         return target_queue_final_reply(req, rc);
1110                 }
1111                 /* Lost a race with recovery; let the error path DTRT. */
1112                 rc = req->rq_status = -ENOTCONN;
1113         }
1114
1115         if (!rc)
1116                 oti_to_request(oti, req);
1117
1118         target_send_reply(req, rc, fail);
1119         return 0;
1120 }
1121
1122 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1123 {
1124         struct ost_obd *ost = &obddev->u.ost;
1125         int rc;
1126         ENTRY;
1127
1128         /* Get rid of unneeded supplementary groups */
1129         current->ngroups = 0;
1130         memset(current->groups, 0, sizeof(current->groups));
1131
1132         rc = llog_start_commit_thread();
1133         if (rc < 0)
1134                 RETURN(rc);
1135
1136         ost->ost_service = 
1137                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1138                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1139                                 ost_handle, "ost",
1140                                 obddev->obd_proc_entry);
1141         if (ost->ost_service == NULL) {
1142                 CERROR("failed to start service\n");
1143                 RETURN(-ENOMEM);
1144         }
1145         
1146         rc = ptlrpc_start_n_threads(obddev, ost->ost_service, OST_NUM_THREADS, 
1147                                  "ll_ost");
1148         if (rc)
1149                 GOTO(out, rc = -EINVAL);
1150
1151         ost->ost_create_service =
1152                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1153                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL,
1154                                 ost_handle, "ost_create",
1155                                 obddev->obd_proc_entry);
1156         if (ost->ost_create_service == NULL) {
1157                 CERROR("failed to start OST create service\n");
1158                 GOTO(out, rc = -ENOMEM);
1159         }
1160
1161         rc = ptlrpc_start_n_threads(obddev, ost->ost_create_service, 1,
1162                                     "ll_ost_create");
1163         if (rc) 
1164                 GOTO(out_create, rc = -EINVAL);
1165
1166         RETURN(0);
1167
1168 out_create:
1169         ptlrpc_unregister_service(ost->ost_create_service);
1170 out:
1171         ptlrpc_unregister_service(ost->ost_service);
1172         RETURN(rc);
1173 }
1174
1175 static int ost_cleanup(struct obd_device *obddev, int flags)
1176 {
1177         struct ost_obd *ost = &obddev->u.ost;
1178         int err = 0;
1179         ENTRY;
1180
1181         spin_lock_bh(&obddev->obd_processing_task_lock);
1182         if (obddev->obd_recovering) {
1183                 target_cancel_recovery_timer(obddev);
1184                 obddev->obd_recovering = 0;
1185         }
1186         spin_unlock_bh(&obddev->obd_processing_task_lock);
1187
1188         ptlrpc_stop_all_threads(ost->ost_service);
1189         ptlrpc_unregister_service(ost->ost_service);
1190
1191         ptlrpc_stop_all_threads(ost->ost_create_service);
1192         ptlrpc_unregister_service(ost->ost_create_service);
1193
1194         RETURN(err);
1195 }
1196
1197 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1198 {
1199         struct lprocfs_static_vars lvars;
1200
1201         lprocfs_init_vars(ost,&lvars);
1202         return lprocfs_obd_attach(dev, lvars.obd_vars);
1203 }
1204
1205 int ost_detach(struct obd_device *dev)
1206 {
1207         return lprocfs_obd_detach(dev);
1208 }
1209
1210 /* use obd ops to offer management infrastructure */
1211 static struct obd_ops ost_obd_ops = {
1212         o_owner:        THIS_MODULE,
1213         o_attach:       ost_attach,
1214         o_detach:       ost_detach,
1215         o_setup:        ost_setup,
1216         o_cleanup:      ost_cleanup,
1217 };
1218
1219 static int __init ost_init(void)
1220 {
1221         struct lprocfs_static_vars lvars;
1222         ENTRY;
1223
1224         lprocfs_init_vars(ost,&lvars);
1225         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1226                                    LUSTRE_OST_NAME));
1227 }
1228
1229 static void /*__exit*/ ost_exit(void)
1230 {
1231         class_unregister_type(LUSTRE_OST_NAME);
1232 }
1233
1234 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1235 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1236 MODULE_LICENSE("GPL");
1237
1238 module_init(ost_init);
1239 module_exit(ost_exit);