Whamcloud - gitweb
Fix problem with multiple connections to the same local device.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40
41 static int ost_destroy(struct ptlrpc_request *req)
42 {
43         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
44         struct ost_body *body;
45         int rc, size = sizeof(*body);
46         ENTRY;
47
48         body = lustre_msg_buf(req->rq_reqmsg, 0);
49
50         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
51         if (rc)
52                 RETURN(rc);
53
54         req->rq_status = obd_destroy(conn, &body->oa, NULL);
55         RETURN(0);
56 }
57
58 static int ost_getattr(struct ptlrpc_request *req)
59 {
60         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
61         struct ost_body *body, *repbody;
62         int rc, size = sizeof(*body);
63         ENTRY;
64
65         body = lustre_msg_buf(req->rq_reqmsg, 0);
66
67         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
68         if (rc)
69                 RETURN(rc);
70
71         repbody = lustre_msg_buf(req->rq_repmsg, 0);
72         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
73         req->rq_status = obd_getattr(conn, &repbody->oa);
74         RETURN(0);
75 }
76
77 static int ost_statfs(struct ptlrpc_request *req)
78 {
79         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
80         struct obd_statfs *osfs;
81         struct statfs sfs;
82         int rc, size = sizeof(*osfs);
83         ENTRY;
84
85         rc = obd_statfs(conn, &sfs);
86         if (rc) {
87                 CERROR("ost: statfs failed: rc %d\n", rc);
88                 req->rq_status = rc;
89                 RETURN(rc);
90         }
91
92         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
93         if (rc)
94                 RETURN(rc);
95
96         osfs = lustre_msg_buf(req->rq_repmsg, 0);
97         memset(osfs, 0, size);
98         obd_statfs_pack(osfs, &sfs);
99         RETURN(0);
100 }
101
102 static int ost_open(struct ptlrpc_request *req)
103 {
104         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_msg_buf(req->rq_reqmsg, 0);
110
111         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
112         if (rc)
113                 RETURN(rc);
114
115         repbody = lustre_msg_buf(req->rq_repmsg, 0);
116         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
117         req->rq_status = obd_open(conn, &repbody->oa, NULL);
118         RETURN(0);
119 }
120
121 static int ost_close(struct ptlrpc_request *req)
122 {
123         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
124         struct ost_body *body, *repbody;
125         int rc, size = sizeof(*body);
126         ENTRY;
127
128         body = lustre_msg_buf(req->rq_reqmsg, 0);
129
130         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
131         if (rc)
132                 RETURN(rc);
133
134         repbody = lustre_msg_buf(req->rq_repmsg, 0);
135         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
136         req->rq_status = obd_close(conn, &repbody->oa, NULL);
137         RETURN(0);
138 }
139
140 static int ost_create(struct ptlrpc_request *req)
141 {
142         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
143         struct ost_body *body, *repbody;
144         int rc, size = sizeof(*body);
145         ENTRY;
146
147         body = lustre_msg_buf(req->rq_reqmsg, 0);
148
149         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
150         if (rc)
151                 RETURN(rc);
152
153         repbody = lustre_msg_buf(req->rq_repmsg, 0);
154         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
155         req->rq_status = obd_create(conn, &repbody->oa, NULL);
156         RETURN(0);
157 }
158
159 static int ost_punch(struct ptlrpc_request *req)
160 {
161         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
162         struct ost_body *body, *repbody;
163         int rc, size = sizeof(*body);
164         ENTRY;
165
166         body = lustre_msg_buf(req->rq_reqmsg, 0);
167
168         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
169         if (rc)
170                 RETURN(rc);
171
172         repbody = lustre_msg_buf(req->rq_repmsg, 0);
173         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
174         req->rq_status = obd_punch(conn, &repbody->oa, NULL, 
175                                    repbody->oa.o_blocks, repbody->oa.o_size);
176         RETURN(0);
177 }
178
179 static int ost_setattr(struct ptlrpc_request *req)
180 {
181         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
182         struct ost_body *body, *repbody;
183         int rc, size = sizeof(*body);
184         ENTRY;
185
186         body = lustre_msg_buf(req->rq_reqmsg, 0);
187
188         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
189         if (rc)
190                 RETURN(rc);
191
192         repbody = lustre_msg_buf(req->rq_repmsg, 0);
193         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
194         req->rq_status = obd_setattr(conn, &repbody->oa);
195         RETURN(0);
196 }
197
198 static int ost_brw_read(struct ptlrpc_request *req)
199 {
200         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
201         struct ptlrpc_bulk_desc *desc;
202         void *tmp1, *tmp2, *end2;
203         struct niobuf_remote *remote_nb;
204         struct niobuf_local *local_nb = NULL;
205         struct obd_ioobj *ioo;
206         struct ost_body *body;
207         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
208         ENTRY;
209
210         body = lustre_msg_buf(req->rq_reqmsg, 0);
211         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
212         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
213         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
214         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
215         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
216         cmd = body->data;
217
218         for (i = 0; i < objcount; i++) {
219                 ost_unpack_ioo(&tmp1, &ioo);
220                 if (tmp2 + ioo->ioo_bufcnt > end2) {
221                         LBUG();
222                         GOTO(out, rc = -EFAULT);
223                 }
224                 for (j = 0; j < ioo->ioo_bufcnt; j++)
225                         ost_unpack_niobuf(&tmp2, &remote_nb);
226         }
227
228         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
229         if (rc)
230                 RETURN(rc);
231         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
232         if (local_nb == NULL)
233                 RETURN(-ENOMEM);
234
235         /* The unpackers move tmp1 and tmp2, so reset them before using */
236         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
237         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
238         req->rq_status = obd_preprw(cmd, conn, objcount,
239                                     tmp1, niocount, tmp2, local_nb, NULL);
240
241         if (req->rq_status)
242                 GOTO(out_local, 0);
243
244         desc = ptlrpc_prep_bulk(req->rq_connection);
245         if (desc == NULL)
246                 GOTO(out_local, rc = -ENOMEM);
247         desc->b_portal = OST_BULK_PORTAL;
248
249         for (i = 0; i < niocount; i++) {
250                 struct ptlrpc_bulk_page *bulk;
251                 bulk = ptlrpc_prep_bulk_page(desc);
252                 if (bulk == NULL)
253                         GOTO(out_bulk, rc = -ENOMEM);
254                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
255                 bulk->b_xid = remote_nb->xid;
256                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
257                 bulk->b_buflen = PAGE_SIZE;
258         }
259
260         rc = ptlrpc_send_bulk(desc);
261         if (rc)
262                 GOTO(out_bulk, rc);
263
264 #warning OST must time out here.
265         wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
266         if (desc->b_flags & PTL_RPC_FL_INTR)
267                 rc = -EINTR;
268
269         /* The unpackers move tmp1 and tmp2, so reset them before using */
270         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
271         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
272         req->rq_status = obd_commitrw(cmd, conn, objcount,
273                                       tmp1, niocount, local_nb, NULL);
274
275 out_bulk:
276         ptlrpc_free_bulk(desc);
277 out_local:
278         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
279 out:
280         if (rc)
281                 ptlrpc_error(req->rq_svc, req);
282         else
283                 ptlrpc_reply(req->rq_svc, req);
284         RETURN(rc);
285 }
286
287 static int ost_brw_write(struct ptlrpc_request *req)
288 {
289         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
290         struct ptlrpc_bulk_desc *desc;
291         struct niobuf_remote *remote_nb;
292         struct niobuf_local *local_nb, *lnb;
293         struct obd_ioobj *ioo;
294         struct ost_body *body;
295         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
296         void *tmp1, *tmp2, *end2;
297         void *desc_priv = NULL;
298         int reply_sent = 0;
299         ENTRY;
300
301         body = lustre_msg_buf(req->rq_reqmsg, 0);
302         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
303         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
304         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
305         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
306         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
307         cmd = body->data;
308
309         for (i = 0; i < objcount; i++) {
310                 ost_unpack_ioo((void *)&tmp1, &ioo);
311                 if (tmp2 + ioo->ioo_bufcnt > end2) {
312                         rc = -EFAULT;
313                         break;
314                 }
315                 for (j = 0; j < ioo->ioo_bufcnt; j++)
316                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
317         }
318
319         size[1] = niocount * sizeof(*remote_nb);
320         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
321         if (rc)
322                 GOTO(out, rc);
323         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
324
325         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
326         if (local_nb == NULL)
327                 GOTO(out, rc = -ENOMEM);
328
329         /* The unpackers move tmp1 and tmp2, so reset them before using */
330         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
331         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
332         req->rq_status = obd_preprw(cmd, conn, objcount,
333                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
334         if (req->rq_status)
335                 GOTO(out_free, rc = 0); /* XXX is this correct? */
336
337         desc = ptlrpc_prep_bulk(req->rq_connection);
338         if (desc == NULL)
339                 GOTO(fail_preprw, rc = -ENOMEM);
340         desc->b_cb = NULL;
341         desc->b_portal = OSC_BULK_PORTAL;
342         desc->b_desc_private = desc_priv;
343         memcpy(&(desc->b_conn), &conn, sizeof(conn));
344
345         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
346                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
347                 struct ptlrpc_bulk_page *bulk;
348
349                 bulk = ptlrpc_prep_bulk_page(desc);
350                 if (bulk == NULL)
351                         GOTO(fail_bulk, rc = -ENOMEM);
352
353                 spin_lock(&srv->srv_lock);
354                 bulk->b_xid = srv->srv_xid++;
355                 spin_unlock(&srv->srv_lock);
356
357                 bulk->b_buf = lnb->addr;
358                 bulk->b_page = lnb->page;
359                 bulk->b_flags = lnb->flags;
360                 bulk->b_dentry = lnb->dentry;
361                 bulk->b_buflen = PAGE_SIZE;
362                 bulk->b_cb = NULL;
363
364                 /* this advances remote_nb */
365                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
366                                 bulk->b_xid);
367         }
368
369         rc = ptlrpc_register_bulk(desc);
370         if (rc)
371                 GOTO(fail_bulk, rc);
372
373         reply_sent = 1;
374         ptlrpc_reply(req->rq_svc, req);
375
376 #warning OST must time out here.
377         wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD);
378
379         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
380                           desc->b_desc_private);
381         ptlrpc_free_bulk(desc);
382         EXIT;
383 out_free:
384         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
385 out:
386         if (!reply_sent) {
387                 if (rc)
388                         ptlrpc_error(req->rq_svc, req);
389                 else
390                         ptlrpc_reply(req->rq_svc, req);
391         }
392         return rc;
393
394 fail_bulk:
395         ptlrpc_free_bulk(desc);
396 fail_preprw:
397         /* FIXME: how do we undo the preprw? */
398         goto out_free;
399 }
400
401 static int ost_brw(struct ptlrpc_request *req)
402 {
403         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
404
405         if (body->data & OBD_BRW_WRITE)
406                 return ost_brw_write(req);
407         else
408                 return ost_brw_read(req);
409 }
410
411
412 static int ost_handle(struct ptlrpc_request *req)
413 {
414         int rc;
415         ENTRY;
416
417         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
418         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
419                 CERROR("lustre_ost: Invalid request\n");
420                 GOTO(out, rc);
421         }
422
423         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
424                 CERROR("lustre_ost: wrong packet type sent %d\n",
425                        req->rq_reqmsg->type);
426                 GOTO(out, rc = -EINVAL);
427         }
428
429         if (req->rq_reqmsg->opc != OST_CONNECT &&
430             req->rq_export == NULL) {
431                 CERROR("lustre_ost: operation %d on unconnected OST\n",
432                        req->rq_reqmsg->opc);
433                 GOTO(out, rc = -ENOTCONN);
434         }
435
436         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
437                 GOTO(out, rc = -EINVAL);
438
439         switch (req->rq_reqmsg->opc) {
440         case OST_CONNECT:
441                 CDEBUG(D_INODE, "connect\n");
442                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
443                 rc = target_handle_connect(req);
444                 break;
445         case OST_DISCONNECT:
446                 CDEBUG(D_INODE, "disconnect\n");
447                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
448                 rc = target_handle_disconnect(req);
449                 break;
450         case OST_CREATE:
451                 CDEBUG(D_INODE, "create\n");
452                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
453                 rc = ost_create(req);
454                 break;
455         case OST_DESTROY:
456                 CDEBUG(D_INODE, "destroy\n");
457                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
458                 rc = ost_destroy(req);
459                 break;
460         case OST_GETATTR:
461                 CDEBUG(D_INODE, "getattr\n");
462                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
463                 rc = ost_getattr(req);
464                 break;
465         case OST_SETATTR:
466                 CDEBUG(D_INODE, "setattr\n");
467                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
468                 rc = ost_setattr(req);
469                 break;
470         case OST_OPEN:
471                 CDEBUG(D_INODE, "setattr\n");
472                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
473                 rc = ost_open(req);
474                 break;
475         case OST_CLOSE:
476                 CDEBUG(D_INODE, "setattr\n");
477                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
478                 rc = ost_close(req);
479                 break;
480         case OST_BRW:
481                 CDEBUG(D_INODE, "brw\n");
482                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
483                 rc = ost_brw(req);
484                 /* ost_brw sends its own replies */
485                 RETURN(rc);
486         case OST_PUNCH:
487                 CDEBUG(D_INODE, "punch\n");
488                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
489                 rc = ost_punch(req);
490                 break;
491         case OST_STATFS:
492                 CDEBUG(D_INODE, "statfs\n");
493                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
494                 rc = ost_statfs(req);
495                 break;
496         default:
497                 req->rq_status = -ENOTSUPP;
498                 rc = ptlrpc_error(req->rq_svc, req);
499                 RETURN(rc);
500         }
501
502         EXIT;
503 out:
504         //req->rq_status = rc;
505         if (rc) {
506                 CERROR("ost: processing error (opcode=%d): %d\n",
507                        req->rq_reqmsg->opc, rc);
508                 ptlrpc_error(req->rq_svc, req);
509         } else {
510                 CDEBUG(D_INODE, "sending reply\n");
511                 if (req->rq_repmsg == NULL)
512                         CERROR("handler for opcode %d returned rc=0 without "
513                                "creating rq_repmsg; needs to return rc != "
514                                "0!\n", req->rq_reqmsg->opc);
515                 ptlrpc_reply(req->rq_svc, req);
516         }
517
518         return 0;
519 }
520
521 #define OST_NUM_THREADS 6
522
523 /* mount the file system (secretly) */
524 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
525 {
526         struct obd_ioctl_data* data = buf;
527         struct ost_obd *ost = &obddev->u.ost;
528         struct obd_device *tgt;
529         int err;
530         int i;
531         ENTRY;
532
533         if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
534                 RETURN(-ENODEV);
535
536         MOD_INC_USE_COUNT;
537         tgt = &obd_dev[data->ioc_dev];
538         if (!(tgt->obd_flags & OBD_ATTACHED) ||
539             !(tgt->obd_flags & OBD_SET_UP)) {
540                 CERROR("device not attached or not set up (%d)\n",
541                        data->ioc_dev);
542                 GOTO(error_dec, err = -EINVAL);
543         }
544
545         err = obd_connect(&ost->ost_conn, tgt);
546         if (err) {
547                 CERROR("fail to connect to device %d\n", data->ioc_dev);
548                 GOTO(error_dec, err = -EINVAL);
549         }
550
551         ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
552                                            OSC_REPLY_PORTAL, "self",ost_handle);
553         if (!ost->ost_service) {
554                 CERROR("failed to start service\n");
555                 GOTO(error_disc, err = -EINVAL);
556         }
557
558         for (i = 0; i < OST_NUM_THREADS; i++) {
559                 err = ptlrpc_start_thread(obddev, ost->ost_service,
560                                           "lustre_ost");
561                 if (err) {
562                         CERROR("error starting thread #%d: rc %d\n", i, err);
563                         GOTO(error_disc, err = -EINVAL);
564                 }
565         }
566
567         RETURN(0);
568
569 error_disc:
570         obd_disconnect(&ost->ost_conn);
571 error_dec:
572         MOD_DEC_USE_COUNT;
573         RETURN(err);
574 }
575
576 static int ost_cleanup(struct obd_device * obddev)
577 {
578         struct ost_obd *ost = &obddev->u.ost;
579         int err;
580
581         ENTRY;
582
583         if ( !list_empty(&obddev->obd_exports) ) {
584                 CERROR("still has clients!\n");
585                 RETURN(-EBUSY);
586         }
587
588         ptlrpc_stop_all_threads(ost->ost_service);
589         ptlrpc_unregister_service(ost->ost_service);
590
591         err = obd_disconnect(&ost->ost_conn);
592         if (err) {
593                 CERROR("lustre ost: fail to disconnect device\n");
594                 RETURN(-EINVAL);
595         }
596
597         MOD_DEC_USE_COUNT;
598         RETURN(0);
599 }
600
601 /* use obd ops to offer management infrastructure */
602 static struct obd_ops ost_obd_ops = {
603         o_setup:       ost_setup,
604         o_cleanup:     ost_cleanup,
605 };
606
607 static int __init ost_init(void)
608 {
609         class_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
610         return 0;
611 }
612
613 static void __exit ost_exit(void)
614 {
615         class_unregister_type(LUSTRE_OST_NAME);
616 }
617
618 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
619 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
620 MODULE_LICENSE("GPL");
621
622 module_init(ost_init);
623 module_exit(ost_exit);