Whamcloud - gitweb
- Bug 296: remove LDLM_MDSINTENT to fix mismatches with LDLM_PLAIN locks.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/init.h>
41
42 static int ost_destroy(struct ptlrpc_request *req)
43 {
44         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
45         struct ost_body *body;
46         int rc, size = sizeof(*body);
47         ENTRY;
48
49         body = lustre_msg_buf(req->rq_reqmsg, 0);
50
51         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
52         if (rc)
53                 RETURN(rc);
54
55         req->rq_status = obd_destroy(conn, &body->oa, NULL);
56         RETURN(0);
57 }
58
59 static int ost_getattr(struct ptlrpc_request *req)
60 {
61         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
62         struct ost_body *body, *repbody;
63         int rc, size = sizeof(*body);
64         ENTRY;
65
66         body = lustre_msg_buf(req->rq_reqmsg, 0);
67
68         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
69         if (rc)
70                 RETURN(rc);
71
72         repbody = lustre_msg_buf(req->rq_repmsg, 0);
73         /* FIXME: unpack only valid fields instead of memcpy, endianness */
74         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
75         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
76         RETURN(0);
77 }
78
79 static int ost_statfs(struct ptlrpc_request *req)
80 {
81         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
82         struct obd_statfs *osfs;
83         int rc, size = sizeof(*osfs);
84         ENTRY;
85
86         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
87         if (rc)
88                 RETURN(rc);
89
90         osfs = lustre_msg_buf(req->rq_repmsg, 0);
91         memset(osfs, 0, size);
92
93         rc = obd_statfs(conn, osfs);
94         if (rc) {
95                 CERROR("ost: statfs failed: rc %d\n", rc);
96                 req->rq_status = rc;
97                 RETURN(rc);
98         }
99         obd_statfs_pack(osfs, osfs);
100
101         RETURN(0);
102 }
103
104 static int ost_open(struct ptlrpc_request *req)
105 {
106         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
107         struct ost_body *body, *repbody;
108         int rc, size = sizeof(*body);
109         ENTRY;
110
111         body = lustre_msg_buf(req->rq_reqmsg, 0);
112
113         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf(req->rq_repmsg, 0);
118         /* FIXME: unpack only valid fields instead of memcpy, endianness */
119         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
120         req->rq_status = obd_open(conn, &repbody->oa, NULL);
121         RETURN(0);
122 }
123
124 static int ost_close(struct ptlrpc_request *req)
125 {
126         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
127         struct ost_body *body, *repbody;
128         int rc, size = sizeof(*body);
129         ENTRY;
130
131         body = lustre_msg_buf(req->rq_reqmsg, 0);
132
133         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
134         if (rc)
135                 RETURN(rc);
136
137         repbody = lustre_msg_buf(req->rq_repmsg, 0);
138         /* FIXME: unpack only valid fields instead of memcpy, endianness */
139         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
140         req->rq_status = obd_close(conn, &repbody->oa, NULL);
141         RETURN(0);
142 }
143
144 static int ost_create(struct ptlrpc_request *req)
145 {
146         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
147         struct ost_body *body, *repbody;
148         int rc, size = sizeof(*body);
149         ENTRY;
150
151         body = lustre_msg_buf(req->rq_reqmsg, 0);
152
153         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
154         if (rc)
155                 RETURN(rc);
156
157         repbody = lustre_msg_buf(req->rq_repmsg, 0);
158         /* FIXME: unpack only valid fields instead of memcpy, endianness */
159         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
160         req->rq_status = obd_create(conn, &repbody->oa, NULL);
161         RETURN(0);
162 }
163
164 static int ost_punch(struct ptlrpc_request *req)
165 {
166         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
167         struct ost_body *body, *repbody;
168         int rc, size = sizeof(*body);
169         ENTRY;
170
171         body = lustre_msg_buf(req->rq_reqmsg, 0);
172
173         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
174             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
175                 RETURN(-EINVAL);
176
177         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
178         if (rc)
179                 RETURN(rc);
180
181         repbody = lustre_msg_buf(req->rq_repmsg, 0);
182         /* FIXME: unpack only valid fields instead of memcpy, endianness */
183         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
184         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
185                                    repbody->oa.o_size, repbody->oa.o_blocks);
186         RETURN(0);
187 }
188
189 static int ost_setattr(struct ptlrpc_request *req)
190 {
191         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
192         struct ost_body *body, *repbody;
193         int rc, size = sizeof(*body);
194         ENTRY;
195
196         body = lustre_msg_buf(req->rq_reqmsg, 0);
197
198         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
199         if (rc)
200                 RETURN(rc);
201
202         repbody = lustre_msg_buf(req->rq_repmsg, 0);
203         /* FIXME: unpack only valid fields instead of memcpy, endianness */
204         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
205         req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
206         RETURN(0);
207 }
208
209 static int ost_bulk_timeout(void *data)
210 {
211         struct ptlrpc_bulk_desc *desc = data;
212
213         ENTRY;
214         CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
215         RETURN(1);
216 }
217
218 static int ost_brw_read(struct ptlrpc_request *req)
219 {
220         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
221         struct ptlrpc_bulk_desc *desc;
222         void *tmp1, *tmp2, *end2;
223         struct niobuf_remote *remote_nb;
224         struct niobuf_local *local_nb = NULL;
225         struct obd_ioobj *ioo;
226         struct ost_body *body;
227         struct l_wait_info lwi;
228         void *desc_priv = NULL;
229         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
230         ENTRY;
231
232         body = lustre_msg_buf(req->rq_reqmsg, 0);
233         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
234         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
235         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
236         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
237         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
238         cmd = OBD_BRW_READ;
239
240         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
241                 GOTO(out, rc = 0);
242
243         for (i = 0; i < objcount; i++) {
244                 ost_unpack_ioo(&tmp1, &ioo);
245                 if (tmp2 + ioo->ioo_bufcnt > end2) {
246                         LBUG();
247                         GOTO(out, rc = -EFAULT);
248                 }
249                 for (j = 0; j < ioo->ioo_bufcnt; j++)
250                         ost_unpack_niobuf(&tmp2, &remote_nb);
251         }
252
253         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
254         if (local_nb == NULL)
255                 GOTO(out, rc = -ENOMEM);
256
257         /* The unpackers move tmp1 and tmp2, so reset them before using */
258         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
259         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
260         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
261                                     remote_nb, local_nb, &desc_priv);
262
263         if (req->rq_status)
264                 GOTO(out, rc = 0);
265
266         desc = ptlrpc_prep_bulk(req->rq_connection);
267         if (desc == NULL)
268                 GOTO(out_local, rc = -ENOMEM);
269         desc->bd_portal = OST_BULK_PORTAL;
270
271         for (i = 0; i < niocount; i++) {
272                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
273
274                 if (bulk == NULL)
275                         GOTO(out_bulk, rc = -ENOMEM);
276                 bulk->bp_xid = remote_nb[i].xid;
277                 bulk->bp_buf = local_nb[i].addr;
278                 bulk->bp_buflen = remote_nb[i].len;
279         }
280
281         rc = ptlrpc_send_bulk(desc);
282         if (rc)
283                 GOTO(out_bulk, rc);
284
285         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
286         rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
287         if (rc) {
288                 LASSERT(rc == -ETIMEDOUT);
289                 GOTO(out_bulk, rc);
290         }
291
292         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
293                                       local_nb, desc_priv);
294
295         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
296
297 out_bulk:
298         ptlrpc_free_bulk(desc);
299 out_local:
300         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
301 out:
302         if (rc) {
303                 /* It's a lot of work to delay allocating the reply, and a lot
304                  * less work to just free it here. */
305                 OBD_FREE(req->rq_repmsg, req->rq_replen);
306                 req->rq_repmsg = NULL;
307                 ptlrpc_error(req->rq_svc, req);
308         } else
309                 ptlrpc_reply(req->rq_svc, req);
310         RETURN(rc);
311 }
312
313 static int ost_brw_write(struct ptlrpc_request *req)
314 {
315         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
316         struct ptlrpc_bulk_desc *desc;
317         struct niobuf_remote *remote_nb;
318         struct niobuf_local *local_nb, *lnb;
319         struct obd_ioobj *ioo;
320         struct ost_body *body;
321         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
322         void *tmp1, *tmp2, *end2;
323         void *desc_priv = NULL;
324         int reply_sent = 0;
325         struct ptlrpc_service *srv;
326         struct l_wait_info lwi;
327         __u32 xid;
328         ENTRY;
329
330         body = lustre_msg_buf(req->rq_reqmsg, 0);
331         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
332         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
333         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
334         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
335         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
336         cmd = OBD_BRW_WRITE;
337
338         for (i = 0; i < objcount; i++) {
339                 ost_unpack_ioo((void *)&tmp1, &ioo);
340                 if (tmp2 + ioo->ioo_bufcnt > end2) {
341                         rc = -EFAULT;
342                         break;
343                 }
344                 for (j = 0; j < ioo->ioo_bufcnt; j++)
345                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
346         }
347
348         size[1] = niocount * sizeof(*remote_nb);
349         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
350         if (rc)
351                 GOTO(out, rc);
352         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
353
354         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
355         if (local_nb == NULL)
356                 GOTO(out, rc = -ENOMEM);
357
358         /* The unpackers move tmp1 and tmp2, so reset them before using */
359         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
360         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
361         req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
362                                     local_nb, &desc_priv);
363         if (req->rq_status)
364                 GOTO(out_free, rc = 0); /* XXX is this correct? */
365
366         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
367                 GOTO(fail_preprw, rc = 0);
368
369         desc = ptlrpc_prep_bulk(req->rq_connection);
370         if (desc == NULL)
371                 GOTO(fail_preprw, rc = -ENOMEM);
372         desc->bd_cb = NULL;
373         desc->bd_portal = OSC_BULK_PORTAL;
374         desc->bd_desc_private = desc_priv;
375         memcpy(&(desc->bd_conn), &conn, sizeof(conn));
376
377         srv = req->rq_obd->u.ost.ost_service;
378         spin_lock(&srv->srv_lock);
379         xid = srv->srv_xid++;                   /* single xid for all pages */
380         spin_unlock(&srv->srv_lock);
381
382         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
383                 struct ptlrpc_bulk_page *bulk;
384
385                 bulk = ptlrpc_prep_bulk_page(desc);
386                 if (bulk == NULL)
387                         GOTO(fail_bulk, rc = -ENOMEM);
388
389                 bulk->bp_xid = xid;              /* single xid for all pages */
390
391                 bulk->bp_buf = lnb->addr;
392                 bulk->bp_page = lnb->page;
393                 bulk->bp_flags = lnb->flags;
394                 bulk->bp_dentry = lnb->dentry;
395                 bulk->bp_buflen = lnb->len;
396                 bulk->bp_cb = NULL;
397
398                 /* this advances remote_nb */
399                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
400                                 bulk->bp_xid);
401         }
402
403         rc = ptlrpc_register_bulk(desc);
404         if (rc)
405                 GOTO(fail_bulk, rc);
406
407         reply_sent = 1;
408         ptlrpc_reply(req->rq_svc, req);
409
410         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
411         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
412                           &lwi);
413         if (rc) {
414                 if (rc != -ETIMEDOUT)
415                         LBUG();
416                 GOTO(fail_bulk, rc);
417         }
418
419         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
420                           desc->bd_desc_private);
421         ptlrpc_free_bulk(desc);
422         EXIT;
423 out_free:
424         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
425 out:
426         if (!reply_sent) {
427                 if (rc) {
428                         OBD_FREE(req->rq_repmsg, req->rq_replen);
429                         req->rq_repmsg = NULL;
430                         ptlrpc_error(req->rq_svc, req);
431                 } else
432                         ptlrpc_reply(req->rq_svc, req);
433         }
434         return rc;
435
436 fail_bulk:
437         ptlrpc_free_bulk(desc);
438 fail_preprw:
439         /* FIXME: how do we undo the preprw? */
440         goto out_free;
441 }
442
443 static int ost_handle(struct ptlrpc_request *req)
444 {
445         int rc;
446         ENTRY;
447
448         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
449         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
450                 CERROR("lustre_ost: Invalid request\n");
451                 GOTO(out, rc);
452         }
453
454         if (req->rq_reqmsg->opc != OST_CONNECT &&
455             req->rq_export == NULL) {
456                 CERROR("lustre_ost: operation %d on unconnected OST\n",
457                        req->rq_reqmsg->opc);
458                 GOTO(out, rc = -ENOTCONN);
459         }
460
461         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
462                 GOTO(out, rc = -EINVAL);
463
464         switch (req->rq_reqmsg->opc) {
465         case OST_CONNECT:
466                 CDEBUG(D_INODE, "connect\n");
467                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
468                 rc = target_handle_connect(req);
469                 break;
470         case OST_DISCONNECT:
471                 CDEBUG(D_INODE, "disconnect\n");
472                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
473                 rc = target_handle_disconnect(req);
474                 break;
475         case OST_CREATE:
476                 CDEBUG(D_INODE, "create\n");
477                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
478                 rc = ost_create(req);
479                 break;
480         case OST_DESTROY:
481                 CDEBUG(D_INODE, "destroy\n");
482                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
483                 rc = ost_destroy(req);
484                 break;
485         case OST_GETATTR:
486                 CDEBUG(D_INODE, "getattr\n");
487                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
488                 rc = ost_getattr(req);
489                 break;
490         case OST_SETATTR:
491                 CDEBUG(D_INODE, "setattr\n");
492                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
493                 rc = ost_setattr(req);
494                 break;
495         case OST_OPEN:
496                 CDEBUG(D_INODE, "open\n");
497                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
498                 rc = ost_open(req);
499                 break;
500         case OST_CLOSE:
501                 CDEBUG(D_INODE, "close\n");
502                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
503                 rc = ost_close(req);
504                 break;
505         case OST_WRITE:
506                 CDEBUG(D_INODE, "write\n");
507                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
508                 rc = ost_brw_write(req);
509                 /* ost_brw sends its own replies */
510                 RETURN(rc);
511         case OST_READ:
512                 CDEBUG(D_INODE, "read\n");
513                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
514                 rc = ost_brw_read(req);
515                 /* ost_brw sends its own replies */
516                 RETURN(rc);
517         case OST_PUNCH:
518                 CDEBUG(D_INODE, "punch\n");
519                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
520                 rc = ost_punch(req);
521                 break;
522         case OST_STATFS:
523                 CDEBUG(D_INODE, "statfs\n");
524                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
525                 rc = ost_statfs(req);
526                 break;
527         case LDLM_ENQUEUE:
528                 CDEBUG(D_INODE, "enqueue\n");
529                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
530                 rc = ldlm_handle_enqueue(req);
531                 if (rc)
532                         break;
533                 RETURN(0);
534         case LDLM_CONVERT:
535                 CDEBUG(D_INODE, "convert\n");
536                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
537                 rc = ldlm_handle_convert(req);
538                 if (rc)
539                         break;
540                 RETURN(0);
541         case LDLM_CANCEL:
542                 CDEBUG(D_INODE, "cancel\n");
543                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
544                 rc = ldlm_handle_cancel(req);
545                 if (rc)
546                         break;
547                 RETURN(0);
548         case LDLM_BL_CALLBACK:
549         case LDLM_CP_CALLBACK:
550                 CDEBUG(D_INODE, "callback\n");
551                 CERROR("callbacks should not happen on OST\n");
552                 LBUG();
553                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
554                 break;
555         default:
556                 req->rq_status = -ENOTSUPP;
557                 rc = ptlrpc_error(req->rq_svc, req);
558                 RETURN(rc);
559         }
560
561         EXIT;
562 out:
563         //req->rq_status = rc;
564         if (rc) {
565                 CERROR("ost: processing error (opcode=%d): %d\n",
566                        req->rq_reqmsg->opc, rc);
567                 ptlrpc_error(req->rq_svc, req);
568         } else {
569                 CDEBUG(D_INODE, "sending reply\n");
570                 if (req->rq_repmsg == NULL)
571                         CERROR("handler for opcode %d returned rc=0 without "
572                                "creating rq_repmsg; needs to return rc != "
573                                "0!\n", req->rq_reqmsg->opc);
574                 ptlrpc_reply(req->rq_svc, req);
575         }
576
577         return 0;
578 }
579
580 #define OST_NUM_THREADS 6
581
582 /* mount the file system (secretly) */
583 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
584 {
585         struct obd_ioctl_data* data = buf;
586         struct ost_obd *ost = &obddev->u.ost;
587         struct obd_device *tgt;
588         int err;
589         int i;
590         ENTRY;
591
592         if (data->ioc_inllen1 < 1) {
593                 CERROR("requires a TARGET OBD UUID\n");
594                 RETURN(-EINVAL);
595         }
596         if (data->ioc_inllen1 > 37) {
597                 CERROR("OBD UUID must be less than 38 characters\n");
598                 RETURN(-EINVAL);
599         }
600
601         MOD_INC_USE_COUNT;
602         tgt = class_uuid2obd(data->ioc_inlbuf1);
603         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
604             !(tgt->obd_flags & OBD_SET_UP)) {
605                 CERROR("device not attached or not set up (%d)\n",
606                        data->ioc_dev);
607                 GOTO(error_dec, err = -EINVAL);
608         }
609
610         err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
611         if (err) {
612                 CERROR("fail to connect to device %d\n", data->ioc_dev);
613                 GOTO(error_dec, err = -EINVAL);
614         }
615
616         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
617                                            OST_BUFSIZE, OST_MAXREQSIZE,
618                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 
619                                            "self", ost_handle, "ost");
620         if (!ost->ost_service) {
621                 CERROR("failed to start service\n");
622                 GOTO(error_disc, err = -EINVAL);
623         }
624
625         for (i = 0; i < OST_NUM_THREADS; i++) {
626                 char name[32];
627                 sprintf(name, "lustre_ost_%02d", i);
628                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
629                 if (err) {
630                         CERROR("error starting thread #%d: rc %d\n", i, err);
631                         GOTO(error_disc, err = -EINVAL);
632                 }
633         }
634
635         RETURN(0);
636
637 error_disc:
638         obd_disconnect(&ost->ost_conn);
639 error_dec:
640         MOD_DEC_USE_COUNT;
641         RETURN(err);
642 }
643
644 static int ost_cleanup(struct obd_device * obddev)
645 {
646         struct ost_obd *ost = &obddev->u.ost;
647         int err;
648
649         ENTRY;
650
651         if ( !list_empty(&obddev->obd_exports) ) {
652                 CERROR("still has clients!\n");
653                 RETURN(-EBUSY);
654         }
655
656         ptlrpc_stop_all_threads(ost->ost_service);
657         ptlrpc_unregister_service(ost->ost_service);
658
659         err = obd_disconnect(&ost->ost_conn);
660         if (err) {
661                 CERROR("lustre ost: fail to disconnect device\n");
662                 RETURN(-EINVAL);
663         }
664
665         MOD_DEC_USE_COUNT;
666         RETURN(0);
667 }
668
669 /* use obd ops to offer management infrastructure */
670 static struct obd_ops ost_obd_ops = {
671         o_setup:       ost_setup,
672         o_cleanup:     ost_cleanup,
673 };
674
675 static int __init ost_init(void)
676 {
677         class_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
678         return 0;
679 }
680
681 static void __exit ost_exit(void)
682 {
683         class_unregister_type(LUSTRE_OST_NAME);
684 }
685
686 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
687 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
688 MODULE_LICENSE("GPL");
689
690 module_init(ost_init);
691 module_exit(ost_exit);