Whamcloud - gitweb
- more fixes to export handling. Probably tomorrow they can be
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40
41 static int ost_destroy(struct ptlrpc_request *req)
42 {
43         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
44         struct ost_body *body;
45         int rc, size = sizeof(*body);
46         ENTRY;
47
48         body = lustre_msg_buf(req->rq_reqmsg, 0);
49
50         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
51         if (rc)
52                 RETURN(rc);
53
54         req->rq_status = obd_destroy(conn, &body->oa);
55         RETURN(0);
56 }
57
58 static int ost_getattr(struct ptlrpc_request *req)
59 {
60         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
61         struct ost_body *body, *repbody;
62         int rc, size = sizeof(*body);
63         ENTRY;
64
65         body = lustre_msg_buf(req->rq_reqmsg, 0);
66
67         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
68         if (rc)
69                 RETURN(rc);
70
71         repbody = lustre_msg_buf(req->rq_repmsg, 0);
72         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
73         req->rq_status = obd_getattr(conn, &repbody->oa);
74         RETURN(0);
75 }
76
77 static int ost_open(struct ptlrpc_request *req)
78 {
79         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
80         struct ost_body *body, *repbody;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         body = lustre_msg_buf(req->rq_reqmsg, 0);
85
86         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
87         if (rc)
88                 RETURN(rc);
89
90         repbody = lustre_msg_buf(req->rq_repmsg, 0);
91         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
92         req->rq_status = obd_open(conn, &repbody->oa);
93         RETURN(0);
94 }
95
96 static int ost_close(struct ptlrpc_request *req)
97 {
98         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
99         struct ost_body *body, *repbody;
100         int rc, size = sizeof(*body);
101         ENTRY;
102
103         body = lustre_msg_buf(req->rq_reqmsg, 0);
104
105         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
106         if (rc)
107                 RETURN(rc);
108
109         repbody = lustre_msg_buf(req->rq_repmsg, 0);
110         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
111         req->rq_status = obd_close(conn, &repbody->oa);
112         RETURN(0);
113 }
114
115 static int ost_create(struct ptlrpc_request *req)
116 {
117         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
118         struct ost_body *body, *repbody;
119         int rc, size = sizeof(*body);
120         ENTRY;
121
122         body = lustre_msg_buf(req->rq_reqmsg, 0);
123
124         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
125         if (rc)
126                 RETURN(rc);
127
128         repbody = lustre_msg_buf(req->rq_repmsg, 0);
129         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
130         req->rq_status = obd_create(conn, &repbody->oa);
131         RETURN(0);
132 }
133
134 static int ost_punch(struct ptlrpc_request *req)
135 {
136         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
137         struct ost_body *body, *repbody;
138         int rc, size = sizeof(*body);
139         ENTRY;
140
141         body = lustre_msg_buf(req->rq_reqmsg, 0);
142
143         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
144         if (rc)
145                 RETURN(rc);
146
147         repbody = lustre_msg_buf(req->rq_repmsg, 0);
148         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
149         req->rq_status = obd_punch(conn, &repbody->oa,
150                                    repbody->oa.o_blocks, repbody->oa.o_size);
151         RETURN(0);
152 }
153
154 static int ost_setattr(struct ptlrpc_request *req)
155 {
156         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
157         struct ost_body *body, *repbody;
158         int rc, size = sizeof(*body);
159         ENTRY;
160
161         body = lustre_msg_buf(req->rq_reqmsg, 0);
162
163         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
164         if (rc)
165                 RETURN(rc);
166
167         repbody = lustre_msg_buf(req->rq_repmsg, 0);
168         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
169         req->rq_status = obd_setattr(conn, &repbody->oa);
170         RETURN(0);
171 }
172
173 static int ost_connect(struct ptlrpc_request *req)
174 {
175         struct ost_body *body;
176         struct obd_device *target;
177         struct obd_export *export;
178         struct obd_conn conn;
179         char *uuid;
180         int rc, size = sizeof(*body), i;
181         ENTRY;
182
183         uuid = lustre_msg_buf(req->rq_reqmsg, 0);
184         if (req->rq_reqmsg->buflens[0] > 37) {
185                 /* Invalid UUID */
186                 req->rq_status = -EINVAL;
187                 RETURN(0);
188         }
189
190         i = obd_class_uuid2dev(uuid);
191         if (i == -1) {
192                 req->rq_status = -ENODEV;
193                 RETURN(0);
194         }
195
196         target = &obd_dev[i];
197         if (!target) {
198                 req->rq_status = -ENODEV;
199                 RETURN(0);
200         }
201
202         conn.addr = req->rq_reqmsg->addr;
203         conn.cookie = req->rq_reqmsg->cookie;
204
205         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
206         if (rc)
207                 RETURN(rc);
208
209         req->rq_status = obd_connect(&conn, target);
210         req->rq_repmsg->addr = conn.addr;
211         req->rq_repmsg->cookie = conn.cookie;
212
213         export = gen_client(&conn); 
214         if (!export)
215                 LBUG();
216
217         req->rq_export = export;
218         export->export_connection = req->rq_connection;
219         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
220         body = lustre_msg_buf(req->rq_repmsg, 0);
221         body->connid = conn.oc_id;
222         RETURN(0);
223 }
224
225 static int ost_disconnect(struct ptlrpc_request *req)
226 {
227         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
228         struct ost_body *body;
229         int rc;
230         ENTRY;
231
232         body = lustre_msg_buf(req->rq_reqmsg, 0);
233
234         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
235         if (rc)
236                 RETURN(rc);
237
238         req->rq_status = obd_disconnect(conn);
239         RETURN(0);
240 }
241
242 static int ost_get_info(struct ptlrpc_request *req)
243 {
244         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
245         struct ost_body *body;
246         int rc, size[2] = {sizeof(*body)};
247         char *bufs[2] = {NULL, NULL}, *ptr;
248         ENTRY;
249
250         body = lustre_msg_buf(req->rq_reqmsg, 0);
251
252         ptr = lustre_msg_buf(req->rq_reqmsg, 1);
253         if (!ptr)
254                 RETURN(-EINVAL);
255
256         req->rq_status = obd_get_info(conn, req->rq_reqmsg->buflens[1], ptr,
257                                       &(size[1]), (void **)&(bufs[1]));
258
259         rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
260         if (rc)
261                 CERROR("cannot pack reply\n");
262
263         RETURN(rc);
264 }
265
266 static int ost_brw_read(struct ptlrpc_request *req)
267 {
268         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
269         struct ptlrpc_bulk_desc *desc;
270         void *tmp1, *tmp2, *end2;
271         struct niobuf_remote *remote_nb;
272         struct niobuf_local *local_nb = NULL;
273         struct obd_ioobj *ioo;
274         struct ost_body *body;
275         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
276         ENTRY;
277
278         body = lustre_msg_buf(req->rq_reqmsg, 0);
279         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
280         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
281         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
282         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
283         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
284         cmd = body->data;
285
286         for (i = 0; i < objcount; i++) {
287                 ost_unpack_ioo(&tmp1, &ioo);
288                 if (tmp2 + ioo->ioo_bufcnt > end2) {
289                         LBUG();
290                         GOTO(out, rc = -EFAULT);
291                 }
292                 for (j = 0; j < ioo->ioo_bufcnt; j++)
293                         ost_unpack_niobuf(&tmp2, &remote_nb);
294         }
295
296         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
297         if (rc)
298                 RETURN(rc);
299         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
300         if (local_nb == NULL)
301                 RETURN(-ENOMEM);
302
303         /* The unpackers move tmp1 and tmp2, so reset them before using */
304         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
305         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
306         req->rq_status = obd_preprw(cmd, conn, objcount,
307                                     tmp1, niocount, tmp2, local_nb, NULL);
308
309         if (req->rq_status)
310                 GOTO(out_local, 0);
311
312         desc = ptlrpc_prep_bulk(req->rq_connection);
313         if (desc == NULL)
314                 GOTO(out_local, rc = -ENOMEM);
315         desc->b_portal = OST_BULK_PORTAL;
316
317         for (i = 0; i < niocount; i++) {
318                 struct ptlrpc_bulk_page *bulk;
319                 bulk = ptlrpc_prep_bulk_page(desc);
320                 if (bulk == NULL)
321                         GOTO(out_bulk, rc = -ENOMEM);
322                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
323                 bulk->b_xid = remote_nb->xid;
324                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
325                 bulk->b_buflen = PAGE_SIZE;
326         }
327
328         rc = ptlrpc_send_bulk(desc);
329         if (rc)
330                 GOTO(out_bulk, rc);
331
332         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
333         if (desc->b_flags & PTL_RPC_FL_INTR)
334                 rc = -EINTR;
335
336         /* The unpackers move tmp1 and tmp2, so reset them before using */
337         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
338         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
339         req->rq_status = obd_commitrw(cmd, conn, objcount,
340                                       tmp1, niocount, local_nb, NULL);
341
342 out_bulk:
343         ptlrpc_free_bulk(desc);
344 out_local:
345         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
346 out:
347         if (rc)
348                 ptlrpc_error(req->rq_svc, req);
349         else
350                 ptlrpc_reply(req->rq_svc, req);
351         RETURN(rc);
352 }
353
354 static int ost_brw_write(struct ptlrpc_request *req)
355 {
356         struct obd_conn *conn = (struct obd_conn *)req->rq_reqmsg;
357         struct ptlrpc_bulk_desc *desc;
358         struct niobuf_remote *remote_nb;
359         struct niobuf_local *local_nb, *lnb;
360         struct obd_ioobj *ioo;
361         struct ost_body *body;
362         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
363         void *tmp1, *tmp2, *end2;
364         void *desc_priv = NULL;
365         int reply_sent = 0;
366         ENTRY;
367
368         body = lustre_msg_buf(req->rq_reqmsg, 0);
369         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
370         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
371         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
372         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
373         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
374         cmd = body->data;
375
376         for (i = 0; i < objcount; i++) {
377                 ost_unpack_ioo((void *)&tmp1, &ioo);
378                 if (tmp2 + ioo->ioo_bufcnt > end2) {
379                         rc = -EFAULT;
380                         break;
381                 }
382                 for (j = 0; j < ioo->ioo_bufcnt; j++)
383                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
384         }
385
386         size[1] = niocount * sizeof(*remote_nb);
387         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
388         if (rc)
389                 GOTO(out, rc);
390         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
391
392         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
393         if (local_nb == NULL)
394                 GOTO(out, rc = -ENOMEM);
395
396         /* The unpackers move tmp1 and tmp2, so reset them before using */
397         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
398         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
399         req->rq_status = obd_preprw(cmd, conn, objcount,
400                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
401         if (req->rq_status)
402                 GOTO(out_free, rc = 0); /* XXX is this correct? */
403
404         desc = ptlrpc_prep_bulk(req->rq_connection);
405         if (desc == NULL)
406                 GOTO(fail_preprw, rc = -ENOMEM);
407         desc->b_cb = NULL;
408         desc->b_portal = OSC_BULK_PORTAL;
409         desc->b_desc_private = desc_priv;
410         memcpy(&(desc->b_conn), &conn, sizeof(conn));
411
412         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
413                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
414                 struct ptlrpc_bulk_page *bulk;
415
416                 bulk = ptlrpc_prep_bulk_page(desc);
417                 if (bulk == NULL)
418                         GOTO(fail_bulk, rc = -ENOMEM);
419
420                 spin_lock(&srv->srv_lock);
421                 bulk->b_xid = srv->srv_xid++;
422                 spin_unlock(&srv->srv_lock);
423
424                 bulk->b_buf = lnb->addr;
425                 bulk->b_page = lnb->page;
426                 bulk->b_flags = lnb->flags;
427                 bulk->b_dentry = lnb->dentry;
428                 bulk->b_buflen = PAGE_SIZE;
429                 bulk->b_cb = NULL;
430
431                 /* this advances remote_nb */
432                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
433                                 bulk->b_xid);
434         }
435
436         rc = ptlrpc_register_bulk(desc);
437         if (rc)
438                 GOTO(fail_bulk, rc);
439
440         reply_sent = 1;
441         ptlrpc_reply(req->rq_svc, req);
442
443         wait_event_interruptible(desc->b_waitq,
444                                  desc->b_flags & PTL_BULK_FL_RCVD);
445
446         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
447                           desc->b_desc_private);
448         ptlrpc_free_bulk(desc);
449         EXIT;
450 out_free:
451         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
452 out:
453         if (!reply_sent) {
454                 if (rc)
455                         ptlrpc_error(req->rq_svc, req);
456                 else
457                         ptlrpc_reply(req->rq_svc, req);
458         }
459         return rc;
460
461 fail_bulk:
462         ptlrpc_free_bulk(desc);
463 fail_preprw:
464         /* FIXME: how do we undo the preprw? */
465         goto out_free;
466 }
467
468 static int ost_brw(struct ptlrpc_request *req)
469 {
470         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
471
472         if (body->data & OBD_BRW_WRITE)
473                 return ost_brw_write(req);
474         else
475                 return ost_brw_read(req);
476 }
477
478
479 static int ost_handle(struct ptlrpc_request *req)
480 {
481         int rc;
482         ENTRY;
483
484         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
485         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
486                 CERROR("lustre_mds: Invalid request\n");
487                 GOTO(out, rc);
488         }
489
490         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
491                 CERROR("lustre_mds: wrong packet type sent %d\n",
492                        req->rq_reqmsg->type);
493                 GOTO(out, rc = -EINVAL);
494         }
495
496         if (req->rq_reqmsg->opc != OST_CONNECT &&
497             req->rq_export == NULL)
498                 GOTO(out, rc = -ENOTCONN);
499
500         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
501                 GOTO(out, rc = -EINVAL);
502
503         switch (req->rq_reqmsg->opc) {
504         case OST_CONNECT:
505                 CDEBUG(D_INODE, "connect\n");
506                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
507                 rc = ost_connect(req);
508                 break;
509         case OST_DISCONNECT:
510                 CDEBUG(D_INODE, "disconnect\n");
511                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
512                 rc = ost_disconnect(req);
513                 break;
514         case OST_GET_INFO:
515                 CDEBUG(D_INODE, "get_info\n");
516                 OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
517                 rc = ost_get_info(req);
518                 break;
519         case OST_CREATE:
520                 CDEBUG(D_INODE, "create\n");
521                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
522                 rc = ost_create(req);
523                 break;
524         case OST_DESTROY:
525                 CDEBUG(D_INODE, "destroy\n");
526                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
527                 rc = ost_destroy(req);
528                 break;
529         case OST_GETATTR:
530                 CDEBUG(D_INODE, "getattr\n");
531                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
532                 rc = ost_getattr(req);
533                 break;
534         case OST_SETATTR:
535                 CDEBUG(D_INODE, "setattr\n");
536                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
537                 rc = ost_setattr(req);
538                 break;
539         case OST_OPEN:
540                 CDEBUG(D_INODE, "setattr\n");
541                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
542                 rc = ost_open(req);
543                 break;
544         case OST_CLOSE:
545                 CDEBUG(D_INODE, "setattr\n");
546                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
547                 rc = ost_close(req);
548                 break;
549         case OST_BRW:
550                 CDEBUG(D_INODE, "brw\n");
551                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
552                 rc = ost_brw(req);
553                 /* ost_brw sends its own replies */
554                 RETURN(rc);
555         case OST_PUNCH:
556                 CDEBUG(D_INODE, "punch\n");
557                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
558                 rc = ost_punch(req);
559                 break;
560 #if 0
561         case OST_STATFS:
562                 CDEBUG(D_INODE, "statfs\n");
563                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
564                 rc = ost_statfs(req);
565                 break;
566 #endif
567         default:
568                 req->rq_status = -ENOTSUPP;
569                 rc = ptlrpc_error(req->rq_svc, req);
570                 RETURN(rc);
571         }
572
573         EXIT;
574 out:
575         //req->rq_status = rc;
576         if (rc) {
577                 CERROR("ost: processing error %d\n", rc);
578                 ptlrpc_error(req->rq_svc, req);
579         } else {
580                 CDEBUG(D_INODE, "sending reply\n");
581                 ptlrpc_reply(req->rq_svc, req);
582         }
583
584         return 0;
585 }
586
587 #define OST_NUM_THREADS 6
588
589 /* mount the file system (secretly) */
590 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
591 {
592         struct obd_ioctl_data* data = buf;
593         struct ost_obd *ost = &obddev->u.ost;
594         struct obd_device *tgt;
595         int err;
596         int i;
597         ENTRY;
598
599         if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
600                 RETURN(-ENODEV);
601
602         MOD_INC_USE_COUNT;
603         tgt = &obd_dev[data->ioc_dev];
604         if (!(tgt->obd_flags & OBD_ATTACHED) ||
605             !(tgt->obd_flags & OBD_SET_UP)) {
606                 CERROR("device not attached or not set up (%d)\n",
607                        data->ioc_dev);
608                 GOTO(error_dec, err = -EINVAL);
609         }
610
611         err = obd_connect(&ost->ost_conn, tgt);
612         if (err) {
613                 CERROR("fail to connect to device %d\n", data->ioc_dev);
614                 GOTO(error_dec, err = -EINVAL);
615         }
616
617         obddev->obd_namespace =
618                 ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER);
619         if (obddev->obd_namespace == NULL)
620                 LBUG();
621
622         ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
623                                            OSC_REPLY_PORTAL, "self",ost_handle);
624         if (!ost->ost_service) {
625                 CERROR("failed to start service\n");
626                 GOTO(error_disc, err = -EINVAL);
627         }
628
629         for (i = 0; i < OST_NUM_THREADS; i++) {
630                 err = ptlrpc_start_thread(obddev, ost->ost_service,
631                                           "lustre_ost");
632                 if (err) {
633                         CERROR("error starting thread #%d: rc %d\n", i, err);
634                         GOTO(error_disc, err = -EINVAL);
635                 }
636         }
637
638         RETURN(0);
639
640 error_disc:
641         obd_disconnect(&ost->ost_conn);
642 error_dec:
643         MOD_DEC_USE_COUNT;
644         RETURN(err);
645 }
646
647 static int ost_cleanup(struct obd_device * obddev)
648 {
649         struct ost_obd *ost = &obddev->u.ost;
650         int err;
651
652         ENTRY;
653
654         if ( !list_empty(&obddev->obd_exports) ) {
655                 CERROR("still has clients!\n");
656                 RETURN(-EBUSY);
657         }
658
659         ptlrpc_stop_all_threads(ost->ost_service);
660         ptlrpc_unregister_service(ost->ost_service);
661
662         err = obd_disconnect(&ost->ost_conn);
663         if (err) {
664                 CERROR("lustre ost: fail to disconnect device\n");
665                 RETURN(-EINVAL);
666         }
667
668         ldlm_namespace_free(obddev->obd_namespace);
669
670         MOD_DEC_USE_COUNT;
671         RETURN(0);
672 }
673
674 /* use obd ops to offer management infrastructure */
675 static struct obd_ops ost_obd_ops = {
676         o_setup:       ost_setup,
677         o_cleanup:     ost_cleanup,
678 };
679
680 static int __init ost_init(void)
681 {
682         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
683         return 0;
684 }
685
686 static void __exit ost_exit(void)
687 {
688         obd_unregister_type(LUSTRE_OST_NAME);
689 }
690
691 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
692 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
693 MODULE_LICENSE("GPL");
694
695 module_init(ost_init);
696 module_exit(ost_exit);