Whamcloud - gitweb
92827527ef77faa61749bb35e01e3e89aa6114e1
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/init.h>
41 #include <linux/lprocfs_status.h>
42
43 extern lprocfs_vars_t status_var_nm_1[];
44 extern lprocfs_vars_t status_class_var[];
45
46 static int ost_destroy(struct ptlrpc_request *req)
47 {
48         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
49         struct ost_body *body;
50         int rc, size = sizeof(*body);
51         ENTRY;
52
53         body = lustre_msg_buf(req->rq_reqmsg, 0);
54
55         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
56         if (rc)
57                 RETURN(rc);
58
59         req->rq_status = obd_destroy(conn, &body->oa, NULL);
60         RETURN(0);
61 }
62
63 static int ost_getattr(struct ptlrpc_request *req)
64 {
65         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
66         struct ost_body *body, *repbody;
67         int rc, size = sizeof(*body);
68         ENTRY;
69
70         body = lustre_msg_buf(req->rq_reqmsg, 0);
71
72         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
73         if (rc)
74                 RETURN(rc);
75
76         repbody = lustre_msg_buf(req->rq_repmsg, 0);
77         /* FIXME: unpack only valid fields instead of memcpy, endianness */
78         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
79         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
80         RETURN(0);
81 }
82
83 static int ost_statfs(struct ptlrpc_request *req)
84 {
85         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
86         struct obd_statfs *osfs;
87         int rc, size = sizeof(*osfs);
88         ENTRY;
89
90         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
91         if (rc)
92                 RETURN(rc);
93
94         osfs = lustre_msg_buf(req->rq_repmsg, 0);
95         memset(osfs, 0, size);
96
97         rc = obd_statfs(conn, osfs);
98         if (rc) {
99                 CERROR("ost: statfs failed: rc %d\n", rc);
100                 req->rq_status = rc;
101                 RETURN(rc);
102         }
103         obd_statfs_pack(osfs, osfs);
104
105         RETURN(0);
106 }
107
108 static int ost_open(struct ptlrpc_request *req)
109 {
110         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
111         struct ost_body *body, *repbody;
112         int rc, size = sizeof(*body);
113         ENTRY;
114
115         body = lustre_msg_buf(req->rq_reqmsg, 0);
116
117         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
118         if (rc)
119                 RETURN(rc);
120
121         repbody = lustre_msg_buf(req->rq_repmsg, 0);
122         /* FIXME: unpack only valid fields instead of memcpy, endianness */
123         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
124         req->rq_status = obd_open(conn, &repbody->oa, NULL);
125         RETURN(0);
126 }
127
128 static int ost_close(struct ptlrpc_request *req)
129 {
130         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
131         struct ost_body *body, *repbody;
132         int rc, size = sizeof(*body);
133         ENTRY;
134
135         body = lustre_msg_buf(req->rq_reqmsg, 0);
136
137         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
138         if (rc)
139                 RETURN(rc);
140
141         repbody = lustre_msg_buf(req->rq_repmsg, 0);
142         /* FIXME: unpack only valid fields instead of memcpy, endianness */
143         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
144         req->rq_status = obd_close(conn, &repbody->oa, NULL);
145         RETURN(0);
146 }
147
148 static int ost_create(struct ptlrpc_request *req)
149 {
150         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
151         struct ost_body *body, *repbody;
152         int rc, size = sizeof(*body);
153         ENTRY;
154
155         body = lustre_msg_buf(req->rq_reqmsg, 0);
156
157         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
158         if (rc)
159                 RETURN(rc);
160
161         repbody = lustre_msg_buf(req->rq_repmsg, 0);
162         /* FIXME: unpack only valid fields instead of memcpy, endianness */
163         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
164         req->rq_status = obd_create(conn, &repbody->oa, NULL);
165         RETURN(0);
166 }
167
168 static int ost_punch(struct ptlrpc_request *req)
169 {
170         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
171         struct ost_body *body, *repbody;
172         int rc, size = sizeof(*body);
173         ENTRY;
174
175         body = lustre_msg_buf(req->rq_reqmsg, 0);
176
177         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
178             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
179                 RETURN(-EINVAL);
180
181         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
182         if (rc)
183                 RETURN(rc);
184
185         repbody = lustre_msg_buf(req->rq_repmsg, 0);
186         /* FIXME: unpack only valid fields instead of memcpy, endianness */
187         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
188         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
189                                    repbody->oa.o_size, repbody->oa.o_blocks);
190         RETURN(0);
191 }
192
193 static int ost_setattr(struct ptlrpc_request *req)
194 {
195         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
196         struct ost_body *body, *repbody;
197         int rc, size = sizeof(*body);
198         ENTRY;
199
200         body = lustre_msg_buf(req->rq_reqmsg, 0);
201
202         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
203         if (rc)
204                 RETURN(rc);
205
206         repbody = lustre_msg_buf(req->rq_repmsg, 0);
207         /* FIXME: unpack only valid fields instead of memcpy, endianness */
208         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
209         req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
210         RETURN(0);
211 }
212
213 static int ost_bulk_timeout(void *data)
214 {
215         struct ptlrpc_bulk_desc *desc = data;
216
217         ENTRY;
218         CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
219         RETURN(1);
220 }
221
222 static int ost_brw_read(struct ptlrpc_request *req)
223 {
224         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
225         struct ptlrpc_bulk_desc *desc;
226         void *tmp1, *tmp2, *end2;
227         struct niobuf_remote *remote_nb;
228         struct niobuf_local *local_nb = NULL;
229         struct obd_ioobj *ioo;
230         struct ost_body *body;
231         struct l_wait_info lwi;
232         void *desc_priv = NULL;
233         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
234         ENTRY;
235
236         body = lustre_msg_buf(req->rq_reqmsg, 0);
237         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
238         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
239         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
240         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
241         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
242         cmd = OBD_BRW_READ;
243
244         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
245                 GOTO(out, rc = 0);
246
247         for (i = 0; i < objcount; i++) {
248                 ost_unpack_ioo(&tmp1, &ioo);
249                 if (tmp2 + ioo->ioo_bufcnt > end2) {
250                         LBUG();
251                         GOTO(out, rc = -EFAULT);
252                 }
253                 for (j = 0; j < ioo->ioo_bufcnt; j++)
254                         ost_unpack_niobuf(&tmp2, &remote_nb);
255         }
256
257         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
258         if (local_nb == NULL)
259                 GOTO(out, rc = -ENOMEM);
260
261         /* The unpackers move tmp1 and tmp2, so reset them before using */
262         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
263         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
264         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
265                                     remote_nb, local_nb, &desc_priv);
266
267         if (req->rq_status)
268                 GOTO(out, rc = 0);
269
270         desc = ptlrpc_prep_bulk(req->rq_connection);
271         if (desc == NULL)
272                 GOTO(out_local, rc = -ENOMEM);
273         desc->bd_portal = OST_BULK_PORTAL;
274
275         for (i = 0; i < niocount; i++) {
276                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
277
278                 if (bulk == NULL)
279                         GOTO(out_bulk, rc = -ENOMEM);
280                 bulk->bp_xid = remote_nb[i].xid;
281                 bulk->bp_buf = local_nb[i].addr;
282                 bulk->bp_buflen = remote_nb[i].len;
283         }
284
285         rc = ptlrpc_send_bulk(desc);
286         if (rc)
287                 GOTO(out_bulk, rc);
288
289         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
290         rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
291         if (rc) {
292                 LASSERT(rc == -ETIMEDOUT);
293                 GOTO(out_bulk, rc);
294         }
295
296         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
297                                       local_nb, desc_priv);
298
299         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
300
301 out_bulk:
302         ptlrpc_free_bulk(desc);
303 out_local:
304         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
305 out:
306         if (rc) {
307                 /* It's a lot of work to delay allocating the reply, and a lot
308                  * less work to just free it here. */
309                 OBD_FREE(req->rq_repmsg, req->rq_replen);
310                 req->rq_repmsg = NULL;
311                 ptlrpc_error(req->rq_svc, req);
312         } else
313                 ptlrpc_reply(req->rq_svc, req);
314         RETURN(rc);
315 }
316
317 static int ost_brw_write(struct ptlrpc_request *req)
318 {
319         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
320         struct ptlrpc_bulk_desc *desc;
321         struct niobuf_remote *remote_nb;
322         struct niobuf_local *local_nb, *lnb;
323         struct obd_ioobj *ioo;
324         struct ost_body *body;
325         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
326         void *tmp1, *tmp2, *end2;
327         void *desc_priv = NULL;
328         int reply_sent = 0;
329         struct ptlrpc_service *srv;
330         struct l_wait_info lwi;
331         __u32 xid;
332         ENTRY;
333
334         body = lustre_msg_buf(req->rq_reqmsg, 0);
335         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
336         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
337         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
338         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
339         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
340         cmd = OBD_BRW_WRITE;
341
342         for (i = 0; i < objcount; i++) {
343                 ost_unpack_ioo((void *)&tmp1, &ioo);
344                 if (tmp2 + ioo->ioo_bufcnt > end2) {
345                         rc = -EFAULT;
346                         break;
347                 }
348                 for (j = 0; j < ioo->ioo_bufcnt; j++)
349                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
350         }
351
352         size[1] = niocount * sizeof(*remote_nb);
353         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
354         if (rc)
355                 GOTO(out, rc);
356         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
357
358         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
359         if (local_nb == NULL)
360                 GOTO(out, rc = -ENOMEM);
361
362         /* The unpackers move tmp1 and tmp2, so reset them before using */
363         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
364         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
365         req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
366                                     local_nb, &desc_priv);
367         if (req->rq_status)
368                 GOTO(out_free, rc = 0); /* XXX is this correct? */
369
370         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
371                 GOTO(fail_preprw, rc = 0);
372
373         desc = ptlrpc_prep_bulk(req->rq_connection);
374         if (desc == NULL)
375                 GOTO(fail_preprw, rc = -ENOMEM);
376         desc->bd_cb = NULL;
377         desc->bd_portal = OSC_BULK_PORTAL;
378         desc->bd_desc_private = desc_priv;
379         memcpy(&(desc->bd_conn), &conn, sizeof(conn));
380
381         srv = req->rq_obd->u.ost.ost_service;
382         spin_lock(&srv->srv_lock);
383         xid = srv->srv_xid++;                   /* single xid for all pages */
384         spin_unlock(&srv->srv_lock);
385
386         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
387                 struct ptlrpc_bulk_page *bulk;
388
389                 bulk = ptlrpc_prep_bulk_page(desc);
390                 if (bulk == NULL)
391                         GOTO(fail_bulk, rc = -ENOMEM);
392
393                 bulk->bp_xid = xid;              /* single xid for all pages */
394
395                 bulk->bp_buf = lnb->addr;
396                 bulk->bp_page = lnb->page;
397                 bulk->bp_flags = lnb->flags;
398                 bulk->bp_dentry = lnb->dentry;
399                 bulk->bp_buflen = lnb->len;
400                 bulk->bp_cb = NULL;
401
402                 /* this advances remote_nb */
403                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
404                                 bulk->bp_xid);
405         }
406
407         rc = ptlrpc_register_bulk(desc);
408         if (rc)
409                 GOTO(fail_bulk, rc);
410
411         reply_sent = 1;
412         ptlrpc_reply(req->rq_svc, req);
413
414         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
415         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
416                           &lwi);
417         if (rc) {
418                 if (rc != -ETIMEDOUT)
419                         LBUG();
420                 GOTO(fail_bulk, rc);
421         }
422
423         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
424                           desc->bd_desc_private);
425         ptlrpc_free_bulk(desc);
426         EXIT;
427 out_free:
428         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
429 out:
430         if (!reply_sent) {
431                 if (rc) {
432                         OBD_FREE(req->rq_repmsg, req->rq_replen);
433                         req->rq_repmsg = NULL;
434                         ptlrpc_error(req->rq_svc, req);
435                 } else
436                         ptlrpc_reply(req->rq_svc, req);
437         }
438         return rc;
439
440 fail_bulk:
441         ptlrpc_free_bulk(desc);
442 fail_preprw:
443         /* FIXME: how do we undo the preprw? */
444         goto out_free;
445 }
446
447 static int ost_handle(struct ptlrpc_request *req)
448 {
449         int rc;
450         ENTRY;
451
452         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
453         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
454                 CERROR("lustre_ost: Invalid request\n");
455                 GOTO(out, rc);
456         }
457
458         if (req->rq_reqmsg->opc != OST_CONNECT &&
459             req->rq_export == NULL) {
460                 CERROR("lustre_ost: operation %d on unconnected OST\n",
461                        req->rq_reqmsg->opc);
462                 GOTO(out, rc = -ENOTCONN);
463         }
464
465         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
466                 GOTO(out, rc = -EINVAL);
467
468         switch (req->rq_reqmsg->opc) {
469         case OST_CONNECT:
470                 CDEBUG(D_INODE, "connect\n");
471                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
472                 rc = target_handle_connect(req);
473                 break;
474         case OST_DISCONNECT:
475                 CDEBUG(D_INODE, "disconnect\n");
476                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
477                 rc = target_handle_disconnect(req);
478                 break;
479         case OST_CREATE:
480                 CDEBUG(D_INODE, "create\n");
481                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
482                 rc = ost_create(req);
483                 break;
484         case OST_DESTROY:
485                 CDEBUG(D_INODE, "destroy\n");
486                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
487                 rc = ost_destroy(req);
488                 break;
489         case OST_GETATTR:
490                 CDEBUG(D_INODE, "getattr\n");
491                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
492                 rc = ost_getattr(req);
493                 break;
494         case OST_SETATTR:
495                 CDEBUG(D_INODE, "setattr\n");
496                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
497                 rc = ost_setattr(req);
498                 break;
499         case OST_OPEN:
500                 CDEBUG(D_INODE, "open\n");
501                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
502                 rc = ost_open(req);
503                 break;
504         case OST_CLOSE:
505                 CDEBUG(D_INODE, "close\n");
506                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
507                 rc = ost_close(req);
508                 break;
509         case OST_WRITE:
510                 CDEBUG(D_INODE, "write\n");
511                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
512                 rc = ost_brw_write(req);
513                 /* ost_brw sends its own replies */
514                 RETURN(rc);
515         case OST_READ:
516                 CDEBUG(D_INODE, "read\n");
517                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
518                 rc = ost_brw_read(req);
519                 /* ost_brw sends its own replies */
520                 RETURN(rc);
521         case OST_PUNCH:
522                 CDEBUG(D_INODE, "punch\n");
523                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
524                 rc = ost_punch(req);
525                 break;
526         case OST_STATFS:
527                 CDEBUG(D_INODE, "statfs\n");
528                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
529                 rc = ost_statfs(req);
530                 break;
531         case LDLM_ENQUEUE:
532                 CDEBUG(D_INODE, "enqueue\n");
533                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
534                 rc = ldlm_handle_enqueue(req);
535                 if (rc)
536                         break;
537                 RETURN(0);
538         case LDLM_CONVERT:
539                 CDEBUG(D_INODE, "convert\n");
540                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
541                 rc = ldlm_handle_convert(req);
542                 if (rc)
543                         break;
544                 RETURN(0);
545         case LDLM_CANCEL:
546                 CDEBUG(D_INODE, "cancel\n");
547                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
548                 rc = ldlm_handle_cancel(req);
549                 if (rc)
550                         break;
551                 RETURN(0);
552         case LDLM_BL_CALLBACK:
553         case LDLM_CP_CALLBACK:
554                 CDEBUG(D_INODE, "callback\n");
555                 CERROR("callbacks should not happen on OST\n");
556                 LBUG();
557                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
558                 break;
559         default:
560                 req->rq_status = -ENOTSUPP;
561                 rc = ptlrpc_error(req->rq_svc, req);
562                 RETURN(rc);
563         }
564
565         EXIT;
566 out:
567         //req->rq_status = rc;
568         if (rc) {
569                 CERROR("ost: processing error (opcode=%d): %d\n",
570                        req->rq_reqmsg->opc, rc);
571                 ptlrpc_error(req->rq_svc, req);
572         } else {
573                 CDEBUG(D_INODE, "sending reply\n");
574                 if (req->rq_repmsg == NULL)
575                         CERROR("handler for opcode %d returned rc=0 without "
576                                "creating rq_repmsg; needs to return rc != "
577                                "0!\n", req->rq_reqmsg->opc);
578                 ptlrpc_reply(req->rq_svc, req);
579         }
580
581         return 0;
582 }
583
584 #define OST_NUM_THREADS 6
585
586 /* mount the file system (secretly) */
587 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
588 {
589         struct obd_ioctl_data* data = buf;
590         struct ost_obd *ost = &obddev->u.ost;
591         struct obd_device *tgt;
592         int err;
593         int i;
594         ENTRY;
595
596         if (data->ioc_inllen1 < 1) {
597                 CERROR("requires a TARGET OBD UUID\n");
598                 RETURN(-EINVAL);
599         }
600         if (data->ioc_inllen1 > 37) {
601                 CERROR("OBD UUID must be less than 38 characters\n");
602                 RETURN(-EINVAL);
603         }
604
605         MOD_INC_USE_COUNT;
606         tgt = class_uuid2obd(data->ioc_inlbuf1);
607         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
608             !(tgt->obd_flags & OBD_SET_UP)) {
609                 CERROR("device not attached or not set up (%d)\n",
610                        data->ioc_dev);
611                 GOTO(error_dec, err = -EINVAL);
612         }
613
614         err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
615         if (err) {
616                 CERROR("fail to connect to device %d\n", data->ioc_dev);
617                 GOTO(error_dec, err = -EINVAL);
618         }
619
620         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
621                                            OST_BUFSIZE, OST_MAXREQSIZE,
622                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 
623                                            "self", ost_handle, "ost");
624         if (!ost->ost_service) {
625                 CERROR("failed to start service\n");
626                 GOTO(error_disc, err = -EINVAL);
627         }
628
629         for (i = 0; i < OST_NUM_THREADS; i++) {
630                 char name[32];
631                 sprintf(name, "lustre_ost_%02d", i);
632                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
633                 if (err) {
634                         CERROR("error starting thread #%d: rc %d\n", i, err);
635                         GOTO(error_disc, err = -EINVAL);
636                 }
637         }
638
639         RETURN(0);
640
641 error_disc:
642         obd_disconnect(&ost->ost_conn);
643 error_dec:
644         MOD_DEC_USE_COUNT;
645         RETURN(err);
646 }
647
648 static int ost_cleanup(struct obd_device * obddev)
649 {
650         struct ost_obd *ost = &obddev->u.ost;
651         int err;
652
653         ENTRY;
654
655         if ( !list_empty(&obddev->obd_exports) ) {
656                 CERROR("still has clients!\n");
657                 RETURN(-EBUSY);
658         }
659
660         ptlrpc_stop_all_threads(ost->ost_service);
661         ptlrpc_unregister_service(ost->ost_service);
662
663         err = obd_disconnect(&ost->ost_conn);
664         if (err) {
665                 CERROR("lustre ost: fail to disconnect device\n");
666                 RETURN(-EINVAL);
667         }
668
669         MOD_DEC_USE_COUNT;
670         RETURN(0);
671 }
672 int ost_attach(struct obd_device *dev, 
673                    obd_count len, void *data)
674 {
675         /*  lprocfs_reg_dev(dev, (lprocfs_group_t*)lprocfs_ptlrpc_nm,
676                         sizeof(struct lprofiler_ptlrpc));
677         */
678         lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
679         return 0; 
680 }
681
682 int ost_detach(struct obd_device *dev)
683 {
684         /* lprocfs_dereg_dev(dev); */
685         lprocfs_dereg_obd(dev);
686         return 0;
687
688 }
689
690
691
692 /* use obd ops to offer management infrastructure */
693 static struct obd_ops ost_obd_ops = {
694         o_attach:      ost_attach,
695         o_detach:      ost_detach,
696         o_setup:       ost_setup,
697         o_cleanup:     ost_cleanup,
698 };
699
700 static int __init ost_init(void)
701 {
702         int rc;
703
704         rc = class_register_type(&ost_obd_ops,
705                                  (lprocfs_vars_t*)status_class_var, 
706                                  LUSTRE_OST_NAME);
707         if (rc) RETURN(rc);
708
709         return 0;
710
711 }
712
713 static void __exit ost_exit(void)
714 {
715         
716         class_unregister_type(LUSTRE_OST_NAME);
717 }
718
719 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
720 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
721 MODULE_LICENSE("GPL");
722
723 module_init(ost_init);
724 module_exit(ost_exit);