Whamcloud - gitweb
Don't have spaces in process names.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/init.h>
41
42 static int ost_destroy(struct ptlrpc_request *req)
43 {
44         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
45         struct ost_body *body;
46         int rc, size = sizeof(*body);
47         ENTRY;
48
49         body = lustre_msg_buf(req->rq_reqmsg, 0);
50
51         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
52         if (rc)
53                 RETURN(rc);
54
55         req->rq_status = obd_destroy(conn, &body->oa, NULL);
56         RETURN(0);
57 }
58
59 static int ost_getattr(struct ptlrpc_request *req)
60 {
61         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
62         struct ost_body *body, *repbody;
63         int rc, size = sizeof(*body);
64         ENTRY;
65
66         body = lustre_msg_buf(req->rq_reqmsg, 0);
67
68         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
69         if (rc)
70                 RETURN(rc);
71
72         repbody = lustre_msg_buf(req->rq_repmsg, 0);
73         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
74         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
75         RETURN(0);
76 }
77
78 static int ost_statfs(struct ptlrpc_request *req)
79 {
80         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
81         struct obd_statfs *osfs;
82         struct statfs sfs;
83         int rc, size = sizeof(*osfs);
84         ENTRY;
85
86         rc = obd_statfs(conn, &sfs);
87         if (rc) {
88                 CERROR("ost: statfs failed: rc %d\n", rc);
89                 req->rq_status = rc;
90                 RETURN(rc);
91         }
92
93         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
94         if (rc)
95                 RETURN(rc);
96
97         osfs = lustre_msg_buf(req->rq_repmsg, 0);
98         memset(osfs, 0, size);
99         obd_statfs_pack(osfs, &sfs);
100         RETURN(0);
101 }
102
103 static int ost_open(struct ptlrpc_request *req)
104 {
105         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
106         struct ost_body *body, *repbody;
107         int rc, size = sizeof(*body);
108         ENTRY;
109
110         body = lustre_msg_buf(req->rq_reqmsg, 0);
111
112         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
113         if (rc)
114                 RETURN(rc);
115
116         repbody = lustre_msg_buf(req->rq_repmsg, 0);
117         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
118         req->rq_status = obd_open(conn, &repbody->oa, NULL);
119         RETURN(0);
120 }
121
122 static int ost_close(struct ptlrpc_request *req)
123 {
124         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
125         struct ost_body *body, *repbody;
126         int rc, size = sizeof(*body);
127         ENTRY;
128
129         body = lustre_msg_buf(req->rq_reqmsg, 0);
130
131         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
132         if (rc)
133                 RETURN(rc);
134
135         repbody = lustre_msg_buf(req->rq_repmsg, 0);
136         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
137         req->rq_status = obd_close(conn, &repbody->oa, NULL);
138         RETURN(0);
139 }
140
141 static int ost_create(struct ptlrpc_request *req)
142 {
143         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
144         struct ost_body *body, *repbody;
145         int rc, size = sizeof(*body);
146         ENTRY;
147
148         body = lustre_msg_buf(req->rq_reqmsg, 0);
149
150         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
151         if (rc)
152                 RETURN(rc);
153
154         repbody = lustre_msg_buf(req->rq_repmsg, 0);
155         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
156         req->rq_status = obd_create(conn, &repbody->oa, NULL);
157         RETURN(0);
158 }
159
160 static int ost_punch(struct ptlrpc_request *req)
161 {
162         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
163         struct ost_body *body, *repbody;
164         int rc, size = sizeof(*body);
165         ENTRY;
166
167         body = lustre_msg_buf(req->rq_reqmsg, 0);
168
169         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
170         if (rc)
171                 RETURN(rc);
172
173         repbody = lustre_msg_buf(req->rq_repmsg, 0);
174         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
175         req->rq_status = obd_punch(conn, &repbody->oa, NULL, 
176                                    repbody->oa.o_blocks, repbody->oa.o_size);
177         RETURN(0);
178 }
179
180 static int ost_setattr(struct ptlrpc_request *req)
181 {
182         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
183         struct ost_body *body, *repbody;
184         int rc, size = sizeof(*body);
185         ENTRY;
186
187         body = lustre_msg_buf(req->rq_reqmsg, 0);
188
189         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
190         if (rc)
191                 RETURN(rc);
192
193         repbody = lustre_msg_buf(req->rq_repmsg, 0);
194         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
195         req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
196         RETURN(0);
197 }
198
199 static int ost_brw_read(struct ptlrpc_request *req)
200 {
201         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
202         struct ptlrpc_bulk_desc *desc;
203         void *tmp1, *tmp2, *end2;
204         struct niobuf_remote *remote_nb;
205         struct niobuf_local *local_nb = NULL;
206         struct obd_ioobj *ioo;
207         struct ost_body *body;
208         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
209         ENTRY;
210
211         body = lustre_msg_buf(req->rq_reqmsg, 0);
212         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
213         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
214         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
215         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
216         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
217         cmd = OBD_BRW_READ;
218
219         for (i = 0; i < objcount; i++) {
220                 ost_unpack_ioo(&tmp1, &ioo);
221                 if (tmp2 + ioo->ioo_bufcnt > end2) {
222                         LBUG();
223                         GOTO(out, rc = -EFAULT);
224                 }
225                 for (j = 0; j < ioo->ioo_bufcnt; j++)
226                         ost_unpack_niobuf(&tmp2, &remote_nb);
227         }
228
229         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
230         if (rc)
231                 RETURN(rc);
232         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
233         if (local_nb == NULL)
234                 RETURN(-ENOMEM);
235
236         /* The unpackers move tmp1 and tmp2, so reset them before using */
237         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
238         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
239         req->rq_status = obd_preprw(cmd, conn, objcount,
240                                     tmp1, niocount, tmp2, local_nb, NULL);
241
242         if (req->rq_status)
243                 GOTO(out_local, 0);
244
245         desc = ptlrpc_prep_bulk(req->rq_connection);
246         if (desc == NULL)
247                 GOTO(out_local, rc = -ENOMEM);
248         desc->b_portal = OST_BULK_PORTAL;
249
250         for (i = 0; i < niocount; i++) {
251                 struct ptlrpc_bulk_page *bulk;
252                 bulk = ptlrpc_prep_bulk_page(desc);
253                 if (bulk == NULL)
254                         GOTO(out_bulk, rc = -ENOMEM);
255                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
256                 bulk->b_xid = remote_nb->xid;
257                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
258                 bulk->b_buflen = PAGE_SIZE;
259         }
260
261         rc = ptlrpc_send_bulk(desc);
262         if (rc)
263                 GOTO(out_bulk, rc);
264
265 #warning OST must time out here.
266         wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
267         if (desc->b_flags & PTL_RPC_FL_INTR)
268                 rc = -EINTR;
269
270         /* The unpackers move tmp1 and tmp2, so reset them before using */
271         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
272         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
273         req->rq_status = obd_commitrw(cmd, conn, objcount,
274                                       tmp1, niocount, local_nb, NULL);
275
276 out_bulk:
277         ptlrpc_free_bulk(desc);
278 out_local:
279         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
280 out:
281         if (rc)
282                 ptlrpc_error(req->rq_svc, req);
283         else
284                 ptlrpc_reply(req->rq_svc, req);
285         RETURN(rc);
286 }
287
288 static int ost_brw_write(struct ptlrpc_request *req)
289 {
290         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
291         struct ptlrpc_bulk_desc *desc;
292         struct niobuf_remote *remote_nb;
293         struct niobuf_local *local_nb, *lnb;
294         struct obd_ioobj *ioo;
295         struct ost_body *body;
296         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
297         void *tmp1, *tmp2, *end2;
298         void *desc_priv = NULL;
299         int reply_sent = 0;
300         struct ptlrpc_service *srv;
301         __u32 xid;
302         ENTRY;
303
304         body = lustre_msg_buf(req->rq_reqmsg, 0);
305         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
306         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
307         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
308         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
309         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
310         cmd = OBD_BRW_WRITE;
311
312         for (i = 0; i < objcount; i++) {
313                 ost_unpack_ioo((void *)&tmp1, &ioo);
314                 if (tmp2 + ioo->ioo_bufcnt > end2) {
315                         rc = -EFAULT;
316                         break;
317                 }
318                 for (j = 0; j < ioo->ioo_bufcnt; j++)
319                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
320         }
321
322         size[1] = niocount * sizeof(*remote_nb);
323         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
324         if (rc)
325                 GOTO(out, rc);
326         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
327
328         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
329         if (local_nb == NULL)
330                 GOTO(out, rc = -ENOMEM);
331
332         /* The unpackers move tmp1 and tmp2, so reset them before using */
333         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
334         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
335         req->rq_status = obd_preprw(cmd, conn, objcount,
336                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
337         if (req->rq_status)
338                 GOTO(out_free, rc = 0); /* XXX is this correct? */
339
340         desc = ptlrpc_prep_bulk(req->rq_connection);
341         if (desc == NULL)
342                 GOTO(fail_preprw, rc = -ENOMEM);
343         desc->b_cb = NULL;
344         desc->b_portal = OSC_BULK_PORTAL;
345         desc->b_desc_private = desc_priv;
346         memcpy(&(desc->b_conn), &conn, sizeof(conn));
347
348         srv = req->rq_obd->u.ost.ost_service;
349         spin_lock(&srv->srv_lock);
350         xid = srv->srv_xid++;                   /* single xid for all pages */
351         spin_unlock(&srv->srv_lock);
352
353         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
354                 struct ptlrpc_bulk_page *bulk;
355
356                 bulk = ptlrpc_prep_bulk_page(desc);
357                 if (bulk == NULL)
358                         GOTO(fail_bulk, rc = -ENOMEM);
359
360                 bulk->b_xid = xid;              /* single xid for all pages */
361
362                 bulk->b_buf = lnb->addr;
363                 bulk->b_page = lnb->page;
364                 bulk->b_flags = lnb->flags;
365                 bulk->b_dentry = lnb->dentry;
366                 bulk->b_buflen = PAGE_SIZE;
367                 bulk->b_cb = NULL;
368
369                 /* this advances remote_nb */
370                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
371                                 bulk->b_xid);
372         }
373
374         rc = ptlrpc_register_bulk(desc);
375         if (rc)
376                 GOTO(fail_bulk, rc);
377
378         reply_sent = 1;
379         ptlrpc_reply(req->rq_svc, req);
380
381 #warning OST must time out here.
382         wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD);
383
384         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
385                           desc->b_desc_private);
386         ptlrpc_free_bulk(desc);
387         EXIT;
388 out_free:
389         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
390 out:
391         if (!reply_sent) {
392                 if (rc)
393                         ptlrpc_error(req->rq_svc, req);
394                 else
395                         ptlrpc_reply(req->rq_svc, req);
396         }
397         return rc;
398
399 fail_bulk:
400         ptlrpc_free_bulk(desc);
401 fail_preprw:
402         /* FIXME: how do we undo the preprw? */
403         goto out_free;
404 }
405
406 static int ost_handle(struct ptlrpc_request *req)
407 {
408         int rc;
409         ENTRY;
410
411         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
412         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
413                 CERROR("lustre_ost: Invalid request\n");
414                 GOTO(out, rc);
415         }
416
417         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
418                 CERROR("lustre_ost: wrong packet type sent %d\n",
419                        req->rq_reqmsg->type);
420                 GOTO(out, rc = -EINVAL);
421         }
422
423         if (req->rq_reqmsg->opc != OST_CONNECT &&
424             req->rq_export == NULL) {
425                 CERROR("lustre_ost: operation %d on unconnected OST\n",
426                        req->rq_reqmsg->opc);
427                 GOTO(out, rc = -ENOTCONN);
428         }
429
430         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
431                 GOTO(out, rc = -EINVAL);
432
433         switch (req->rq_reqmsg->opc) {
434         case OST_CONNECT:
435                 CDEBUG(D_INODE, "connect\n");
436                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
437                 rc = target_handle_connect(req);
438                 break;
439         case OST_DISCONNECT:
440                 CDEBUG(D_INODE, "disconnect\n");
441                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
442                 rc = target_handle_disconnect(req);
443                 break;
444         case OST_CREATE:
445                 CDEBUG(D_INODE, "create\n");
446                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
447                 rc = ost_create(req);
448                 break;
449         case OST_DESTROY:
450                 CDEBUG(D_INODE, "destroy\n");
451                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
452                 rc = ost_destroy(req);
453                 break;
454         case OST_GETATTR:
455                 CDEBUG(D_INODE, "getattr\n");
456                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
457                 rc = ost_getattr(req);
458                 break;
459         case OST_SETATTR:
460                 CDEBUG(D_INODE, "setattr\n");
461                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
462                 rc = ost_setattr(req);
463                 break;
464         case OST_OPEN:
465                 CDEBUG(D_INODE, "setattr\n");
466                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
467                 rc = ost_open(req);
468                 break;
469         case OST_CLOSE:
470                 CDEBUG(D_INODE, "setattr\n");
471                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
472                 rc = ost_close(req);
473                 break;
474         case OST_WRITE:
475                 CDEBUG(D_INODE, "write\n");
476                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
477                 rc = ost_brw_write(req);
478                 /* ost_brw sends its own replies */
479                 RETURN(rc);
480         case OST_READ:
481                 CDEBUG(D_INODE, "read\n");
482                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
483                 rc = ost_brw_read(req);
484                 /* ost_brw sends its own replies */
485                 RETURN(rc);
486         case OST_PUNCH:
487                 CDEBUG(D_INODE, "punch\n");
488                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
489                 rc = ost_punch(req);
490                 break;
491         case OST_STATFS:
492                 CDEBUG(D_INODE, "statfs\n");
493                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
494                 rc = ost_statfs(req);
495                 break;
496         case LDLM_ENQUEUE:
497                 CDEBUG(D_INODE, "enqueue\n");
498                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
499                 rc = ldlm_handle_enqueue(req);
500                 if (rc)
501                         break;
502                 RETURN(0);
503         case LDLM_CONVERT:
504                 CDEBUG(D_INODE, "convert\n");
505                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
506                 rc = ldlm_handle_convert(req);
507                 if (rc)
508                         break;
509                 RETURN(0);
510         case LDLM_CANCEL:
511                 CDEBUG(D_INODE, "cancel\n");
512                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
513                 rc = ldlm_handle_cancel(req);
514                 if (rc)
515                         break;
516                 RETURN(0);
517         case LDLM_BL_CALLBACK:
518         case LDLM_CP_CALLBACK:
519                 CDEBUG(D_INODE, "callback\n");
520                 CERROR("callbacks should not happen on OST\n");
521                 LBUG();
522                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
523                 break;
524         default:
525                 req->rq_status = -ENOTSUPP;
526                 rc = ptlrpc_error(req->rq_svc, req);
527                 RETURN(rc);
528         }
529
530         EXIT;
531 out:
532         //req->rq_status = rc;
533         if (rc) {
534                 CERROR("ost: processing error (opcode=%d): %d\n",
535                        req->rq_reqmsg->opc, rc);
536                 ptlrpc_error(req->rq_svc, req);
537         } else {
538                 CDEBUG(D_INODE, "sending reply\n");
539                 if (req->rq_repmsg == NULL)
540                         CERROR("handler for opcode %d returned rc=0 without "
541                                "creating rq_repmsg; needs to return rc != "
542                                "0!\n", req->rq_reqmsg->opc);
543                 ptlrpc_reply(req->rq_svc, req);
544         }
545
546         return 0;
547 }
548
/* number of service threads ost_setup() starts for the OST service */
#define OST_NUM_THREADS 6
550
551 /* mount the file system (secretly) */
552 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
553 {
554         struct obd_ioctl_data* data = buf;
555         struct ost_obd *ost = &obddev->u.ost;
556         struct obd_device *tgt;
557         int err;
558         int i;
559         ENTRY;
560
561         if (data->ioc_inllen1 < 1) {
562                 CERROR("requires a TARGET OBD UUID\n");
563                 RETURN(-EINVAL);
564         }
565         if (data->ioc_inllen1 > 37) {
566                 CERROR("OBD UUID must be less than 38 characters\n");
567                 RETURN(-EINVAL);
568         }
569
570         MOD_INC_USE_COUNT;
571         tgt = class_uuid2obd(data->ioc_inlbuf1);
572         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
573             !(tgt->obd_flags & OBD_SET_UP)) {
574                 CERROR("device not attached or not set up (%d)\n",
575                        data->ioc_dev);
576                 GOTO(error_dec, err = -EINVAL);
577         }
578
579         err = obd_connect(&ost->ost_conn, tgt);
580         if (err) {
581                 CERROR("fail to connect to device %d\n", data->ioc_dev);
582                 GOTO(error_dec, err = -EINVAL);
583         }
584
585         ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
586                                            OSC_REPLY_PORTAL, "self",ost_handle);
587         if (!ost->ost_service) {
588                 CERROR("failed to start service\n");
589                 GOTO(error_disc, err = -EINVAL);
590         }
591
592         for (i = 0; i < OST_NUM_THREADS; i++) {
593                 char name[32];
594                 sprintf(name, "lustre_ost_%02d", i);
595                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
596                 if (err) {
597                         CERROR("error starting thread #%d: rc %d\n", i, err);
598                         GOTO(error_disc, err = -EINVAL);
599                 }
600         }
601
602         RETURN(0);
603
604 error_disc:
605         obd_disconnect(&ost->ost_conn);
606 error_dec:
607         MOD_DEC_USE_COUNT;
608         RETURN(err);
609 }
610
611 static int ost_cleanup(struct obd_device * obddev)
612 {
613         struct ost_obd *ost = &obddev->u.ost;
614         int err;
615
616         ENTRY;
617
618         if ( !list_empty(&obddev->obd_exports) ) {
619                 CERROR("still has clients!\n");
620                 RETURN(-EBUSY);
621         }
622
623         ptlrpc_stop_all_threads(ost->ost_service);
624         ptlrpc_unregister_service(ost->ost_service);
625
626         err = obd_disconnect(&ost->ost_conn);
627         if (err) {
628                 CERROR("lustre ost: fail to disconnect device\n");
629                 RETURN(-EINVAL);
630         }
631
632         MOD_DEC_USE_COUNT;
633         RETURN(0);
634 }
635
636 /* use obd ops to offer management infrastructure */
637 static struct obd_ops ost_obd_ops = {
638         o_setup:       ost_setup,
639         o_cleanup:     ost_cleanup,
640 };
641
642 static int __init ost_init(void)
643 {
644         class_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
645         return 0;
646 }
647
/* Module exit point: unregister the OST device type registered in ost_init(). */
static void __exit ost_exit(void)
{
        class_unregister_type(LUSTRE_OST_NAME);
}
652
/* module metadata, followed by the load/unload entry points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
MODULE_LICENSE("GPL");

module_init(ost_init);
module_exit(ost_exit);