Whamcloud - gitweb
- small include changes needed compile on powerpc
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/init.h>
41
42 static int ost_destroy(struct ptlrpc_request *req)
43 {
44         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
45         struct ost_body *body;
46         int rc, size = sizeof(*body);
47         ENTRY;
48
49         body = lustre_msg_buf(req->rq_reqmsg, 0);
50
51         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
52         if (rc)
53                 RETURN(rc);
54
55         req->rq_status = obd_destroy(conn, &body->oa, NULL);
56         RETURN(0);
57 }
58
59 static int ost_getattr(struct ptlrpc_request *req)
60 {
61         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
62         struct ost_body *body, *repbody;
63         int rc, size = sizeof(*body);
64         ENTRY;
65
66         body = lustre_msg_buf(req->rq_reqmsg, 0);
67
68         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
69         if (rc)
70                 RETURN(rc);
71
72         repbody = lustre_msg_buf(req->rq_repmsg, 0);
73         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
74         req->rq_status = obd_getattr(conn, &repbody->oa);
75         RETURN(0);
76 }
77
78 static int ost_statfs(struct ptlrpc_request *req)
79 {
80         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
81         struct obd_statfs *osfs;
82         struct statfs sfs;
83         int rc, size = sizeof(*osfs);
84         ENTRY;
85
86         rc = obd_statfs(conn, &sfs);
87         if (rc) {
88                 CERROR("ost: statfs failed: rc %d\n", rc);
89                 req->rq_status = rc;
90                 RETURN(rc);
91         }
92
93         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
94         if (rc)
95                 RETURN(rc);
96
97         osfs = lustre_msg_buf(req->rq_repmsg, 0);
98         memset(osfs, 0, size);
99         obd_statfs_pack(osfs, &sfs);
100         RETURN(0);
101 }
102
103 static int ost_open(struct ptlrpc_request *req)
104 {
105         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
106         struct ost_body *body, *repbody;
107         int rc, size = sizeof(*body);
108         ENTRY;
109
110         body = lustre_msg_buf(req->rq_reqmsg, 0);
111
112         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
113         if (rc)
114                 RETURN(rc);
115
116         repbody = lustre_msg_buf(req->rq_repmsg, 0);
117         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
118         req->rq_status = obd_open(conn, &repbody->oa, NULL);
119         RETURN(0);
120 }
121
122 static int ost_close(struct ptlrpc_request *req)
123 {
124         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
125         struct ost_body *body, *repbody;
126         int rc, size = sizeof(*body);
127         ENTRY;
128
129         body = lustre_msg_buf(req->rq_reqmsg, 0);
130
131         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
132         if (rc)
133                 RETURN(rc);
134
135         repbody = lustre_msg_buf(req->rq_repmsg, 0);
136         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
137         req->rq_status = obd_close(conn, &repbody->oa, NULL);
138         RETURN(0);
139 }
140
141 static int ost_create(struct ptlrpc_request *req)
142 {
143         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
144         struct ost_body *body, *repbody;
145         int rc, size = sizeof(*body);
146         ENTRY;
147
148         body = lustre_msg_buf(req->rq_reqmsg, 0);
149
150         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
151         if (rc)
152                 RETURN(rc);
153
154         repbody = lustre_msg_buf(req->rq_repmsg, 0);
155         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
156         req->rq_status = obd_create(conn, &repbody->oa, NULL);
157         RETURN(0);
158 }
159
160 static int ost_punch(struct ptlrpc_request *req)
161 {
162         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
163         struct ost_body *body, *repbody;
164         int rc, size = sizeof(*body);
165         ENTRY;
166
167         body = lustre_msg_buf(req->rq_reqmsg, 0);
168
169         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
170         if (rc)
171                 RETURN(rc);
172
173         repbody = lustre_msg_buf(req->rq_repmsg, 0);
174         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
175         req->rq_status = obd_punch(conn, &repbody->oa, NULL, 
176                                    repbody->oa.o_blocks, repbody->oa.o_size);
177         RETURN(0);
178 }
179
180 static int ost_setattr(struct ptlrpc_request *req)
181 {
182         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
183         struct ost_body *body, *repbody;
184         int rc, size = sizeof(*body);
185         ENTRY;
186
187         body = lustre_msg_buf(req->rq_reqmsg, 0);
188
189         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
190         if (rc)
191                 RETURN(rc);
192
193         repbody = lustre_msg_buf(req->rq_repmsg, 0);
194         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
195         req->rq_status = obd_setattr(conn, &repbody->oa);
196         RETURN(0);
197 }
198
199 static int ost_brw_read(struct ptlrpc_request *req)
200 {
201         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
202         struct ptlrpc_bulk_desc *desc;
203         void *tmp1, *tmp2, *end2;
204         struct niobuf_remote *remote_nb;
205         struct niobuf_local *local_nb = NULL;
206         struct obd_ioobj *ioo;
207         struct ost_body *body;
208         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
209         ENTRY;
210
211         body = lustre_msg_buf(req->rq_reqmsg, 0);
212         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
213         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
214         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
215         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
216         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
217         cmd = OBD_BRW_READ;
218
219         for (i = 0; i < objcount; i++) {
220                 ost_unpack_ioo(&tmp1, &ioo);
221                 if (tmp2 + ioo->ioo_bufcnt > end2) {
222                         LBUG();
223                         GOTO(out, rc = -EFAULT);
224                 }
225                 for (j = 0; j < ioo->ioo_bufcnt; j++)
226                         ost_unpack_niobuf(&tmp2, &remote_nb);
227         }
228
229         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
230         if (rc)
231                 RETURN(rc);
232         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
233         if (local_nb == NULL)
234                 RETURN(-ENOMEM);
235
236         /* The unpackers move tmp1 and tmp2, so reset them before using */
237         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
238         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
239         req->rq_status = obd_preprw(cmd, conn, objcount,
240                                     tmp1, niocount, tmp2, local_nb, NULL);
241
242         if (req->rq_status)
243                 GOTO(out_local, 0);
244
245         desc = ptlrpc_prep_bulk(req->rq_connection);
246         if (desc == NULL)
247                 GOTO(out_local, rc = -ENOMEM);
248         desc->b_portal = OST_BULK_PORTAL;
249
250         for (i = 0; i < niocount; i++) {
251                 struct ptlrpc_bulk_page *bulk;
252                 bulk = ptlrpc_prep_bulk_page(desc);
253                 if (bulk == NULL)
254                         GOTO(out_bulk, rc = -ENOMEM);
255                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
256                 bulk->b_xid = remote_nb->xid;
257                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
258                 bulk->b_buflen = PAGE_SIZE;
259         }
260
261         rc = ptlrpc_send_bulk(desc);
262         if (rc)
263                 GOTO(out_bulk, rc);
264
265 #warning OST must time out here.
266         wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
267         if (desc->b_flags & PTL_RPC_FL_INTR)
268                 rc = -EINTR;
269
270         /* The unpackers move tmp1 and tmp2, so reset them before using */
271         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
272         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
273         req->rq_status = obd_commitrw(cmd, conn, objcount,
274                                       tmp1, niocount, local_nb, NULL);
275
276 out_bulk:
277         ptlrpc_free_bulk(desc);
278 out_local:
279         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
280 out:
281         if (rc)
282                 ptlrpc_error(req->rq_svc, req);
283         else
284                 ptlrpc_reply(req->rq_svc, req);
285         RETURN(rc);
286 }
287
288 static int ost_brw_write(struct ptlrpc_request *req)
289 {
290         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
291         struct ptlrpc_bulk_desc *desc;
292         struct niobuf_remote *remote_nb;
293         struct niobuf_local *local_nb, *lnb;
294         struct obd_ioobj *ioo;
295         struct ost_body *body;
296         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
297         void *tmp1, *tmp2, *end2;
298         void *desc_priv = NULL;
299         int reply_sent = 0;
300         ENTRY;
301
302         body = lustre_msg_buf(req->rq_reqmsg, 0);
303         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
304         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
305         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
306         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
307         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
308         cmd = OBD_BRW_WRITE;
309
310         for (i = 0; i < objcount; i++) {
311                 ost_unpack_ioo((void *)&tmp1, &ioo);
312                 if (tmp2 + ioo->ioo_bufcnt > end2) {
313                         rc = -EFAULT;
314                         break;
315                 }
316                 for (j = 0; j < ioo->ioo_bufcnt; j++)
317                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
318         }
319
320         size[1] = niocount * sizeof(*remote_nb);
321         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
322         if (rc)
323                 GOTO(out, rc);
324         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
325
326         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
327         if (local_nb == NULL)
328                 GOTO(out, rc = -ENOMEM);
329
330         /* The unpackers move tmp1 and tmp2, so reset them before using */
331         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
332         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
333         req->rq_status = obd_preprw(cmd, conn, objcount,
334                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
335         if (req->rq_status)
336                 GOTO(out_free, rc = 0); /* XXX is this correct? */
337
338         desc = ptlrpc_prep_bulk(req->rq_connection);
339         if (desc == NULL)
340                 GOTO(fail_preprw, rc = -ENOMEM);
341         desc->b_cb = NULL;
342         desc->b_portal = OSC_BULK_PORTAL;
343         desc->b_desc_private = desc_priv;
344         memcpy(&(desc->b_conn), &conn, sizeof(conn));
345
346         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
347                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
348                 struct ptlrpc_bulk_page *bulk;
349
350                 bulk = ptlrpc_prep_bulk_page(desc);
351                 if (bulk == NULL)
352                         GOTO(fail_bulk, rc = -ENOMEM);
353
354                 spin_lock(&srv->srv_lock);
355                 bulk->b_xid = srv->srv_xid++;
356                 spin_unlock(&srv->srv_lock);
357
358                 bulk->b_buf = lnb->addr;
359                 bulk->b_page = lnb->page;
360                 bulk->b_flags = lnb->flags;
361                 bulk->b_dentry = lnb->dentry;
362                 bulk->b_buflen = PAGE_SIZE;
363                 bulk->b_cb = NULL;
364
365                 /* this advances remote_nb */
366                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
367                                 bulk->b_xid);
368         }
369
370         rc = ptlrpc_register_bulk(desc);
371         if (rc)
372                 GOTO(fail_bulk, rc);
373
374         reply_sent = 1;
375         ptlrpc_reply(req->rq_svc, req);
376
377 #warning OST must time out here.
378         wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD);
379
380         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
381                           desc->b_desc_private);
382         ptlrpc_free_bulk(desc);
383         EXIT;
384 out_free:
385         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
386 out:
387         if (!reply_sent) {
388                 if (rc)
389                         ptlrpc_error(req->rq_svc, req);
390                 else
391                         ptlrpc_reply(req->rq_svc, req);
392         }
393         return rc;
394
395 fail_bulk:
396         ptlrpc_free_bulk(desc);
397 fail_preprw:
398         /* FIXME: how do we undo the preprw? */
399         goto out_free;
400 }
401
402 static int ost_handle(struct ptlrpc_request *req)
403 {
404         int rc;
405         ENTRY;
406
407         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
408         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
409                 CERROR("lustre_ost: Invalid request\n");
410                 GOTO(out, rc);
411         }
412
413         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
414                 CERROR("lustre_ost: wrong packet type sent %d\n",
415                        req->rq_reqmsg->type);
416                 GOTO(out, rc = -EINVAL);
417         }
418
419         if (req->rq_reqmsg->opc != OST_CONNECT &&
420             req->rq_export == NULL) {
421                 CERROR("lustre_ost: operation %d on unconnected OST\n",
422                        req->rq_reqmsg->opc);
423                 GOTO(out, rc = -ENOTCONN);
424         }
425
426         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
427                 GOTO(out, rc = -EINVAL);
428
429         switch (req->rq_reqmsg->opc) {
430         case OST_CONNECT:
431                 CDEBUG(D_INODE, "connect\n");
432                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
433                 rc = target_handle_connect(req);
434                 break;
435         case OST_DISCONNECT:
436                 CDEBUG(D_INODE, "disconnect\n");
437                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
438                 rc = target_handle_disconnect(req);
439                 break;
440         case OST_CREATE:
441                 CDEBUG(D_INODE, "create\n");
442                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
443                 rc = ost_create(req);
444                 break;
445         case OST_DESTROY:
446                 CDEBUG(D_INODE, "destroy\n");
447                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
448                 rc = ost_destroy(req);
449                 break;
450         case OST_GETATTR:
451                 CDEBUG(D_INODE, "getattr\n");
452                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
453                 rc = ost_getattr(req);
454                 break;
455         case OST_SETATTR:
456                 CDEBUG(D_INODE, "setattr\n");
457                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
458                 rc = ost_setattr(req);
459                 break;
460         case OST_OPEN:
461                 CDEBUG(D_INODE, "setattr\n");
462                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
463                 rc = ost_open(req);
464                 break;
465         case OST_CLOSE:
466                 CDEBUG(D_INODE, "setattr\n");
467                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
468                 rc = ost_close(req);
469                 break;
470         case OST_WRITE:
471                 CDEBUG(D_INODE, "write\n");
472                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
473                 rc = ost_brw_write(req);
474                 /* ost_brw sends its own replies */
475                 RETURN(rc);
476         case OST_READ:
477                 CDEBUG(D_INODE, "read\n");
478                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
479                 rc = ost_brw_read(req);
480                 /* ost_brw sends its own replies */
481                 RETURN(rc);
482         case OST_PUNCH:
483                 CDEBUG(D_INODE, "punch\n");
484                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
485                 rc = ost_punch(req);
486                 break;
487         case OST_STATFS:
488                 CDEBUG(D_INODE, "statfs\n");
489                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
490                 rc = ost_statfs(req);
491                 break;
492         default:
493                 req->rq_status = -ENOTSUPP;
494                 rc = ptlrpc_error(req->rq_svc, req);
495                 RETURN(rc);
496         }
497
498         EXIT;
499 out:
500         //req->rq_status = rc;
501         if (rc) {
502                 CERROR("ost: processing error (opcode=%d): %d\n",
503                        req->rq_reqmsg->opc, rc);
504                 ptlrpc_error(req->rq_svc, req);
505         } else {
506                 CDEBUG(D_INODE, "sending reply\n");
507                 if (req->rq_repmsg == NULL)
508                         CERROR("handler for opcode %d returned rc=0 without "
509                                "creating rq_repmsg; needs to return rc != "
510                                "0!\n", req->rq_reqmsg->opc);
511                 ptlrpc_reply(req->rq_svc, req);
512         }
513
514         return 0;
515 }
516
517 #define OST_NUM_THREADS 6
518
519 /* mount the file system (secretly) */
520 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
521 {
522         struct obd_ioctl_data* data = buf;
523         struct ost_obd *ost = &obddev->u.ost;
524         struct obd_device *tgt;
525         int err;
526         int i;
527         ENTRY;
528
529         if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
530                 RETURN(-ENODEV);
531
532         MOD_INC_USE_COUNT;
533         tgt = &obd_dev[data->ioc_dev];
534         if (!(tgt->obd_flags & OBD_ATTACHED) ||
535             !(tgt->obd_flags & OBD_SET_UP)) {
536                 CERROR("device not attached or not set up (%d)\n",
537                        data->ioc_dev);
538                 GOTO(error_dec, err = -EINVAL);
539         }
540
541         err = obd_connect(&ost->ost_conn, tgt);
542         if (err) {
543                 CERROR("fail to connect to device %d\n", data->ioc_dev);
544                 GOTO(error_dec, err = -EINVAL);
545         }
546
547         ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
548                                            OSC_REPLY_PORTAL, "self",ost_handle);
549         if (!ost->ost_service) {
550                 CERROR("failed to start service\n");
551                 GOTO(error_disc, err = -EINVAL);
552         }
553
554         for (i = 0; i < OST_NUM_THREADS; i++) {
555                 err = ptlrpc_start_thread(obddev, ost->ost_service,
556                                           "lustre_ost");
557                 if (err) {
558                         CERROR("error starting thread #%d: rc %d\n", i, err);
559                         GOTO(error_disc, err = -EINVAL);
560                 }
561         }
562
563         RETURN(0);
564
565 error_disc:
566         obd_disconnect(&ost->ost_conn);
567 error_dec:
568         MOD_DEC_USE_COUNT;
569         RETURN(err);
570 }
571
572 static int ost_cleanup(struct obd_device * obddev)
573 {
574         struct ost_obd *ost = &obddev->u.ost;
575         int err;
576
577         ENTRY;
578
579         if ( !list_empty(&obddev->obd_exports) ) {
580                 CERROR("still has clients!\n");
581                 RETURN(-EBUSY);
582         }
583
584         ptlrpc_stop_all_threads(ost->ost_service);
585         ptlrpc_unregister_service(ost->ost_service);
586
587         err = obd_disconnect(&ost->ost_conn);
588         if (err) {
589                 CERROR("lustre ost: fail to disconnect device\n");
590                 RETURN(-EINVAL);
591         }
592
593         MOD_DEC_USE_COUNT;
594         RETURN(0);
595 }
596
597 /* use obd ops to offer management infrastructure */
598 static struct obd_ops ost_obd_ops = {
599         o_setup:       ost_setup,
600         o_cleanup:     ost_cleanup,
601 };
602
603 static int __init ost_init(void)
604 {
605         class_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
606         return 0;
607 }
608
609 static void __exit ost_exit(void)
610 {
611         class_unregister_type(LUSTRE_OST_NAME);
612 }
613
614 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
615 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
616 MODULE_LICENSE("GPL");
617
618 module_init(ost_init);
619 module_exit(ost_exit);