Whamcloud - gitweb
Landing the mds_lock_devel branch on the trunk. Notables:
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40
41 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
42 {
43         struct obd_conn conn;
44         struct ost_body *body;
45         int rc, size = sizeof(*body);
46         ENTRY;
47
48         body = lustre_msg_buf(req->rq_reqmsg, 0);
49         conn.oc_id = body->connid;
50         conn.oc_dev = ost->ost_tgt;
51
52         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
53         if (rc)
54                 RETURN(rc);
55
56         req->rq_status = obd_destroy(&conn, &body->oa);
57         RETURN(0);
58 }
59
60 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
61 {
62         struct obd_conn conn;
63         struct ost_body *body, *repbody;
64         int rc, size = sizeof(*body);
65         ENTRY;
66
67         body = lustre_msg_buf(req->rq_reqmsg, 0);
68         conn.oc_id = body->connid;
69         conn.oc_dev = ost->ost_tgt;
70
71         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
72         if (rc)
73                 RETURN(rc);
74
75         repbody = lustre_msg_buf(req->rq_repmsg, 0);
76         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
77         req->rq_status = obd_getattr(&conn, &repbody->oa);
78         RETURN(0);
79 }
80
81 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
82 {
83         struct obd_conn conn;
84         struct ost_body *body, *repbody;
85         int rc, size = sizeof(*body);
86         ENTRY;
87
88         body = lustre_msg_buf(req->rq_reqmsg, 0);
89         conn.oc_id = body->connid;
90         conn.oc_dev = ost->ost_tgt;
91
92         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
93         if (rc)
94                 RETURN(rc);
95
96         repbody = lustre_msg_buf(req->rq_repmsg, 0);
97         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98         req->rq_status = obd_open(&conn, &repbody->oa);
99         RETURN(0);
100 }
101
102 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
103 {
104         struct obd_conn conn;
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_msg_buf(req->rq_reqmsg, 0);
110         conn.oc_id = body->connid;
111         conn.oc_dev = ost->ost_tgt;
112
113         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf(req->rq_repmsg, 0);
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_close(&conn, &repbody->oa);
120         RETURN(0);
121 }
122
123 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
124 {
125         struct obd_conn conn;
126         struct ost_body *body, *repbody;
127         int rc, size = sizeof(*body);
128         ENTRY;
129
130         body = lustre_msg_buf(req->rq_reqmsg, 0);
131         conn.oc_id = body->connid;
132         conn.oc_dev = ost->ost_tgt;
133
134         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
135         if (rc)
136                 RETURN(rc);
137
138         repbody = lustre_msg_buf(req->rq_repmsg, 0);
139         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
140         req->rq_status = obd_create(&conn, &repbody->oa);
141         RETURN(0);
142 }
143
144 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
145 {
146         struct obd_conn conn;
147         struct ost_body *body, *repbody;
148         int rc, size = sizeof(*body);
149         ENTRY;
150
151         body = lustre_msg_buf(req->rq_reqmsg, 0);
152         conn.oc_id = body->connid;
153         conn.oc_dev = ost->ost_tgt;
154
155         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
156         if (rc)
157                 RETURN(rc);
158
159         repbody = lustre_msg_buf(req->rq_repmsg, 0);
160         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
161         req->rq_status = obd_punch(&conn, &repbody->oa,
162                                    repbody->oa.o_size, repbody->oa.o_blocks);
163         RETURN(0);
164 }
165
166 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
167 {
168         struct obd_conn conn;
169         struct ost_body *body, *repbody;
170         int rc, size = sizeof(*body);
171         ENTRY;
172
173         body = lustre_msg_buf(req->rq_reqmsg, 0);
174         conn.oc_id = body->connid;
175         conn.oc_dev = ost->ost_tgt;
176
177         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
178         if (rc)
179                 RETURN(rc);
180
181         repbody = lustre_msg_buf(req->rq_repmsg, 0);
182         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183         req->rq_status = obd_setattr(&conn, &repbody->oa);
184         RETURN(0);
185 }
186
187 static int ost_connect(struct ptlrpc_request *req)
188 {
189         struct obd_conn conn;
190         struct ost_body *body;
191         struct ost_obd *ost;
192         char *uuid;
193         int rc, size = sizeof(*body), i;
194         ENTRY;
195
196         uuid = lustre_msg_buf(req->rq_reqmsg, 0);
197         if (req->rq_reqmsg->buflens[0] > 37) {
198                 /* Invalid UUID */
199                 req->rq_status = -EINVAL;
200                 RETURN(0);
201         }
202
203         i = obd_class_name2dev(uuid);
204         if (i == -1) {
205                 req->rq_status = -ENODEV;
206                 RETURN(0);
207         }
208
209         ost = &(obd_dev[i].u.ost);
210         conn.oc_dev = ost->ost_tgt;
211
212         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
213         if (rc)
214                 RETURN(rc);
215
216         req->rq_repmsg->target_id = i;
217         req->rq_status = obd_connect(&conn);
218
219         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
220         body = lustre_msg_buf(req->rq_repmsg, 0);
221         body->connid = conn.oc_id;
222         RETURN(0);
223 }
224
225 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
226 {
227         struct obd_conn conn;
228         struct ost_body *body;
229         int rc;
230         ENTRY;
231
232         body = lustre_msg_buf(req->rq_reqmsg, 0);
233         conn.oc_id = body->connid;
234         conn.oc_dev = ost->ost_tgt;
235
236         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
237         if (rc)
238                 RETURN(rc);
239
240         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
241         req->rq_status = obd_disconnect(&conn);
242         RETURN(0);
243 }
244
245 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
246 {
247         struct obd_conn conn;
248         struct ost_body *body;
249         int rc, size[2] = {sizeof(*body)};
250         char *bufs[2] = {NULL, NULL}, *ptr;
251         ENTRY;
252
253         body = lustre_msg_buf(req->rq_reqmsg, 0);
254         conn.oc_id = body->connid;
255         conn.oc_dev = ost->ost_tgt;
256
257         ptr = lustre_msg_buf(req->rq_reqmsg, 1);
258         if (!ptr)
259                 RETURN(-EINVAL);
260
261         req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
262                                       &(size[1]), (void **)&(bufs[1]));
263
264         rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
265         if (rc)
266                 CERROR("cannot pack reply\n");
267
268         RETURN(rc);
269 }
270
271 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
272 {
273         struct ptlrpc_bulk_desc *desc;
274         struct obd_conn conn;
275         void *tmp1, *tmp2, *end2;
276         struct niobuf_remote *remote_nb;
277         struct niobuf_local *local_nb = NULL;
278         struct obd_ioobj *ioo;
279         struct ost_body *body;
280         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
281         ENTRY;
282
283         body = lustre_msg_buf(req->rq_reqmsg, 0);
284         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
285         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
286         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
287         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
288         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
289         cmd = body->data;
290
291         conn.oc_id = body->connid;
292         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
293
294         for (i = 0; i < objcount; i++) {
295                 ost_unpack_ioo(&tmp1, &ioo);
296                 if (tmp2 + ioo->ioo_bufcnt > end2) {
297                         LBUG();
298                         GOTO(out, rc = -EFAULT);
299                 }
300                 for (j = 0; j < ioo->ioo_bufcnt; j++)
301                         ost_unpack_niobuf(&tmp2, &remote_nb);
302         }
303
304         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
305         if (rc)
306                 RETURN(rc);
307         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
308         if (local_nb == NULL)
309                 RETURN(-ENOMEM);
310
311         /* The unpackers move tmp1 and tmp2, so reset them before using */
312         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
313         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
314         req->rq_status = obd_preprw(cmd, &conn, objcount,
315                                     tmp1, niocount, tmp2, local_nb, NULL);
316
317         if (req->rq_status)
318                 GOTO(out_local, 0);
319
320         desc = ptlrpc_prep_bulk(req->rq_connection);
321         if (desc == NULL)
322                 GOTO(out_local, rc = -ENOMEM);
323         desc->b_portal = OST_BULK_PORTAL;
324
325         for (i = 0; i < niocount; i++) {
326                 struct ptlrpc_bulk_page *bulk;
327                 bulk = ptlrpc_prep_bulk_page(desc);
328                 if (bulk == NULL)
329                         GOTO(out_bulk, rc = -ENOMEM);
330                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
331                 bulk->b_xid = remote_nb->xid;
332                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
333                 bulk->b_buflen = PAGE_SIZE;
334         }
335
336         rc = ptlrpc_send_bulk(desc);
337         if (rc)
338                 GOTO(out_bulk, rc);
339
340         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
341         if (desc->b_flags & PTL_RPC_FL_INTR)
342                 rc = -EINTR;
343
344         /* The unpackers move tmp1 and tmp2, so reset them before using */
345         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
346         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
347         req->rq_status = obd_commitrw(cmd, &conn, objcount,
348                                       tmp1, niocount, local_nb, NULL);
349
350 out_bulk:
351         ptlrpc_free_bulk(desc);
352 out_local:
353         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
354 out:
355         RETURN(rc);
356 }
357
358 static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
359 {
360         struct obd_ioobj obj;
361         struct niobuf_local lnb;
362         int rc;
363         ENTRY;
364
365         memset(&lnb, 0, sizeof(lnb));
366         memset(&obj, 0, sizeof(obj));
367
368         lnb.page = bulk->b_page;
369         lnb.dentry = bulk->b_dentry;
370         lnb.flags = bulk->b_flags;
371         obj.ioo_bufcnt = 1;
372
373         rc = obd_commitrw(OBD_BRW_WRITE, &bulk->b_desc->b_conn, 1, &obj, 1,
374                           &lnb, bulk->b_desc->b_desc_private);
375         if (rc)
376                 CERROR("ost_commit_page failed: %d\n", rc);
377
378         RETURN(rc);
379 }
380
381 static void ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc, void *data)
382 {
383         ENTRY;
384         ptlrpc_free_bulk(desc);
385         EXIT;
386 }
387
388 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
389 {
390         struct ptlrpc_bulk_desc *desc;
391         struct obd_conn conn;
392         struct niobuf_remote *remote_nb;
393         struct niobuf_local *local_nb, *lnb;
394         struct obd_ioobj *ioo;
395         struct ost_body *body;
396         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
397         void *tmp1, *tmp2, *end2;
398         void *desc_priv = NULL;
399         ENTRY;
400
401         body = lustre_msg_buf(req->rq_reqmsg, 0);
402         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
403         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
404         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
405         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
406         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
407         cmd = body->data;
408
409         conn.oc_id = body->connid;
410         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
411
412         for (i = 0; i < objcount; i++) {
413                 ost_unpack_ioo((void *)&tmp1, &ioo);
414                 if (tmp2 + ioo->ioo_bufcnt > end2) {
415                         rc = -EFAULT;
416                         break;
417                 }
418                 for (j = 0; j < ioo->ioo_bufcnt; j++)
419                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
420         }
421
422         size[1] = niocount * sizeof(*remote_nb);
423         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
424         if (rc)
425                 GOTO(out, rc);
426         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
427
428         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
429         if (local_nb == NULL)
430                 GOTO(out, rc = -ENOMEM);
431
432         /* The unpackers move tmp1 and tmp2, so reset them before using */
433         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
434         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
435         req->rq_status = obd_preprw(cmd, &conn, objcount,
436                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
437         if (req->rq_status)
438                 GOTO(out_free, rc = 0); /* XXX is this correct? */
439
440         desc = ptlrpc_prep_bulk(req->rq_connection);
441         if (desc == NULL)
442                 GOTO(fail_preprw, rc = -ENOMEM);
443         desc->b_cb = ost_brw_write_finished_cb;
444         desc->b_portal = OSC_BULK_PORTAL;
445         desc->b_desc_private = desc_priv;
446         memcpy(&(desc->b_conn), &conn, sizeof(conn));
447
448         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
449                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
450                 struct ptlrpc_bulk_page *bulk;
451
452                 bulk = ptlrpc_prep_bulk_page(desc);
453                 if (bulk == NULL)
454                         GOTO(fail_bulk, rc = -ENOMEM);
455
456                 spin_lock(&srv->srv_lock);
457                 bulk->b_xid = srv->srv_xid++;
458                 spin_unlock(&srv->srv_lock);
459
460                 bulk->b_buf = lnb->addr;
461                 bulk->b_page = lnb->page;
462                 bulk->b_flags = lnb->flags;
463                 bulk->b_dentry = lnb->dentry;
464                 bulk->b_buflen = PAGE_SIZE;
465                 bulk->b_cb = ost_brw_write_cb;
466
467                 /* this advances remote_nb */
468                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
469                                 bulk->b_xid);
470         }
471
472         rc = ptlrpc_register_bulk(desc);
473         if (rc)
474                 GOTO(fail_bulk, rc);
475
476         EXIT;
477 out_free:
478         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
479 out:
480         return rc;
481
482 fail_bulk:
483         ptlrpc_free_bulk(desc);
484 fail_preprw:
485         /* FIXME: how do we undo the preprw? */
486         goto out_free;
487 }
488
489 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
490 {
491         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
492
493         if (body->data == OBD_BRW_READ)
494                 return ost_brw_read(obddev, req);
495         else
496                 return ost_brw_write(obddev, req);
497 }
498
499 static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
500                       struct ptlrpc_request *req)
501 {
502         int rc;
503         struct ost_obd *ost;
504         ENTRY;
505
506         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
507         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
508                 CERROR("lustre_mds: Invalid request\n");
509                 GOTO(out, rc);
510         }
511
512         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
513                 CERROR("lustre_mds: wrong packet type sent %d\n",
514                        req->rq_reqmsg->type);
515                 GOTO(out, rc = -EINVAL);
516         }
517
518         if (req->rq_reqmsg->opc != OST_CONNECT) {
519                 int id = req->rq_reqmsg->target_id;
520                 struct obd_device *obddev;
521                 if (id < 0 || id > MAX_OBD_DEVICES)
522                         GOTO(out, rc = -ENODEV);
523                 obddev = &obd_dev[id];
524                 if (strcmp(obddev->obd_type->typ_name, "ost") != 0)
525                         GOTO(out, rc = -EINVAL);
526                 ost = &obddev->u.ost;
527                 req->rq_obd = obddev;
528         }
529
530         switch (req->rq_reqmsg->opc) {
531         case OST_CONNECT:
532                 CDEBUG(D_INODE, "connect\n");
533                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
534                 rc = ost_connect(req);
535                 break;
536         case OST_DISCONNECT:
537                 CDEBUG(D_INODE, "disconnect\n");
538                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
539                 rc = ost_disconnect(ost, req);
540                 break;
541         case OST_GET_INFO:
542                 CDEBUG(D_INODE, "get_info\n");
543                 OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
544                 rc = ost_get_info(ost, req);
545                 break;
546         case OST_CREATE:
547                 CDEBUG(D_INODE, "create\n");
548                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
549                 rc = ost_create(ost, req);
550                 break;
551         case OST_DESTROY:
552                 CDEBUG(D_INODE, "destroy\n");
553                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
554                 rc = ost_destroy(ost, req);
555                 break;
556         case OST_GETATTR:
557                 CDEBUG(D_INODE, "getattr\n");
558                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
559                 rc = ost_getattr(ost, req);
560                 break;
561         case OST_SETATTR:
562                 CDEBUG(D_INODE, "setattr\n");
563                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
564                 rc = ost_setattr(ost, req);
565                 break;
566         case OST_OPEN:
567                 CDEBUG(D_INODE, "setattr\n");
568                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
569                 rc = ost_open(ost, req);
570                 break;
571         case OST_CLOSE:
572                 CDEBUG(D_INODE, "setattr\n");
573                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
574                 rc = ost_close(ost, req);
575                 break;
576         case OST_BRW:
577                 CDEBUG(D_INODE, "brw\n");
578                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
579                 rc = ost_brw(ost, req);
580                 break;
581         case OST_PUNCH:
582                 CDEBUG(D_INODE, "punch\n");
583                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
584                 rc = ost_punch(ost, req);
585                 break;
586 #if 0
587         case OST_STATFS:
588                 CDEBUG(D_INODE, "statfs\n");
589                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
590                 rc = ost_statfs(ost, req);
591                 break;
592 #endif
593         default:
594                 req->rq_status = -ENOTSUPP;
595                 rc = ptlrpc_error(svc, req);
596                 RETURN(rc);
597         }
598
599         EXIT;
600 out:
601         //req->rq_status = rc;
602         if (rc) {
603                 CERROR("ost: processing error %d\n", rc);
604                 ptlrpc_error(svc, req);
605         } else {
606                 CDEBUG(D_INODE, "sending reply\n");
607                 ptlrpc_reply(svc, req);
608         }
609
610         return 0;
611 }
612
613 /* mount the file system (secretly) */
614 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
615 {
616         struct obd_ioctl_data* data = buf;
617         struct ost_obd *ost = &obddev->u.ost;
618         struct obd_device *tgt;
619         int err;
620         ENTRY;
621
622         if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
623                 RETURN(-ENODEV);
624
625         MOD_INC_USE_COUNT;
626         tgt = &obd_dev[data->ioc_dev];
627         ost->ost_tgt = tgt;
628         if (!(tgt->obd_flags & OBD_ATTACHED) ||
629             !(tgt->obd_flags & OBD_SET_UP)) {
630                 CERROR("device not attached or not set up (%d)\n",
631                        data->ioc_dev);
632                 GOTO(error_dec, err = -EINVAL);
633         }
634
635         ost->ost_conn.oc_dev = tgt;
636         err = obd_connect(&ost->ost_conn);
637         if (err) {
638                 CERROR("fail to connect to device %d\n", data->ioc_dev);
639                 GOTO(error_dec, err = -EINVAL);
640         }
641
642         obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
643         if (obddev->obd_namespace == NULL)
644                 LBUG();
645
646         ost->ost_service = ptlrpc_init_svc(128 * 1024,
647                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
648                                            "self", ost_handle);
649         if (!ost->ost_service) {
650                 CERROR("failed to start service\n");
651                 GOTO(error_disc, err = -EINVAL);
652         }
653
654         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
655         if (err)
656                 GOTO(error_disc, err = -EINVAL);
657         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
658         if (err)
659                 GOTO(error_disc, err = -EINVAL);
660
661         RETURN(0);
662
663 error_disc:
664         obd_disconnect(&ost->ost_conn);
665 error_dec:
666         MOD_DEC_USE_COUNT;
667         RETURN(err);
668 }
669
670 static int ost_cleanup(struct obd_device * obddev)
671 {
672         struct ost_obd *ost = &obddev->u.ost;
673         int err;
674
675         ENTRY;
676
677         if ( !list_empty(&obddev->obd_gen_clients) ) {
678                 CERROR("still has clients!\n");
679                 RETURN(-EBUSY);
680         }
681
682         ptlrpc_stop_all_threads(ost->ost_service);
683         rpc_unregister_service(ost->ost_service);
684
685         if (!list_empty(&ost->ost_service->srv_reqs)) {
686                 // XXX reply with errors and clean up
687                 CERROR("Request list not empty!\n");
688         }
689         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
690
691         err = obd_disconnect(&ost->ost_conn);
692         if (err) {
693                 CERROR("lustre ost: fail to disconnect device\n");
694                 RETURN(-EINVAL);
695         }
696
697         MOD_DEC_USE_COUNT;
698         RETURN(0);
699 }
700
701 /* use obd ops to offer management infrastructure */
702 static struct obd_ops ost_obd_ops = {
703         o_setup:       ost_setup,
704         o_cleanup:     ost_cleanup,
705 };
706
707 static int __init ost_init(void)
708 {
709         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
710         return 0;
711 }
712
713 static void __exit ost_exit(void)
714 {
715         obd_unregister_type(LUSTRE_OST_NAME);
716 }
717
718 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
719 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
720 MODULE_LICENSE("GPL");
721
722 module_init(ost_init);
723 module_exit(ost_exit);