[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table).
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #ifdef __KERNEL__
33 #include <linux/version.h>
34 #include <linux/module.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
41 #else
42 #include <linux/locks.h>
43 #endif
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
51 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
52 #include <linux/ctype.h>
53 #include <linux/init.h>
54 #include <linux/lustre_ha.h>
55 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
56 #include <linux/lustre_lite.h> /* for ll_i2info */
57 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
58 #include <linux/lprocfs_status.h>
59
60 /* It is important that ood_fh remain the first item in this structure: that
61  * way, we don't have to re-pack the obdo's inline data before we send it to
62  * the server, we can just send the whole struct unaltered. */
63 #define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
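/* osc_open() fills one of these and stashes it in oa->o_inline; osc_close()
 * reads it back to find the open file handle and the saved open request, and
 * checks ood_magic to catch an obdo that never went through osc_open(). */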
64 struct osc_obdo_data {
65         struct lustre_handle ood_fh;
66         struct ptlrpc_request *ood_request;
67         __u32 ood_magic;
68 };
69 #include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
70
71 static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
72                           int level, int msg_flags)
73 {
74         struct ptlrpc_request *req;
75         struct mds_body *body;
76         int rc, size = sizeof(*body);
77         ENTRY;
78
79         req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
80         if (!req)
81                 GOTO(out, rc = -ENOMEM);
82
83         body = lustre_msg_buf(req->rq_reqmsg, 0);
84         req->rq_level = level;
85         req->rq_replen = lustre_msg_size(1, &size);
86
87         req->rq_reqmsg->flags |= msg_flags;
88         rc = ptlrpc_queue_wait(req);
89
90         if (!rc) {
91                 CDEBUG(D_NET, "last_committed="LPU64
92                        ", last_xid="LPU64"\n",
93                        req->rq_repmsg->last_committed,
94                        req->rq_repmsg->last_xid);
95         }
96
97         EXIT;
98  out:
99         ptlrpc_req_finished(req);
100         return rc;
101 }
102
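/* Once request and lock replay is finished, send one last sync-style RPC with
 * MSG_LAST_REPLAY set so the OST knows this client is done replaying. */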
103 static int signal_completed_replay(struct obd_import *imp)
104 {
105         struct ll_fid fid;
106
107         return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
108 }
109
110 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
111 {
112         struct lprocfs_static_vars lvars;
113
114         lprocfs_init_vars(&lvars);
115         return lprocfs_obd_attach(dev, lvars.obd_vars);
116 }
117
118 static int osc_detach(struct obd_device *dev)
119 {
120         return lprocfs_obd_detach(dev);
121 }
122
123 /* Pack OSC object metadata for shipment to the MDS. */
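/* Calling conventions (osc_unpackmd() below mirrors them):
 *   lmmp == NULL:                  just return the size of the wire lov_mds_md
 *   *lmmp != NULL, lsm == NULL:    free *lmmp and return 0
 *   otherwise:                     allocate *lmmp if needed and pack lsm */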
124 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
125                       struct lov_stripe_md *lsm)
126 {
127         int lmm_size;
128         ENTRY;
129
130         lmm_size = sizeof(**lmmp);
131         if (!lmmp)
132                 RETURN(lmm_size);
133
134         if (*lmmp && !lsm) {
135                 OBD_FREE(*lmmp, lmm_size);
136                 *lmmp = NULL;
137                 RETURN(0);
138         }
139
140         if (!*lmmp) {
141                 OBD_ALLOC(*lmmp, lmm_size);
142                 if (!*lmmp)
143                         RETURN(-ENOMEM);
144         }
145         if (lsm) {
146                 LASSERT(lsm->lsm_object_id);
147                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
148         }
149
150         RETURN(lmm_size);
151 }
152
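/* Unpack a lov_mds_md into an in-core lov_stripe_md; same three calling
 * conventions as osc_packmd() above, with the lmm and lsm roles reversed. */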
153 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
154                         struct lov_mds_md *lmm)
155 {
156         int lsm_size;
157         ENTRY;
158
159         lsm_size = sizeof(**lsmp);
160         if (!lsmp)
161                 RETURN(lsm_size);
162
163         if (*lsmp && !lmm) {
164                 OBD_FREE(*lsmp, lsm_size);
165                 *lsmp = NULL;
166                 RETURN(0);
167         }
168
169         if (!*lsmp) {
170                 OBD_ALLOC(*lsmp, lsm_size);
171                 if (!*lsmp)
172                         RETURN(-ENOMEM);
173         }
174
175         /* XXX endianness */
176         if (lmm) {
177                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
178                 LASSERT((*lsmp)->lsm_object_id);
179         }
180
181         RETURN(lsm_size);
182 }
183
184 inline void oti_from_request(struct obd_trans_info *oti,
185                              struct ptlrpc_request *req)
186 {
187         if (oti && req->rq_repmsg)
188                 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
189         EXIT;
190 }
191
192 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
193                        struct lov_stripe_md *md)
194 {
195         struct ptlrpc_request *request;
196         struct ost_body *body;
197         int rc, size = sizeof(*body);
198         ENTRY;
199
200         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
201                                   &size, NULL);
202         if (!request)
203                 RETURN(-ENOMEM);
204
205         body = lustre_msg_buf(request->rq_reqmsg, 0);
206 #warning FIXME: pack only valid fields instead of memcpy, endianness
207         memcpy(&body->oa, oa, sizeof(*oa));
208
209         request->rq_replen = lustre_msg_size(1, &size);
210
211         rc = ptlrpc_queue_wait(request);
212         if (rc) {
213                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
214                 GOTO(out, rc);
215         }
216
217         body = lustre_msg_buf(request->rq_repmsg, 0);
218         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219         memcpy(oa, &body->oa, sizeof(*oa));
220
221         EXIT;
222  out:
223         ptlrpc_req_finished(request);
224         return rc;
225 }
226
227 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
228                     struct lov_stripe_md *md, struct obd_trans_info *oti)
229 {
230         struct ptlrpc_request *request;
231         struct ost_body *body;
232         int rc, size = sizeof(*body);
233         ENTRY;
234
235         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
236                                   NULL);
237         if (!request)
238                 RETURN(-ENOMEM);
239
240         request->rq_flags |= PTL_RPC_FL_REPLAY;
241         body = lustre_msg_buf(request->rq_reqmsg, 0);
242 #warning FIXME: pack only valid fields instead of memcpy, endianness
243         memcpy(&body->oa, oa, sizeof(*oa));
244
245         request->rq_replen = lustre_msg_size(1, &size);
246
247         rc = ptlrpc_queue_wait(request);
248         if (rc)
249                 GOTO(out, rc);
250
251         if (oa) {
252                 struct osc_obdo_data ood;
253                 body = lustre_msg_buf(request->rq_repmsg, 0);
254                 memcpy(oa, &body->oa, sizeof(*oa));
255
256                 /* If the open succeeded, we better have a handle */
257                 /* BlueArc OSTs don't send back (o_valid | FLHANDLE).  sigh.
258                  * Temporary workaround until fixed. -phil 24 Feb 03 */
259                 //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
260                 oa->o_valid |= OBD_MD_FLHANDLE;
261
262                 memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
263                 ood.ood_request = ptlrpc_request_addref(request);
264                 ood.ood_magic = OSC_OBDO_DATA_MAGIC;
265
266                 /* Save this data in the request; it will be passed back to us
267                  * in future obdos.  This memcpy is guaranteed to be safe,
268                  * because we check at compile-time that sizeof(ood) is smaller
269                  * than oa->o_inline. */
270                 memcpy(&oa->o_inline, &ood, sizeof(ood));
271         }
272
273         EXIT;
274  out:
275         ptlrpc_req_finished(request);
276         return rc;
277 }
278
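/* Close the object on the OST and release the open request saved by
 * osc_open().  The transno handling mirrors llite/file.c:ll_mdc_close(); the
 * replay-retention branch is not expected to trigger yet (see the LBUG). */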
279 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
280                      struct lov_stripe_md *md, struct obd_trans_info *oti)
281 {
282         struct obd_import *import = class_conn2cliimp(conn);
283         struct ptlrpc_request *request;
284         struct ost_body *body;
285         struct osc_obdo_data *ood;
286         unsigned long flags;
287         int rc, size = sizeof(*body);
288         ENTRY;
289
290         LASSERT(oa != NULL);
291         ood = (struct osc_obdo_data *)&oa->o_inline;
292         LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
293
294         request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
295         if (!request)
296                 RETURN(-ENOMEM);
297
298         body = lustre_msg_buf(request->rq_reqmsg, 0);
299 #warning FIXME: pack only valid fields instead of memcpy, endianness
300         memcpy(&body->oa, oa, sizeof(*oa));
301
302         request->rq_replen = lustre_msg_size(1, &size);
303
304         rc = ptlrpc_queue_wait(request);
305         if (rc) {
306                 /* FIXME: Does this mean that the file is still open locally?
307                  * If not, and I somehow suspect not, we need to cleanup
308                  * below */
309                 GOTO(out, rc);
310         }
311
312         spin_lock_irqsave(&import->imp_lock, flags);
313         ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
314         /* see comments in llite/file.c:ll_mdc_close() */
315         if (ood->ood_request->rq_transno) {
316                 LBUG(); /* this can't happen yet */
317                 if (!request->rq_transno) {
318                         request->rq_transno = ood->ood_request->rq_transno;
319                         ptlrpc_retain_replayable_request(request, import);
320                 }
321                 spin_unlock_irqrestore(&import->imp_lock, flags);
322         } else {
323                 spin_unlock_irqrestore(&import->imp_lock, flags);
324                 ptlrpc_req_finished(ood->ood_request);
325         }
326
327         body = lustre_msg_buf(request->rq_repmsg, 0);
328         memcpy(oa, &body->oa, sizeof(*oa));
329
330         EXIT;
331  out:
332         ptlrpc_req_finished(request);
333         return rc;
334 }
335
336 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
337                        struct lov_stripe_md *md, struct obd_trans_info *oti)
338 {
339         struct ptlrpc_request *request;
340         struct ost_body *body;
341         int rc, size = sizeof(*body);
342         ENTRY;
343
344         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
345                                   &size, NULL);
346         if (!request)
347                 RETURN(-ENOMEM);
348
349         body = lustre_msg_buf(request->rq_reqmsg, 0);
350         memcpy(&body->oa, oa, sizeof(*oa));
351
352         request->rq_replen = lustre_msg_size(1, &size);
353
354         rc = ptlrpc_queue_wait(request);
355
356         ptlrpc_req_finished(request);
357         return rc;
358 }
359
360 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
361                       struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
362 {
363         struct ptlrpc_request *request;
364         struct ost_body *body;
365         struct lov_stripe_md *lsm;
366         struct obd_trans_info *oti, trans_info;
367         int rc, size = sizeof(*body);
368         ENTRY;
369
370         LASSERT(oa);
371         LASSERT(ea);
372
373         lsm = *ea;
374         if (!lsm) {
375                 rc = obd_alloc_memmd(conn, &lsm);
376                 if (rc < 0)
377                         RETURN(rc);
378         }
379
380         if (oti_in)
381                 oti = oti_in;
382         else
383                 oti = &trans_info;
384
385         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
386                                   NULL);
387         if (!request)
388                 GOTO(out, rc = -ENOMEM);
389
390         body = lustre_msg_buf(request->rq_reqmsg, 0);
391         memcpy(&body->oa, oa, sizeof(*oa));
392
393         request->rq_replen = lustre_msg_size(1, &size);
394
395         rc = ptlrpc_queue_wait(request);
396         if (rc)
397                 GOTO(out_req, rc);
398
399         body = lustre_msg_buf(request->rq_repmsg, 0);
400         memcpy(oa, &body->oa, sizeof(*oa));
401
402         lsm->lsm_object_id = oa->o_id;
403         lsm->lsm_stripe_count = 0;
404         *ea = lsm;
405
406         oti_from_request(oti, request);
407         CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
408         EXIT;
409 out_req:
410         ptlrpc_req_finished(request);
411 out:
412         if (rc && !*ea)
413                 obd_free_memmd(conn, &lsm);
414         return rc;
415 }
416
417 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
418                      struct lov_stripe_md *md, obd_size start,
419                      obd_size end, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *request;
422         struct ost_body *body;
423         int rc, size = sizeof(*body);
424         ENTRY;
425
426         if (!oa) {
427                 CERROR("oa NULL\n");
428                 RETURN(-EINVAL);
429         }
430
431         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
432                                   NULL);
433         if (!request)
434                 RETURN(-ENOMEM);
435
436         body = lustre_msg_buf(request->rq_reqmsg, 0);
437 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
438         memcpy(&body->oa, oa, sizeof(*oa));
439
440         /* overload the size and blocks fields in the oa with start/end */
441         body->oa.o_size = HTON__u64(start);
442         body->oa.o_blocks = HTON__u64(end);
443         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
444
445         request->rq_replen = lustre_msg_size(1, &size);
446
447         rc = ptlrpc_queue_wait(request);
448         if (rc)
449                 GOTO(out, rc);
450
451         body = lustre_msg_buf(request->rq_repmsg, 0);
452         memcpy(oa, &body->oa, sizeof(*oa));
453
454         EXIT;
455  out:
456         ptlrpc_req_finished(request);
457         return rc;
458 }
459
460 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
461                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
462 {
463         struct ptlrpc_request *request;
464         struct ost_body *body;
465         int rc, size = sizeof(*body);
466         ENTRY;
467
468         if (!oa) {
469                 CERROR("oa NULL\n");
470                 RETURN(-EINVAL);
471         }
472         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
473                                   &size, NULL);
474         if (!request)
475                 RETURN(-ENOMEM);
476
477         body = lustre_msg_buf(request->rq_reqmsg, 0);
478 #warning FIXME: pack only valid fields instead of memcpy, endianness
479         memcpy(&body->oa, oa, sizeof(*oa));
480
481         request->rq_replen = lustre_msg_size(1, &size);
482
483         rc = ptlrpc_queue_wait(request);
484         if (rc)
485                 GOTO(out, rc);
486
487         body = lustre_msg_buf(request->rq_repmsg, 0);
488         memcpy(oa, &body->oa, sizeof(*oa));
489
490         EXIT;
491  out:
492         ptlrpc_req_finished(request);
493         return rc;
494 }
495
496 /* Our bulk-unmapping bottom half. */
497 static void unmap_and_decref_bulk_desc(void *data)
498 {
499         struct ptlrpc_bulk_desc *desc = data;
500         struct list_head *tmp;
501         ENTRY;
502
503         list_for_each(tmp, &desc->bd_page_list) {
504                 struct ptlrpc_bulk_page *bulk;
505                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
506
507                 kunmap(bulk->bp_page);
508                 obd_kmap_put(1);
509         }
510
511         ptlrpc_bulk_decref(desc);
512         EXIT;
513 }
514
515
516 /* This is the callback invoked by the Portals event handler for both the
517  * bulk_sink and bulk_source queues.
518  */
519 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
520 {
521         ENTRY;
522
523         LASSERT(desc->bd_brw_set != NULL);
524         LASSERT(desc->bd_brw_set->brw_callback != NULL);
525
526         /* It's important that you don't use desc->bd_brw_set after this
527          * callback runs.  If you do, take a reference on it. */
528         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
529
530         /* We can't kunmap the desc from interrupt context, so we do it from
531          * the bottom half above. */
532         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
533         schedule_work(&desc->bd_queue);
534
535         EXIT;
536 }
537
538 /*
539  * This is called when there was a bulk error return.  However, we don't know
540  * whether the bulk completed or not.  We cancel the portals bulk descriptors,
541  * so that if the OST decides to send them later we don't double free.  Then
542  * remove this descriptor from the set so that the set callback doesn't wait
543  * forever for the last CB_PHASE_FINISH to be called, and finally dump all of
544  * the bulk descriptor references.
545  */
546 static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
547 {
548         ENTRY;
549
550         LASSERT(desc->bd_brw_set != NULL);
551
552         /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
553          *     just make osc_ptl_ev_hdlr() check desc->bd_flags for either
554          *     PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
555          *     to brw_callback() and do the rest of the cleanup there.  I
556  *     also think ll_sync_brw_timeout() is missing a PtlMEUnlink,
557          *     but I could be wrong.
558          */
559         if (ptlrpc_abort_bulk(desc)) {
560                 EXIT;
561                 return;
562         }
563         obd_brw_set_del(desc);
564         unmap_and_decref_bulk_desc(desc);
565
566         EXIT;
567 }
568
569 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
570                         obd_count page_count, struct brw_page *pga,
571                         struct obd_brw_set *set)
572 {
573         struct obd_import *imp = class_conn2cliimp(conn);
574         struct ptlrpc_connection *connection = imp->imp_connection;
575         struct ptlrpc_request *request = NULL;
576         struct ptlrpc_bulk_desc *desc = NULL;
577         struct ost_body *body;
578         int rc, size[3] = {sizeof(*body)}, mapped = 0;
579         struct obd_ioobj *iooptr;
580         struct niobuf_remote *nioptr;
581         __u32 xid;
582         ENTRY;
583
584 restart_bulk:
585         size[1] = sizeof(struct obd_ioobj);
586         size[2] = page_count * sizeof(struct niobuf_remote);
587
588         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
589         if (!request)
590                 RETURN(-ENOMEM);
591
592         body = lustre_msg_buf(request->rq_reqmsg, 0);
593         body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
594
595         desc = ptlrpc_prep_bulk(connection);
596         if (!desc)
597                 GOTO(out_req, rc = -ENOMEM);
598         desc->bd_portal = OST_BULK_PORTAL;
599         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
600         CDEBUG(D_PAGE, "desc = %p\n", desc);
601
602         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
603         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
604         ost_pack_ioo(iooptr, lsm, page_count);
605         /* end almost identical to brw_write case */
606
607         xid = ptlrpc_next_xid();       /* single xid for all pages */
608
609         obd_kmap_get(page_count, 0);
610
611         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
612                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
613                 if (bulk == NULL) {
614                         unmap_and_decref_bulk_desc(desc);
615                         GOTO(out_req, rc = -ENOMEM);
616                 }
617
618                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
619
620                 bulk->bp_xid = xid;           /* single xid for all pages */
621                 bulk->bp_buf = kmap(pga[mapped].pg);
622                 bulk->bp_page = pga[mapped].pg;
623                 bulk->bp_buflen = PAGE_SIZE;
624                 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
625                                 pga[mapped].flag, bulk->bp_xid);
626         }
627
628         /*
629          * Register the bulk first, because the reply could arrive out of order,
630          * and we want to be ready for the bulk data.
631          *
632          * One reference is released when osc_ptl_ev_hdlr() is called by
633          * portals, the other when the caller removes us from the "set" list.
634          *
635          * On error, we never do the brw_finish, so we handle all decrefs.
636          */
637         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
638                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
639                        OBD_FAIL_OSC_BRW_READ_BULK);
640         } else {
641                 rc = ptlrpc_register_bulk_put(desc);
642                 if (rc) {
643                         unmap_and_decref_bulk_desc(desc);
644                         GOTO(out_req, rc);
645                 }
646                 obd_brw_set_add(set, desc);
647         }
648
649         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
650         request->rq_replen = lustre_msg_size(1, size);
651         rc = ptlrpc_queue_wait(request);
652
653         /* XXX bug 937 here */
654         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
655                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
656                 ptlrpc_req_finished(request);
657                 goto restart_bulk;
658         }
659
660         if (rc) {
661                 osc_ptl_ev_abort(desc);
662                 GOTO(out_req, rc);
663         }
664
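        /* If bulk checksums are enabled, the server returns its checksum of
         * the data in oa.o_rdev; recompute it over the pages we received and
         * complain loudly on any mismatch. */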
665 #if CHECKSUM_BULK
666         body = lustre_msg_buf(request->rq_repmsg, 0);
667         if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
668                 static int cksum_counter;
669                 __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
670                 __u64 cksum = 0;
671
672                 for (mapped = 0; mapped < page_count; mapped++) {
673                         char *ptr = kmap(pga[mapped].pg);
674                         int   off = pga[mapped].off & (PAGE_SIZE - 1);
675                         int   len = pga[mapped].count;
676
677                         LASSERT(off + len <= PAGE_SIZE);
678                         ost_checksum(&cksum, ptr + off, len);
679                         kunmap(pga[mapped].pg);
680                 }
681
682                 cksum_counter++;
683                 if (server_cksum != cksum) {
684                         CERROR("Bad checksum: server "LPX64", client "LPX64
685                                ", server NID "LPX64"\n", server_cksum, cksum,
686                                imp->imp_connection->c_peer.peer_nid);
687                         cksum_counter = 0;
688                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
689                         CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
690                                cksum_counter,
691                                imp->imp_connection->c_peer.peer_nid, cksum);
692         } else {
693                 static int cksum_missed;
694                 cksum_missed++;
695                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
696                         CERROR("Request checksum %u from "LPX64", no reply\n",
697                                cksum_missed,
698                                imp->imp_connection->c_peer.peer_nid);
699         }
700 #endif
701
702         EXIT;
703  out_req:
704         ptlrpc_req_finished(request);
705         return rc;
706 }
707
708 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
709                          obd_count page_count, struct brw_page *pga,
710                          struct obd_brw_set *set, struct obd_trans_info *oti)
711 {
712         struct obd_import *imp = class_conn2cliimp(conn);
713         struct ptlrpc_connection *connection = imp->imp_connection;
714         struct ptlrpc_request *request = NULL;
715         struct ptlrpc_bulk_desc *desc = NULL;
716         struct ost_body *body;
717         int rc, size[3] = {sizeof(*body)}, mapped = 0;
718         struct obd_ioobj *iooptr;
719         struct niobuf_remote *nioptr;
720         __u32 xid;
721 #if CHECKSUM_BULK
722         __u64 cksum = 0;
723 #endif
724         ENTRY;
725
726 restart_bulk:
727         size[1] = sizeof(struct obd_ioobj);
728         size[2] = page_count * sizeof(struct niobuf_remote);
729
730         request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
731         if (!request)
732                 RETURN(-ENOMEM);
733
734         body = lustre_msg_buf(request->rq_reqmsg, 0);
735
736         desc = ptlrpc_prep_bulk(connection);
737         if (!desc)
738                 GOTO(out_req, rc = -ENOMEM);
739         desc->bd_portal = OSC_BULK_PORTAL;
740         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
741         CDEBUG(D_PAGE, "desc = %p\n", desc);
742
743         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
744         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
745         ost_pack_ioo(iooptr, lsm, page_count);
746         /* end almost identical to brw_read case */
747
748         xid = ptlrpc_next_xid();       /* single xid for all pages */
749
750         obd_kmap_get(page_count, 0);
751
752         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
753                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
754                 if (bulk == NULL) {
755                         unmap_and_decref_bulk_desc(desc);
756                         GOTO(out_req, rc = -ENOMEM);
757                 }
758
759                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
760
761                 bulk->bp_xid = xid;           /* single xid for all pages */
762                 bulk->bp_buf = kmap(pga[mapped].pg);
763                 bulk->bp_page = pga[mapped].pg;
764                 /* matching ptlrpc_bulk_get assert */
765                 LASSERT(pga[mapped].count > 0);
766                 bulk->bp_buflen = pga[mapped].count;
767                 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
768                                 pga[mapped].flag, bulk->bp_xid);
#if CHECKSUM_BULK
769                 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
#endif
770         }
771
772 #if CHECKSUM_BULK
773         body->oa.o_rdev = HTON__u64(cksum);
774         body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
775 #endif
776         /*
777          * Register the bulk first, because the reply could arrive out of
778          * order, and we want to be ready for the bulk data.
779          *
780          * One reference is released when brw_finish is complete, the other
781          * when the caller removes us from the "set" list.
782          *
783          * On error, we never do the brw_finish, so we handle all decrefs.
784          */
785         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
786                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
787                        OBD_FAIL_OSC_BRW_WRITE_BULK);
788         } else {
789                 rc = ptlrpc_register_bulk_get(desc);
790                 if (rc) {
791                         unmap_and_decref_bulk_desc(desc);
792                         GOTO(out_req, rc);
793                 }
794                 obd_brw_set_add(set, desc);
795         }
796
797         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
798         request->rq_replen = lustre_msg_size(1, size);
799         rc = ptlrpc_queue_wait(request);
800
801         /* XXX bug 937 here */
802         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
803                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
804                 ptlrpc_req_finished(request);
805                 goto restart_bulk;
806         }
807
808         if (rc) {
809                 osc_ptl_ev_abort(desc);
810                 GOTO(out_req, rc);
811         }
812
813         EXIT;
814  out_req:
815         ptlrpc_req_finished(request);
816         return rc;
817 }
818
819 #ifndef min_t
820 #define min_t(a,b,c) ((b) < (c) ? (b) : (c))
821 #endif
822
823 #warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
824 #define OSC_BRW_MAX_SIZE 65536
825 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
826
830
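/* Split the request into chunks of at most OSC_BRW_MAX_IOV pages and hand
 * each chunk to osc_brw_read() or osc_brw_write(). */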
831 static int osc_brw(int cmd, struct lustre_handle *conn,
832                    struct lov_stripe_md *md, obd_count page_count,
833                    struct brw_page *pga, struct obd_brw_set *set,
834                    struct obd_trans_info *oti)
835 {
836         ENTRY;
837
838         while (page_count) {
839                 obd_count pages_per_brw;
840                 int rc;
841
842                 if (page_count > OSC_BRW_MAX_IOV)
843                         pages_per_brw = OSC_BRW_MAX_IOV;
844                 else
845                         pages_per_brw = page_count;
846
847                 if (cmd & OBD_BRW_WRITE)
848                         rc = osc_brw_write(conn, md, pages_per_brw, pga,
849                                            set, oti);
850                 else
851                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
852
853                 if (rc != 0)
854                         RETURN(rc);
855
856                 page_count -= pages_per_brw;
857                 pga += pages_per_brw;
858         }
859         RETURN(0);
860 }
861
862 #ifdef __KERNEL__
863 /* Note: caller will lock/unlock, and set uptodate on the pages */
864 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
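/* SAN variant: OST_SAN_READ/OST_SAN_WRITE ask the OST for block mappings
 * instead of moving data over the network; the client then does the disk I/O
 * itself on the shared SAN device (cl_sandev) via ll_rw_block(). */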
865 static int sanosc_brw_read(struct lustre_handle *conn,
866                            struct lov_stripe_md *lsm,
867                            obd_count page_count,
868                            struct brw_page *pga,
869                            struct obd_brw_set *set)
870 {
871         struct ptlrpc_request *request = NULL;
872         struct ost_body *body;
873         struct niobuf_remote *nioptr;
874         struct obd_ioobj *iooptr;
875         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
876         ENTRY;
877
878         size[1] = sizeof(struct obd_ioobj);
879         size[2] = page_count * sizeof(*nioptr);
880
881         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
882                                   size, NULL);
883         if (!request)
884                 RETURN(-ENOMEM);
885
886         body = lustre_msg_buf(request->rq_reqmsg, 0);
887         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
888         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
889         ost_pack_ioo(iooptr, lsm, page_count);
890
891         obd_kmap_get(page_count, 0);
892
893         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
894                 LASSERT(PageLocked(pga[mapped].pg));
895                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
896
897                 kmap(pga[mapped].pg);
898                 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
899                                 pga[mapped].flag, 0);
900         }
901
902         size[1] = page_count * sizeof(*nioptr);
903         request->rq_replen = lustre_msg_size(2, size);
904
905         rc = ptlrpc_queue_wait(request);
906         if (rc)
907                 GOTO(out_unmap, rc);
908
909         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
910         if (!nioptr)
911                 GOTO(out_unmap, rc = -EINVAL);
912
913         if (request->rq_repmsg->buflens[1] != size[1]) {
914                 CERROR("buffer length wrong (%d vs. %d)\n",
915                        request->rq_repmsg->buflens[1], size[1]);
916                 GOTO(out_unmap, rc = -EINVAL);
917         }
918
919         /* actual read */
920         for (j = 0; j < page_count; j++, nioptr++) {
921                 struct page *page = pga[j].pg;
922                 struct buffer_head *bh;
923                 kdev_t dev;
924
925                 ost_unpack_niobuf(nioptr, nioptr);
926                 /* find the SAN device associated with this connection */
927                 LASSERT(class_conn2obd(conn));
928                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
929
930                 /* hole */
931                 if (!nioptr->offset) {
932                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
933                                         page->mapping->host->i_ino,
934                                         page->index);
935                         memset(page_address(page), 0, PAGE_SIZE);
936                         continue;
937                 }
938
939                 if (!page->buffers) {
940                         create_empty_buffers(page, dev, PAGE_SIZE);
941                         bh = page->buffers;
942
943                         clear_bit(BH_New, &bh->b_state);
944                         set_bit(BH_Mapped, &bh->b_state);
945                         bh->b_blocknr = (unsigned long)nioptr->offset;
946
947                         clear_bit(BH_Uptodate, &bh->b_state);
948
949                         ll_rw_block(READ, 1, &bh);
950                 } else {
951                         bh = page->buffers;
952
953                         /* if buffer already existed, it must be the
954                          * one we mapped before, check it */
955                         LASSERT(!test_bit(BH_New, &bh->b_state));
956                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
957                         LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
958
959                         /* wait for its I/O completion */
960                         if (test_bit(BH_Lock, &bh->b_state))
961                                 wait_on_buffer(bh);
962
963                         if (!test_bit(BH_Uptodate, &bh->b_state))
964                                 ll_rw_block(READ, 1, &bh);
965                 }
966
967
968                 /* wait for the synchronous read to complete */
969                 wait_on_buffer(bh);
970                 if (!buffer_uptodate(bh)) {
971                         /* I/O error */
972                         rc = -EIO;
973                         goto out_unmap;
974                 }
975         }
976
977 out_req:
978         ptlrpc_req_finished(request);
979         RETURN(rc);
980
981 out_unmap:
982         /* Clean up on error. */
983         while (mapped-- > 0)
984                 kunmap(pga[mapped].pg);
985
986         obd_kmap_put(page_count);
987
988         goto out_req;
989 }
990
991 static int sanosc_brw_write(struct lustre_handle *conn,
992                             struct lov_stripe_md *lsm,
993                             obd_count page_count,
994                             struct brw_page *pga,
995                             struct obd_brw_set *set)
996 {
997         struct ptlrpc_request *request = NULL;
998         struct ost_body *body;
999         struct niobuf_remote *nioptr;
1000         struct obd_ioobj *iooptr;
1001         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
1002         ENTRY;
1003
1004         size[1] = sizeof(struct obd_ioobj);
1005         size[2] = page_count * sizeof(*nioptr);
1006
1007         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1008                                   3, size, NULL);
1009         if (!request)
1010                 RETURN(-ENOMEM);
1011
1012         body = lustre_msg_buf(request->rq_reqmsg, 0);
1013         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
1014         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
1015         ost_pack_ioo(iooptr, lsm, page_count);
1016
1017         /* map pages, and pack request */
1018         obd_kmap_get(page_count, 0);
1019         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1020                 LASSERT(PageLocked(pga[mapped].pg));
1021                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1022
1023                 kmap(pga[mapped].pg);
1024                 ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
1025                                 pga[mapped].flag, 0);
1026         }
1027
1028         size[1] = page_count * sizeof(*nioptr);
1029         request->rq_replen = lustre_msg_size(2, size);
1030
1031         rc = ptlrpc_queue_wait(request);
1032         if (rc)
1033                 GOTO(out_unmap, rc);
1034
1035         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1036         if (!nioptr)
1037                 GOTO(out_unmap, rc = -EINVAL);
1038
1039         if (request->rq_repmsg->buflens[1] != size[1]) {
1040                 CERROR("buffer length wrong (%d vs. %d)\n",
1041                        request->rq_repmsg->buflens[1], size[1]);
1042                 GOTO(out_unmap, rc = -EINVAL);
1043         }
1044
1045         /* actual write */
1046         for (j = 0; j < page_count; j++, nioptr++) {
1047                 struct page *page = pga[j].pg;
1048                 struct buffer_head *bh;
1049                 kdev_t dev;
1050
1051                 ost_unpack_niobuf(nioptr, nioptr);
1052                 /* find the SAN device associated with this connection */
1053                 LASSERT(class_conn2obd(conn));
1054                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1055
1056                 if (!page->buffers) {
1057                         create_empty_buffers(page, dev, PAGE_SIZE);
1058                 } else {
1059                         /* checking */
1060                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1061                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1062                         LASSERT(page->buffers->b_blocknr ==
1063                                 (unsigned long)nioptr->offset);
1064                 }
1065                 bh = page->buffers;
1066
1067                 LASSERT(bh);
1068
1069                 /* if the buffer is locked, wait for its I/O completion */
1070                 if (test_bit(BH_Lock, &bh->b_state))
1071                         wait_on_buffer(bh);
1072
1073                 clear_bit(BH_New, &bh->b_state);
1074                 set_bit(BH_Mapped, &bh->b_state);
1075
1076                 /* override the block nr */
1077                 bh->b_blocknr = (unsigned long)nioptr->offset;
1078
1079                 /* we are about to write it, so mark it uptodate/dirty;
1080                  * the page lock should guarantee there is no race
1081                  * condition here */
1082                 set_bit(BH_Uptodate, &bh->b_state);
1083                 set_bit(BH_Dirty, &bh->b_state);
1084
1085                 ll_rw_block(WRITE, 1, &bh);
1086
1087                 /* must do a synchronous write here */
1088                 wait_on_buffer(bh);
1089                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1090                         /* I/O error */
1091                         rc = -EIO;
1092                         goto out_unmap;
1093                 }
1094         }
1095
1096 out_req:
1097         ptlrpc_req_finished(request);
1098         RETURN(rc);
1099
1100 out_unmap:
1101         /* Clean up on error. */
1102         while (mapped-- > 0)
1103                 kunmap(pga[mapped].pg);
1104
1105         obd_kmap_put(page_count);
1106
1107         goto out_req;
1108 }
1109
1110 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1111                       struct lov_stripe_md *lsm, obd_count page_count,
1112                       struct brw_page *pga, struct obd_brw_set *set,
1113                       struct obd_trans_info *oti)
1114 {
1115         ENTRY;
1116
1117         while (page_count) {
1118                 obd_count pages_per_brw;
1119                 int rc;
1120
1121                 if (page_count > OSC_BRW_MAX_IOV)
1122                         pages_per_brw = OSC_BRW_MAX_IOV;
1123                 else
1124                         pages_per_brw = page_count;
1125
1126                 if (cmd & OBD_BRW_WRITE)
1127                         rc = sanosc_brw_write(conn, lsm, pages_per_brw,
1128                                               pga, set);
1129                 else
1130                         rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set);
1131
1132                 if (rc != 0)
1133                         RETURN(rc);
1134
1135                 page_count -= pages_per_brw;
1136                 pga += pages_per_brw;
1137         }
1138         RETURN(0);
1139 }
1140 #endif
1141 #endif
1142
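/* Take an extent lock on the object.  The resource is named by the object id;
 * before enqueueing we try to match an existing compatible lock (including a
 * PW lock when only PR is needed) to avoid another round trip to the OST. */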
1143 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1144                        struct lustre_handle *parent_lock,
1145                        __u32 type, void *extentp, int extent_len, __u32 mode,
1146                        int *flags, void *callback, void *data, int datalen,
1147                        struct lustre_handle *lockh)
1148 {
1149         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1150         struct obd_device *obddev = class_conn2obd(connh);
1151         struct ldlm_extent *extent = extentp;
1152         int rc;
1153         ENTRY;
1154
1155         /* Filesystem locks are given a bit of special treatment: if
1156          * this is not a file size lock (which has end == -1), we
1157          * fixup the lock to start and end on page boundaries. */
1158         if (extent->end != OBD_OBJECT_EOF) {
1159                 extent->start &= PAGE_MASK;
1160                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
1161         }
1162
1163         /* Next, search for already existing extent locks that will cover us */
1164         rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
1165                              sizeof(*extent), mode, lockh);
1166         if (rc == 1)
1167                 /* We already have a lock, and it's referenced */
1168                 RETURN(ELDLM_LOCK_MATCHED);
1169
1170         /* If we're trying to read, we also search for an existing PW lock.  The
1171          * VFS and page cache already protect us locally, so lots of readers/
1172          * writers can share a single PW lock.
1173          *
1174          * There are problems with conversion deadlocks, so instead of
1175          * converting a read lock to a write lock, we'll just enqueue a new
1176          * one.
1177          *
1178          * At some point we should cancel the read lock instead of making them
1179          * send us a blocking callback, but there are problems with canceling
1180          * locks out from other users right now, too. */
1181
1182         if (mode == LCK_PR) {
1183                 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
1184                                      extent, sizeof(*extent), LCK_PW, lockh);
1185                 if (rc == 1) {
1186                         /* FIXME: This is not incredibly elegant, but it might
1187                          * be more elegant than adding another parameter to
1188                          * lock_match.  I want a second opinion. */
1189                         ldlm_lock_addref(lockh, LCK_PR);
1190                         ldlm_lock_decref(lockh, LCK_PW);
1191
1192                         RETURN(ELDLM_LOCK_MATCHED);
1193                 }
1194         }
1195
1196         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1197                               res_id, type, extent, sizeof(*extent), mode, flags,
1198                               ldlm_completion_ast, callback, data, NULL,
1199                               lockh);
1200         RETURN(rc);
1201 }
1202
1203 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1204                       __u32 mode, struct lustre_handle *lockh)
1205 {
1206         ENTRY;
1207
1208         ldlm_lock_decref(lockh, mode);
1209
1210         RETURN(0);
1211 }
1212
1213 static int osc_cancel_unused(struct lustre_handle *connh,
1214                              struct lov_stripe_md *lsm, int flags)
1215 {
1216         struct obd_device *obddev = class_conn2obd(connh);
1217         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1218
1219         return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
1220 }
1221
1222 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1223 {
1224         struct ptlrpc_request *request;
1225         int rc, size = sizeof(*osfs);
1226         ENTRY;
1227
1228         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
1229                                   NULL);
1230         if (!request)
1231                 RETURN(-ENOMEM);
1232
1233         request->rq_replen = lustre_msg_size(1, &size);
1234
1235         rc = ptlrpc_queue_wait(request);
1236         if (rc) {
1237                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1238                 GOTO(out, rc);
1239         }
1240
1241         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
1242
1243         EXIT;
1244  out:
1245         ptlrpc_req_finished(request);
1246         return rc;
1247 }
1248
1249 /* Retrieve object striping information.
1250  *
1251  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1252  * the maximum number of OST indices which will fit in the user buffer.
1253  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
1254  */
1255 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1256                          struct lov_mds_md *lmmu)
1257 {
1258         struct lov_mds_md lmm, *lmmk;
1259         int rc, lmm_size;
1260         ENTRY;
1261
1262         if (!lsm)
1263                 RETURN(-ENODATA);
1264
1265         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1266         if (rc)
1267                 RETURN(-EFAULT);
1268
1269         if (lmm.lmm_magic != LOV_MAGIC)
1270                 RETURN(-EINVAL);
1271
1272         if (lmm.lmm_ost_count < 1)
1273                 RETURN(-EOVERFLOW);
1274
1275         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1276         OBD_ALLOC(lmmk, lmm_size);
1277         if (!lmmk)
1278                 RETURN(-ENOMEM);
1279
1280         lmmk->lmm_stripe_count = 1;
1281         lmmk->lmm_ost_count = 1;
1282         lmmk->lmm_object_id = lsm->lsm_object_id;
1283         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1284
1285         if (copy_to_user(lmmu, lmmk, lmm_size))
1286                 rc = -EFAULT;
1287
1288         OBD_FREE(lmmk, lmm_size);
1289
1290         RETURN(rc);
1291 }
1292
1293 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1294                          void *karg, void *uarg)
1295 {
1296         struct obd_device *obddev = class_conn2obd(conn);
1297         struct obd_ioctl_data *data = karg;
1298         int err = 0;
1299         ENTRY;
1300
1301         switch (cmd) {
1302 #if 0
1303         case IOC_LDLM_TEST: {
1304                 err = ldlm_test(obddev, conn);
1305                 CERROR("-- done err %d\n", err);
1306                 GOTO(out, err);
1307         }
1308         case IOC_LDLM_REGRESS_START: {
1309                 unsigned int numthreads = 1;
1310                 unsigned int numheld = 10;
1311                 unsigned int numres = 10;
1312                 unsigned int numext = 10;
1313                 char *parse;
1314
1315                 if (data->ioc_inllen1) {
1316                         parse = data->ioc_inlbuf1;
1317                         if (*parse != '\0') {
1318                                 while(isspace(*parse)) parse++;
1319                                 numthreads = simple_strtoul(parse, &parse, 0);
1320                                 while(isspace(*parse)) parse++;
1321                         }
1322                         if (*parse != '\0') {
1323                                 while(isspace(*parse)) parse++;
1324                                 numheld = simple_strtoul(parse, &parse, 0);
1325                                 while(isspace(*parse)) parse++;
1326                         }
1327                         if (*parse != '\0') {
1328                                 while(isspace(*parse)) parse++;
1329                                 numres = simple_strtoul(parse, &parse, 0);
1330                                 while(isspace(*parse)) parse++;
1331                         }
1332                         if (*parse != '\0') {
1333                                 while(isspace(*parse)) parse++;
1334                                 numext = simple_strtoul(parse, &parse, 0);
1335                                 while(isspace(*parse)) parse++;
1336                         }
1337                 }
1338
1339                 err = ldlm_regression_start(obddev, conn, numthreads,
1340                                 numheld, numres, numext);
1341
1342                 CERROR("-- done err %d\n", err);
1343                 GOTO(out, err);
1344         }
1345         case IOC_LDLM_REGRESS_STOP: {
1346                 err = ldlm_regression_stop();
1347                 CERROR("-- done err %d\n", err);
1348                 GOTO(out, err);
1349         }
1350 #endif
1351         case IOC_OSC_REGISTER_LOV: {
1352                 if (obddev->u.cli.cl_containing_lov)
1353                         GOTO(out, err = -EALREADY);
1354                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1355                 GOTO(out, err);
1356         }
1357         case OBD_IOC_LOV_GET_CONFIG: {
1358                 char *buf;
1359                 struct lov_desc *desc;
1360                 struct obd_uuid uuid;
1361
1362                 buf = NULL;
1363                 len = 0;
1364                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1365                         GOTO(out, err = -EINVAL);
1366
1367                 data = (struct obd_ioctl_data *)buf;
1368
1369                 if (sizeof(*desc) > data->ioc_inllen1) {
1370                         OBD_FREE(buf, len);
1371                         GOTO(out, err = -EINVAL);
1372                 }
1373
1374                 if (data->ioc_inllen2 < sizeof(uuid)) {
1375                         OBD_FREE(buf, len);
1376                         GOTO(out, err = -EINVAL);
1377                 }
1378
1379                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1380                 desc->ld_tgt_count = 1;
1381                 desc->ld_active_tgt_count = 1;
1382                 desc->ld_default_stripe_count = 1;
1383                 desc->ld_default_stripe_size = 0;
1384                 desc->ld_default_stripe_offset = 0;
1385                 desc->ld_pattern = 0;
1386                 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1387
1388                 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1389
1390                 err = copy_to_user((void *)uarg, buf, len);
1391                 if (err)
1392                         err = -EFAULT;
1393                 OBD_FREE(buf, len);
1394                 GOTO(out, err);
1395         }
1396         case LL_IOC_LOV_SETSTRIPE:
1397                 err = obd_alloc_memmd(conn, karg);
1398                 if (err > 0)
1399                         err = 0;
1400                 GOTO(out, err);
1401         case LL_IOC_LOV_GETSTRIPE:
1402                 err = osc_getstripe(conn, karg, uarg);
1403                 GOTO(out, err);
1404         default:
1405                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1406                 GOTO(out, err = -ENOTTY);
1407         }
1408 out:
1409         return err;
1410 }
1411
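/* Notify the LOV that registered itself with us (IOC_OSC_REGISTER_LOV) that
 * this OSC is now active or inactive, by sending IOC_LOV_SET_OSC_ACTIVE
 * through a temporary handle built from the LOV's first export. */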
1412 static void set_osc_active(struct obd_import *imp, int active)
1413 {
1414         struct obd_device *notify_obd;
1415
1416         LASSERT(imp->imp_obd);
1417
1418         notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
1419
1420         if (notify_obd == NULL)
1421                 return;
1422
1423         /* How gross is _this_? */
1424         if (!list_empty(&notify_obd->obd_exports)) {
1425                 int rc;
1426                 struct lustre_handle fakeconn;
1427                 struct obd_ioctl_data ioc_data = { 0 };
1428                 struct obd_export *exp =
1429                         list_entry(notify_obd->obd_exports.next,
1430                                    struct obd_export, exp_obd_chain);
1431
1432                 fakeconn.addr = (__u64)(unsigned long)exp;
1433                 fakeconn.cookie = exp->exp_cookie;
1434                 ioc_data.ioc_inlbuf1 =
1435                         (char *)&imp->imp_obd->u.cli.cl_target_uuid;
1436                 ioc_data.ioc_offset = active;
1437                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
1438                                    sizeof ioc_data, &ioc_data, NULL);
1439                 if (rc)
1440                         CERROR("error disabling %s on LOV %p/%s: %d\n",
1441                                imp->imp_obd->u.cli.cl_target_uuid.uuid,
1442                                notify_obd, notify_obd->obd_uuid.uuid, rc);
1443         } else {
1444                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
1445                        "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
1446                        imp->imp_obd->obd_uuid.uuid);
1447         }
1448 }
1449
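/* Recovery handler, driven by the ptlrpc recovery daemon.  PREPARE cancels or
 * invalidates local state depending on whether the OST is failover-capable;
 * RECOVER reconnects and, if the OST asks for it, replays requests and locks;
 * NOTCONN simply runs both phases back to back. */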
1450 static int osc_recover(struct obd_import *imp, int phase)
1451 {
1452         int rc;
1453         unsigned long flags;
1454         int msg_flags;
1455         struct ptlrpc_request *req;
1456         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1457         ENTRY;
1458
1459         CDEBUG(D_HA, "%s: entering phase: %d\n",
1460                imp->imp_obd->obd_name, phase);
1461         switch(phase) {
1462
1463             case PTLRPC_RECOVD_PHASE_PREPARE: {
1464                 if (imp->imp_flags & IMP_REPLAYABLE) {
1465                         CDEBUG(D_HA, "failover OST\n");
1466                         /* If we're a failover OSC/OST, just cancel unused
1467                          * locks to simplify lock replay.
1468                          */
1469                         ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
1470                 } else {
1471                         CDEBUG(D_HA, "non-failover OST\n");
1472                         /* Non-failover OSTs (LLNL scenario) disable the OSC
1473                          * and invalidate local state.
1474                          */
1475                         ldlm_namespace_cleanup(ns, 1 /* no network ops */);
1476                         ptlrpc_abort_inflight(imp, 0);
1477                         set_osc_active(imp, 0 /* inactive */);
1478                 }
1479                 RETURN(0);
1480         }
1481
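        /* Reconnect to the OST; the reply's op flags tell us whether the
         * server is recovering (replay everything), merely saw a reconnect
         * (nothing more to do), or has evicted us (invalidate local state). */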
1482         case PTLRPC_RECOVD_PHASE_RECOVER: {
1483         reconnect:
1484                 imp->imp_flags &= ~IMP_INVALID;
1485                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
1486
1487                 msg_flags = req->rq_repmsg
1488                         ? lustre_msg_get_op_flags(req->rq_repmsg)
1489                         : 0;
1490
1491                 if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
1492                         CERROR("reconnect denied by recovery; should retry\n");
1493
1494                 if (rc) {
1495                         if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
1496                                 CERROR("can't reconnect, invalidating\n");
1497                                 ldlm_namespace_cleanup(ns, 1);
1498                                 ptlrpc_abort_inflight(imp, 0);
1499                         }
1500                         imp->imp_flags |= IMP_INVALID;
1501                         ptlrpc_req_finished(req);
1502                         RETURN(rc);
1503                 }
1504
1505                 if (msg_flags & MSG_CONNECT_RECOVERING) {
1506                         /* Replay if they want it. */
1507                         DEBUG_REQ(D_HA, req, "OST wants replay");
1508                         rc = ptlrpc_replay(imp);
1509                         if (rc)
1510                                 GOTO(check_rc, rc);
1511
1512                         rc = ldlm_replay_locks(imp);
1513                         if (rc)
1514                                 GOTO(check_rc, rc);
1515
1516                         rc = signal_completed_replay(imp);
1517                         if (rc)
1518                                 GOTO(check_rc, rc);
1519                 } else if (msg_flags & MSG_CONNECT_RECONNECT) {
1520                         DEBUG_REQ(D_HA, req, "reconnecting to OST");
1521                         /* Nothing else to do here. */
1522                 } else {
1523                         DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
1524                         /* Otherwise, clean everything up. */
1525                         ldlm_namespace_cleanup(ns, 1);
1526                         ptlrpc_abort_inflight(imp, 0);
1527                 }
1528
1529                 ptlrpc_req_finished(req);
1530
1531                 spin_lock_irqsave(&imp->imp_lock, flags);
1532                 imp->imp_level = LUSTRE_CONN_FULL;
1533                 imp->imp_flags &= ~IMP_INVALID;
1534                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1535
1536                 /* Is this the right place?  Should we do this in _PREPARE
1537                  * as well?  What about raising the level right away?
1538                  */
1539                 ptlrpc_wake_delayed(imp);
1540
1541                 rc = ptlrpc_resend(imp);
1542                 if (rc)
1543                         GOTO(check_rc, rc);
1544
1545                 set_osc_active(imp, 1 /* active */);
1546                 RETURN(0);
1547
1548         check_rc:
1549                 /* If we get disconnected in the middle, recovery has probably
1550                  * failed.  Reconnect and find out.
1551                  */
1552                 if (rc == -ENOTCONN)
1553                         goto reconnect;
1554                 RETURN(rc);
1555         }
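        /* Not connected yet: run the PREPARE phase, then attempt RECOVER. */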
1556         case PTLRPC_RECOVD_PHASE_NOTCONN:
1557                 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1558                 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
1559
1560         default:
1561                 RETURN(-EINVAL);
1562         }
1563 }
1564
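/* Hook the import's recovery callback up to osc_recover() before handing the
 * rest of the connect off to the generic client code. */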
1565 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1566                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
1567                        ptlrpc_recovery_cb_t recover)
1568 {
1569         struct obd_import *imp = &obd->u.cli.cl_import;
1570         imp->imp_recover = osc_recover;
1571         return client_obd_connect(conn, obd, cluuid, recovd, recover);
1572 }
1573
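/* Method tables for the "osc" and "sanosc" obd types, using gcc's old-style
 * "field:" designated initializers.  The SAN variant substitutes the SAN
 * setup and brw methods when built into the kernel. */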
1574 struct obd_ops osc_obd_ops = {
1575         o_owner:        THIS_MODULE,
1576         o_attach:       osc_attach,
1577         o_detach:       osc_detach,
1578         o_setup:        client_obd_setup,
1579         o_cleanup:      client_obd_cleanup,
1580         o_connect:      osc_connect,
1581         o_disconnect:   client_obd_disconnect,
1582         o_statfs:       osc_statfs,
1583         o_packmd:       osc_packmd,
1584         o_unpackmd:     osc_unpackmd,
1585         o_create:       osc_create,
1586         o_destroy:      osc_destroy,
1587         o_getattr:      osc_getattr,
1588         o_setattr:      osc_setattr,
1589         o_open:         osc_open,
1590         o_close:        osc_close,
1591         o_brw:          osc_brw,
1592         o_punch:        osc_punch,
1593         o_enqueue:      osc_enqueue,
1594         o_cancel:       osc_cancel,
1595         o_cancel_unused: osc_cancel_unused,
1596         o_iocontrol:    osc_iocontrol,
1597 };
1598
1599 struct obd_ops sanosc_obd_ops = {
1600         o_owner:        THIS_MODULE,
1601         o_attach:       osc_attach,
1602         o_detach:       osc_detach,
1603         o_cleanup:      client_obd_cleanup,
1604         o_connect:      osc_connect,
1605         o_disconnect:   client_obd_disconnect,
1606         o_statfs:       osc_statfs,
1607         o_packmd:       osc_packmd,
1608         o_unpackmd:     osc_unpackmd,
1609         o_create:       osc_create,
1610         o_destroy:      osc_destroy,
1611         o_getattr:      osc_getattr,
1612         o_setattr:      osc_setattr,
1613         o_open:         osc_open,
1614         o_close:        osc_close,
1615 #ifdef __KERNEL__
1616         o_setup:        client_sanobd_setup,
1617         o_brw:          sanosc_brw,
1618 #endif
1619         o_punch:        osc_punch,
1620         o_enqueue:      osc_enqueue,
1621         o_cancel:       osc_cancel,
1622         o_cancel_unused: osc_cancel_unused,
1623         o_iocontrol:    osc_iocontrol,
1624 };
1625
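/* Register both the OSC and SANOSC obd types; if the second registration
 * fails, back out the first.  The assertion checks that struct osc_obdo_data
 * still fits within FD_OSTDATA_SIZE. */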
1626 int __init osc_init(void)
1627 {
1628         struct lprocfs_static_vars lvars;
1629         int rc;
1630         ENTRY;
1631
1632         LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
1633
1634         lprocfs_init_vars(&lvars);
1635
1636         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1637                                  LUSTRE_OSC_NAME);
1638         if (rc)
1639                 RETURN(rc);
1640
1641         rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1642                                  LUSTRE_SANOSC_NAME);
1643         if (rc)
1644                 class_unregister_type(LUSTRE_OSC_NAME);
1645
1646         RETURN(rc);
1647 }
1648
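/* Unregister the types in the reverse of the order osc_init() registered
 * them. */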
1649 static void __exit osc_exit(void)
1650 {
1651         class_unregister_type(LUSTRE_SANOSC_NAME);
1652         class_unregister_type(LUSTRE_OSC_NAME);
1653 }
1654
1655 #ifdef __KERNEL__
1656 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1657 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1658 MODULE_LICENSE("GPL");
1659
1660 module_init(osc_init);
1661 module_exit(osc_exit);
1662 #endif