Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44
45 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
46 {
47         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
48         struct ost_body *body;
49         int rc, size = sizeof(*body);
50         ENTRY;
51
52         body = lustre_msg_buf(req->rq_reqmsg, 0);
53
54         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
55         if (rc)
56                 RETURN(rc);
57
58         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
59         RETURN(0);
60 }
61
62 static int ost_getattr(struct ptlrpc_request *req)
63 {
64         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
65         struct ost_body *body, *repbody;
66         int rc, size = sizeof(*body);
67         ENTRY;
68
69         body = lustre_msg_buf(req->rq_reqmsg, 0);
70
71         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
72         if (rc)
73                 RETURN(rc);
74
75         repbody = lustre_msg_buf(req->rq_repmsg, 0);
76         /* FIXME: unpack only valid fields instead of memcpy, endianness */
77         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
78         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
79         RETURN(0);
80 }
81
82 static int ost_statfs(struct ptlrpc_request *req)
83 {
84         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
85         struct obd_statfs *osfs;
86         int rc, size = sizeof(*osfs);
87         ENTRY;
88
89         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
90         if (rc)
91                 RETURN(rc);
92
93         osfs = lustre_msg_buf(req->rq_repmsg, 0);
94         memset(osfs, 0, size);
95
96         rc = obd_statfs(conn, osfs);
97         if (rc) {
98                 CERROR("ost: statfs failed: rc %d\n", rc);
99                 req->rq_status = rc;
100                 RETURN(rc);
101         }
102         obd_statfs_pack(osfs, osfs);
103
104         RETURN(0);
105 }
106
107 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
108 {
109         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
110         struct ost_body *body, *repbody;
111         int rc, size = sizeof(*body);
112         ENTRY;
113
114         body = lustre_msg_buf(req->rq_reqmsg, 0);
115
116         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
117         if (rc)
118                 RETURN(rc);
119
120         repbody = lustre_msg_buf(req->rq_repmsg, 0);
121         /* FIXME: unpack only valid fields instead of memcpy, endianness */
122         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
123         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
124         RETURN(0);
125 }
126
127 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
128 {
129         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
130         struct ost_body *body, *repbody;
131         int rc, size = sizeof(*body);
132         ENTRY;
133
134         body = lustre_msg_buf(req->rq_reqmsg, 0);
135
136         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
137         if (rc)
138                 RETURN(rc);
139
140         repbody = lustre_msg_buf(req->rq_repmsg, 0);
141         /* FIXME: unpack only valid fields instead of memcpy, endianness */
142         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
143         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
144         RETURN(0);
145 }
146
147 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
148 {
149         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
150         struct ost_body *body, *repbody;
151         int rc, size = sizeof(*body);
152         ENTRY;
153
154         body = lustre_msg_buf(req->rq_reqmsg, 0);
155
156         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
157         if (rc)
158                 RETURN(rc);
159
160         repbody = lustre_msg_buf(req->rq_repmsg, 0);
161         /* FIXME: unpack only valid fields instead of memcpy, endianness */
162         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
163         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
164         RETURN(0);
165 }
166
167 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
168 {
169         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
170         struct ost_body *body, *repbody;
171         int rc, size = sizeof(*body);
172         ENTRY;
173
174         body = lustre_msg_buf(req->rq_reqmsg, 0);
175
176         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
177             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
178                 RETURN(-EINVAL);
179
180         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
181         if (rc)
182                 RETURN(rc);
183
184         repbody = lustre_msg_buf(req->rq_repmsg, 0);
185         /* FIXME: unpack only valid fields instead of memcpy, endianness */
186         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
187         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
188                                    repbody->oa.o_size, repbody->oa.o_blocks, oti);
189         RETURN(0);
190 }
191
192 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
193 {
194         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
195         struct ost_body *body, *repbody;
196         int rc, size = sizeof(*body);
197         ENTRY;
198
199         body = lustre_msg_buf(req->rq_reqmsg, 0);
200
201         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
202         if (rc)
203                 RETURN(rc);
204
205         repbody = lustre_msg_buf(req->rq_repmsg, 0);
206         /* FIXME: unpack only valid fields instead of memcpy, endianness */
207         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
208         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
209         RETURN(0);
210 }
211
212 static int ost_bulk_timeout(void *data)
213 {
214         ENTRY;
215         /* We don't fail the connection here, because having the export
216          * killed makes the (vital) call to commitrw very sad.
217          */
218         RETURN(1);
219 }
220
221 static int ost_brw_read(struct ptlrpc_request *req)
222 {
223         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
224         struct ptlrpc_bulk_desc *desc;
225         struct obd_ioobj *tmp1;
226         void *tmp2, *end2;
227         struct niobuf_remote *remote_nb;
228         struct niobuf_local *local_nb = NULL;
229         struct obd_ioobj *ioo;
230         struct ost_body *body;
231         struct l_wait_info lwi;
232         void *desc_priv = NULL;
233         int cmd, i, j, objcount, niocount, size = sizeof(*body);
234         int rc = 0;
235         ENTRY;
236
237         body = lustre_msg_buf(req->rq_reqmsg, 0);
238         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
239         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
240         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
241         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
242         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
243         cmd = OBD_BRW_READ;
244
245         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
246                 GOTO(out, req->rq_status = -EIO);
247
248         for (i = 0; i < objcount; i++) {
249                 ost_unpack_ioo(&tmp1, &ioo);
250                 if (tmp2 + ioo->ioo_bufcnt > end2) {
251                         LBUG();
252                         GOTO(out, rc = -EFAULT);
253                 }
254                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
255                         /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
256                         ost_unpack_niobuf(&tmp2, &remote_nb);
257                 }
258         }
259
260         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
261         if (local_nb == NULL)
262                 GOTO(out, rc = -ENOMEM);
263
264         /* The unpackers move tmp1 and tmp2, so reset them before using */
265         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
266         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
267         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
268                                     remote_nb, local_nb, &desc_priv, NULL);
269
270         if (req->rq_status)
271                 GOTO(out, req->rq_status);
272
273         desc = ptlrpc_prep_bulk(req->rq_connection);
274         if (desc == NULL)
275                 GOTO(out_local, rc = -ENOMEM);
276         desc->bd_ptl_ev_hdlr = NULL;
277         desc->bd_portal = OST_BULK_PORTAL;
278
279         for (i = 0; i < niocount; i++) {
280                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
281
282                 if (bulk == NULL)
283                         GOTO(out_bulk, rc = -ENOMEM);
284                 bulk->bp_xid = remote_nb[i].xid;
285                 bulk->bp_buf = local_nb[i].addr;
286                 bulk->bp_buflen = remote_nb[i].len;
287         }
288
289         rc = ptlrpc_bulk_put(desc);
290         if (rc)
291                 GOTO(out_bulk, rc);
292
293         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
294         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
295                           &lwi);
296         if (rc) {
297                 LASSERT(rc == -ETIMEDOUT);
298                 GOTO(out_bulk, rc);
299         }
300
301         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
302                                       local_nb, desc_priv, NULL);
303
304 out_bulk:
305         ptlrpc_bulk_decref(desc);
306 out_local:
307         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
308 out:
309         if (!rc)
310                 /* Hmm, we don't return anything in this reply buffer?
311                  * We should be returning per-page status codes and also
312                  * per-object size, blocks count, mtime, ctime.  (bug 593) */
313                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
314                                      &req->rq_repmsg);
315         if (rc)
316                 ptlrpc_error(req->rq_svc, req);
317         else
318                 ptlrpc_reply(req->rq_svc, req);
319         RETURN(rc);
320 }
321
322 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
323 {
324         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
325         struct ptlrpc_bulk_desc *desc;
326         struct obd_ioobj *tmp1;
327         void *tmp2, *end2;
328         struct niobuf_remote *remote_nb;
329         struct niobuf_local *local_nb = NULL;
330         struct obd_ioobj *ioo;
331         struct ost_body *body;
332         struct l_wait_info lwi;
333         void *desc_priv = NULL;
334         int cmd, i, j, objcount, niocount, size = sizeof(*body);
335         int rc = 0;
336         ENTRY;
337
338         body = lustre_msg_buf(req->rq_reqmsg, 0);
339         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
340         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
341         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
342         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
343         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
344         cmd = OBD_BRW_WRITE;
345
346         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
347                 GOTO(out, req->rq_status = -EIO);
348
349         for (i = 0; i < objcount; i++) {
350                 ost_unpack_ioo(&tmp1, &ioo);
351                 if (tmp2 + ioo->ioo_bufcnt > end2) {
352                         LBUG();
353                         GOTO(out, rc = -EFAULT);
354                 }
355                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
356                         /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
357                         ost_unpack_niobuf(&tmp2, &remote_nb);
358                 }
359         }
360
361         OBD_ALLOC(local_nb, sizeof(*local_nb)* niocount);
362         if (local_nb == NULL)
363                 GOTO(out, rc = -ENOMEM);
364
365         /* The unpackers move tmp1 and tmp2, so reset them before using */
366         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
367         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
368         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
369                                     remote_nb, local_nb, &desc_priv, oti);
370
371         if (req->rq_status)
372                 GOTO(out, rc = 0);
373
374         desc = ptlrpc_prep_bulk(req->rq_connection);
375         if (desc == NULL)
376                 GOTO(out_local, rc = -ENOMEM);
377         desc->bd_ptl_ev_hdlr = NULL;
378         desc->bd_portal = OSC_BULK_PORTAL;
379
380         for (i = 0; i < niocount; i++) {
381                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
382
383                 if (bulk == NULL)
384                         GOTO(out_bulk, rc = -ENOMEM);
385                 bulk->bp_xid = remote_nb[i].xid;
386                 bulk->bp_buf = local_nb[i].addr;
387                 bulk->bp_buflen = remote_nb[i].len;
388         }
389
390         rc = ptlrpc_bulk_get(desc);
391         if (rc)
392                 GOTO(out_bulk, rc);
393
394         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
395         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
396                           &lwi);
397         if (rc) {
398                 LASSERT(rc == -ETIMEDOUT);
399                 ptlrpc_abort_bulk(desc);
400                 recovd_conn_fail(desc->bd_connection);
401                 obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
402                              desc_priv, oti);
403                 GOTO(out_bulk, rc);
404         }
405
406         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
407                                       local_nb, desc_priv, oti);
408
409  out_bulk:
410         ptlrpc_bulk_decref(desc);
411  out_local:
412         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
413  out:
414         if (!rc)
415                 /* Hmm, we don't return anything in this reply buffer?
416                  * We should be returning per-page status codes and also
417                  * per-object size, blocks count, mtime, ctime.  (bug 593) */
418                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
419                                      &req->rq_repmsg);
420         if (rc)
421                 ptlrpc_error(req->rq_svc, req);
422         else
423                 rc = ptlrpc_reply(req->rq_svc, req);
424         RETURN(rc);
425 }
426
427 inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
428 {
429         if (oti && req->rq_repmsg)
430                 req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
431         EXIT;
432 }
433
434 static int ost_handle(struct ptlrpc_request *req)
435 {
436         struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
437         int rc;
438         ENTRY;
439
440         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
441         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
442                 CERROR("lustre_ost: Invalid request\n");
443                 GOTO(out, rc);
444         }
445
446         if (req->rq_reqmsg->opc != OST_CONNECT && req->rq_export == NULL) {
447                 CERROR("lustre_ost: operation %d on unconnected OST\n",
448                        req->rq_reqmsg->opc);
449                 req->rq_status = -ENOTCONN;
450                 GOTO(out, rc = -ENOTCONN);
451         }
452
453         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
454                 GOTO(out, rc = -EINVAL);
455
456         switch (req->rq_reqmsg->opc) {
457         case OST_CONNECT:
458                 CDEBUG(D_INODE, "connect\n");
459                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
460                 rc = target_handle_connect(req);
461                 break;
462         case OST_DISCONNECT:
463                 CDEBUG(D_INODE, "disconnect\n");
464                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
465                 rc = target_handle_disconnect(req);
466                 break;
467         case OST_CREATE:
468                 CDEBUG(D_INODE, "create\n");
469                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
470                 rc = ost_create(req, oti);
471                 break;
472         case OST_DESTROY:
473                 CDEBUG(D_INODE, "destroy\n");
474                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
475                 rc = ost_destroy(req, oti);
476                 break;
477         case OST_GETATTR:
478                 CDEBUG(D_INODE, "getattr\n");
479                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
480                 rc = ost_getattr(req);
481                 break;
482         case OST_SETATTR:
483                 CDEBUG(D_INODE, "setattr\n");
484                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
485                 rc = ost_setattr(req, oti);
486                 break;
487         case OST_OPEN:
488                 CDEBUG(D_INODE, "open\n");
489                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
490                 rc = ost_open(req, oti);
491                 break;
492         case OST_CLOSE:
493                 CDEBUG(D_INODE, "close\n");
494                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
495                 rc = ost_close(req, oti);
496                 break;
497         case OST_WRITE:
498                 CDEBUG(D_INODE, "write\n");
499                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
500                 rc = ost_brw_write(req, oti);
501                 /* ost_brw sends its own replies */
502                 RETURN(rc);
503         case OST_READ:
504                 CDEBUG(D_INODE, "read\n");
505                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
506                 rc = ost_brw_read(req);
507                 /* ost_brw sends its own replies */
508                 RETURN(rc);
509         case OST_PUNCH:
510                 CDEBUG(D_INODE, "punch\n");
511                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
512                 rc = ost_punch(req, oti);
513                 break;
514         case OST_STATFS:
515                 CDEBUG(D_INODE, "statfs\n");
516                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
517                 rc = ost_statfs(req);
518                 break;
519         case LDLM_ENQUEUE:
520                 CDEBUG(D_INODE, "enqueue\n");
521                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
522                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
523                                          ldlm_server_blocking_ast);
524                 break;
525         case LDLM_CONVERT:
526                 CDEBUG(D_INODE, "convert\n");
527                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
528                 rc = ldlm_handle_convert(req);
529                 break;
530         case LDLM_CANCEL:
531                 CDEBUG(D_INODE, "cancel\n");
532                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
533                 rc = ldlm_handle_cancel(req);
534                 break;
535         case LDLM_BL_CALLBACK:
536         case LDLM_CP_CALLBACK:
537                 CDEBUG(D_INODE, "callback\n");
538                 CERROR("callbacks should not happen on OST\n");
539                 LBUG();
540                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
541                 break;
542         default:
543                 req->rq_status = -ENOTSUPP;
544                 rc = ptlrpc_error(req->rq_svc, req);
545                 RETURN(rc);
546         }
547
548         EXIT;
549         /* If we're DISCONNECTing, the export_data is already freed */
550         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
551                 struct obd_device *obd  = req->rq_export->exp_obd;
552                 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
553                         req->rq_repmsg->last_committed =
554                                 HTON__u64(obd->obd_last_committed);
555                 } else {
556                         DEBUG_REQ(D_IOCTL, req,
557                                   "not sending last_committed update");
558                 }
559                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
560                        obd->obd_last_committed, HTON__u64(req->rq_xid));
561         }
562
563 out:
564         //req->rq_status = rc;
565         if (rc) {
566                 CERROR("ost: processing error (opcode=%d): %d\n",
567                        req->rq_reqmsg->opc, rc);
568                 ptlrpc_error(req->rq_svc, req);
569         } else {
570                 CDEBUG(D_INODE, "sending reply\n");
571                 if (req->rq_repmsg == NULL)
572                         CERROR("handler for opcode %d returned rc=0 without "
573                                "creating rq_repmsg; needs to return rc != 0!\n",
574                                req->rq_reqmsg->opc);
575                 else
576                         oti_to_request(oti, req);
577                 ptlrpc_reply(req->rq_svc, req);
578         }
579
580         return 0;
581 }
582
583 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
584 {
585         struct ost_obd *ost = &obddev->u.ost;
586         struct obd_uuid self = { "self" };
587         int err;
588         int i;
589         ENTRY;
590
591         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
592                                            OST_BUFSIZE, OST_MAXREQSIZE,
593                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
594                                            &self, ost_handle, "ost");
595         if (!ost->ost_service) {
596                 CERROR("failed to start service\n");
597                 GOTO(error_disc, err = -ENOMEM);
598         }
599
600         for (i = 0; i < OST_NUM_THREADS; i++) {
601                 char name[32];
602                 sprintf(name, "ll_ost_%02d", i);
603                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
604                 if (err) {
605                         CERROR("error starting thread #%d: rc %d\n", i, err);
606                         GOTO(error_disc, err = -EINVAL);
607                 }
608         }
609
610         RETURN(0);
611
612 error_disc:
613         RETURN(err);
614 }
615
616 static int ost_cleanup(struct obd_device * obddev)
617 {
618         struct ost_obd *ost = &obddev->u.ost;
619         int err = 0;
620
621         ENTRY;
622
623         ptlrpc_stop_all_threads(ost->ost_service);
624         ptlrpc_unregister_service(ost->ost_service);
625
626         RETURN(err);
627 }
628
629 int ost_attach(struct obd_device *dev, obd_count len, void *data)
630 {
631         struct lprocfs_static_vars lvars;
632
633         lprocfs_init_vars(&lvars);
634         return lprocfs_obd_attach(dev, lvars.obd_vars);
635 }
636
637 int ost_detach(struct obd_device *dev)
638 {
639         return lprocfs_obd_detach(dev);
640 }
641
642 /* This is so similar to mds_connect that it makes my heart weep: we should
643  * shuffle the UUID into obd_export proper and make this all happen in
644  * target_handle_connect.
645  */
646 static int ost_connect(struct lustre_handle *conn,
647                        struct obd_device *obd, struct obd_uuid *cluuid,
648                        struct recovd_obd *recovd,
649                        ptlrpc_recovery_cb_t recover)
650 {
651         struct obd_export *exp;
652         struct ost_export_data *oed;
653         struct list_head *p;
654         int rc;
655         ENTRY;
656
657         if (!conn || !obd || !cluuid)
658                 RETURN(-EINVAL);
659
660         /* lctl gets a backstage, all-access pass. */
661         if (!strcmp(cluuid->uuid, "OBD_CLASS_UUID"))
662                 goto dont_check_exports;
663
664         spin_lock(&obd->obd_dev_lock);
665         list_for_each(p, &obd->obd_exports) {
666                 exp = list_entry(p, struct obd_export, exp_obd_chain);
667                 oed = &exp->exp_ost_data;
668                 if (!memcmp(cluuid->uuid, oed->oed_uuid.uuid, 
669                             sizeof(oed->oed_uuid.uuid))) {
670                         spin_unlock(&obd->obd_dev_lock);
671                         LASSERT(exp->exp_obd == obd);
672
673                         RETURN(target_handle_reconnect(conn, exp, cluuid));
674                 }
675         }
676
677  dont_check_exports:
678         rc = class_connect(conn, obd, cluuid);
679         if (rc)
680                 RETURN(rc);
681         exp = class_conn2export(conn);
682         LASSERT(exp);
683
684         oed = &exp->exp_ost_data;
685         memcpy(oed->oed_uuid.uuid, cluuid->uuid, sizeof(oed->oed_uuid.uuid));
686
687         RETURN(0);
688 }
689
690 /* use obd ops to offer management infrastructure */
691 static struct obd_ops ost_obd_ops = {
692         o_owner:        THIS_MODULE,
693         o_attach:       ost_attach,
694         o_detach:       ost_detach,
695         o_setup:        ost_setup,
696         o_cleanup:      ost_cleanup,
697         o_connect:      ost_connect,
698 };
699
700 static int __init ost_init(void)
701 {
702         struct lprocfs_static_vars lvars;
703         ENTRY;
704
705         lprocfs_init_vars(&lvars);
706         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
707                                    LUSTRE_OST_NAME));
708 }
709
710 static void __exit ost_exit(void)
711 {
712         class_unregister_type(LUSTRE_OST_NAME);
713 }
714
715 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
716 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
717 MODULE_LICENSE("GPL");
718
719 module_init(ost_init);
720 module_exit(ost_exit);