Whamcloud - gitweb
LU-1030 clio: reimplement ll_fsync in clio way
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
/* Forward declarations; the definitions are not visible in this part of
 * the file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
/* Synchronously create an object on the OST with an OST_CREATE request.
 *
 * oa  - in/out obdo: sent to the OST and overwritten with the reply.
 * ea  - in/out stripe md pointer; if *ea is NULL a stripe md is allocated
 *       here and stored in *ea on success.
 * oti - optional; on success receives the reply transno and, when the
 *       reply obdo carries OBD_MD_FLCOOKIE, the log cookie.
 *
 * Returns 0 on success, a negative value from obd_alloc_memmd(), -ENOMEM
 * if the request cannot be allocated, -EPROTO if the reply has no body, or
 * the error from packing or sending the request.  On failure a stripe md
 * allocated here is freed again and *ea is left untouched. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Use the caller's stripe md if there is one, else allocate. */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                /* Released with ptlrpc_request_free() here, so skip out_req. */
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* Only an o_flags value of exactly OBD_FL_DELORPHAN qualifies. */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        /* From here on body refers to the reply, not the request. */
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        /* Publish the stripe md to the caller only on success. */
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* NOTE(review): oti_alloc_cookies() is not checked
                         * for failure before the store below - confirm it
                         * cannot fail for a single cookie. */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* *ea is still NULL only if the stripe md was allocated here and
         * never published, so it must be freed on error. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
/* Try to reserve an in-flight slot for a destroy RPC.
 *
 * The counter is incremented optimistically.  If the new value is within
 * cl_max_rpcs_in_flight the increment is kept as the reservation and 1 is
 * returned; osc_destroy_interpret() drops it again when the reply arrives.
 * Otherwise the increment is undone and 0 is returned. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: take the increment back.  If the value is now
         * below the limit, a slot was released in between, so wake a
         * waiter to retry rather than let that release go unnoticed. */
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
706
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions.
 *
 * Returns 0 once the request has been handed to ptlrpcd, -EINVAL if oa is
 * NULL, -ENOMEM if the request cannot be allocated, or the error from
 * ldlm_prep_elc_req().  The env, ea and md_export arguments are not used
 * here. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Collect the local PW locks on this object; their cached data is
         * flagged to be discarded since the object is going away. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Release the locks collected above. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        /* NOTE(review): the cancels list is not released on this error
         * path; presumably ldlm_prep_elc_req() does so itself on failure -
         * confirm. */
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Pass the caller's first log cookie along in the obdo. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                /* The interpret callback returns the slot reserved by
                 * osc_can_send_destroy(). */
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        /* NOTE(review): the wait result is ignored; if it
                         * can end without the condition holding, the
                         * request is sent without a reserved slot while
                         * the callback still decrements - confirm. */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
783
/* Fill the cache and grant accounting fields of an outgoing obdo.
 *
 * Sets OBD_MD_FLBLOCKS and OBD_MD_FLGRANT in o_valid (neither may be set on
 * entry) and, under cl_loi_list_lock, records:
 *  - o_dirty:   the client's current dirty count;
 *  - o_undirty: 0 if any of the sanity checks below fails, otherwise the
 *               larger of cl_dirty_max and the bytes needed for
 *               cl_max_rpcs_in_flight + 1 full RPCs;
 *  - o_grant:   the currently available grant;
 *  - o_dropped: the lost grant, which is reset to 0 once reported.
 *
 * The writing_bytes argument is not used here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* Per-client check: dirty data not in transit exceeds the limit. */
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1){
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* The remaining room is larger than 0x7fffffff. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Bytes needed to keep one more than the maximum number of
                 * RPCs in flight, each at full size. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* The lost grant is reported once, then forgotten. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
826
827 void osc_update_next_shrink(struct client_obd *cli)
828 {
829         cli->cl_next_shrink_grant =
830                 cfs_time_shift(cli->cl_grant_shrink_interval);
831         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
832                cli->cl_next_shrink_grant);
833 }
834
835 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
836 {
837         client_obd_list_lock(&cli->cl_loi_list_lock);
838         cli->cl_avail_grant += grant;
839         client_obd_list_unlock(&cli->cl_loi_list_lock);
840 }
841
842 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
843 {
844         if (body->oa.o_valid & OBD_MD_FLGRANT) {
845                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
846                 __osc_update_grant(cli, body->oa.o_grant);
847         }
848 }
849
/* Forward declaration; the definition is not visible in this part of the
 * file. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);
853
854 static int osc_shrink_grant_interpret(const struct lu_env *env,
855                                       struct ptlrpc_request *req,
856                                       void *aa, int rc)
857 {
858         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
859         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
860         struct ost_body *body;
861
862         if (rc != 0) {
863                 __osc_update_grant(cli, oa->o_grant);
864                 GOTO(out, rc);
865         }
866
867         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
868         LASSERT(body);
869         osc_update_grant(cli, body);
870 out:
871         OBDO_FREE(oa);
872         return rc;
873 }
874
875 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         oa->o_grant = cli->cl_avail_grant / 4;
879         cli->cl_avail_grant -= oa->o_grant;
880         client_obd_list_unlock(&cli->cl_loi_list_lock);
881         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
882                 oa->o_valid |= OBD_MD_FLFLAGS;
883                 oa->o_flags = 0;
884         }
885         oa->o_flags |= OBD_FL_SHRINK_GRANT;
886         osc_update_next_shrink(cli);
887 }
888
889 /* Shrink the current grant, either from some large amount to enough for a
890  * full set of in-flight RPCs, or if we have already shrunk to that limit
891  * then to enough for a single RPC.  This avoids keeping more grant than
892  * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
894 {
895         long target = (cli->cl_max_rpcs_in_flight + 1) *
896                       cli->cl_max_pages_per_rpc;
897
898         client_obd_list_lock(&cli->cl_loi_list_lock);
899         if (cli->cl_avail_grant <= target)
900                 target = cli->cl_max_pages_per_rpc;
901         client_obd_list_unlock(&cli->cl_loi_list_lock);
902
903         return osc_shrink_grant_to_target(cli, target);
904 }
905
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
907 {
908         int    rc = 0;
909         struct ost_body     *body;
910         ENTRY;
911
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         /* Don't shrink if we are already above or below the desired limit
914          * We don't want to shrink below a single RPC, as that will negatively
915          * impact block allocation and long-term performance. */
916         if (target < cli->cl_max_pages_per_rpc)
917                 target = cli->cl_max_pages_per_rpc;
918
919         if (target >= cli->cl_avail_grant) {
920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
921                 RETURN(0);
922         }
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925         OBD_ALLOC_PTR(body);
926         if (!body)
927                 RETURN(-ENOMEM);
928
929         osc_announce_cached(cli, &body->oa, 0);
930
931         client_obd_list_lock(&cli->cl_loi_list_lock);
932         body->oa.o_grant = cli->cl_avail_grant - target;
933         cli->cl_avail_grant = target;
934         client_obd_list_unlock(&cli->cl_loi_list_lock);
935         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
936                 body->oa.o_valid |= OBD_MD_FLFLAGS;
937                 body->oa.o_flags = 0;
938         }
939         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
940         osc_update_next_shrink(cli);
941
942         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
943                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
944                                 sizeof(*body), body, NULL);
945         if (rc != 0)
946                 __osc_update_grant(cli, body->oa.o_grant);
947         OBD_FREE_PTR(body);
948         RETURN(rc);
949 }
950
951 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
952 static int osc_should_shrink_grant(struct client_obd *client)
953 {
954         cfs_time_t time = cfs_time_current();
955         cfs_time_t next_shrink = client->cl_next_shrink_grant;
956
957         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
958              OBD_CONNECT_GRANT_SHRINK) == 0)
959                 return 0;
960
961         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
962                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
963                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
964                         return 1;
965                 else
966                         osc_update_next_shrink(client);
967         }
968         return 0;
969 }
970
971 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
972 {
973         struct client_obd *client;
974
975         cfs_list_for_each_entry(client, &item->ti_obd_list,
976                                 cl_grant_shrink_list) {
977                 if (osc_should_shrink_grant(client))
978                         osc_shrink_grant(client);
979         }
980         return 0;
981 }
982
983 static int osc_add_shrink_grant(struct client_obd *client)
984 {
985         int rc;
986
987         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
988                                        TIMEOUT_GRANT,
989                                        osc_grant_shrink_grant_cb, NULL,
990                                        &client->cl_grant_shrink_list);
991         if (rc) {
992                 CERROR("add grant client %s error %d\n",
993                         client->cl_import->imp_obd->obd_name, rc);
994                 return rc;
995         }
996         CDEBUG(D_CACHE, "add grant client %s \n",
997                client->cl_import->imp_obd->obd_name);
998         osc_update_next_shrink(client);
999         return 0;
1000 }
1001
1002 static int osc_del_shrink_grant(struct client_obd *client)
1003 {
1004         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1005                                          TIMEOUT_GRANT);
1006 }
1007
1008 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1009 {
1010         /*
1011          * ocd_grant is the total grant amount we're expect to hold: if we've
1012          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1013          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1014          *
1015          * race is tolerable here: if we're evicted, but imp_state already
1016          * left EVICTED state, then cl_dirty must be 0 already.
1017          */
1018         client_obd_list_lock(&cli->cl_loi_list_lock);
1019         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1020                 cli->cl_avail_grant = ocd->ocd_grant;
1021         else
1022                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1023
1024         if (cli->cl_avail_grant < 0) {
1025                 CWARN("%s: available grant < 0, the OSS is probably not running"
1026                       " with patch from bug20278 (%ld) \n",
1027                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1028                 /* workaround for 1.6 servers which do not have
1029                  * the patch from bug20278 */
1030                 cli->cl_avail_grant = ocd->ocd_grant;
1031         }
1032
1033         client_obd_list_unlock(&cli->cl_loi_list_lock);
1034
1035         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1036                cli->cl_import->imp_obd->obd_name,
1037                cli->cl_avail_grant, cli->cl_lost_grant);
1038
1039         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1040             cfs_list_empty(&cli->cl_grant_shrink_list))
1041                 osc_add_shrink_grant(cli);
1042 }
1043
1044 /* We assume that the reason this OSC got a short read is because it read
1045  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1046  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1047  * this stripe never got written at or beyond this stripe offset yet. */
1048 static void handle_short_read(int nob_read, obd_count page_count,
1049                               struct brw_page **pga)
1050 {
1051         char *ptr;
1052         int i = 0;
1053
1054         /* skip bytes read OK */
1055         while (nob_read > 0) {
1056                 LASSERT (page_count > 0);
1057
1058                 if (pga[i]->count > nob_read) {
1059                         /* EOF inside this page */
1060                         ptr = cfs_kmap(pga[i]->pg) +
1061                                 (pga[i]->off & ~CFS_PAGE_MASK);
1062                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1063                         cfs_kunmap(pga[i]->pg);
1064                         page_count--;
1065                         i++;
1066                         break;
1067                 }
1068
1069                 nob_read -= pga[i]->count;
1070                 page_count--;
1071                 i++;
1072         }
1073
1074         /* zero remaining pages */
1075         while (page_count-- > 0) {
1076                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1077                 memset(ptr, 0, pga[i]->count);
1078                 cfs_kunmap(pga[i]->pg);
1079                 i++;
1080         }
1081 }
1082
1083 static int check_write_rcs(struct ptlrpc_request *req,
1084                            int requested_nob, int niocount,
1085                            obd_count page_count, struct brw_page **pga)
1086 {
1087         int     i;
1088         __u32   *remote_rcs;
1089
1090         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1091                                                   sizeof(*remote_rcs) *
1092                                                   niocount);
1093         if (remote_rcs == NULL) {
1094                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1095                 return(-EPROTO);
1096         }
1097
1098         /* return error if any niobuf was in error */
1099         for (i = 0; i < niocount; i++) {
1100                 if ((int)remote_rcs[i] < 0)
1101                         return(remote_rcs[i]);
1102
1103                 if (remote_rcs[i] != 0) {
1104                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1105                                 i, remote_rcs[i], req);
1106                         return(-EPROTO);
1107                 }
1108         }
1109
1110         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1111                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1112                        req->rq_bulk->bd_nob_transferred, requested_nob);
1113                 return(-EPROTO);
1114         }
1115
1116         return (0);
1117 }
1118
1119 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1120 {
1121         if (p1->flag != p2->flag) {
1122                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1123                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1124
1125                 /* warn if we try to combine flags that we don't know to be
1126                  * safe to combine */
1127                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1128                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1129                               "report this at http://bugs.whamcloud.com/\n",
1130                               p1->flag, p2->flag);
1131                 }
1132                 return 0;
1133         }
1134
1135         return (p1->off + p1->count == p2->off);
1136 }
1137
1138 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1139                                    struct brw_page **pga, int opc,
1140                                    cksum_type_t cksum_type)
1141 {
1142         __u32 cksum;
1143         int i = 0;
1144
1145         LASSERT (pg_count > 0);
1146         cksum = init_checksum(cksum_type);
1147         while (nob > 0 && pg_count > 0) {
1148                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1149                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1150                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1151
1152                 /* corrupt the data before we compute the checksum, to
1153                  * simulate an OST->client data error */
1154                 if (i == 0 && opc == OST_READ &&
1155                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1156                         memcpy(ptr + off, "bad1", min(4, nob));
1157                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1158                 cfs_kunmap(pga[i]->pg);
1159                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1160                                off, cksum);
1161
1162                 nob -= pga[i]->count;
1163                 pg_count--;
1164                 i++;
1165         }
1166         /* For sending we only compute the wrong checksum instead
1167          * of corrupting the data so it is still correct on a redo */
1168         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1169                 cksum++;
1170
1171         return fini_checksum(cksum, cksum_type);
1172 }
1173
/**
 * Build a bulk read or write (BRW) request for the pages in \a pga.
 *
 * \param cmd         OBD_BRW_WRITE bit set selects OST_WRITE, else OST_READ
 * \param cli         client the request is built for
 * \param oa          obdo copied into the request body; for writes its
 *                    checksum fields are updated as well
 * \param lsm         not referenced in this function
 * \param page_count  number of entries in \a pga (asserted > 0)
 * \param pga         page array, asserted to be ordered by increasing offset
 * \param reqp        receives the prepared request on success
 * \param ocapa       capability packed into the request
 * \param reserve     if non-zero (and \a ocapa is set) a capa reference is
 *                    stored in the async args
 * \param resend      if non-zero, OBD_FL_RECOV_RESEND is set in the body
 *
 * \retval 0       success, *reqp is valid
 * \retval <0      -ENOMEM / -EINVAL from fault injection or allocation
 *                 failure, or the error from ptlrpc_request_pack()
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes are allocated from the import's request pool */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill in the niobufs;
         * niobuf is advanced once per page and stepped back when the page
         * merges into the previous niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages must agree on OBD_BRW_SRVLOCK */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* the loop must have consumed exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this request when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only announce the checksum type; no checksum is
                 * computed on this side before sending */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash what the reply handler needs in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1374
