Whamcloud - gitweb
*** empty log message ***
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54
55
56 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
57 {
58         struct obd_conn conn; 
59         int rc;
60
61         ENTRY;
62         
63         conn.oc_id = req->rq_req.ost->connid;
64         conn.oc_dev = ost->ost_tgt;
65
66         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
67                           &req->rq_replen, &req->rq_repbuf); 
68         if (rc) { 
69                 CERROR("cannot pack reply\n"); 
70                 return rc;
71         }
72
73         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
74
75         EXIT;
76         return 0;
77 }
78
79 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
80 {
81         struct obd_conn conn; 
82         int rc;
83
84         ENTRY;
85         
86         conn.oc_id = req->rq_req.ost->connid;
87         conn.oc_dev = ost->ost_tgt;
88
89         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
90                           &req->rq_replen, &req->rq_repbuf); 
91         if (rc) { 
92                 CERROR("cannot pack reply\n"); 
93                 return rc;
94         }
95         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
96         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
97
98         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
99
100         EXIT;
101         return 0;
102 }
103
104 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
105 {
106         struct obd_conn conn; 
107         int rc;
108
109         ENTRY;
110         
111         conn.oc_id = req->rq_req.ost->connid;
112         conn.oc_dev = ost->ost_tgt;
113
114         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
115                           &req->rq_replen, &req->rq_repbuf); 
116         if (rc) { 
117                 CERROR("cannot pack reply\n"); 
118                 return rc;
119         }
120
121         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
122                sizeof(req->rq_req.ost->oa));
123
124         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
125
126         EXIT;
127         return 0;
128 }
129
130 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
131 {
132         struct obd_conn conn; 
133         int rc;
134
135         ENTRY;
136         
137         conn.oc_id = req->rq_req.ost->connid;
138         conn.oc_dev = ost->ost_tgt;
139
140         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141                           &req->rq_replen, &req->rq_repbuf); 
142         if (rc) { 
143                 CERROR("cannot pack reply\n"); 
144                 return rc;
145         }
146
147         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
148                sizeof(req->rq_req.ost->oa));
149
150         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
151                                             req->rq_rep.ost->oa.o_size,
152                                             req->rq_rep.ost->oa.o_blocks); 
153
154         EXIT;
155         return 0;
156 }
157
158
159 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
160 {
161         struct obd_conn conn; 
162         int rc;
163
164         ENTRY;
165         
166         conn.oc_id = req->rq_req.ost->connid;
167         conn.oc_dev = ost->ost_tgt;
168
169         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
170                           &req->rq_replen, &req->rq_repbuf); 
171         if (rc) { 
172                 CERROR("cannot pack reply\n"); 
173                 return rc;
174         }
175
176         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
177                sizeof(req->rq_req.ost->oa));
178
179         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
180
181         EXIT;
182         return 0;
183 }
184
185 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
186 {
187         struct obd_conn conn; 
188         int rc;
189
190         ENTRY;
191         
192         conn.oc_dev = ost->ost_tgt;
193
194         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
195                           &req->rq_replen, &req->rq_repbuf); 
196         if (rc) { 
197                 CERROR("cannot pack reply\n"); 
198                 return rc;
199         }
200
201         req->rq_rep.ost->result = obd_connect(&conn);
202
203         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
204         req->rq_rep.ost->connid = conn.oc_id;
205         EXIT;
206         return 0;
207 }
208
209 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_dev = ost->ost_tgt;
217         conn.oc_id = req->rq_req.ost->connid;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 CERROR("cannot pack reply\n"); 
223                 return rc;
224         }
225         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
226         req->rq_rep.ost->result = obd_disconnect(&conn);
227
228         EXIT;
229         return 0;
230 }
231
232 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
233 {
234         struct obd_conn conn; 
235         int rc;
236         int vallen;
237         void *val;
238         char *ptr; 
239
240         ENTRY;
241         
242         conn.oc_id = req->rq_req.ost->connid;
243         conn.oc_dev = ost->ost_tgt;
244
245         ptr = ost_req_buf1(req->rq_req.ost);
246         req->rq_rep.ost->result = obd_get_info(&conn, 
247                                                req->rq_req.ost->buflen1, ptr, 
248                                                &vallen, &val); 
249
250         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
251                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
252         if (rc) { 
253                 CERROR("cannot pack reply\n"); 
254                 return rc;
255         }
256
257         EXIT;
258         return 0;
259 }
260
261 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
262 {
263         struct ptlrpc_bulk_desc **bulk_vec = NULL;
264         struct ptlrpc_bulk_desc *bulk = NULL;
265         struct obd_conn conn; 
266         int rc;
267         int i, j;
268         int objcount, niocount;
269         char *tmp1, *tmp2, *end2;
270         char *res = NULL;
271         int cmd;
272         struct niobuf *nb, *src;
273         struct obd_ioobj *ioo;
274         struct ost_req *r = req->rq_req.ost;
275
276         ENTRY;
277         
278         tmp1 = ost_req_buf1(r);
279         tmp2 = ost_req_buf2(r);
280         end2 = tmp2 + req->rq_req.ost->buflen2;
281         objcount = r->buflen1 / sizeof(*ioo); 
282         niocount = r->buflen2 / sizeof(*nb); 
283         cmd = r->cmd;
284
285         conn.oc_id = req->rq_req.ost->connid;
286         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
287
288         for (i = 0; i < objcount; i++) {
289                 ost_unpack_ioo((void *)&tmp1, &ioo);
290                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
291                         rc = -EFAULT;
292                         break; 
293                 }
294                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
295                         ost_unpack_niobuf((void *)&tmp2, &nb); 
296                 }
297         }
298
299         rc = ost_pack_rep(NULL, 0, NULL, 0,
300                           &req->rq_rephdr, &req->rq_rep,
301                           &req->rq_replen, &req->rq_repbuf);
302         if (rc) {
303                 CERROR("cannot pack reply\n"); 
304                 return rc;
305         }
306         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
307         if (res == NULL) {
308                 EXIT;
309                 return -ENOMEM;
310         }
311
312         /* The unpackers move tmp1 and tmp2, so reset them before using */
313         tmp1 = ost_req_buf1(r);
314         tmp2 = ost_req_buf2(r);
315         req->rq_rep.ost->result = obd_preprw
316                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
317                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
318
319         if (req->rq_rep.ost->result) {
320                 EXIT;
321                 goto out;
322         }
323
324         for (i = 0; i < niocount; i++) {
325                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
326                 if (bulk == NULL) {
327                         CERROR("cannot alloc bulk desc\n");
328                         rc = -ENOMEM;
329                         goto out;
330                 }
331
332                 src = &((struct niobuf *)tmp2)[i];
333
334                 bulk->b_xid = src->xid;
335                 bulk->b_buf = (void *)(unsigned long)src->addr;
336                 bulk->b_buflen = PAGE_SIZE;
337                 rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
338                 if (rc) {
339                         EXIT;
340                         goto out;
341                 }
342                 wait_event_interruptible(bulk->b_waitq,
343                                          ptlrpc_check_bulk_sent(bulk));
344
345                 if (bulk->b_flags == PTL_RPC_INTR) {
346                         EXIT;
347                         goto out;
348                 }
349
350                 OBD_FREE(bulk, sizeof(*bulk));
351                 bulk = NULL;
352         }
353
354 #if 0
355         /* Local delivery */
356         dst = &((struct niobuf *)tmp2)[i];
357         memcpy((void *)(unsigned long)dst->addr,
358                (void *)(unsigned long)src->addr, PAGE_SIZE);
359 #endif
360         barrier();
361
362  out:
363         if (res != NULL)
364                 OBD_FREE(res, sizeof(struct niobuf) * niocount);
365         if (bulk != NULL)
366                 OBD_FREE(bulk, sizeof(*bulk));
367         if (bulk_vec != NULL) {
368                 for (i = 0; i < niocount; i++) {
369                         if (bulk_vec[i] != NULL)
370                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
371                 }
372                 OBD_FREE(bulk_vec,
373                          niocount * sizeof(struct ptlrpc_bulk_desc *));
374         }
375
376         EXIT;
377         return 0;
378 }
379
380 int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
381 {
382         struct ptlrpc_bulk_desc **bulk_vec = NULL;
383         struct ptlrpc_bulk_desc *bulk = NULL;
384         struct obd_conn conn; 
385         int rc;
386         int i, j;
387         int objcount, niocount;
388         char *tmp1, *tmp2, *end2;
389         char *res;
390         int cmd;
391         struct niobuf *nb, *dst;
392         struct obd_ioobj *ioo;
393         struct ost_req *r = req->rq_req.ost;
394
395         ENTRY;
396         
397         tmp1 = ost_req_buf1(r);
398         tmp2 = ost_req_buf2(r);
399         end2 = tmp2 + req->rq_req.ost->buflen2;
400         objcount = r->buflen1 / sizeof(*ioo); 
401         niocount = r->buflen2 / sizeof(*nb); 
402         cmd = r->cmd;
403
404         conn.oc_id = req->rq_req.ost->connid;
405         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
406
407         for (i = 0; i < objcount; i++) {
408                 ost_unpack_ioo((void *)&tmp1, &ioo);
409                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
410                         rc = -EFAULT;
411                         break; 
412                 }
413                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
414                         ost_unpack_niobuf((void *)&tmp2, &nb); 
415                 }
416         }
417
418         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
419                           &req->rq_rephdr, &req->rq_rep,
420                           &req->rq_replen, &req->rq_repbuf);
421         if (rc) { 
422                 CERROR("cannot pack reply\n"); 
423                 return rc;
424         }
425         res = ost_rep_buf2(req->rq_rep.ost);
426
427         /* The unpackers move tmp1 and tmp2, so reset them before using */
428         tmp1 = ost_req_buf1(r);
429         tmp2 = ost_req_buf2(r);
430         req->rq_rep.ost->result = obd_preprw
431                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
432                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
433
434         if (req->rq_rep.ost->result) {
435                 EXIT;
436                 goto out;
437         }
438
439         /* Setup buffers for the incoming pages, then send the niobufs
440          * describing those buffers to the OSC. */
441         OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
442         if (bulk_vec == NULL) {
443                 CERROR("cannot alloc bulk desc vector\n");
444                 return -ENOMEM;
445         }
446         memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
447
448         for (i = 0; i < niocount; i++) {
449                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
450
451                 bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
452                 if (bulk_vec[i] == NULL) {
453                         CERROR("cannot alloc bulk desc\n");
454                         rc = -ENOMEM;
455                         goto out;
456                 }
457
458                 spin_lock(&srv->srv_lock);
459                 bulk_vec[i]->b_xid = srv->srv_xid++;
460                 spin_unlock(&srv->srv_lock);
461
462                 dst = &((struct niobuf *)res)[i];
463                 dst->xid = HTON__u32(bulk_vec[i]->b_xid);
464
465                 bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
466                 bulk_vec[i]->b_buflen = PAGE_SIZE;
467                 bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
468                 rc = ptlrpc_register_bulk(bulk_vec[i]);
469                 if (rc)
470                         goto out;
471
472 #if 0
473                 /* Local delivery */
474                 src = &((struct niobuf *)tmp2)[i];
475                 memcpy((void *)(unsigned long)dst->addr,
476                        (void *)(unsigned long)src->addr, src->len);
477 #endif
478         }
479         barrier();
480
481  out:
482         if (bulk != NULL)
483                 OBD_FREE(bulk, sizeof(*bulk));
484         if (bulk_vec != NULL) {
485                 for (i = 0; i < niocount; i++) {
486                         if (bulk_vec[i] != NULL)
487                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
488                 }
489                 OBD_FREE(bulk_vec,
490                          niocount * sizeof(struct ptlrpc_bulk_desc *));
491         }
492
493         EXIT;
494         return 0;
495 }
496
497 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
498 {
499         struct ost_req *r = req->rq_req.ost;
500         int cmd = r->cmd;
501
502         if (cmd == OBD_BRW_READ)
503                 return ost_brw_read(obddev, req);
504         else
505                 return ost_brw_write(obddev, req);
506 }
507
508 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
509 {
510         struct obd_conn conn; 
511         int rc, i, j, cmd;
512         int objcount, niocount;
513         char *tmp1, *tmp2, *end2;
514         struct niobuf *nb;
515         struct obd_ioobj *ioo;
516         struct ost_req *r = req->rq_req.ost;
517
518         ENTRY;
519         
520         tmp1 = ost_req_buf1(r);
521         tmp2 = ost_req_buf2(r);
522         end2 = tmp2 + req->rq_req.ost->buflen2;
523         objcount = r->buflen1 / sizeof(*ioo); 
524         niocount = r->buflen2 / sizeof(*nb); 
525         cmd = r->cmd;
526
527         conn.oc_id = req->rq_req.ost->connid;
528         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
529
530         for (i = 0; i < objcount; i++) {
531                 ost_unpack_ioo((void *)&tmp1, &ioo);
532                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
533                         rc = -EFAULT;
534                         break; 
535                 }
536                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
537                         ost_unpack_niobuf((void *)&tmp2, &nb); 
538                 }
539         }
540
541         rc = ost_pack_rep(NULL, 0, NULL, 0,
542                           &req->rq_rephdr, &req->rq_rep,
543                           &req->rq_replen, &req->rq_repbuf);
544         if (rc) { 
545                 CERROR("cannot pack reply\n"); 
546                 return rc;
547         }
548
549         /* The unpackers move tmp1 and tmp2, so reset them before using */
550         tmp1 = ost_req_buf1(r);
551         tmp2 = ost_req_buf2(r);
552         req->rq_rep.ost->result = obd_commitrw
553                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
554                  niocount, (struct niobuf *)tmp2);
555
556         return 0;
557 }
558
559 static int ost_handle(struct obd_device *obddev, 
560                struct ptlrpc_service *svc, 
561                struct ptlrpc_request *req)
562 {
563         int rc;
564         struct ost_obd *ost = &obddev->u.ost;
565         struct ptlreq_hdr *hdr;
566
567         ENTRY;
568
569         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
570         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
571                 CERROR("lustre_ost: wrong packet type sent %d\n",
572                        NTOH__u32(hdr->type));
573                 rc = -EINVAL;
574                 goto out;
575         }
576
577         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
578                             &req->rq_reqhdr, &req->rq_req);
579         if (rc) { 
580                 CERROR("lustre_ost: Invalid request\n");
581                 EXIT; 
582                 goto out;
583         }
584
585         switch (req->rq_reqhdr->opc) { 
586
587         case OST_CONNECT:
588                 CDEBUG(D_INODE, "connect\n");
589                 rc = ost_connect(ost, req);
590                 break;
591         case OST_DISCONNECT:
592                 CDEBUG(D_INODE, "disconnect\n");
593                 rc = ost_disconnect(ost, req);
594                 break;
595         case OST_GET_INFO:
596                 CDEBUG(D_INODE, "get_info\n");
597                 rc = ost_get_info(ost, req);
598                 break;
599         case OST_CREATE:
600                 CDEBUG(D_INODE, "create\n");
601                 rc = ost_create(ost, req);
602                 break;
603         case OST_DESTROY:
604                 CDEBUG(D_INODE, "destroy\n");
605                 rc = ost_destroy(ost, req);
606                 break;
607         case OST_GETATTR:
608                 CDEBUG(D_INODE, "getattr\n");
609                 rc = ost_getattr(ost, req);
610                 break;
611         case OST_SETATTR:
612                 CDEBUG(D_INODE, "setattr\n");
613                 rc = ost_setattr(ost, req);
614                 break;
615         case OST_BRW:
616                 CDEBUG(D_INODE, "brw\n");
617                 rc = ost_brw(ost, req);
618                 break;
619         case OST_BRW_COMPLETE:
620                 CDEBUG(D_INODE, "brw_complete\n");
621                 rc = ost_brw_complete(ost, req);
622                 break;
623         case OST_PUNCH:
624                 CDEBUG(D_INODE, "punch\n");
625                 rc = ost_punch(ost, req);
626                 break;
627         default:
628                 req->rq_status = -ENOTSUPP;
629                 return ptlrpc_error(obddev, svc, req);
630         }
631
632 out:
633         req->rq_status = rc;
634         if (rc) { 
635                 CERROR("ost: processing error %d\n", rc);
636                 ptlrpc_error(obddev, svc, req);
637         } else { 
638                 CDEBUG(D_INODE, "sending reply\n"); 
639                 ptlrpc_reply(obddev, svc, req); 
640         }
641
642         return 0;
643 }
644
645
646 /* mount the file system (secretly) */
647 static int ost_setup(struct obd_device *obddev, obd_count len,
648                         void *buf)
649                         
650 {
651         struct obd_ioctl_data* data = buf;
652         struct ost_obd *ost = &obddev->u.ost;
653         struct obd_device *tgt;
654         int err; 
655         ENTRY;
656
657         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
658                 EXIT;
659                 return -ENODEV;
660         }
661
662         tgt = &obd_dev[data->ioc_dev];
663         ost->ost_tgt = tgt;
664         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
665              ! (tgt->obd_flags & OBD_SET_UP) ){
666                 CERROR("device not attached or not set up (%d)\n", 
667                        data->ioc_dev);
668                 EXIT;
669                 return -EINVAL;
670         } 
671
672         ost->ost_conn.oc_dev = tgt;
673         err = obd_connect(&ost->ost_conn);
674         if (err) { 
675                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
676                 return -EINVAL;
677         }
678
679         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
680                                             OST_REQUEST_PORTAL,
681                                             OSC_REPLY_PORTAL,
682                                             "self", 
683                                             ost_unpack_req,
684                                             ost_pack_rep,
685                                             ost_handle);
686         if (!ost->ost_service) { 
687                 obd_disconnect(&ost->ost_conn); 
688                 return -EINVAL;
689         }
690                                             
691         rpc_register_service(ost->ost_service, "self");
692
693         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
694         if (err) { 
695                 obd_disconnect(&ost->ost_conn); 
696                 return -EINVAL;
697         }
698                 
699         MOD_INC_USE_COUNT;
700         EXIT; 
701         return 0;
702
703
704 static int ost_cleanup(struct obd_device * obddev)
705 {
706         struct ost_obd *ost = &obddev->u.ost;
707         int err;
708
709         ENTRY;
710
711         if ( !list_empty(&obddev->obd_gen_clients) ) {
712                 CERROR("still has clients!\n");
713                 EXIT;
714                 return -EBUSY;
715         }
716
717         ptlrpc_stop_thread(ost->ost_service);
718         rpc_unregister_service(ost->ost_service);
719
720         if (!list_empty(&ost->ost_service->srv_reqs)) {
721                 // XXX reply with errors and clean up
722                 CERROR("Request list not empty!\n");
723         }
724         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
725
726         err = obd_disconnect(&ost->ost_conn);
727         if (err) { 
728                 CERROR("lustre ost: fail to disconnect device\n");
729                 return -EINVAL;
730         }
731
732         MOD_DEC_USE_COUNT;
733         EXIT;
734         return 0;
735 }
736
737 /* use obd ops to offer management infrastructure */
738 static struct obd_ops ost_obd_ops = {
739         o_setup:       ost_setup,
740         o_cleanup:     ost_cleanup,
741 };
742
743 static int __init ost_init(void)
744 {
745         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
746         return 0;
747 }
748
749 static void __exit ost_exit(void)
750 {
751         obd_unregister_type(LUSTRE_OST_NAME);
752 }
753
754 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
755 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
756 MODULE_LICENSE("GPL");
757
758 module_init(ost_init);
759 module_exit(ost_exit);