Whamcloud - gitweb
Fix many (mostly un-serious) build warnings
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40
41 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
42 {
43         struct obd_conn conn;
44         struct ost_body *body;
45         int rc, size = sizeof(*body);
46         ENTRY;
47
48         body = lustre_msg_buf(req->rq_reqmsg, 0);
49         conn.oc_id = body->connid;
50         conn.oc_dev = ost->ost_tgt;
51
52         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
53         if (rc)
54                 RETURN(rc);
55
56         req->rq_status = obd_destroy(&conn, &body->oa);
57         RETURN(0);
58 }
59
60 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
61 {
62         struct obd_conn conn;
63         struct ost_body *body, *repbody;
64         int rc, size = sizeof(*body);
65         ENTRY;
66
67         body = lustre_msg_buf(req->rq_reqmsg, 0);
68         conn.oc_id = body->connid;
69         conn.oc_dev = ost->ost_tgt;
70
71         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
72         if (rc)
73                 RETURN(rc);
74
75         repbody = lustre_msg_buf(req->rq_repmsg, 0);
76         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
77         req->rq_status = obd_getattr(&conn, &repbody->oa);
78         RETURN(0);
79 }
80
81 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
82 {
83         struct obd_conn conn;
84         struct ost_body *body, *repbody;
85         int rc, size = sizeof(*body);
86         ENTRY;
87
88         body = lustre_msg_buf(req->rq_reqmsg, 0);
89         conn.oc_id = body->connid;
90         conn.oc_dev = ost->ost_tgt;
91
92         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
93         if (rc)
94                 RETURN(rc);
95
96         repbody = lustre_msg_buf(req->rq_repmsg, 0);
97         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98         req->rq_status = obd_open(&conn, &repbody->oa);
99         RETURN(0);
100 }
101
102 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
103 {
104         struct obd_conn conn;
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_msg_buf(req->rq_reqmsg, 0);
110         conn.oc_id = body->connid;
111         conn.oc_dev = ost->ost_tgt;
112
113         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf(req->rq_repmsg, 0);
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_close(&conn, &repbody->oa);
120         RETURN(0);
121 }
122
123 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
124 {
125         struct obd_conn conn;
126         struct ost_body *body, *repbody;
127         int rc, size = sizeof(*body);
128         ENTRY;
129
130         body = lustre_msg_buf(req->rq_reqmsg, 0);
131         conn.oc_id = body->connid;
132         conn.oc_dev = ost->ost_tgt;
133
134         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
135         if (rc)
136                 RETURN(rc);
137
138         repbody = lustre_msg_buf(req->rq_repmsg, 0);
139         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
140         req->rq_status = obd_create(&conn, &repbody->oa);
141         RETURN(0);
142 }
143
144 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
145 {
146         struct obd_conn conn;
147         struct ost_body *body, *repbody;
148         int rc, size = sizeof(*body);
149         ENTRY;
150
151         body = lustre_msg_buf(req->rq_reqmsg, 0);
152         conn.oc_id = body->connid;
153         conn.oc_dev = ost->ost_tgt;
154
155         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
156         if (rc)
157                 RETURN(rc);
158
159         repbody = lustre_msg_buf(req->rq_repmsg, 0);
160         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
161         req->rq_status = obd_punch(&conn, &repbody->oa,
162                                    repbody->oa.o_size, repbody->oa.o_blocks);
163         RETURN(0);
164 }
165
166 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
167 {
168         struct obd_conn conn;
169         struct ost_body *body, *repbody;
170         int rc, size = sizeof(*body);
171         ENTRY;
172
173         body = lustre_msg_buf(req->rq_reqmsg, 0);
174         conn.oc_id = body->connid;
175         conn.oc_dev = ost->ost_tgt;
176
177         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
178         if (rc)
179                 RETURN(rc);
180
181         repbody = lustre_msg_buf(req->rq_repmsg, 0);
182         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183         req->rq_status = obd_setattr(&conn, &repbody->oa);
184         RETURN(0);
185 }
186
187 static int ost_connect(struct ptlrpc_request *req)
188 {
189         struct obd_conn conn;
190         struct ost_body *body;
191         struct ost_obd *ost;
192         char *uuid;
193         int rc, size = sizeof(*body), i;
194         ENTRY;
195
196         uuid = lustre_msg_buf(req->rq_reqmsg, 0);
197         if (req->rq_reqmsg->buflens[0] > 37) {
198                 /* Invalid UUID */
199                 req->rq_status = -EINVAL;
200                 RETURN(0);
201         }
202
203         i = obd_class_name2dev(uuid);
204         if (i == -1) {
205                 req->rq_status = -ENODEV;
206                 RETURN(0);
207         }
208
209         ost = &(obd_dev[i].u.ost);
210         conn.oc_dev = ost->ost_tgt;
211
212         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
213         if (rc)
214                 RETURN(rc);
215
216         req->rq_repmsg->target_id = i;
217         req->rq_status = obd_connect(&conn);
218
219         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
220         body = lustre_msg_buf(req->rq_repmsg, 0);
221         body->connid = conn.oc_id;
222         RETURN(0);
223 }
224
225 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
226 {
227         struct obd_conn conn;
228         struct ost_body *body;
229         int rc;
230         ENTRY;
231
232         body = lustre_msg_buf(req->rq_reqmsg, 0);
233         conn.oc_id = body->connid;
234         conn.oc_dev = ost->ost_tgt;
235
236         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
237         if (rc)
238                 RETURN(rc);
239
240         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
241         req->rq_status = obd_disconnect(&conn);
242         RETURN(0);
243 }
244
245 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
246 {
247         struct obd_conn conn;
248         struct ost_body *body;
249         int rc, size[2] = {sizeof(*body)};
250         char *bufs[2] = {NULL, NULL}, *ptr;
251         ENTRY;
252
253         body = lustre_msg_buf(req->rq_reqmsg, 0);
254         conn.oc_id = body->connid;
255         conn.oc_dev = ost->ost_tgt;
256
257         ptr = lustre_msg_buf(req->rq_reqmsg, 1);
258         if (!ptr)
259                 RETURN(-EINVAL);
260
261         req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
262                                       &(size[1]), (void **)&(bufs[1]));
263
264         rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
265         if (rc)
266                 CERROR("cannot pack reply\n");
267
268         RETURN(rc);
269 }
270
271 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
272 {
273         struct ptlrpc_bulk_desc *desc;
274         struct obd_conn conn;
275         void *tmp1, *tmp2, *end2;
276         struct niobuf_remote *remote_nb;
277         struct niobuf_local *local_nb = NULL;
278         struct obd_ioobj *ioo;
279         struct ost_body *body;
280         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
281         ENTRY;
282
283         body = lustre_msg_buf(req->rq_reqmsg, 0);
284         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
285         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
286         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
287         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
288         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
289         cmd = body->data;
290
291         conn.oc_id = body->connid;
292         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
293
294         for (i = 0; i < objcount; i++) {
295                 ost_unpack_ioo(&tmp1, &ioo);
296                 if (tmp2 + ioo->ioo_bufcnt > end2) {
297                         LBUG();
298                         GOTO(out, rc = -EFAULT);
299                 }
300                 for (j = 0; j < ioo->ioo_bufcnt; j++)
301                         ost_unpack_niobuf(&tmp2, &remote_nb);
302         }
303
304         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
305         if (rc)
306                 RETURN(rc);
307         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
308         if (local_nb == NULL)
309                 RETURN(-ENOMEM);
310
311         /* The unpackers move tmp1 and tmp2, so reset them before using */
312         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
313         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
314         req->rq_status = obd_preprw(cmd, &conn, objcount,
315                                     tmp1, niocount, tmp2, local_nb, NULL);
316
317         if (req->rq_status)
318                 GOTO(out_local, 0);
319
320         desc = ptlrpc_prep_bulk(req->rq_connection);
321         if (desc == NULL)
322                 GOTO(out_local, rc = -ENOMEM);
323         desc->b_portal = OST_BULK_PORTAL;
324
325         for (i = 0; i < niocount; i++) {
326                 struct ptlrpc_bulk_page *bulk;
327                 bulk = ptlrpc_prep_bulk_page(desc);
328                 if (bulk == NULL)
329                         GOTO(out_bulk, rc = -ENOMEM);
330                 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
331                 bulk->b_xid = remote_nb->xid;
332                 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
333                 bulk->b_buflen = PAGE_SIZE;
334         }
335
336         rc = ptlrpc_send_bulk(desc);
337         if (rc)
338                 GOTO(out_bulk, rc);
339
340         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
341         if (desc->b_flags & PTL_RPC_FL_INTR)
342                 rc = -EINTR;
343
344         /* The unpackers move tmp1 and tmp2, so reset them before using */
345         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
346         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
347         req->rq_status = obd_commitrw(cmd, &conn, objcount,
348                                       tmp1, niocount, local_nb, NULL);
349
350 out_bulk:
351         ptlrpc_free_bulk(desc);
352 out_local:
353         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
354 out:
355         if (rc)
356                 ptlrpc_error(obddev->ost_service, req);
357         else
358                 ptlrpc_reply(obddev->ost_service, req);
359         RETURN(rc);
360 }
361
362 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
363 {
364         struct ptlrpc_bulk_desc *desc;
365         struct obd_conn conn;
366         struct niobuf_remote *remote_nb;
367         struct niobuf_local *local_nb, *lnb;
368         struct obd_ioobj *ioo;
369         struct ost_body *body;
370         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
371         void *tmp1, *tmp2, *end2;
372         void *desc_priv = NULL;
373         int reply_sent = 0;
374         ENTRY;
375
376         body = lustre_msg_buf(req->rq_reqmsg, 0);
377         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
378         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
379         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
380         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
381         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
382         cmd = body->data;
383
384         conn.oc_id = body->connid;
385         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
386
387         for (i = 0; i < objcount; i++) {
388                 ost_unpack_ioo((void *)&tmp1, &ioo);
389                 if (tmp2 + ioo->ioo_bufcnt > end2) {
390                         rc = -EFAULT;
391                         break;
392                 }
393                 for (j = 0; j < ioo->ioo_bufcnt; j++)
394                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
395         }
396
397         size[1] = niocount * sizeof(*remote_nb);
398         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
399         if (rc)
400                 GOTO(out, rc);
401         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
402
403         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
404         if (local_nb == NULL)
405                 GOTO(out, rc = -ENOMEM);
406
407         /* The unpackers move tmp1 and tmp2, so reset them before using */
408         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
409         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
410         req->rq_status = obd_preprw(cmd, &conn, objcount,
411                                     tmp1, niocount, tmp2, local_nb, &desc_priv);
412         if (req->rq_status)
413                 GOTO(out_free, rc = 0); /* XXX is this correct? */
414
415         desc = ptlrpc_prep_bulk(req->rq_connection);
416         if (desc == NULL)
417                 GOTO(fail_preprw, rc = -ENOMEM);
418         desc->b_cb = NULL;
419         desc->b_portal = OSC_BULK_PORTAL;
420         desc->b_desc_private = desc_priv;
421         memcpy(&(desc->b_conn), &conn, sizeof(conn));
422
423         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
424                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
425                 struct ptlrpc_bulk_page *bulk;
426
427                 bulk = ptlrpc_prep_bulk_page(desc);
428                 if (bulk == NULL)
429                         GOTO(fail_bulk, rc = -ENOMEM);
430
431                 spin_lock(&srv->srv_lock);
432                 bulk->b_xid = srv->srv_xid++;
433                 spin_unlock(&srv->srv_lock);
434
435                 bulk->b_buf = lnb->addr;
436                 bulk->b_page = lnb->page;
437                 bulk->b_flags = lnb->flags;
438                 bulk->b_dentry = lnb->dentry;
439                 bulk->b_buflen = PAGE_SIZE;
440                 bulk->b_cb = NULL;
441
442                 /* this advances remote_nb */
443                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
444                                 bulk->b_xid);
445         }
446
447         rc = ptlrpc_register_bulk(desc);
448         if (rc)
449                 GOTO(fail_bulk, rc);
450
451         reply_sent = 1;
452         ptlrpc_reply(obddev->ost_service, req);
453
454         wait_event_interruptible(desc->b_waitq,
455                                  desc->b_flags & PTL_BULK_FL_RCVD);
456
457         rc = obd_commitrw(cmd, &conn, objcount, tmp1, niocount, local_nb,
458                           desc->b_desc_private);
459         ptlrpc_free_bulk(desc);
460         EXIT;
461 out_free:
462         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
463 out:
464         if (!reply_sent) {
465                 if (rc)
466                         ptlrpc_error(obddev->ost_service, req);
467                 else
468                         ptlrpc_reply(obddev->ost_service, req);
469         }
470         return rc;
471
472 fail_bulk:
473         ptlrpc_free_bulk(desc);
474 fail_preprw:
475         /* FIXME: how do we undo the preprw? */
476         goto out_free;
477 }
478
479 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
480 {
481         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
482
483         if (body->data == OBD_BRW_READ)
484                 return ost_brw_read(obddev, req);
485         else
486                 return ost_brw_write(obddev, req);
487 }
488
489 static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
490                       struct ptlrpc_request *req)
491 {
492         int rc;
493         struct ost_obd *ost = NULL;
494         ENTRY;
495
496         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
497         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
498                 CERROR("lustre_mds: Invalid request\n");
499                 GOTO(out, rc);
500         }
501
502         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
503                 CERROR("lustre_mds: wrong packet type sent %d\n",
504                        req->rq_reqmsg->type);
505                 GOTO(out, rc = -EINVAL);
506         }
507
508         if (req->rq_reqmsg->opc != OST_CONNECT) {
509                 int id = req->rq_reqmsg->target_id;
510                 struct obd_device *obddev;
511                 if (id < 0 || id > MAX_OBD_DEVICES)
512                         GOTO(out, rc = -ENODEV);
513                 obddev = &obd_dev[id];
514                 if (strcmp(obddev->obd_type->typ_name, "ost") != 0)
515                         GOTO(out, rc = -EINVAL);
516                 ost = &obddev->u.ost;
517                 req->rq_obd = obddev;
518         }
519
520         switch (req->rq_reqmsg->opc) {
521         case OST_CONNECT:
522                 CDEBUG(D_INODE, "connect\n");
523                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
524                 rc = ost_connect(req);
525                 break;
526         case OST_DISCONNECT:
527                 CDEBUG(D_INODE, "disconnect\n");
528                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
529                 rc = ost_disconnect(ost, req);
530                 break;
531         case OST_GET_INFO:
532                 CDEBUG(D_INODE, "get_info\n");
533                 OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
534                 rc = ost_get_info(ost, req);
535                 break;
536         case OST_CREATE:
537                 CDEBUG(D_INODE, "create\n");
538                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
539                 rc = ost_create(ost, req);
540                 break;
541         case OST_DESTROY:
542                 CDEBUG(D_INODE, "destroy\n");
543                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
544                 rc = ost_destroy(ost, req);
545                 break;
546         case OST_GETATTR:
547                 CDEBUG(D_INODE, "getattr\n");
548                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
549                 rc = ost_getattr(ost, req);
550                 break;
551         case OST_SETATTR:
552                 CDEBUG(D_INODE, "setattr\n");
553                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
554                 rc = ost_setattr(ost, req);
555                 break;
556         case OST_OPEN:
557                 CDEBUG(D_INODE, "setattr\n");
558                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
559                 rc = ost_open(ost, req);
560                 break;
561         case OST_CLOSE:
562                 CDEBUG(D_INODE, "setattr\n");
563                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
564                 rc = ost_close(ost, req);
565                 break;
566         case OST_BRW:
567                 CDEBUG(D_INODE, "brw\n");
568                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
569                 rc = ost_brw(ost, req);
570                 /* ost_brw sends its own replies */
571                 RETURN(rc);
572         case OST_PUNCH:
573                 CDEBUG(D_INODE, "punch\n");
574                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
575                 rc = ost_punch(ost, req);
576                 break;
577 #if 0
578         case OST_STATFS:
579                 CDEBUG(D_INODE, "statfs\n");
580                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
581                 rc = ost_statfs(ost, req);
582                 break;
583 #endif
584         default:
585                 req->rq_status = -ENOTSUPP;
586                 rc = ptlrpc_error(svc, req);
587                 RETURN(rc);
588         }
589
590         EXIT;
591 out:
592         //req->rq_status = rc;
593         if (rc) {
594                 CERROR("ost: processing error %d\n", rc);
595                 ptlrpc_error(svc, req);
596         } else {
597                 CDEBUG(D_INODE, "sending reply\n");
598                 ptlrpc_reply(svc, req);
599         }
600
601         return 0;
602 }
603
604 /* mount the file system (secretly) */
605 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
606 {
607         struct obd_ioctl_data* data = buf;
608         struct ost_obd *ost = &obddev->u.ost;
609         struct obd_device *tgt;
610         int err;
611         ENTRY;
612
613         if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
614                 RETURN(-ENODEV);
615
616         MOD_INC_USE_COUNT;
617         tgt = &obd_dev[data->ioc_dev];
618         ost->ost_tgt = tgt;
619         if (!(tgt->obd_flags & OBD_ATTACHED) ||
620             !(tgt->obd_flags & OBD_SET_UP)) {
621                 CERROR("device not attached or not set up (%d)\n",
622                        data->ioc_dev);
623                 GOTO(error_dec, err = -EINVAL);
624         }
625
626         ost->ost_conn.oc_dev = tgt;
627         err = obd_connect(&ost->ost_conn);
628         if (err) {
629                 CERROR("fail to connect to device %d\n", data->ioc_dev);
630                 GOTO(error_dec, err = -EINVAL);
631         }
632
633         obddev->obd_namespace =
634                 ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER);
635         if (obddev->obd_namespace == NULL)
636                 LBUG();
637
638         ost->ost_service = ptlrpc_init_svc(64 * 1024,
639                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
640                                            "self", ost_handle);
641         if (!ost->ost_service) {
642                 CERROR("failed to start service\n");
643                 GOTO(error_disc, err = -EINVAL);
644         }
645
646         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
647         if (err)
648                 GOTO(error_disc, err = -EINVAL);
649         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
650         if (err)
651                 GOTO(error_disc, err = -EINVAL);
652         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
653         if (err)
654                 GOTO(error_disc, err = -EINVAL);
655         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
656         if (err)
657                 GOTO(error_disc, err = -EINVAL);
658         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
659         if (err)
660                 GOTO(error_disc, err = -EINVAL);
661         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
662         if (err)
663                 GOTO(error_disc, err = -EINVAL);
664
665         RETURN(0);
666
667 error_disc:
668         obd_disconnect(&ost->ost_conn);
669 error_dec:
670         MOD_DEC_USE_COUNT;
671         RETURN(err);
672 }
673
674 static int ost_cleanup(struct obd_device * obddev)
675 {
676         struct ost_obd *ost = &obddev->u.ost;
677         int err;
678
679         ENTRY;
680
681         if ( !list_empty(&obddev->obd_gen_clients) ) {
682                 CERROR("still has clients!\n");
683                 RETURN(-EBUSY);
684         }
685
686         ptlrpc_stop_all_threads(ost->ost_service);
687         rpc_unregister_service(ost->ost_service);
688
689         if (!list_empty(&ost->ost_service->srv_reqs)) {
690                 // XXX reply with errors and clean up
691                 CERROR("Request list not empty!\n");
692         }
693         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
694
695         err = obd_disconnect(&ost->ost_conn);
696         if (err) {
697                 CERROR("lustre ost: fail to disconnect device\n");
698                 RETURN(-EINVAL);
699         }
700
701         ldlm_namespace_free(obddev->obd_namespace);
702
703         MOD_DEC_USE_COUNT;
704         RETURN(0);
705 }
706
707 /* use obd ops to offer management infrastructure */
708 static struct obd_ops ost_obd_ops = {
709         o_setup:       ost_setup,
710         o_cleanup:     ost_cleanup,
711 };
712
713 static int __init ost_init(void)
714 {
715         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
716         return 0;
717 }
718
719 static void __exit ost_exit(void)
720 {
721         obd_unregister_type(LUSTRE_OST_NAME);
722 }
723
724 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
725 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
726 MODULE_LICENSE("GPL");
727
728 module_init(ost_init);
729 module_exit(ost_exit);