Whamcloud - gitweb
- bulk handling from callbacks
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54
55
56 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
57 {
58         struct obd_conn conn; 
59         int rc;
60
61         ENTRY;
62         
63         conn.oc_id = req->rq_req.ost->connid;
64         conn.oc_dev = ost->ost_tgt;
65
66         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
67                           &req->rq_replen, &req->rq_repbuf); 
68         if (rc) { 
69                 CERROR("cannot pack reply\n"); 
70                 return rc;
71         }
72
73         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
74
75         EXIT;
76         return 0;
77 }
78
79 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
80 {
81         struct obd_conn conn; 
82         int rc;
83
84         ENTRY;
85         
86         conn.oc_id = req->rq_req.ost->connid;
87         conn.oc_dev = ost->ost_tgt;
88
89         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
90                           &req->rq_replen, &req->rq_repbuf); 
91         if (rc) { 
92                 CERROR("cannot pack reply\n"); 
93                 return rc;
94         }
95         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
96         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
97
98         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
99
100         EXIT;
101         return 0;
102 }
103
104 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
105 {
106         struct obd_conn conn; 
107         int rc;
108
109         ENTRY;
110         
111         conn.oc_id = req->rq_req.ost->connid;
112         conn.oc_dev = ost->ost_tgt;
113
114         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
115                           &req->rq_replen, &req->rq_repbuf); 
116         if (rc) { 
117                 CERROR("cannot pack reply\n"); 
118                 return rc;
119         }
120
121         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
122                sizeof(req->rq_req.ost->oa));
123
124         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
125
126         EXIT;
127         return 0;
128 }
129
130 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
131 {
132         struct obd_conn conn; 
133         int rc;
134
135         ENTRY;
136         
137         conn.oc_id = req->rq_req.ost->connid;
138         conn.oc_dev = ost->ost_tgt;
139
140         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141                           &req->rq_replen, &req->rq_repbuf); 
142         if (rc) { 
143                 CERROR("cannot pack reply\n"); 
144                 return rc;
145         }
146
147         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
148                sizeof(req->rq_req.ost->oa));
149
150         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
151                                             req->rq_rep.ost->oa.o_size,
152                                             req->rq_rep.ost->oa.o_blocks); 
153
154         EXIT;
155         return 0;
156 }
157
158
159 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
160 {
161         struct obd_conn conn; 
162         int rc;
163
164         ENTRY;
165         
166         conn.oc_id = req->rq_req.ost->connid;
167         conn.oc_dev = ost->ost_tgt;
168
169         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
170                           &req->rq_replen, &req->rq_repbuf); 
171         if (rc) { 
172                 CERROR("cannot pack reply\n"); 
173                 return rc;
174         }
175
176         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
177                sizeof(req->rq_req.ost->oa));
178
179         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
180
181         EXIT;
182         return 0;
183 }
184
185 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
186 {
187         struct obd_conn conn; 
188         int rc;
189
190         ENTRY;
191         
192         conn.oc_dev = ost->ost_tgt;
193
194         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
195                           &req->rq_replen, &req->rq_repbuf); 
196         if (rc) { 
197                 CERROR("cannot pack reply\n"); 
198                 return rc;
199         }
200
201         req->rq_rep.ost->result = obd_connect(&conn);
202
203         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
204         req->rq_rep.ost->connid = conn.oc_id;
205         EXIT;
206         return 0;
207 }
208
209 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_dev = ost->ost_tgt;
217         conn.oc_id = req->rq_req.ost->connid;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 CERROR("cannot pack reply\n"); 
223                 return rc;
224         }
225         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
226         req->rq_rep.ost->result = obd_disconnect(&conn);
227
228         EXIT;
229         return 0;
230 }
231
232 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
233 {
234         struct obd_conn conn; 
235         int rc;
236         int vallen;
237         void *val;
238         char *ptr; 
239
240         ENTRY;
241         
242         conn.oc_id = req->rq_req.ost->connid;
243         conn.oc_dev = ost->ost_tgt;
244
245         ptr = ost_req_buf1(req->rq_req.ost);
246         req->rq_rep.ost->result = obd_get_info(&conn, 
247                                                req->rq_req.ost->buflen1, ptr, 
248                                                &vallen, &val); 
249
250         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
251                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
252         if (rc) { 
253                 CERROR("cannot pack reply\n"); 
254                 return rc;
255         }
256
257         EXIT;
258         return 0;
259 }
260
261 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
262 {
263         struct ptlrpc_bulk_desc **bulk_vec = NULL;
264         struct ptlrpc_bulk_desc *bulk = NULL;
265         struct obd_conn conn; 
266         int rc;
267         int i, j;
268         int objcount, niocount;
269         char *tmp1, *tmp2, *end2;
270         char *res = NULL;
271         int cmd;
272         struct niobuf *nb, *src;
273         struct obd_ioobj *ioo;
274         struct ost_req *r = req->rq_req.ost;
275
276         ENTRY;
277         
278         tmp1 = ost_req_buf1(r);
279         tmp2 = ost_req_buf2(r);
280         end2 = tmp2 + req->rq_req.ost->buflen2;
281         objcount = r->buflen1 / sizeof(*ioo); 
282         niocount = r->buflen2 / sizeof(*nb); 
283         cmd = r->cmd;
284
285         conn.oc_id = req->rq_req.ost->connid;
286         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
287
288         for (i = 0; i < objcount; i++) {
289                 ost_unpack_ioo((void *)&tmp1, &ioo);
290                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
291                         rc = -EFAULT;
292                         break; 
293                 }
294                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
295                         ost_unpack_niobuf((void *)&tmp2, &nb); 
296                 }
297         }
298
299         rc = ost_pack_rep(NULL, 0, NULL, 0,
300                           &req->rq_rephdr, &req->rq_rep,
301                           &req->rq_replen, &req->rq_repbuf);
302         if (rc) {
303                 CERROR("cannot pack reply\n"); 
304                 return rc;
305         }
306         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
307         if (res == NULL) {
308                 EXIT;
309                 return -ENOMEM;
310         }
311
312         /* The unpackers move tmp1 and tmp2, so reset them before using */
313         tmp1 = ost_req_buf1(r);
314         tmp2 = ost_req_buf2(r);
315         req->rq_rep.ost->result = obd_preprw
316                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
317                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
318
319         if (req->rq_rep.ost->result) {
320                 EXIT;
321                 goto out;
322         }
323
324         for (i = 0; i < niocount; i++) {
325                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
326                 if (bulk == NULL) {
327                         CERROR("cannot alloc bulk desc\n");
328                         rc = -ENOMEM;
329                         goto out;
330                 }
331
332                 src = &((struct niobuf *)tmp2)[i];
333
334                 bulk->b_xid = src->xid;
335                 bulk->b_buf = (void *)(unsigned long)src->addr;
336                 bulk->b_buflen = PAGE_SIZE;
337                 rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
338                 if (rc) {
339                         EXIT;
340                         goto out;
341                 }
342                 wait_event_interruptible(bulk->b_waitq,
343                                          ptlrpc_check_bulk_sent(bulk));
344
345                 if (bulk->b_flags == PTL_RPC_INTR) {
346                         EXIT;
347                         goto out;
348                 }
349
350                 OBD_FREE(bulk, sizeof(*bulk));
351                 bulk = NULL;
352         }
353
354 #if 0
355         /* Local delivery */
356         dst = &((struct niobuf *)tmp2)[i];
357         memcpy((void *)(unsigned long)dst->addr,
358                (void *)(unsigned long)src->addr, PAGE_SIZE);
359 #endif
360         barrier();
361
362  out:
363         if (res != NULL)
364                 OBD_FREE(res, sizeof(struct niobuf) * niocount);
365         if (bulk != NULL)
366                 OBD_FREE(bulk, sizeof(*bulk));
367         if (bulk_vec != NULL) {
368                 for (i = 0; i < niocount; i++) {
369                         if (bulk_vec[i] != NULL)
370                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
371                 }
372                 OBD_FREE(bulk_vec,
373                          niocount * sizeof(struct ptlrpc_bulk_desc *));
374         }
375
376         EXIT;
377         return 0;
378 }
379
380 int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
381 {
382         struct ptlrpc_bulk_desc **bulk_vec = NULL;
383         struct ptlrpc_bulk_desc *bulk = NULL;
384         struct obd_conn conn; 
385         int rc;
386         int i, j;
387         int objcount, niocount;
388         char *tmp1, *tmp2, *end2;
389         char *res;
390         int cmd;
391         struct niobuf *nb, *dst;
392         struct obd_ioobj *ioo;
393         struct ost_req *r = req->rq_req.ost;
394
395         ENTRY;
396         
397         tmp1 = ost_req_buf1(r);
398         tmp2 = ost_req_buf2(r);
399         end2 = tmp2 + req->rq_req.ost->buflen2;
400         objcount = r->buflen1 / sizeof(*ioo); 
401         niocount = r->buflen2 / sizeof(*nb); 
402         cmd = r->cmd;
403
404         conn.oc_id = req->rq_req.ost->connid;
405         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
406
407         for (i = 0; i < objcount; i++) {
408                 ost_unpack_ioo((void *)&tmp1, &ioo);
409                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
410                         rc = -EFAULT;
411                         break; 
412                 }
413                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
414                         ost_unpack_niobuf((void *)&tmp2, &nb); 
415                 }
416         }
417
418         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
419                           &req->rq_rephdr, &req->rq_rep,
420                           &req->rq_replen, &req->rq_repbuf);
421         if (rc) { 
422                 CERROR("cannot pack reply\n"); 
423                 return rc;
424         }
425         res = ost_rep_buf2(req->rq_rep.ost);
426
427         /* The unpackers move tmp1 and tmp2, so reset them before using */
428         tmp1 = ost_req_buf1(r);
429         tmp2 = ost_req_buf2(r);
430         req->rq_rep.ost->result = obd_preprw
431                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
432                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
433
434         if (req->rq_rep.ost->result) {
435                 EXIT;
436                 goto out;
437         }
438
439         /* Setup buffers for the incoming pages, then send the niobufs
440          * describing those buffers to the OSC. */
441         OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
442         if (bulk_vec == NULL) {
443                 CERROR("cannot alloc bulk desc vector\n");
444                 return -ENOMEM;
445         }
446         memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
447
448         for (i = 0; i < niocount; i++) {
449                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
450
451                 bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
452                 if (bulk_vec[i] == NULL) {
453                         CERROR("cannot alloc bulk desc\n");
454                         rc = -ENOMEM;
455                         goto out;
456                 }
457
458                 spin_lock(&srv->srv_lock);
459                 bulk_vec[i]->b_xid = srv->srv_xid++;
460                 spin_unlock(&srv->srv_lock);
461
462                 dst = &((struct niobuf *)res)[i];
463                 dst->xid = HTON__u32(bulk_vec[i]->b_xid);
464
465                 bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
466                 bulk_vec[i]->b_buflen = PAGE_SIZE;
467                 bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
468                 rc = ptlrpc_register_bulk(bulk_vec[i]);
469                 if (rc)
470                         goto out;
471
472 #if 0
473                 /* Local delivery */
474                 src = &((struct niobuf *)tmp2)[i];
475                 memcpy((void *)(unsigned long)dst->addr,
476                        (void *)(unsigned long)src->addr, src->len);
477 #endif
478         }
479         barrier();
480
481  out:
482         if (bulk != NULL)
483                 OBD_FREE(bulk, sizeof(*bulk));
484         if (bulk_vec != NULL) {
485                 for (i = 0; i < niocount; i++) {
486                         if (bulk_vec[i] != NULL)
487                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
488                 }
489                 OBD_FREE(bulk_vec,
490                          niocount * sizeof(struct ptlrpc_bulk_desc *));
491         }
492
493         EXIT;
494         return 0;
495 }
496
497 int ost_commit_page(struct obd_conn *conn, struct page *page)
498 {
499         struct obd_ioobj obj;
500         struct niobuf buf;
501         int rc;
502         ENTRY;
503
504         memset(&buf, 0, sizeof(buf));
505         memset(&obj, 0, sizeof(obj));
506
507         buf.page = page;
508         obj.ioo_bufcnt = 1;
509         
510         rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf); 
511         EXIT;
512         return rc;
513 }
514
515
516 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
517 {
518         struct ost_req *r = req->rq_req.ost;
519         int cmd = r->cmd;
520
521         if (cmd == OBD_BRW_READ)
522                 return ost_brw_read(obddev, req);
523         else
524                 return ost_brw_write(obddev, req);
525 }
526
527 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
528 {
529         struct obd_conn conn; 
530         int rc, i, j, cmd;
531         int objcount, niocount;
532         char *tmp1, *tmp2, *end2;
533         struct niobuf *nb;
534         struct obd_ioobj *ioo;
535         struct ost_req *r = req->rq_req.ost;
536
537         ENTRY;
538         
539         tmp1 = ost_req_buf1(r);
540         tmp2 = ost_req_buf2(r);
541         end2 = tmp2 + req->rq_req.ost->buflen2;
542         objcount = r->buflen1 / sizeof(*ioo); 
543         niocount = r->buflen2 / sizeof(*nb); 
544         cmd = r->cmd;
545
546         conn.oc_id = req->rq_req.ost->connid;
547         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
548
549         for (i = 0; i < objcount; i++) {
550                 ost_unpack_ioo((void *)&tmp1, &ioo);
551                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
552                         rc = -EFAULT;
553                         break; 
554                 }
555                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
556                         ost_unpack_niobuf((void *)&tmp2, &nb); 
557                 }
558         }
559
560         rc = ost_pack_rep(NULL, 0, NULL, 0,
561                           &req->rq_rephdr, &req->rq_rep,
562                           &req->rq_replen, &req->rq_repbuf);
563         if (rc) { 
564                 CERROR("cannot pack reply\n"); 
565                 return rc;
566         }
567
568         /* The unpackers move tmp1 and tmp2, so reset them before using */
569         tmp1 = ost_req_buf1(r);
570         tmp2 = ost_req_buf2(r);
571         req->rq_rep.ost->result = obd_commitrw
572                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
573                  niocount, (struct niobuf *)tmp2);
574
575         return 0;
576 }
577
578 static int ost_handle(struct obd_device *obddev, 
579                struct ptlrpc_service *svc, 
580                struct ptlrpc_request *req)
581 {
582         int rc;
583         struct ost_obd *ost = &obddev->u.ost;
584         struct ptlreq_hdr *hdr;
585
586         ENTRY;
587
588         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
589         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
590                 CERROR("lustre_ost: wrong packet type sent %d\n",
591                        NTOH__u32(hdr->type));
592                 rc = -EINVAL;
593                 goto out;
594         }
595
596         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
597                             &req->rq_reqhdr, &req->rq_req);
598         if (rc) { 
599                 CERROR("lustre_ost: Invalid request\n");
600                 EXIT; 
601                 goto out;
602         }
603
604         switch (req->rq_reqhdr->opc) { 
605
606         case OST_CONNECT:
607                 CDEBUG(D_INODE, "connect\n");
608                 rc = ost_connect(ost, req);
609                 break;
610         case OST_DISCONNECT:
611                 CDEBUG(D_INODE, "disconnect\n");
612                 rc = ost_disconnect(ost, req);
613                 break;
614         case OST_GET_INFO:
615                 CDEBUG(D_INODE, "get_info\n");
616                 rc = ost_get_info(ost, req);
617                 break;
618         case OST_CREATE:
619                 CDEBUG(D_INODE, "create\n");
620                 rc = ost_create(ost, req);
621                 break;
622         case OST_DESTROY:
623                 CDEBUG(D_INODE, "destroy\n");
624                 rc = ost_destroy(ost, req);
625                 break;
626         case OST_GETATTR:
627                 CDEBUG(D_INODE, "getattr\n");
628                 rc = ost_getattr(ost, req);
629                 break;
630         case OST_SETATTR:
631                 CDEBUG(D_INODE, "setattr\n");
632                 rc = ost_setattr(ost, req);
633                 break;
634         case OST_BRW:
635                 CDEBUG(D_INODE, "brw\n");
636                 rc = ost_brw(ost, req);
637                 break;
638         case OST_BRW_COMPLETE:
639                 CDEBUG(D_INODE, "brw_complete\n");
640                 rc = ost_brw_complete(ost, req);
641                 break;
642         case OST_PUNCH:
643                 CDEBUG(D_INODE, "punch\n");
644                 rc = ost_punch(ost, req);
645                 break;
646         default:
647                 req->rq_status = -ENOTSUPP;
648                 return ptlrpc_error(obddev, svc, req);
649         }
650
651 out:
652         req->rq_status = rc;
653         if (rc) { 
654                 CERROR("ost: processing error %d\n", rc);
655                 ptlrpc_error(obddev, svc, req);
656         } else { 
657                 CDEBUG(D_INODE, "sending reply\n"); 
658                 ptlrpc_reply(obddev, svc, req); 
659         }
660
661         return 0;
662 }
663
664
665 /* mount the file system (secretly) */
666 static int ost_setup(struct obd_device *obddev, obd_count len,
667                         void *buf)
668                         
669 {
670         struct obd_ioctl_data* data = buf;
671         struct ost_obd *ost = &obddev->u.ost;
672         struct obd_device *tgt;
673         int err; 
674         ENTRY;
675
676         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
677                 EXIT;
678                 return -ENODEV;
679         }
680
681         tgt = &obd_dev[data->ioc_dev];
682         ost->ost_tgt = tgt;
683         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
684              ! (tgt->obd_flags & OBD_SET_UP) ){
685                 CERROR("device not attached or not set up (%d)\n", 
686                        data->ioc_dev);
687                 EXIT;
688                 return -EINVAL;
689         } 
690
691         ost->ost_conn.oc_dev = tgt;
692         err = obd_connect(&ost->ost_conn);
693         if (err) { 
694                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
695                 return -EINVAL;
696         }
697
698         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
699                                             OST_REQUEST_PORTAL,
700                                             OSC_REPLY_PORTAL,
701                                             "self", 
702                                             ost_unpack_req,
703                                             ost_pack_rep,
704                                             ost_handle);
705         if (!ost->ost_service) { 
706                 obd_disconnect(&ost->ost_conn); 
707                 return -EINVAL;
708         }
709                                             
710         rpc_register_service(ost->ost_service, "self");
711
712         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
713         if (err) { 
714                 obd_disconnect(&ost->ost_conn); 
715                 return -EINVAL;
716         }
717                 
718         MOD_INC_USE_COUNT;
719         EXIT; 
720         return 0;
721
722
723 static int ost_cleanup(struct obd_device * obddev)
724 {
725         struct ost_obd *ost = &obddev->u.ost;
726         int err;
727
728         ENTRY;
729
730         if ( !list_empty(&obddev->obd_gen_clients) ) {
731                 CERROR("still has clients!\n");
732                 EXIT;
733                 return -EBUSY;
734         }
735
736         ptlrpc_stop_thread(ost->ost_service);
737         rpc_unregister_service(ost->ost_service);
738
739         if (!list_empty(&ost->ost_service->srv_reqs)) {
740                 // XXX reply with errors and clean up
741                 CERROR("Request list not empty!\n");
742         }
743         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
744
745         err = obd_disconnect(&ost->ost_conn);
746         if (err) { 
747                 CERROR("lustre ost: fail to disconnect device\n");
748                 return -EINVAL;
749         }
750
751         MOD_DEC_USE_COUNT;
752         EXIT;
753         return 0;
754 }
755
756 /* use obd ops to offer management infrastructure */
757 static struct obd_ops ost_obd_ops = {
758         o_setup:       ost_setup,
759         o_cleanup:     ost_cleanup,
760 };
761
762 static int __init ost_init(void)
763 {
764         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
765         return 0;
766 }
767
768 static void __exit ost_exit(void)
769 {
770         obd_unregister_type(LUSTRE_OST_NAME);
771 }
772
773 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
774 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
775 MODULE_LICENSE("GPL");
776
777 module_init(ost_init);
778 module_exit(ost_exit);