Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44 extern struct lprocfs_vars status_var_nm_1[];
45 extern struct lprocfs_vars status_class_var[];
46
47 static int ost_destroy(struct ptlrpc_request *req)
48 {
49         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
50         struct ost_body *body;
51         int rc, size = sizeof(*body);
52         ENTRY;
53
54         body = lustre_msg_buf(req->rq_reqmsg, 0);
55
56         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
57         if (rc)
58                 RETURN(rc);
59
60         req->rq_status = obd_destroy(conn, &body->oa, NULL);
61         RETURN(0);
62 }
63
64 static int ost_getattr(struct ptlrpc_request *req)
65 {
66         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
67         struct ost_body *body, *repbody;
68         int rc, size = sizeof(*body);
69         ENTRY;
70
71         body = lustre_msg_buf(req->rq_reqmsg, 0);
72
73         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
74         if (rc)
75                 RETURN(rc);
76
77         repbody = lustre_msg_buf(req->rq_repmsg, 0);
78         /* FIXME: unpack only valid fields instead of memcpy, endianness */
79         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
80         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
81         RETURN(0);
82 }
83
84 static int ost_statfs(struct ptlrpc_request *req)
85 {
86         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
87         struct obd_statfs *osfs;
88         int rc, size = sizeof(*osfs);
89         ENTRY;
90
91         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
92         if (rc)
93                 RETURN(rc);
94
95         osfs = lustre_msg_buf(req->rq_repmsg, 0);
96         memset(osfs, 0, size);
97
98         rc = obd_statfs(conn, osfs);
99         if (rc) {
100                 CERROR("ost: statfs failed: rc %d\n", rc);
101                 req->rq_status = rc;
102                 RETURN(rc);
103         }
104         obd_statfs_pack(osfs, osfs);
105
106         RETURN(0);
107 }
108
109 static int ost_open(struct ptlrpc_request *req)
110 {
111         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
112         struct ost_body *body, *repbody;
113         int rc, size = sizeof(*body);
114         ENTRY;
115
116         body = lustre_msg_buf(req->rq_reqmsg, 0);
117
118         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
119         if (rc)
120                 RETURN(rc);
121
122         repbody = lustre_msg_buf(req->rq_repmsg, 0);
123         /* FIXME: unpack only valid fields instead of memcpy, endianness */
124         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
125         req->rq_status = obd_open(conn, &repbody->oa, NULL);
126         RETURN(0);
127 }
128
129 static int ost_close(struct ptlrpc_request *req)
130 {
131         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
132         struct ost_body *body, *repbody;
133         int rc, size = sizeof(*body);
134         ENTRY;
135
136         body = lustre_msg_buf(req->rq_reqmsg, 0);
137
138         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
139         if (rc)
140                 RETURN(rc);
141
142         repbody = lustre_msg_buf(req->rq_repmsg, 0);
143         /* FIXME: unpack only valid fields instead of memcpy, endianness */
144         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
145         req->rq_status = obd_close(conn, &repbody->oa, NULL);
146         RETURN(0);
147 }
148
149 static int ost_create(struct ptlrpc_request *req)
150 {
151         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
152         struct ost_body *body, *repbody;
153         int rc, size = sizeof(*body);
154         ENTRY;
155
156         body = lustre_msg_buf(req->rq_reqmsg, 0);
157
158         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
159         if (rc)
160                 RETURN(rc);
161
162         repbody = lustre_msg_buf(req->rq_repmsg, 0);
163         /* FIXME: unpack only valid fields instead of memcpy, endianness */
164         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
165         req->rq_status = obd_create(conn, &repbody->oa, NULL);
166         RETURN(0);
167 }
168
169 static int ost_punch(struct ptlrpc_request *req)
170 {
171         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
172         struct ost_body *body, *repbody;
173         int rc, size = sizeof(*body);
174         ENTRY;
175
176         body = lustre_msg_buf(req->rq_reqmsg, 0);
177
178         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
179             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
180                 RETURN(-EINVAL);
181
182         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
183         if (rc)
184                 RETURN(rc);
185
186         repbody = lustre_msg_buf(req->rq_repmsg, 0);
187         /* FIXME: unpack only valid fields instead of memcpy, endianness */
188         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
189         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
190                                    repbody->oa.o_size, repbody->oa.o_blocks);
191         RETURN(0);
192 }
193
194 static int ost_setattr(struct ptlrpc_request *req)
195 {
196         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197         struct ost_body *body, *repbody;
198         int rc, size = sizeof(*body);
199         ENTRY;
200
201         body = lustre_msg_buf(req->rq_reqmsg, 0);
202
203         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
204         if (rc)
205                 RETURN(rc);
206
207         repbody = lustre_msg_buf(req->rq_repmsg, 0);
208         /* FIXME: unpack only valid fields instead of memcpy, endianness */
209         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
210         req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
211         RETURN(0);
212 }
213
214 static int ost_bulk_timeout(void *data)
215 {
216         ENTRY;
217         /* We don't fail the connection here, because having the export
218          * killed makes the (vital) call to commitrw very sad.
219          */
220         RETURN(1);
221 }
222
223 static int ost_brw_read(struct ptlrpc_request *req)
224 {
225         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
226         struct ptlrpc_bulk_desc *desc;
227         struct obd_ioobj *tmp1;
228         void *tmp2, *end2;
229         struct niobuf_remote *remote_nb;
230         struct niobuf_local *local_nb = NULL;
231         struct obd_ioobj *ioo;
232         struct ost_body *body;
233         struct l_wait_info lwi;
234         void *desc_priv = NULL;
235         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
236         ENTRY;
237
238         body = lustre_msg_buf(req->rq_reqmsg, 0);
239         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
240         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
241         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
242         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
243         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
244         cmd = OBD_BRW_READ;
245
246         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
247                 GOTO(out, rc = 0);
248
249         for (i = 0; i < objcount; i++) {
250                 ost_unpack_ioo(&tmp1, &ioo);
251                 if (tmp2 + ioo->ioo_bufcnt > end2) {
252                         LBUG();
253                         GOTO(out, rc = -EFAULT);
254                 }
255                 for (j = 0; j < ioo->ioo_bufcnt; j++)
256                         ost_unpack_niobuf(&tmp2, &remote_nb);
257         }
258
259         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
260         if (local_nb == NULL)
261                 GOTO(out, rc = -ENOMEM);
262
263         /* The unpackers move tmp1 and tmp2, so reset them before using */
264         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
265         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
266         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
267                                     remote_nb, local_nb, &desc_priv);
268
269         if (req->rq_status)
270                 GOTO(out, rc = 0);
271
272         desc = ptlrpc_prep_bulk(req->rq_connection);
273         if (desc == NULL)
274                 GOTO(out_local, rc = -ENOMEM);
275         desc->bd_ptl_ev_hdlr = NULL;
276         desc->bd_portal = OST_BULK_PORTAL;
277
278         for (i = 0; i < niocount; i++) {
279                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
280
281                 if (bulk == NULL)
282                         GOTO(out_bulk, rc = -ENOMEM);
283                 bulk->bp_xid = remote_nb[i].xid;
284                 bulk->bp_buf = local_nb[i].addr;
285                 bulk->bp_buflen = remote_nb[i].len;
286         }
287
288         rc = ptlrpc_send_bulk(desc);
289         if (rc)
290                 GOTO(out_bulk, rc);
291
292         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
293         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
294                           &lwi);
295         if (rc) {
296                 LASSERT(rc == -ETIMEDOUT);
297                 GOTO(out_bulk, rc);
298         }
299
300         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
301                                       local_nb, desc_priv);
302
303         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
304
305 out_bulk:
306         ptlrpc_bulk_decref(desc);
307 out_local:
308         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
309 out:
310         if (rc)
311                 ptlrpc_error(req->rq_svc, req);
312         else
313                 ptlrpc_reply(req->rq_svc, req);
314         RETURN(rc);
315 }
316
317 static int ost_brw_write(struct ptlrpc_request *req)
318 {
319         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
320         struct ptlrpc_bulk_desc *desc;
321         struct obd_ioobj *tmp1;
322         void *tmp2, *end2;
323         struct niobuf_remote *remote_nb;
324         struct niobuf_local *local_nb = NULL;
325         struct niobuf_local *lnb;
326         struct obd_ioobj *ioo;
327         struct ost_body *body;
328         struct l_wait_info lwi;
329         int rc, cmd, i, j, objcount, niocount;
330         int size[2] = {sizeof(*body)};
331         void *desc_priv = NULL;
332         int reply_sent = 0;
333         struct ptlrpc_service *srv;
334         __u32 xid;
335         ENTRY;
336
337         body = lustre_msg_buf(req->rq_reqmsg, 0);
338         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
339         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
340         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
341         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
342         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
343         cmd = OBD_BRW_WRITE;
344
345         for (i = 0; i < objcount; i++) {
346                 ost_unpack_ioo((void *)&tmp1, &ioo);
347                 if (tmp2 + ioo->ioo_bufcnt > end2) {
348                         rc = -EFAULT;
349                         break;
350                 }
351                 for (j = 0; j < ioo->ioo_bufcnt; j++)
352                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
353         }
354
355         size[1] = niocount * sizeof(*remote_nb);
356         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
357         if (rc)
358                 GOTO(out, rc);
359         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
360
361         OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
362         if (local_nb == NULL)
363                 GOTO(out, rc = -ENOMEM);
364
365         /* The unpackers move tmp1 and tmp2, so reset them before using */
366         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
367         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
368         req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
369                                     local_nb, &desc_priv);
370         if (req->rq_status)
371                 GOTO(out_free, rc = 0); /* XXX is this correct? */
372
373         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
374                 GOTO(fail_preprw, rc = 0);
375
376         desc = ptlrpc_prep_bulk(req->rq_connection);
377         if (desc == NULL)
378                 GOTO(fail_preprw, rc = -ENOMEM);
379         desc->bd_ptl_ev_hdlr = NULL;
380         desc->bd_portal = OSC_BULK_PORTAL;
381         desc->bd_desc_private = desc_priv;
382         memcpy(&(desc->bd_conn), &conn, sizeof(conn));
383
384         srv = req->rq_obd->u.ost.ost_service;
385         spin_lock(&srv->srv_lock);
386         xid = srv->srv_xid++;                   /* single xid for all pages */
387         spin_unlock(&srv->srv_lock);
388
389         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
390                 struct ptlrpc_bulk_page *bulk;
391
392                 bulk = ptlrpc_prep_bulk_page(desc);
393                 if (bulk == NULL)
394                         GOTO(fail_bulk, rc = -ENOMEM);
395
396                 bulk->bp_xid = xid;              /* single xid for all pages */
397
398                 bulk->bp_buf = lnb->addr;
399                 bulk->bp_page = lnb->page;
400                 bulk->bp_flags = lnb->flags;
401                 bulk->bp_dentry = lnb->dentry;
402                 bulk->bp_buflen = lnb->len;
403                 bulk->bp_cb = NULL;
404
405                 /* this advances remote_nb */
406                 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
407                                 bulk->bp_xid);
408         }
409
410         rc = ptlrpc_register_bulk(desc);
411         if (rc)
412                 GOTO(fail_bulk, rc);
413
414         reply_sent = 1;
415         ptlrpc_reply(req->rq_svc, req);
416
417         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
418         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
419                           &lwi);
420         if (rc) {
421                 if (rc != -ETIMEDOUT)
422                         LBUG();
423                 ptlrpc_abort_bulk(desc);
424                 recovd_conn_fail(desc->bd_connection);
425                 obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
426                              desc->bd_desc_private);
427         } else {
428                 rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
429                                   desc->bd_desc_private);
430         }
431
432         ptlrpc_bulk_decref(desc);
433         EXIT;
434 out_free:
435         OBD_FREE(local_nb, niocount * sizeof(*local_nb));
436 out:
437         if (!reply_sent) {
438                 if (rc) {
439                         OBD_FREE(req->rq_repmsg, req->rq_replen);
440                         req->rq_repmsg = NULL;
441                         ptlrpc_error(req->rq_svc, req);
442                 } else
443                         ptlrpc_reply(req->rq_svc, req);
444         }
445         return rc;
446
447 fail_bulk:
448         ptlrpc_free_bulk(desc);
449 fail_preprw:
450         /* FIXME: how do we undo the preprw? - answer = call commitrw */
451         goto out_free;
452 }
453
454 static int ost_handle(struct ptlrpc_request *req)
455 {
456         int rc;
457         ENTRY;
458
459         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
460         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
461                 CERROR("lustre_ost: Invalid request\n");
462                 GOTO(out, rc);
463         }
464
465         if (req->rq_reqmsg->opc != OST_CONNECT &&
466             req->rq_export == NULL) {
467                 CERROR("lustre_ost: operation %d on unconnected OST\n",
468                        req->rq_reqmsg->opc);
469                 req->rq_status = -ENOTCONN;
470                 GOTO(out, rc = -ENOTCONN);
471         }
472
473         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
474                 GOTO(out, rc = -EINVAL);
475
476         switch (req->rq_reqmsg->opc) {
477         case OST_CONNECT:
478                 CDEBUG(D_INODE, "connect\n");
479                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
480                 rc = target_handle_connect(req);
481                 break;
482         case OST_DISCONNECT:
483                 CDEBUG(D_INODE, "disconnect\n");
484                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
485                 rc = target_handle_disconnect(req);
486                 break;
487         case OST_CREATE:
488                 CDEBUG(D_INODE, "create\n");
489                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
490                 rc = ost_create(req);
491                 break;
492         case OST_DESTROY:
493                 CDEBUG(D_INODE, "destroy\n");
494                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
495                 rc = ost_destroy(req);
496                 break;
497         case OST_GETATTR:
498                 CDEBUG(D_INODE, "getattr\n");
499                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
500                 rc = ost_getattr(req);
501                 break;
502         case OST_SETATTR:
503                 CDEBUG(D_INODE, "setattr\n");
504                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
505                 rc = ost_setattr(req);
506                 break;
507         case OST_OPEN:
508                 CDEBUG(D_INODE, "open\n");
509                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
510                 rc = ost_open(req);
511                 break;
512         case OST_CLOSE:
513                 CDEBUG(D_INODE, "close\n");
514                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
515                 rc = ost_close(req);
516                 break;
517         case OST_WRITE:
518                 CDEBUG(D_INODE, "write\n");
519                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
520                 rc = ost_brw_write(req);
521                 /* ost_brw sends its own replies */
522                 RETURN(rc);
523         case OST_READ:
524                 CDEBUG(D_INODE, "read\n");
525                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
526                 rc = ost_brw_read(req);
527                 /* ost_brw sends its own replies */
528                 RETURN(rc);
529         case OST_PUNCH:
530                 CDEBUG(D_INODE, "punch\n");
531                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
532                 rc = ost_punch(req);
533                 break;
534         case OST_STATFS:
535                 CDEBUG(D_INODE, "statfs\n");
536                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
537                 rc = ost_statfs(req);
538                 break;
539         case LDLM_ENQUEUE:
540                 CDEBUG(D_INODE, "enqueue\n");
541                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
542                 rc = ldlm_handle_enqueue(req);
543                 break;
544         case LDLM_CONVERT:
545                 CDEBUG(D_INODE, "convert\n");
546                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
547                 rc = ldlm_handle_convert(req);
548                 break;
549         case LDLM_CANCEL:
550                 CDEBUG(D_INODE, "cancel\n");
551                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
552                 rc = ldlm_handle_cancel(req);
553                 break;
554         case LDLM_BL_CALLBACK:
555         case LDLM_CP_CALLBACK:
556                 CDEBUG(D_INODE, "callback\n");
557                 CERROR("callbacks should not happen on OST\n");
558                 LBUG();
559                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
560                 break;
561         default:
562                 req->rq_status = -ENOTSUPP;
563                 rc = ptlrpc_error(req->rq_svc, req);
564                 RETURN(rc);
565         }
566
567         EXIT;
568 out:
569         //req->rq_status = rc;
570         if (rc) {
571                 CERROR("ost: processing error (opcode=%d): %d\n",
572                        req->rq_reqmsg->opc, rc);
573                 ptlrpc_error(req->rq_svc, req);
574         } else {
575                 CDEBUG(D_INODE, "sending reply\n");
576                 if (req->rq_repmsg == NULL)
577                         CERROR("handler for opcode %d returned rc=0 without "
578                                "creating rq_repmsg; needs to return rc != "
579                                "0!\n", req->rq_reqmsg->opc);
580                 ptlrpc_reply(req->rq_svc, req);
581         }
582
583         return 0;
584 }
585
586 /* mount the file system (secretly) */
587 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
588 {
589         struct obd_ioctl_data* data = buf;
590         struct ost_obd *ost = &obddev->u.ost;
591         struct obd_device *tgt;
592         int err;
593         int i;
594         ENTRY;
595
596         if (data->ioc_inllen1 < 1) {
597                 CERROR("requires a TARGET OBD UUID\n");
598                 RETURN(-EINVAL);
599         }
600         if (data->ioc_inllen1 > 37) {
601                 CERROR("OBD UUID must be less than 38 characters\n");
602                 RETURN(-EINVAL);
603         }
604
605         tgt = class_uuid2obd(data->ioc_inlbuf1);
606         if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
607             !(tgt->obd_flags & OBD_SET_UP)) {
608                 CERROR("device not attached or not set up (%d)\n",
609                        data->ioc_dev);
610                 RETURN(err = -EINVAL);
611         }
612
613         err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
614         if (err) {
615                 CERROR("fail to connect to device %d\n", data->ioc_dev);
616                 RETURN(err);
617         }
618
619         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
620                                            OST_BUFSIZE, OST_MAXREQSIZE,
621                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
622                                            "self", ost_handle, "ost");
623         if (!ost->ost_service) {
624                 CERROR("failed to start service\n");
625                 GOTO(error_disc, err = -ENOMEM);
626         }
627
628         for (i = 0; i < OST_NUM_THREADS; i++) {
629                 char name[32];
630                 sprintf(name, "ll_ost_%02d", i);
631                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
632                 if (err) {
633                         CERROR("error starting thread #%d: rc %d\n", i, err);
634                         GOTO(error_disc, err = -EINVAL);
635                 }
636         }
637
638         RETURN(0);
639
640 error_disc:
641         obd_disconnect(&ost->ost_conn);
642         RETURN(err);
643 }
644
645 static int ost_cleanup(struct obd_device * obddev)
646 {
647         struct ost_obd *ost = &obddev->u.ost;
648         int err;
649
650         ENTRY;
651
652         if ( !list_empty(&obddev->obd_exports) ) {
653                 CERROR("still has clients!\n");
654                 RETURN(-EBUSY);
655         }
656
657         ptlrpc_stop_all_threads(ost->ost_service);
658         ptlrpc_unregister_service(ost->ost_service);
659
660         err = obd_disconnect(&ost->ost_conn);
661         if (err)
662                 CERROR("lustre ost: fail to disconnect device\n");
663
664         RETURN(err);
665 }
666
667 int ost_attach(struct obd_device *dev, obd_count len, void *data)
668 {
669         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
670 }
671
672 int ost_detach(struct obd_device *dev)
673 {
674         return lprocfs_dereg_obd(dev);
675 }
676
677 /* This is so similar to mds_connect that it makes my heart weep: we should
678  * shuffle the UUID into obd_export proper and make this all happen in
679  * target_handle_connect.
680  */
681 static int ost_connect(struct lustre_handle *conn,
682                        struct obd_device *obd, obd_uuid_t cluuid,
683                        struct recovd_obd *recovd,
684                        ptlrpc_recovery_cb_t recover)
685 {
686         struct obd_export *exp;
687         struct ost_export_data *oed;
688         struct list_head *p;
689         int rc;
690         ENTRY;
691
692         if (!conn || !obd || !cluuid)
693                 RETURN(-EINVAL);
694
695         /* lctl gets a backstage, all-access pass. */
696         if (!strcmp(cluuid, "OBD_CLASS_UUID"))
697                 goto dont_check_exports;
698
699         spin_lock(&obd->obd_dev_lock);
700         list_for_each(p, &obd->obd_exports) {
701                 exp = list_entry(p, struct obd_export, exp_obd_chain);
702                 oed = &exp->exp_ost_data;
703                 if (!memcmp(cluuid, oed->oed_uuid, sizeof oed->oed_uuid)) {
704                         spin_unlock(&obd->obd_dev_lock);
705                         LASSERT(exp->exp_obd == obd);
706
707                         RETURN(target_handle_reconnect(conn, exp, cluuid));
708                 }
709         }
710
711  dont_check_exports:
712         rc = class_connect(conn, obd, cluuid);
713         if (rc)
714                 RETURN(rc);
715         exp = class_conn2export(conn);
716         LASSERT(exp);
717
718         oed = &exp->exp_ost_data;
719         memcpy(oed->oed_uuid, cluuid, sizeof oed->oed_uuid);
720
721         RETURN(0);
722 }
723
724
725 /* use obd ops to offer management infrastructure */
726 static struct obd_ops ost_obd_ops = {
727         o_owner:        THIS_MODULE,
728         o_attach:       ost_attach,
729         o_detach:       ost_detach,
730         o_setup:        ost_setup,
731         o_cleanup:      ost_cleanup,
732         o_connect:      ost_connect,
733 };
734
735 static int __init ost_init(void)
736 {
737         int rc;
738
739         rc = class_register_type(&ost_obd_ops, status_class_var,
740                                  LUSTRE_OST_NAME);
741         RETURN(rc);
742
743 }
744
745 static void __exit ost_exit(void)
746 {
747         class_unregister_type(LUSTRE_OST_NAME);
748 }
749
750 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
751 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
752 MODULE_LICENSE("GPL");
753
754 module_init(ost_init);
755 module_exit(ost_exit);