Whamcloud - gitweb
- New style waitqueues in ost and mds main
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54 // for testing
55 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
56 {
57         struct ptlrpc_request *srv_req; 
58         struct ost_obd *ost = &obddev->u.ost;
59         
60         if (!ost) { 
61                 EXIT;
62                 return -1;
63         }
64
65         OBD_ALLOC(srv_req, sizeof(*srv_req));
66         if (!srv_req) { 
67                 EXIT;
68                 return -ENOMEM;
69         }
70
71         CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
72                __LINE__, ost, req, srv_req);
73
74         memset(srv_req, 0, sizeof(*req)); 
75
76         /* move the request buffer */
77         srv_req->rq_reqbuf = req->rq_reqbuf;
78         srv_req->rq_reqlen    = req->rq_reqlen;
79         srv_req->rq_ost = ost;
80
81         /* remember where it came from */
82         srv_req->rq_reply_handle = req;
83
84         spin_lock(&ost->ost_lock);
85         list_add(&srv_req->rq_list, &ost->ost_reqs); 
86         spin_unlock(&ost->ost_lock);
87         wake_up(&ost->ost_waitq);
88         return 0;
89 }
90
91 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
92 {
93         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
94
95         ENTRY;
96
97         if (req->rq_ost->ost_service != NULL) {
98                 /* This is a request that came from the network via portals. */
99
100                 /* FIXME: we need to increment the count of handled events */
101                 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
102         } else {
103                 /* This is a local request that came from another thread. */
104
105                 /* move the reply to the client */ 
106                 clnt_req->rq_replen = req->rq_replen;
107                 clnt_req->rq_repbuf = req->rq_repbuf;
108                 req->rq_repbuf = NULL;
109                 req->rq_replen = 0;
110
111                 /* free the request buffer */
112                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
113                 req->rq_reqbuf = NULL;
114
115                 /* wake up the client */ 
116                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
117         }
118
119         EXIT;
120         return 0;
121 }
122
123 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
124 {
125         struct ptlrep_hdr *hdr;
126
127         ENTRY;
128
129         OBD_ALLOC(hdr, sizeof(*hdr));
130         if (!hdr) { 
131                 EXIT;
132                 return -ENOMEM;
133         }
134
135         memset(hdr, 0, sizeof(*hdr));
136         
137         hdr->seqno = req->rq_reqhdr->seqno;
138         hdr->status = req->rq_status; 
139         hdr->type = OST_TYPE_ERR;
140
141         req->rq_repbuf = (char *)hdr;
142         req->rq_replen = sizeof(*hdr); 
143
144         EXIT;
145         return ost_reply(obddev, req);
146 }
147
148 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
149 {
150         struct obd_conn conn; 
151         int rc;
152
153         ENTRY;
154         
155         conn.oc_id = req->rq_req.ost->connid;
156         conn.oc_dev = ost->ost_tgt;
157
158         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
159                           &req->rq_replen, &req->rq_repbuf); 
160         if (rc) { 
161                 CERROR("cannot pack reply\n"); 
162                 return rc;
163         }
164
165         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
166                 (&conn, &req->rq_req.ost->oa); 
167
168         EXIT;
169         return 0;
170 }
171
172 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
173 {
174         struct obd_conn conn; 
175         int rc;
176
177         ENTRY;
178         
179         conn.oc_id = req->rq_req.ost->connid;
180         conn.oc_dev = ost->ost_tgt;
181
182         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
183                           &req->rq_replen, &req->rq_repbuf); 
184         if (rc) { 
185                 CERROR("cannot pack reply\n"); 
186                 return rc;
187         }
188         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
189         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
190
191         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
192                 (&conn, &req->rq_rep.ost->oa); 
193
194         EXIT;
195         return 0;
196 }
197
198 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
199 {
200         struct obd_conn conn; 
201         int rc;
202
203         ENTRY;
204         
205         conn.oc_id = req->rq_req.ost->connid;
206         conn.oc_dev = ost->ost_tgt;
207
208         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
209                           &req->rq_replen, &req->rq_repbuf); 
210         if (rc) { 
211                 CERROR("cannot pack reply\n"); 
212                 return rc;
213         }
214
215         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
216
217         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
218                 (&conn, &req->rq_rep.ost->oa); 
219
220         EXIT;
221         return 0;
222 }
223
224 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
225 {
226         struct obd_conn conn; 
227         int rc;
228
229         ENTRY;
230         
231         conn.oc_id = req->rq_req.ost->connid;
232         conn.oc_dev = ost->ost_tgt;
233
234         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
235                           &req->rq_replen, &req->rq_repbuf); 
236         if (rc) { 
237                 CERROR("cannot pack reply\n"); 
238                 return rc;
239         }
240
241         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
242
243         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
244                 (&conn, &req->rq_rep.ost->oa, 
245                  req->rq_rep.ost->oa.o_size,
246                  req->rq_rep.ost->oa.o_blocks); 
247
248         EXIT;
249         return 0;
250 }
251
252
253 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
254 {
255         struct obd_conn conn; 
256         int rc;
257
258         ENTRY;
259         
260         conn.oc_id = req->rq_req.ost->connid;
261         conn.oc_dev = ost->ost_tgt;
262
263         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
264                           &req->rq_replen, &req->rq_repbuf); 
265         if (rc) { 
266                 CERROR("cannot pack reply\n"); 
267                 return rc;
268         }
269
270         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
271                sizeof(req->rq_req.ost->oa));
272
273         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
274                 (&conn, &req->rq_rep.ost->oa); 
275
276         EXIT;
277         return 0;
278 }
279
280 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
281 {
282         struct obd_conn conn; 
283         int rc;
284
285         ENTRY;
286         
287         conn.oc_dev = ost->ost_tgt;
288
289         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
290                           &req->rq_replen, &req->rq_repbuf); 
291         if (rc) { 
292                 CERROR("cannot pack reply\n"); 
293                 return rc;
294         }
295
296         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
297
298         CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
299                conn.oc_id);
300         req->rq_rep.ost->connid = conn.oc_id;
301         EXIT;
302         return 0;
303 }
304
305 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
306 {
307         struct obd_conn conn; 
308         int rc;
309
310         ENTRY;
311         
312         conn.oc_dev = ost->ost_tgt;
313         conn.oc_id = req->rq_req.ost->connid;
314
315         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
316                           &req->rq_replen, &req->rq_repbuf); 
317         if (rc) { 
318                 CERROR("cannot pack reply\n"); 
319                 return rc;
320         }
321
322         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
323
324         EXIT;
325         return 0;
326 }
327
328 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
329 {
330         struct obd_conn conn; 
331         int rc;
332         int vallen;
333         void *val;
334         char *ptr; 
335
336         ENTRY;
337         
338         conn.oc_id = req->rq_req.ost->connid;
339         conn.oc_dev = ost->ost_tgt;
340
341         ptr = ost_req_buf1(req->rq_req.ost);
342         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
343                 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
344
345         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
346                           &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); 
347         if (rc) { 
348                 CERROR("cannot pack reply\n"); 
349                 return rc;
350         }
351
352         EXIT;
353         return 0;
354 }
355
356 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
357 {
358         struct obd_conn conn; 
359         int rc;
360         int i, j;
361         int objcount, niocount;
362         char *tmp1, *tmp2, *end2;
363         char *res;
364         int cmd;
365         struct niobuf *nb, *src, *dst;
366         struct obd_ioobj *ioo;
367         struct ost_req *r = req->rq_req.ost;
368
369         ENTRY;
370         
371         tmp1 = ost_req_buf1(r);
372         tmp2 = ost_req_buf2(r);
373         end2 = tmp2 + req->rq_req.ost->buflen2;
374         objcount = r->buflen1 / sizeof(*ioo); 
375         niocount = r->buflen2 / sizeof(*nb); 
376         cmd = r->cmd;
377
378         conn.oc_id = req->rq_req.ost->connid;
379         conn.oc_dev = req->rq_ost->ost_tgt;
380
381         rc = ost_pack_rep(NULL, niocount, NULL, 0, 
382                           &req->rq_rephdr, &req->rq_rep.ost,
383                           &req->rq_replen, &req->rq_repbuf); 
384         if (rc) { 
385                 CERROR("cannot pack reply\n"); 
386                 return rc;
387         }
388         res = ost_rep_buf1(req->rq_rep.ost); 
389
390         for (i=0; i < objcount; i++) { 
391                 ost_unpack_ioo((void *)&tmp1, &ioo);
392                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
393                         rc = -EFAULT;
394                         break; 
395                 }
396                 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { 
397                         ost_unpack_niobuf((void *)&tmp2, &nb); 
398                 }
399         }
400
401         /* The unpackers move tmp1 and tmp2, so reset them before using */
402         tmp1 = ost_req_buf1(r);
403         tmp2 = ost_req_buf2(r);
404         req->rq_rep.ost->result = 
405                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
406                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
407                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
408
409         if (req->rq_rep.ost->result) {
410                 EXIT;
411                 goto out;
412         }
413
414         if (cmd == OBD_BRW_WRITE) {
415                 for (i = 0; i < niocount; i++) {
416                         src = &((struct niobuf *)tmp2)[i];
417                         dst = &((struct niobuf *)res)[i];
418                         memcpy((void *)(unsigned long)dst->addr, 
419                                (void *)(unsigned long)src->addr, 
420                                src->len);
421                 }
422                 barrier();
423         } else { 
424                 for (i = 0; i < niocount; i++) {
425                         dst = &((struct niobuf *)tmp2)[i];
426                         src = &((struct niobuf *)res)[i];
427                         memcpy((void *)(unsigned long)dst->addr, 
428                                (void *)(unsigned long)src->addr, 
429                                PAGE_SIZE); 
430                 }
431                 barrier();
432         }
433
434         req->rq_rep.ost->result = 
435                 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
436                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
437                  niocount, (struct niobuf *)res); 
438
439  out:
440         EXIT;
441         return 0;
442 }
443
444 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
445 {
446         int rc;
447         struct ost_obd *ost = &obddev->u.ost;
448         struct ptlreq_hdr *hdr;
449
450         ENTRY;
451         CDEBUG(0, "req at %p\n", req);
452
453         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
454         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
455                 CERROR("lustre_ost: wrong packet type sent %d\n",
456                        NTOH__u32(hdr->type));
457                 rc = -EINVAL;
458                 goto out;
459         }
460
461         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
462                             &req->rq_reqhdr, &req->rq_req.ost);
463         if (rc) { 
464                 CERROR("lustre_ost: Invalid request\n");
465                 EXIT; 
466                 goto out;
467         }
468
469         switch (req->rq_reqhdr->opc) { 
470
471         case OST_CONNECT:
472                 CDEBUG(D_INODE, "connect\n");
473                 rc = ost_connect(ost, req);
474                 break;
475         case OST_DISCONNECT:
476                 CDEBUG(D_INODE, "disconnect\n");
477                 rc = ost_disconnect(ost, req);
478                 break;
479         case OST_GET_INFO:
480                 CDEBUG(D_INODE, "get_info\n");
481                 rc = ost_get_info(ost, req);
482                 break;
483         case OST_CREATE:
484                 CDEBUG(D_INODE, "create\n");
485                 rc = ost_create(ost, req);
486                 break;
487         case OST_DESTROY:
488                 CDEBUG(D_INODE, "destroy\n");
489                 rc = ost_destroy(ost, req);
490                 break;
491         case OST_GETATTR:
492                 CDEBUG(D_INODE, "getattr\n");
493                 rc = ost_getattr(ost, req);
494                 break;
495         case OST_SETATTR:
496                 CDEBUG(D_INODE, "setattr\n");
497                 rc = ost_setattr(ost, req);
498                 break;
499         case OST_BRW:
500                 CDEBUG(D_INODE, "brw\n");
501                 rc = ost_brw(ost, req);
502                 break;
503         case OST_PUNCH:
504                 CDEBUG(D_INODE, "punch\n");
505                 rc = ost_punch(ost, req);
506                 break;
507         default:
508                 req->rq_status = -ENOTSUPP;
509                 return ost_error(obddev, req);
510         }
511
512 out:
513         req->rq_status = rc;
514         if (rc) { 
515                 CERROR("ost: processing error %d\n", rc);
516                 ost_error(obddev, req);
517         } else { 
518                 CDEBUG(D_INODE, "sending reply\n"); 
519                 ost_reply(obddev, req); 
520         }
521
522         return 0;
523 }
524
525 /* FIXME: Serious refactoring needed */
526 int ost_main(void *arg)
527 {
528         struct obd_device *obddev = (struct obd_device *) arg;
529         struct ost_obd *ost = &obddev->u.ost;
530         DECLARE_WAITQUEUE(wait, current);
531
532         ENTRY;
533
534         lock_kernel();
535         daemonize();
536         spin_lock_irq(&current->sigmask_lock);
537         sigfillset(&current->blocked);
538         recalc_sigpending(current);
539         spin_unlock_irq(&current->sigmask_lock);
540
541         sprintf(current->comm, "lustre_ost");
542
543         /* Record that the  thread is running */
544         ost->ost_thread = current;
545         wake_up(&ost->ost_done_waitq); 
546
547         /* XXX maintain a list of all managed devices: insert here */
548
549         /* And now, wait forever for commit wakeup events. */
550         while (1) {
551                 int rc; 
552
553                 if (ost->ost_flags & OST_EXIT)
554                         break;
555
556                 if (ost->ost_service != NULL) {
557                         ptl_event_t ev;
558                         struct ptlrpc_request request;
559                         struct ptlrpc_service *service;
560
561                         CDEBUG(D_IOCTL, "-- sleeping\n");
562                         add_wait_queue(&ost->ost_waitq, &wait);
563                         while (1) {
564                                 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
565                                 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
566                                         break;
567
568                                 set_current_state(TASK_INTERRUPTIBLE);
569
570                                 /* if this process really wants to die,
571                                  * let it go */
572                                 if (sigismember(&(current->pending.signal),
573                                                 SIGKILL) ||
574                                     sigismember(&(current->pending.signal),
575                                                 SIGINT))
576                                         break;
577
578                                 schedule();
579                         }
580                         remove_wait_queue(&ost->ost_waitq, &wait);
581                         set_current_state(TASK_RUNNING);
582                         CDEBUG(D_IOCTL, "-- done\n");
583
584                         if (rc == PTL_EQ_EMPTY) {
585                                 /* We broke out because of a signal */
586                                 EXIT;
587                                 return -EINTR;
588                         }
589
590                         service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
591
592                         /* FIXME: If we move to an event-driven model,
593                          * we should put the request on the stack of
594                          * mds_handle instead. */
595                         memset(&request, 0, sizeof(request));
596                         request.rq_reqbuf = ev.mem_desc.start + ev.offset;
597                         request.rq_reqlen = ev.mem_desc.length;
598                         request.rq_ost = ost;
599                         request.rq_xid = ev.match_bits;
600
601                         request.rq_peer.peer_nid = ev.initiator.nid;
602                         /* FIXME: this NI should be the incoming NI.
603                          * We don't know how to find that from here. */
604                         request.rq_peer.peer_ni =
605                                 ost->ost_service->srv_self.peer_ni;
606                         rc = ost_handle(obddev, &request);
607
608                         /* Inform the rpc layer the event has been handled */
609                         ptl_received_rpc(service);
610                 } else {
611                         struct ptlrpc_request *request;
612
613                         CDEBUG(D_IOCTL, "-- sleeping\n");
614                         add_wait_queue(&ost->ost_waitq, &wait);
615                         while (1) {
616                                 spin_lock(&ost->ost_lock);
617                                 if (!list_empty(&ost->ost_reqs))
618                                         break;
619
620                                 set_current_state(TASK_INTERRUPTIBLE);
621
622                                 /* if this process really wants to die,
623                                  * let it go */
624                                 if (sigismember(&(current->pending.signal),
625                                                 SIGKILL) ||
626                                     sigismember(&(current->pending.signal),
627                                                 SIGINT))
628                                         break;
629
630                                 spin_unlock(&ost->ost_lock);
631
632                                 schedule();
633                         }
634                         remove_wait_queue(&ost->ost_waitq, &wait);
635                         set_current_state(TASK_RUNNING);
636                         CDEBUG(D_IOCTL, "-- done\n");
637
638                         if (list_empty(&ost->ost_reqs)) { 
639                                 CDEBUG(D_INODE, "woke because of signal\n");
640                                 spin_unlock(&ost->ost_req_lock);
641                         } else {
642                                 request = list_entry(ost->ost_reqs.next,
643                                                      struct ptlrpc_request,
644                                                      rq_list);
645                                 list_del(&request->rq_list);
646                                 spin_unlock(&ost->ost_req_lock);
647                                 rc = ost_handle(obddev, request); 
648                         }
649                 }
650         }
651
652         /* XXX maintain a list of all managed devices: cleanup here */
653
654         ost->ost_thread = NULL;
655         wake_up(&ost->ost_done_waitq);
656         CERROR("lustre_ost: exiting\n");
657         return 0;
658 }
659
660 static void ost_stop_srv_thread(struct ost_obd *ost)
661 {
662         ost->ost_flags |= OST_EXIT;
663
664         while (ost->ost_thread) {
665                 wake_up(&ost->ost_waitq);
666                 sleep_on(&ost->ost_done_waitq);
667         }
668 }
669
670 static void ost_start_srv_thread(struct obd_device *obd)
671 {
672         struct ost_obd *ost = &obd->u.ost;
673         ENTRY;
674
675         init_waitqueue_head(&ost->ost_waitq);
676         init_waitqueue_head(&ost->ost_done_waitq);
677         kernel_thread(ost_main, (void *)obd, 
678                       CLONE_VM | CLONE_FS | CLONE_FILES);
679         while (!ost->ost_thread) 
680                 sleep_on(&ost->ost_done_waitq);
681         EXIT;
682 }
683
684 /* mount the file system (secretly) */
685 static int ost_setup(struct obd_device *obddev, obd_count len,
686                         void *buf)
687                         
688 {
689         struct obd_ioctl_data* data = buf;
690         struct ost_obd *ost = &obddev->u.ost;
691         struct obd_device *tgt;
692         struct lustre_peer peer;
693         int err; 
694         ENTRY;
695
696         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
697                 EXIT;
698                 return -ENODEV;
699         }
700
701         tgt = &obd_dev[data->ioc_dev];
702         ost->ost_tgt = tgt;
703         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
704              ! (tgt->obd_flags & OBD_SET_UP) ){
705                 CERROR("device not attached or not set up (%d)\n", 
706                        data->ioc_dev);
707                 EXIT;
708                 return -EINVAL;
709         } 
710
711         ost->ost_conn.oc_dev = tgt;
712         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
713         if (err) { 
714                 CERROR("lustre ost: fail to connect to device %d\n", 
715                        data->ioc_dev); 
716                 return -EINVAL;
717         }
718
719         INIT_LIST_HEAD(&ost->ost_reqs);
720         ost->ost_thread = NULL;
721         ost->ost_flags = 0;
722
723         spin_lock_init(&obddev->u.ost.ost_lock);
724
725         err = kportal_uuid_to_peer("self", &peer);
726         if (err == 0) {
727                 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
728                 if (ost->ost_service == NULL)
729                         return -ENOMEM;
730                 ost->ost_service->srv_buf_size = 64 * 1024;
731                 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
732                 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
733                 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
734
735                 rpc_register_service(ost->ost_service, "self");
736         }
737
738         ost_start_srv_thread(obddev);
739
740         MOD_INC_USE_COUNT;
741         EXIT; 
742         return 0;
743
744
745 static int ost_cleanup(struct obd_device * obddev)
746 {
747         struct ost_obd *ost = &obddev->u.ost;
748         struct obd_device *tgt;
749         int err;
750
751         ENTRY;
752
753         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
754                 EXIT;
755                 return 0;
756         }
757
758         if ( !list_empty(&obddev->obd_gen_clients) ) {
759                 CERROR("still has clients!\n");
760                 EXIT;
761                 return -EBUSY;
762         }
763
764         ost_stop_srv_thread(ost);
765         rpc_unregister_service(ost->ost_service);
766         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
767
768         if (!list_empty(&ost->ost_reqs)) {
769                 // XXX reply with errors and clean up
770                 CDEBUG(D_INODE, "Request list not empty!\n");
771         }
772
773         tgt = ost->ost_tgt;
774         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
775         if (err) { 
776                 CERROR("lustre ost: fail to disconnect device\n");
777                 return -EINVAL;
778         }
779         
780
781         MOD_DEC_USE_COUNT;
782         EXIT;
783         return 0;
784 }
785
786 /* use obd ops to offer management infrastructure */
787 static struct obd_ops ost_obd_ops = {
788         o_setup:       ost_setup,
789         o_cleanup:     ost_cleanup,
790 };
791
792 static int __init ost_init(void)
793 {
794         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
795         return 0;
796 }
797
798 static void __exit ost_exit(void)
799 {
800         obd_unregister_type(LUSTRE_OST_NAME);
801 }
802
803 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
804 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
805 MODULE_LICENSE("GPL");
806
807 // for testing (maybe this stays)
808 EXPORT_SYMBOL(ost_queue_req);
809
810 module_init(ost_init);
811 module_exit(ost_exit);