Whamcloud - gitweb
file jbd-2.4.19-pre1-jcberr.patch was initially added on branch b_devel.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43 #include <linux/lustre_commit_confd.h>
44 #include <portals/list.h>
45
46 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
47 {
48         if (oti == NULL)
49                 return;
50         memset(oti, 0, sizeof *oti);
51
52         if (req->rq_repmsg && req->rq_reqmsg != 0)
53                 oti->oti_transno = req->rq_repmsg->transno;
54 }
55
56 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
57 {
58         struct oti_req_ack_lock *ack_lock;
59         int i;
60
61         if (oti == NULL)
62                 return;
63
64         if (req->rq_repmsg)
65                 req->rq_repmsg->transno = oti->oti_transno;
66
67         /* XXX 4 == entries in oti_ack_locks??? */
68         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
69                 if (!ack_lock->mode)
70                         break;
71                 memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
72                        sizeof(req->rq_ack_locks[i].lock));
73                 req->rq_ack_locks[i].mode = ack_lock->mode;
74         }
75 }
76
77 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
78 {
79         struct lustre_handle *conn = &req->rq_reqmsg->handle;
80         struct ost_body *body;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
85         if (body == NULL)
86                 RETURN(-EFAULT);
87
88         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
89         if (rc)
90                 RETURN(rc);
91
92         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
93                 oti->oti_logcookies = obdo_logcookie(&body->oa);
94         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
95         RETURN(0);
96 }
97
98 static int ost_getattr(struct ptlrpc_request *req)
99 {
100         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
101         struct ost_body *body, *repbody;
102         int rc, size = sizeof(*body);
103         ENTRY;
104
105         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
106         if (body == NULL)
107                 RETURN(-EFAULT);
108
109         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
110         if (rc)
111                 RETURN(rc);
112
113         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
114         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
115         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
116         RETURN(0);
117 }
118
119 static int ost_statfs(struct ptlrpc_request *req)
120 {
121         struct obd_statfs *osfs;
122         int rc, size = sizeof(*osfs);
123         ENTRY;
124
125         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
126         if (rc)
127                 RETURN(rc);
128
129         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
130
131         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
132         if (req->rq_status != 0)
133                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
134
135         RETURN(0);
136 }
137
138 static int ost_syncfs(struct ptlrpc_request *req)
139 {
140         struct obd_statfs *osfs;
141         int rc, size = sizeof(*osfs);
142         ENTRY;
143
144         rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
145         if (rc)
146                 RETURN(rc);
147
148         rc = obd_syncfs(req->rq_export);
149         if (rc) {
150                 CERROR("ost: syncfs failed: rc %d\n", rc);
151                 req->rq_status = rc;
152                 RETURN(rc);
153         }
154
155         RETURN(0);
156 }
157
158 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
159 {
160         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
161         struct ost_body *body, *repbody;
162         int rc, size = sizeof(*repbody);
163         ENTRY;
164
165         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
166         if (body == NULL)
167                 RETURN(-EFAULT);
168
169         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
170         if (rc)
171                 RETURN(rc);
172
173         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
174         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
175         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
176         RETURN(0);
177 }
178
179 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
180 {
181         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
182         struct ost_body *body, *repbody;
183         int rc, size = sizeof(*repbody);
184         ENTRY;
185
186         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
187         if (body == NULL)
188                 RETURN(-EFAULT);
189
190         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
191         if (rc)
192                 RETURN(rc);
193
194         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
195         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
196         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
197         RETURN(0);
198 }
199
200 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
201 {
202         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
203         struct ost_body *body, *repbody;
204         int rc, size = sizeof(*repbody);
205         ENTRY;
206
207         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
208         if (body == NULL)
209                 RETURN(-EFAULT);
210
211         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
212         if (rc)
213                 RETURN(rc);
214
215         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
216         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
217         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
218         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
219         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
220         RETURN(0);
221 }
222
223 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
224 {
225         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
226         struct ost_body *body, *repbody;
227         int rc, size = sizeof(*repbody);
228         ENTRY;
229
230         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
231         if (body == NULL)
232                 RETURN(-EFAULT);
233
234         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
235             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
236                 RETURN(-EINVAL);
237
238         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
239         if (rc)
240                 RETURN(rc);
241
242         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
243         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
244         req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
245                                    repbody->oa.o_blocks, oti);
246         RETURN(0);
247 }
248
249 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
250 {
251         struct lustre_handle *conn = &req->rq_reqmsg->handle;
252         struct ost_body *body, *repbody;
253         int rc, size = sizeof(*repbody);
254         ENTRY;
255
256         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
257         if (body == NULL)
258                 RETURN(-EFAULT);
259
260         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
261         if (rc)
262                 RETURN(rc);
263
264         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
265         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
266
267         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
268         RETURN(0);
269 }
270
271 static int ost_bulk_timeout(void *data)
272 {
273         ENTRY;
274         /* We don't fail the connection here, because having the export
275          * killed makes the (vital) call to commitrw very sad.
276          */
277         RETURN(1);
278 }
279
280 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
281                                 struct niobuf_remote *rnb, int nrnb,
282                                 struct niobuf_remote **pp_rnbp)
283 {
284         /* Copy a remote niobuf, splitting it into page-sized chunks
285          * and setting ioo[i].ioo_bufcnt accordingly */
286         struct niobuf_remote *pp_rnb;
287         int   i;
288         int   j;
289         int   page;
290         int   rnbidx = 0;
291         int   npages = 0;
292
293         /* first count and check the number of pages required */
294         for (i = 0; i < nioo; i++)
295                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
296                         obd_off offset = rnb[rnbidx].offset;
297                         obd_off p0 = offset >> PAGE_SHIFT;
298                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
299
300                         LASSERT(rnbidx < nrnb);
301
302                         npages += (pn + 1 - p0);
303
304                         if (rnb[rnbidx].len == 0) {
305                                 CERROR("zero len BRW: obj %d objid "LPX64
306                                        " buf %u\n", i, ioo[i].ioo_id, j);
307                                 return -EINVAL;
308                         }
309                         if (j > 0 &&
310                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
311                                 CERROR("unordered BRW: obj %d objid "LPX64
312                                        " buf %u offset "LPX64" <= "LPX64"\n",
313                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
314                                        rnb[rnbidx].offset);
315                                 return -EINVAL;
316                         }
317                 }
318
319         LASSERT(rnbidx == nrnb);
320
321         if (npages == nrnb) {       /* all niobufs are for single pages */
322                 *pp_rnbp = rnb;
323                 return npages;
324         }
325
326         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
327         if (pp_rnb == NULL)
328                 return -ENOMEM;
329
330         /* now do the actual split */
331         page = rnbidx = 0;
332         for (i = 0; i < nioo; i++) {
333                 int  obj_pages = 0;
334
335                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
336                         obd_off off = rnb[rnbidx].offset;
337                         int     nob = rnb[rnbidx].len;
338
339                         LASSERT(rnbidx < nrnb);
340                         do {
341                                 obd_off  poff = off & (PAGE_SIZE - 1);
342                                 int      pnob = (poff + nob > PAGE_SIZE) ?
343                                                 PAGE_SIZE - poff : nob;
344
345                                 LASSERT(page < npages);
346                                 pp_rnb[page].len = pnob;
347                                 pp_rnb[page].offset = off;
348                                 pp_rnb[page].flags = rnb->flags;
349
350                                 CDEBUG(D_PAGE, "   obj %d id "LPX64
351                                        "page %d(%d) "LPX64" for %d\n",
352                                        i, ioo[i].ioo_id, obj_pages, page,
353                                        pp_rnb[page].offset, pp_rnb[page].len);
354                                 page++;
355                                 obj_pages++;
356
357                                 off += pnob;
358                                 nob -= pnob;
359                         } while (nob > 0);
360                         LASSERT(nob == 0);
361                 }
362                 ioo[i].ioo_bufcnt = obj_pages;
363         }
364         LASSERT(page == npages);
365
366         *pp_rnbp = pp_rnb;
367         return npages;
368 }
369
370 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
371                                    struct niobuf_remote *rnb)
372 {
373         if (pp_rnb == rnb)                      /* didn't allocate above */
374                 return;
375
376         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
377 }
378
379 #if CHECKSUM_BULK
380 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
381 {
382         __u64             cksum = 0;
383         struct ptlrpc_bulk_page *bp;
384
385         list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
386                 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
387                              bp->bp_buflen);
388                 kunmap(bp->bp_page);
389         }
390 }
391 #endif
392
393 static int ost_brw_read(struct ptlrpc_request *req)
394 {
395         struct ptlrpc_bulk_desc *desc;
396         struct niobuf_remote    *remote_nb;
397         struct niobuf_remote    *pp_rnb;
398         struct niobuf_local     *local_nb;
399         struct obd_ioobj        *ioo;
400         struct ost_body         *body, *repbody;
401         struct l_wait_info       lwi;
402         struct obd_trans_info    oti = { 0 };
403         int                      size[1] = { sizeof(*body) };
404         int                      comms_error = 0;
405         int                      niocount;
406         int                      npages;
407         int                      nob = 0;
408         int                      rc;
409         int                      i;
410         ENTRY;
411
412         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
413                 GOTO(out, rc = -EIO);
414
415         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
416                          (obd_timeout + 1) / 4);
417
418         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
419         if (body == NULL) {
420                 CERROR("Missing/short ost_body\n");
421                 GOTO(out, rc = -EFAULT);
422         }
423
424         /* BUG 974: when we send back cache grants, don't clear this flag */
425         body->oa.o_valid &= ~OBD_MD_FLRDEV;
426
427         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
428         if (ioo == NULL) {
429                 CERROR("Missing/short ioobj\n");
430                 GOTO(out, rc = -EFAULT);
431         }
432
433         niocount = ioo->ioo_bufcnt;
434         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
435                                        lustre_swab_niobuf_remote);
436         if (remote_nb == NULL) {
437                 CERROR("Missing/short niobuf\n");
438                 GOTO(out, rc = -EFAULT);
439         }
440         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
441                 for (i = 1; i < niocount; i++)
442                         lustre_swab_niobuf_remote (&remote_nb[i]);
443         }
444
445         size[0] = sizeof(*body);
446         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
447         if (rc)
448                 GOTO(out, rc);
449
450         /* FIXME all niobuf splitting should be done in obdfilter if needed */
451         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
452         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
453         if (npages < 0)
454                 GOTO(out, rc = npages);
455
456         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
457         if (local_nb == NULL)
458                 GOTO(out_pp_rnb, rc = -ENOMEM);
459
460         desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
461         if (desc == NULL)
462                 GOTO(out_local, rc = -ENOMEM);
463
464         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
465                         ioo, npages, pp_rnb, local_nb, &oti);
466         if (rc != 0)
467                 GOTO(out_bulk, rc);
468
469         nob = 0;
470         for (i = 0; i < npages; i++) {
471                 int page_rc = local_nb[i].rc;
472
473                 if (page_rc < 0) {              /* error */
474                         rc = page_rc;
475                         break;
476                 }
477
478                 LASSERT(page_rc <= pp_rnb[i].len);
479                 nob += page_rc;
480                 if (page_rc != 0) {             /* some data! */
481                         LASSERT (local_nb[i].page != NULL);
482                         rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
483                                                    pp_rnb[i].offset& ~PAGE_MASK,
484                                                    page_rc);
485                         if (rc != 0)
486                                 break;
487                 }
488
489                 if (page_rc != pp_rnb[i].len) { /* short read */
490                         /* All subsequent pages should be 0 */
491                         while(++i < npages)
492                                 LASSERT(local_nb[i].rc == 0);
493                         break;
494                 }
495         }
496
497         if (rc == 0) {
498                 rc = ptlrpc_bulk_put(desc);
499                 if (rc == 0) {
500                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
501                                           ost_bulk_timeout, desc);
502                         rc = l_wait_event(desc->bd_waitq,
503                                           ptlrpc_bulk_complete(desc), &lwi);
504                         if (rc) {
505                                 LASSERT(rc == -ETIMEDOUT);
506                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
507                                 ptlrpc_abort_bulk(desc);
508                         }
509                 } else {
510                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
511                 }
512                 comms_error = rc != 0;
513         }
514
515         /* Must commit after prep above in all cases */
516         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
517                           ioo, npages, local_nb, &oti);
518
519         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
520         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
521
522 #if CHECKSUM_BULK
523         if (rc == 0) {
524                 repbody->oa.o_rdev = ost_checksum_bulk(desc);
525                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
526         }
527 #endif
528
529  out_bulk:
530         ptlrpc_free_bulk(desc);
531  out_local:
532         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
533  out_pp_rnb:
534         free_per_page_niobufs(npages, pp_rnb, remote_nb);
535  out:
536         LASSERT(rc <= 0);
537         if (rc == 0) {
538                 req->rq_status = nob;
539                 ptlrpc_reply(req);
540         } else if (!comms_error) {
541                 /* only reply if comms OK */
542                 req->rq_status = rc;
543                 ptlrpc_error(req);
544         } else {
545                 if (req->rq_repmsg != NULL) {
546                         /* reply out callback would free */
547                         OBD_FREE(req->rq_repmsg, req->rq_replen);
548                 }
549                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
550                        req->rq_export->exp_client_uuid.uuid,
551                        req->rq_connection->c_remote_uuid.uuid,
552                        req->rq_connection->c_peer.peer_nid);
553                 ptlrpc_fail_export(req->rq_export);
554         }
555
556         RETURN(rc);
557 }
558
559 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
560 {
561         struct ptlrpc_bulk_desc *desc;
562         struct niobuf_remote    *remote_nb;
563         struct niobuf_remote    *pp_rnb;
564         struct niobuf_local     *local_nb;
565         struct obd_ioobj        *ioo;
566         struct ost_body         *body, *repbody;
567         struct l_wait_info       lwi;
568         __u32                   *rcs;
569         int                      size[2] = { sizeof(*body) };
570         int                      objcount, niocount, npages;
571         int                      comms_error = 0;
572         int                      rc, rc2, swab, i, j;
573         ENTRY;
574
575         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
576                 GOTO(out, rc = -EIO);
577
578         /* pause before transaction has been started */
579         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
580                          (obd_timeout + 1) / 4);
581
582         swab = lustre_msg_swabbed(req->rq_reqmsg);
583         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
584         if (body == NULL) {
585                 CERROR("Missing/short ost_body\n");
586                 GOTO(out, rc = -EFAULT);
587         }
588
589         /* BUG 974: when we send back cache grants, don't clear this flag */
590         body->oa.o_valid &= ~OBD_MD_FLRDEV;
591
592         LASSERT_REQSWAB(req, 1);
593         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
594         if (objcount == 0) {
595                 CERROR("Missing/short ioobj\n");
596                 GOTO(out, rc = -EFAULT);
597         }
598         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
599         LASSERT (ioo != NULL);
600         for (niocount = i = 0; i < objcount; i++) {
601                 if (swab)
602                         lustre_swab_obd_ioobj (&ioo[i]);
603                 if (ioo[i].ioo_bufcnt == 0) {
604                         CERROR("ioo[%d] has zero bufcnt\n", i);
605                         GOTO(out, rc = -EFAULT);
606                 }
607                 niocount += ioo[i].ioo_bufcnt;
608         }
609
610         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
611                                        lustre_swab_niobuf_remote);
612         if (remote_nb == NULL) {
613                 CERROR("Missing/short niobuf\n");
614                 GOTO(out, rc = -EFAULT);
615         }
616         if (swab) {                             /* swab the remaining niobufs */
617                 for (i = 1; i < niocount; i++)
618                         lustre_swab_niobuf_remote (&remote_nb[i]);
619         }
620
621         size[1] = niocount * sizeof(*rcs);
622         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
623                              &req->rq_repmsg);
624         if (rc != 0)
625                 GOTO(out, rc);
626         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
627
628         /* FIXME all niobuf splitting should be done in obdfilter if needed */
629         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
630         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
631         if (npages < 0)
632                 GOTO(out, rc = npages);
633
634         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
635         if (local_nb == NULL)
636                 GOTO(out_pp_rnb, rc = -ENOMEM);
637
638         desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
639         if (desc == NULL)
640                 GOTO(out_local, rc = -ENOMEM);
641
642         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
643                         ioo, npages, pp_rnb, local_nb, oti);
644         if (rc != 0)
645                 GOTO(out_bulk, rc);
646
647         /* NB Having prepped, we must commit... */
648
649         for (i = 0; i < npages; i++) {
650                 rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
651                                            pp_rnb[i].offset & (PAGE_SIZE - 1),
652                                            pp_rnb[i].len);
653                 if (rc != 0)
654                         break;
655         }
656
657         if (rc == 0) {
658                 rc = ptlrpc_bulk_get(desc);
659                 if (rc == 0) {
660                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
661                                           ost_bulk_timeout, desc);
662                         rc = l_wait_event(desc->bd_waitq,
663                                           ptlrpc_bulk_complete(desc), &lwi);
664                         if (rc) {
665                                 LASSERT(rc == -ETIMEDOUT);
666                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
667                                 ptlrpc_abort_bulk(desc);
668                         }
669                 } else {
670                         DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
671                 }
672                 comms_error = rc != 0;
673         }
674
675         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
676         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
677
678 #if CHECKSUM_BULK
679         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
680                 static int cksum_counter;
681                 __u64 client_cksum = body->oa.o_rdev;
682                 __u64 cksum = ost_checksum_bulk(desc);
683
684                 if (client_cksum != cksum) {
685                         CERROR("Bad checksum: client "LPX64", server "LPX64
686                                ", client NID "LPX64"\n", client_cksum, cksum,
687                                req->rq_connection->c_peer.peer_nid);
688                         cksum_counter = 1;
689                         repbody->oa.o_rdev = cksum;
690                 } else {
691                         cksum_counter++;
692                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
693                                 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
694                                         cksum_counter,
695                                         req->rq_connection->c_peer.peer_nid,
696                                         cksum);
697                 }
698         }
699 #endif
700         /* Must commit after prep above in all cases */
701         rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
702                            objcount, ioo, npages, local_nb, oti);
703
704         if (rc == 0) {
705                 /* set per-requested niobuf return codes */
706                 for (i = j = 0; i < niocount; i++) {
707                         int nob = remote_nb[i].len;
708
709                         rcs[i] = 0;
710                         do {
711                                 LASSERT(j < npages);
712                                 if (local_nb[j].rc < 0)
713                                         rcs[i] = local_nb[j].rc;
714                                 nob -= pp_rnb[j].len;
715                                 j++;
716                         } while (nob > 0);
717                         LASSERT(nob == 0);
718                 }
719                 LASSERT(j == npages);
720         }
721         if (rc == 0)
722                 rc = rc2;
723
724  out_bulk:
725         ptlrpc_free_bulk(desc);
726  out_local:
727         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
728  out_pp_rnb:
729         free_per_page_niobufs(npages, pp_rnb, remote_nb);
730  out:
731         if (rc == 0) {
732                 oti_to_request(oti, req);
733                 rc = ptlrpc_reply(req);
734         } else if (!comms_error) {
735                 /* Only reply if there was no comms problem with bulk */
736                 req->rq_status = rc;
737                 ptlrpc_error(req);
738         } else {
739                 if (req->rq_repmsg != NULL) {
740                         /* reply out callback would free */
741                         OBD_FREE (req->rq_repmsg, req->rq_replen);
742                 }
743                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
744                        req->rq_export->exp_client_uuid.uuid,
745                        req->rq_connection->c_remote_uuid.uuid,
746                        req->rq_connection->c_peer.peer_nid);
747                 ptlrpc_fail_export(req->rq_export);
748         }
749         RETURN(rc);
750 }
751
752 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
753 {
754         struct niobuf_remote *remote_nb, *res_nb;
755         struct obd_ioobj *ioo;
756         struct ost_body *body, *repbody;
757         int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
758         int n;
759         int swab;
760         ENTRY;
761
762         /* XXX not set to use latest protocol */
763
764         swab = lustre_msg_swabbed(req->rq_reqmsg);
765         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
766         if (body == NULL) {
767                 CERROR("Missing/short ost_body\n");
768                 GOTO(out, rc = -EFAULT);
769         }
770
771         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
772         if (ioo == NULL) {
773                 CERROR("Missing/short ioobj\n");
774                 GOTO(out, rc = -EFAULT);
775         }
776         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
777         niocount = ioo[0].ioo_bufcnt;
778         for (i = 1; i < objcount; i++) {
779                 if (swab)
780                         lustre_swab_obd_ioobj (&ioo[i]);
781                 niocount += ioo[i].ioo_bufcnt;
782         }
783
784         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
785                                        lustre_swab_niobuf_remote);
786         if (remote_nb == NULL) {
787                 CERROR("Missing/short niobuf\n");
788                 GOTO(out, rc = -EFAULT);
789         }
790         if (swab) {                             /* swab the remaining niobufs */
791                 for (i = 1; i < niocount; i++)
792                         lustre_swab_niobuf_remote (&remote_nb[i]);
793         }
794
795         for (i = n = 0; i < objcount; i++) {
796                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
797                         if (remote_nb[n].len == 0) {
798                                 CERROR("zero len BRW: objid "LPX64" buf %u\n",
799                                        ioo[i].ioo_id, j);
800                                 GOTO(out, rc = -EINVAL);
801                         }
802                         if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
803                                 CERROR("unordered BRW: objid "LPX64
804                                        " buf %u offset "LPX64" <= "LPX64"\n",
805                                        ioo[i].ioo_id, j, remote_nb[n].offset,
806                                        remote_nb[n-1].offset);
807                                 GOTO(out, rc = -EINVAL);
808                         }
809                 }
810         }
811
812         size[1] = niocount * sizeof(*remote_nb);
813         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
814         if (rc)
815                 GOTO(out, rc);
816
817         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
818                                         objcount, ioo, niocount, remote_nb);
819
820         if (req->rq_status)
821                 GOTO(out, rc = 0);
822
823         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
824         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
825
826         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
827         memcpy(res_nb, remote_nb, size[1]);
828         rc = 0;
829 out:
830         if (rc) {
831                 OBD_FREE(req->rq_repmsg, req->rq_replen);
832                 req->rq_repmsg = NULL;
833                 req->rq_status = rc;
834                 ptlrpc_error(req);
835         } else
836                 ptlrpc_reply(req);
837
838         return rc;
839 }
840
841 static int ost_log_cancel(struct ptlrpc_request *req)
842 {
843         struct lustre_handle *conn;
844         struct llog_cookie *logcookies;
845         int num_cookies, rc = 0;
846         ENTRY;
847
848         logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
849         if (logcookies == NULL) {
850                 DEBUG_REQ(D_HA, req, "no cookies sent");
851                 RETURN(-EFAULT);
852         }
853         num_cookies = req->rq_reqmsg->buflens[0] / sizeof(*logcookies);
854
855         /* workaround until we don't need to send replies */
856         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
857         if (rc)
858                 RETURN(rc);
859         req->rq_repmsg->status = 0;
860         /* end workaround */
861
862         conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
863         rc = obd_log_cancel(conn, NULL, num_cookies, logcookies, 0);
864
865         RETURN(rc);
866 }
867
868 static int ost_set_info(struct ptlrpc_request *req)
869 {
870         struct lustre_handle *conn;
871         char *key;
872         int keylen, rc = 0;
873         ENTRY;
874
875         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
876         if (key == NULL) {
877                 DEBUG_REQ(D_HA, req, "no set_info key");
878                 RETURN(-EFAULT);
879         }
880         keylen = req->rq_reqmsg->buflens[0];
881
882         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
883         if (rc)
884                 RETURN(rc);
885
886         conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
887         rc = obd_set_info(conn, keylen, key, 0, NULL);
888         req->rq_repmsg->status = 0;
889         RETURN(rc);
890 }
891
892 static int filter_recovery_request(struct ptlrpc_request *req,
893                                    struct obd_device *obd, int *process)
894 {
895         switch (req->rq_reqmsg->opc) {
896         case OST_CONNECT: /* This will never get here, but for completeness. */
897         case OST_DISCONNECT:
898                *process = 1;
899                RETURN(0);
900
901         case OBD_PING:
902         case OST_CLOSE:
903         case OST_CREATE:
904         case OST_DESTROY:
905         case OST_OPEN:
906         case OST_PUNCH:
907         case OST_SETATTR:
908         case OST_SYNCFS:
909         case OST_WRITE:
910         case OBD_LOG_CANCEL:
911         case LDLM_ENQUEUE:
912                 *process = target_queue_recovery_request(req, obd);
913                 RETURN(0);
914
915         default:
916                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
917                 *process = 0;
918                 /* XXX what should we set rq_status to here? */
919                 req->rq_status = -EAGAIN;
920                 RETURN(ptlrpc_error(req));
921         }
922 }
923
924
925
926 static int ost_handle(struct ptlrpc_request *req)
927 {
928         struct obd_trans_info trans_info = { 0, };
929         struct obd_trans_info *oti = &trans_info;
930         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
931         ENTRY;
932
933         LASSERT(current->journal_info == NULL);
934         /* XXX identical to MDS */
935         if (req->rq_reqmsg->opc != OST_CONNECT) {
936                 struct obd_device *obd;
937                 int abort_recovery, recovering;
938
939                 if (req->rq_export == NULL) {
940                         CDEBUG(D_HA, "operation %d on unconnected OST\n",
941                                req->rq_reqmsg->opc);
942                         req->rq_status = -ENOTCONN;
943                         GOTO(out, rc = -ENOTCONN);
944                 }
945
946                 obd = req->rq_export->exp_obd;
947
948                 /* Check for aborted recovery. */
949                 spin_lock_bh(&obd->obd_processing_task_lock);
950                 abort_recovery = obd->obd_abort_recovery;
951                 recovering = obd->obd_recovering;
952                 spin_unlock_bh(&obd->obd_processing_task_lock);
953                 if (abort_recovery) {
954                         target_abort_recovery(obd);
955                 } else if (recovering) {
956                         rc = filter_recovery_request(req, obd, &should_process);
957                         if (rc || !should_process)
958                                 RETURN(rc);
959                 }
960         }
961
962         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
963                 GOTO(out, rc = -EINVAL);
964
965         oti_init(oti, req);
966
967         switch (req->rq_reqmsg->opc) {
968         case OST_CONNECT:
969                 CDEBUG(D_INODE, "connect\n");
970                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
971                 rc = target_handle_connect(req, ost_handle);
972                 break;
973         case OST_DISCONNECT:
974                 CDEBUG(D_INODE, "disconnect\n");
975                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
976                 rc = target_handle_disconnect(req);
977                 break;
978         case OST_CREATE:
979                 CDEBUG(D_INODE, "create\n");
980                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
981                 rc = ost_create(req, oti);
982                 break;
983         case OST_DESTROY:
984                 CDEBUG(D_INODE, "destroy\n");
985                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
986                 rc = ost_destroy(req, oti);
987                 break;
988         case OST_GETATTR:
989                 CDEBUG(D_INODE, "getattr\n");
990                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
991                 rc = ost_getattr(req);
992                 break;
993         case OST_SETATTR:
994                 CDEBUG(D_INODE, "setattr\n");
995                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
996                 rc = ost_setattr(req, oti);
997                 break;
998         case OST_OPEN:
999                 CDEBUG(D_INODE, "open\n");
1000                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
1001                 rc = ost_open(req, oti);
1002                 break;
1003         case OST_CLOSE:
1004                 CDEBUG(D_INODE, "close\n");
1005                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
1006                 rc = ost_close(req, oti);
1007                 break;
1008         case OST_WRITE:
1009                 CDEBUG(D_INODE, "write\n");
1010                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1011                 rc = ost_brw_write(req, oti);
1012                 LASSERT(current->journal_info == NULL);
1013                 /* ost_brw sends its own replies */
1014                 RETURN(rc);
1015         case OST_READ:
1016                 CDEBUG(D_INODE, "read\n");
1017                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1018                 rc = ost_brw_read(req);
1019                 LASSERT(current->journal_info == NULL);
1020                 /* ost_brw sends its own replies */
1021                 RETURN(rc);
1022         case OST_SAN_READ:
1023                 CDEBUG(D_INODE, "san read\n");
1024                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1025                 rc = ost_san_brw(req, OBD_BRW_READ);
1026                 /* ost_san_brw sends its own replies */
1027                 RETURN(rc);
1028         case OST_SAN_WRITE:
1029                 CDEBUG(D_INODE, "san write\n");
1030                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1031                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1032                 /* ost_san_brw sends its own replies */
1033                 RETURN(rc);
1034         case OST_PUNCH:
1035                 CDEBUG(D_INODE, "punch\n");
1036                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1037                 rc = ost_punch(req, oti);
1038                 break;
1039         case OST_STATFS:
1040                 CDEBUG(D_INODE, "statfs\n");
1041                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1042                 rc = ost_statfs(req);
1043                 break;
1044         case OST_SYNCFS:
1045                 CDEBUG(D_INODE, "sync\n");
1046                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
1047                 rc = ost_syncfs(req);
1048                 break;
1049         case OST_SET_INFO:
1050                 DEBUG_REQ(D_INODE, req, "set_info");
1051                 rc = ost_set_info(req);
1052         case OBD_PING:
1053                 DEBUG_REQ(D_INODE, req, "ping");
1054                 rc = target_handle_ping(req);
1055                 break;
1056         case OBD_LOG_CANCEL:
1057                 CDEBUG(D_INODE, "log cancel\n");
1058                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1059                 rc = ost_log_cancel(req);
1060                 break;
1061         case LDLM_ENQUEUE:
1062                 CDEBUG(D_INODE, "enqueue\n");
1063                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1064                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1065                                          ldlm_server_blocking_ast);
1066                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1067                 break;
1068         case LDLM_CONVERT:
1069                 CDEBUG(D_INODE, "convert\n");
1070                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1071                 rc = ldlm_handle_convert(req);
1072                 break;
1073         case LDLM_CANCEL:
1074                 CDEBUG(D_INODE, "cancel\n");
1075                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1076                 rc = ldlm_handle_cancel(req);
1077                 break;
1078         case LDLM_BL_CALLBACK:
1079         case LDLM_CP_CALLBACK:
1080                 CDEBUG(D_INODE, "callback\n");
1081                 CERROR("callbacks should not happen on OST\n");
1082                 /* fall through */
1083         default:
1084                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1085                 req->rq_status = -ENOTSUPP;
1086                 rc = ptlrpc_error(req);
1087                 RETURN(rc);
1088         }
1089
1090         LASSERT(current->journal_info == NULL);
1091
1092         EXIT;
1093         /* If we're DISCONNECTing, the export_data is already freed */
1094         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1095                 struct obd_device *obd  = req->rq_export->exp_obd;
1096                 if (!obd->obd_no_transno) {
1097                         req->rq_repmsg->last_committed =
1098                                 obd->obd_last_committed;
1099                 } else {
1100                         DEBUG_REQ(D_IOCTL, req,
1101                                   "not sending last_committed update");
1102                 }
1103                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1104                        obd->obd_last_committed, req->rq_xid);
1105         }
1106
1107 out:
1108         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1109                 struct obd_device *obd = req->rq_export->exp_obd;
1110
1111                 if (obd && obd->obd_recovering) {
1112                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1113                         return target_queue_final_reply(req, rc);
1114                 }
1115                 /* Lost a race with recovery; let the error path DTRT. */
1116                 rc = req->rq_status = -ENOTCONN;
1117         }
1118
1119         if (!rc)
1120                 oti_to_request(oti, req);
1121
1122         target_send_reply(req, rc, fail);
1123         return 0;
1124 }
1125
1126 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
1127 {
1128         struct ost_obd *ost = &obddev->u.ost;
1129         int err, i;
1130         ENTRY;
1131
1132 #ifdef ENABLE_ORPHANS
1133         err = llog_start_commit_thread();
1134         if (err < 0)
1135                 RETURN(err);
1136 #endif
1137
1138         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
1139                                            OST_BUFSIZE, OST_MAXREQSIZE,
1140                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
1141                                            ost_handle, "ost", obddev);
1142         if (!ost->ost_service) {
1143                 CERROR("failed to start service\n");
1144                 RETURN(-ENOMEM);
1145         }
1146
1147         for (i = 0; i < OST_NUM_THREADS; i++) {
1148                 char name[32];
1149                 sprintf(name, "ll_ost_%02d", i);
1150                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
1151                 if (err) {
1152                         CERROR("error starting thread #%d: rc %d\n", i, err);
1153                         RETURN(-EINVAL);
1154                 }
1155         }
1156
1157         RETURN(0);
1158 }
1159
1160 static int ost_cleanup(struct obd_device *obddev, int flags)
1161 {
1162         struct ost_obd *ost = &obddev->u.ost;
1163         int err = 0;
1164         ENTRY;
1165
1166         if (obddev->obd_recovering)
1167                 target_cancel_recovery_timer(obddev);
1168
1169         ptlrpc_stop_all_threads(ost->ost_service);
1170         ptlrpc_unregister_service(ost->ost_service);
1171
1172         RETURN(err);
1173 }
1174
1175 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1176 {
1177         struct lprocfs_static_vars lvars;
1178
1179         lprocfs_init_vars(ost,&lvars);
1180         return lprocfs_obd_attach(dev, lvars.obd_vars);
1181 }
1182
/*
 * Detach hook: counterpart of ost_attach().  Returns whatever
 * lprocfs_obd_detach() returns.
 */
