Whamcloud - gitweb
- Small changes for packaging
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_ost.h>
47 #include <linux/lustre_net.h>
48
49 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
50 {
51         struct obd_conn conn; 
52         int rc;
53
54         ENTRY;
55         
56         conn.oc_id = req->rq_req.ost->connid;
57         conn.oc_dev = ost->ost_tgt;
58
59         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
60                           &req->rq_replen, &req->rq_repbuf); 
61         if (rc) { 
62                 CERROR("cannot pack reply\n"); 
63                 return rc;
64         }
65
66         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
67
68         EXIT;
69         return 0;
70 }
71
72 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
73 {
74         struct obd_conn conn; 
75         int rc;
76
77         ENTRY;
78         
79         conn.oc_id = req->rq_req.ost->connid;
80         conn.oc_dev = ost->ost_tgt;
81
82         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
83                           &req->rq_replen, &req->rq_repbuf); 
84         if (rc) { 
85                 CERROR("cannot pack reply\n"); 
86                 return rc;
87         }
88         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
89         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
90
91         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
92
93         EXIT;
94         return 0;
95 }
96
97 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
98 {
99         struct obd_conn conn; 
100         int rc;
101
102         ENTRY;
103         
104         conn.oc_id = req->rq_req.ost->connid;
105         conn.oc_dev = ost->ost_tgt;
106
107         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
108                           &req->rq_replen, &req->rq_repbuf); 
109         if (rc) { 
110                 CERROR("cannot pack reply\n"); 
111                 return rc;
112         }
113         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
114         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
115
116         req->rq_rep.ost->result =  obd_open(&conn, &req->rq_rep.ost->oa); 
117
118         EXIT;
119         return 0;
120 }
121
122 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
123 {
124         struct obd_conn conn; 
125         int rc;
126
127         ENTRY;
128         
129         conn.oc_id = req->rq_req.ost->connid;
130         conn.oc_dev = ost->ost_tgt;
131
132         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
133                           &req->rq_replen, &req->rq_repbuf); 
134         if (rc) { 
135                 CERROR("cannot pack reply\n"); 
136                 return rc;
137         }
138         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
139         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
140
141         req->rq_rep.ost->result =  obd_close(&conn, &req->rq_rep.ost->oa); 
142
143         EXIT;
144         return 0;
145 }
146
147
148 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
149 {
150         struct obd_conn conn; 
151         int rc;
152
153         ENTRY;
154         
155         conn.oc_id = req->rq_req.ost->connid;
156         conn.oc_dev = ost->ost_tgt;
157
158         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
159                           &req->rq_replen, &req->rq_repbuf); 
160         if (rc) { 
161                 CERROR("cannot pack reply\n"); 
162                 return rc;
163         }
164
165         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
166                sizeof(req->rq_req.ost->oa));
167
168         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
169
170         EXIT;
171         return 0;
172 }
173
174 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
175 {
176         struct obd_conn conn; 
177         int rc;
178
179         ENTRY;
180         
181         conn.oc_id = req->rq_req.ost->connid;
182         conn.oc_dev = ost->ost_tgt;
183
184         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
185                           &req->rq_replen, &req->rq_repbuf); 
186         if (rc) { 
187                 CERROR("cannot pack reply\n"); 
188                 return rc;
189         }
190
191         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
192                sizeof(req->rq_req.ost->oa));
193
194         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
195                                             req->rq_rep.ost->oa.o_size,
196                                             req->rq_rep.ost->oa.o_blocks); 
197
198         EXIT;
199         return 0;
200 }
201
202
203 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
204 {
205         struct obd_conn conn; 
206         int rc;
207
208         ENTRY;
209         
210         conn.oc_id = req->rq_req.ost->connid;
211         conn.oc_dev = ost->ost_tgt;
212
213         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
214                           &req->rq_replen, &req->rq_repbuf); 
215         if (rc) { 
216                 CERROR("cannot pack reply\n"); 
217                 return rc;
218         }
219
220         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
221                sizeof(req->rq_req.ost->oa));
222
223         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
224
225         EXIT;
226         return 0;
227 }
228
229 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
230 {
231         struct obd_conn conn; 
232         int rc;
233
234         ENTRY;
235         
236         conn.oc_dev = ost->ost_tgt;
237
238         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
239                           &req->rq_replen, &req->rq_repbuf); 
240         if (rc) { 
241                 CERROR("cannot pack reply\n"); 
242                 return rc;
243         }
244
245         req->rq_rep.ost->result = obd_connect(&conn);
246
247         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
248         req->rq_rep.ost->connid = conn.oc_id;
249         EXIT;
250         return 0;
251 }
252
253 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
254 {
255         struct obd_conn conn; 
256         int rc;
257
258         ENTRY;
259         
260         conn.oc_dev = ost->ost_tgt;
261         conn.oc_id = req->rq_req.ost->connid;
262
263         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
264                           &req->rq_replen, &req->rq_repbuf); 
265         if (rc) { 
266                 CERROR("cannot pack reply\n"); 
267                 return rc;
268         }
269         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
270         req->rq_rep.ost->result = obd_disconnect(&conn);
271
272         EXIT;
273         return 0;
274 }
275
276 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
277 {
278         struct obd_conn conn; 
279         int rc;
280         int vallen;
281         void *val;
282         char *ptr; 
283
284         ENTRY;
285         
286         conn.oc_id = req->rq_req.ost->connid;
287         conn.oc_dev = ost->ost_tgt;
288
289         ptr = ost_req_buf1(req->rq_req.ost);
290         req->rq_rep.ost->result = obd_get_info(&conn, 
291                                                req->rq_req.ost->buflen1, ptr, 
292                                                &vallen, &val); 
293
294         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
295                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
296         if (rc) { 
297                 CERROR("cannot pack reply\n"); 
298                 return rc;
299         }
300
301         EXIT;
302         return 0;
303 }
304
305 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
306 {
307         struct ptlrpc_bulk_desc **bulk_vec = NULL;
308         struct ptlrpc_bulk_desc *bulk = NULL;
309         struct obd_conn conn; 
310         int rc;
311         int i, j;
312         int objcount, niocount;
313         char *tmp1, *tmp2, *end2;
314         char *res = NULL;
315         int cmd;
316         struct niobuf *nb, *src, *dst;
317         struct obd_ioobj *ioo;
318         struct ost_req *r = req->rq_req.ost;
319
320         ENTRY;
321         
322         tmp1 = ost_req_buf1(r);
323         tmp2 = ost_req_buf2(r);
324         end2 = tmp2 + req->rq_req.ost->buflen2;
325         objcount = r->buflen1 / sizeof(*ioo); 
326         niocount = r->buflen2 / sizeof(*nb); 
327         cmd = r->cmd;
328
329         conn.oc_id = req->rq_req.ost->connid;
330         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
331
332         for (i = 0; i < objcount; i++) {
333                 ost_unpack_ioo((void *)&tmp1, &ioo);
334                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
335                         BUG();
336                         rc = -EFAULT;
337                         break; 
338                 }
339                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
340                         ost_unpack_niobuf((void *)&tmp2, &nb); 
341                 }
342         }
343
344         rc = ost_pack_rep(NULL, 0, NULL, 0,
345                           &req->rq_rephdr, &req->rq_rep,
346                           &req->rq_replen, &req->rq_repbuf);
347         if (rc) {
348                 CERROR("cannot pack reply\n"); 
349                 return rc;
350         }
351         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
352         if (res == NULL) {
353                 EXIT;
354                 return -ENOMEM;
355         }
356
357         /* The unpackers move tmp1 and tmp2, so reset them before using */
358         tmp1 = ost_req_buf1(r);
359         tmp2 = ost_req_buf2(r);
360         req->rq_rep.ost->result = obd_preprw
361                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
362                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
363
364         if (req->rq_rep.ost->result) {
365                 EXIT;
366                 goto out;
367         }
368
369         for (i = 0; i < niocount; i++) {
370                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
371                 if (bulk == NULL) {
372                         CERROR("cannot alloc bulk desc\n");
373                         rc = -ENOMEM;
374                         goto out;
375                 }
376
377                 src = &((struct niobuf *)res)[i];
378                 dst = &((struct niobuf *)tmp2)[i];
379                 bulk->b_xid = dst->xid;
380                 bulk->b_buf = (void *)(unsigned long)src->addr;
381                 bulk->b_buflen = PAGE_SIZE;
382                 rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
383                 if (rc) {
384                         EXIT;
385                         goto out;
386                 }
387                 wait_event_interruptible(bulk->b_waitq,
388                                          ptlrpc_check_bulk_sent(bulk));
389
390                 if (bulk->b_flags == PTL_RPC_INTR) {
391                         EXIT;
392                         goto out;
393                 }
394
395                 OBD_FREE(bulk, sizeof(*bulk));
396                 bulk = NULL;
397         }
398
399 #if 0
400         /* Local delivery */
401         dst = &((struct niobuf *)tmp2)[i];
402         memcpy((void *)(unsigned long)dst->addr,
403                (void *)(unsigned long)src->addr, PAGE_SIZE);
404 #endif
405         barrier();
406
407         /* The unpackers move tmp1 and tmp2, so reset them before using */
408         tmp1 = ost_req_buf1(r);
409         tmp2 = ost_req_buf2(r);
410         req->rq_rep.ost->result = obd_commitrw
411                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
412                  niocount, (struct niobuf *)res);
413
414  out:
415         if (res != NULL)
416                 OBD_FREE(res, sizeof(struct niobuf) * niocount);
417         if (bulk != NULL)
418                 OBD_FREE(bulk, sizeof(*bulk));
419         if (bulk_vec != NULL) {
420                 for (i = 0; i < niocount; i++) {
421                         if (bulk_vec[i] != NULL)
422                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
423                 }
424                 OBD_FREE(bulk_vec,
425                          niocount * sizeof(struct ptlrpc_bulk_desc *));
426         }
427
428         EXIT;
429         return 0;
430 }
431
432 static int ost_commit_page(struct obd_conn *conn, struct page *page)
433 {
434         struct obd_ioobj obj;
435         struct niobuf buf;
436         int rc;
437         ENTRY;
438
439         memset(&buf, 0, sizeof(buf));
440         memset(&obj, 0, sizeof(obj));
441
442         buf.page = page;
443         obj.ioo_bufcnt = 1;
444         
445         rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf); 
446         EXIT;
447         return rc;
448 }
449
450 static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
451 {
452         int rc;
453
454         ENTRY;
455
456         rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
457         if (rc)
458                 CERROR("ost_commit_page failed: %d\n", rc);
459
460         EXIT;
461         return rc;
462 }
463
464 int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
465 {
466         struct obd_conn conn; 
467         int rc;
468         int i, j;
469         int objcount, niocount;
470         char *tmp1, *tmp2, *end2;
471         char *res;
472         int cmd;
473         struct niobuf *nb, *dst;
474         struct obd_ioobj *ioo;
475         struct ost_req *r = req->rq_req.ost;
476
477         ENTRY;
478         
479         tmp1 = ost_req_buf1(r);
480         tmp2 = ost_req_buf2(r);
481         end2 = tmp2 + req->rq_req.ost->buflen2;
482         objcount = r->buflen1 / sizeof(*ioo); 
483         niocount = r->buflen2 / sizeof(*nb); 
484         cmd = r->cmd;
485
486         conn.oc_id = req->rq_req.ost->connid;
487         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
488
489         for (i = 0; i < objcount; i++) {
490                 ost_unpack_ioo((void *)&tmp1, &ioo);
491                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
492                         rc = -EFAULT;
493                         break; 
494                 }
495                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
496                         ost_unpack_niobuf((void *)&tmp2, &nb); 
497                 }
498         }
499
500         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
501                           &req->rq_rephdr, &req->rq_rep,
502                           &req->rq_replen, &req->rq_repbuf);
503         if (rc) { 
504                 CERROR("cannot pack reply\n"); 
505                 return rc;
506         }
507         res = ost_rep_buf2(req->rq_rep.ost);
508
509         /* The unpackers move tmp1 and tmp2, so reset them before using */
510         tmp1 = ost_req_buf1(r);
511         tmp2 = ost_req_buf2(r);
512         req->rq_rep.ost->result = obd_preprw
513                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
514                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
515
516         if (req->rq_rep.ost->result) {
517                 EXIT;
518                 goto out;
519         }
520
521         for (i = 0; i < niocount; i++) {
522                 struct ptlrpc_bulk_desc *bulk;
523                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
524
525                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
526                 if (bulk == NULL) {
527                         CERROR("cannot alloc bulk desc\n");
528                         rc = -ENOMEM;
529                         goto out;
530                 }
531
532                 spin_lock(&srv->srv_lock);
533                 bulk->b_xid = srv->srv_xid++;
534                 spin_unlock(&srv->srv_lock);
535
536                 dst = &((struct niobuf *)res)[i];
537                 dst->xid = HTON__u32(bulk->b_xid);
538
539                 bulk->b_buf = (void *)(unsigned long)dst->addr;
540                 bulk->b_cb = ost_brw_write_cb;
541                 bulk->b_page = dst->page;
542                 memcpy(&(bulk->b_conn), &conn, sizeof(conn));
543                 bulk->b_buflen = PAGE_SIZE;
544                 bulk->b_portal = OSC_BULK_PORTAL;
545                 rc = ptlrpc_register_bulk(bulk);
546                 if (rc)
547                         goto out;
548
549 #if 0
550                 /* Local delivery */
551                 src = &((struct niobuf *)tmp2)[i];
552                 memcpy((void *)(unsigned long)dst->addr,
553                        (void *)(unsigned long)src->addr, src->len);
554 #endif
555         }
556         barrier();
557
558  out:
559         EXIT;
560         return 0;
561 }
562
563 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
564 {
565         struct ost_req *r = req->rq_req.ost;
566         int cmd = r->cmd;
567
568         if (cmd == OBD_BRW_READ)
569                 return ost_brw_read(obddev, req);
570         else
571                 return ost_brw_write(obddev, req);
572 }
573
574 static int ost_handle(struct obd_device *obddev, 
575                struct ptlrpc_service *svc, 
576                struct ptlrpc_request *req)
577 {
578         int rc;
579         struct ost_obd *ost = &obddev->u.ost;
580         struct ptlreq_hdr *hdr;
581
582         ENTRY;
583
584         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
585         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
586                 CERROR("lustre_ost: wrong packet type sent %d\n",
587                        NTOH__u32(hdr->type));
588                 rc = -EINVAL;
589                 goto out;
590         }
591
592         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
593                             &req->rq_reqhdr, &req->rq_req);
594         if (rc) { 
595                 CERROR("lustre_ost: Invalid request\n");
596                 EXIT; 
597                 goto out;
598         }
599
600         switch (req->rq_reqhdr->opc) { 
601
602         case OST_CONNECT:
603                 CDEBUG(D_INODE, "connect\n");
604                 rc = ost_connect(ost, req);
605                 break;
606         case OST_DISCONNECT:
607                 CDEBUG(D_INODE, "disconnect\n");
608                 rc = ost_disconnect(ost, req);
609                 break;
610         case OST_GET_INFO:
611                 CDEBUG(D_INODE, "get_info\n");
612                 rc = ost_get_info(ost, req);
613                 break;
614         case OST_CREATE:
615                 CDEBUG(D_INODE, "create\n");
616                 rc = ost_create(ost, req);
617                 break;
618         case OST_DESTROY:
619                 CDEBUG(D_INODE, "destroy\n");
620                 rc = ost_destroy(ost, req);
621                 break;
622         case OST_GETATTR:
623                 CDEBUG(D_INODE, "getattr\n");
624                 rc = ost_getattr(ost, req);
625                 break;
626         case OST_SETATTR:
627                 CDEBUG(D_INODE, "setattr\n");
628                 rc = ost_setattr(ost, req);
629                 break;
630         case OST_OPEN:
631                 CDEBUG(D_INODE, "setattr\n");
632                 rc = ost_open(ost, req);
633                 break;
634         case OST_CLOSE:
635                 CDEBUG(D_INODE, "setattr\n");
636                 rc = ost_close(ost, req);
637                 break;
638         case OST_BRW:
639                 CDEBUG(D_INODE, "brw\n");
640                 rc = ost_brw(ost, req);
641                 break;
642         case OST_PUNCH:
643                 CDEBUG(D_INODE, "punch\n");
644                 rc = ost_punch(ost, req);
645                 break;
646         default:
647                 req->rq_status = -ENOTSUPP;
648                 return ptlrpc_error(obddev, svc, req);
649         }
650
651 out:
652         req->rq_status = rc;
653         if (rc) { 
654                 CERROR("ost: processing error %d\n", rc);
655                 ptlrpc_error(obddev, svc, req);
656         } else { 
657                 CDEBUG(D_INODE, "sending reply\n"); 
658                 ptlrpc_reply(obddev, svc, req); 
659         }
660
661         return 0;
662 }
663
664
665 /* mount the file system (secretly) */
666 static int ost_setup(struct obd_device *obddev, obd_count len,
667                         void *buf)
668                         
669 {
670         struct obd_ioctl_data* data = buf;
671         struct ost_obd *ost = &obddev->u.ost;
672         struct obd_device *tgt;
673         int err; 
674         ENTRY;
675
676         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
677                 EXIT;
678                 return -ENODEV;
679         }
680
681         tgt = &obd_dev[data->ioc_dev];
682         ost->ost_tgt = tgt;
683         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
684              ! (tgt->obd_flags & OBD_SET_UP) ){
685                 CERROR("device not attached or not set up (%d)\n", 
686                        data->ioc_dev);
687                 EXIT;
688                 return -EINVAL;
689         } 
690
691         ost->ost_conn.oc_dev = tgt;
692         err = obd_connect(&ost->ost_conn);
693         if (err) { 
694                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
695                 return -EINVAL;
696         }
697
698         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
699                                             OST_REQUEST_PORTAL,
700                                             OSC_REPLY_PORTAL,
701                                             "self", 
702                                             ost_unpack_req,
703                                             ost_pack_rep,
704                                             ost_handle);
705         if (!ost->ost_service) { 
706                 obd_disconnect(&ost->ost_conn); 
707                 return -EINVAL;
708         }
709                                             
710         rpc_register_service(ost->ost_service, "self");
711
712         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
713         if (err) { 
714                 obd_disconnect(&ost->ost_conn); 
715                 return -EINVAL;
716         }
717                 
718         MOD_INC_USE_COUNT;
719         EXIT; 
720         return 0;
721
722
723 static int ost_cleanup(struct obd_device * obddev)
724 {
725         struct ost_obd *ost = &obddev->u.ost;
726         int err;
727
728         ENTRY;
729
730         if ( !list_empty(&obddev->obd_gen_clients) ) {
731                 CERROR("still has clients!\n");
732                 EXIT;
733                 return -EBUSY;
734         }
735
736         ptlrpc_stop_thread(ost->ost_service);
737         rpc_unregister_service(ost->ost_service);
738
739         if (!list_empty(&ost->ost_service->srv_reqs)) {
740                 // XXX reply with errors and clean up
741                 CERROR("Request list not empty!\n");
742         }
743         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
744
745         err = obd_disconnect(&ost->ost_conn);
746         if (err) { 
747                 CERROR("lustre ost: fail to disconnect device\n");
748                 return -EINVAL;
749         }
750
751         MOD_DEC_USE_COUNT;
752         EXIT;
753         return 0;
754 }
755
756 /* use obd ops to offer management infrastructure */
757 static struct obd_ops ost_obd_ops = {
758         o_setup:       ost_setup,
759         o_cleanup:     ost_cleanup,
760 };
761
762 static int __init ost_init(void)
763 {
764         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
765         return 0;
766 }
767
768 static void __exit ost_exit(void)
769 {
770         obd_unregister_type(LUSTRE_OST_NAME);
771 }
772
773 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
774 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
775 MODULE_LICENSE("GPL");
776
777 module_init(ost_init);
778 module_exit(ost_exit);