Whamcloud - gitweb
Merge of b_md to HEAD:
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/init.h>
41 #include <linux/lprocfs_status.h>
42
43 extern struct lprocfs_vars status_var_nm_1[];
44 extern struct lprocfs_vars status_class_var[];
45
46 static int ost_destroy(struct ptlrpc_request *req)
47 {
48         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
49         struct ost_body *body;
50         int rc, size = sizeof(*body);
51         ENTRY;
52
53         body = lustre_msg_buf(req->rq_reqmsg, 0);
54
55         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
56         if (rc)
57                 RETURN(rc);
58
59         req->rq_status = obd_destroy(conn, &body->oa, NULL);
60         RETURN(0);
61 }
62
63 static int ost_getattr(struct ptlrpc_request *req)
64 {
65         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
66         struct ost_body *body, *repbody;
67         int rc, size = sizeof(*body);
68         ENTRY;
69
70         body = lustre_msg_buf(req->rq_reqmsg, 0);
71
72         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
73         if (rc)
74                 RETURN(rc);
75
76         repbody = lustre_msg_buf(req->rq_repmsg, 0);
77         /* FIXME: unpack only valid fields instead of memcpy, endianness */
78         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
79         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
80         RETURN(0);
81 }
82
83 static int ost_statfs(struct ptlrpc_request *req)
84 {
85         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
86         struct obd_statfs *osfs;
87         int rc, size = sizeof(*osfs);
88         ENTRY;
89
90         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
91         if (rc)
92                 RETURN(rc);
93
94         osfs = lustre_msg_buf(req->rq_repmsg, 0);
95         memset(osfs, 0, size);
96
97         rc = obd_statfs(conn, osfs);
98         if (rc) {
99                 CERROR("ost: statfs failed: rc %d\n", rc);
100                 req->rq_status = rc;
101                 RETURN(rc);
102         }
103         obd_statfs_pack(osfs, osfs);
104
105         RETURN(0);
106 }
107
108 static int ost_open(struct ptlrpc_request *req)
109 {
110         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
111         struct ost_body *body, *repbody;
112         int rc, size = sizeof(*body);
113         ENTRY;
114
115         body = lustre_msg_buf(req->rq_reqmsg, 0);
116
117         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
118         if (rc)
119                 RETURN(rc);
120
121         repbody = lustre_msg_buf(req->rq_repmsg, 0);
122         /* FIXME: unpack only valid fields instead of memcpy, endianness */
123         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
124         req->rq_status = obd_open(conn, &repbody->oa, NULL);
125         RETURN(0);
126 }
127
128 static int ost_close(struct ptlrpc_request *req)
129 {
130         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
131         struct ost_body *body, *repbody;
132         int rc, size = sizeof(*body);
133         ENTRY;
134
135         body = lustre_msg_buf(req->rq_reqmsg, 0);
136
137         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
138         if (rc)
139                 RETURN(rc);
140
141         repbody = lustre_msg_buf(req->rq_repmsg, 0);
142         /* FIXME: unpack only valid fields instead of memcpy, endianness */
143         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
144         req->rq_status = obd_close(conn, &repbody->oa, NULL);
145         RETURN(0);
146 }
147
148 static int ost_create(struct ptlrpc_request *req)
149 {
150         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
151         struct ost_body *body, *repbody;
152         int rc, size = sizeof(*body);
153         ENTRY;
154
155         body = lustre_msg_buf(req->rq_reqmsg, 0);
156
157         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
158         if (rc)
159                 RETURN(rc);
160
161         repbody = lustre_msg_buf(req->rq_repmsg, 0);
162         /* FIXME: unpack only valid fields instead of memcpy, endianness */
163         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
164         req->rq_status = obd_create(conn, &repbody->oa, NULL);
165         RETURN(0);
166 }
167
168 static int ost_punch(struct ptlrpc_request *req)
169 {
170         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
171         struct ost_body *body, *repbody;
172         int rc, size = sizeof(*body);
173         ENTRY;
174
175         body = lustre_msg_buf(req->rq_reqmsg, 0);
176
177         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
178             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
179                 RETURN(-EINVAL);
180
181         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
182         if (rc)
183                 RETURN(rc);
184
185         repbody = lustre_msg_buf(req->rq_repmsg, 0);
186         /* FIXME: unpack only valid fields instead of memcpy, endianness */
187         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
188         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
189                                    repbody->oa.o_size, repbody->oa.o_blocks);
190         RETURN(0);
191 }
192
193 static int ost_setattr(struct ptlrpc_request *req)
194 {
195         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
196         struct ost_body *body, *repbody;
197         int rc, size = sizeof(*body);
198         ENTRY;
199
200         body = lustre_msg_buf(req->rq_reqmsg, 0);
201
202         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
203         if (rc)
204                 RETURN(rc);
205
206         repbody = lustre_msg_buf(req->rq_repmsg, 0);
207         /* FIXME: unpack only valid fields instead of memcpy, endianness */
208         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
209         req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
210         RETURN(0);
211 }
212
213 static int ost_bulk_timeout(void *data)
214 {
215         struct ptlrpc_bulk_desc *desc = data;
216
217         ENTRY;
218         recovd_conn_fail(desc->bd_connection);
219         RETURN(1);
220 }
221
222 static int ost_brw_read(struct ptlrpc_request *req)
223 {
224         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
225         struct ptlrpc_bulk_desc *desc;
226         void *tmp1, *tmp2, *end2;
227         struct niobuf_remote *remote_nb;
228         struct niobuf_local *local_nb = NULL;
229         struct obd_ioobj *ioo;
230         struct ost_body *body;
231         struct l_wait_info lwi;
232         void *desc_priv = NULL;
233         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
234         ENTRY;
235
236         body = lustre_msg_buf(req->rq_reqmsg, 0);
237         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
238         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
239         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
240         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
241         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
242         cmd = OBD_BRW_READ;
243
244         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
245                 GOTO(out, rc = 0);
246
247         for (i = 0; i < objcount; i++) {
248                 ost_unpack_ioo(&tmp1, &ioo);
249                 if (tmp2 + ioo->ioo_bufcnt > end2) {
250                         LBUG();
251                         GOTO(out, rc = -EFAULT);
252                 }
253                 for (j = 0; j < ioo->ioo_bufcnt; j++)
254                         ost_unpack_niobuf(&tmp2, &remote_nb);
255         }
256
257         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
258         if (local_nb == NULL)
259                 GOTO(out, rc = -ENOMEM);
260
261         /* The unpackers move tmp1 and tmp2, so reset them before using */
262         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
263         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
264         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
265                                     remote_nb, local_nb, &desc_priv);
266
267         if (req->rq_status)
268                 GOTO(out, rc = 0);
269
270         desc = ptlrpc_prep_bulk(req->rq_connection);
271         if (desc == NULL)
272                 GOTO(out_local, rc = -ENOMEM);
273         desc->bd_portal = OST_BULK_PORTAL;
274
275         for (i = 0; i < niocount; i++) {
276                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
277
278                 if (bulk == NULL)
279                         GOTO(out_bulk, rc = -ENOMEM);
280                 bulk->bp_xid = remote_nb[i].xid;
281                 bulk->bp_buf = local_nb[i].addr;
282                 bulk->bp_buflen = remote_nb[i].len;
283         }
284
285         rc = ptlrpc_send_bulk(desc);
286         if (rc)
287                 GOTO(out_bulk, rc);
288
289         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
290         rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
291         if (rc) {
292                 LASSERT(rc == -ETIMEDOUT);
293                 GOTO(out_bulk, rc);
294         }
295
296         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
297                                       local_nb, desc_priv);
298
299         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
300
301 out_bulk:
302         ptlrpc_free_bulk(desc);
303 out_local:
304         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
305 out:
306         if (rc)
307                 ptlrpc_error(req->rq_svc, req);
308         else
309                 ptlrpc_reply(req->rq_svc, req);
310         RETURN(rc);
311 }
312
313 static int ost_brw_write(struct ptlrpc_request *req)
314 {
315         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
316         struct ptlrpc_bulk_desc *desc;
317         struct niobuf_remote *remote_nb;
318         struct niobuf_local *local_nb, *lnb;
319         struct obd_ioobj *ioo;
320         struct ost_body *body;
321         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
322         void *tmp1, *tmp2, *end2;
323         void *desc_priv = NULL;
324         int reply_sent = 0;
325         struct ptlrpc_service *srv;
326         struct l_wait_info lwi;
327         __u32 xid;
328         ENTRY;
329
330         body = lustre_msg_buf(req->rq_reqmsg, 0);
331         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
332         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
333         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
334         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
335         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
336         cmd = OBD_BRW_WRITE;
337
338         for (i = 0; i < objcount; i++) {
339                 ost_unpack_ioo((void *)&tmp1, &ioo);
340                 if (tmp2 + ioo->ioo_bufcnt > end2) {
341                         rc = -EFAULT;
342                         break;
343                 }
344                 for (j = 0; j < ioo->ioo_bufcnt; j++)
345                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
346         }
347
348         size[1] = niocount * sizeof(*remote_nb);
349         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
350         if (rc)
351                 GOTO(out, rc);
352         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
353
354         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
355         if (local_nb == NULL)
356                 GOTO(out, rc = -ENOMEM);
357
358         /* The unpackers move tmp1 and tmp2, so reset them before using */
359         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
360         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
361         req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
362                                     local_nb, &desc_priv);
363         if (req->rq_status)
364                 GOTO(out_free, rc = 0); /* XXX is this correct? */
365
366         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
367                 GOTO(fail_preprw, rc = 0);
368
369         desc = ptlrpc_prep_bulk(req->rq_connection);
370         if (desc == NULL)
371                 GOTO(fail_preprw, rc = -ENOMEM);
372         desc->bd_ptl_ev_hdlr = NULL;
373         desc->bd_portal = OSC_BULK_PORTAL;
374         desc->bd_desc_private = desc_priv;
375         memcpy(&(desc->bd_conn), &conn, sizeof(conn));
376
377         srv = req->rq_obd->u.ost.ost_service;
378         spin_lock(&srv->srv_lock);
379         xid = srv->srv_xid++;                   /* single xid for all pages */
380         spin_unlock(&srv->srv_lock);
381
382         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
383                 struct ptlrpc_bulk_page *bulk;
384
385                 bulk = ptlrpc_prep_bulk_page(desc);
386                 if (bulk == NULL)
387                         GOTO(fail_bulk, rc = -ENOMEM);
388
389                 bulk->bp_xid = xid;              /* single xid for all pages */
390
391                 bulk->bp_buf = lnb->addr;
392                 bulk->bp_page = lnb->page;
393                 bulk->bp_flags = lnb->flags;
394                 bulk->bp_dentry = lnb->dentry;
395                 bulk->bp_buflen = lnb->len;
396                 bulk->bp_cb = NULL;
397
398                 /* this advances remote_nb */
399                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
400                                 bulk->bp_xid);
401         }
402
403         rc = ptlrpc_register_bulk(desc);
404         if (rc)
405                 GOTO(fail_bulk, rc);
406
407         reply_sent = 1;
408         ptlrpc_reply(req->rq_svc, req);
409
410         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
411         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
412                           &lwi);
413         if (rc) {
414                 if (rc != -ETIMEDOUT)
415                         LBUG();
416                 GOTO(fail_bulk, rc);
417         }
418
419         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
420                           desc->bd_desc_private);
421         ptlrpc_free_bulk(desc);
422         EXIT;
423 out_free:
424         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
425 out:
426         if (!reply_sent) {
427                 if (rc) {
428                         OBD_FREE(req->rq_repmsg, req->rq_replen);
429                         req->rq_repmsg = NULL;
430                         ptlrpc_error(req->rq_svc, req);
431                 } else
432                         ptlrpc_reply(req->rq_svc, req);
433         }
434         return rc;
435
436 fail_bulk:
437         ptlrpc_free_bulk(desc);
438 fail_preprw:
439         /* FIXME: how do we undo the preprw? */
440         goto out_free;
441 }
442
443 static int ost_handle(struct ptlrpc_request *req)
444 {
445         int rc;
446         ENTRY;
447
448         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
449         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
450                 CERROR("lustre_ost: Invalid request\n");
451                 GOTO(out, rc);
452         }
453
454         if (req->rq_reqmsg->opc != OST_CONNECT &&
455             req->rq_export == NULL) {
456                 CERROR("lustre_ost: operation %d on unconnected OST\n",
457                        req->rq_reqmsg->opc);
458                 GOTO(out, rc = -ENOTCONN);
459         }
460
461         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
462                 GOTO(out, rc = -EINVAL);
463
464         switch (req->rq_reqmsg->opc) {
465         case OST_CONNECT:
466                 CDEBUG(D_INODE, "connect\n");
467                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
468                 rc = target_handle_connect(req);
469                 break;
470         case OST_DISCONNECT:
471                 CDEBUG(D_INODE, "disconnect\n");
472                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
473                 rc = target_handle_disconnect(req);
474                 break;
475         case OST_CREATE:
476                 CDEBUG(D_INODE, "create\n");
477                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
478                 rc = ost_create(req);
479                 break;
480         case OST_DESTROY:
481                 CDEBUG(D_INODE, "destroy\n");
482                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
483                 rc = ost_destroy(req);
484                 break;
485         case OST_GETATTR:
486                 CDEBUG(D_INODE, "getattr\n");
487                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
488                 rc = ost_getattr(req);
489                 break;
490         case OST_SETATTR:
491                 CDEBUG(D_INODE, "setattr\n");
492                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
493                 rc = ost_setattr(req);
494                 break;
495         case OST_OPEN:
496                 CDEBUG(D_INODE, "open\n");
497                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
498                 rc = ost_open(req);
499                 break;
500         case OST_CLOSE:
501                 CDEBUG(D_INODE, "close\n");
502                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
503                 rc = ost_close(req);
504                 break;
505         case OST_WRITE:
506                 CDEBUG(D_INODE, "write\n");
507                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
508                 rc = ost_brw_write(req);
509                 /* ost_brw sends its own replies */
510                 RETURN(rc);
511         case OST_READ:
512                 CDEBUG(D_INODE, "read\n");
513                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
514                 rc = ost_brw_read(req);
515                 /* ost_brw sends its own replies */
516                 RETURN(rc);
517         case OST_PUNCH:
518                 CDEBUG(D_INODE, "punch\n");
519                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
520                 rc = ost_punch(req);
521                 break;
522         case OST_STATFS:
523                 CDEBUG(D_INODE, "statfs\n");
524                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
525                 rc = ost_statfs(req);
526                 break;
527         case LDLM_ENQUEUE:
528                 CDEBUG(D_INODE, "enqueue\n");
529                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
530                 rc = ldlm_handle_enqueue(req);
531                 break;
532         case LDLM_CONVERT:
533                 CDEBUG(D_INODE, "convert\n");
534                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
535                 rc = ldlm_handle_convert(req);
536                 break;
537         case LDLM_CANCEL:
538                 CDEBUG(D_INODE, "cancel\n");
539                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
540                 rc = ldlm_handle_cancel(req);
541                 break;
542         case LDLM_BL_CALLBACK:
543         case LDLM_CP_CALLBACK:
544                 CDEBUG(D_INODE, "callback\n");
545                 CERROR("callbacks should not happen on OST\n");
546                 LBUG();
547                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
548                 break;
549         default:
550                 req->rq_status = -ENOTSUPP;
551                 rc = ptlrpc_error(req->rq_svc, req);
552                 RETURN(rc);
553         }
554
555         EXIT;
556 out:
557         //req->rq_status = rc;
558         if (rc) {
559                 CERROR("ost: processing error (opcode=%d): %d\n",
560                        req->rq_reqmsg->opc, rc);
561                 ptlrpc_error(req->rq_svc, req);
562         } else {
563                 CDEBUG(D_INODE, "sending reply\n");
564                 if (req->rq_repmsg == NULL)
565                         CERROR("handler for opcode %d returned rc=0 without "
566                                "creating rq_repmsg; needs to return rc != "
567                                "0!\n", req->rq_reqmsg->opc);
568                 ptlrpc_reply(req->rq_svc, req);
569         }
570
571         return 0;
572 }
573
574 /* mount the file system (secretly) */
575 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
576 {
577         struct obd_ioctl_data* data = buf;
578         struct ost_obd *ost = &obddev->u.ost;
579         struct obd_device *tgt;
580         int err;
581         int i;
582         ENTRY;
583
584         if (data->ioc_inllen1 < 1) {
585                 CERROR("requires a TARGET OBD UUID\n");
586                 RETURN(-EINVAL);
587         }
588         if (data->ioc_inllen1 > 37) {
589                 CERROR("OBD UUID must be less than 38 characters\n");
590                 RETURN(-EINVAL);
591         }
592
593         MOD_INC_USE_COUNT;
594         tgt = class_uuid2obd(data->ioc_inlbuf1);
595         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
596             !(tgt->obd_flags & OBD_SET_UP)) {
597                 CERROR("device not attached or not set up (%d)\n",
598                        data->ioc_dev);
599                 GOTO(error_dec, err = -EINVAL);
600         }
601
602         err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
603         if (err) {
604                 CERROR("fail to connect to device %d\n", data->ioc_dev);
605                 GOTO(error_dec, err = -EINVAL);
606         }
607
608         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
609                                            OST_BUFSIZE, OST_MAXREQSIZE,
610                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
611                                            "self", ost_handle, "ost");
612         if (!ost->ost_service) {
613                 CERROR("failed to start service\n");
614                 GOTO(error_disc, err = -EINVAL);
615         }
616
617         for (i = 0; i < OST_NUM_THREADS; i++) {
618                 char name[32];
619                 sprintf(name, "ll_ost_%02d", i);
620                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
621                 if (err) {
622                         CERROR("error starting thread #%d: rc %d\n", i, err);
623                         GOTO(error_disc, err = -EINVAL);
624                 }
625         }
626
627         RETURN(0);
628
629 error_disc:
630         obd_disconnect(&ost->ost_conn);
631 error_dec:
632         MOD_DEC_USE_COUNT;
633         RETURN(err);
634 }
635
636 static int ost_cleanup(struct obd_device * obddev)
637 {
638         struct ost_obd *ost = &obddev->u.ost;
639         int err;
640
641         ENTRY;
642
643         if ( !list_empty(&obddev->obd_exports) ) {
644                 CERROR("still has clients!\n");
645                 RETURN(-EBUSY);
646         }
647
648         ptlrpc_stop_all_threads(ost->ost_service);
649         ptlrpc_unregister_service(ost->ost_service);
650
651         err = obd_disconnect(&ost->ost_conn);
652         if (err) {
653                 CERROR("lustre ost: fail to disconnect device\n");
654                 RETURN(-EINVAL);
655         }
656
657         MOD_DEC_USE_COUNT;
658         RETURN(0);
659 }
660 int ost_attach(struct obd_device *dev, obd_count len, void *data)
661 {
662         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
663 }
664
/*
 * o_detach method: remove the device's lprocfs registration.
 * Returns whatever lprocfs_dereg_obd() does.
 */
int ost_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
670
671
672
673 /* use obd ops to offer management infrastructure */
674 static struct obd_ops ost_obd_ops = {
675         o_attach:      ost_attach,
676         o_detach:      ost_detach,
677         o_setup:       ost_setup,
678         o_cleanup:     ost_cleanup,
679 };
680
681 static int __init ost_init(void)
682 {
683         int rc;
684
685         rc = class_register_type(&ost_obd_ops, status_class_var, 
686                                  LUSTRE_OST_NAME);
687         RETURN(rc);
688
689 }
690
691 static void __exit ost_exit(void)
692 {
693         
694         class_unregister_type(LUSTRE_OST_NAME);
695 }
696
697 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
698 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
699 MODULE_LICENSE("GPL");
700
701 module_init(ost_init);
702 module_exit(ost_exit);