Whamcloud - gitweb
We touched everything, but it's not as scary as it looks.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_ost.h>
47 #include <linux/lustre_net.h>
48
49 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
50 {
51         struct obd_conn conn; 
52         int rc;
53
54         ENTRY;
55         
56         conn.oc_id = req->rq_req.ost->connid;
57         conn.oc_dev = ost->ost_tgt;
58
59         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
60                           &req->rq_replen, &req->rq_repbuf); 
61         if (rc) { 
62                 CERROR("cannot pack reply\n"); 
63                 return rc;
64         }
65
66         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
67
68         EXIT;
69         return 0;
70 }
71
72 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
73 {
74         struct obd_conn conn; 
75         int rc;
76
77         ENTRY;
78         
79         conn.oc_id = req->rq_req.ost->connid;
80         conn.oc_dev = ost->ost_tgt;
81
82         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
83                           &req->rq_replen, &req->rq_repbuf); 
84         if (rc) { 
85                 CERROR("cannot pack reply\n"); 
86                 return rc;
87         }
88         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
89         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
90
91         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
92
93         EXIT;
94         return 0;
95 }
96
97 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
98 {
99         struct obd_conn conn; 
100         int rc;
101
102         ENTRY;
103         
104         conn.oc_id = req->rq_req.ost->connid;
105         conn.oc_dev = ost->ost_tgt;
106
107         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
108                           &req->rq_replen, &req->rq_repbuf); 
109         if (rc) { 
110                 CERROR("cannot pack reply\n"); 
111                 return rc;
112         }
113         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
114         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
115
116         req->rq_rep.ost->result =  obd_open(&conn, &req->rq_rep.ost->oa); 
117
118         EXIT;
119         return 0;
120 }
121
122 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
123 {
124         struct obd_conn conn; 
125         int rc;
126
127         ENTRY;
128         
129         conn.oc_id = req->rq_req.ost->connid;
130         conn.oc_dev = ost->ost_tgt;
131
132         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
133                           &req->rq_replen, &req->rq_repbuf); 
134         if (rc) { 
135                 CERROR("cannot pack reply\n"); 
136                 return rc;
137         }
138         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
139         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
140
141         req->rq_rep.ost->result =  obd_close(&conn, &req->rq_rep.ost->oa); 
142
143         EXIT;
144         return 0;
145 }
146
147
148 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
149 {
150         struct obd_conn conn; 
151         int rc;
152
153         ENTRY;
154         
155         conn.oc_id = req->rq_req.ost->connid;
156         conn.oc_dev = ost->ost_tgt;
157
158         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
159                           &req->rq_replen, &req->rq_repbuf); 
160         if (rc) { 
161                 CERROR("cannot pack reply\n"); 
162                 return rc;
163         }
164
165         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
166                sizeof(req->rq_req.ost->oa));
167
168         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
169
170         EXIT;
171         return 0;
172 }
173
174 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
175 {
176         struct obd_conn conn; 
177         int rc;
178
179         ENTRY;
180         
181         conn.oc_id = req->rq_req.ost->connid;
182         conn.oc_dev = ost->ost_tgt;
183
184         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
185                           &req->rq_replen, &req->rq_repbuf); 
186         if (rc) { 
187                 CERROR("cannot pack reply\n"); 
188                 return rc;
189         }
190
191         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
192                sizeof(req->rq_req.ost->oa));
193
194         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
195                                             req->rq_rep.ost->oa.o_size,
196                                             req->rq_rep.ost->oa.o_blocks); 
197
198         EXIT;
199         return 0;
200 }
201
202
203 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
204 {
205         struct obd_conn conn; 
206         int rc;
207
208         ENTRY;
209         
210         conn.oc_id = req->rq_req.ost->connid;
211         conn.oc_dev = ost->ost_tgt;
212
213         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
214                           &req->rq_replen, &req->rq_repbuf); 
215         if (rc) { 
216                 CERROR("cannot pack reply\n"); 
217                 return rc;
218         }
219
220         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
221                sizeof(req->rq_req.ost->oa));
222
223         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
224
225         EXIT;
226         return 0;
227 }
228
229 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
230 {
231         struct obd_conn conn; 
232         int rc;
233
234         ENTRY;
235         
236         conn.oc_dev = ost->ost_tgt;
237
238         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
239                           &req->rq_replen, &req->rq_repbuf); 
240         if (rc) { 
241                 CERROR("cannot pack reply\n"); 
242                 return rc;
243         }
244
245         req->rq_rep.ost->result = obd_connect(&conn);
246
247         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
248         req->rq_rep.ost->connid = conn.oc_id;
249         EXIT;
250         return 0;
251 }
252
253 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
254 {
255         struct obd_conn conn; 
256         int rc;
257
258         ENTRY;
259         
260         conn.oc_dev = ost->ost_tgt;
261         conn.oc_id = req->rq_req.ost->connid;
262
263         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
264                           &req->rq_replen, &req->rq_repbuf); 
265         if (rc) { 
266                 CERROR("cannot pack reply\n"); 
267                 return rc;
268         }
269         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
270         req->rq_rep.ost->result = obd_disconnect(&conn);
271
272         EXIT;
273         return 0;
274 }
275
276 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
277 {
278         struct obd_conn conn; 
279         int rc;
280         int vallen;
281         void *val;
282         char *ptr; 
283
284         ENTRY;
285         
286         conn.oc_id = req->rq_req.ost->connid;
287         conn.oc_dev = ost->ost_tgt;
288
289         ptr = ost_req_buf1(req->rq_req.ost);
290         req->rq_rep.ost->result = obd_get_info(&conn, 
291                                                req->rq_req.ost->buflen1, ptr, 
292                                                &vallen, &val); 
293
294         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
295                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
296         if (rc) { 
297                 CERROR("cannot pack reply\n"); 
298                 return rc;
299         }
300
301         EXIT;
302         return 0;
303 }
304
305 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
306 {
307         struct ptlrpc_bulk_desc **bulk_vec = NULL;
308         struct ptlrpc_bulk_desc *bulk = NULL;
309         struct obd_conn conn; 
310         int rc;
311         int i, j;
312         int objcount, niocount;
313         char *tmp1, *tmp2, *end2;
314         char *res = NULL;
315         int cmd;
316         struct niobuf *nb, *src, *dst;
317         struct obd_ioobj *ioo;
318         struct ost_req *r = req->rq_req.ost;
319
320         ENTRY;
321         
322         tmp1 = ost_req_buf1(r);
323         tmp2 = ost_req_buf2(r);
324         end2 = tmp2 + req->rq_req.ost->buflen2;
325         objcount = r->buflen1 / sizeof(*ioo); 
326         niocount = r->buflen2 / sizeof(*nb); 
327         cmd = r->cmd;
328
329         conn.oc_id = req->rq_req.ost->connid;
330         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
331
332         for (i = 0; i < objcount; i++) {
333                 ost_unpack_ioo((void *)&tmp1, &ioo);
334                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
335                         rc = -EFAULT;
336                         break; 
337                 }
338                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
339                         ost_unpack_niobuf((void *)&tmp2, &nb); 
340                 }
341         }
342
343         rc = ost_pack_rep(NULL, 0, NULL, 0,
344                           &req->rq_rephdr, &req->rq_rep,
345                           &req->rq_replen, &req->rq_repbuf);
346         if (rc) {
347                 CERROR("cannot pack reply\n"); 
348                 return rc;
349         }
350         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
351         if (res == NULL) {
352                 EXIT;
353                 return -ENOMEM;
354         }
355
356         /* The unpackers move tmp1 and tmp2, so reset them before using */
357         tmp1 = ost_req_buf1(r);
358         tmp2 = ost_req_buf2(r);
359         req->rq_rep.ost->result = obd_preprw
360                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
361                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
362
363         if (req->rq_rep.ost->result) {
364                 EXIT;
365                 goto out;
366         }
367
368         for (i = 0; i < niocount; i++) {
369                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
370                 if (bulk == NULL) {
371                         CERROR("cannot alloc bulk desc\n");
372                         rc = -ENOMEM;
373                         goto out;
374                 }
375
376                 src = &((struct niobuf *)res)[i];
377                 dst = &((struct niobuf *)tmp2)[i];
378                 bulk->b_xid = dst->xid;
379                 bulk->b_buf = (void *)(unsigned long)src->addr;
380                 bulk->b_buflen = PAGE_SIZE;
381                 rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
382                 if (rc) {
383                         EXIT;
384                         goto out;
385                 }
386                 wait_event_interruptible(bulk->b_waitq,
387                                          ptlrpc_check_bulk_sent(bulk));
388
389                 if (bulk->b_flags == PTL_RPC_INTR) {
390                         EXIT;
391                         goto out;
392                 }
393
394                 OBD_FREE(bulk, sizeof(*bulk));
395                 bulk = NULL;
396         }
397
398 #if 0
399         /* Local delivery */
400         dst = &((struct niobuf *)tmp2)[i];
401         memcpy((void *)(unsigned long)dst->addr,
402                (void *)(unsigned long)src->addr, PAGE_SIZE);
403 #endif
404         barrier();
405
406         /* The unpackers move tmp1 and tmp2, so reset them before using */
407         tmp1 = ost_req_buf1(r);
408         tmp2 = ost_req_buf2(r);
409         req->rq_rep.ost->result = obd_commitrw
410                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
411                  niocount, (struct niobuf *)res);
412
413  out:
414         if (res != NULL)
415                 OBD_FREE(res, sizeof(struct niobuf) * niocount);
416         if (bulk != NULL)
417                 OBD_FREE(bulk, sizeof(*bulk));
418         if (bulk_vec != NULL) {
419                 for (i = 0; i < niocount; i++) {
420                         if (bulk_vec[i] != NULL)
421                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
422                 }
423                 OBD_FREE(bulk_vec,
424                          niocount * sizeof(struct ptlrpc_bulk_desc *));
425         }
426
427         EXIT;
428         return 0;
429 }
430
431 static int ost_commit_page(struct obd_conn *conn, struct page *page)
432 {
433         struct obd_ioobj obj;
434         struct niobuf buf;
435         int rc;
436         ENTRY;
437
438         memset(&buf, 0, sizeof(buf));
439         memset(&obj, 0, sizeof(obj));
440
441         buf.page = page;
442         obj.ioo_bufcnt = 1;
443         
444         rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf); 
445         EXIT;
446         return rc;
447 }
448
449 static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
450 {
451         int rc;
452
453         ENTRY;
454
455         rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
456         if (rc)
457                 CERROR("ost_commit_page failed: %d\n", rc);
458
459         EXIT;
460         return rc;
461 }
462
463 int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
464 {
465         struct obd_conn conn; 
466         int rc;
467         int i, j;
468         int objcount, niocount;
469         char *tmp1, *tmp2, *end2;
470         char *res;
471         int cmd;
472         struct niobuf *nb, *dst;
473         struct obd_ioobj *ioo;
474         struct ost_req *r = req->rq_req.ost;
475
476         ENTRY;
477         
478         tmp1 = ost_req_buf1(r);
479         tmp2 = ost_req_buf2(r);
480         end2 = tmp2 + req->rq_req.ost->buflen2;
481         objcount = r->buflen1 / sizeof(*ioo); 
482         niocount = r->buflen2 / sizeof(*nb); 
483         cmd = r->cmd;
484
485         conn.oc_id = req->rq_req.ost->connid;
486         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
487
488         for (i = 0; i < objcount; i++) {
489                 ost_unpack_ioo((void *)&tmp1, &ioo);
490                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
491                         rc = -EFAULT;
492                         break; 
493                 }
494                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
495                         ost_unpack_niobuf((void *)&tmp2, &nb); 
496                 }
497         }
498
499         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
500                           &req->rq_rephdr, &req->rq_rep,
501                           &req->rq_replen, &req->rq_repbuf);
502         if (rc) { 
503                 CERROR("cannot pack reply\n"); 
504                 return rc;
505         }
506         res = ost_rep_buf2(req->rq_rep.ost);
507
508         /* The unpackers move tmp1 and tmp2, so reset them before using */
509         tmp1 = ost_req_buf1(r);
510         tmp2 = ost_req_buf2(r);
511         req->rq_rep.ost->result = obd_preprw
512                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
513                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
514
515         if (req->rq_rep.ost->result) {
516                 EXIT;
517                 goto out;
518         }
519
520         for (i = 0; i < niocount; i++) {
521                 struct ptlrpc_bulk_desc *bulk;
522                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
523
524                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
525                 if (bulk == NULL) {
526                         CERROR("cannot alloc bulk desc\n");
527                         rc = -ENOMEM;
528                         goto out;
529                 }
530
531                 spin_lock(&srv->srv_lock);
532                 bulk->b_xid = srv->srv_xid++;
533                 spin_unlock(&srv->srv_lock);
534
535                 dst = &((struct niobuf *)res)[i];
536                 dst->xid = HTON__u32(bulk->b_xid);
537
538                 bulk->b_buf = (void *)(unsigned long)dst->addr;
539                 bulk->b_cb = ost_brw_write_cb;
540                 bulk->b_page = dst->page;
541                 memcpy(&(bulk->b_conn), &conn, sizeof(conn));
542                 bulk->b_buflen = PAGE_SIZE;
543                 bulk->b_portal = OSC_BULK_PORTAL;
544                 rc = ptlrpc_register_bulk(bulk);
545                 if (rc)
546                         goto out;
547
548 #if 0
549                 /* Local delivery */
550                 src = &((struct niobuf *)tmp2)[i];
551                 memcpy((void *)(unsigned long)dst->addr,
552                        (void *)(unsigned long)src->addr, src->len);
553 #endif
554         }
555         barrier();
556
557  out:
558         EXIT;
559         return 0;
560 }
561
562 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
563 {
564         struct ost_req *r = req->rq_req.ost;
565         int cmd = r->cmd;
566
567         if (cmd == OBD_BRW_READ)
568                 return ost_brw_read(obddev, req);
569         else
570                 return ost_brw_write(obddev, req);
571 }
572
573 static int ost_handle(struct obd_device *obddev, 
574                struct ptlrpc_service *svc, 
575                struct ptlrpc_request *req)
576 {
577         int rc;
578         struct ost_obd *ost = &obddev->u.ost;
579         struct ptlreq_hdr *hdr;
580
581         ENTRY;
582
583         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
584         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
585                 CERROR("lustre_ost: wrong packet type sent %d\n",
586                        NTOH__u32(hdr->type));
587                 rc = -EINVAL;
588                 goto out;
589         }
590
591         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
592                             &req->rq_reqhdr, &req->rq_req);
593         if (rc) { 
594                 CERROR("lustre_ost: Invalid request\n");
595                 EXIT; 
596                 goto out;
597         }
598
599         switch (req->rq_reqhdr->opc) { 
600
601         case OST_CONNECT:
602                 CDEBUG(D_INODE, "connect\n");
603                 rc = ost_connect(ost, req);
604                 break;
605         case OST_DISCONNECT:
606                 CDEBUG(D_INODE, "disconnect\n");
607                 rc = ost_disconnect(ost, req);
608                 break;
609         case OST_GET_INFO:
610                 CDEBUG(D_INODE, "get_info\n");
611                 rc = ost_get_info(ost, req);
612                 break;
613         case OST_CREATE:
614                 CDEBUG(D_INODE, "create\n");
615                 rc = ost_create(ost, req);
616                 break;
617         case OST_DESTROY:
618                 CDEBUG(D_INODE, "destroy\n");
619                 rc = ost_destroy(ost, req);
620                 break;
621         case OST_GETATTR:
622                 CDEBUG(D_INODE, "getattr\n");
623                 rc = ost_getattr(ost, req);
624                 break;
625         case OST_SETATTR:
626                 CDEBUG(D_INODE, "setattr\n");
627                 rc = ost_setattr(ost, req);
628                 break;
629         case OST_OPEN:
630                 CDEBUG(D_INODE, "setattr\n");
631                 rc = ost_open(ost, req);
632                 break;
633         case OST_CLOSE:
634                 CDEBUG(D_INODE, "setattr\n");
635                 rc = ost_close(ost, req);
636                 break;
637         case OST_BRW:
638                 CDEBUG(D_INODE, "brw\n");
639                 rc = ost_brw(ost, req);
640                 break;
641         case OST_PUNCH:
642                 CDEBUG(D_INODE, "punch\n");
643                 rc = ost_punch(ost, req);
644                 break;
645         default:
646                 req->rq_status = -ENOTSUPP;
647                 return ptlrpc_error(obddev, svc, req);
648         }
649
650 out:
651         req->rq_status = rc;
652         if (rc) { 
653                 CERROR("ost: processing error %d\n", rc);
654                 ptlrpc_error(obddev, svc, req);
655         } else { 
656                 CDEBUG(D_INODE, "sending reply\n"); 
657                 ptlrpc_reply(obddev, svc, req); 
658         }
659
660         return 0;
661 }
662
663
664 /* mount the file system (secretly) */
665 static int ost_setup(struct obd_device *obddev, obd_count len,
666                         void *buf)
667                         
668 {
669         struct obd_ioctl_data* data = buf;
670         struct ost_obd *ost = &obddev->u.ost;
671         struct obd_device *tgt;
672         int err; 
673         ENTRY;
674
675         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
676                 EXIT;
677                 return -ENODEV;
678         }
679
680         tgt = &obd_dev[data->ioc_dev];
681         ost->ost_tgt = tgt;
682         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
683              ! (tgt->obd_flags & OBD_SET_UP) ){
684                 CERROR("device not attached or not set up (%d)\n", 
685                        data->ioc_dev);
686                 EXIT;
687                 return -EINVAL;
688         } 
689
690         ost->ost_conn.oc_dev = tgt;
691         err = obd_connect(&ost->ost_conn);
692         if (err) { 
693                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
694                 return -EINVAL;
695         }
696
697         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
698                                             OST_REQUEST_PORTAL,
699                                             OSC_REPLY_PORTAL,
700                                             "self", 
701                                             ost_unpack_req,
702                                             ost_pack_rep,
703                                             ost_handle);
704         if (!ost->ost_service) { 
705                 obd_disconnect(&ost->ost_conn); 
706                 return -EINVAL;
707         }
708                                             
709         rpc_register_service(ost->ost_service, "self");
710
711         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
712         if (err) { 
713                 obd_disconnect(&ost->ost_conn); 
714                 return -EINVAL;
715         }
716                 
717         MOD_INC_USE_COUNT;
718         EXIT; 
719         return 0;
720
721
722 static int ost_cleanup(struct obd_device * obddev)
723 {
724         struct ost_obd *ost = &obddev->u.ost;
725         int err;
726
727         ENTRY;
728
729         if ( !list_empty(&obddev->obd_gen_clients) ) {
730                 CERROR("still has clients!\n");
731                 EXIT;
732                 return -EBUSY;
733         }
734
735         ptlrpc_stop_thread(ost->ost_service);
736         rpc_unregister_service(ost->ost_service);
737
738         if (!list_empty(&ost->ost_service->srv_reqs)) {
739                 // XXX reply with errors and clean up
740                 CERROR("Request list not empty!\n");
741         }
742         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
743
744         err = obd_disconnect(&ost->ost_conn);
745         if (err) { 
746                 CERROR("lustre ost: fail to disconnect device\n");
747                 return -EINVAL;
748         }
749
750         MOD_DEC_USE_COUNT;
751         EXIT;
752         return 0;
753 }
754
755 /* use obd ops to offer management infrastructure */
756 static struct obd_ops ost_obd_ops = {
757         o_setup:       ost_setup,
758         o_cleanup:     ost_cleanup,
759 };
760
761 static int __init ost_init(void)
762 {
763         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
764         return 0;
765 }
766
767 static void __exit ost_exit(void)
768 {
769         obd_unregister_type(LUSTRE_OST_NAME);
770 }
771
772 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
773 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
774 MODULE_LICENSE("GPL");
775
776 module_init(ost_init);
777 module_exit(ost_exit);