Whamcloud - gitweb
e45c6f96a093c118b0bd9d5b424c83d98a39d1a4
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54 // for testing
55 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
56 {
57         struct ptlrpc_request *srv_req; 
58         struct ost_obd *ost = &obddev->u.ost;
59         
60         if (!ost) { 
61                 EXIT;
62                 return -1;
63         }
64
65         OBD_ALLOC(srv_req, sizeof(*srv_req));
66         if (!srv_req) { 
67                 EXIT;
68                 return -ENOMEM;
69         }
70
71         CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
72                __LINE__, ost, req, srv_req);
73
74         memset(srv_req, 0, sizeof(*req)); 
75
76         /* move the request buffer */
77         srv_req->rq_reqbuf = req->rq_reqbuf;
78         srv_req->rq_reqlen    = req->rq_reqlen;
79         srv_req->rq_ost = ost;
80
81         /* remember where it came from */
82         srv_req->rq_reply_handle = req;
83
84         spin_lock(&ost->ost_lock);
85         list_add(&srv_req->rq_list, &ost->ost_reqs); 
86         spin_unlock(&ost->ost_lock);
87         wake_up(&ost->ost_waitq);
88         return 0;
89 }
90
91 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
92 {
93         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
94
95         ENTRY;
96
97         if (req->rq_ost->ost_service != NULL) {
98                 /* This is a request that came from the network via portals. */
99
100                 /* FIXME: we need to increment the count of handled events */
101                 req->rq_type = PTLRPC_REPLY;
102                 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL);
103         } else {
104                 /* This is a local request that came from another thread. */
105
106                 /* move the reply to the client */ 
107                 clnt_req->rq_replen = req->rq_replen;
108                 clnt_req->rq_repbuf = req->rq_repbuf;
109                 req->rq_repbuf = NULL;
110                 req->rq_replen = 0;
111
112                 /* free the request buffer */
113                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
114                 req->rq_reqbuf = NULL;
115
116                 /* wake up the client */ 
117                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
118         }
119
120         EXIT;
121         return 0;
122 }
123
124 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
125 {
126         struct ptlrep_hdr *hdr;
127
128         ENTRY;
129
130         OBD_ALLOC(hdr, sizeof(*hdr));
131         if (!hdr) { 
132                 EXIT;
133                 return -ENOMEM;
134         }
135
136         memset(hdr, 0, sizeof(*hdr));
137         
138         hdr->seqno = req->rq_reqhdr->seqno;
139         hdr->status = req->rq_status; 
140         hdr->type = OST_TYPE_ERR;
141
142         req->rq_repbuf = (char *)hdr;
143         req->rq_replen = sizeof(*hdr); 
144
145         EXIT;
146         return ost_reply(obddev, req);
147 }
148
149 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
150 {
151         struct obd_conn conn; 
152         int rc;
153
154         ENTRY;
155         
156         conn.oc_id = req->rq_req.ost->connid;
157         conn.oc_dev = ost->ost_tgt;
158
159         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
160                           &req->rq_replen, &req->rq_repbuf); 
161         if (rc) { 
162                 CERROR("cannot pack reply\n"); 
163                 return rc;
164         }
165
166         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
167                 (&conn, &req->rq_req.ost->oa); 
168
169         EXIT;
170         return 0;
171 }
172
173 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
174 {
175         struct obd_conn conn; 
176         int rc;
177
178         ENTRY;
179         
180         conn.oc_id = req->rq_req.ost->connid;
181         conn.oc_dev = ost->ost_tgt;
182
183         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
184                           &req->rq_replen, &req->rq_repbuf); 
185         if (rc) { 
186                 CERROR("cannot pack reply\n"); 
187                 return rc;
188         }
189         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
190         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
191
192         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
193                 (&conn, &req->rq_rep.ost->oa); 
194
195         EXIT;
196         return 0;
197 }
198
199 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
200 {
201         struct obd_conn conn; 
202         int rc;
203
204         ENTRY;
205         
206         conn.oc_id = req->rq_req.ost->connid;
207         conn.oc_dev = ost->ost_tgt;
208
209         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
210                           &req->rq_replen, &req->rq_repbuf); 
211         if (rc) { 
212                 CERROR("cannot pack reply\n"); 
213                 return rc;
214         }
215
216         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
217
218         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
219                 (&conn, &req->rq_rep.ost->oa); 
220
221         EXIT;
222         return 0;
223 }
224
225 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
226 {
227         struct obd_conn conn; 
228         int rc;
229
230         ENTRY;
231         
232         conn.oc_id = req->rq_req.ost->connid;
233         conn.oc_dev = ost->ost_tgt;
234
235         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
236                           &req->rq_replen, &req->rq_repbuf); 
237         if (rc) { 
238                 CERROR("cannot pack reply\n"); 
239                 return rc;
240         }
241
242         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
243
244         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
245                 (&conn, &req->rq_rep.ost->oa, 
246                  req->rq_rep.ost->oa.o_size,
247                  req->rq_rep.ost->oa.o_blocks); 
248
249         EXIT;
250         return 0;
251 }
252
253
254 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
255 {
256         struct obd_conn conn; 
257         int rc;
258
259         ENTRY;
260         
261         conn.oc_id = req->rq_req.ost->connid;
262         conn.oc_dev = ost->ost_tgt;
263
264         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
265                           &req->rq_replen, &req->rq_repbuf); 
266         if (rc) { 
267                 CERROR("cannot pack reply\n"); 
268                 return rc;
269         }
270
271         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
272                sizeof(req->rq_req.ost->oa));
273
274         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
275                 (&conn, &req->rq_rep.ost->oa); 
276
277         EXIT;
278         return 0;
279 }
280
281 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
282 {
283         struct obd_conn conn; 
284         int rc;
285
286         ENTRY;
287         
288         conn.oc_dev = ost->ost_tgt;
289
290         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
291                           &req->rq_replen, &req->rq_repbuf); 
292         if (rc) { 
293                 CERROR("cannot pack reply\n"); 
294                 return rc;
295         }
296
297         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
298
299         CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
300                conn.oc_id);
301         req->rq_rep.ost->connid = conn.oc_id;
302         EXIT;
303         return 0;
304 }
305
306 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
307 {
308         struct obd_conn conn; 
309         int rc;
310
311         ENTRY;
312         
313         conn.oc_dev = ost->ost_tgt;
314         conn.oc_id = req->rq_req.ost->connid;
315
316         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
317                           &req->rq_replen, &req->rq_repbuf); 
318         if (rc) { 
319                 CERROR("cannot pack reply\n"); 
320                 return rc;
321         }
322
323         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
324
325         EXIT;
326         return 0;
327 }
328
329 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
330 {
331         struct obd_conn conn; 
332         int rc;
333         int vallen;
334         void *val;
335         char *ptr; 
336
337         ENTRY;
338         
339         conn.oc_id = req->rq_req.ost->connid;
340         conn.oc_dev = ost->ost_tgt;
341
342         ptr = ost_req_buf1(req->rq_req.ost);
343         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
344                 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
345
346         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
347                           &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); 
348         if (rc) { 
349                 CERROR("cannot pack reply\n"); 
350                 return rc;
351         }
352
353         EXIT;
354         return 0;
355 }
356
357 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
358 {
359         struct obd_conn conn; 
360         int rc;
361         int i, j;
362         int objcount, niocount;
363         char *tmp1, *tmp2, *end2;
364         char *res;
365         int cmd;
366         struct niobuf *nb, *src, *dst;
367         struct obd_ioobj *ioo;
368         struct ost_req *r = req->rq_req.ost;
369
370         ENTRY;
371         
372         tmp1 = ost_req_buf1(r);
373         tmp2 = ost_req_buf2(r);
374         end2 = tmp2 + req->rq_req.ost->buflen2;
375         objcount = r->buflen1 / sizeof(*ioo); 
376         niocount = r->buflen2 / sizeof(*nb); 
377         cmd = r->cmd;
378
379         conn.oc_id = req->rq_req.ost->connid;
380         conn.oc_dev = req->rq_ost->ost_tgt;
381
382         rc = ost_pack_rep(NULL, niocount, NULL, 0, 
383                           &req->rq_rephdr, &req->rq_rep.ost,
384                           &req->rq_replen, &req->rq_repbuf); 
385         if (rc) { 
386                 CERROR("cannot pack reply\n"); 
387                 return rc;
388         }
389         res = ost_rep_buf1(req->rq_rep.ost); 
390
391         for (i=0; i < objcount; i++) { 
392                 ost_unpack_ioo((void *)&tmp1, &ioo);
393                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
394                         rc = -EFAULT;
395                         break; 
396                 }
397                 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { 
398                         ost_unpack_niobuf((void *)&tmp2, &nb); 
399                 }
400         }
401
402         /* The unpackers move tmp1 and tmp2, so reset them before using */
403         tmp1 = ost_req_buf1(r);
404         tmp2 = ost_req_buf2(r);
405         req->rq_rep.ost->result = 
406                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
407                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
408                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
409
410         if (req->rq_rep.ost->result) {
411                 EXIT;
412                 goto out;
413         }
414
415         if (cmd == OBD_BRW_WRITE) {
416                 for (i = 0; i < niocount; i++) {
417                         src = &((struct niobuf *)tmp2)[i];
418                         dst = &((struct niobuf *)res)[i];
419                         memcpy((void *)(unsigned long)dst->addr, 
420                                (void *)(unsigned long)src->addr, 
421                                src->len);
422                 }
423                 barrier();
424         } else { 
425                 for (i = 0; i < niocount; i++) {
426                         dst = &((struct niobuf *)tmp2)[i];
427                         src = &((struct niobuf *)res)[i];
428                         memcpy((void *)(unsigned long)dst->addr, 
429                                (void *)(unsigned long)src->addr, 
430                                PAGE_SIZE); 
431                 }
432                 barrier();
433         }
434
435         req->rq_rep.ost->result = 
436                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
437                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
438                  niocount, (struct niobuf *)res); 
439
440  out:
441         EXIT;
442         return 0;
443 }
444
445 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
446 {
447         int rc;
448         struct ost_obd *ost = &obddev->u.ost;
449         struct ptlreq_hdr *hdr;
450
451         ENTRY;
452         CDEBUG(0, "req at %p\n", req);
453
454         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
455         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
456                 CERROR("lustre_ost: wrong packet type sent %d\n",
457                        NTOH__u32(hdr->type));
458                 rc = -EINVAL;
459                 goto out;
460         }
461
462         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
463                             &req->rq_reqhdr, &req->rq_req.ost);
464         if (rc) { 
465                 CERROR("lustre_ost: Invalid request\n");
466                 EXIT; 
467                 goto out;
468         }
469
470         switch (req->rq_reqhdr->opc) { 
471
472         case OST_CONNECT:
473                 CDEBUG(D_INODE, "connect\n");
474                 rc = ost_connect(ost, req);
475                 break;
476         case OST_DISCONNECT:
477                 CDEBUG(D_INODE, "disconnect\n");
478                 rc = ost_disconnect(ost, req);
479                 break;
480         case OST_GET_INFO:
481                 CDEBUG(D_INODE, "get_info\n");
482                 rc = ost_get_info(ost, req);
483                 break;
484         case OST_CREATE:
485                 CDEBUG(D_INODE, "create\n");
486                 rc = ost_create(ost, req);
487                 break;
488         case OST_DESTROY:
489                 CDEBUG(D_INODE, "destroy\n");
490                 rc = ost_destroy(ost, req);
491                 break;
492         case OST_GETATTR:
493                 CDEBUG(D_INODE, "getattr\n");
494                 rc = ost_getattr(ost, req);
495                 break;
496         case OST_SETATTR:
497                 CDEBUG(D_INODE, "setattr\n");
498                 rc = ost_setattr(ost, req);
499                 break;
500         case OST_BRW:
501                 CDEBUG(D_INODE, "brw\n");
502                 rc = ost_brw(ost, req);
503                 break;
504         case OST_PUNCH:
505                 CDEBUG(D_INODE, "punch\n");
506                 rc = ost_punch(ost, req);
507                 break;
508         default:
509                 req->rq_status = -ENOTSUPP;
510                 return ost_error(obddev, req);
511         }
512
513 out:
514         req->rq_status = rc;
515         if (rc) { 
516                 CERROR("ost: processing error %d\n", rc);
517                 ost_error(obddev, req);
518         } else { 
519                 CDEBUG(D_INODE, "sending reply\n"); 
520                 ost_reply(obddev, req); 
521         }
522
523         return 0;
524 }
525
526 /* FIXME: Serious refactoring needed */
527 int ost_main(void *arg)
528 {
529         int signal; 
530         struct obd_device *obddev = (struct obd_device *) arg;
531         struct ost_obd *ost = &obddev->u.ost;
532         DECLARE_WAITQUEUE(wait, current);
533
534         ENTRY;
535
536         lock_kernel();
537         daemonize();
538         spin_lock_irq(&current->sigmask_lock);
539         sigfillset(&current->blocked);
540         recalc_sigpending(current);
541         spin_unlock_irq(&current->sigmask_lock);
542
543         sprintf(current->comm, "lustre_ost");
544
545         /* Record that the  thread is running */
546         ost->ost_thread = current;
547         wake_up(&ost->ost_done_waitq); 
548
549         /* XXX maintain a list of all managed devices: insert here */
550
551         /* And now, wait forever for commit wakeup events. */
552         while (1) {
553                 int rc; 
554
555                 if (ost->ost_service != NULL) {
556                         ptl_event_t ev;
557                         struct ptlrpc_request request;
558                         struct ptlrpc_service *service;
559
560                         CDEBUG(D_IOCTL, "-- sleeping\n");
561                         signal = 0;
562                         add_wait_queue(&ost->ost_waitq, &wait);
563                         while (1) {
564                                 set_current_state(TASK_INTERRUPTIBLE);
565                                 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
566                                 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
567                                         break;
568                                 if (ost->ost_flags & OST_EXIT)
569                                         break;
570
571
572                                 /* if this process really wants to die,
573                                  * let it go */
574                                 if (sigismember(&(current->pending.signal),
575                                                 SIGKILL) ||
576                                     sigismember(&(current->pending.signal),
577                                                 SIGINT)) {
578                                         signal = 1;
579                                         break;
580                                 }
581
582                                 schedule();
583                         }
584                         remove_wait_queue(&ost->ost_waitq, &wait);
585                         set_current_state(TASK_RUNNING);
586                         CDEBUG(D_IOCTL, "-- done\n");
587
588                         if (signal == 1) {
589                                 /* We broke out because of a signal */
590                                 EXIT;
591                                 break;
592                         }
593                         if (ost->ost_flags & OST_EXIT) {
594                                 EXIT;
595                                 break;
596                         }
597
598                         service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
599
600                         /* FIXME: If we move to an event-driven model,
601                          * we should put the request on the stack of
602                          * mds_handle instead. */
603                         memset(&request, 0, sizeof(request));
604                         request.rq_reqbuf = ev.mem_desc.start + ev.offset;
605                         request.rq_reqlen = ev.mem_desc.length;
606                         request.rq_ost = ost;
607                         request.rq_xid = ev.match_bits;
608
609                         request.rq_peer.peer_nid = ev.initiator.nid;
610                         /* FIXME: this NI should be the incoming NI.
611                          * We don't know how to find that from here. */
612                         request.rq_peer.peer_ni =
613                                 ost->ost_service->srv_self.peer_ni;
614                         rc = ost_handle(obddev, &request);
615
616                         /* Inform the rpc layer the event has been handled */
617                         ptl_received_rpc(service);
618                 } else {
619                         struct ptlrpc_request *request;
620
621                         CDEBUG(D_IOCTL, "-- sleeping\n");
622                         add_wait_queue(&ost->ost_waitq, &wait);
623                         while (1) {
624                                 spin_lock(&ost->ost_lock);
625                                 if (!list_empty(&ost->ost_reqs))
626                                         break;
627
628                                 set_current_state(TASK_INTERRUPTIBLE);
629
630                                 /* if this process really wants to die,
631                                  * let it go */
632                                 if (sigismember(&(current->pending.signal),
633                                                 SIGKILL) ||
634                                     sigismember(&(current->pending.signal),
635                                                 SIGINT))
636                                         break;
637
638                                 spin_unlock(&ost->ost_lock);
639
640                                 schedule();
641                         }
642                         remove_wait_queue(&ost->ost_waitq, &wait);
643                         set_current_state(TASK_RUNNING);
644                         CDEBUG(D_IOCTL, "-- done\n");
645
646                         if (list_empty(&ost->ost_reqs)) { 
647                                 CDEBUG(D_INODE, "woke because of signal\n");
648                                 spin_unlock(&ost->ost_lock);
649                         } else {
650                                 request = list_entry(ost->ost_reqs.next,
651                                                      struct ptlrpc_request,
652                                                      rq_list);
653                                 list_del(&request->rq_list);
654                                 spin_unlock(&ost->ost_lock);
655                                 rc = ost_handle(obddev, request); 
656                         }
657                 }
658         }
659
660         /* XXX maintain a list of all managed devices: cleanup here */
661
662         ost->ost_thread = NULL;
663         wake_up(&ost->ost_done_waitq);
664         CERROR("lustre_ost: exiting\n");
665         return 0;
666 }
667
668 static void ost_stop_srv_thread(struct ost_obd *ost)
669 {
670         ost->ost_flags |= OST_EXIT;
671
672         while (ost->ost_thread) {
673                 wake_up(&ost->ost_waitq);
674                 sleep_on(&ost->ost_done_waitq);
675         }
676 }
677
678 static void ost_start_srv_thread(struct obd_device *obd)
679 {
680         struct ost_obd *ost = &obd->u.ost;
681         ENTRY;
682
683         init_waitqueue_head(&ost->ost_waitq);
684         init_waitqueue_head(&ost->ost_done_waitq);
685         kernel_thread(ost_main, (void *)obd, 
686                       CLONE_VM | CLONE_FS | CLONE_FILES);
687         while (!ost->ost_thread) 
688                 sleep_on(&ost->ost_done_waitq);
689         EXIT;
690 }
691
692 /* mount the file system (secretly) */
693 static int ost_setup(struct obd_device *obddev, obd_count len,
694                         void *buf)
695                         
696 {
697         struct obd_ioctl_data* data = buf;
698         struct ost_obd *ost = &obddev->u.ost;
699         struct obd_device *tgt;
700         struct lustre_peer peer;
701         int err; 
702         ENTRY;
703
704         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
705                 EXIT;
706                 return -ENODEV;
707         }
708
709         tgt = &obd_dev[data->ioc_dev];
710         ost->ost_tgt = tgt;
711         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
712              ! (tgt->obd_flags & OBD_SET_UP) ){
713                 CERROR("device not attached or not set up (%d)\n", 
714                        data->ioc_dev);
715                 EXIT;
716                 return -EINVAL;
717         } 
718
719         ost->ost_conn.oc_dev = tgt;
720         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
721         if (err) { 
722                 CERROR("lustre ost: fail to connect to device %d\n", 
723                        data->ioc_dev); 
724                 return -EINVAL;
725         }
726
727         INIT_LIST_HEAD(&ost->ost_reqs);
728         ost->ost_thread = NULL;
729         ost->ost_flags = 0;
730
731         spin_lock_init(&obddev->u.ost.ost_lock);
732
733         err = kportal_uuid_to_peer("self", &peer);
734         if (err == 0) {
735                 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
736                 if (ost->ost_service == NULL)
737                         return -ENOMEM;
738                 ost->ost_service->srv_buf_size = 64 * 1024;
739                 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
740                 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
741                 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
742
743                 rpc_register_service(ost->ost_service, "self");
744         }
745
746         ost_start_srv_thread(obddev);
747
748         MOD_INC_USE_COUNT;
749         EXIT; 
750         return 0;
751
752
753 static int ost_cleanup(struct obd_device * obddev)
754 {
755         struct ost_obd *ost = &obddev->u.ost;
756         struct obd_device *tgt;
757         int err;
758
759         ENTRY;
760
761         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
762                 EXIT;
763                 return 0;
764         }
765
766         if ( !list_empty(&obddev->obd_gen_clients) ) {
767                 CERROR("still has clients!\n");
768                 EXIT;
769                 return -EBUSY;
770         }
771
772         ost_stop_srv_thread(ost);
773         rpc_unregister_service(ost->ost_service);
774         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
775
776         if (!list_empty(&ost->ost_reqs)) {
777                 // XXX reply with errors and clean up
778                 CDEBUG(D_INODE, "Request list not empty!\n");
779         }
780
781         tgt = ost->ost_tgt;
782         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
783         if (err) { 
784                 CERROR("lustre ost: fail to disconnect device\n");
785                 return -EINVAL;
786         }
787         
788
789         MOD_DEC_USE_COUNT;
790         EXIT;
791         return 0;
792 }
793
794 /* use obd ops to offer management infrastructure */
795 static struct obd_ops ost_obd_ops = {
796         o_setup:       ost_setup,
797         o_cleanup:     ost_cleanup,
798 };
799
800 static int __init ost_init(void)
801 {
802         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
803         return 0;
804 }
805
806 static void __exit ost_exit(void)
807 {
808         obd_unregister_type(LUSTRE_OST_NAME);
809 }
810
811 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
812 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
813 MODULE_LICENSE("GPL");
814
815 // for testing (maybe this stays)
816 EXPORT_SYMBOL(ost_queue_req);
817
818 module_init(ost_init);
819 module_exit(ost_exit);