Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <portals/list.h>
47
48 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
49 {
50         if (oti == NULL)
51                 return;
52         memset(oti, 0, sizeof *oti);
53
54         if (req->rq_repmsg && req->rq_reqmsg != 0)
55                 oti->oti_transno = req->rq_repmsg->transno;
56 }
57
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
59 {
60         struct oti_req_ack_lock *ack_lock;
61         int i;
62
63         if (oti == NULL)
64                 return;
65
66         if (req->rq_repmsg)
67                 req->rq_repmsg->transno = oti->oti_transno;
68
69         /* XXX 4 == entries in oti_ack_locks??? */
70         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
71                 if (!ack_lock->mode)
72                         break;
73                 ldlm_put_lock_into_req(req, &ack_lock->lock, ack_lock->mode);
74         }
75 }
76
77 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
78                        struct obd_trans_info *oti)
79 {
80         struct ost_body *body, *repbody;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
85         if (body == NULL)
86                 RETURN(-EFAULT);
87
88         rc = lustre_pack_reply(req, 1, &size, NULL);
89         if (rc)
90                 RETURN(rc);
91
92         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
93                 oti->oti_logcookies = obdo_logcookie(&body->oa);
94         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
95         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
96         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
97         RETURN(0);
98 }
99
100 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
101 {
102         struct ost_body *body, *repbody;
103         int rc, size = sizeof(*body);
104         ENTRY;
105
106         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
107         if (body == NULL)
108                 RETURN(-EFAULT);
109
110         rc = lustre_pack_reply(req, 1, &size, NULL);
111         if (rc)
112                 RETURN(rc);
113
114         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
115         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
116         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
117         RETURN(0);
118 }
119
120 static int ost_statfs(struct ptlrpc_request *req)
121 {
122         struct obd_statfs *osfs;
123         int rc, size = sizeof(*osfs);
124         ENTRY;
125
126         rc = lustre_pack_reply(req, 1, &size, NULL);
127         if (rc)
128                 RETURN(rc);
129
130         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
131
132         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
133         if (req->rq_status != 0)
134                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
135
136         RETURN(0);
137 }
138
139 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
140                       struct obd_trans_info *oti)
141 {
142         struct ost_body *body, *repbody;
143         int rc, size = sizeof(*repbody);
144         ENTRY;
145
146         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
147         if (body == NULL)
148                 RETURN(-EFAULT);
149
150         rc = lustre_pack_reply(req, 1, &size, NULL);
151         if (rc)
152                 RETURN(rc);
153
154         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
155         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
156         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
157         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
158         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
159         RETURN(0);
160 }
161
162 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
163                      struct obd_trans_info *oti)
164 {
165         struct ost_body *body, *repbody;
166         int rc, size = sizeof(*repbody);
167         ENTRY;
168
169         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
170         if (body == NULL)
171                 RETURN(-EFAULT);
172
173         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
174             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
175                 RETURN(-EINVAL);
176
177         rc = lustre_pack_reply(req, 1, &size, NULL);
178         if (rc)
179                 RETURN(rc);
180
181         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
182         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
184                                    repbody->oa.o_blocks, oti);
185         RETURN(0);
186 }
187
188 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
189 {
190         struct ost_body *body, *repbody;
191         int rc, size = sizeof(*repbody);
192         ENTRY;
193
194         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
195         if (body == NULL)
196                 RETURN(-EFAULT);
197
198         rc = lustre_pack_reply(req, 1, &size, NULL);
199         if (rc)
200                 RETURN(rc);
201
202         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
203         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
204         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
205                                   repbody->oa.o_blocks);
206         RETURN(0);
207 }
208
209 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
210                        struct obd_trans_info *oti)
211 {
212         struct ost_body *body, *repbody;
213         int rc, size = sizeof(*repbody);
214         ENTRY;
215
216         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
217         if (body == NULL)
218                 RETURN(-EFAULT);
219
220         rc = lustre_pack_reply(req, 1, &size, NULL);
221         if (rc)
222                 RETURN(rc);
223
224         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
225         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
226
227         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
228         RETURN(0);
229 }
230
231 static int ost_bulk_timeout(void *data)
232 {
233         ENTRY;
234         /* We don't fail the connection here, because having the export
235          * killed makes the (vital) call to commitrw very sad.
236          */
237         RETURN(1);
238 }
239
240 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
241                                 struct niobuf_remote *rnb, int nrnb,
242                                 struct niobuf_remote **pp_rnbp)
243 {
244         /* Copy a remote niobuf, splitting it into page-sized chunks
245          * and setting ioo[i].ioo_bufcnt accordingly */
246         struct niobuf_remote *pp_rnb;
247         int   i;
248         int   j;
249         int   page;
250         int   rnbidx = 0;
251         int   npages = 0;
252
253         /* first count and check the number of pages required */
254         for (i = 0; i < nioo; i++)
255                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
256                         obd_off offset = rnb[rnbidx].offset;
257                         obd_off p0 = offset >> PAGE_SHIFT;
258                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
259
260                         LASSERT(rnbidx < nrnb);
261
262                         npages += (pn + 1 - p0);
263
264                         if (rnb[rnbidx].len == 0) {
265                                 CERROR("zero len BRW: obj %d objid "LPX64
266                                        " buf %u\n", i, ioo[i].ioo_id, j);
267                                 return -EINVAL;
268                         }
269                         if (j > 0 &&
270                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
271                                 CERROR("unordered BRW: obj %d objid "LPX64
272                                        " buf %u offset "LPX64" <= "LPX64"\n",
273                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
274                                        rnb[rnbidx].offset);
275                                 return -EINVAL;
276                         }
277                 }
278
279         LASSERT(rnbidx == nrnb);
280
281         if (npages == nrnb) {       /* all niobufs are for single pages */
282                 *pp_rnbp = rnb;
283                 return npages;
284         }
285
286         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
287         if (pp_rnb == NULL)
288                 return -ENOMEM;
289
290         /* now do the actual split */
291         page = rnbidx = 0;
292         for (i = 0; i < nioo; i++) {
293                 int  obj_pages = 0;
294
295                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
296                         obd_off off = rnb[rnbidx].offset;
297                         int     nob = rnb[rnbidx].len;
298
299                         LASSERT(rnbidx < nrnb);
300                         do {
301                                 obd_off  poff = off & (PAGE_SIZE - 1);
302                                 int      pnob = (poff + nob > PAGE_SIZE) ?
303                                                 PAGE_SIZE - poff : nob;
304
305                                 LASSERT(page < npages);
306                                 pp_rnb[page].len = pnob;
307                                 pp_rnb[page].offset = off;
308                                 pp_rnb[page].flags = rnb->flags;
309
310                                 CDEBUG(D_PAGE, "   obj %d id "LPX64
311                                        "page %d(%d) "LPX64" for %d\n",
312                                        i, ioo[i].ioo_id, obj_pages, page,
313                                        pp_rnb[page].offset, pp_rnb[page].len);
314                                 page++;
315                                 obj_pages++;
316
317                                 off += pnob;
318                                 nob -= pnob;
319                         } while (nob > 0);
320                         LASSERT(nob == 0);
321                 }
322                 ioo[i].ioo_bufcnt = obj_pages;
323         }
324         LASSERT(page == npages);
325
326         *pp_rnbp = pp_rnb;
327         return npages;
328 }
329
330 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
331                                    struct niobuf_remote *rnb)
332 {
333         if (pp_rnb == rnb)                      /* didn't allocate above */
334                 return;
335
336         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
337 }
338
#if CHECKSUM_BULK
/*
 * Fold every page of the bulk descriptor into one checksum.  Each page is
 * mapped with kmap(), bp_buflen bytes starting at bp_pageoffset are fed
 * to ost_checksum(), and the page is unmapped again.  Returns the
 * accumulated checksum (0 for an empty page list).
 */
obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *pg;
        obd_count sum = 0;

        list_for_each_entry(pg, &desc->bd_page_list, bp_link) {
                void *vaddr = kmap(pg->bp_page);

                ost_checksum(&sum, vaddr + pg->bp_pageoffset, pg->bp_buflen);
                kunmap(pg->bp_page);
        }

        return sum;
}
#endif
354
355 static int ost_brw_read(struct ptlrpc_request *req)
356 {
357         struct ptlrpc_bulk_desc *desc;
358         struct niobuf_remote    *remote_nb;
359         struct niobuf_remote    *pp_rnb;
360         struct niobuf_local     *local_nb;
361         struct obd_ioobj        *ioo;
362         struct ost_body         *body, *repbody;
363         struct l_wait_info       lwi;
364         struct obd_trans_info    oti = { 0 };
365         char                     str[PTL_NALFMT_SIZE];
366         int                      size[1] = { sizeof(*body) };
367         int                      comms_error = 0;
368         int                      niocount;
369         int                      npages;
370         int                      nob = 0;
371         int                      rc;
372         int                      i;
373         ENTRY;
374
375         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
376                 GOTO(out, rc = -EIO);
377
378         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
379                          (obd_timeout + 1) / 4);
380
381         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
382         if (body == NULL) {
383                 CERROR("Missing/short ost_body\n");
384                 GOTO(out, rc = -EFAULT);
385         }
386
387         /* BUG 974: when we send back cache grants, don't clear this flag */
388         body->oa.o_valid &= ~OBD_MD_FLRDEV;
389
390         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
391         if (ioo == NULL) {
392                 CERROR("Missing/short ioobj\n");
393                 GOTO(out, rc = -EFAULT);
394         }
395
396         niocount = ioo->ioo_bufcnt;
397         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
398                                        lustre_swab_niobuf_remote);
399         if (remote_nb == NULL) {
400                 CERROR("Missing/short niobuf\n");
401                 GOTO(out, rc = -EFAULT);
402         }
403         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
404                 for (i = 1; i < niocount; i++)
405                         lustre_swab_niobuf_remote (&remote_nb[i]);
406         }
407
408         size[0] = sizeof(*body);
409         rc = lustre_pack_reply(req, 1, size, NULL);
410         if (rc)
411                 GOTO(out, rc);
412
413         /* FIXME all niobuf splitting should be done in obdfilter if needed */
414         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
415         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
416         if (npages < 0)
417                 GOTO(out, rc = npages);
418
419         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
420         if (local_nb == NULL)
421                 GOTO(out_pp_rnb, rc = -ENOMEM);
422
423         desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
424         if (desc == NULL)
425                 GOTO(out_local, rc = -ENOMEM);
426
427         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
428                         ioo, npages, pp_rnb, local_nb, &oti);
429         if (rc != 0)
430                 GOTO(out_bulk, rc);
431
432         nob = 0;
433         for (i = 0; i < npages; i++) {
434                 int page_rc = local_nb[i].rc;
435
436                 if (page_rc < 0) {              /* error */
437                         rc = page_rc;
438                         break;
439                 }
440
441                 LASSERT(page_rc <= pp_rnb[i].len);
442                 nob += page_rc;
443                 if (page_rc != 0) {             /* some data! */
444                         LASSERT (local_nb[i].page != NULL);
445                         rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
446                                                    pp_rnb[i].offset& ~PAGE_MASK,
447                                                    page_rc);
448                         if (rc != 0)
449                                 break;
450                 }
451
452                 if (page_rc != pp_rnb[i].len) { /* short read */
453                         /* All subsequent pages should be 0 */
454                         while(++i < npages)
455                                 LASSERT(local_nb[i].rc == 0);
456                         break;
457                 }
458         }
459
460         if (rc == 0) {
461                 rc = ptlrpc_bulk_put(desc);
462                 if (rc == 0) {
463                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
464                                           ost_bulk_timeout, desc);
465                         rc = l_wait_event(desc->bd_waitq,
466                                           ptlrpc_bulk_complete(desc), &lwi);
467                         if (rc) {
468                                 LASSERT(rc == -ETIMEDOUT);
469                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
470                                 ptlrpc_abort_bulk(desc);
471                         }
472                 } else {
473                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
474                 }
475                 comms_error = rc != 0;
476         }
477
478         /* Must commit after prep above in all cases */
479         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
480                           ioo, npages, local_nb, &oti);
481
482         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
483         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
484
485 #if CHECKSUM_BULK
486         if (rc == 0) {
487                 repbody->oa.o_nlink = ost_checksum_bulk(desc);
488                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
489         }
490 #endif
491
492  out_bulk:
493         ptlrpc_free_bulk(desc);
494  out_local:
495         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
496  out_pp_rnb:
497         free_per_page_niobufs(npages, pp_rnb, remote_nb);
498  out:
499         LASSERT(rc <= 0);
500         if (rc == 0) {
501                 req->rq_status = nob;
502                 ptlrpc_reply(req);
503         } else if (!comms_error) {
504                 /* only reply if comms OK */
505                 req->rq_status = rc;
506                 ptlrpc_error(req);
507         } else {
508                 if (req->rq_repmsg != NULL) {
509                         /* reply out callback would free */
510                         OBD_FREE(req->rq_repmsg, req->rq_replen);
511                 }
512                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
513                         CERROR("bulk IO comms error: "
514                                "evicting %s@%s nid "LPX64" (%s)\n",
515                                req->rq_export->exp_client_uuid.uuid,
516                                req->rq_connection->c_remote_uuid.uuid,
517                                req->rq_connection->c_peer.peer_nid,
518                                portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
519                                                req->rq_connection->c_peer.peer_nid,
520                                                str));
521                         ptlrpc_fail_export(req->rq_export);
522                 } else {
523                         CERROR("ignoring bulk IO comms error: "
524                                "client reconnected %s@%s nid "LPX64" (%s)\n",  
525                                req->rq_export->exp_client_uuid.uuid,
526                                req->rq_connection->c_remote_uuid.uuid,
527                                req->rq_connection->c_peer.peer_nid,
528                                portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
529                                                req->rq_connection->c_peer.peer_nid,
530                                                str));
531                 }
532         }
533
534         RETURN(rc);
535 }
536
537 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
538 {
539         struct ptlrpc_bulk_desc *desc;
540         struct niobuf_remote    *remote_nb;
541         struct niobuf_remote    *pp_rnb;
542         struct niobuf_local     *local_nb;
543         struct obd_ioobj        *ioo;
544         struct ost_body         *body, *repbody;
545         struct l_wait_info       lwi;
546         __u32                   *rcs;
547         int                      size[2] = { sizeof(*body) };
548         int                      objcount, niocount, npages;
549         int                      comms_error = 0;
550         int                      rc, rc2, swab, i, j;
551         char                    str[PTL_NALFMT_SIZE];
552         ENTRY;
553
554         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
555                 GOTO(out, rc = -EIO);
556
557         /* pause before transaction has been started */
558         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
559                          (obd_timeout + 1) / 4);
560
561         swab = lustre_msg_swabbed(req->rq_reqmsg);
562         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
563         if (body == NULL) {
564                 CERROR("Missing/short ost_body\n");
565                 GOTO(out, rc = -EFAULT);
566         }
567
568         /* BUG 974: when we send back cache grants, don't clear this flag */
569         body->oa.o_valid &= ~OBD_MD_FLRDEV;
570
571         LASSERT_REQSWAB(req, 1);
572         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
573         if (objcount == 0) {
574                 CERROR("Missing/short ioobj\n");
575                 GOTO(out, rc = -EFAULT);
576         }
577         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
578         LASSERT (ioo != NULL);
579         for (niocount = i = 0; i < objcount; i++) {
580                 if (swab)
581                         lustre_swab_obd_ioobj (&ioo[i]);
582                 if (ioo[i].ioo_bufcnt == 0) {
583                         CERROR("ioo[%d] has zero bufcnt\n", i);
584                         GOTO(out, rc = -EFAULT);
585                 }
586                 niocount += ioo[i].ioo_bufcnt;
587         }
588
589         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
590                                        lustre_swab_niobuf_remote);
591         if (remote_nb == NULL) {
592                 CERROR("Missing/short niobuf\n");
593                 GOTO(out, rc = -EFAULT);
594         }
595         if (swab) {                             /* swab the remaining niobufs */
596                 for (i = 1; i < niocount; i++)
597                         lustre_swab_niobuf_remote (&remote_nb[i]);
598         }
599
600         size[1] = niocount * sizeof(*rcs);
601         rc = lustre_pack_reply(req, 2, size, NULL);
602         if (rc != 0)
603                 GOTO(out, rc);
604         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
605
606         /* FIXME all niobuf splitting should be done in obdfilter if needed */
607         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
608         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
609         if (npages < 0)
610                 GOTO(out, rc = npages);
611
612         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
613         if (local_nb == NULL)
614                 GOTO(out_pp_rnb, rc = -ENOMEM);
615
616         desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
617         if (desc == NULL)
618                 GOTO(out_local, rc = -ENOMEM);
619
620         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
621                         ioo, npages, pp_rnb, local_nb, oti);
622         if (rc != 0)
623                 GOTO(out_bulk, rc);
624
625         /* NB Having prepped, we must commit... */
626
627         for (i = 0; i < npages; i++) {
628                 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
629                                            pp_rnb[i].offset & (PAGE_SIZE - 1),
630                                            pp_rnb[i].len);
631                 if (rc != 0)
632                         break;
633         }
634
635         if (rc == 0) {
636                 rc = ptlrpc_bulk_get(desc);
637                 if (rc == 0) {
638                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
639                                           ost_bulk_timeout, desc);
640                         rc = l_wait_event(desc->bd_waitq,
641                                           ptlrpc_bulk_complete(desc), &lwi);
642                         if (rc) {
643                                 LASSERT(rc == -ETIMEDOUT);
644                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
645                                 ptlrpc_abort_bulk(desc);
646                         }
647                 } else {
648                         DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
649                 }
650                 comms_error = rc != 0;
651         }
652
653         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
654         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
655
656 #if CHECKSUM_BULK
657         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
658                 static int cksum_counter;
659                 obd_count client_cksum = body->oa.o_nlink;
660                 obd_count cksum = ost_checksum_bulk(desc);
661
662                 portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
663                                 req->rq_connection->c_peer.peer_nid, str);
664                 if (client_cksum != cksum) {
665                         CERROR("Bad checksum: client %x, server %x, client NID "
666                                LPX64" (%s)\n", client_cksum, cksum,
667                                req->rq_connection->c_peer.peer_nid, str);
668                         cksum_counter = 1;
669                         repbody->oa.o_nlink = cksum;
670                 } else {
671                         cksum_counter++;
672                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
673                                 CWARN("Checksum %u from "LPX64": %x OK\n",
674                                       cksum_counter,
675                                       req->rq_connection->c_peer.peer_nid,
676                                       cksum);
677                 }
678         }
679 #endif
680         /* Must commit after prep above in all cases */
681         rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
682                            objcount, ioo, npages, local_nb, oti);
683
684         if (rc == 0) {
685                 /* set per-requested niobuf return codes */
686                 for (i = j = 0; i < niocount; i++) {
687                         int nob = remote_nb[i].len;
688
689                         rcs[i] = 0;
690                         do {
691                                 LASSERT(j < npages);
692                                 if (local_nb[j].rc < 0)
693                                         rcs[i] = local_nb[j].rc;
694                                 nob -= pp_rnb[j].len;
695                                 j++;
696                         } while (nob > 0);
697                         LASSERT(nob == 0);
698                 }
699                 LASSERT(j == npages);
700         }
701         if (rc == 0)
702                 rc = rc2;
703
704  out_bulk:
705         ptlrpc_free_bulk(desc);
706  out_local:
707         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
708  out_pp_rnb:
709         free_per_page_niobufs(npages, pp_rnb, remote_nb);
710  out:
711         if (rc == 0) {
712                 oti_to_request(oti, req);
713                 rc = ptlrpc_reply(req);
714         } else if (!comms_error) {
715                 /* Only reply if there was no comms problem with bulk */
716                 req->rq_status = rc;
717                 ptlrpc_error(req);
718         } else {
719                 if (req->rq_repmsg != NULL) {
720                         /* reply out callback would free */
721                         OBD_FREE (req->rq_repmsg, req->rq_replen);
722                 }
723                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
724                         CERROR("bulk IO comms error: "
725                                "evicting %s@%s nid "LPX64" (%s)\n",
726                                req->rq_export->exp_client_uuid.uuid,
727                                req->rq_connection->c_remote_uuid.uuid,
728                                req->rq_connection->c_peer.peer_nid,
729                                portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
730                                                req->rq_connection->c_peer.peer_nid,
731                                                str));
732                         ptlrpc_fail_export(req->rq_export);
733                 } else {
734                         CERROR("ignoring bulk IO comms error: "
735                                "client reconnected %s@%s nid "LPX64" (%s)\n",
736                                req->rq_export->exp_client_uuid.uuid,
737                                req->rq_connection->c_remote_uuid.uuid,
738                                req->rq_connection->c_peer.peer_nid,
739                                portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
740                                                req->rq_connection->c_peer.peer_nid,
741                                                str));
742                 }        
743         }
744         RETURN(rc);
745 }
746
747 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
748 {
749         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
750         struct obd_ioobj *ioo;
751         struct ost_body *body, *repbody;
752         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
753         int swab;
754         ENTRY;
755
756         /* XXX not set to use latest protocol */
757
758         swab = lustre_msg_swabbed(req->rq_reqmsg);
759         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
760         if (body == NULL) {
761                 CERROR("Missing/short ost_body\n");
762                 GOTO(out, rc = -EFAULT);
763         }
764
765         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
766         if (ioo == NULL) {
767                 CERROR("Missing/short ioobj\n");
768                 GOTO(out, rc = -EFAULT);
769         }
770         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
771         niocount = ioo[0].ioo_bufcnt;
772         for (i = 1; i < objcount; i++) {
773                 if (swab)
774                         lustre_swab_obd_ioobj (&ioo[i]);
775                 niocount += ioo[i].ioo_bufcnt;
776         }
777
778         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
779                                        lustre_swab_niobuf_remote);
780         if (remote_nb == NULL) {
781                 CERROR("Missing/short niobuf\n");
782                 GOTO(out, rc = -EFAULT);
783         }
784         if (swab) {                             /* swab the remaining niobufs */
785                 for (i = 1; i < niocount; i++)
786                         lustre_swab_niobuf_remote (&remote_nb[i]);
787         }
788
789         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
790         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
791         if (npages < 0)
792                 GOTO (out, rc = npages);
793  
794         size[1] = npages * sizeof(*pp_rnb);
795         rc = lustre_pack_reply(req, 2, size, NULL);
796         if (rc)
797                 GOTO(out_pp_rnb, rc);
798
799         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
800                                         objcount, ioo, npages, pp_rnb);
801
802         if (req->rq_status)
803                 GOTO(out_pp_rnb, rc = 0);
804
805         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
806         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
807
808         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
809         memcpy(res_nb, remote_nb, size[1]);
810         rc = 0;
811 out_pp_rnb:
812         free_per_page_niobufs(npages, pp_rnb, remote_nb);
813 out:
814         if (rc) {
815                 OBD_FREE(req->rq_repmsg, req->rq_replen);
816                 req->rq_repmsg = NULL;
817                 req->rq_status = rc;
818                 ptlrpc_error(req);
819         } else
820                 ptlrpc_reply(req);
821
822         return rc;
823 }
824
825
826 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
827 {
828         char *key;
829         int keylen, rc = 0;
830         ENTRY;
831
832         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
833         if (key == NULL) {
834                 DEBUG_REQ(D_HA, req, "no set_info key");
835                 RETURN(-EFAULT);
836         }
837         keylen = req->rq_reqmsg->buflens[0];
838
839         rc = lustre_pack_reply(req, 0, NULL, NULL);
840         if (rc)
841                 RETURN(rc);
842
843         rc = obd_set_info(exp, keylen, key, 0, NULL);
844         req->rq_repmsg->status = 0;
845         RETURN(rc);
846 }
847
848 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
849 {
850         char *key;
851         int keylen, rc = 0, size = sizeof(obd_id);
852         obd_id *reply;
853         ENTRY;
854
855         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
856         if (key == NULL) {
857                 DEBUG_REQ(D_HA, req, "no get_info key");
858                 RETURN(-EFAULT);
859         }
860         keylen = req->rq_reqmsg->buflens[0];
861
862         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
863                 RETURN(-EPROTO);
864
865         rc = lustre_pack_reply(req, 1, &size, NULL);
866         if (rc)
867                 RETURN(rc);
868
869         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
870         rc = obd_get_info(exp, keylen, key, &size, reply);
871         req->rq_repmsg->status = 0;
872         RETURN(rc);
873 }
874
875 static int ost_filter_recovery_request(struct ptlrpc_request *req,
876                                        struct obd_device *obd, int *process)
877 {
878         switch (req->rq_reqmsg->opc) {
879         case OST_CONNECT: /* This will never get here, but for completeness. */
880         case OST_DISCONNECT:
881                *process = 1;
882                RETURN(0);
883
884         case OBD_PING:
885         case OST_CREATE:
886         case OST_DESTROY:
887         case OST_PUNCH:
888         case OST_SETATTR:
889         case OST_SYNC:
890         case OST_WRITE:
891         case OBD_LOG_CANCEL:
892         case LDLM_ENQUEUE:
893                 *process = target_queue_recovery_request(req, obd);
894                 RETURN(0);
895
896         default:
897                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
898                 *process = 0;
899                 /* XXX what should we set rq_status to here? */
900                 req->rq_status = -EAGAIN;
901                 RETURN(ptlrpc_error(req));
902         }
903 }
904
905
906
907 static int ost_handle(struct ptlrpc_request *req)
908 {
909         struct obd_trans_info trans_info = { 0, };
910         struct obd_trans_info *oti = &trans_info;
911         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
912         struct obd_export *exp = NULL;
913         ENTRY;
914
915         LASSERT(current->journal_info == NULL);
916         /* XXX identical to MDS */
917         if (req->rq_reqmsg->opc != OST_CONNECT) {
918                 struct obd_device *obd;
919                 int abort_recovery, recovering;
920
921                 exp = req->rq_export;
922
923                 if (exp == NULL) {
924                         CDEBUG(D_HA, "operation %d on unconnected OST\n",
925                                req->rq_reqmsg->opc);
926                         req->rq_status = -ENOTCONN;
927                         GOTO(out, rc = -ENOTCONN);
928                 }
929
930                 obd = exp->exp_obd;
931
932                 /* Check for aborted recovery. */
933                 spin_lock_bh(&obd->obd_processing_task_lock);
934                 abort_recovery = obd->obd_abort_recovery;
935                 recovering = obd->obd_recovering;
936                 spin_unlock_bh(&obd->obd_processing_task_lock);
937                 if (abort_recovery) {
938                         target_abort_recovery(obd);
939                 } else if (recovering) {
940                         rc = ost_filter_recovery_request(req, obd,
941                                                          &should_process);
942                         if (rc || !should_process)
943                                 RETURN(rc);
944                 }
945         }
946
947         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
948                 GOTO(out, rc = -EINVAL);
949
950         oti_init(oti, req);
951
952         switch (req->rq_reqmsg->opc) {
953         case OST_CONNECT:
954                 CDEBUG(D_INODE, "connect\n");
955                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
956                 rc = target_handle_connect(req, ost_handle);
957                 break;
958         case OST_DISCONNECT:
959                 CDEBUG(D_INODE, "disconnect\n");
960                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
961                 rc = target_handle_disconnect(req);
962                 break;
963         case OST_CREATE:
964                 CDEBUG(D_INODE, "create\n");
965                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
966                 rc = ost_create(exp, req, oti);
967                 break;
968         case OST_DESTROY:
969                 CDEBUG(D_INODE, "destroy\n");
970                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
971                 rc = ost_destroy(exp, req, oti);
972                 break;
973         case OST_GETATTR:
974                 CDEBUG(D_INODE, "getattr\n");
975                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
976                 rc = ost_getattr(exp, req);
977                 break;
978         case OST_SETATTR:
979                 CDEBUG(D_INODE, "setattr\n");
980                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
981                 rc = ost_setattr(exp, req, oti);
982                 break;
983         case OST_WRITE:
984                 CDEBUG(D_INODE, "write\n");
985                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
986                 rc = ost_brw_write(req, oti);
987                 LASSERT(current->journal_info == NULL);
988                 /* ost_brw sends its own replies */
989                 RETURN(rc);
990         case OST_READ:
991                 CDEBUG(D_INODE, "read\n");
992                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
993                 rc = ost_brw_read(req);
994                 LASSERT(current->journal_info == NULL);
995                 /* ost_brw sends its own replies */
996                 RETURN(rc);
997         case OST_SAN_READ:
998                 CDEBUG(D_INODE, "san read\n");
999                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1000                 rc = ost_san_brw(req, OBD_BRW_READ);
1001                 /* ost_san_brw sends its own replies */
1002                 RETURN(rc);
1003         case OST_SAN_WRITE:
1004                 CDEBUG(D_INODE, "san write\n");
1005                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1006                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1007                 /* ost_san_brw sends its own replies */
1008                 RETURN(rc);
1009         case OST_PUNCH:
1010                 CDEBUG(D_INODE, "punch\n");
1011                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1012                 rc = ost_punch(exp, req, oti);
1013                 break;
1014         case OST_STATFS:
1015                 CDEBUG(D_INODE, "statfs\n");
1016                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1017                 rc = ost_statfs(req);
1018                 break;
1019         case OST_SYNC:
1020                 CDEBUG(D_INODE, "sync\n");
1021                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1022                 rc = ost_sync(exp, req);
1023                 break;
1024         case OST_SET_INFO:
1025                 DEBUG_REQ(D_INODE, req, "set_info");
1026                 rc = ost_set_info(exp, req);
1027                 break;
1028         case OST_GET_INFO:
1029                 DEBUG_REQ(D_INODE, req, "get_info");
1030                 rc = ost_get_info(exp, req);
1031                 break;
1032         case OBD_PING:
1033                 DEBUG_REQ(D_INODE, req, "ping");
1034                 rc = target_handle_ping(req);
1035                 break;
1036 #ifdef ENABLE_ORPHANS
1037         /* FIXME - just reply status */
1038         case LLOG_ORIGIN_CONNECT:
1039                 DEBUG_REQ(D_INODE, req, "log connect\n");
1040                 rc = llog_handle_connect(req); 
1041                 req->rq_status = rc;
1042                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1043                 if (rc)
1044                         RETURN(rc);
1045                 RETURN(ptlrpc_reply(req));
1046                 //break;
1047         case OBD_LOG_CANCEL:
1048                 CDEBUG(D_INODE, "log cancel\n");
1049                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1050                 rc = llog_origin_handle_cancel(req);
1051                 req->rq_status = rc;
1052                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1053                 if (rc)
1054                         RETURN(rc);
1055                 RETURN(ptlrpc_reply(req));
1056                 //break;
1057 #endif
1058         case LDLM_ENQUEUE:
1059                 CDEBUG(D_INODE, "enqueue\n");
1060                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1061                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1062                                          ldlm_server_blocking_ast);
1063                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1064                 break;
1065         case LDLM_CONVERT:
1066                 CDEBUG(D_INODE, "convert\n");
1067                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1068                 rc = ldlm_handle_convert(req);
1069                 break;
1070         case LDLM_CANCEL:
1071                 CDEBUG(D_INODE, "cancel\n");
1072                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1073                 rc = ldlm_handle_cancel(req);
1074                 break;
1075         case LDLM_BL_CALLBACK:
1076         case LDLM_CP_CALLBACK:
1077                 CDEBUG(D_INODE, "callback\n");
1078                 CERROR("callbacks should not happen on OST\n");
1079                 /* fall through */
1080         default:
1081                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1082                 req->rq_status = -ENOTSUPP;
1083                 rc = ptlrpc_error(req);
1084                 RETURN(rc);
1085         }
1086
1087         LASSERT(current->journal_info == NULL);
1088
1089         EXIT;
1090         /* If we're DISCONNECTing, the export_data is already freed */
1091         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1092                 struct obd_device *obd  = req->rq_export->exp_obd;
1093                 if (!obd->obd_no_transno) {
1094                         req->rq_repmsg->last_committed =
1095                                 obd->obd_last_committed;
1096                 } else {
1097                         DEBUG_REQ(D_IOCTL, req,
1098                                   "not sending last_committed update");
1099                 }
1100                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1101                        obd->obd_last_committed, req->rq_xid);
1102         }
1103
1104 out:
1105         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1106                 struct obd_device *obd = req->rq_export->exp_obd;
1107
1108                 if (obd && obd->obd_recovering) {
1109                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1110                         return target_queue_final_reply(req, rc);
1111                 }
1112                 /* Lost a race with recovery; let the error path DTRT. */
1113                 rc = req->rq_status = -ENOTCONN;
1114         }
1115
1116         if (!rc)
1117                 oti_to_request(oti, req);
1118
1119         target_send_reply(req, rc, fail);
1120         return 0;
1121 }
1122
1123 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1124 {
1125         struct ost_obd *ost = &obddev->u.ost;
1126         int rc;
1127         ENTRY;
1128
1129         /* Get rid of unneeded supplementary groups */
1130         current->ngroups = 0;
1131         memset(current->groups, 0, sizeof(current->groups));
1132
1133         rc = llog_start_commit_thread();
1134         if (rc < 0)
1135                 RETURN(rc);
1136
1137         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1138                                            OST_BUFSIZE, OST_MAXREQSIZE,
1139                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1140                                            ost_handle, "ost", 
1141                                            obddev->obd_proc_entry);
1142         if (ost->ost_service == NULL) {
1143                 CERROR("failed to start service\n");
1144                 RETURN(-ENOMEM);
1145         }
1146         
1147         rc = ptlrpc_start_n_threads(obddev, ost->ost_service, OST_NUM_THREADS, 
1148                                  "ll_ost");
1149         if (rc)
1150                 GOTO(out, rc = -EINVAL);
1151
1152         ost->ost_create_service =
1153                 ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, OST_BUFSIZE,
1154                                 OST_MAXREQSIZE, OST_CREATE_PORTAL,
1155                                 OSC_REPLY_PORTAL, ost_handle, "ost_create",
1156                                 obddev->obd_proc_entry);
1157         if (ost->ost_create_service == NULL) {
1158                 CERROR("failed to start OST create service\n");
1159                 GOTO(out, rc = -ENOMEM);
1160         }
1161
1162         rc = ptlrpc_start_n_threads(obddev, ost->ost_create_service, 1,
1163                                     "ll_ost_create");
1164         if (rc) 
1165                 GOTO(out_create, rc = -EINVAL);
1166
1167         RETURN(0);
1168
1169 out_create:
1170         ptlrpc_unregister_service(ost->ost_create_service);
1171 out:
1172         ptlrpc_unregister_service(ost->ost_service);
1173         RETURN(rc);
1174 }
1175
1176 static int ost_cleanup(struct obd_device *obddev, int flags)
1177 {
1178         struct ost_obd *ost = &obddev->u.ost;
1179         int err = 0;
1180         ENTRY;
1181
1182         if (obddev->obd_recovering)
1183                 target_cancel_recovery_timer(obddev);
1184
1185         ptlrpc_stop_all_threads(ost->ost_service);
1186         ptlrpc_unregister_service(ost->ost_service);
1187
1188         ptlrpc_stop_all_threads(ost->ost_create_service);
1189         ptlrpc_unregister_service(ost->ost_create_service);
1190
1191         RETURN(err);
1192 }
1193
1194 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1195 {
1196         struct lprocfs_static_vars lvars;
1197
1198         lprocfs_init_vars(ost,&lvars);
1199         return lprocfs_obd_attach(dev, lvars.obd_vars);
1200 }
1201
/*
 * Detach hook: undo ost_attach() by handing the device to
 * lprocfs_obd_detach() and returning its result.
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1206
1207 /* use obd ops to offer management infrastructure */
1208 static struct obd_ops ost_obd_ops = {
1209         o_owner:        THIS_MODULE,
1210         o_attach:       ost_attach,
1211         o_detach:       ost_detach,
1212         o_setup:        ost_setup,
1213         o_cleanup:      ost_cleanup,
1214 };
1215
1216 static int __init ost_init(void)
1217 {
1218         struct lprocfs_static_vars lvars;
1219         ENTRY;
1220
1221         lprocfs_init_vars(ost,&lvars);
1222         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1223                                    LUSTRE_OST_NAME));
1224 }
1225
1226 static void /*__exit*/ ost_exit(void)
1227 {
1228         class_unregister_type(LUSTRE_OST_NAME);
1229 }
1230
1231 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1232 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1233 MODULE_LICENSE("GPL");
1234
1235 module_init(ost_init);
1236 module_exit(ost_exit);