Whamcloud - gitweb
LU-1030 osc: move io data from lov_oinfo into osc_object
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
/* Forward declarations.  The definitions are not within this part of the
 * file (presumably they appear further down -- confirm). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_async_args *aa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *aa->aa_oi->oi_oa = body->oa;
585 out:
586         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
587         RETURN(rc);
588 }
589
590 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
591                     struct obd_info *oinfo, obd_size start, obd_size end,
592                     struct ptlrpc_request_set *set)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_async_args *aa;
597         int                    rc;
598         ENTRY;
599
600         if (!oinfo->oi_oa) {
601                 CDEBUG(D_INFO, "oa NULL\n");
602                 RETURN(-EINVAL);
603         }
604
605         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
606         if (req == NULL)
607                 RETURN(-ENOMEM);
608
609         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
611         if (rc) {
612                 ptlrpc_request_free(req);
613                 RETURN(rc);
614         }
615
616         /* overload the size and blocks fields in the oa with start/end */
617         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
618         LASSERT(body);
619         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620         body->oa.o_size = start;
621         body->oa.o_blocks = end;
622         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623         osc_pack_capa(req, body, oinfo->oi_capa);
624
625         ptlrpc_request_set_replen(req);
626         req->rq_interpret_reply = osc_sync_interpret;
627
628         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629         aa = ptlrpc_req_async_args(req);
630         aa->aa_oi = oinfo;
631
632         ptlrpc_set_add_req(set, req);
633         RETURN (0);
634 }
635
636 /* Find and cancel locally locks matched by @mode in the resource found by
637  * @objid. Found locks are added into @cancel list. Returns the amount of
638  * locks added to @cancels list. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
640                                    cfs_list_t *cancels,
641                                    ldlm_mode_t mode, int lock_flags)
642 {
643         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644         struct ldlm_res_id res_id;
645         struct ldlm_resource *res;
646         int count;
647         ENTRY;
648
649         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
651         if (res == NULL)
652                 RETURN(0);
653
654         LDLM_RESOURCE_ADDREF(res);
655         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656                                            lock_flags, 0, NULL);
657         LDLM_RESOURCE_DELREF(res);
658         ldlm_resource_putref(res);
659         RETURN(count);
660 }
661
662 static int osc_destroy_interpret(const struct lu_env *env,
663                                  struct ptlrpc_request *req, void *data,
664                                  int rc)
665 {
666         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
667
668         cfs_atomic_dec(&cli->cl_destroy_in_flight);
669         cfs_waitq_signal(&cli->cl_destroy_waitq);
670         return 0;
671 }
672
673 static int osc_can_send_destroy(struct client_obd *cli)
674 {
675         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676             cli->cl_max_rpcs_in_flight) {
677                 /* The destroy request can be sent */
678                 return 1;
679         }
680         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681             cli->cl_max_rpcs_in_flight) {
682                 /*
683                  * The counter has been modified between the two atomic
684                  * operations.
685                  */
686                 cfs_waitq_signal(&cli->cl_destroy_waitq);
687         }
688         return 0;
689 }
690
691 /* Destroy requests can be async always on the client, and we don't even really
692  * care about the return code since the client cannot do anything at all about
693  * a destroy failure.
694  * When the MDS is unlinking a filename, it saves the file objects into a
695  * recovery llog, and these object records are cancelled when the OST reports
696  * they were destroyed and sync'd to disk (i.e. transaction committed).
697  * If the client dies, or the OST is down when the object should be destroyed,
698  * the records are not cancelled, and when the OST reconnects to the MDS next,
699  * it will retrieve the llog unlink logs and then sends the log cancellation
700  * cookies to the MDS after committing destroy transactions. */
701 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
702                        struct obdo *oa, struct lov_stripe_md *ea,
703                        struct obd_trans_info *oti, struct obd_export *md_export,
704                        void *capa)
705 {
706         struct client_obd     *cli = &exp->exp_obd->u.cli;
707         struct ptlrpc_request *req;
708         struct ost_body       *body;
709         CFS_LIST_HEAD(cancels);
710         int rc, count;
711         ENTRY;
712
713         if (!oa) {
714                 CDEBUG(D_INFO, "oa NULL\n");
715                 RETURN(-EINVAL);
716         }
717
718         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
719                                         LDLM_FL_DISCARD_DATA);
720
721         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
722         if (req == NULL) {
723                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
724                 RETURN(-ENOMEM);
725         }
726
727         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
728         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
729                                0, &cancels, count);
730         if (rc) {
731                 ptlrpc_request_free(req);
732                 RETURN(rc);
733         }
734
735         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
736         ptlrpc_at_set_req_timeout(req);
737
738         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
739                 oa->o_lcookie = *oti->oti_logcookies;
740         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
741         LASSERT(body);
742         lustre_set_wire_obdo(&body->oa, oa);
743
744         osc_pack_capa(req, body, (struct obd_capa *)capa);
745         ptlrpc_request_set_replen(req);
746
747         /* don't throttle destroy RPCs for the MDT */
748         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
749                 req->rq_interpret_reply = osc_destroy_interpret;
750                 if (!osc_can_send_destroy(cli)) {
751                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
752                                                           NULL);
753
754                         /*
755                          * Wait until the number of on-going destroy RPCs drops
756                          * under max_rpc_in_flight
757                          */
758                         l_wait_event_exclusive(cli->cl_destroy_waitq,
759                                                osc_can_send_destroy(cli), &lwi);
760                 }
761         }
762
763         /* Do not wait for response */
764         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
765         RETURN(0);
766 }
767
768 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
769                                 long writing_bytes)
770 {
771         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
772
773         LASSERT(!(oa->o_valid & bits));
774
775         oa->o_valid |= bits;
776         client_obd_list_lock(&cli->cl_loi_list_lock);
777         oa->o_dirty = cli->cl_dirty;
778         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
779                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
780                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
781                 oa->o_undirty = 0;
782         } else if (cfs_atomic_read(&obd_dirty_pages) -
783                    cfs_atomic_read(&obd_dirty_transit_pages) >
784                    obd_max_dirty_pages + 1){
785                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
786                  * not covered by a lock thus they may safely race and trip
787                  * this CERROR() unless we add in a small fudge factor (+1). */
788                 CERROR("dirty %d - %d > system dirty_max %d\n",
789                        cfs_atomic_read(&obd_dirty_pages),
790                        cfs_atomic_read(&obd_dirty_transit_pages),
791                        obd_max_dirty_pages);
792                 oa->o_undirty = 0;
793         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
794                 CERROR("dirty %lu - dirty_max %lu too big???\n",
795                        cli->cl_dirty, cli->cl_dirty_max);
796                 oa->o_undirty = 0;
797         } else {
798                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
799                                 (cli->cl_max_rpcs_in_flight + 1);
800                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
801         }
802         oa->o_grant = cli->cl_avail_grant;
803         oa->o_dropped = cli->cl_lost_grant;
804         cli->cl_lost_grant = 0;
805         client_obd_list_unlock(&cli->cl_loi_list_lock);
806         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
807                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
808
809 }
810
811 void osc_update_next_shrink(struct client_obd *cli)
812 {
813         cli->cl_next_shrink_grant =
814                 cfs_time_shift(cli->cl_grant_shrink_interval);
815         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
816                cli->cl_next_shrink_grant);
817 }
818
819 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
820 {
821         client_obd_list_lock(&cli->cl_loi_list_lock);
822         cli->cl_avail_grant += grant;
823         client_obd_list_unlock(&cli->cl_loi_list_lock);
824 }
825
826 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
827 {
828         if (body->oa.o_valid & OBD_MD_FLGRANT) {
829                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
830                 __osc_update_grant(cli, body->oa.o_grant);
831         }
832 }
833
/* Forward declaration; the definition is not within this part of the file. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);
837
838 static int osc_shrink_grant_interpret(const struct lu_env *env,
839                                       struct ptlrpc_request *req,
840                                       void *aa, int rc)
841 {
842         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
843         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
844         struct ost_body *body;
845
846         if (rc != 0) {
847                 __osc_update_grant(cli, oa->o_grant);
848                 GOTO(out, rc);
849         }
850
851         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
852         LASSERT(body);
853         osc_update_grant(cli, body);
854 out:
855         OBDO_FREE(oa);
856         return rc;
857 }
858
859 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
860 {
861         client_obd_list_lock(&cli->cl_loi_list_lock);
862         oa->o_grant = cli->cl_avail_grant / 4;
863         cli->cl_avail_grant -= oa->o_grant;
864         client_obd_list_unlock(&cli->cl_loi_list_lock);
865         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
866                 oa->o_valid |= OBD_MD_FLFLAGS;
867                 oa->o_flags = 0;
868         }
869         oa->o_flags |= OBD_FL_SHRINK_GRANT;
870         osc_update_next_shrink(cli);
871 }
872
873 /* Shrink the current grant, either from some large amount to enough for a
874  * full set of in-flight RPCs, or if we have already shrunk to that limit
875  * then to enough for a single RPC.  This avoids keeping more grant than
876  * needed, and avoids shrinking the grant piecemeal. */
877 static int osc_shrink_grant(struct client_obd *cli)
878 {
879         long target = (cli->cl_max_rpcs_in_flight + 1) *
880                       cli->cl_max_pages_per_rpc;
881
882         client_obd_list_lock(&cli->cl_loi_list_lock);
883         if (cli->cl_avail_grant <= target)
884                 target = cli->cl_max_pages_per_rpc;
885         client_obd_list_unlock(&cli->cl_loi_list_lock);
886
887         return osc_shrink_grant_to_target(cli, target);
888 }
889
890 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
891 {
892         int    rc = 0;
893         struct ost_body     *body;
894         ENTRY;
895
896         client_obd_list_lock(&cli->cl_loi_list_lock);
897         /* Don't shrink if we are already above or below the desired limit
898          * We don't want to shrink below a single RPC, as that will negatively
899          * impact block allocation and long-term performance. */
900         if (target < cli->cl_max_pages_per_rpc)
901                 target = cli->cl_max_pages_per_rpc;
902
903         if (target >= cli->cl_avail_grant) {
904                 client_obd_list_unlock(&cli->cl_loi_list_lock);
905                 RETURN(0);
906         }
907         client_obd_list_unlock(&cli->cl_loi_list_lock);
908
909         OBD_ALLOC_PTR(body);
910         if (!body)
911                 RETURN(-ENOMEM);
912
913         osc_announce_cached(cli, &body->oa, 0);
914
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         body->oa.o_grant = cli->cl_avail_grant - target;
917         cli->cl_avail_grant = target;
918         client_obd_list_unlock(&cli->cl_loi_list_lock);
919         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
920                 body->oa.o_valid |= OBD_MD_FLFLAGS;
921                 body->oa.o_flags = 0;
922         }
923         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
924         osc_update_next_shrink(cli);
925
926         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
927                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
928                                 sizeof(*body), body, NULL);
929         if (rc != 0)
930                 __osc_update_grant(cli, body->oa.o_grant);
931         OBD_FREE_PTR(body);
932         RETURN(rc);
933 }
934
935 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
936 static int osc_should_shrink_grant(struct client_obd *client)
937 {
938         cfs_time_t time = cfs_time_current();
939         cfs_time_t next_shrink = client->cl_next_shrink_grant;
940
941         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
942              OBD_CONNECT_GRANT_SHRINK) == 0)
943                 return 0;
944
945         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
946                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
947                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
948                         return 1;
949                 else
950                         osc_update_next_shrink(client);
951         }
952         return 0;
953 }
954
955 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
956 {
957         struct client_obd *client;
958
959         cfs_list_for_each_entry(client, &item->ti_obd_list,
960                                 cl_grant_shrink_list) {
961                 if (osc_should_shrink_grant(client))
962                         osc_shrink_grant(client);
963         }
964         return 0;
965 }
966
967 static int osc_add_shrink_grant(struct client_obd *client)
968 {
969         int rc;
970
971         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
972                                        TIMEOUT_GRANT,
973                                        osc_grant_shrink_grant_cb, NULL,
974                                        &client->cl_grant_shrink_list);
975         if (rc) {
976                 CERROR("add grant client %s error %d\n",
977                         client->cl_import->imp_obd->obd_name, rc);
978                 return rc;
979         }
980         CDEBUG(D_CACHE, "add grant client %s \n",
981                client->cl_import->imp_obd->obd_name);
982         osc_update_next_shrink(client);
983         return 0;
984 }
985
986 static int osc_del_shrink_grant(struct client_obd *client)
987 {
988         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
989                                          TIMEOUT_GRANT);
990 }
991
992 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
993 {
994         /*
995          * ocd_grant is the total grant amount we're expect to hold: if we've
996          * been evicted, it's the new avail_grant amount, cl_dirty will drop
997          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
998          *
999          * race is tolerable here: if we're evicted, but imp_state already
1000          * left EVICTED state, then cl_dirty must be 0 already.
1001          */
1002         client_obd_list_lock(&cli->cl_loi_list_lock);
1003         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1004                 cli->cl_avail_grant = ocd->ocd_grant;
1005         else
1006                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1007
1008         if (cli->cl_avail_grant < 0) {
1009                 CWARN("%s: available grant < 0, the OSS is probably not running"
1010                       " with patch from bug20278 (%ld) \n",
1011                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1012                 /* workaround for 1.6 servers which do not have
1013                  * the patch from bug20278 */
1014                 cli->cl_avail_grant = ocd->ocd_grant;
1015         }
1016
1017         client_obd_list_unlock(&cli->cl_loi_list_lock);
1018
1019         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1020                cli->cl_import->imp_obd->obd_name,
1021                cli->cl_avail_grant, cli->cl_lost_grant);
1022
1023         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1024             cfs_list_empty(&cli->cl_grant_shrink_list))
1025                 osc_add_shrink_grant(cli);
1026 }
1027
1028 /* We assume that the reason this OSC got a short read is because it read
1029  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1030  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1031  * this stripe never got written at or beyond this stripe offset yet. */
1032 static void handle_short_read(int nob_read, obd_count page_count,
1033                               struct brw_page **pga)
1034 {
1035         char *ptr;
1036         int i = 0;
1037
1038         /* skip bytes read OK */
1039         while (nob_read > 0) {
1040                 LASSERT (page_count > 0);
1041
1042                 if (pga[i]->count > nob_read) {
1043                         /* EOF inside this page */
1044                         ptr = cfs_kmap(pga[i]->pg) +
1045                                 (pga[i]->off & ~CFS_PAGE_MASK);
1046                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1047                         cfs_kunmap(pga[i]->pg);
1048                         page_count--;
1049                         i++;
1050                         break;
1051                 }
1052
1053                 nob_read -= pga[i]->count;
1054                 page_count--;
1055                 i++;
1056         }
1057
1058         /* zero remaining pages */
1059         while (page_count-- > 0) {
1060                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1061                 memset(ptr, 0, pga[i]->count);
1062                 cfs_kunmap(pga[i]->pg);
1063                 i++;
1064         }
1065 }
1066
1067 static int check_write_rcs(struct ptlrpc_request *req,
1068                            int requested_nob, int niocount,
1069                            obd_count page_count, struct brw_page **pga)
1070 {
1071         int     i;
1072         __u32   *remote_rcs;
1073
1074         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1075                                                   sizeof(*remote_rcs) *
1076                                                   niocount);
1077         if (remote_rcs == NULL) {
1078                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1079                 return(-EPROTO);
1080         }
1081
1082         /* return error if any niobuf was in error */
1083         for (i = 0; i < niocount; i++) {
1084                 if ((int)remote_rcs[i] < 0)
1085                         return(remote_rcs[i]);
1086
1087                 if (remote_rcs[i] != 0) {
1088                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1089                                 i, remote_rcs[i], req);
1090                         return(-EPROTO);
1091                 }
1092         }
1093
1094         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1095                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1096                        req->rq_bulk->bd_nob_transferred, requested_nob);
1097                 return(-EPROTO);
1098         }
1099
1100         return (0);
1101 }
1102
1103 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1104 {
1105         if (p1->flag != p2->flag) {
1106                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1107                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1108
1109                 /* warn if we try to combine flags that we don't know to be
1110                  * safe to combine */
1111                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1112                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1113                               "report this at http://bugs.whamcloud.com/\n",
1114                               p1->flag, p2->flag);
1115                 }
1116                 return 0;
1117         }
1118
1119         return (p1->off + p1->count == p2->off);
1120 }
1121
1122 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1123                                    struct brw_page **pga, int opc,
1124                                    cksum_type_t cksum_type)
1125 {
1126         __u32 cksum;
1127         int i = 0;
1128
1129         LASSERT (pg_count > 0);
1130         cksum = init_checksum(cksum_type);
1131         while (nob > 0 && pg_count > 0) {
1132                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1133                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1134                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1135
1136                 /* corrupt the data before we compute the checksum, to
1137                  * simulate an OST->client data error */
1138                 if (i == 0 && opc == OST_READ &&
1139                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1140                         memcpy(ptr + off, "bad1", min(4, nob));
1141                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1142                 cfs_kunmap(pga[i]->pg);
1143                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1144                                off, cksum);
1145
1146                 nob -= pga[i]->count;
1147                 pg_count--;
1148                 i++;
1149         }
1150         /* For sending we only compute the wrong checksum instead
1151          * of corrupting the data so it is still correct on a redo */
1152         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1153                 cksum++;
1154
1155         return fini_checksum(cksum, cksum_type);
1156 }
1157
1158 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1159                                 struct lov_stripe_md *lsm, obd_count page_count,
1160                                 struct brw_page **pga,
1161                                 struct ptlrpc_request **reqp,
1162                                 struct obd_capa *ocapa, int reserve,
1163                                 int resend)
1164 {
1165         struct ptlrpc_request   *req;
1166         struct ptlrpc_bulk_desc *desc;
1167         struct ost_body         *body;
1168         struct obd_ioobj        *ioobj;
1169         struct niobuf_remote    *niobuf;
1170         int niocount, i, requested_nob, opc, rc;
1171         struct osc_brw_async_args *aa;
1172         struct req_capsule      *pill;
1173         struct brw_page *pg_prev;
1174
1175         ENTRY;
1176         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1177                 RETURN(-ENOMEM); /* Recoverable */
1178         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1179                 RETURN(-EINVAL); /* Fatal */
1180
1181         if ((cmd & OBD_BRW_WRITE) != 0) {
1182                 opc = OST_WRITE;
1183                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1184                                                 cli->cl_import->imp_rq_pool,
1185                                                 &RQF_OST_BRW_WRITE);
1186         } else {
1187                 opc = OST_READ;
1188                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1189         }
1190         if (req == NULL)
1191                 RETURN(-ENOMEM);
1192
1193         for (niocount = i = 1; i < page_count; i++) {
1194                 if (!can_merge_pages(pga[i - 1], pga[i]))
1195                         niocount++;
1196         }
1197
1198         pill = &req->rq_pill;
1199         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1200                              sizeof(*ioobj));
1201         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1202                              niocount * sizeof(*niobuf));
1203         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1204
1205         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1206         if (rc) {
1207                 ptlrpc_request_free(req);
1208                 RETURN(rc);
1209         }
1210         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1211         ptlrpc_at_set_req_timeout(req);
1212
1213         if (opc == OST_WRITE)
1214                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1215                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1216         else
1217                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1218                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1219
1220         if (desc == NULL)
1221                 GOTO(out, rc = -ENOMEM);
1222         /* NB request now owns desc and will free it when it gets freed */
1223
1224         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1225         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1226         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1227         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1228
1229         lustre_set_wire_obdo(&body->oa, oa);
1230
1231         obdo_to_ioobj(oa, ioobj);
1232         ioobj->ioo_bufcnt = niocount;
1233         osc_pack_capa(req, body, ocapa);
1234         LASSERT (page_count > 0);
1235         pg_prev = pga[0];
1236         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1237                 struct brw_page *pg = pga[i];
1238                 int poff = pg->off & ~CFS_PAGE_MASK;
1239
1240                 LASSERT(pg->count > 0);
1241                 /* make sure there is no gap in the middle of page array */
1242                 LASSERTF(page_count == 1 ||
1243                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1244                           ergo(i > 0 && i < page_count - 1,
1245                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1246                           ergo(i == page_count - 1, poff == 0)),
1247                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1248                          i, page_count, pg, pg->off, pg->count);
1249 #ifdef __linux__
1250                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1251                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1252                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1253                          i, page_count,
1254                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1255                          pg_prev->pg, page_private(pg_prev->pg),
1256                          pg_prev->pg->index, pg_prev->off);
1257 #else
1258                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1259                          "i %d p_c %u\n", i, page_count);
1260 #endif
1261                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1262                         (pg->flag & OBD_BRW_SRVLOCK));
1263
1264                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1265                 requested_nob += pg->count;
1266
1267                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1268                         niobuf--;
1269                         niobuf->len += pg->count;
1270                 } else {
1271                         niobuf->offset = pg->off;
1272                         niobuf->len    = pg->count;
1273                         niobuf->flags  = pg->flag;
1274                 }
1275                 pg_prev = pg;
1276         }
1277
1278         LASSERTF((void *)(niobuf - niocount) ==
1279                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1280                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1281                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1282
1283         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1284         if (resend) {
1285                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1286                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1287                         body->oa.o_flags = 0;
1288                 }
1289                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1290         }
1291
1292         if (osc_should_shrink_grant(cli))
1293                 osc_shrink_grant_local(cli, &body->oa);
1294
1295         /* size[REQ_REC_OFF] still sizeof (*body) */
1296         if (opc == OST_WRITE) {
1297                 if (cli->cl_checksum &&
1298                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1299                         /* store cl_cksum_type in a local variable since
1300                          * it can be changed via lprocfs */
1301                         cksum_type_t cksum_type = cli->cl_cksum_type;
1302
1303                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1304                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1305                                 body->oa.o_flags = 0;
1306                         }
1307                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1308                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1309                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1310                                                              page_count, pga,
1311                                                              OST_WRITE,
1312                                                              cksum_type);
1313                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1314                                body->oa.o_cksum);
1315                         /* save this in 'oa', too, for later checking */
1316                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1317                         oa->o_flags |= cksum_type_pack(cksum_type);
1318                 } else {
1319                         /* clear out the checksum flag, in case this is a
1320                          * resend but cl_checksum is no longer set. b=11238 */
1321                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1322                 }
1323                 oa->o_cksum = body->oa.o_cksum;
1324                 /* 1 RC per niobuf */
1325                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1326                                      sizeof(__u32) * niocount);
1327         } else {
1328                 if (cli->cl_checksum &&
1329                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1330                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1331                                 body->oa.o_flags = 0;
1332                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1333                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1334                 }
1335         }
1336         ptlrpc_request_set_replen(req);
1337
1338         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1339         aa = ptlrpc_req_async_args(req);
1340         aa->aa_oa = oa;
1341         aa->aa_requested_nob = requested_nob;
1342         aa->aa_nio_count = niocount;
1343         aa->aa_page_count = page_count;
1344         aa->aa_resends = 0;
1345         aa->aa_ppga = pga;
1346         aa->aa_cli = cli;
1347         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1348         if (ocapa && reserve)
1349                 aa->aa_ocapa = capa_get(ocapa);
1350
1351         *reqp = req;
1352         RETURN(0);
1353
1354  out:
1355         ptlrpc_req_finished(req);
1356         RETURN(rc);
1357 }
1358
1359 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1360                                 __u32 client_cksum, __u32 server_cksum, int nob,
1361                                 obd_count page_count, struct brw_page **pga,
1362                                 cksum_type_t client_cksum_type)
1363 {
1364         __u32 new_cksum;
1365         char *msg;
1366         cksum_type_t cksum_type;
1367
1368         if (server_cksum == client_cksum) {
1369                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1370                 return 0;
1371         }
1372
1373         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1374                                        oa->o_flags : 0);
1375         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1376                                       cksum_type);
1377
1378         if (cksum_type != client_cksum_type)
1379                 msg = "the server did not use the checksum type specified in "
1380                       "the original request - likely a protocol problem";
1381         else if (new_cksum == server_cksum)
1382                 msg = "changed on the client after we checksummed it - "
1383                       "likely false positive due to mmap IO (bug 11742)";
1384         else if (new_cksum == client_cksum)
1385                 msg = "changed in transit before arrival at OST";
1386         else
1387                 msg = "changed in transit AND doesn't match the original - "
1388                       "likely false positive due to mmap IO (bug 11742)";
1389
1390         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1391                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1392                            msg, libcfs_nid2str(peer->nid),
1393                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1394                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1395                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1396                            oa->o_id,
1397                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1398                            pga[0]->off,
1399                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1400         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1401                "client csum now %x\n", client_cksum, client_cksum_type,
1402                server_cksum, cksum_type, new_cksum);
1403         return 1;
1404 }
1405
1406 /* Note rc enters this function as number of bytes transferred */
1407 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1408 {
1409         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1410         const lnet_process_id_t *peer =
1411                         &req->rq_import->imp_connection->c_peer;
1412         struct client_obd *cli = aa->aa_cli;
1413         struct ost_body *body;
1414         __u32 client_cksum = 0;
1415         ENTRY;
1416
1417         if (rc < 0 && rc != -EDQUOT) {
1418                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1419                 RETURN(rc);
1420         }
1421
1422         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1423         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1424         if (body == NULL) {
1425                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1426                 RETURN(-EPROTO);
1427         }
1428
1429         /* set/clear over quota flag for a uid/gid */
1430         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1431             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1432                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1433
1434                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1435                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1436                        body->oa.o_flags);
1437                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1438         }
1439
1440         osc_update_grant(cli, body);
1441
1442         if (rc < 0)
1443                 RETURN(rc);
1444
1445         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1446                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1447
1448         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1449                 if (rc > 0) {
1450                         CERROR("Unexpected +ve rc %d\n", rc);
1451                         RETURN(-EPROTO);
1452                 }
1453                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1454
1455                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1456                         RETURN(-EAGAIN);
1457
1458                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1459                     check_write_checksum(&body->oa, peer, client_cksum,
1460                                          body->oa.o_cksum, aa->aa_requested_nob,
1461                                          aa->aa_page_count, aa->aa_ppga,
1462                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1463                         RETURN(-EAGAIN);
1464
1465                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1466                                      aa->aa_page_count, aa->aa_ppga);
1467                 GOTO(out, rc);
1468         }
1469
1470         /* The rest of this function executes only for OST_READs */
1471
1472         /* if unwrap_bulk failed, return -EAGAIN to retry */
1473         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1474         if (rc < 0)
1475                 GOTO(out, rc = -EAGAIN);
1476
1477         if (rc > aa->aa_requested_nob) {
1478                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1479                        aa->aa_requested_nob);
1480                 RETURN(-EPROTO);
1481         }
1482
1483         if (rc != req->rq_bulk->bd_nob_transferred) {
1484                 CERROR ("Unexpected rc %d (%d transferred)\n",
1485                         rc, req->rq_bulk->bd_nob_transferred);
1486                 return (-EPROTO);
1487         }
1488
1489         if (rc < aa->aa_requested_nob)
1490                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1491
1492         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1493                 static int cksum_counter;
1494                 __u32      server_cksum = body->oa.o_cksum;
1495                 char      *via;
1496                 char      *router;
1497                 cksum_type_t cksum_type;
1498
1499                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1500                                                body->oa.o_flags : 0);
1501                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1502                                                  aa->aa_ppga, OST_READ,
1503                                                  cksum_type);
1504
1505                 if (peer->nid == req->rq_bulk->bd_sender) {
1506                         via = router = "";
1507                 } else {
1508                         via = " via ";
1509                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1510                 }
1511
1512                 if (server_cksum == ~0 && rc > 0) {
1513                         CERROR("Protocol error: server %s set the 'checksum' "
1514                                "bit, but didn't send a checksum.  Not fatal, "
1515                                "but please notify on http://bugs.whamcloud.com/\n",
1516                                libcfs_nid2str(peer->nid));
1517                 } else if (server_cksum != client_cksum) {
1518                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1519                                            "%s%s%s inode "DFID" object "
1520                                            LPU64"/"LPU64" extent "
1521                                            "["LPU64"-"LPU64"]\n",
1522                                            req->rq_import->imp_obd->obd_name,
1523                                            libcfs_nid2str(peer->nid),
1524                                            via, router,
1525                                            body->oa.o_valid & OBD_MD_FLFID ?
1526                                                 body->oa.o_parent_seq : (__u64)0,
1527                                            body->oa.o_valid & OBD_MD_FLFID ?
1528                                                 body->oa.o_parent_oid : 0,
1529                                            body->oa.o_valid & OBD_MD_FLFID ?
1530                                                 body->oa.o_parent_ver : 0,
1531                                            body->oa.o_id,
1532                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1533                                                 body->oa.o_seq : (__u64)0,
1534                                            aa->aa_ppga[0]->off,
1535                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1536                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1537                                                                         1);
1538                         CERROR("client %x, server %x, cksum_type %x\n",
1539                                client_cksum, server_cksum, cksum_type);
1540                         cksum_counter = 0;
1541                         aa->aa_oa->o_cksum = client_cksum;
1542                         rc = -EAGAIN;
1543                 } else {
1544                         cksum_counter++;
1545                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1546                         rc = 0;
1547                 }
1548         } else if (unlikely(client_cksum)) {
1549                 static int cksum_missed;
1550
1551                 cksum_missed++;
1552                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1553                         CERROR("Checksum %u requested from %s but not sent\n",
1554                                cksum_missed, libcfs_nid2str(peer->nid));
1555         } else {
1556                 rc = 0;
1557         }
1558 out:
1559         if (rc >= 0)
1560                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1561
1562         RETURN(rc);
1563 }
1564
1565 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1566                             struct lov_stripe_md *lsm,
1567                             obd_count page_count, struct brw_page **pga,
1568                             struct obd_capa *ocapa)
1569 {
1570         struct ptlrpc_request *req;
1571         int                    rc;
1572         cfs_waitq_t            waitq;
1573         int                    generation, resends = 0;
1574         struct l_wait_info     lwi;
1575
1576         ENTRY;
1577
1578         cfs_waitq_init(&waitq);
1579         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1580
1581 restart_bulk:
1582         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1583                                   page_count, pga, &req, ocapa, 0, resends);
1584         if (rc != 0)
1585                 return (rc);
1586
1587         if (resends) {
1588                 req->rq_generation_set = 1;
1589                 req->rq_import_generation = generation;
1590                 req->rq_sent = cfs_time_current_sec() + resends;
1591         }
1592
1593         rc = ptlrpc_queue_wait(req);
1594
1595         if (rc == -ETIMEDOUT && req->rq_resend) {
1596                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1597                 ptlrpc_req_finished(req);
1598                 goto restart_bulk;
1599         }
1600
1601         rc = osc_brw_fini_request(req, rc);
1602
1603         ptlrpc_req_finished(req);
1604         /* When server return -EINPROGRESS, client should always retry
1605          * regardless of the number of times the bulk was resent already.*/
1606         if (osc_recoverable_error(rc)) {
1607                 resends++;
1608                 if (rc != -EINPROGRESS &&
1609                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1610                         CERROR("%s: too many resend retries for object: "
1611                                ""LPU64":"LPU64", rc = %d.\n",
1612                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1613                         goto out;
1614                 }
1615                 if (generation !=
1616                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1617                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1618                                ""LPU64":"LPU64", rc = %d.\n",
1619                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1620                         goto out;
1621                 }
1622
1623                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1624                                        NULL);
1625                 l_wait_event(waitq, 0, &lwi);
1626
1627                 goto restart_bulk;
1628         }
1629 out:
1630         if (rc == -EAGAIN || rc == -EINPROGRESS)
1631                 rc = -EIO;
1632         RETURN (rc);
1633 }
1634
1635 int osc_brw_redo_request(struct ptlrpc_request *request,
1636                          struct osc_brw_async_args *aa)
1637 {
1638         struct ptlrpc_request *new_req;
1639         struct ptlrpc_request_set *set = request->rq_set;
1640         struct osc_brw_async_args *new_aa;
1641         struct osc_async_page *oap;
1642         int rc = 0;
1643         ENTRY;
1644
1645         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1646
1647         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1648                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1649                                   aa->aa_cli, aa->aa_oa,
1650                                   NULL /* lsm unused by osc currently */,
1651                                   aa->aa_page_count, aa->aa_ppga,
1652                                   &new_req, aa->aa_ocapa, 0, 1);
1653         if (rc)
1654                 RETURN(rc);
1655
1656         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1657
1658         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1659                 if (oap->oap_request != NULL) {
1660                         LASSERTF(request == oap->oap_request,
1661                                  "request %p != oap_request %p\n",
1662                                  request, oap->oap_request);
1663                         if (oap->oap_interrupted) {
1664                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1665                                 ptlrpc_req_finished(new_req);
1666                                 RETURN(-EINTR);
1667                         }
1668                 }
1669         }
1670         /* New request takes over pga and oaps from old request.
1671          * Note that copying a list_head doesn't work, need to move it... */
1672         aa->aa_resends++;
1673         new_req->rq_interpret_reply = request->rq_interpret_reply;
1674         new_req->rq_async_args = request->rq_async_args;
1675         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1676         new_req->rq_generation_set = 1;
1677         new_req->rq_import_generation = request->rq_import_generation;
1678
1679         new_aa = ptlrpc_req_async_args(new_req);
1680
1681         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1682         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1683         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1684
1685         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1686                 if (oap->oap_request) {
1687                         ptlrpc_req_finished(oap->oap_request);
1688                         oap->oap_request = ptlrpc_request_addref(new_req);
1689                 }
1690         }
1691
1692         new_aa->aa_ocapa = aa->aa_ocapa;
1693         aa->aa_ocapa = NULL;
1694
1695         /* use ptlrpc_set_add_req is safe because interpret functions work
1696          * in check_set context. only one way exist with access to request
1697          * from different thread got -EINTR - this way protected with
1698          * cl_loi_list_lock */
1699         ptlrpc_set_add_req(set, new_req);
1700
1701         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1702
1703         DEBUG_REQ(D_INFO, new_req, "new request");
1704         RETURN(0);
1705 }
1706
1707 /*
1708  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1709  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1710  * fine for our small page arrays and doesn't require allocation.  its an
1711  * insertion sort that swaps elements that are strides apart, shrinking the
1712  * stride down until its '1' and the array is sorted.
1713  */
1714 static void sort_brw_pages(struct brw_page **array, int num)
1715 {
1716         int stride, i, j;
1717         struct brw_page *tmp;
1718
1719         if (num == 1)
1720                 return;
1721         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1722                 ;
1723
1724         do {
1725                 stride /= 3;
1726                 for (i = stride ; i < num ; i++) {
1727                         tmp = array[i];
1728                         j = i;
1729                         while (j >= stride && array[j - stride]->off > tmp->off) {
1730                                 array[j] = array[j - stride];
1731                                 j -= stride;
1732                         }
1733                         array[j] = tmp;
1734                 }
1735         } while (stride > 1);
1736 }
1737
1738 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1739 {
1740         int count = 1;
1741         int offset;
1742         int i = 0;
1743
1744         LASSERT (pages > 0);
1745         offset = pg[i]->off & ~CFS_PAGE_MASK;
1746
1747         for (;;) {
1748                 pages--;
1749                 if (pages == 0)         /* that's all */
1750                         return count;
1751
1752                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1753                         return count;   /* doesn't end on page boundary */
1754
1755                 i++;
1756                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1757                 if (offset != 0)        /* doesn't start on page boundary */
1758                         return count;
1759
1760                 count++;
1761         }
1762 }
1763
1764 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1765 {
1766         struct brw_page **ppga;
1767         int i;
1768
1769         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1770         if (ppga == NULL)
1771                 return NULL;
1772
1773         for (i = 0; i < count; i++)
1774                 ppga[i] = pga + i;
1775         return ppga;
1776 }
1777
1778 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1779 {
1780         LASSERT(ppga != NULL);
1781         OBD_FREE(ppga, sizeof(*ppga) * count);
1782 }
1783
1784 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1785                    obd_count page_count, struct brw_page *pga,
1786                    struct obd_trans_info *oti)
1787 {
1788         struct obdo *saved_oa = NULL;
1789         struct brw_page **ppga, **orig;
1790         struct obd_import *imp = class_exp2cliimp(exp);
1791         struct client_obd *cli;
1792         int rc, page_count_orig;
1793         ENTRY;
1794
1795         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1796         cli = &imp->imp_obd->u.cli;
1797
1798         if (cmd & OBD_BRW_CHECK) {
1799                 /* The caller just wants to know if there's a chance that this
1800                  * I/O can succeed */
1801
1802                 if (imp->imp_invalid)
1803                         RETURN(-EIO);
1804                 RETURN(0);
1805         }
1806
1807         /* test_brw with a failed create can trip this, maybe others. */
1808         LASSERT(cli->cl_max_pages_per_rpc);
1809
1810         rc = 0;
1811
1812         orig = ppga = osc_build_ppga(pga, page_count);
1813         if (ppga == NULL)
1814                 RETURN(-ENOMEM);
1815         page_count_orig = page_count;
1816
1817         sort_brw_pages(ppga, page_count);
1818         while (page_count) {
1819                 obd_count pages_per_brw;
1820
1821                 if (page_count > cli->cl_max_pages_per_rpc)
1822                         pages_per_brw = cli->cl_max_pages_per_rpc;
1823                 else
1824                         pages_per_brw = page_count;
1825
1826                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1827
1828                 if (saved_oa != NULL) {
1829                         /* restore previously saved oa */
1830                         *oinfo->oi_oa = *saved_oa;
1831                 } else if (page_count > pages_per_brw) {
1832                         /* save a copy of oa (brw will clobber it) */
1833                         OBDO_ALLOC(saved_oa);
1834                         if (saved_oa == NULL)
1835                                 GOTO(out, rc = -ENOMEM);
1836                         *saved_oa = *oinfo->oi_oa;
1837                 }
1838
1839                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1840                                       pages_per_brw, ppga, oinfo->oi_capa);
1841
1842                 if (rc != 0)
1843                         break;
1844
1845                 page_count -= pages_per_brw;
1846                 ppga += pages_per_brw;
1847         }
1848
1849 out:
1850         osc_release_ppga(orig, page_count_orig);
1851
1852         if (saved_oa != NULL)
1853                 OBDO_FREE(saved_oa);
1854
1855         RETURN(rc);
1856 }
1857
1858 static int brw_interpret(const struct lu_env *env,
1859                          struct ptlrpc_request *req, void *data, int rc)
1860 {
1861         struct osc_brw_async_args *aa = data;
1862         struct osc_async_page *oap, *tmp;
1863         struct client_obd *cli;
1864         ENTRY;
1865
1866         rc = osc_brw_fini_request(req, rc);
1867         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1868         /* When server return -EINPROGRESS, client should always retry
1869          * regardless of the number of times the bulk was resent already. */
1870         if (osc_recoverable_error(rc)) {
1871                 if (req->rq_import_generation !=
1872                     req->rq_import->imp_generation) {
1873                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1874                                ""LPU64":"LPU64", rc = %d.\n",
1875                                req->rq_import->imp_obd->obd_name,
1876                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1877                 } else if (rc == -EINPROGRESS ||
1878                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1879                         rc = osc_brw_redo_request(req, aa);
1880                 } else {
1881                         CERROR("%s: too many resent retries for object: "
1882                                ""LPU64":"LPU64", rc = %d.\n",
1883                                req->rq_import->imp_obd->obd_name,
1884                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1885                 }
1886
1887                 if (rc == 0)
1888                         RETURN(0);
1889                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1890                         rc = -EIO;
1891         }
1892
1893         if (aa->aa_ocapa) {
1894                 capa_put(aa->aa_ocapa);
1895                 aa->aa_ocapa = NULL;
1896         }
1897
1898         cli = aa->aa_cli;
1899         client_obd_list_lock(&cli->cl_loi_list_lock);
1900
1901         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1902          * is called so we know whether to go to sync BRWs or wait for more
1903          * RPCs to complete */
1904         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1905                 cli->cl_w_in_flight--;
1906         else
1907                 cli->cl_r_in_flight--;
1908
1909         /* the caller may re-use the oap after the completion call so
1910          * we need to clean it up a little */
1911         cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
1912                         oap_rpc_item) {
1913                 cfs_list_del_init(&oap->oap_rpc_item);
1914                 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1915         }
1916         OBDO_FREE(aa->aa_oa);
1917
1918         osc_wake_cache_waiters(cli);
1919         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1920         client_obd_list_unlock(&cli->cl_loi_list_lock);
1921
1922         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1923                           req->rq_bulk->bd_nob_transferred);
1924         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1925         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1926
1927         RETURN(rc);
1928 }
1929
1930 /* The most tricky part of this function is that it will return with
1931  * cli->cli_loi_list_lock held.
1932  */
1933 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1934                   cfs_list_t *rpc_list, int page_count, int cmd,
1935                   pdl_policy_t pol)
1936 {
1937         struct ptlrpc_request *req = NULL;
1938         struct brw_page **pga = NULL;
1939         struct osc_brw_async_args *aa = NULL;
1940         struct obdo *oa = NULL;
1941         struct osc_async_page *oap;
1942         struct osc_async_page *tmp;
1943         struct cl_req *clerq = NULL;
1944         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1945         struct ldlm_lock *lock = NULL;
1946         struct cl_req_attr crattr;
1947         int i, rc, mpflag = 0;
1948
1949         ENTRY;
1950         LASSERT(!cfs_list_empty(rpc_list));
1951
1952         if (cmd & OBD_BRW_MEMALLOC)
1953                 mpflag = cfs_memory_pressure_get_and_set();
1954
1955         memset(&crattr, 0, sizeof crattr);
1956         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1957         if (pga == NULL)
1958                 GOTO(out, rc = -ENOMEM);
1959
1960         OBDO_ALLOC(oa);
1961         if (oa == NULL)
1962                 GOTO(out, rc = -ENOMEM);
1963
1964         i = 0;
1965         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1966                 struct cl_page *page = osc_oap2cl_page(oap);
1967                 if (clerq == NULL) {
1968                         clerq = cl_req_alloc(env, page, crt,
1969                                              1 /* only 1-object rpcs for
1970                                                 * now */);
1971                         if (IS_ERR(clerq))
1972                                 GOTO(out, rc = PTR_ERR(clerq));
1973                         lock = oap->oap_ldlm_lock;
1974                 }
1975                 pga[i] = &oap->oap_brw_page;
1976                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1977                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1978                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1979                 i++;
1980                 cl_req_page_add(env, clerq, page);
1981         }
1982
1983         /* always get the data for the obdo for the rpc */
1984         LASSERT(clerq != NULL);
1985         crattr.cra_oa = oa;
1986         crattr.cra_capa = NULL;
1987         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
1988         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
1989         if (lock) {
1990                 oa->o_handle = lock->l_remote_handle;
1991                 oa->o_valid |= OBD_MD_FLHANDLE;
1992         }
1993
1994         rc = cl_req_prep(env, clerq);
1995         if (rc != 0) {
1996                 CERROR("cl_req_prep failed: %d\n", rc);
1997                 GOTO(out, rc);
1998         }
1999
2000         sort_brw_pages(pga, page_count);
2001         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2002                                   pga, &req, crattr.cra_capa, 1, 0);
2003         if (rc != 0) {
2004                 CERROR("prep_req failed: %d\n", rc);
2005                 GOTO(out, rc);
2006         }
2007
2008         req->rq_interpret_reply = brw_interpret;
2009         if (cmd & OBD_BRW_MEMALLOC)
2010                 req->rq_memalloc = 1;
2011
2012         /* Need to update the timestamps after the request is built in case
2013          * we race with setattr (locally or in queue at OST).  If OST gets
2014          * later setattr before earlier BRW (as determined by the request xid),
2015          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2016          * way to do this in a single call.  bug 10150 */
2017         cl_req_attr_set(env, clerq, &crattr,
2018                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2019
2020         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2021
2022         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2023         aa = ptlrpc_req_async_args(req);
2024         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2025         cfs_list_splice(rpc_list, &aa->aa_oaps);
2026         CFS_INIT_LIST_HEAD(rpc_list);
2027         aa->aa_clerq = clerq;
2028 out:
2029         if (cmd & OBD_BRW_MEMALLOC)
2030                 cfs_memory_pressure_restore(mpflag);
2031
2032         capa_put(crattr.cra_capa);
2033         if (rc != 0) {
2034                 LASSERT(req == NULL);
2035
2036                 if (oa)
2037                         OBDO_FREE(oa);
2038                 if (pga)
2039                         OBD_FREE(pga, sizeof(*pga) * page_count);
2040                 /* this should happen rarely and is pretty bad, it makes the
2041                  * pending list not follow the dirty order */
2042                 client_obd_list_lock(&cli->cl_loi_list_lock);
2043                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2044                         cfs_list_del_init(&oap->oap_rpc_item);
2045
2046                         /* queued sync pages can be torn down while the pages
2047                          * were between the pending list and the rpc */
2048                         if (oap->oap_interrupted) {
2049                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2050                                 osc_ap_completion(env, cli, NULL, oap, 0,
2051                                                   oap->oap_count);
2052                                 continue;
2053                         }
2054                         osc_ap_completion(env, cli, NULL, oap, 0, rc);
2055                 }
2056                 if (clerq && !IS_ERR(clerq))
2057                         cl_req_completion(env, clerq, rc);
2058         } else {
2059                 struct osc_async_page *tmp = NULL;
2060
2061                 /* queued sync pages can be torn down while the pages
2062                  * were between the pending list and the rpc */
2063                 LASSERT(aa != NULL);
2064                 client_obd_list_lock(&cli->cl_loi_list_lock);
2065                 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2066                         /* only one oap gets a request reference */
2067                         if (tmp == NULL)
2068                                 tmp = oap;
2069                         if (oap->oap_interrupted && !req->rq_intr) {
2070                                 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2071                                                 oap, req);
2072                                 ptlrpc_mark_interrupted(req);
2073                         }
2074                 }
2075                 if (tmp != NULL)
2076                         tmp->oap_request = ptlrpc_request_addref(req);
2077
2078                 DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
2079                           page_count, aa, cli->cl_r_in_flight,
2080                           cli->cl_w_in_flight);
2081
2082                 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2083                  * see which CPU/NUMA node the majority of pages were allocated
2084                  * on, and try to assign the async RPC to the CPU core
2085                  * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2086                  *
2087                  * But on the other hand, we expect that multiple ptlrpcd
2088                  * threads and the initial write sponsor can run in parallel,
2089                  * especially when data checksum is enabled, which is CPU-bound
2090                  * operation and single ptlrpcd thread cannot process in time.
2091                  * So more ptlrpcd threads sharing BRW load
2092                  * (with PDL_POLICY_ROUND) seems better.
2093                  */
2094                 ptlrpcd_add_req(req, pol, -1);
2095         }
2096         RETURN(rc);
2097 }
2098
2099 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2100                                         struct ldlm_enqueue_info *einfo)
2101 {
2102         void *data = einfo->ei_cbdata;
2103         int set = 0;
2104
2105         LASSERT(lock != NULL);
2106         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2107         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2108         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2109         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2110
2111         lock_res_and_lock(lock);
2112         cfs_spin_lock(&osc_ast_guard);
2113
2114         if (lock->l_ast_data == NULL)
2115                 lock->l_ast_data = data;
2116         if (lock->l_ast_data == data)
2117                 set = 1;
2118
2119         cfs_spin_unlock(&osc_ast_guard);
2120         unlock_res_and_lock(lock);
2121
2122         return set;
2123 }
2124
2125 static int osc_set_data_with_check(struct lustre_handle *lockh,
2126                                    struct ldlm_enqueue_info *einfo)
2127 {
2128         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2129         int set = 0;
2130
2131         if (lock != NULL) {
2132                 set = osc_set_lock_data_with_check(lock, einfo);
2133                 LDLM_LOCK_PUT(lock);
2134         } else
2135                 CERROR("lockh %p, data %p - client evicted?\n",
2136                        lockh, einfo->ei_cbdata);
2137         return set;
2138 }
2139
2140 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2141                              ldlm_iterator_t replace, void *data)
2142 {
2143         struct ldlm_res_id res_id;
2144         struct obd_device *obd = class_exp2obd(exp);
2145
2146         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2147         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2148         return 0;
2149 }
2150
2151 /* find any ldlm lock of the inode in osc
2152  * return 0    not find
2153  *        1    find one
2154  *      < 0    error */
2155 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2156                            ldlm_iterator_t replace, void *data)
2157 {
2158         struct ldlm_res_id res_id;
2159         struct obd_device *obd = class_exp2obd(exp);
2160         int rc = 0;
2161
2162         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2163         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2164         if (rc == LDLM_ITER_STOP)
2165                 return(1);
2166         if (rc == LDLM_ITER_CONTINUE)
2167                 return(0);
2168         return(rc);
2169 }
2170
2171 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2172                             obd_enqueue_update_f upcall, void *cookie,
2173                             int *flags, int agl, int rc)
2174 {
2175         int intent = *flags & LDLM_FL_HAS_INTENT;
2176         ENTRY;
2177
2178         if (intent) {
2179                 /* The request was created before ldlm_cli_enqueue call. */
2180                 if (rc == ELDLM_LOCK_ABORTED) {
2181                         struct ldlm_reply *rep;
2182                         rep = req_capsule_server_get(&req->rq_pill,
2183                                                      &RMF_DLM_REP);
2184
2185                         LASSERT(rep != NULL);
2186                         if (rep->lock_policy_res1)
2187                                 rc = rep->lock_policy_res1;
2188                 }
2189         }
2190
2191         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2192             (rc == 0)) {
2193                 *flags |= LDLM_FL_LVB_READY;
2194                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2195                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2196         }
2197
2198         /* Call the update callback. */
2199         rc = (*upcall)(cookie, rc);
2200         RETURN(rc);
2201 }
2202
2203 static int osc_enqueue_interpret(const struct lu_env *env,
2204                                  struct ptlrpc_request *req,
2205                                  struct osc_enqueue_args *aa, int rc)
2206 {
2207         struct ldlm_lock *lock;
2208         struct lustre_handle handle;
2209         __u32 mode;
2210         struct ost_lvb *lvb;
2211         __u32 lvb_len;
2212         int *flags = aa->oa_flags;
2213
2214         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2215          * might be freed anytime after lock upcall has been called. */
2216         lustre_handle_copy(&handle, aa->oa_lockh);
2217         mode = aa->oa_ei->ei_mode;
2218
2219         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2220          * be valid. */
2221         lock = ldlm_handle2lock(&handle);
2222
2223         /* Take an additional reference so that a blocking AST that
2224          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2225          * to arrive after an upcall has been executed by
2226          * osc_enqueue_fini(). */
2227         ldlm_lock_addref(&handle, mode);
2228
2229         /* Let CP AST to grant the lock first. */
2230         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2231
2232         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2233                 lvb = NULL;
2234                 lvb_len = 0;
2235         } else {
2236                 lvb = aa->oa_lvb;
2237                 lvb_len = sizeof(*aa->oa_lvb);
2238         }
2239
2240         /* Complete obtaining the lock procedure. */
2241         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2242                                    mode, flags, lvb, lvb_len, &handle, rc);
2243         /* Complete osc stuff. */
2244         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2245                               flags, aa->oa_agl, rc);
2246
2247         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2248
2249         /* Release the lock for async request. */
2250         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2251                 /*
2252                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2253                  * not already released by
2254                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2255                  */
2256                 ldlm_lock_decref(&handle, mode);
2257
2258         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2259                  aa->oa_lockh, req, aa);
2260         ldlm_lock_decref(&handle, mode);
2261         LDLM_LOCK_PUT(lock);
2262         return rc;
2263 }
2264
2265 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2266                         struct lov_oinfo *loi, int flags,
2267                         struct ost_lvb *lvb, __u32 mode, int rc)
2268 {
2269         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2270
2271         if (rc == ELDLM_OK) {
2272                 __u64 tmp;
2273
2274                 LASSERT(lock != NULL);
2275                 loi->loi_lvb = *lvb;
2276                 tmp = loi->loi_lvb.lvb_size;
2277                 /* Extend KMS up to the end of this lock and no further
2278                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2279                 if (tmp > lock->l_policy_data.l_extent.end)
2280                         tmp = lock->l_policy_data.l_extent.end + 1;
2281                 if (tmp >= loi->loi_kms) {
2282                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2283                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2284                         loi_kms_set(loi, tmp);
2285                 } else {
2286                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2287                                    LPU64"; leaving kms="LPU64", end="LPU64,
2288                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2289                                    lock->l_policy_data.l_extent.end);
2290                 }
2291                 ldlm_lock_allow_match(lock);
2292         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2293                 LASSERT(lock != NULL);
2294                 loi->loi_lvb = *lvb;
2295                 ldlm_lock_allow_match(lock);
2296                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2297                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2298                 rc = ELDLM_OK;
2299         }
2300
2301         if (lock != NULL) {
2302                 if (rc != ELDLM_OK)
2303                         ldlm_lock_fail_match(lock);
2304
2305                 LDLM_LOCK_PUT(lock);
2306         }
2307 }
2308 EXPORT_SYMBOL(osc_update_enqueue);
2309
/* Sentinel request-set value, not a real set: osc_enqueue_base() compares its
 * rqset argument against this pointer and, on a match, passes the request to
 * ptlrpcd_add_req() instead of adding it to a caller-supplied set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2311
2312 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2313  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2314  * other synchronous requests, however keeping some locks and trying to obtain
2315  * others may take a considerable amount of time in a case of ost failure; and
2316  * when other sync requests do not get released lock from a client, the client
2317  * is excluded from the cluster -- such scenarious make the life difficult, so
2318  * release locks just after they are obtained. */
2319 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2320                      int *flags, ldlm_policy_data_t *policy,
2321                      struct ost_lvb *lvb, int kms_valid,
2322                      obd_enqueue_update_f upcall, void *cookie,
2323                      struct ldlm_enqueue_info *einfo,
2324                      struct lustre_handle *lockh,
2325                      struct ptlrpc_request_set *rqset, int async, int agl)
2326 {
2327         struct obd_device *obd = exp->exp_obd;
2328         struct ptlrpc_request *req = NULL;
2329         int intent = *flags & LDLM_FL_HAS_INTENT;
2330         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2331         ldlm_mode_t mode;
2332         int rc;
2333         ENTRY;
2334
2335         /* Filesystem lock extents are extended to page boundaries so that
2336          * dealing with the page cache is a little smoother.  */
2337         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2338         policy->l_extent.end |= ~CFS_PAGE_MASK;
2339
2340         /*
2341          * kms is not valid when either object is completely fresh (so that no
2342          * locks are cached), or object was evicted. In the latter case cached
2343          * lock cannot be used, because it would prime inode state with
2344          * potentially stale LVB.
2345          */
2346         if (!kms_valid)
2347                 goto no_match;
2348
2349         /* Next, search for already existing extent locks that will cover us */
2350         /* If we're trying to read, we also search for an existing PW lock.  The
2351          * VFS and page cache already protect us locally, so lots of readers/
2352          * writers can share a single PW lock.
2353          *
2354          * There are problems with conversion deadlocks, so instead of
2355          * converting a read lock to a write lock, we'll just enqueue a new
2356          * one.
2357          *
2358          * At some point we should cancel the read lock instead of making them
2359          * send us a blocking callback, but there are problems with canceling
2360          * locks out from other users right now, too. */
2361         mode = einfo->ei_mode;
2362         if (einfo->ei_mode == LCK_PR)
2363                 mode |= LCK_PW;
2364         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2365                                einfo->ei_type, policy, mode, lockh, 0);
2366         if (mode) {
2367                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2368
2369                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2370                         /* For AGL, if enqueue RPC is sent but the lock is not
2371                          * granted, then skip to process this strpe.
2372                          * Return -ECANCELED to tell the caller. */
2373                         ldlm_lock_decref(lockh, mode);
2374                         LDLM_LOCK_PUT(matched);
2375                         RETURN(-ECANCELED);
2376                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2377                         *flags |= LDLM_FL_LVB_READY;
2378                         /* addref the lock only if not async requests and PW
2379                          * lock is matched whereas we asked for PR. */
2380                         if (!rqset && einfo->ei_mode != mode)
2381                                 ldlm_lock_addref(lockh, LCK_PR);
2382                         if (intent) {
2383                                 /* I would like to be able to ASSERT here that
2384                                  * rss <= kms, but I can't, for reasons which
2385                                  * are explained in lov_enqueue() */
2386                         }
2387
2388                         /* We already have a lock, and it's referenced */
2389                         (*upcall)(cookie, ELDLM_OK);
2390
2391                         if (einfo->ei_mode != mode)
2392                                 ldlm_lock_decref(lockh, LCK_PW);
2393                         else if (rqset)
2394                                 /* For async requests, decref the lock. */
2395                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2396                         LDLM_LOCK_PUT(matched);
2397                         RETURN(ELDLM_OK);
2398                 } else {
2399                         ldlm_lock_decref(lockh, mode);
2400                         LDLM_LOCK_PUT(matched);
2401                 }
2402         }
2403
2404  no_match:
2405         if (intent) {
2406                 CFS_LIST_HEAD(cancels);
2407                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2408                                            &RQF_LDLM_ENQUEUE_LVB);
2409                 if (req == NULL)
2410                         RETURN(-ENOMEM);
2411
2412                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2413                 if (rc) {
2414                         ptlrpc_request_free(req);
2415                         RETURN(rc);
2416                 }
2417
2418                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2419                                      sizeof *lvb);
2420                 ptlrpc_request_set_replen(req);
2421         }
2422
2423         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2424         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2425
2426         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2427                               sizeof(*lvb), lockh, async);
2428         if (rqset) {
2429                 if (!rc) {
2430                         struct osc_enqueue_args *aa;
2431                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2432                         aa = ptlrpc_req_async_args(req);
2433                         aa->oa_ei = einfo;
2434                         aa->oa_exp = exp;
2435                         aa->oa_flags  = flags;
2436                         aa->oa_upcall = upcall;
2437                         aa->oa_cookie = cookie;
2438                         aa->oa_lvb    = lvb;
2439                         aa->oa_lockh  = lockh;
2440                         aa->oa_agl    = !!agl;
2441
2442                         req->rq_interpret_reply =
2443                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2444                         if (rqset == PTLRPCD_SET)
2445                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2446                         else
2447                                 ptlrpc_set_add_req(rqset, req);
2448                 } else if (intent) {
2449                         ptlrpc_req_finished(req);
2450                 }
2451                 RETURN(rc);
2452         }
2453
2454         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2455         if (intent)
2456                 ptlrpc_req_finished(req);
2457
2458         RETURN(rc);
2459 }
2460
2461 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2462                        struct ldlm_enqueue_info *einfo,
2463                        struct ptlrpc_request_set *rqset)
2464 {
2465         struct ldlm_res_id res_id;
2466         int rc;
2467         ENTRY;
2468
2469         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2470                            oinfo->oi_md->lsm_object_seq, &res_id);
2471
2472         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2473                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2474                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2475                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2476                               rqset, rqset != NULL, 0);
2477         RETURN(rc);
2478 }
2479
2480 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2481                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2482                    int *flags, void *data, struct lustre_handle *lockh,
2483                    int unref)
2484 {
2485         struct obd_device *obd = exp->exp_obd;
2486         int lflags = *flags;
2487         ldlm_mode_t rc;
2488         ENTRY;
2489
2490         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2491                 RETURN(-EIO);
2492
2493         /* Filesystem lock extents are extended to page boundaries so that
2494          * dealing with the page cache is a little smoother */
2495         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2496         policy->l_extent.end |= ~CFS_PAGE_MASK;
2497
2498         /* Next, search for already existing extent locks that will cover us */
2499         /* If we're trying to read, we also search for an existing PW lock.  The
2500          * VFS and page cache already protect us locally, so lots of readers/
2501          * writers can share a single PW lock. */
2502         rc = mode;
2503         if (mode == LCK_PR)
2504                 rc |= LCK_PW;
2505         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2506                              res_id, type, policy, rc, lockh, unref);
2507         if (rc) {
2508                 if (data != NULL) {
2509                         if (!osc_set_data_with_check(lockh, data)) {
2510                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2511                                         ldlm_lock_decref(lockh, rc);
2512                                 RETURN(0);
2513                         }
2514                 }
2515                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2516                         ldlm_lock_addref(lockh, LCK_PR);
2517                         ldlm_lock_decref(lockh, LCK_PW);
2518                 }
2519                 RETURN(rc);
2520         }
2521         RETURN(rc);
2522 }
2523
2524 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2525 {
2526         ENTRY;
2527
2528         if (unlikely(mode == LCK_GROUP))
2529                 ldlm_lock_decref_and_cancel(lockh, mode);
2530         else
2531                 ldlm_lock_decref(lockh, mode);
2532
2533         RETURN(0);
2534 }
2535
2536 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2537                       __u32 mode, struct lustre_handle *lockh)
2538 {
2539         ENTRY;
2540         RETURN(osc_cancel_base(lockh, mode));
2541 }
2542
2543 static int osc_cancel_unused(struct obd_export *exp,
2544                              struct lov_stripe_md *lsm,
2545                              ldlm_cancel_flags_t flags,
2546                              void *opaque)
2547 {
2548         struct obd_device *obd = class_exp2obd(exp);
2549         struct ldlm_res_id res_id, *resp = NULL;
2550
2551         if (lsm != NULL) {
2552                 resp = osc_build_res_name(lsm->lsm_object_id,
2553                                           lsm->lsm_object_seq, &res_id);
2554         }
2555
2556         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2557 }
2558
2559 static int osc_statfs_interpret(const struct lu_env *env,
2560                                 struct ptlrpc_request *req,
2561                                 struct osc_async_args *aa, int rc)
2562 {
2563         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2564         struct obd_statfs *msfs;
2565         __u64 used;
2566         ENTRY;
2567
2568         if (rc == -EBADR)
2569                 /* The request has in fact never been sent
2570                  * due to issues at a higher level (LOV).
2571                  * Exit immediately since the caller is
2572                  * aware of the problem and takes care
2573                  * of the clean up */
2574                  RETURN(rc);
2575
2576         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2577             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2578                 GOTO(out, rc = 0);
2579
2580         if (rc != 0)
2581                 GOTO(out, rc);
2582
2583         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2584         if (msfs == NULL) {
2585                 GOTO(out, rc = -EPROTO);
2586         }
2587
2588         /* Reinitialize the RDONLY and DEGRADED flags at the client
2589          * on each statfs, so they don't stay set permanently. */
2590         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2591
2592         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2593                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2594         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2595                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2596
2597         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2598                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2599         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2600                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2601
2602         /* Add a bit of hysteresis so this flag isn't continually flapping,
2603          * and ensure that new files don't get extremely fragmented due to
2604          * only a small amount of available space in the filesystem.
2605          * We want to set the NOSPC flag when there is less than ~0.1% free
2606          * and clear it when there is at least ~0.2% free space, so:
2607          *                   avail < ~0.1% max          max = avail + used
2608          *            1025 * avail < avail + used       used = blocks - free
2609          *            1024 * avail < used
2610          *            1024 * avail < blocks - free
2611          *                   avail < ((blocks - free) >> 10)
2612          *
2613          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2614          * lose that amount of space so in those cases we report no space left
2615          * if their is less than 1 GB left.                             */
2616         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2617         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2618                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2619                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2620         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2621                           (msfs->os_ffree > 64) &&
2622                           (msfs->os_bavail > (used << 1)))) {
2623                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2624                                              OSCC_FLAG_NOSPC_BLK);
2625         }
2626
2627         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2628                      (msfs->os_bavail < used)))
2629                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2630
2631         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2632
2633         *aa->aa_oi->oi_osfs = *msfs;
2634 out:
2635         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2636         RETURN(rc);
2637 }
2638
2639 static int osc_statfs_async(struct obd_export *exp,
2640                             struct obd_info *oinfo, __u64 max_age,
2641                             struct ptlrpc_request_set *rqset)
2642 {
2643         struct obd_device     *obd = class_exp2obd(exp);
2644         struct ptlrpc_request *req;
2645         struct osc_async_args *aa;
2646         int                    rc;
2647         ENTRY;
2648
2649         /* We could possibly pass max_age in the request (as an absolute
2650          * timestamp or a "seconds.usec ago") so the target can avoid doing
2651          * extra calls into the filesystem if that isn't necessary (e.g.
2652          * during mount that would help a bit).  Having relative timestamps
2653          * is not so great if request processing is slow, while absolute
2654          * timestamps are not ideal because they need time synchronization. */
2655         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2656         if (req == NULL)
2657                 RETURN(-ENOMEM);
2658
2659         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2660         if (rc) {
2661                 ptlrpc_request_free(req);
2662                 RETURN(rc);
2663         }
2664         ptlrpc_request_set_replen(req);
2665         req->rq_request_portal = OST_CREATE_PORTAL;
2666         ptlrpc_at_set_req_timeout(req);
2667
2668         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2669                 /* procfs requests not want stat in wait for avoid deadlock */
2670                 req->rq_no_resend = 1;
2671                 req->rq_no_delay = 1;
2672         }
2673
2674         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2675         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2676         aa = ptlrpc_req_async_args(req);
2677         aa->aa_oi = oinfo;
2678
2679         ptlrpc_set_add_req(rqset, req);
2680         RETURN(0);
2681 }
2682
2683 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2684                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2685 {
2686         struct obd_device     *obd = class_exp2obd(exp);
2687         struct obd_statfs     *msfs;
2688         struct ptlrpc_request *req;
2689         struct obd_import     *imp = NULL;
2690         int rc;
2691         ENTRY;
2692
2693         /*Since the request might also come from lprocfs, so we need
2694          *sync this with client_disconnect_export Bug15684*/
2695         cfs_down_read(&obd->u.cli.cl_sem);
2696         if (obd->u.cli.cl_import)
2697                 imp = class_import_get(obd->u.cli.cl_import);
2698         cfs_up_read(&obd->u.cli.cl_sem);
2699         if (!imp)
2700                 RETURN(-ENODEV);
2701
2702         /* We could possibly pass max_age in the request (as an absolute
2703          * timestamp or a "seconds.usec ago") so the target can avoid doing
2704          * extra calls into the filesystem if that isn't necessary (e.g.
2705          * during mount that would help a bit).  Having relative timestamps
2706          * is not so great if request processing is slow, while absolute
2707          * timestamps are not ideal because they need time synchronization. */
2708         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2709
2710         class_import_put(imp);
2711
2712         if (req == NULL)
2713                 RETURN(-ENOMEM);
2714
2715         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2716         if (rc) {
2717                 ptlrpc_request_free(req);
2718                 RETURN(rc);
2719         }
2720         ptlrpc_request_set_replen(req);
2721         req->rq_request_portal = OST_CREATE_PORTAL;
2722         ptlrpc_at_set_req_timeout(req);
2723
2724         if (flags & OBD_STATFS_NODELAY) {
2725                 /* procfs requests not want stat in wait for avoid deadlock */
2726                 req->rq_no_resend = 1;
2727                 req->rq_no_delay = 1;
2728         }
2729
2730         rc = ptlrpc_queue_wait(req);
2731         if (rc)
2732                 GOTO(out, rc);
2733
2734         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2735         if (msfs == NULL) {
2736                 GOTO(out, rc = -EPROTO);
2737         }
2738
2739         *osfs = *msfs;
2740
2741         EXIT;
2742  out:
2743         ptlrpc_req_finished(req);
2744         return rc;
2745 }
2746
2747 /* Retrieve object striping information.
2748  *
2749  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2750  * the maximum number of OST indices which will fit in the user buffer.
2751  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2752  */
2753 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2754 {
2755         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2756         struct lov_user_md_v3 lum, *lumk;
2757         struct lov_user_ost_data_v1 *lmm_objects;
2758         int rc = 0, lum_size;
2759         ENTRY;
2760
2761         if (!lsm)
2762                 RETURN(-ENODATA);
2763
2764         /* we only need the header part from user space to get lmm_magic and
2765          * lmm_stripe_count, (the header part is common to v1 and v3) */
2766         lum_size = sizeof(struct lov_user_md_v1);
2767         if (cfs_copy_from_user(&lum, lump, lum_size))
2768                 RETURN(-EFAULT);
2769
2770         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2771             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2772                 RETURN(-EINVAL);
2773
2774         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2775         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2776         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2777         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2778
2779         /* we can use lov_mds_md_size() to compute lum_size
2780          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2781         if (lum.lmm_stripe_count > 0) {
2782                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2783                 OBD_ALLOC(lumk, lum_size);
2784                 if (!lumk)
2785                         RETURN(-ENOMEM);
2786
2787                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2788                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2789                 else
2790                         lmm_objects = &(lumk->lmm_objects[0]);
2791                 lmm_objects->l_object_id = lsm->lsm_object_id;
2792         } else {
2793                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2794                 lumk = &lum;
2795         }
2796
2797         lumk->lmm_object_id = lsm->lsm_object_id;
2798         lumk->lmm_object_seq = lsm->lsm_object_seq;
2799         lumk->lmm_stripe_count = 1;
2800
2801         if (cfs_copy_to_user(lump, lumk, lum_size))
2802                 rc = -EFAULT;
2803
2804         if (lumk != &lum)
2805                 OBD_FREE(lumk, lum_size);
2806
2807         RETURN(rc);
2808 }
2809
2810
2811 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2812                          void *karg, void *uarg)
2813 {
2814         struct obd_device *obd = exp->exp_obd;
2815         struct obd_ioctl_data *data = karg;
2816         int err = 0;
2817         ENTRY;
2818
2819         if (!cfs_try_module_get(THIS_MODULE)) {
2820                 CERROR("Can't get module. Is it alive?");
2821                 return -EINVAL;
2822         }
2823         switch (cmd) {
2824         case OBD_IOC_LOV_GET_CONFIG: {
2825                 char *buf;
2826                 struct lov_desc *desc;
2827                 struct obd_uuid uuid;
2828
2829                 buf = NULL;
2830                 len = 0;
2831                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2832                         GOTO(out, err = -EINVAL);
2833
2834                 data = (struct obd_ioctl_data *)buf;
2835
2836                 if (sizeof(*desc) > data->ioc_inllen1) {
2837                         obd_ioctl_freedata(buf, len);
2838                         GOTO(out, err = -EINVAL);
2839                 }
2840
2841                 if (data->ioc_inllen2 < sizeof(uuid)) {
2842                         obd_ioctl_freedata(buf, len);
2843                         GOTO(out, err = -EINVAL);
2844                 }
2845
2846                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2847                 desc->ld_tgt_count = 1;
2848                 desc->ld_active_tgt_count = 1;
2849                 desc->ld_default_stripe_count = 1;
2850                 desc->ld_default_stripe_size = 0;
2851                 desc->ld_default_stripe_offset = 0;
2852                 desc->ld_pattern = 0;
2853                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2854
2855                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2856
2857                 err = cfs_copy_to_user((void *)uarg, buf, len);
2858                 if (err)
2859                         err = -EFAULT;
2860                 obd_ioctl_freedata(buf, len);
2861                 GOTO(out, err);
2862         }
2863         case LL_IOC_LOV_SETSTRIPE:
2864                 err = obd_alloc_memmd(exp, karg);
2865                 if (err > 0)
2866                         err = 0;
2867                 GOTO(out, err);
2868         case LL_IOC_LOV_GETSTRIPE:
2869                 err = osc_getstripe(karg, uarg);
2870                 GOTO(out, err);
2871         case OBD_IOC_CLIENT_RECOVER:
2872                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2873                                             data->ioc_inlbuf1, 0);
2874                 if (err > 0)
2875                         err = 0;
2876                 GOTO(out, err);
2877         case IOC_OSC_SET_ACTIVE:
2878                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2879                                                data->ioc_offset);
2880                 GOTO(out, err);
2881         case OBD_IOC_POLL_QUOTACHECK:
2882                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2883                 GOTO(out, err);
2884         case OBD_IOC_PING_TARGET:
2885                 err = ptlrpc_obd_ping(obd);
2886                 GOTO(out, err);
2887         default:
2888                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2889                        cmd, cfs_curproc_comm());
2890                 GOTO(out, err = -ENOTTY);
2891         }
2892 out:
2893         cfs_module_put(THIS_MODULE);
2894         return err;
2895 }
2896
2897 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2898                         obd_count keylen, void *key, __u32 *vallen, void *val,
2899                         struct lov_stripe_md *lsm)
2900 {
2901         ENTRY;
2902         if (!vallen || !val)
2903                 RETURN(-EFAULT);
2904
2905         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2906                 __u32 *stripe = val;
2907                 *vallen = sizeof(*stripe);
2908                 *stripe = 0;
2909                 RETURN(0);
2910         } else if (KEY_IS(KEY_LAST_ID)) {
2911                 struct ptlrpc_request *req;
2912                 obd_id                *reply;
2913                 char                  *tmp;
2914                 int                    rc;
2915
2916                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2917                                            &RQF_OST_GET_INFO_LAST_ID);
2918                 if (req == NULL)
2919                         RETURN(-ENOMEM);
2920
2921                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2922                                      RCL_CLIENT, keylen);
2923                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2924                 if (rc) {
2925                         ptlrpc_request_free(req);
2926                         RETURN(rc);
2927                 }
2928
2929                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2930                 memcpy(tmp, key, keylen);
2931
2932                 req->rq_no_delay = req->rq_no_resend = 1;
2933                 ptlrpc_request_set_replen(req);
2934                 rc = ptlrpc_queue_wait(req);
2935                 if (rc)
2936                         GOTO(out, rc);
2937
2938                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2939                 if (reply == NULL)
2940                         GOTO(out, rc = -EPROTO);
2941
2942                 *((obd_id *)val) = *reply;
2943         out:
2944                 ptlrpc_req_finished(req);
2945                 RETURN(rc);
2946         } else if (KEY_IS(KEY_FIEMAP)) {
2947                 struct ptlrpc_request *req;
2948                 struct ll_user_fiemap *reply;
2949                 char *tmp;
2950                 int rc;
2951
2952                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2953                                            &RQF_OST_GET_INFO_FIEMAP);
2954                 if (req == NULL)
2955                         RETURN(-ENOMEM);
2956
2957                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2958                                      RCL_CLIENT, keylen);
2959                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2960                                      RCL_CLIENT, *vallen);
2961                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2962                                      RCL_SERVER, *vallen);
2963
2964                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2965                 if (rc) {
2966                         ptlrpc_request_free(req);
2967                         RETURN(rc);
2968                 }
2969
2970                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2971                 memcpy(tmp, key, keylen);
2972                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2973                 memcpy(tmp, val, *vallen);
2974
2975                 ptlrpc_request_set_replen(req);
2976                 rc = ptlrpc_queue_wait(req);
2977                 if (rc)
2978                         GOTO(out1, rc);
2979
2980                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2981                 if (reply == NULL)
2982                         GOTO(out1, rc = -EPROTO);
2983
2984                 memcpy(val, reply, *vallen);
2985         out1:
2986                 ptlrpc_req_finished(req);
2987
2988                 RETURN(rc);
2989         }
2990
2991         RETURN(-EINVAL);
2992 }
2993
2994 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
2995 {
2996         struct llog_ctxt *ctxt;
2997         int rc = 0;
2998         ENTRY;
2999
3000         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3001         if (ctxt) {
3002                 rc = llog_initiator_connect(ctxt);
3003                 llog_ctxt_put(ctxt);
3004         } else {
3005                 /* XXX return an error? skip setting below flags? */
3006         }
3007
3008         cfs_spin_lock(&imp->imp_lock);
3009         imp->imp_server_timeout = 1;
3010         imp->imp_pingable = 1;
3011         cfs_spin_unlock(&imp->imp_lock);
3012         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3013
3014         RETURN(rc);
3015 }
3016
3017 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3018                                           struct ptlrpc_request *req,
3019                                           void *aa, int rc)
3020 {
3021         ENTRY;
3022         if (rc != 0)
3023                 RETURN(rc);
3024
3025         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3026 }
3027
3028 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3029                               obd_count keylen, void *key, obd_count vallen,
3030                               void *val, struct ptlrpc_request_set *set)
3031 {
3032         struct ptlrpc_request *req;
3033         struct obd_device     *obd = exp->exp_obd;
3034         struct obd_import     *imp = class_exp2cliimp(exp);
3035         char                  *tmp;
3036         int                    rc;
3037         ENTRY;
3038
3039         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3040
3041         if (KEY_IS(KEY_NEXT_ID)) {
3042                 obd_id new_val;
3043                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3044
3045                 if (vallen != sizeof(obd_id))
3046                         RETURN(-ERANGE);
3047                 if (val == NULL)
3048                         RETURN(-EINVAL);
3049
3050                 if (vallen != sizeof(obd_id))
3051                         RETURN(-EINVAL);
3052
3053                 /* avoid race between allocate new object and set next id
3054                  * from ll_sync thread */
3055                 cfs_spin_lock(&oscc->oscc_lock);
3056                 new_val = *((obd_id*)val) + 1;
3057                 if (new_val > oscc->oscc_next_id)
3058                         oscc->oscc_next_id = new_val;
3059                 cfs_spin_unlock(&oscc->oscc_lock);
3060                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3061                        exp->exp_obd->obd_name,
3062                        obd->u.cli.cl_oscc.oscc_next_id);
3063
3064                 RETURN(0);
3065         }
3066
3067         if (KEY_IS(KEY_CHECKSUM)) {
3068                 if (vallen != sizeof(int))
3069                         RETURN(-EINVAL);
3070                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3071                 RETURN(0);
3072         }
3073
3074         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3075                 sptlrpc_conf_client_adapt(obd);
3076                 RETURN(0);
3077         }
3078
3079         if (KEY_IS(KEY_FLUSH_CTX)) {
3080                 sptlrpc_import_flush_my_ctx(imp);
3081                 RETURN(0);
3082         }
3083
3084         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3085                 RETURN(-EINVAL);
3086
3087         /* We pass all other commands directly to OST. Since nobody calls osc
3088            methods directly and everybody is supposed to go through LOV, we
3089            assume lov checked invalid values for us.
3090            The only recognised values so far are evict_by_nid and mds_conn.
3091            Even if something bad goes through, we'd get a -EINVAL from OST
3092            anyway. */
3093
3094         if (KEY_IS(KEY_GRANT_SHRINK))
3095                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3096         else
3097                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3098
3099         if (req == NULL)
3100                 RETURN(-ENOMEM);
3101
3102         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3103                              RCL_CLIENT, keylen);
3104         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3105                              RCL_CLIENT, vallen);
3106         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3107         if (rc) {
3108                 ptlrpc_request_free(req);
3109                 RETURN(rc);
3110         }
3111
3112         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3113         memcpy(tmp, key, keylen);
3114         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3115         memcpy(tmp, val, vallen);
3116
3117         if (KEY_IS(KEY_MDS_CONN)) {
3118                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3119
3120                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3121                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3122                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3123                 req->rq_no_delay = req->rq_no_resend = 1;
3124                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3125         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3126                 struct osc_grant_args *aa;
3127                 struct obdo *oa;
3128
3129                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3130                 aa = ptlrpc_req_async_args(req);
3131                 OBDO_ALLOC(oa);
3132                 if (!oa) {
3133                         ptlrpc_req_finished(req);
3134                         RETURN(-ENOMEM);
3135                 }
3136                 *oa = ((struct ost_body *)val)->oa;
3137                 aa->aa_oa = oa;
3138                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3139         }
3140
3141         ptlrpc_request_set_replen(req);
3142         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3143                 LASSERT(set != NULL);
3144                 ptlrpc_set_add_req(set, req);
3145                 ptlrpc_check_set(NULL, set);
3146         } else
3147                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3148
3149         RETURN(0);
3150 }
3151
3152
3153 static struct llog_operations osc_size_repl_logops = {
3154         lop_cancel: llog_obd_repl_cancel
3155 };
3156
3157 static struct llog_operations osc_mds_ost_orig_logops;
3158
3159 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3160                            struct obd_device *tgt, struct llog_catid *catid)
3161 {
3162         int rc;
3163         ENTRY;
3164
3165         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3166                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3167         if (rc) {
3168                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3169                 GOTO(out, rc);
3170         }
3171
3172         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3173                         NULL, &osc_size_repl_logops);
3174         if (rc) {
3175                 struct llog_ctxt *ctxt =
3176                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3177                 if (ctxt)
3178                         llog_cleanup(ctxt);
3179                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3180         }