1375 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1376                                 __u32 client_cksum, __u32 server_cksum, int nob,
1377                                 obd_count page_count, struct brw_page **pga,
1378                                 cksum_type_t client_cksum_type)
1379 {
1380         __u32 new_cksum;
1381         char *msg;
1382         cksum_type_t cksum_type;
1383
1384         if (server_cksum == client_cksum) {
1385                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1386                 return 0;
1387         }
1388
1389         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1390                                        oa->o_flags : 0);
1391         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1392                                       cksum_type);
1393
1394         if (cksum_type != client_cksum_type)
1395                 msg = "the server did not use the checksum type specified in "
1396                       "the original request - likely a protocol problem";
1397         else if (new_cksum == server_cksum)
1398                 msg = "changed on the client after we checksummed it - "
1399                       "likely false positive due to mmap IO (bug 11742)";
1400         else if (new_cksum == client_cksum)
1401                 msg = "changed in transit before arrival at OST";
1402         else
1403                 msg = "changed in transit AND doesn't match the original - "
1404                       "likely false positive due to mmap IO (bug 11742)";
1405
1406         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1407                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1408                            msg, libcfs_nid2str(peer->nid),
1409                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1410                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1411                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1412                            oa->o_id,
1413                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1414                            pga[0]->off,
1415                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1416         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1417                "client csum now %x\n", client_cksum, client_cksum_type,
1418                server_cksum, cksum_type, new_cksum);
1419         return 1;
1420 }
1421
1422 /* Note rc enters this function as number of bytes transferred */
1423 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1424 {
1425         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1426         const lnet_process_id_t *peer =
1427                         &req->rq_import->imp_connection->c_peer;
1428         struct client_obd *cli = aa->aa_cli;
1429         struct ost_body *body;
1430         __u32 client_cksum = 0;
1431         ENTRY;
1432
1433         if (rc < 0 && rc != -EDQUOT) {
1434                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1435                 RETURN(rc);
1436         }
1437
1438         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1439         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1440         if (body == NULL) {
1441                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1442                 RETURN(-EPROTO);
1443         }
1444
1445         /* set/clear over quota flag for a uid/gid */
1446         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1447             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1448                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1449
1450                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1451                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1452                        body->oa.o_flags);
1453                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1454         }
1455
1456         osc_update_grant(cli, body);
1457
1458         if (rc < 0)
1459                 RETURN(rc);
1460
1461         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1462                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1463
1464         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1465                 if (rc > 0) {
1466                         CERROR("Unexpected +ve rc %d\n", rc);
1467                         RETURN(-EPROTO);
1468                 }
1469                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1470
1471                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1472                         RETURN(-EAGAIN);
1473
1474                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1475                     check_write_checksum(&body->oa, peer, client_cksum,
1476                                          body->oa.o_cksum, aa->aa_requested_nob,
1477                                          aa->aa_page_count, aa->aa_ppga,
1478                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1479                         RETURN(-EAGAIN);
1480
1481                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1482                                      aa->aa_page_count, aa->aa_ppga);
1483                 GOTO(out, rc);
1484         }
1485
1486         /* The rest of this function executes only for OST_READs */
1487
1488         /* if unwrap_bulk failed, return -EAGAIN to retry */
1489         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1490         if (rc < 0)
1491                 GOTO(out, rc = -EAGAIN);
1492
1493         if (rc > aa->aa_requested_nob) {
1494                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1495                        aa->aa_requested_nob);
1496                 RETURN(-EPROTO);
1497         }
1498
1499         if (rc != req->rq_bulk->bd_nob_transferred) {
1500                 CERROR ("Unexpected rc %d (%d transferred)\n",
1501                         rc, req->rq_bulk->bd_nob_transferred);
1502                 return (-EPROTO);
1503         }
1504
1505         if (rc < aa->aa_requested_nob)
1506                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1507
1508         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1509                 static int cksum_counter;
1510                 __u32      server_cksum = body->oa.o_cksum;
1511                 char      *via;
1512                 char      *router;
1513                 cksum_type_t cksum_type;
1514
1515                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1516                                                body->oa.o_flags : 0);
1517                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1518                                                  aa->aa_ppga, OST_READ,
1519                                                  cksum_type);
1520
1521                 if (peer->nid == req->rq_bulk->bd_sender) {
1522                         via = router = "";
1523                 } else {
1524                         via = " via ";
1525                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1526                 }
1527
1528                 if (server_cksum == ~0 && rc > 0) {
1529                         CERROR("Protocol error: server %s set the 'checksum' "
1530                                "bit, but didn't send a checksum.  Not fatal, "
1531                                "but please notify on http://bugs.whamcloud.com/\n",
1532                                libcfs_nid2str(peer->nid));
1533                 } else if (server_cksum != client_cksum) {
1534                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1535                                            "%s%s%s inode "DFID" object "
1536                                            LPU64"/"LPU64" extent "
1537                                            "["LPU64"-"LPU64"]\n",
1538                                            req->rq_import->imp_obd->obd_name,
1539                                            libcfs_nid2str(peer->nid),
1540                                            via, router,
1541                                            body->oa.o_valid & OBD_MD_FLFID ?
1542                                                 body->oa.o_parent_seq : (__u64)0,
1543                                            body->oa.o_valid & OBD_MD_FLFID ?
1544                                                 body->oa.o_parent_oid : 0,
1545                                            body->oa.o_valid & OBD_MD_FLFID ?
1546                                                 body->oa.o_parent_ver : 0,
1547                                            body->oa.o_id,
1548                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1549                                                 body->oa.o_seq : (__u64)0,
1550                                            aa->aa_ppga[0]->off,
1551                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1552                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1553                                                                         1);
1554                         CERROR("client %x, server %x, cksum_type %x\n",
1555                                client_cksum, server_cksum, cksum_type);
1556                         cksum_counter = 0;
1557                         aa->aa_oa->o_cksum = client_cksum;
1558                         rc = -EAGAIN;
1559                 } else {
1560                         cksum_counter++;
1561                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1562                         rc = 0;
1563                 }
1564         } else if (unlikely(client_cksum)) {
1565                 static int cksum_missed;
1566
1567                 cksum_missed++;
1568                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1569                         CERROR("Checksum %u requested from %s but not sent\n",
1570                                cksum_missed, libcfs_nid2str(peer->nid));
1571         } else {
1572                 rc = 0;
1573         }
1574 out:
1575         if (rc >= 0)
1576                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1577
1578         RETURN(rc);
1579 }
1580
1581 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1582                             struct lov_stripe_md *lsm,
1583                             obd_count page_count, struct brw_page **pga,
1584                             struct obd_capa *ocapa)
1585 {
1586         struct ptlrpc_request *req;
1587         int                    rc;
1588         cfs_waitq_t            waitq;
1589         int                    generation, resends = 0;
1590         struct l_wait_info     lwi;
1591
1592         ENTRY;
1593
1594         cfs_waitq_init(&waitq);
1595         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1596
1597 restart_bulk:
1598         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1599                                   page_count, pga, &req, ocapa, 0, resends);
1600         if (rc != 0)
1601                 return (rc);
1602
1603         if (resends) {
1604                 req->rq_generation_set = 1;
1605                 req->rq_import_generation = generation;
1606                 req->rq_sent = cfs_time_current_sec() + resends;
1607         }
1608
1609         rc = ptlrpc_queue_wait(req);
1610
1611         if (rc == -ETIMEDOUT && req->rq_resend) {
1612                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1613                 ptlrpc_req_finished(req);
1614                 goto restart_bulk;
1615         }
1616
1617         rc = osc_brw_fini_request(req, rc);
1618
1619         ptlrpc_req_finished(req);
1620         /* When server return -EINPROGRESS, client should always retry
1621          * regardless of the number of times the bulk was resent already.*/
1622         if (osc_recoverable_error(rc)) {
1623                 resends++;
1624                 if (rc != -EINPROGRESS &&
1625                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1626                         CERROR("%s: too many resend retries for object: "
1627                                ""LPU64":"LPU64", rc = %d.\n",
1628                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1629                         goto out;
1630                 }
1631                 if (generation !=
1632                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1633                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1634                                ""LPU64":"LPU64", rc = %d.\n",
1635                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1636                         goto out;
1637                 }
1638
1639                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1640                                        NULL);
1641                 l_wait_event(waitq, 0, &lwi);
1642
1643                 goto restart_bulk;
1644         }
1645 out:
1646         if (rc == -EAGAIN || rc == -EINPROGRESS)
1647                 rc = -EIO;
1648         RETURN (rc);
1649 }
1650
1651 int osc_brw_redo_request(struct ptlrpc_request *request,
1652                          struct osc_brw_async_args *aa)
1653 {
1654         struct ptlrpc_request *new_req;
1655         struct ptlrpc_request_set *set = request->rq_set;
1656         struct osc_brw_async_args *new_aa;
1657         struct osc_async_page *oap;
1658         int rc = 0;
1659         ENTRY;
1660
1661         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1662
1663         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1664                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1665                                   aa->aa_cli, aa->aa_oa,
1666                                   NULL /* lsm unused by osc currently */,
1667                                   aa->aa_page_count, aa->aa_ppga,
1668                                   &new_req, aa->aa_ocapa, 0, 1);
1669         if (rc)
1670                 RETURN(rc);
1671
1672         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1673
1674         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1675                 if (oap->oap_request != NULL) {
1676                         LASSERTF(request == oap->oap_request,
1677                                  "request %p != oap_request %p\n",
1678                                  request, oap->oap_request);
1679                         if (oap->oap_interrupted) {
1680                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1681                                 ptlrpc_req_finished(new_req);
1682                                 RETURN(-EINTR);
1683                         }
1684                 }
1685         }
1686         /* New request takes over pga and oaps from old request.
1687          * Note that copying a list_head doesn't work, need to move it... */
1688         aa->aa_resends++;
1689         new_req->rq_interpret_reply = request->rq_interpret_reply;
1690         new_req->rq_async_args = request->rq_async_args;
1691         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1692         new_req->rq_generation_set = 1;
1693         new_req->rq_import_generation = request->rq_import_generation;
1694
1695         new_aa = ptlrpc_req_async_args(new_req);
1696
1697         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1698         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1699         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1700
1701         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1702                 if (oap->oap_request) {
1703                         ptlrpc_req_finished(oap->oap_request);
1704                         oap->oap_request = ptlrpc_request_addref(new_req);
1705                 }
1706         }
1707
1708         new_aa->aa_ocapa = aa->aa_ocapa;
1709         aa->aa_ocapa = NULL;
1710
1711         /* use ptlrpc_set_add_req is safe because interpret functions work
1712          * in check_set context. only one way exist with access to request
1713          * from different thread got -EINTR - this way protected with
1714          * cl_loi_list_lock */
1715         ptlrpc_set_add_req(set, new_req);
1716
1717         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1718
1719         DEBUG_REQ(D_INFO, new_req, "new request");
1720         RETURN(0);
1721 }
1722
1723 /*
1724  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1725  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1726  * fine for our small page arrays and doesn't require allocation.  its an
1727  * insertion sort that swaps elements that are strides apart, shrinking the
1728  * stride down until its '1' and the array is sorted.
1729  */
1730 static void sort_brw_pages(struct brw_page **array, int num)
1731 {
1732         int stride, i, j;
1733         struct brw_page *tmp;
1734
1735         if (num == 1)
1736                 return;
1737         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1738                 ;
1739
1740         do {
1741                 stride /= 3;
1742                 for (i = stride ; i < num ; i++) {
1743                         tmp = array[i];
1744                         j = i;
1745                         while (j >= stride && array[j - stride]->off > tmp->off) {
1746                                 array[j] = array[j - stride];
1747                                 j -= stride;
1748                         }
1749                         array[j] = tmp;
1750                 }
1751         } while (stride > 1);
1752 }
1753
1754 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1755 {
1756         int count = 1;
1757         int offset;
1758         int i = 0;
1759
1760         LASSERT (pages > 0);
1761         offset = pg[i]->off & ~CFS_PAGE_MASK;
1762
1763         for (;;) {
1764                 pages--;
1765                 if (pages == 0)         /* that's all */
1766                         return count;
1767
1768                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1769                         return count;   /* doesn't end on page boundary */
1770
1771                 i++;
1772                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1773                 if (offset != 0)        /* doesn't start on page boundary */
1774                         return count;
1775
1776                 count++;
1777         }
1778 }
1779
1780 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1781 {
1782         struct brw_page **ppga;
1783         int i;
1784
1785         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1786         if (ppga == NULL)
1787                 return NULL;
1788
1789         for (i = 0; i < count; i++)
1790                 ppga[i] = pga + i;
1791         return ppga;
1792 }
1793
1794 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1795 {
1796         LASSERT(ppga != NULL);
1797         OBD_FREE(ppga, sizeof(*ppga) * count);
1798 }
1799
1800 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1801                    obd_count page_count, struct brw_page *pga,
1802                    struct obd_trans_info *oti)
1803 {
1804         struct obdo *saved_oa = NULL;
1805         struct brw_page **ppga, **orig;
1806         struct obd_import *imp = class_exp2cliimp(exp);
1807         struct client_obd *cli;
1808         int rc, page_count_orig;
1809         ENTRY;
1810
1811         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1812         cli = &imp->imp_obd->u.cli;
1813
1814         if (cmd & OBD_BRW_CHECK) {
1815                 /* The caller just wants to know if there's a chance that this
1816                  * I/O can succeed */
1817
1818                 if (imp->imp_invalid)
1819                         RETURN(-EIO);
1820                 RETURN(0);
1821         }
1822
1823         /* test_brw with a failed create can trip this, maybe others. */
1824         LASSERT(cli->cl_max_pages_per_rpc);
1825
1826         rc = 0;
1827
1828         orig = ppga = osc_build_ppga(pga, page_count);
1829         if (ppga == NULL)
1830                 RETURN(-ENOMEM);
1831         page_count_orig = page_count;
1832
1833         sort_brw_pages(ppga, page_count);
1834         while (page_count) {
1835                 obd_count pages_per_brw;
1836
1837                 if (page_count > cli->cl_max_pages_per_rpc)
1838                         pages_per_brw = cli->cl_max_pages_per_rpc;
1839                 else
1840                         pages_per_brw = page_count;
1841
1842                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1843
1844                 if (saved_oa != NULL) {
1845                         /* restore previously saved oa */
1846                         *oinfo->oi_oa = *saved_oa;
1847                 } else if (page_count > pages_per_brw) {
1848                         /* save a copy of oa (brw will clobber it) */
1849                         OBDO_ALLOC(saved_oa);
1850                         if (saved_oa == NULL)
1851                                 GOTO(out, rc = -ENOMEM);
1852                         *saved_oa = *oinfo->oi_oa;
1853                 }
1854
1855                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1856                                       pages_per_brw, ppga, oinfo->oi_capa);
1857
1858                 if (rc != 0)
1859                         break;
1860
1861                 page_count -= pages_per_brw;
1862                 ppga += pages_per_brw;
1863         }
1864
1865 out:
1866         osc_release_ppga(orig, page_count_orig);
1867
1868         if (saved_oa != NULL)
1869                 OBDO_FREE(saved_oa);
1870
1871         RETURN(rc);
1872 }
1873
1874 static int brw_interpret(const struct lu_env *env,
1875                          struct ptlrpc_request *req, void *data, int rc)
1876 {
1877         struct osc_brw_async_args *aa = data;
1878         struct osc_async_page *oap, *tmp;
1879         struct client_obd *cli;
1880         ENTRY;
1881
1882         rc = osc_brw_fini_request(req, rc);
1883         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1884         /* When server return -EINPROGRESS, client should always retry
1885          * regardless of the number of times the bulk was resent already. */
1886         if (osc_recoverable_error(rc)) {
1887                 if (req->rq_import_generation !=
1888                     req->rq_import->imp_generation) {
1889                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1890                                ""LPU64":"LPU64", rc = %d.\n",
1891                                req->rq_import->imp_obd->obd_name,
1892                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1893                 } else if (rc == -EINPROGRESS ||
1894                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1895                         rc = osc_brw_redo_request(req, aa);
1896                 } else {
1897                         CERROR("%s: too many resent retries for object: "
1898                                ""LPU64":"LPU64", rc = %d.\n",
1899                                req->rq_import->imp_obd->obd_name,
1900                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1901                 }
1902
1903                 if (rc == 0)
1904                         RETURN(0);
1905                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1906                         rc = -EIO;
1907         }
1908
1909         if (aa->aa_ocapa) {
1910                 capa_put(aa->aa_ocapa);
1911                 aa->aa_ocapa = NULL;
1912         }
1913
1914         cli = aa->aa_cli;
1915         client_obd_list_lock(&cli->cl_loi_list_lock);
1916
1917         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1918          * is called so we know whether to go to sync BRWs or wait for more
1919          * RPCs to complete */
1920         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1921                 cli->cl_w_in_flight--;
1922         else
1923                 cli->cl_r_in_flight--;
1924
1925         /* the caller may re-use the oap after the completion call so
1926          * we need to clean it up a little */
1927         cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
1928                         oap_rpc_item) {
1929                 cfs_list_del_init(&oap->oap_rpc_item);
1930                 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1931         }
1932         OBDO_FREE(aa->aa_oa);
1933
1934         osc_wake_cache_waiters(cli);
1935         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1936         client_obd_list_unlock(&cli->cl_loi_list_lock);
1937
1938         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1939                           req->rq_bulk->bd_nob_transferred);
1940         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1941         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1942
1943         RETURN(rc);
1944 }
1945
1946 /* The most tricky part of this function is that it will return with
1947  * cli->cli_loi_list_lock held.
1948  */
1949 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1950                   cfs_list_t *rpc_list, int page_count, int cmd,
1951                   pdl_policy_t pol)
1952 {
1953         struct ptlrpc_request *req = NULL;
1954         struct brw_page **pga = NULL;
1955         struct osc_brw_async_args *aa = NULL;
1956         struct obdo *oa = NULL;
1957         struct osc_async_page *oap;
1958         struct osc_async_page *tmp;
1959         struct cl_req *clerq = NULL;
1960         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1961         struct ldlm_lock *lock = NULL;
1962         struct cl_req_attr crattr;
1963         int i, rc, mpflag = 0;
1964
1965         ENTRY;
1966         LASSERT(!cfs_list_empty(rpc_list));
1967
1968         if (cmd & OBD_BRW_MEMALLOC)
1969                 mpflag = cfs_memory_pressure_get_and_set();
1970
1971         memset(&crattr, 0, sizeof crattr);
1972         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1973         if (pga == NULL)
1974                 GOTO(out, rc = -ENOMEM);
1975
1976         OBDO_ALLOC(oa);
1977         if (oa == NULL)
1978                 GOTO(out, rc = -ENOMEM);
1979
1980         i = 0;
1981         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1982                 struct cl_page *page = osc_oap2cl_page(oap);
1983                 if (clerq == NULL) {
1984                         clerq = cl_req_alloc(env, page, crt,
1985                                              1 /* only 1-object rpcs for
1986                                                 * now */);
1987                         if (IS_ERR(clerq))
1988                                 GOTO(out, rc = PTR_ERR(clerq));
1989                         lock = oap->oap_ldlm_lock;
1990                 }
1991                 pga[i] = &oap->oap_brw_page;
1992                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1993                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1994                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1995                 i++;
1996                 cl_req_page_add(env, clerq, page);
1997         }
1998
1999         /* always get the data for the obdo for the rpc */
2000         LASSERT(clerq != NULL);
2001         crattr.cra_oa = oa;
2002         crattr.cra_capa = NULL;
2003         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2004         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2005         if (lock) {
2006                 oa->o_handle = lock->l_remote_handle;
2007                 oa->o_valid |= OBD_MD_FLHANDLE;
2008         }
2009
2010         rc = cl_req_prep(env, clerq);
2011         if (rc != 0) {
2012                 CERROR("cl_req_prep failed: %d\n", rc);
2013                 GOTO(out, rc);
2014         }
2015
2016         sort_brw_pages(pga, page_count);
2017         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2018                                   pga, &req, crattr.cra_capa, 1, 0);
2019         if (rc != 0) {
2020                 CERROR("prep_req failed: %d\n", rc);
2021                 GOTO(out, rc);
2022         }
2023
2024         req->rq_interpret_reply = brw_interpret;
2025         if (cmd & OBD_BRW_MEMALLOC)
2026                 req->rq_memalloc = 1;
2027
2028         /* Need to update the timestamps after the request is built in case
2029          * we race with setattr (locally or in queue at OST).  If OST gets
2030          * later setattr before earlier BRW (as determined by the request xid),
2031          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2032          * way to do this in a single call.  bug 10150 */
2033         cl_req_attr_set(env, clerq, &crattr,
2034                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2035
2036         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2037
2038         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2039         aa = ptlrpc_req_async_args(req);
2040         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2041         cfs_list_splice(rpc_list, &aa->aa_oaps);
2042         CFS_INIT_LIST_HEAD(rpc_list);
2043         aa->aa_clerq = clerq;
2044 out:
2045         if (cmd & OBD_BRW_MEMALLOC)
2046                 cfs_memory_pressure_restore(mpflag);
2047
2048         capa_put(crattr.cra_capa);
2049         if (rc != 0) {
2050                 LASSERT(req == NULL);
2051
2052                 if (oa)
2053                         OBDO_FREE(oa);
2054                 if (pga)
2055                         OBD_FREE(pga, sizeof(*pga) * page_count);
2056                 /* this should happen rarely and is pretty bad, it makes the
2057                  * pending list not follow the dirty order */
2058                 client_obd_list_lock(&cli->cl_loi_list_lock);
2059                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2060                         cfs_list_del_init(&oap->oap_rpc_item);
2061
2062                         /* queued sync pages can be torn down while the pages
2063                          * were between the pending list and the rpc */
2064                         if (oap->oap_interrupted) {
2065                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2066                                 osc_ap_completion(env, cli, NULL, oap, 0,
2067                                                   oap->oap_count);
2068                                 continue;
2069                         }
2070                         osc_ap_completion(env, cli, NULL, oap, 0, rc);
2071                 }
2072                 if (clerq && !IS_ERR(clerq))
2073                         cl_req_completion(env, clerq, rc);
2074         } else {
2075                 struct osc_async_page *tmp = NULL;
2076
2077                 /* queued sync pages can be torn down while the pages
2078                  * were between the pending list and the rpc */
2079                 LASSERT(aa != NULL);
2080                 client_obd_list_lock(&cli->cl_loi_list_lock);
2081                 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2082                         /* only one oap gets a request reference */
2083                         if (tmp == NULL)
2084                                 tmp = oap;
2085                         if (oap->oap_interrupted && !req->rq_intr) {
2086                                 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2087                                                 oap, req);
2088                                 ptlrpc_mark_interrupted(req);
2089                         }
2090                 }
2091                 if (tmp != NULL)
2092                         tmp->oap_request = ptlrpc_request_addref(req);
2093
2094                 DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
2095                           page_count, aa, cli->cl_r_in_flight,
2096                           cli->cl_w_in_flight);
2097
2098                 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2099                  * see which CPU/NUMA node the majority of pages were allocated
2100                  * on, and try to assign the async RPC to the CPU core
2101                  * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2102                  *
2103                  * But on the other hand, we expect that multiple ptlrpcd
2104                  * threads and the initial write sponsor can run in parallel,
2105                  * especially when data checksum is enabled, which is CPU-bound
2106                  * operation and single ptlrpcd thread cannot process in time.
2107                  * So more ptlrpcd threads sharing BRW load
2108                  * (with PDL_POLICY_ROUND) seems better.
2109                  */
2110                 ptlrpcd_add_req(req, pol, -1);
2111         }
2112         RETURN(rc);
2113 }
2114
2115 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2116                                         struct ldlm_enqueue_info *einfo)
2117 {
2118         void *data = einfo->ei_cbdata;
2119         int set = 0;
2120
2121         LASSERT(lock != NULL);
2122         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2123         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2124         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2125         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2126
2127         lock_res_and_lock(lock);
2128         cfs_spin_lock(&osc_ast_guard);
2129
2130         if (lock->l_ast_data == NULL)
2131                 lock->l_ast_data = data;
2132         if (lock->l_ast_data == data)
2133                 set = 1;
2134
2135         cfs_spin_unlock(&osc_ast_guard);
2136         unlock_res_and_lock(lock);
2137
2138         return set;
2139 }
2140
2141 static int osc_set_data_with_check(struct lustre_handle *lockh,
2142                                    struct ldlm_enqueue_info *einfo)
2143 {
2144         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2145         int set = 0;
2146
2147         if (lock != NULL) {
2148                 set = osc_set_lock_data_with_check(lock, einfo);
2149                 LDLM_LOCK_PUT(lock);
2150         } else
2151                 CERROR("lockh %p, data %p - client evicted?\n",
2152                        lockh, einfo->ei_cbdata);
2153         return set;
2154 }
2155
2156 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2157                              ldlm_iterator_t replace, void *data)
2158 {
2159         struct ldlm_res_id res_id;
2160         struct obd_device *obd = class_exp2obd(exp);
2161
2162         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2163         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2164         return 0;
2165 }
2166
2167 /* find any ldlm lock of the inode in osc
2168  * return 0    not find
2169  *        1    find one
2170  *      < 0    error */
2171 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2172                            ldlm_iterator_t replace, void *data)
2173 {
2174         struct ldlm_res_id res_id;
2175         struct obd_device *obd = class_exp2obd(exp);
2176         int rc = 0;
2177
2178         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2179         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2180         if (rc == LDLM_ITER_STOP)
2181                 return(1);
2182         if (rc == LDLM_ITER_CONTINUE)
2183                 return(0);
2184         return(rc);
2185 }
2186
2187 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2188                             obd_enqueue_update_f upcall, void *cookie,
2189                             int *flags, int agl, int rc)
2190 {
2191         int intent = *flags & LDLM_FL_HAS_INTENT;
2192         ENTRY;
2193
2194         if (intent) {
2195                 /* The request was created before ldlm_cli_enqueue call. */
2196                 if (rc == ELDLM_LOCK_ABORTED) {
2197                         struct ldlm_reply *rep;
2198                         rep = req_capsule_server_get(&req->rq_pill,
2199                                                      &RMF_DLM_REP);
2200
2201                         LASSERT(rep != NULL);
2202                         if (rep->lock_policy_res1)
2203                                 rc = rep->lock_policy_res1;
2204                 }
2205         }
2206
2207         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2208             (rc == 0)) {
2209                 *flags |= LDLM_FL_LVB_READY;
2210                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2211                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2212         }
2213
2214         /* Call the update callback. */
2215         rc = (*upcall)(cookie, rc);
2216         RETURN(rc);
2217 }
2218
2219 static int osc_enqueue_interpret(const struct lu_env *env,
2220                                  struct ptlrpc_request *req,
2221                                  struct osc_enqueue_args *aa, int rc)
2222 {
2223         struct ldlm_lock *lock;
2224         struct lustre_handle handle;
2225         __u32 mode;
2226         struct ost_lvb *lvb;
2227         __u32 lvb_len;
2228         int *flags = aa->oa_flags;
2229
2230         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2231          * might be freed anytime after lock upcall has been called. */
2232         lustre_handle_copy(&handle, aa->oa_lockh);
2233         mode = aa->oa_ei->ei_mode;
2234
2235         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2236          * be valid. */
2237         lock = ldlm_handle2lock(&handle);
2238
2239         /* Take an additional reference so that a blocking AST that
2240          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2241          * to arrive after an upcall has been executed by
2242          * osc_enqueue_fini(). */
2243         ldlm_lock_addref(&handle, mode);
2244
2245         /* Let CP AST to grant the lock first. */
2246         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2247
2248         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2249                 lvb = NULL;
2250                 lvb_len = 0;
2251         } else {
2252                 lvb = aa->oa_lvb;
2253                 lvb_len = sizeof(*aa->oa_lvb);
2254         }
2255
2256         /* Complete obtaining the lock procedure. */
2257         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2258                                    mode, flags, lvb, lvb_len, &handle, rc);
2259         /* Complete osc stuff. */
2260         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2261                               flags, aa->oa_agl, rc);
2262
2263         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2264
2265         /* Release the lock for async request. */
2266         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2267                 /*
2268                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2269                  * not already released by
2270                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2271                  */
2272                 ldlm_lock_decref(&handle, mode);
2273
2274         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2275                  aa->oa_lockh, req, aa);
2276         ldlm_lock_decref(&handle, mode);
2277         LDLM_LOCK_PUT(lock);
2278         return rc;
2279 }
2280
2281 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2282                         struct lov_oinfo *loi, int flags,
2283                         struct ost_lvb *lvb, __u32 mode, int rc)
2284 {
2285         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2286
2287         if (rc == ELDLM_OK) {
2288                 __u64 tmp;
2289
2290                 LASSERT(lock != NULL);
2291                 loi->loi_lvb = *lvb;
2292                 tmp = loi->loi_lvb.lvb_size;
2293                 /* Extend KMS up to the end of this lock and no further
2294                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2295                 if (tmp > lock->l_policy_data.l_extent.end)
2296                         tmp = lock->l_policy_data.l_extent.end + 1;
2297                 if (tmp >= loi->loi_kms) {
2298                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2299                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2300                         loi_kms_set(loi, tmp);
2301                 } else {
2302                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2303                                    LPU64"; leaving kms="LPU64", end="LPU64,
2304                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2305                                    lock->l_policy_data.l_extent.end);
2306                 }
2307                 ldlm_lock_allow_match(lock);
2308         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2309                 LASSERT(lock != NULL);
2310                 loi->loi_lvb = *lvb;
2311                 ldlm_lock_allow_match(lock);
2312                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2313                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2314                 rc = ELDLM_OK;
2315         }
2316
2317         if (lock != NULL) {
2318                 if (rc != ELDLM_OK)
2319                         ldlm_lock_fail_match(lock);
2320
2321                 LDLM_LOCK_PUT(lock);
2322         }
2323 }
2324 EXPORT_SYMBOL(osc_update_enqueue);
2325
/*
 * Sentinel request-set pointer, not a real set.  osc_enqueue_base() compares
 * its rqset argument against this value and, when they are equal, passes the
 * request to ptlrpcd_add_req() instead of ptlrpc_set_add_req().
 */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2327
2328 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2329  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2330  * other synchronous requests, however keeping some locks and trying to obtain
2331  * others may take a considerable amount of time in a case of ost failure; and
2332  * when other sync requests do not get released lock from a client, the client
2333  * is excluded from the cluster -- such scenarious make the life difficult, so
2334  * release locks just after they are obtained. */
2335 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2336                      int *flags, ldlm_policy_data_t *policy,
2337                      struct ost_lvb *lvb, int kms_valid,
2338                      obd_enqueue_update_f upcall, void *cookie,
2339                      struct ldlm_enqueue_info *einfo,
2340                      struct lustre_handle *lockh,
2341                      struct ptlrpc_request_set *rqset, int async, int agl)
2342 {
2343         struct obd_device *obd = exp->exp_obd;
2344         struct ptlrpc_request *req = NULL;
2345         int intent = *flags & LDLM_FL_HAS_INTENT;
2346         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2347         ldlm_mode_t mode;
2348         int rc;
2349         ENTRY;
2350
2351         /* Filesystem lock extents are extended to page boundaries so that
2352          * dealing with the page cache is a little smoother.  */
2353         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2354         policy->l_extent.end |= ~CFS_PAGE_MASK;
2355
2356         /*
2357          * kms is not valid when either object is completely fresh (so that no
2358          * locks are cached), or object was evicted. In the latter case cached
2359          * lock cannot be used, because it would prime inode state with
2360          * potentially stale LVB.
2361          */
2362         if (!kms_valid)
2363                 goto no_match;
2364
2365         /* Next, search for already existing extent locks that will cover us */
2366         /* If we're trying to read, we also search for an existing PW lock.  The
2367          * VFS and page cache already protect us locally, so lots of readers/
2368          * writers can share a single PW lock.
2369          *
2370          * There are problems with conversion deadlocks, so instead of
2371          * converting a read lock to a write lock, we'll just enqueue a new
2372          * one.
2373          *
2374          * At some point we should cancel the read lock instead of making them
2375          * send us a blocking callback, but there are problems with canceling
2376          * locks out from other users right now, too. */
2377         mode = einfo->ei_mode;
2378         if (einfo->ei_mode == LCK_PR)
2379                 mode |= LCK_PW;
2380         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2381                                einfo->ei_type, policy, mode, lockh, 0);
2382         if (mode) {
2383                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2384
2385                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2386                         /* For AGL, if enqueue RPC is sent but the lock is not
2387                          * granted, then skip to process this strpe.
2388                          * Return -ECANCELED to tell the caller. */
2389                         ldlm_lock_decref(lockh, mode);
2390                         LDLM_LOCK_PUT(matched);
2391                         RETURN(-ECANCELED);
2392                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2393                         *flags |= LDLM_FL_LVB_READY;
2394                         /* addref the lock only if not async requests and PW
2395                          * lock is matched whereas we asked for PR. */
2396                         if (!rqset && einfo->ei_mode != mode)
2397                                 ldlm_lock_addref(lockh, LCK_PR);
2398                         if (intent) {
2399                                 /* I would like to be able to ASSERT here that
2400                                  * rss <= kms, but I can't, for reasons which
2401                                  * are explained in lov_enqueue() */
2402                         }
2403
2404                         /* We already have a lock, and it's referenced */
2405                         (*upcall)(cookie, ELDLM_OK);
2406
2407                         if (einfo->ei_mode != mode)
2408                                 ldlm_lock_decref(lockh, LCK_PW);
2409                         else if (rqset)
2410                                 /* For async requests, decref the lock. */
2411                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2412                         LDLM_LOCK_PUT(matched);
2413                         RETURN(ELDLM_OK);
2414                 } else {
2415                         ldlm_lock_decref(lockh, mode);
2416                         LDLM_LOCK_PUT(matched);
2417                 }
2418         }
2419
2420  no_match:
2421         if (intent) {
2422                 CFS_LIST_HEAD(cancels);
2423                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2424                                            &RQF_LDLM_ENQUEUE_LVB);
2425                 if (req == NULL)
2426                         RETURN(-ENOMEM);
2427
2428                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2429                 if (rc) {
2430                         ptlrpc_request_free(req);
2431                         RETURN(rc);
2432                 }
2433
2434                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2435                                      sizeof *lvb);
2436                 ptlrpc_request_set_replen(req);
2437         }
2438
2439         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2440         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2441
2442         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2443                               sizeof(*lvb), lockh, async);
2444         if (rqset) {
2445                 if (!rc) {
2446                         struct osc_enqueue_args *aa;
2447                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2448                         aa = ptlrpc_req_async_args(req);
2449                         aa->oa_ei = einfo;
2450                         aa->oa_exp = exp;
2451                         aa->oa_flags  = flags;
2452                         aa->oa_upcall = upcall;
2453                         aa->oa_cookie = cookie;
2454                         aa->oa_lvb    = lvb;
2455                         aa->oa_lockh  = lockh;
2456                         aa->oa_agl    = !!agl;
2457
2458                         req->rq_interpret_reply =
2459                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2460                         if (rqset == PTLRPCD_SET)
2461                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2462                         else
2463                                 ptlrpc_set_add_req(rqset, req);
2464                 } else if (intent) {
2465                         ptlrpc_req_finished(req);
2466                 }
2467                 RETURN(rc);
2468         }
2469
2470         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2471         if (intent)
2472                 ptlrpc_req_finished(req);
2473
2474         RETURN(rc);
2475 }
2476
2477 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2478                        struct ldlm_enqueue_info *einfo,
2479                        struct ptlrpc_request_set *rqset)
2480 {
2481         struct ldlm_res_id res_id;
2482         int rc;
2483         ENTRY;
2484
2485         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2486                            oinfo->oi_md->lsm_object_seq, &res_id);
2487
2488         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2489                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2490                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2491                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2492                               rqset, rqset != NULL, 0);
2493         RETURN(rc);
2494 }
2495
2496 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2497                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2498                    int *flags, void *data, struct lustre_handle *lockh,
2499                    int unref)
2500 {
2501         struct obd_device *obd = exp->exp_obd;
2502         int lflags = *flags;
2503         ldlm_mode_t rc;
2504         ENTRY;
2505
2506         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2507                 RETURN(-EIO);
2508
2509         /* Filesystem lock extents are extended to page boundaries so that
2510          * dealing with the page cache is a little smoother */
2511         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2512         policy->l_extent.end |= ~CFS_PAGE_MASK;
2513
2514         /* Next, search for already existing extent locks that will cover us */
2515         /* If we're trying to read, we also search for an existing PW lock.  The
2516          * VFS and page cache already protect us locally, so lots of readers/
2517          * writers can share a single PW lock. */
2518         rc = mode;
2519         if (mode == LCK_PR)
2520                 rc |= LCK_PW;
2521         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2522                              res_id, type, policy, rc, lockh, unref);
2523         if (rc) {
2524                 if (data != NULL) {
2525                         if (!osc_set_data_with_check(lockh, data)) {
2526                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2527                                         ldlm_lock_decref(lockh, rc);
2528                                 RETURN(0);
2529                         }
2530                 }
2531                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2532                         ldlm_lock_addref(lockh, LCK_PR);
2533                         ldlm_lock_decref(lockh, LCK_PW);
2534                 }
2535                 RETURN(rc);
2536         }
2537         RETURN(rc);
2538 }
2539
2540 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2541 {
2542         ENTRY;
2543
2544         if (unlikely(mode == LCK_GROUP))
2545                 ldlm_lock_decref_and_cancel(lockh, mode);
2546         else
2547                 ldlm_lock_decref(lockh, mode);
2548
2549         RETURN(0);
2550 }
2551
2552 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2553                       __u32 mode, struct lustre_handle *lockh)
2554 {
2555         ENTRY;
2556         RETURN(osc_cancel_base(lockh, mode));
2557 }
2558
2559 static int osc_cancel_unused(struct obd_export *exp,
2560                              struct lov_stripe_md *lsm,
2561                              ldlm_cancel_flags_t flags,
2562                              void *opaque)
2563 {
2564         struct obd_device *obd = class_exp2obd(exp);
2565         struct ldlm_res_id res_id, *resp = NULL;
2566
2567         if (lsm != NULL) {
2568                 resp = osc_build_res_name(lsm->lsm_object_id,
2569                                           lsm->lsm_object_seq, &res_id);
2570         }
2571
2572         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2573 }
2574
2575 static int osc_statfs_interpret(const struct lu_env *env,
2576                                 struct ptlrpc_request *req,
2577                                 struct osc_async_args *aa, int rc)
2578 {
2579         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2580         struct obd_statfs *msfs;
2581         __u64 used;
2582         ENTRY;
2583
2584         if (rc == -EBADR)
2585                 /* The request has in fact never been sent
2586                  * due to issues at a higher level (LOV).
2587                  * Exit immediately since the caller is
2588                  * aware of the problem and takes care
2589                  * of the clean up */
2590                  RETURN(rc);
2591
2592         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2593             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2594                 GOTO(out, rc = 0);
2595
2596         if (rc != 0)
2597                 GOTO(out, rc);
2598
2599         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2600         if (msfs == NULL) {
2601                 GOTO(out, rc = -EPROTO);
2602         }
2603
2604         /* Reinitialize the RDONLY and DEGRADED flags at the client
2605          * on each statfs, so they don't stay set permanently. */
2606         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2607
2608         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2609                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2610         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2611                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2612
2613         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2614                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2615         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2616                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2617
2618         /* Add a bit of hysteresis so this flag isn't continually flapping,
2619          * and ensure that new files don't get extremely fragmented due to
2620          * only a small amount of available space in the filesystem.
2621          * We want to set the NOSPC flag when there is less than ~0.1% free
2622          * and clear it when there is at least ~0.2% free space, so:
2623          *                   avail < ~0.1% max          max = avail + used
2624          *            1025 * avail < avail + used       used = blocks - free
2625          *            1024 * avail < used
2626          *            1024 * avail < blocks - free
2627          *                   avail < ((blocks - free) >> 10)
2628          *
2629          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2630          * lose that amount of space so in those cases we report no space left
2631          * if their is less than 1 GB left.                             */
2632         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2633         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2634                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2635                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2636         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2637                           (msfs->os_ffree > 64) &&
2638                           (msfs->os_bavail > (used << 1)))) {
2639                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2640                                              OSCC_FLAG_NOSPC_BLK);
2641         }
2642
2643         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2644                      (msfs->os_bavail < used)))
2645                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2646
2647         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2648
2649         *aa->aa_oi->oi_osfs = *msfs;
2650 out:
2651         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2652         RETURN(rc);
2653 }
2654
2655 static int osc_statfs_async(struct obd_export *exp,
2656                             struct obd_info *oinfo, __u64 max_age,
2657                             struct ptlrpc_request_set *rqset)
2658 {
2659         struct obd_device     *obd = class_exp2obd(exp);
2660         struct ptlrpc_request *req;
2661         struct osc_async_args *aa;
2662         int                    rc;
2663         ENTRY;
2664
2665         /* We could possibly pass max_age in the request (as an absolute
2666          * timestamp or a "seconds.usec ago") so the target can avoid doing
2667          * extra calls into the filesystem if that isn't necessary (e.g.
2668          * during mount that would help a bit).  Having relative timestamps
2669          * is not so great if request processing is slow, while absolute
2670          * timestamps are not ideal because they need time synchronization. */
2671         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2672         if (req == NULL)
2673                 RETURN(-ENOMEM);
2674
2675         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2676         if (rc) {
2677                 ptlrpc_request_free(req);
2678                 RETURN(rc);
2679         }
2680         ptlrpc_request_set_replen(req);
2681         req->rq_request_portal = OST_CREATE_PORTAL;
2682         ptlrpc_at_set_req_timeout(req);
2683
2684         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2685                 /* procfs requests not want stat in wait for avoid deadlock */
2686                 req->rq_no_resend = 1;
2687                 req->rq_no_delay = 1;
2688         }
2689
2690         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2691         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2692         aa = ptlrpc_req_async_args(req);
2693         aa->aa_oi = oinfo;
2694
2695         ptlrpc_set_add_req(rqset, req);
2696         RETURN(0);
2697 }
2698
2699 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2700                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2701 {
2702         struct obd_device     *obd = class_exp2obd(exp);
2703         struct obd_statfs     *msfs;
2704         struct ptlrpc_request *req;
2705         struct obd_import     *imp = NULL;
2706         int rc;
2707         ENTRY;
2708
2709         /*Since the request might also come from lprocfs, so we need
2710          *sync this with client_disconnect_export Bug15684*/
2711         cfs_down_read(&obd->u.cli.cl_sem);
2712         if (obd->u.cli.cl_import)
2713                 imp = class_import_get(obd->u.cli.cl_import);
2714         cfs_up_read(&obd->u.cli.cl_sem);
2715         if (!imp)
2716                 RETURN(-ENODEV);
2717
2718         /* We could possibly pass max_age in the request (as an absolute
2719          * timestamp or a "seconds.usec ago") so the target can avoid doing
2720          * extra calls into the filesystem if that isn't necessary (e.g.
2721          * during mount that would help a bit).  Having relative timestamps
2722          * is not so great if request processing is slow, while absolute
2723          * timestamps are not ideal because they need time synchronization. */
2724         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2725
2726         class_import_put(imp);
2727
2728         if (req == NULL)
2729                 RETURN(-ENOMEM);
2730
2731         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2732         if (rc) {
2733                 ptlrpc_request_free(req);
2734                 RETURN(rc);
2735         }
2736         ptlrpc_request_set_replen(req);
2737         req->rq_request_portal = OST_CREATE_PORTAL;
2738         ptlrpc_at_set_req_timeout(req);
2739
2740         if (flags & OBD_STATFS_NODELAY) {
2741                 /* procfs requests not want stat in wait for avoid deadlock */
2742                 req->rq_no_resend = 1;
2743                 req->rq_no_delay = 1;
2744         }
2745
2746         rc = ptlrpc_queue_wait(req);
2747         if (rc)
2748                 GOTO(out, rc);
2749
2750         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2751         if (msfs == NULL) {
2752                 GOTO(out, rc = -EPROTO);
2753         }
2754
2755         *osfs = *msfs;
2756
2757         EXIT;
2758  out:
2759         ptlrpc_req_finished(req);
2760         return rc;
2761 }
2762
2763 /* Retrieve object striping information.
2764  *
2765  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2766  * the maximum number of OST indices which will fit in the user buffer.
2767  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2768  */
2769 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2770 {
2771         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2772         struct lov_user_md_v3 lum, *lumk;
2773         struct lov_user_ost_data_v1 *lmm_objects;
2774         int rc = 0, lum_size;
2775         ENTRY;
2776
2777         if (!lsm)
2778                 RETURN(-ENODATA);
2779
2780         /* we only need the header part from user space to get lmm_magic and
2781          * lmm_stripe_count, (the header part is common to v1 and v3) */
2782         lum_size = sizeof(struct lov_user_md_v1);
2783         if (cfs_copy_from_user(&lum, lump, lum_size))
2784                 RETURN(-EFAULT);
2785
2786         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2787             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2788                 RETURN(-EINVAL);
2789
2790         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2791         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2792         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2793         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2794
2795         /* we can use lov_mds_md_size() to compute lum_size
2796          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2797         if (lum.lmm_stripe_count > 0) {
2798                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2799                 OBD_ALLOC(lumk, lum_size);
2800                 if (!lumk)
2801                         RETURN(-ENOMEM);
2802
2803                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2804                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2805                 else
2806                         lmm_objects = &(lumk->lmm_objects[0]);
2807                 lmm_objects->l_object_id = lsm->lsm_object_id;
2808         } else {
2809                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2810                 lumk = &lum;
2811         }
2812
2813         lumk->lmm_object_id = lsm->lsm_object_id;
2814         lumk->lmm_object_seq = lsm->lsm_object_seq;
2815         lumk->lmm_stripe_count = 1;
2816
2817         if (cfs_copy_to_user(lump, lumk, lum_size))
2818                 rc = -EFAULT;
2819
2820         if (lumk != &lum)
2821                 OBD_FREE(lumk, lum_size);
2822
2823         RETURN(rc);
2824 }
2825
2826
2827 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2828                          void *karg, void *uarg)
2829 {
2830         struct obd_device *obd = exp->exp_obd;
2831         struct obd_ioctl_data *data = karg;
2832         int err = 0;
2833         ENTRY;
2834
2835         if (!cfs_try_module_get(THIS_MODULE)) {
2836                 CERROR("Can't get module. Is it alive?");
2837                 return -EINVAL;
2838         }
2839         switch (cmd) {
2840         case OBD_IOC_LOV_GET_CONFIG: {
2841                 char *buf;
2842                 struct lov_desc *desc;
2843                 struct obd_uuid uuid;
2844
2845                 buf = NULL;
2846                 len = 0;
2847                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2848                         GOTO(out, err = -EINVAL);
2849
2850                 data = (struct obd_ioctl_data *)buf;
2851
2852                 if (sizeof(*desc) > data->ioc_inllen1) {
2853                         obd_ioctl_freedata(buf, len);
2854                         GOTO(out, err = -EINVAL);
2855                 }
2856
2857                 if (data->ioc_inllen2 < sizeof(uuid)) {
2858                         obd_ioctl_freedata(buf, len);
2859                         GOTO(out, err = -EINVAL);
2860                 }
2861
2862                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2863                 desc->ld_tgt_count = 1;
2864                 desc->ld_active_tgt_count = 1;
2865                 desc->ld_default_stripe_count = 1;
2866                 desc->ld_default_stripe_size = 0;
2867                 desc->ld_default_stripe_offset = 0;
2868                 desc->ld_pattern = 0;
2869                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2870
2871                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2872
2873                 err = cfs_copy_to_user((void *)uarg, buf, len);
2874                 if (err)
2875                         err = -EFAULT;
2876                 obd_ioctl_freedata(buf, len);
2877                 GOTO(out, err);
2878         }
2879         case LL_IOC_LOV_SETSTRIPE:
2880                 err = obd_alloc_memmd(exp, karg);
2881                 if (err > 0)
2882                         err = 0;
2883                 GOTO(out, err);
2884         case LL_IOC_LOV_GETSTRIPE:
2885                 err = osc_getstripe(karg, uarg);
2886                 GOTO(out, err);
2887         case OBD_IOC_CLIENT_RECOVER:
2888                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2889                                             data->ioc_inlbuf1, 0);
2890                 if (err > 0)
2891                         err = 0;
2892                 GOTO(out, err);
2893         case IOC_OSC_SET_ACTIVE:
2894                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2895                                                data->ioc_offset);
2896                 GOTO(out, err);
2897         case OBD_IOC_POLL_QUOTACHECK:
2898                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2899                 GOTO(out, err);
2900         case OBD_IOC_PING_TARGET:
2901                 err = ptlrpc_obd_ping(obd);
2902                 GOTO(out, err);
2903         default:
2904                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2905                        cmd, cfs_curproc_comm());
2906                 GOTO(out, err = -ENOTTY);
2907         }
2908 out:
2909         cfs_module_put(THIS_MODULE);
2910         return err;
2911 }
2912
2913 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2914                         obd_count keylen, void *key, __u32 *vallen, void *val,
2915                         struct lov_stripe_md *lsm)
2916 {
2917         ENTRY;
2918         if (!vallen || !val)
2919                 RETURN(-EFAULT);
2920
2921         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2922                 __u32 *stripe = val;
2923                 *vallen = sizeof(*stripe);
2924                 *stripe = 0;
2925                 RETURN(0);
2926         } else if (KEY_IS(KEY_LAST_ID)) {
2927                 struct ptlrpc_request *req;
2928                 obd_id                *reply;
2929                 char                  *tmp;
2930                 int                    rc;
2931
2932                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2933                                            &RQF_OST_GET_INFO_LAST_ID);
2934                 if (req == NULL)
2935                         RETURN(-ENOMEM);
2936
2937                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2938                                      RCL_CLIENT, keylen);
2939                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2940                 if (rc) {
2941                         ptlrpc_request_free(req);
2942                         RETURN(rc);
2943                 }
2944
2945                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2946                 memcpy(tmp, key, keylen);
2947
2948                 req->rq_no_delay = req->rq_no_resend = 1;
2949                 ptlrpc_request_set_replen(req);
2950                 rc = ptlrpc_queue_wait(req);
2951                 if (rc)
2952                         GOTO(out, rc);
2953
2954                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2955                 if (reply == NULL)
2956                         GOTO(out, rc = -EPROTO);
2957
2958                 *((obd_id *)val) = *reply;
2959         out:
2960                 ptlrpc_req_finished(req);
2961                 RETURN(rc);
2962         } else if (KEY_IS(KEY_FIEMAP)) {
2963                 struct ptlrpc_request *req;
2964                 struct ll_user_fiemap *reply;
2965                 char *tmp;
2966                 int rc;
2967
2968                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2969                                            &RQF_OST_GET_INFO_FIEMAP);
2970                 if (req == NULL)
2971                         RETURN(-ENOMEM);
2972
2973                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2974                                      RCL_CLIENT, keylen);
2975                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2976                                      RCL_CLIENT, *vallen);
2977                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2978                                      RCL_SERVER, *vallen);
2979
2980                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2981                 if (rc) {
2982                         ptlrpc_request_free(req);
2983                         RETURN(rc);
2984                 }
2985
2986                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2987                 memcpy(tmp, key, keylen);
2988                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2989                 memcpy(tmp, val, *vallen);
2990
2991                 ptlrpc_request_set_replen(req);
2992                 rc = ptlrpc_queue_wait(req);
2993                 if (rc)
2994                         GOTO(out1, rc);
2995
2996                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2997                 if (reply == NULL)
2998                         GOTO(out1, rc = -EPROTO);
2999
3000                 memcpy(val, reply, *vallen);
3001         out1:
3002                 ptlrpc_req_finished(req);
3003
3004                 RETURN(rc);
3005         }
3006
3007         RETURN(-EINVAL);
3008 }
3009
3010 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3011 {
3012         struct llog_ctxt *ctxt;
3013         int rc = 0;
3014         ENTRY;
3015
3016         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3017         if (ctxt) {
3018                 rc = llog_initiator_connect(ctxt);
3019                 llog_ctxt_put(ctxt);
3020         } else {
3021                 /* XXX return an error? skip setting below flags? */
3022         }
3023
3024         cfs_spin_lock(&imp->imp_lock);
3025         imp->imp_server_timeout = 1;
3026         imp->imp_pingable = 1;
3027         cfs_spin_unlock(&imp->imp_lock);
3028         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3029
3030         RETURN(rc);
3031 }
3032
3033 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3034                                           struct ptlrpc_request *req,
3035                                           void *aa, int rc)
3036 {
3037         ENTRY;
3038         if (rc != 0)
3039                 RETURN(rc);
3040
3041         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3042 }
3043
3044 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3045                               obd_count keylen, void *key, obd_count vallen,
3046                               void *val, struct ptlrpc_request_set *set)
3047 {
3048         struct ptlrpc_request *req;
3049         struct obd_device     *obd = exp->exp_obd;
3050         struct obd_import     *imp = class_exp2cliimp(exp);
3051         char                  *tmp;
3052         int                    rc;
3053         ENTRY;
3054
3055         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3056
3057         if (KEY_IS(KEY_NEXT_ID)) {
3058                 obd_id new_val;
3059                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3060
3061                 if (vallen != sizeof(obd_id))
3062                         RETURN(-ERANGE);
3063                 if (val == NULL)
3064                         RETURN(-EINVAL);
3065
3066                 if (vallen != sizeof(obd_id))
3067                         RETURN(-EINVAL);
3068
3069                 /* avoid race between allocate new object and set next id
3070                  * from ll_sync thread */
3071                 cfs_spin_lock(&oscc->oscc_lock);
3072                 new_val = *((obd_id*)val) + 1;
3073                 if (new_val > oscc->oscc_next_id)
3074                         oscc->oscc_next_id = new_val;
3075                 cfs_spin_unlock(&oscc->oscc_lock);
3076                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3077                        exp->exp_obd->obd_name,
3078                        obd->u.cli.cl_oscc.oscc_next_id);
3079
3080                 RETURN(0);
3081         }
3082
3083         if (KEY_IS(KEY_CHECKSUM)) {
3084                 if (vallen != sizeof(int))
3085                         RETURN(-EINVAL);
3086                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3087                 RETURN(0);
3088         }
3089
3090         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3091                 sptlrpc_conf_client_adapt(obd);
3092                 RETURN(0);
3093         }
3094
3095         if (KEY_IS(KEY_FLUSH_CTX)) {
3096                 sptlrpc_import_flush_my_ctx(imp);
3097                 RETURN(0);
3098         }
3099
3100         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3101                 RETURN(-EINVAL);
3102
3103         /* We pass all other commands directly to OST. Since nobody calls osc
3104            methods directly and everybody is supposed to go through LOV, we
3105            assume lov checked invalid values for us.
3106            The only recognised values so far are evict_by_nid and mds_conn.
3107            Even if something bad goes through, we'd get a -EINVAL from OST
3108            anyway. */
3109
3110         if (KEY_IS(KEY_GRANT_SHRINK))
3111                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3112         else
3113                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3114
3115         if (req == NULL)
3116                 RETURN(-ENOMEM);
3117
3118         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3119                              RCL_CLIENT, keylen);
3120         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3121                              RCL_CLIENT, vallen);
3122         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3123         if (rc) {
3124                 ptlrpc_request_free(req);
3125                 RETURN(rc);
3126         }
3127
3128         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3129         memcpy(tmp, key, keylen);
3130         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3131         memcpy(tmp, val, vallen);
3132
3133         if (KEY_IS(KEY_MDS_CONN)) {
3134                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3135
3136                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3137                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3138                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3139                 req->rq_no_delay = req->rq_no_resend = 1;
3140                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3141         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3142                 struct osc_grant_args *aa;
3143                 struct obdo *oa;
3144
3145                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3146                 aa = ptlrpc_req_async_args(req);
3147                 OBDO_ALLOC(oa);
3148                 if (!oa) {
3149                         ptlrpc_req_finished(req);
3150                         RETURN(-ENOMEM);
3151                 }
3152                 *oa = ((struct ost_body *)val)->oa;
3153                 aa->aa_oa = oa;
3154                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3155         }
3156
3157         ptlrpc_request_set_replen(req);
3158         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3159                 LASSERT(set != NULL);
3160                 ptlrpc_set_add_req(set, req);
3161                 ptlrpc_check_set(NULL, set);
3162         } else
3163                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3164
3165         RETURN(0);
3166 }
3167
3168
3169 static struct llog_operations osc_size_repl_logops = {
3170         lop_cancel: llog_obd_repl_cancel
3171 };
3172
3173 static struct llog_operations osc_mds_ost_orig_logops;
3174
3175 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3176                            struct obd_device *tgt, struct llog_catid *catid)
3177 {
3178         int rc;
3179         ENTRY;
3180
3181         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3182                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3183  &nbs