Whamcloud - gitweb
minor changes to debug prepw and prepare page
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  ost/ost_handler.c
3  *  Storage Target Handling functions
4  *  
5  *  Lustre Object Server Module (OST)
6  * 
7  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
8  *
9  *  This code is issued under the GNU General Public License.
10  *  See the file COPYING in this distribution
11  *
12  *  by Peter Braam <braam@clusterfs.com>
13  * 
14  *  This server is single threaded at present (but can easily be multi
15  *  threaded). For testing and management it is treated as an
16  *  obd_device, although it does not export a full OBD method table
17  *  (the requests are coming in over the wire, so object target
18  *  modules do not have a full method table.)
19  * 
20  */
21
22 #define EXPORT_SYMTAB
23
24 #include <linux/version.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/stat.h>
28 #include <linux/locks.h>
29 #include <linux/ext2_fs.h>
30 #include <linux/quotaops.h>
31 #include <asm/unistd.h>
32
33 #define DEBUG_SUBSYSTEM S_OST
34
35 #include <linux/obd_support.h>
36 #include <linux/obd.h>
37 #include <linux/obd_class.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_idl.h>
40 #include <linux/lustre_mds.h>
41 #include <linux/obd_class.h>
42
43 // for testing
44 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
45 {
46         struct ptlrpc_request *srv_req; 
47         struct ost_obd *ost = &obddev->u.ost;
48         
49         if (!ost) { 
50                 EXIT;
51                 return -1;
52         }
53
54         OBD_ALLOC(srv_req, sizeof(*srv_req));
55         if (!srv_req) { 
56                 EXIT;
57                 return -ENOMEM;
58         }
59
60         CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
61                __LINE__, ost, req, srv_req);
62
63         memset(srv_req, 0, sizeof(*req)); 
64
65         /* move the request buffer */
66         srv_req->rq_reqbuf = req->rq_reqbuf;
67         srv_req->rq_reqlen    = req->rq_reqlen;
68         srv_req->rq_ost = ost;
69
70         /* remember where it came from */
71         srv_req->rq_reply_handle = req;
72
73         list_add(&srv_req->rq_list, &ost->ost_reqs); 
74         wake_up(&ost->ost_waitq);
75         return 0;
76 }
77
78 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
79 {
80         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
81
82         ENTRY;
83
84         if (req->rq_ost->ost_service != NULL) {
85                 /* This is a request that came from the network via portals. */
86
87                 /* FIXME: we need to increment the count of handled events */
88                 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
89         } else {
90                 /* This is a local request that came from another thread. */
91
92                 /* move the reply to the client */ 
93                 clnt_req->rq_replen = req->rq_replen;
94                 clnt_req->rq_repbuf = req->rq_repbuf;
95                 req->rq_repbuf = NULL;
96                 req->rq_replen = 0;
97
98                 /* free the request buffer */
99                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
100                 req->rq_reqbuf = NULL;
101
102                 /* wake up the client */ 
103                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
104         }
105
106         EXIT;
107         return 0;
108 }
109
110 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
111 {
112         struct ptlrep_hdr *hdr;
113
114         ENTRY;
115
116         OBD_ALLOC(hdr, sizeof(*hdr));
117         if (!hdr) { 
118                 EXIT;
119                 return -ENOMEM;
120         }
121
122         memset(hdr, 0, sizeof(*hdr));
123         
124         hdr->seqno = req->rq_reqhdr->seqno;
125         hdr->status = req->rq_status; 
126         hdr->type = OST_TYPE_ERR;
127
128         req->rq_repbuf = (char *)hdr;
129         req->rq_replen = sizeof(*hdr); 
130
131         EXIT;
132         return ost_reply(obddev, req);
133 }
134
135 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
136 {
137         struct obd_conn conn; 
138         int rc;
139
140         ENTRY;
141         
142         conn.oc_id = req->rq_req.ost->connid;
143         conn.oc_dev = ost->ost_tgt;
144
145         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
146                           &req->rq_replen, &req->rq_repbuf); 
147         if (rc) { 
148                 CERROR("cannot pack reply\n"); 
149                 return rc;
150         }
151
152         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
153                 (&conn, &req->rq_req.ost->oa); 
154
155         EXIT;
156         return 0;
157 }
158
159 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
160 {
161         struct obd_conn conn; 
162         int rc;
163
164         ENTRY;
165         
166         conn.oc_id = req->rq_req.ost->connid;
167         conn.oc_dev = ost->ost_tgt;
168
169         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
170                           &req->rq_replen, &req->rq_repbuf); 
171         if (rc) { 
172                 CERROR("cannot pack reply\n"); 
173                 return rc;
174         }
175         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
176         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
177
178         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
179                 (&conn, &req->rq_rep.ost->oa); 
180
181         EXIT;
182         return 0;
183 }
184
185 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
186 {
187         struct obd_conn conn; 
188         int rc;
189
190         ENTRY;
191         
192         conn.oc_id = req->rq_req.ost->connid;
193         conn.oc_dev = ost->ost_tgt;
194
195         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
196                           &req->rq_replen, &req->rq_repbuf); 
197         if (rc) { 
198                 CERROR("cannot pack reply\n"); 
199                 return rc;
200         }
201
202         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
203
204         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
205                 (&conn, &req->rq_rep.ost->oa); 
206
207         EXIT;
208         return 0;
209 }
210
211 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
212 {
213         struct obd_conn conn; 
214         int rc;
215
216         ENTRY;
217         
218         conn.oc_id = req->rq_req.ost->connid;
219         conn.oc_dev = ost->ost_tgt;
220
221         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
222                           &req->rq_replen, &req->rq_repbuf); 
223         if (rc) { 
224                 CERROR("cannot pack reply\n"); 
225                 return rc;
226         }
227
228         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
229
230         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
231                 (&conn, &req->rq_rep.ost->oa, 
232                  req->rq_rep.ost->oa.o_size,
233                  req->rq_rep.ost->oa.o_blocks); 
234
235         EXIT;
236         return 0;
237 }
238
239
240 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
241 {
242         struct obd_conn conn; 
243         int rc;
244
245         ENTRY;
246         
247         conn.oc_id = req->rq_req.ost->connid;
248         conn.oc_dev = ost->ost_tgt;
249
250         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
251                           &req->rq_replen, &req->rq_repbuf); 
252         if (rc) { 
253                 CERROR("cannot pack reply\n"); 
254                 return rc;
255         }
256
257         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
258                sizeof(req->rq_req.ost->oa));
259
260         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
261                 (&conn, &req->rq_rep.ost->oa); 
262
263         EXIT;
264         return 0;
265 }
266
267 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
268 {
269         struct obd_conn conn; 
270         int rc;
271
272         ENTRY;
273         
274         conn.oc_dev = ost->ost_tgt;
275
276         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
277                           &req->rq_replen, &req->rq_repbuf); 
278         if (rc) { 
279                 CERROR("cannot pack reply\n"); 
280                 return rc;
281         }
282
283         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
284
285         CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
286                conn.oc_id);
287         req->rq_rep.ost->connid = conn.oc_id;
288         EXIT;
289         return 0;
290 }
291
292 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
293 {
294         struct obd_conn conn; 
295         int rc;
296
297         ENTRY;
298         
299         conn.oc_dev = ost->ost_tgt;
300         conn.oc_id = req->rq_req.ost->connid;
301
302         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
303                           &req->rq_replen, &req->rq_repbuf); 
304         if (rc) { 
305                 CERROR("cannot pack reply\n"); 
306                 return rc;
307         }
308
309         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
310
311         EXIT;
312         return 0;
313 }
314
315 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
316 {
317         struct obd_conn conn; 
318         int rc;
319         int vallen;
320         void *val;
321         char *ptr; 
322
323         ENTRY;
324         
325         conn.oc_id = req->rq_req.ost->connid;
326         conn.oc_dev = ost->ost_tgt;
327
328         ptr = ost_req_buf1(req->rq_req.ost);
329         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
330                 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
331
332         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
333                           &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); 
334         if (rc) { 
335                 CERROR("cannot pack reply\n"); 
336                 return rc;
337         }
338
339         EXIT;
340         return 0;
341 }
342
343 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
344 {
345         struct obd_conn conn; 
346         int rc;
347         int i, j;
348         int objcount, niocount;
349         char *tmp1, *tmp2, *end2;
350         char *res;
351         int cmd;
352         struct niobuf *nb, *src, *dst;
353         struct obd_ioobj *ioo;
354         struct ost_req *r = req->rq_req.ost;
355
356         ENTRY;
357         
358         tmp1 = ost_req_buf1(r);
359         tmp2 = ost_req_buf2(r);
360         end2 = tmp2 + req->rq_req.ost->buflen2;
361         objcount = r->buflen1 / sizeof(*ioo); 
362         niocount = r->buflen2 / sizeof(*nb); 
363         cmd = r->cmd;
364
365         conn.oc_id = req->rq_req.ost->connid;
366         conn.oc_dev = req->rq_ost->ost_tgt;
367
368         rc = ost_pack_rep(NULL, niocount, NULL, 0, 
369                           &req->rq_rephdr, &req->rq_rep.ost,
370                           &req->rq_replen, &req->rq_repbuf); 
371         if (rc) { 
372                 CERROR("cannot pack reply\n"); 
373                 return rc;
374         }
375         res = ost_rep_buf1(req->rq_rep.ost); 
376
377         for (i=0; i < objcount; i++) { 
378                 ost_unpack_ioo((void *)&tmp1, &ioo);
379                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
380                         rc = -EFAULT;
381                         break; 
382                 }
383                 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { 
384                         ost_unpack_niobuf((void *)&tmp2, &nb); 
385                 }
386         }
387
388         /* The unpackers move tmp1 and tmp2, so reset them before using */
389         tmp1 = ost_req_buf1(r);
390         tmp2 = ost_req_buf2(r);
391         req->rq_rep.ost->result = 
392                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
393                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
394                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
395
396         if (req->rq_rep.ost->result) {
397                 EXIT;
398                 goto out;
399         }
400
401         if (cmd == OBD_BRW_WRITE) { 
402                 for (i=0; i<niocount; i++) { 
403                         src = &((struct niobuf *)tmp2)[i];
404                         dst = &((struct niobuf *)res)[i];
405                         memcpy((void *)(unsigned long)dst->addr, 
406                                (void *)(unsigned long)src->addr, 
407                                src->len);
408                 }
409                 barrier();
410         } else { 
411                 for (i=0; i<niocount; i++) { 
412                         dst = &((struct niobuf *)tmp2)[i];
413                         src = &((struct niobuf *)res)[i];
414                         memcpy((void *)(unsigned long)dst->addr, 
415                                (void *)(unsigned long)src->addr, 
416                                PAGE_SIZE); 
417                 }
418                 barrier();
419         }
420
421         req->rq_rep.ost->result = 
422                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
423                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
424                  niocount, (struct niobuf *)res); 
425
426  out:
427         EXIT;
428         return 0;
429 }
430
431 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
432 {
433         int rc;
434         struct ost_obd *ost = &obddev->u.ost;
435         struct ptlreq_hdr *hdr;
436
437         ENTRY;
438         CDEBUG(0, "req at %p\n", req);
439
440         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
441         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
442                 CERROR("lustre_ost: wrong packet type sent %d\n",
443                        NTOH__u32(hdr->type));
444                 rc = -EINVAL;
445                 goto out;
446         }
447
448         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
449                             &req->rq_reqhdr, &req->rq_req.ost);
450         if (rc) { 
451                 CERROR("lustre_ost: Invalid request\n");
452                 EXIT; 
453                 goto out;
454         }
455
456         switch (req->rq_reqhdr->opc) { 
457
458         case OST_CONNECT:
459                 CDEBUG(D_INODE, "connect\n");
460                 rc = ost_connect(ost, req);
461                 break;
462         case OST_DISCONNECT:
463                 CDEBUG(D_INODE, "disconnect\n");
464                 rc = ost_disconnect(ost, req);
465                 break;
466         case OST_GET_INFO:
467                 CDEBUG(D_INODE, "get_info\n");
468                 rc = ost_get_info(ost, req);
469                 break;
470         case OST_CREATE:
471                 CDEBUG(D_INODE, "create\n");
472                 rc = ost_create(ost, req);
473                 break;
474         case OST_DESTROY:
475                 CDEBUG(D_INODE, "destroy\n");
476                 rc = ost_destroy(ost, req);
477                 break;
478         case OST_GETATTR:
479                 CDEBUG(D_INODE, "getattr\n");
480                 rc = ost_getattr(ost, req);
481                 break;
482         case OST_SETATTR:
483                 CDEBUG(D_INODE, "setattr\n");
484                 rc = ost_setattr(ost, req);
485                 break;
486         case OST_BRW:
487                 CDEBUG(D_INODE, "brw\n");
488                 rc = ost_brw(ost, req);
489                 break;
490         case OST_PUNCH:
491                 CDEBUG(D_INODE, "punch\n");
492                 rc = ost_punch(ost, req);
493                 break;
494         default:
495                 req->rq_status = -ENOTSUPP;
496                 return ost_error(obddev, req);
497         }
498
499 out:
500         req->rq_status = rc;
501         if (rc) { 
502                 CERROR("ost: processing error %d\n", rc);
503                 ost_error(obddev, req);
504         } else { 
505                 CDEBUG(D_INODE, "sending reply\n"); 
506                 ost_reply(obddev, req); 
507         }
508
509         return 0;
510 }
511
512 int ost_main(void *arg)
513 {
514         struct obd_device *obddev = (struct obd_device *) arg;
515         struct ost_obd *ost = &obddev->u.ost;
516         ENTRY;
517
518         lock_kernel();
519         daemonize();
520         spin_lock_irq(&current->sigmask_lock);
521         sigfillset(&current->blocked);
522         recalc_sigpending(current);
523         spin_unlock_irq(&current->sigmask_lock);
524
525         sprintf(current->comm, "lustre_ost");
526
527         /* Record that the  thread is running */
528         ost->ost_thread = current;
529         wake_up(&ost->ost_done_waitq); 
530
531         /* XXX maintain a list of all managed devices: insert here */
532
533         /* And now, wait forever for commit wakeup events. */
534         while (1) {
535                 int rc; 
536
537                 if (ost->ost_flags & OST_EXIT)
538                         break;
539
540                 wake_up(&ost->ost_done_waitq);
541                 interruptible_sleep_on(&ost->ost_waitq);
542                 barrier();
543                 CDEBUG(D_INODE, "ost_wakes: pick up req here and continue\n"); 
544
545
546                 if (ost->ost_service != NULL) {
547                         ptl_event_t ev;
548
549                         while (1) {
550                                 struct ptlrpc_request request;
551                                 struct ptlrpc_service *service;
552                                 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
553                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
554                                         break;
555
556                                 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
557
558                                 /* FIXME: If we move to an event-driven model,
559                                  * we should put the request on the stack of
560                                  * mds_handle instead. */
561                                 memset(&request, 0, sizeof(request));
562                                 request.rq_reqbuf = ev.mem_desc.start +
563                                         ev.offset;
564                                 request.rq_reqlen = ev.mem_desc.length;
565                                 request.rq_ost = ost;
566                                 request.rq_xid = ev.match_bits;
567
568                                 request.rq_peer.peer_nid = ev.initiator.nid;
569                                 /* FIXME: this NI should be the incoming NI.
570                                  * We don't know how to find that from here. */
571                                 request.rq_peer.peer_ni =
572                                         ost->ost_service->srv_self.peer_ni;
573                                 rc = ost_handle(obddev, &request);
574
575                                 /* Inform the rpc layer the event has been handled */
576                                 ptl_received_rpc(service);
577                         }
578                 } else {
579                         struct ptlrpc_request *request;
580
581                         if (list_empty(&ost->ost_reqs)) { 
582                                 CDEBUG(D_INODE, "woke because of timer\n"); 
583                         } else { 
584                                 request = list_entry(ost->ost_reqs.next,
585                                                      struct ptlrpc_request,
586                                                      rq_list);
587                                 list_del(&request->rq_list);
588                                 rc = ost_handle(obddev, request); 
589                         }
590                 }
591         }
592
593         /* XXX maintain a list of all managed devices: cleanup here */
594
595         ost->ost_thread = NULL;
596         wake_up(&ost->ost_done_waitq);
597         CERROR("lustre_ost: exiting\n");
598         return 0;
599 }
600
601 static void ost_stop_srv_thread(struct ost_obd *ost)
602 {
603         ost->ost_flags |= OST_EXIT;
604
605         while (ost->ost_thread) {
606                 wake_up(&ost->ost_waitq);
607                 sleep_on(&ost->ost_done_waitq);
608         }
609 }
610
611 static void ost_start_srv_thread(struct obd_device *obd)
612 {
613         struct ost_obd *ost = &obd->u.ost;
614         ENTRY;
615
616         init_waitqueue_head(&ost->ost_waitq);
617         init_waitqueue_head(&ost->ost_done_waitq);
618         kernel_thread(ost_main, (void *)obd, 
619                       CLONE_VM | CLONE_FS | CLONE_FILES);
620         while (!ost->ost_thread) 
621                 sleep_on(&ost->ost_done_waitq);
622         EXIT;
623 }
624
625 /* mount the file system (secretly) */
626 static int ost_setup(struct obd_device *obddev, obd_count len,
627                         void *buf)
628                         
629 {
630         struct obd_ioctl_data* data = buf;
631         struct ost_obd *ost = &obddev->u.ost;
632         struct obd_device *tgt;
633         struct lustre_peer peer;
634         int err; 
635         ENTRY;
636
637         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
638                 EXIT;
639                 return -ENODEV;
640         }
641
642         tgt = &obd_dev[data->ioc_dev];
643         ost->ost_tgt = tgt;
644         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
645              ! (tgt->obd_flags & OBD_SET_UP) ){
646                 CERROR("device not attached or not set up (%d)\n", 
647                        data->ioc_dev);
648                 EXIT;
649                 return -EINVAL;
650         } 
651
652         ost->ost_conn.oc_dev = tgt;
653         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
654         if (err) { 
655                 CERROR("lustre ost: fail to connect to device %d\n", 
656                        data->ioc_dev); 
657                 return -EINVAL;
658         }
659
660         INIT_LIST_HEAD(&ost->ost_reqs);
661         ost->ost_thread = NULL;
662         ost->ost_flags = 0;
663
664         spin_lock_init(&obddev->u.ost.ost_lock);
665
666         err = kportal_uuid_to_peer("self", &peer);
667         if (err == 0) {
668                 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
669                 if (ost->ost_service == NULL)
670                         return -ENOMEM;
671                 ost->ost_service->srv_buf_size = 64 * 1024;
672                 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
673                 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
674                 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
675
676                 rpc_register_service(ost->ost_service, "self");
677         }
678
679         ost_start_srv_thread(obddev);
680
681         MOD_INC_USE_COUNT;
682         EXIT; 
683         return 0;
684
685
686 static int ost_cleanup(struct obd_device * obddev)
687 {
688         struct ost_obd *ost = &obddev->u.ost;
689         struct obd_device *tgt;
690         int err;
691
692         ENTRY;
693
694         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
695                 EXIT;
696                 return 0;
697         }
698
699         if ( !list_empty(&obddev->obd_gen_clients) ) {
700                 CERROR("still has clients!\n");
701                 EXIT;
702                 return -EBUSY;
703         }
704
705         ost_stop_srv_thread(ost);
706         rpc_unregister_service(ost->ost_service);
707         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
708
709         if (!list_empty(&ost->ost_reqs)) {
710                 // XXX reply with errors and clean up
711                 CDEBUG(D_INODE, "Request list not empty!\n");
712         }
713
714         tgt = ost->ost_tgt;
715         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
716         if (err) { 
717                 CERROR("lustre ost: fail to disconnect device\n");
718                 return -EINVAL;
719         }
720         
721
722         MOD_DEC_USE_COUNT;
723         EXIT;
724         return 0;
725 }
726
727 /* use obd ops to offer management infrastructure */
728 static struct obd_ops ost_obd_ops = {
729         o_setup:       ost_setup,
730         o_cleanup:     ost_cleanup,
731 };
732
733 static int __init ost_init(void)
734 {
735         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
736         return 0;
737 }
738
739 static void __exit ost_exit(void)
740 {
741         obd_unregister_type(LUSTRE_OST_NAME);
742 }
743
744 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
745 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
746 MODULE_LICENSE("GPL");
747
748 // for testing (maybe this stays)
749 EXPORT_SYMBOL(ost_queue_req);
750
751 module_init(ost_init);
752 module_exit(ost_exit);