int ost_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);

        return rc;
}
1187
1188 /* I don't think this function is ever used, since nothing
1189  * connects directly to this module.
1190  */
1191 static int ost_connect(struct lustre_handle *conn,
1192                        struct obd_device *obd, struct obd_uuid *cluuid)
1193 {
1194         struct obd_export *exp;
1195         int rc;
1196         ENTRY;
1197
1198         if (!conn || !obd || !cluuid)
1199                 RETURN(-EINVAL);
1200
1201         rc = class_connect(conn, obd, cluuid);
1202         if (rc)
1203                 RETURN(rc);
1204         exp = class_conn2export(conn);
1205         LASSERT(exp);
1206         class_export_put(exp);
1207
1208         RETURN(0);
1209 }
1210
1211 /* use obd ops to offer management infrastructure */
1212 static struct obd_ops ost_obd_ops = {
1213         o_owner:        THIS_MODULE,
1214         o_attach:       ost_attach,
1215         o_detach:       ost_detach,
1216         o_setup:        ost_setup,
1217         o_cleanup:      ost_cleanup,
1218         o_connect:      ost_connect,
1219 };
1220
1221 static int __init ost_init(void)
1222 {
1223         struct lprocfs_static_vars lvars;
1224         ENTRY;
1225
1226         lprocfs_init_vars(ost,&lvars);
1227         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
1228                                    LUSTRE_OST_NAME));
1229 }
1230
1231 static void /*__exit*/ ost_exit(void)
1232 {
1233         class_unregister_type(LUSTRE_OST_NAME);
1234 }
1235
1236 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1237 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1238 MODULE_LICENSE("GPL");
1239
1240 module_init(ost_init);
1241 module_exit(ost_exit);