Whamcloud - gitweb
LU-1683 agl: increase lock cll_holds for AGL upcall
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
689 static int osc_can_send_destroy(struct client_obd *cli)
690 {
691         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692             cli->cl_max_rpcs_in_flight) {
693                 /* The destroy request can be sent */
694                 return 1;
695         }
696         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697             cli->cl_max_rpcs_in_flight) {
698                 /*
699                  * The counter has been modified between the two atomic
700                  * operations.
701                  */
702                 cfs_waitq_signal(&cli->cl_destroy_waitq);
703         }
704         return 0;
705 }
706
707 /* Destroy requests can be async always on the client, and we don't even really
708  * care about the return code since the client cannot do anything at all about
709  * a destroy failure.
710  * When the MDS is unlinking a filename, it saves the file objects into a
711  * recovery llog, and these object records are cancelled when the OST reports
712  * they were destroyed and sync'd to disk (i.e. transaction committed).
713  * If the client dies, or the OST is down when the object should be destroyed,
714  * the records are not cancelled, and when the OST reconnects to the MDS next,
715  * it will retrieve the llog unlink logs and then sends the log cancellation
716  * cookies to the MDS after committing destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718                        struct obdo *oa, struct lov_stripe_md *ea,
719                        struct obd_trans_info *oti, struct obd_export *md_export,
720                        void *capa)
721 {
722         struct client_obd     *cli = &exp->exp_obd->u.cli;
723         struct ptlrpc_request *req;
724         struct ost_body       *body;
725         CFS_LIST_HEAD(cancels);
726         int rc, count;
727         ENTRY;
728
729         if (!oa) {
730                 CDEBUG(D_INFO, "oa NULL\n");
731                 RETURN(-EINVAL);
732         }
733
734         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735                                         LDLM_FL_DISCARD_DATA);
736
737         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
738         if (req == NULL) {
739                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
740                 RETURN(-ENOMEM);
741         }
742
743         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
745                                0, &cancels, count);
746         if (rc) {
747                 ptlrpc_request_free(req);
748                 RETURN(rc);
749         }
750
751         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752         ptlrpc_at_set_req_timeout(req);
753
754         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755                 oa->o_lcookie = *oti->oti_logcookies;
756         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
757         LASSERT(body);
758         lustre_set_wire_obdo(&body->oa, oa);
759
760         osc_pack_capa(req, body, (struct obd_capa *)capa);
761         ptlrpc_request_set_replen(req);
762
763         /* If osc_destory is for destroying the unlink orphan,
764          * sent from MDT to OST, which should not be blocked here,
765          * because the process might be triggered by ptlrpcd, and
766          * it is not good to block ptlrpcd thread (b=16006)*/
767         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
768                 req->rq_interpret_reply = osc_destroy_interpret;
769                 if (!osc_can_send_destroy(cli)) {
770                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
771                                                           NULL);
772
773                         /*
774                          * Wait until the number of on-going destroy RPCs drops
775                          * under max_rpc_in_flight
776                          */
777                         l_wait_event_exclusive(cli->cl_destroy_waitq,
778                                                osc_can_send_destroy(cli), &lwi);
779                 }
780         }
781
782         /* Do not wait for response */
783         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
784         RETURN(0);
785 }
786
787 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
788                                 long writing_bytes)
789 {
790         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
791
792         LASSERT(!(oa->o_valid & bits));
793
794         oa->o_valid |= bits;
795         client_obd_list_lock(&cli->cl_loi_list_lock);
796         oa->o_dirty = cli->cl_dirty;
797         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
798                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
799                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
800                 oa->o_undirty = 0;
801         } else if (cfs_atomic_read(&obd_dirty_pages) -
802                    cfs_atomic_read(&obd_dirty_transit_pages) >
803                    obd_max_dirty_pages + 1){
804                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
805                  * not covered by a lock thus they may safely race and trip
806                  * this CERROR() unless we add in a small fudge factor (+1). */
807                 CERROR("dirty %d - %d > system dirty_max %d\n",
808                        cfs_atomic_read(&obd_dirty_pages),
809                        cfs_atomic_read(&obd_dirty_transit_pages),
810                        obd_max_dirty_pages);
811                 oa->o_undirty = 0;
812         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
813                 CERROR("dirty %lu - dirty_max %lu too big???\n",
814                        cli->cl_dirty, cli->cl_dirty_max);
815                 oa->o_undirty = 0;
816         } else {
817                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
818                                 (cli->cl_max_rpcs_in_flight + 1);
819                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
820         }
821         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
822         oa->o_dropped = cli->cl_lost_grant;
823         cli->cl_lost_grant = 0;
824         client_obd_list_unlock(&cli->cl_loi_list_lock);
825         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
826                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
827
828 }
829
830 void osc_update_next_shrink(struct client_obd *cli)
831 {
832         cli->cl_next_shrink_grant =
833                 cfs_time_shift(cli->cl_grant_shrink_interval);
834         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
835                cli->cl_next_shrink_grant);
836 }
837
838 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
839 {
840         client_obd_list_lock(&cli->cl_loi_list_lock);
841         cli->cl_avail_grant += grant;
842         client_obd_list_unlock(&cli->cl_loi_list_lock);
843 }
844
845 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
846 {
847         if (body->oa.o_valid & OBD_MD_FLGRANT) {
848                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
849                 __osc_update_grant(cli, body->oa.o_grant);
850         }
851 }
852
853 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
854                               obd_count keylen, void *key, obd_count vallen,
855                               void *val, struct ptlrpc_request_set *set);
856
857 static int osc_shrink_grant_interpret(const struct lu_env *env,
858                                       struct ptlrpc_request *req,
859                                       void *aa, int rc)
860 {
861         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
862         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
863         struct ost_body *body;
864
865         if (rc != 0) {
866                 __osc_update_grant(cli, oa->o_grant);
867                 GOTO(out, rc);
868         }
869
870         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
871         LASSERT(body);
872         osc_update_grant(cli, body);
873 out:
874         OBDO_FREE(oa);
875         return rc;
876 }
877
878 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
879 {
880         client_obd_list_lock(&cli->cl_loi_list_lock);
881         oa->o_grant = cli->cl_avail_grant / 4;
882         cli->cl_avail_grant -= oa->o_grant;
883         client_obd_list_unlock(&cli->cl_loi_list_lock);
884         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
885                 oa->o_valid |= OBD_MD_FLFLAGS;
886                 oa->o_flags = 0;
887         }
888         oa->o_flags |= OBD_FL_SHRINK_GRANT;
889         osc_update_next_shrink(cli);
890 }
891
892 /* Shrink the current grant, either from some large amount to enough for a
893  * full set of in-flight RPCs, or if we have already shrunk to that limit
894  * then to enough for a single RPC.  This avoids keeping more grant than
895  * needed, and avoids shrinking the grant piecemeal. */
896 static int osc_shrink_grant(struct client_obd *cli)
897 {
898         long target = (cli->cl_max_rpcs_in_flight + 1) *
899                       cli->cl_max_pages_per_rpc;
900
901         client_obd_list_lock(&cli->cl_loi_list_lock);
902         if (cli->cl_avail_grant <= target)
903                 target = cli->cl_max_pages_per_rpc;
904         client_obd_list_unlock(&cli->cl_loi_list_lock);
905
906         return osc_shrink_grant_to_target(cli, target);
907 }
908
909 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
910 {
911         int    rc = 0;
912         struct ost_body     *body;
913         ENTRY;
914
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         /* Don't shrink if we are already above or below the desired limit
917          * We don't want to shrink below a single RPC, as that will negatively
918          * impact block allocation and long-term performance. */
919         if (target < cli->cl_max_pages_per_rpc)
920                 target = cli->cl_max_pages_per_rpc;
921
922         if (target >= cli->cl_avail_grant) {
923                 client_obd_list_unlock(&cli->cl_loi_list_lock);
924                 RETURN(0);
925         }
926         client_obd_list_unlock(&cli->cl_loi_list_lock);
927
928         OBD_ALLOC_PTR(body);
929         if (!body)
930                 RETURN(-ENOMEM);
931
932         osc_announce_cached(cli, &body->oa, 0);
933
934         client_obd_list_lock(&cli->cl_loi_list_lock);
935         body->oa.o_grant = cli->cl_avail_grant - target;
936         cli->cl_avail_grant = target;
937         client_obd_list_unlock(&cli->cl_loi_list_lock);
938         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
939                 body->oa.o_valid |= OBD_MD_FLFLAGS;
940                 body->oa.o_flags = 0;
941         }
942         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
943         osc_update_next_shrink(cli);
944
945         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
946                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
947                                 sizeof(*body), body, NULL);
948         if (rc != 0)
949                 __osc_update_grant(cli, body->oa.o_grant);
950         OBD_FREE_PTR(body);
951         RETURN(rc);
952 }
953
954 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
955 static int osc_should_shrink_grant(struct client_obd *client)
956 {
957         cfs_time_t time = cfs_time_current();
958         cfs_time_t next_shrink = client->cl_next_shrink_grant;
959
960         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
961              OBD_CONNECT_GRANT_SHRINK) == 0)
962                 return 0;
963
964         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
965                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
966                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
967                         return 1;
968                 else
969                         osc_update_next_shrink(client);
970         }
971         return 0;
972 }
973
974 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
975 {
976         struct client_obd *client;
977
978         cfs_list_for_each_entry(client, &item->ti_obd_list,
979                                 cl_grant_shrink_list) {
980                 if (osc_should_shrink_grant(client))
981                         osc_shrink_grant(client);
982         }
983         return 0;
984 }
985
986 static int osc_add_shrink_grant(struct client_obd *client)
987 {
988         int rc;
989
990         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
991                                        TIMEOUT_GRANT,
992                                        osc_grant_shrink_grant_cb, NULL,
993                                        &client->cl_grant_shrink_list);
994         if (rc) {
995                 CERROR("add grant client %s error %d\n",
996                         client->cl_import->imp_obd->obd_name, rc);
997                 return rc;
998         }
999         CDEBUG(D_CACHE, "add grant client %s \n",
1000                client->cl_import->imp_obd->obd_name);
1001         osc_update_next_shrink(client);
1002         return 0;
1003 }
1004
1005 static int osc_del_shrink_grant(struct client_obd *client)
1006 {
1007         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1008                                          TIMEOUT_GRANT);
1009 }
1010
1011 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1012 {
1013         /*
1014          * ocd_grant is the total grant amount we're expect to hold: if we've
1015          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1016          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1017          *
1018          * race is tolerable here: if we're evicted, but imp_state already
1019          * left EVICTED state, then cl_dirty must be 0 already.
1020          */
1021         client_obd_list_lock(&cli->cl_loi_list_lock);
1022         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1023                 cli->cl_avail_grant = ocd->ocd_grant;
1024         else
1025                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1026
1027         if (cli->cl_avail_grant < 0) {
1028                 CWARN("%s: available grant < 0, the OSS is probably not running"
1029                       " with patch from bug20278 (%ld) \n",
1030                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1031                 /* workaround for 1.6 servers which do not have
1032                  * the patch from bug20278 */
1033                 cli->cl_avail_grant = ocd->ocd_grant;
1034         }
1035
1036         /* determine the appropriate chunk size used by osc_extent. */
1037         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1038         client_obd_list_unlock(&cli->cl_loi_list_lock);
1039
1040         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1041                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1042                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1043
1044         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1045             cfs_list_empty(&cli->cl_grant_shrink_list))
1046                 osc_add_shrink_grant(cli);
1047 }
1048
1049 /* We assume that the reason this OSC got a short read is because it read
1050  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1051  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1052  * this stripe never got written at or beyond this stripe offset yet. */
1053 static void handle_short_read(int nob_read, obd_count page_count,
1054                               struct brw_page **pga)
1055 {
1056         char *ptr;
1057         int i = 0;
1058
1059         /* skip bytes read OK */
1060         while (nob_read > 0) {
1061                 LASSERT (page_count > 0);
1062
1063                 if (pga[i]->count > nob_read) {
1064                         /* EOF inside this page */
1065                         ptr = cfs_kmap(pga[i]->pg) +
1066                                 (pga[i]->off & ~CFS_PAGE_MASK);
1067                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1068                         cfs_kunmap(pga[i]->pg);
1069                         page_count--;
1070                         i++;
1071                         break;
1072                 }
1073
1074                 nob_read -= pga[i]->count;
1075                 page_count--;
1076                 i++;
1077         }
1078
1079         /* zero remaining pages */
1080         while (page_count-- > 0) {
1081                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1082                 memset(ptr, 0, pga[i]->count);
1083                 cfs_kunmap(pga[i]->pg);
1084                 i++;
1085         }
1086 }
1087
1088 static int check_write_rcs(struct ptlrpc_request *req,
1089                            int requested_nob, int niocount,
1090                            obd_count page_count, struct brw_page **pga)
1091 {
1092         int     i;
1093         __u32   *remote_rcs;
1094
1095         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1096                                                   sizeof(*remote_rcs) *
1097                                                   niocount);
1098         if (remote_rcs == NULL) {
1099                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1100                 return(-EPROTO);
1101         }
1102
1103         /* return error if any niobuf was in error */
1104         for (i = 0; i < niocount; i++) {
1105                 if ((int)remote_rcs[i] < 0)
1106                         return(remote_rcs[i]);
1107
1108                 if (remote_rcs[i] != 0) {
1109                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1110                                 i, remote_rcs[i], req);
1111                         return(-EPROTO);
1112                 }
1113         }
1114
1115         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1116                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1117                        req->rq_bulk->bd_nob_transferred, requested_nob);
1118                 return(-EPROTO);
1119         }
1120
1121         return (0);
1122 }
1123
1124 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1125 {
1126         if (p1->flag != p2->flag) {
1127                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1128                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1129
1130                 /* warn if we try to combine flags that we don't know to be
1131                  * safe to combine */
1132                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1133                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1134                               "report this at http://bugs.whamcloud.com/\n",
1135                               p1->flag, p2->flag);
1136                 }
1137                 return 0;
1138         }
1139
1140         return (p1->off + p1->count == p2->off);
1141 }
1142
1143 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1144                                    struct brw_page **pga, int opc,
1145                                    cksum_type_t cksum_type)
1146 {
1147         __u32                           cksum;
1148         int                             i = 0;
1149         struct cfs_crypto_hash_desc     *hdesc;
1150         unsigned int                    bufsize;
1151         int                             err;
1152         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1153
1154         LASSERT(pg_count > 0);
1155
1156         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1157         if (IS_ERR(hdesc)) {
1158                 CERROR("Unable to initialize checksum hash %s\n",
1159                        cfs_crypto_hash_name(cfs_alg));
1160                 return PTR_ERR(hdesc);
1161         }
1162
1163         while (nob > 0 && pg_count > 0) {
1164                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1165
1166                 /* corrupt the data before we compute the checksum, to
1167                  * simulate an OST->client data error */
1168                 if (i == 0 && opc == OST_READ &&
1169                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1170                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1171                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1172                         memcpy(ptr + off, "bad1", min(4, nob));
1173                         cfs_kunmap(pga[i]->pg);
1174                 }
1175                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1176                                   pga[i]->off & ~CFS_PAGE_MASK,
1177                                   count);
1178                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1179                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1180
1181                 nob -= pga[i]->count;
1182                 pg_count--;
1183                 i++;
1184         }
1185
1186         bufsize = 4;
1187         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1188
1189         if (err)
1190                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1191
1192         /* For sending we only compute the wrong checksum instead
1193          * of corrupting the data so it is still correct on a redo */
1194         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1195                 cksum++;
1196
1197         return cksum;
1198 }
1199
1200 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1201                                 struct lov_stripe_md *lsm, obd_count page_count,
1202                                 struct brw_page **pga,
1203                                 struct ptlrpc_request **reqp,
1204                                 struct obd_capa *ocapa, int reserve,
1205                                 int resend)
1206 {
1207         struct ptlrpc_request   *req;
1208         struct ptlrpc_bulk_desc *desc;
1209         struct ost_body         *body;
1210         struct obd_ioobj        *ioobj;
1211         struct niobuf_remote    *niobuf;
1212         int niocount, i, requested_nob, opc, rc;
1213         struct osc_brw_async_args *aa;
1214         struct req_capsule      *pill;
1215         struct brw_page *pg_prev;
1216
1217         ENTRY;
1218         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1219                 RETURN(-ENOMEM); /* Recoverable */
1220         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1221                 RETURN(-EINVAL); /* Fatal */
1222
1223         if ((cmd & OBD_BRW_WRITE) != 0) {
1224                 opc = OST_WRITE;
1225                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1226                                                 cli->cl_import->imp_rq_pool,
1227                                                 &RQF_OST_BRW_WRITE);
1228         } else {
1229                 opc = OST_READ;
1230                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1231         }
1232         if (req == NULL)
1233                 RETURN(-ENOMEM);
1234
1235         for (niocount = i = 1; i < page_count; i++) {
1236                 if (!can_merge_pages(pga[i - 1], pga[i]))
1237                         niocount++;
1238         }
1239
1240         pill = &req->rq_pill;
1241         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1242                              sizeof(*ioobj));
1243         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1244                              niocount * sizeof(*niobuf));
1245         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1246
1247         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1248         if (rc) {
1249                 ptlrpc_request_free(req);
1250                 RETURN(rc);
1251         }
1252         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1253         ptlrpc_at_set_req_timeout(req);
1254         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1255          * retry logic */
1256         req->rq_no_retry_einprogress = 1;
1257
1258         if (opc == OST_WRITE)
1259                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1260                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1261         else
1262                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1263                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1264
1265         if (desc == NULL)
1266                 GOTO(out, rc = -ENOMEM);
1267         /* NB request now owns desc and will free it when it gets freed */
1268
1269         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1270         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1271         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1272         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1273
1274         lustre_set_wire_obdo(&body->oa, oa);
1275
1276         obdo_to_ioobj(oa, ioobj);
1277         ioobj->ioo_bufcnt = niocount;
1278         osc_pack_capa(req, body, ocapa);
1279         LASSERT (page_count > 0);
1280         pg_prev = pga[0];
1281         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1282                 struct brw_page *pg = pga[i];
1283                 int poff = pg->off & ~CFS_PAGE_MASK;
1284
1285                 LASSERT(pg->count > 0);
1286                 /* make sure there is no gap in the middle of page array */
1287                 LASSERTF(page_count == 1 ||
1288                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1289                           ergo(i > 0 && i < page_count - 1,
1290                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1291                           ergo(i == page_count - 1, poff == 0)),
1292                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1293                          i, page_count, pg, pg->off, pg->count);
1294 #ifdef __linux__
1295                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1296                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1297                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1298                          i, page_count,
1299                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1300                          pg_prev->pg, page_private(pg_prev->pg),
1301                          pg_prev->pg->index, pg_prev->off);
1302 #else
1303                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1304                          "i %d p_c %u\n", i, page_count);
1305 #endif
1306                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1307                         (pg->flag & OBD_BRW_SRVLOCK));
1308
1309                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1310                 requested_nob += pg->count;
1311
1312                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1313                         niobuf--;
1314                         niobuf->len += pg->count;
1315                 } else {
1316                         niobuf->offset = pg->off;
1317                         niobuf->len    = pg->count;
1318                         niobuf->flags  = pg->flag;
1319                 }
1320                 pg_prev = pg;
1321         }
1322
1323         LASSERTF((void *)(niobuf - niocount) ==
1324                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1325                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1326                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1327
1328         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1329         if (resend) {
1330                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1331                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1332                         body->oa.o_flags = 0;
1333                 }
1334                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1335         }
1336
1337         if (osc_should_shrink_grant(cli))
1338                 osc_shrink_grant_local(cli, &body->oa);
1339
1340         /* size[REQ_REC_OFF] still sizeof (*body) */
1341         if (opc == OST_WRITE) {
1342                 if (cli->cl_checksum &&
1343                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1344                         /* store cl_cksum_type in a local variable since
1345                          * it can be changed via lprocfs */
1346                         cksum_type_t cksum_type = cli->cl_cksum_type;
1347
1348                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1349                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1350                                 body->oa.o_flags = 0;
1351                         }
1352                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1353                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1354                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1355                                                              page_count, pga,
1356                                                              OST_WRITE,
1357                                                              cksum_type);
1358                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1359                                body->oa.o_cksum);
1360                         /* save this in 'oa', too, for later checking */
1361                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1362                         oa->o_flags |= cksum_type_pack(cksum_type);
1363                 } else {
1364                         /* clear out the checksum flag, in case this is a
1365                          * resend but cl_checksum is no longer set. b=11238 */
1366                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1367                 }
1368                 oa->o_cksum = body->oa.o_cksum;
1369                 /* 1 RC per niobuf */
1370                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1371                                      sizeof(__u32) * niocount);
1372         } else {
1373                 if (cli->cl_checksum &&
1374                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1375                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1376                                 body->oa.o_flags = 0;
1377                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1378                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1379                 }
1380         }
1381         ptlrpc_request_set_replen(req);
1382
1383         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1384         aa = ptlrpc_req_async_args(req);
1385         aa->aa_oa = oa;
1386         aa->aa_requested_nob = requested_nob;
1387         aa->aa_nio_count = niocount;
1388         aa->aa_page_count = page_count;
1389         aa->aa_resends = 0;
1390         aa->aa_ppga = pga;
1391         aa->aa_cli = cli;
1392         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1393         if (ocapa && reserve)
1394                 aa->aa_ocapa = capa_get(ocapa);
1395
1396         *reqp = req;
1397         RETURN(0);
1398
1399  out:
1400         ptlrpc_req_finished(req);
1401         RETURN(rc);
1402 }
1403
1404 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1405                                 __u32 client_cksum, __u32 server_cksum, int nob,
1406                                 obd_count page_count, struct brw_page **pga,
1407                                 cksum_type_t client_cksum_type)
1408 {
1409         __u32 new_cksum;
1410         char *msg;
1411         cksum_type_t cksum_type;
1412
1413         if (server_cksum == client_cksum) {
1414                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1415                 return 0;
1416         }
1417
1418         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1419                                        oa->o_flags : 0);
1420         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1421                                       cksum_type);
1422
1423         if (cksum_type != client_cksum_type)
1424                 msg = "the server did not use the checksum type specified in "
1425                       "the original request - likely a protocol problem";
1426         else if (new_cksum == server_cksum)
1427                 msg = "changed on the client after we checksummed it - "
1428                       "likely false positive due to mmap IO (bug 11742)";
1429         else if (new_cksum == client_cksum)
1430                 msg = "changed in transit before arrival at OST";
1431         else
1432                 msg = "changed in transit AND doesn't match the original - "
1433                       "likely false positive due to mmap IO (bug 11742)";
1434
1435         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1436                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1437                            msg, libcfs_nid2str(peer->nid),
1438                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1439                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1440                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1441                            oa->o_id,
1442                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1443                            pga[0]->off,
1444                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1445         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1446                "client csum now %x\n", client_cksum, client_cksum_type,
1447                server_cksum, cksum_type, new_cksum);
1448         return 1;
1449 }
1450
1451 /* Note rc enters this function as number of bytes transferred */
1452 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1453 {
1454         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1455         const lnet_process_id_t *peer =
1456                         &req->rq_import->imp_connection->c_peer;
1457         struct client_obd *cli = aa->aa_cli;
1458         struct ost_body *body;
1459         __u32 client_cksum = 0;
1460         ENTRY;
1461
1462         if (rc < 0 && rc != -EDQUOT) {
1463                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1464                 RETURN(rc);
1465         }
1466
1467         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1468         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1469         if (body == NULL) {
1470                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1471                 RETURN(-EPROTO);
1472         }
1473
1474         /* set/clear over quota flag for a uid/gid */
1475         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1476             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1477                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1478
1479                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1480                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1481                        body->oa.o_flags);
1482                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1483         }
1484
1485         osc_update_grant(cli, body);
1486
1487         if (rc < 0)
1488                 RETURN(rc);
1489
1490         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1491                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1492
1493         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1494                 if (rc > 0) {
1495                         CERROR("Unexpected +ve rc %d\n", rc);
1496                         RETURN(-EPROTO);
1497                 }
1498                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1499
1500                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1501                         RETURN(-EAGAIN);
1502
1503                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1504                     check_write_checksum(&body->oa, peer, client_cksum,
1505                                          body->oa.o_cksum, aa->aa_requested_nob,
1506                                          aa->aa_page_count, aa->aa_ppga,
1507                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1508                         RETURN(-EAGAIN);
1509
1510                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1511                                      aa->aa_page_count, aa->aa_ppga);
1512                 GOTO(out, rc);
1513         }
1514
1515         /* The rest of this function executes only for OST_READs */
1516
1517         /* if unwrap_bulk failed, return -EAGAIN to retry */
1518         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1519         if (rc < 0)
1520                 GOTO(out, rc = -EAGAIN);
1521
1522         if (rc > aa->aa_requested_nob) {
1523                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1524                        aa->aa_requested_nob);
1525                 RETURN(-EPROTO);
1526         }
1527
1528         if (rc != req->rq_bulk->bd_nob_transferred) {
1529                 CERROR ("Unexpected rc %d (%d transferred)\n",
1530                         rc, req->rq_bulk->bd_nob_transferred);
1531                 return (-EPROTO);
1532         }
1533
1534         if (rc < aa->aa_requested_nob)
1535                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1536
1537         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1538                 static int cksum_counter;
1539                 __u32      server_cksum = body->oa.o_cksum;
1540                 char      *via;
1541                 char      *router;
1542                 cksum_type_t cksum_type;
1543
1544                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1545                                                body->oa.o_flags : 0);
1546                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1547                                                  aa->aa_ppga, OST_READ,
1548                                                  cksum_type);
1549
1550                 if (peer->nid == req->rq_bulk->bd_sender) {
1551                         via = router = "";
1552                 } else {
1553                         via = " via ";
1554                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1555                 }
1556
1557                 if (server_cksum == ~0 && rc > 0) {
1558                         CERROR("Protocol error: server %s set the 'checksum' "
1559                                "bit, but didn't send a checksum.  Not fatal, "
1560                                "but please notify on http://bugs.whamcloud.com/\n",
1561                                libcfs_nid2str(peer->nid));
1562                 } else if (server_cksum != client_cksum) {
1563                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1564                                            "%s%s%s inode "DFID" object "
1565                                            LPU64"/"LPU64" extent "
1566                                            "["LPU64"-"LPU64"]\n",
1567                                            req->rq_import->imp_obd->obd_name,
1568                                            libcfs_nid2str(peer->nid),
1569                                            via, router,
1570                                            body->oa.o_valid & OBD_MD_FLFID ?
1571                                                 body->oa.o_parent_seq : (__u64)0,
1572                                            body->oa.o_valid & OBD_MD_FLFID ?
1573                                                 body->oa.o_parent_oid : 0,
1574                                            body->oa.o_valid & OBD_MD_FLFID ?
1575                                                 body->oa.o_parent_ver : 0,
1576                                            body->oa.o_id,
1577                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1578                                                 body->oa.o_seq : (__u64)0,
1579                                            aa->aa_ppga[0]->off,
1580                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1581                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1582                                                                         1);
1583                         CERROR("client %x, server %x, cksum_type %x\n",
1584                                client_cksum, server_cksum, cksum_type);
1585                         cksum_counter = 0;
1586                         aa->aa_oa->o_cksum = client_cksum;
1587                         rc = -EAGAIN;
1588                 } else {
1589                         cksum_counter++;
1590                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1591                         rc = 0;
1592                 }
1593         } else if (unlikely(client_cksum)) {
1594                 static int cksum_missed;
1595
1596                 cksum_missed++;
1597                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1598                         CERROR("Checksum %u requested from %s but not sent\n",
1599                                cksum_missed, libcfs_nid2str(peer->nid));
1600         } else {
1601                 rc = 0;
1602         }
1603 out:
1604         if (rc >= 0)
1605                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1606
1607         RETURN(rc);
1608 }
1609
1610 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1611                             struct lov_stripe_md *lsm,
1612                             obd_count page_count, struct brw_page **pga,
1613                             struct obd_capa *ocapa)
1614 {
1615         struct ptlrpc_request *req;
1616         int                    rc;
1617         cfs_waitq_t            waitq;
1618         int                    generation, resends = 0;
1619         struct l_wait_info     lwi;
1620
1621         ENTRY;
1622
1623         cfs_waitq_init(&waitq);
1624         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1625
1626 restart_bulk:
1627         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1628                                   page_count, pga, &req, ocapa, 0, resends);
1629         if (rc != 0)
1630                 return (rc);
1631
1632         if (resends) {
1633                 req->rq_generation_set = 1;
1634                 req->rq_import_generation = generation;
1635                 req->rq_sent = cfs_time_current_sec() + resends;
1636         }
1637
1638         rc = ptlrpc_queue_wait(req);
1639
1640         if (rc == -ETIMEDOUT && req->rq_resend) {
1641                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1642                 ptlrpc_req_finished(req);
1643                 goto restart_bulk;
1644         }
1645
1646         rc = osc_brw_fini_request(req, rc);
1647
1648         ptlrpc_req_finished(req);
1649         /* When server return -EINPROGRESS, client should always retry
1650          * regardless of the number of times the bulk was resent already.*/
1651         if (osc_recoverable_error(rc)) {
1652                 resends++;
1653                 if (rc != -EINPROGRESS &&
1654                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1655                         CERROR("%s: too many resend retries for object: "
1656                                ""LPU64":"LPU64", rc = %d.\n",
1657                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1658                         goto out;
1659                 }
1660                 if (generation !=
1661                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1662                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1663                                ""LPU64":"LPU64", rc = %d.\n",
1664                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1665                         goto out;
1666                 }
1667
1668                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1669                                        NULL);
1670                 l_wait_event(waitq, 0, &lwi);
1671
1672                 goto restart_bulk;
1673         }
1674 out:
1675         if (rc == -EAGAIN || rc == -EINPROGRESS)
1676                 rc = -EIO;
1677         RETURN (rc);
1678 }
1679
1680 int osc_brw_redo_request(struct ptlrpc_request *request,
1681                          struct osc_brw_async_args *aa)
1682 {
1683         struct ptlrpc_request *new_req;
1684         struct osc_brw_async_args *new_aa;
1685         struct osc_async_page *oap;
1686         int rc = 0;
1687         ENTRY;
1688
1689         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1690
1691         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1692                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1693                                   aa->aa_cli, aa->aa_oa,
1694                                   NULL /* lsm unused by osc currently */,
1695                                   aa->aa_page_count, aa->aa_ppga,
1696                                   &new_req, aa->aa_ocapa, 0, 1);
1697         if (rc)
1698                 RETURN(rc);
1699
1700         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1701                 if (oap->oap_request != NULL) {
1702                         LASSERTF(request == oap->oap_request,
1703                                  "request %p != oap_request %p\n",
1704                                  request, oap->oap_request);
1705                         if (oap->oap_interrupted) {
1706                                 ptlrpc_req_finished(new_req);
1707                                 RETURN(-EINTR);
1708                         }
1709                 }
1710         }
1711         /* New request takes over pga and oaps from old request.
1712          * Note that copying a list_head doesn't work, need to move it... */
1713         aa->aa_resends++;
1714         new_req->rq_interpret_reply = request->rq_interpret_reply;
1715         new_req->rq_async_args = request->rq_async_args;
1716         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1717         new_req->rq_generation_set = 1;
1718         new_req->rq_import_generation = request->rq_import_generation;
1719
1720         new_aa = ptlrpc_req_async_args(new_req);
1721
1722         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1723         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1724         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1725         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1726
1727         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1728                 if (oap->oap_request) {
1729                         ptlrpc_req_finished(oap->oap_request);
1730                         oap->oap_request = ptlrpc_request_addref(new_req);
1731                 }
1732         }
1733
1734         new_aa->aa_ocapa = aa->aa_ocapa;
1735         aa->aa_ocapa = NULL;
1736
1737         /* XXX: This code will run into problem if we're going to support
1738          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1739          * and wait for all of them to be finished. We should inherit request
1740          * set from old request. */
1741         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1742
1743         DEBUG_REQ(D_INFO, new_req, "new request");
1744         RETURN(0);
1745 }
1746
1747 /*
1748  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1749  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1750  * fine for our small page arrays and doesn't require allocation.  its an
1751  * insertion sort that swaps elements that are strides apart, shrinking the
1752  * stride down until its '1' and the array is sorted.
1753  */
1754 static void sort_brw_pages(struct brw_page **array, int num)
1755 {
1756         int stride, i, j;
1757         struct brw_page *tmp;
1758
1759         if (num == 1)
1760                 return;
1761         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1762                 ;
1763
1764         do {
1765                 stride /= 3;
1766                 for (i = stride ; i < num ; i++) {
1767                         tmp = array[i];
1768                         j = i;
1769                         while (j >= stride && array[j - stride]->off > tmp->off) {
1770                                 array[j] = array[j - stride];
1771                                 j -= stride;
1772                         }
1773                         array[j] = tmp;
1774                 }
1775         } while (stride > 1);
1776 }
1777
1778 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1779 {
1780         int count = 1;
1781         int offset;
1782         int i = 0;
1783
1784         LASSERT (pages > 0);
1785         offset = pg[i]->off & ~CFS_PAGE_MASK;
1786
1787         for (;;) {
1788                 pages--;
1789                 if (pages == 0)         /* that's all */
1790                         return count;
1791
1792                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1793                         return count;   /* doesn't end on page boundary */
1794
1795                 i++;
1796                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1797                 if (offset != 0)        /* doesn't start on page boundary */
1798                         return count;
1799
1800                 count++;
1801         }
1802 }
1803
1804 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1805 {
1806         struct brw_page **ppga;
1807         int i;
1808
1809         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1810         if (ppga == NULL)
1811                 return NULL;
1812
1813         for (i = 0; i < count; i++)
1814                 ppga[i] = pga + i;
1815         return ppga;
1816 }
1817
1818 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1819 {
1820         LASSERT(ppga != NULL);
1821         OBD_FREE(ppga, sizeof(*ppga) * count);
1822 }
1823
1824 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1825                    obd_count page_count, struct brw_page *pga,
1826                    struct obd_trans_info *oti)
1827 {
1828         struct obdo *saved_oa = NULL;
1829         struct brw_page **ppga, **orig;
1830         struct obd_import *imp = class_exp2cliimp(exp);
1831         struct client_obd *cli;
1832         int rc, page_count_orig;
1833         ENTRY;
1834
1835         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1836         cli = &imp->imp_obd->u.cli;
1837
1838         if (cmd & OBD_BRW_CHECK) {
1839                 /* The caller just wants to know if there's a chance that this
1840                  * I/O can succeed */
1841
1842                 if (imp->imp_invalid)
1843                         RETURN(-EIO);
1844                 RETURN(0);
1845         }
1846
1847         /* test_brw with a failed create can trip this, maybe others. */
1848         LASSERT(cli->cl_max_pages_per_rpc);
1849
1850         rc = 0;
1851
1852         orig = ppga = osc_build_ppga(pga, page_count);
1853         if (ppga == NULL)
1854                 RETURN(-ENOMEM);
1855         page_count_orig = page_count;
1856
1857         sort_brw_pages(ppga, page_count);
1858         while (page_count) {
1859                 obd_count pages_per_brw;
1860
1861                 if (page_count > cli->cl_max_pages_per_rpc)
1862                         pages_per_brw = cli->cl_max_pages_per_rpc;
1863                 else
1864                         pages_per_brw = page_count;
1865
1866                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1867
1868                 if (saved_oa != NULL) {
1869                         /* restore previously saved oa */
1870                         *oinfo->oi_oa = *saved_oa;
1871                 } else if (page_count > pages_per_brw) {
1872                         /* save a copy of oa (brw will clobber it) */
1873                         OBDO_ALLOC(saved_oa);
1874                         if (saved_oa == NULL)
1875                                 GOTO(out, rc = -ENOMEM);
1876                         *saved_oa = *oinfo->oi_oa;
1877                 }
1878
1879                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1880                                       pages_per_brw, ppga, oinfo->oi_capa);
1881
1882                 if (rc != 0)
1883                         break;
1884
1885                 page_count -= pages_per_brw;
1886                 ppga += pages_per_brw;
1887         }
1888
1889 out:
1890         osc_release_ppga(orig, page_count_orig);
1891
1892         if (saved_oa != NULL)
1893                 OBDO_FREE(saved_oa);
1894
1895         RETURN(rc);
1896 }
1897
/*
 * Reply-interpret callback of an asynchronous BRW request.
 *
 * Finishes the request with osc_brw_fini_request().  If the result is a
 * recoverable error the request may be redone with osc_brw_redo_request();
 * when that succeeds this function returns 0 at once and does none of the
 * cleanup below.  Otherwise it finishes every extent on aa->aa_exts,
 * copies blocks/mtime/atime/ctime from the obdo into the cl_object (on
 * success only), frees the obdo and the page pointer array, completes the
 * cl_req, drops the in-flight counter, wakes cache waiters and unplugs
 * further I/O.
 *
 * env   environment handed on to the cl_* and osc_* calls
 * req   the request being interpreted
 * data  the request's struct osc_brw_async_args
 * rc    result of the RPC so far
 *
 * Returns 0 on success or when a redo was queued, and a negative errno
 * otherwise; -EAGAIN and -EINPROGRESS are reported as -EIO.
 */
1898 static int brw_interpret(const struct lu_env *env,
1899                          struct ptlrpc_request *req, void *data, int rc)
1900 {
1901         struct osc_brw_async_args *aa = data;
1902         struct osc_extent *ext;
1903         struct osc_extent *tmp;
1904         struct cl_object  *obj = NULL;
1905         struct client_obd *cli = aa->aa_cli;
1906         ENTRY;
1907
1908         rc = osc_brw_fini_request(req, rc);
1909         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1910         /* When the server returns -EINPROGRESS, the client should always
1911          * retry regardless of how many times the bulk was resent already. */
1912         if (osc_recoverable_error(rc)) {
1913                 if (req->rq_import_generation !=
1914                     req->rq_import->imp_generation) {
                     /* The import generation changed since the request
                      * was sent: log it and do not redo the request. */
1915                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1916                                ""LPU64":"LPU64", rc = %d.\n",
1917                                req->rq_import->imp_obd->obd_name,
1918                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1919                 } else if (rc == -EINPROGRESS ||
1920                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1921                         rc = osc_brw_redo_request(req, aa);
1922                 } else {
1923                         CERROR("%s: too many resent retries for object: "
1924                                ""LPU64":"LPU64", rc = %d.\n",
1925                                req->rq_import->imp_obd->obd_name,
1926                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1927                 }
1928
             /* rc is expected to be 0 here only after a successful redo
              * (presumably osc_recoverable_error() is never true for 0 --
              * TODO confirm).  The new request then owns the pages, extents
              * and capability, so nothing may be torn down here. */
1929                 if (rc == 0)
1930                         RETURN(0);
1931                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1932                         rc = -EIO;
1933         }
1934
1935         if (aa->aa_ocapa) {
1936                 capa_put(aa->aa_ocapa);
1937                 aa->aa_ocapa = NULL;
1938         }
1939
1940         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                     /* On success only, take a reference on the object of
                      * the first extent for the attribute update below. */
1941                 if (obj == NULL && rc == 0) {
1942                         obj = osc2cl(ext->oe_obj);
1943                         cl_object_get(obj);
1944                 }
1945
1946                 cfs_list_del_init(&ext->oe_link);
1947                 osc_extent_finish(env, ext, 1, rc);
1948         }
1949         LASSERT(cfs_list_empty(&aa->aa_exts));
1950         LASSERT(cfs_list_empty(&aa->aa_oaps));
1951
1952         if (obj != NULL) {
1953                 struct obdo *oa = aa->aa_oa;
1954                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1955                 unsigned long valid = 0;
1956
1957                 LASSERT(rc == 0);
                     /* Copy each attribute the obdo marks as valid and
                      * collect the matching CAT_* bits. */
1958                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1959                         attr->cat_blocks = oa->o_blocks;
1960                         valid |= CAT_BLOCKS;
1961                 }
1962                 if (oa->o_valid & OBD_MD_FLMTIME) {
1963                         attr->cat_mtime = oa->o_mtime;
1964                         valid |= CAT_MTIME;
1965                 }
1966                 if (oa->o_valid & OBD_MD_FLATIME) {
1967                         attr->cat_atime = oa->o_atime;
1968                         valid |= CAT_ATIME;
1969                 }
1970                 if (oa->o_valid & OBD_MD_FLCTIME) {
1971                         attr->cat_ctime = oa->o_ctime;
1972                         valid |= CAT_CTIME;
1973                 }
1974                 if (valid != 0) {
1975                         cl_object_attr_lock(obj);
1976                         cl_object_attr_set(env, obj, attr, valid);
1977                         cl_object_attr_unlock(obj);
1978                 }
1979                 cl_object_put(env, obj);
1980         }
1981         OBDO_FREE(aa->aa_oa);
1982
             /* Report the error, or the number of bytes transferred when
              * rc is not negative. */
1983         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1984                           req->rq_bulk->bd_nob_transferred);
1985         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1986         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1987
1988         client_obd_list_lock(&cli->cl_loi_list_lock);
1989         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1990          * is called so we know whether to go to sync BRWs or wait for more
1991          * RPCs to complete */
1992         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1993                 cli->cl_w_in_flight--;
1994         else
1995                 cli->cl_r_in_flight--;
1996         osc_wake_cache_waiters(cli);
1997         client_obd_list_unlock(&cli->cl_loi_list_lock);
1998
1999         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2000         RETURN(rc);
2001 }
2002
2003 /**
2004  * Build one BRW RPC from the list of extents @ext_list and hand it to
      * ptlrpcd.  The caller must ensure that the total number of pages in the
2005  * list does NOT exceed the maximum number of pages per RPC.
2006  * Extents in the list must be in OES_RPC state.
      *
      * @env       environment handed on to the cl_* and osc_* calls
      * @cli       client obd whose in-flight counters and histograms are updated
      * @ext_list  non-empty list of extents; it is emptied in every case: on
      *            success the extents move to the request's async args, on
      *            failure each one is finished with osc_extent_finish()
      * @cmd       BRW command; OBD_BRW_WRITE selects a write cl_req
      * @pol       ptlrpcd policy passed to ptlrpcd_add_req()
      *
      * Returns 0 once the request has been queued, -ENOMEM if the page array
      * or obdo cannot be allocated, or the error from cl_req_alloc(),
      * cl_req_prep() or osc_brw_prep_request().
2007  */
2008 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2009                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2010 {
2011         struct ptlrpc_request *req = NULL;
2012         struct osc_extent *ext;
2013         CFS_LIST_HEAD(rpc_list);
2014         struct brw_page **pga = NULL;
2015         struct osc_brw_async_args *aa = NULL;
2016         struct obdo *oa = NULL;
2017         struct osc_async_page *oap;
2018         struct osc_async_page *tmp;
2019         struct cl_req *clerq = NULL;
2020         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2021         struct ldlm_lock *lock = NULL;
2022         struct cl_req_attr crattr;
2023         obd_off starting_offset = OBD_OBJECT_EOF;
2024         obd_off ending_offset = 0;
2025         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2026
2027         ENTRY;
2028         LASSERT(!cfs_list_empty(ext_list));
2029
2030         /* add pages into rpc_list to build BRW rpc */
2031         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2032                 LASSERT(ext->oe_state == OES_RPC);
2033                 mem_tight |= ext->oe_memalloc;
2034                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2035                         ++page_count;
2036                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* Track the lowest start and highest end offset.
                              * A page that does not lower the start must begin
                              * at page offset 0, and one that does not raise
                              * the end must run up to the end of its page. */
2037                         if (starting_offset > oap->oap_obj_off)
2038                                 starting_offset = oap->oap_obj_off;
2039                         else
2040                                 LASSERT(oap->oap_page_off == 0);
2041                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2042                                 ending_offset = oap->oap_obj_off +
2043                                                 oap->oap_count;
2044                         else
2045                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2046                                         CFS_PAGE_SIZE);
2047                 }
2048         }
2049
             /* mpflag is restored at "out" on every exit path. */
2050         if (mem_tight)
2051                 mpflag = cfs_memory_pressure_get_and_set();
2052
             /* Zeroed before the first GOTO(out) because the exit path reads
              * crattr.cra_capa. */
2053         memset(&crattr, 0, sizeof crattr);
2054         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2055         if (pga == NULL)
2056                 GOTO(out, rc = -ENOMEM);
2057
2058         OBDO_ALLOC(oa);
2059         if (oa == NULL)
2060                 GOTO(out, rc = -ENOMEM);
2061
2062         i = 0;
2063         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2064                 struct cl_page *page = oap2cl_page(oap);
                     /* The cl_req is created from the first page, which also
                      * supplies the ldlm lock used for the obdo handle. */
2065                 if (clerq == NULL) {
2066                         clerq = cl_req_alloc(env, page, crt,
2067                                              1 /* only 1-object rpcs for
2068                                                 * now */);
2069                         if (IS_ERR(clerq))
2070                                 GOTO(out, rc = PTR_ERR(clerq));
2071                         lock = oap->oap_ldlm_lock;
2072                 }
2073                 if (mem_tight)
2074                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2075                 pga[i] = &oap->oap_brw_page;
2076                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2077                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2078                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2079                 i++;
2080                 cl_req_page_add(env, clerq, page);
2081         }
2082
2083         /* always get the data for the obdo for the rpc */
2084         LASSERT(clerq != NULL);
2085         crattr.cra_oa = oa;
2086         crattr.cra_capa = NULL;
2087         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2088         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2089         if (lock) {
2090                 oa->o_handle = lock->l_remote_handle;
2091                 oa->o_valid |= OBD_MD_FLHANDLE;
2092         }
2093
2094         rc = cl_req_prep(env, clerq);
2095         if (rc != 0) {
2096                 CERROR("cl_req_prep failed: %d\n", rc);
2097                 GOTO(out, rc);
2098         }
2099
2100         sort_brw_pages(pga, page_count);
2101         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2102                         pga, &req, crattr.cra_capa, 1, 0);
2103         if (rc != 0) {
2104                 CERROR("prep_req failed: %d\n", rc);
2105                 GOTO(out, rc);
2106         }
2107
             /* From here on the function cannot fail any more. */
2108         req->rq_interpret_reply = brw_interpret;
2109         if (mem_tight != 0)
2110                 req->rq_memalloc = 1;
2111
2112         /* Need to update the timestamps after the request is built in case
2113          * we race with setattr (locally or in queue at OST).  If OST gets
2114          * later setattr before earlier BRW (as determined by the request xid),
2115          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2116          * way to do this in a single call.  bug 10150 */
2117         cl_req_attr_set(env, clerq, &crattr,
2118                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2119
2120         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2121
             /* Move the page list and the caller's extent list into the
              * request's async args. */
2122         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2123         aa = ptlrpc_req_async_args(req);
2124         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2125         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2126         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2127         cfs_list_splice_init(ext_list, &aa->aa_exts);
2128         aa->aa_clerq = clerq;
2129
2130         /* queued sync pages can be torn down while the pages
2131          * were between the pending list and the rpc */
2132         tmp = NULL;
2133         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2134                 /* only one oap gets a request reference */
2135                 if (tmp == NULL)
2136                         tmp = oap;
2137                 if (oap->oap_interrupted && !req->rq_intr) {
2138                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2139                                         oap, req);
2140                         ptlrpc_mark_interrupted(req);
2141                 }
2142         }
2143         if (tmp != NULL)
2144                 tmp->oap_request = ptlrpc_request_addref(req);
2145
2146         client_obd_list_lock(&cli->cl_loi_list_lock);
             /* From here on starting_offset is a page index. */
2147         starting_offset >>= CFS_PAGE_SHIFT;
             /* NOTE(review): crt above tests (cmd & OBD_BRW_WRITE) while this
              * tests cmd == OBD_BRW_READ, and brw_interpret() decrements by
              * request opcode.  These agree only if cmd carries no extra
              * flag bits here -- confirm against the callers. */
2148         if (cmd == OBD_BRW_READ) {
2149                 cli->cl_r_in_flight++;
2150                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2151                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2152                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2153                                       starting_offset + 1);
2154         } else {
2155                 cli->cl_w_in_flight++;
2156                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2157                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2158                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2159                                       starting_offset + 1);
2160         }
2161         client_obd_list_unlock(&cli->cl_loi_list_lock);
2162
2163         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2164                   page_count, aa, cli->cl_r_in_flight,
2165                   cli->cl_w_in_flight);
2166
2167         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2168          * see which CPU/NUMA node the majority of pages were allocated
2169          * on, and try to assign the async RPC to the CPU core
2170          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2171          *
2172          * But on the other hand, we expect that multiple ptlrpcd
2173          * threads and the initial write sponsor can run in parallel,
2174          * especially when data checksum is enabled, which is CPU-bound
2175          * operation and single ptlrpcd thread cannot process in time.
2176          * So more ptlrpcd threads sharing BRW load
2177          * (with PDL_POLICY_ROUND) seems better.
2178          */
2179         ptlrpcd_add_req(req, pol, -1);
2180         rc = 0;
2181         EXIT;
2182
     /* Shared exit: reached by falling through on success and by every
      * GOTO(out) on failure. */
2183 out:
2184         if (mem_tight != 0)
2185                 cfs_memory_pressure_restore(mpflag);
2186
2187         capa_put(crattr.cra_capa);
2188         if (rc != 0) {
                     /* Every failure happens no later than request
                      * preparation; presumably osc_brw_prep_request() leaves
                      * req untouched when it fails -- TODO confirm. */
2189                 LASSERT(req == NULL);
2190
2191                 if (oa)
2192                         OBDO_FREE(oa);
2193                 if (pga)
2194                         OBD_FREE(pga, sizeof(*pga) * page_count);
2195                 /* this should happen rarely and is pretty bad, it makes the
2196                  * pending list not follow the dirty order */
2197                 while (!cfs_list_empty(ext_list)) {
2198                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2199                                              oe_link);
2200                         cfs_list_del_init(&ext->oe_link);
2201                         osc_extent_finish(env, ext, 0, rc);
2202                 }
                     /* clerq may still be NULL or hold an ERR_PTR value if
                      * the failure came before or from cl_req_alloc(). */
2203                 if (clerq && !IS_ERR(clerq))
2204                         cl_req_completion(env, clerq, rc);
2205         }
2206         RETURN(rc);
2207 }
2208
2209 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2210                                         struct ldlm_enqueue_info *einfo)
2211 {
2212         void *data = einfo->ei_cbdata;
2213         int set = 0;
2214
2215         LASSERT(lock != NULL);
2216         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2217         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2218         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2219         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2220
2221         lock_res_and_lock(lock);
2222         cfs_spin_lock(&osc_ast_guard);
2223
2224         if (lock->l_ast_data == NULL)
2225                 lock->l_ast_data = data;
2226         if (lock->l_ast_data == data)
2227                 set = 1;
2228
2229         cfs_spin_unlock(&osc_ast_guard);
2230         unlock_res_and_lock(lock);
2231
2232         return set;
2233 }
2234
2235 static int osc_set_data_with_check(struct lustre_handle *lockh,
2236                                    struct ldlm_enqueue_info *einfo)
2237 {
2238         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2239         int set = 0;
2240
2241         if (lock != NULL) {
2242                 set = osc_set_lock_data_with_check(lock, einfo);
2243                 LDLM_LOCK_PUT(lock);
2244         } else
2245                 CERROR("lockh %p, data %p - client evicted?\n",
2246                        lockh, einfo->ei_cbdata);
2247         return set;
2248 }
2249
2250 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2251                              ldlm_iterator_t replace, void *data)
2252 {
2253         struct ldlm_res_id res_id;
2254         struct obd_device *obd = class_exp2obd(exp);
2255
2256         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2257         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2258         return 0;
2259 }
2260
2261 /* find any ldlm lock of the inode in osc
2262  * return 0    not find
2263  *        1    find one
2264  *      < 0    error */
2265 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2266                            ldlm_iterator_t replace, void *data)
2267 {
2268         struct ldlm_res_id res_id;
2269         struct obd_device *obd = class_exp2obd(exp);
2270         int rc = 0;
2271
2272         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2273         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2274         if (rc == LDLM_ITER_STOP)
2275                 return(1);
2276         if (rc == LDLM_ITER_CONTINUE)
2277                 return(0);
2278         return(rc);
2279 }
2280
2281 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2282                             obd_enqueue_update_f upcall, void *cookie,
2283                             int *flags, int agl, int rc)
2284 {
2285         int intent = *flags & LDLM_FL_HAS_INTENT;
2286         ENTRY;
2287
2288         if (intent) {
2289                 /* The request was created before ldlm_cli_enqueue call. */
2290                 if (rc == ELDLM_LOCK_ABORTED) {
2291                         struct ldlm_reply *rep;
2292                         rep = req_capsule_server_get(&req->rq_pill,
2293                                                      &RMF_DLM_REP);
2294
2295                         LASSERT(rep != NULL);
2296                         if (rep->lock_policy_res1)
2297                                 rc = rep->lock_policy_res1;
2298                 }
2299         }
2300
2301         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2302             (rc == 0)) {
2303                 *flags |= LDLM_FL_LVB_READY;
2304                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2305                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2306         }
2307
2308         /* Call the update callback. */
2309         rc = (*upcall)(cookie, rc);
2310         RETURN(rc);
2311 }
2312
2313 static int osc_enqueue_interpret(const struct lu_env *env,
2314                                  struct ptlrpc_request *req,
2315                                  struct osc_enqueue_args *aa, int rc)
2316 {
2317         struct ldlm_lock *lock;
2318         struct lustre_handle handle;
2319         __u32 mode;
2320         struct ost_lvb *lvb;
2321         __u32 lvb_len;
2322         int *flags = aa->oa_flags;
2323
2324         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2325          * might be freed anytime after lock upcall has been called. */
2326         lustre_handle_copy(&handle, aa->oa_lockh);
2327         mode = aa->oa_ei->ei_mode;
2328
2329         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2330          * be valid. */
2331         lock = ldlm_handle2lock(&handle);
2332
2333         /* Take an additional reference so that a blocking AST that
2334          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2335          * to arrive after an upcall has been executed by
2336          * osc_enqueue_fini(). */
2337         ldlm_lock_addref(&handle, mode);
2338
2339         /* Let CP AST to grant the lock first. */
2340         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2341
2342         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2343                 lvb = NULL;
2344                 lvb_len = 0;
2345         } else {
2346                 lvb = aa->oa_lvb;
2347                 lvb_len = sizeof(*aa->oa_lvb);
2348         }
2349
2350         /* Complete obtaining the lock procedure. */
2351         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2352                                    mode, flags, lvb, lvb_len, &handle, rc);
2353         /* Complete osc stuff. */
2354         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2355                               flags, aa->oa_agl, rc);
2356
2357         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2358
2359         /* Release the lock for async request. */
2360         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2361                 /*
2362                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2363                  * not already released by
2364                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2365                  */
2366                 ldlm_lock_decref(&handle, mode);
2367
2368         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2369                  aa->oa_lockh, req, aa);
2370         ldlm_lock_decref(&handle, mode);
2371         LDLM_LOCK_PUT(lock);
2372         return rc;
2373 }
2374
2375 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2376                         struct lov_oinfo *loi, int flags,
2377                         struct ost_lvb *lvb, __u32 mode, int rc)
2378 {
2379         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2380
2381         if (rc == ELDLM_OK) {
2382                 __u64 tmp;
2383
2384                 LASSERT(lock != NULL);
2385                 loi->loi_lvb = *lvb;
2386                 tmp = loi->loi_lvb.lvb_size;
2387                 /* Extend KMS up to the end of this lock and no further
2388                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2389                 if (tmp > lock->l_policy_data.l_extent.end)
2390                         tmp = lock->l_policy_data.l_extent.end + 1;
2391                 if (tmp >= loi->loi_kms) {
2392                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2393                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2394                         loi_kms_set(loi, tmp);
2395                 } else {
2396                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2397                                    LPU64"; leaving kms="LPU64", end="LPU64,
2398                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2399                                    lock->l_policy_data.l_extent.end);
2400                 }
2401                 ldlm_lock_allow_match(lock);
2402         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2403                 LASSERT(lock != NULL);
2404                 loi->loi_lvb = *lvb;
2405                 ldlm_lock_allow_match(lock);
2406                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2407                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2408                 rc = ELDLM_OK;
2409         }
2410
2411         if (lock != NULL) {
2412                 if (rc != ELDLM_OK)
2413                         ldlm_lock_fail_match(lock);
2414
2415                 LDLM_LOCK_PUT(lock);
2416         }
2417 }
2418 EXPORT_SYMBOL(osc_update_enqueue);
2419
/* Sentinel request-set pointer (the constant 1 cast to a pointer, not a
 * real set).  osc_enqueue_base() compares its rqset argument against this
 * value and, when they are equal, passes the request to ptlrpcd_add_req()
 * instead of adding it to the set with ptlrpc_set_add_req(). */
2420 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2421
2422 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2423  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2424  * other synchronous requests, however keeping some locks and trying to obtain
2425  * others may take a considerable amount of time in a case of ost failure; and
2426  * when other sync requests do not get released lock from a client, the client
2427  * is excluded from the cluster -- such scenarios make the life difficult, so
2428  * release locks just after they are obtained. */
/*
 * Obtain a lock on @res_id: first try to reuse a lock already present in
 * the namespace, otherwise enqueue a new one.
 *
 * @exp        export whose obd namespace and import are used
 * @res_id     resource to lock
 * @flags      in/out LDLM flags.  LDLM_FL_HAS_INTENT selects the path that
 *             pre-allocates an RQF_LDLM_ENQUEUE_LVB request;
 *             LDLM_FL_BLOCK_GRANTED is cleared before the enqueue;
 *             LDLM_FL_LVB_READY is set when a cached lock is reused
 * @policy     requested extent; widened in place to page boundaries
 * @lvb        LVB buffer handed to ldlm_cli_enqueue()
 * @kms_valid  when zero, lock matching is skipped entirely
 * @upcall     completion callback, called with @cookie and a status
 * @cookie     opaque argument for @upcall
 * @einfo      enqueue description (mode, type, ...)
 * @lockh      receives the handle of the matched or enqueued lock
 * @rqset      NULL for a synchronous enqueue; PTLRPCD_SET to hand the
 *             request to ptlrpcd; any other value is a set to add it to
 * @async      passed through to ldlm_cli_enqueue()
 * @agl        non-zero for AGL: a lock is matched even without
 *             LDLM_FL_LVB_READY, but such a match is rejected below
 *
 * Returns ELDLM_OK after calling @upcall when a cached lock was reused,
 * -ECANCELED for AGL when the matched lock has no LDLM_FL_LVB_READY,
 * -ENOMEM or the ldlm_prep_enqueue_req() error if the intent request cannot
 * be set up, the ldlm_cli_enqueue() result when @rqset is non-NULL, and the
 * osc_enqueue_fini() result otherwise.
 */
2429 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2430                      int *flags, ldlm_policy_data_t *policy,
2431                      struct ost_lvb *lvb, int kms_valid,
2432                      obd_enqueue_update_f upcall, void *cookie,
2433                      struct ldlm_enqueue_info *einfo,
2434                      struct lustre_handle *lockh,
2435                      struct ptlrpc_request_set *rqset, int async, int agl)
2436 {
2437         struct obd_device *obd = exp->exp_obd;
2438         struct ptlrpc_request *req = NULL;
2439         int intent = *flags & LDLM_FL_HAS_INTENT;
2440         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2441         ldlm_mode_t mode;
2442         int rc;
2443         ENTRY;
2444
2445         /* Filesystem lock extents are extended to page boundaries so that
2446          * dealing with the page cache is a little smoother.  */
2447         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2448         policy->l_extent.end |= ~CFS_PAGE_MASK;
2449
2450         /*
2451          * kms is not valid when either object is completely fresh (so that no
2452          * locks are cached), or object was evicted. In the latter case cached
2453          * lock cannot be used, because it would prime inode state with
2454          * potentially stale LVB.
2455          */
2456         if (!kms_valid)
2457                 goto no_match;
2458
2459         /* Next, search for already existing extent locks that will cover us */
2460         /* If we're trying to read, we also search for an existing PW lock.  The
2461          * VFS and page cache already protect us locally, so lots of readers/
2462          * writers can share a single PW lock.
2463          *
2464          * There are problems with conversion deadlocks, so instead of
2465          * converting a read lock to a write lock, we'll just enqueue a new
2466          * one.
2467          *
2468          * At some point we should cancel the read lock instead of making them
2469          * send us a blocking callback, but there are problems with canceling
2470          * locks out from other users right now, too. */
2471         mode = einfo->ei_mode;
2472         if (einfo->ei_mode == LCK_PR)
2473                 mode |= LCK_PW;
2474         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2475                                einfo->ei_type, policy, mode, lockh, 0);
2476         if (mode) {
             /* NOTE(review): matched is dereferenced without a NULL check;
              * presumably the reference from the successful match keeps the
              * handle valid -- confirm. */
2477                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2478
2479                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2480                         /* For AGL, if enqueue RPC is sent but the lock is not
2481                          * granted, then skip to process this stripe.
2482                          * Return -ECANCELED to tell the caller. */
2483                         ldlm_lock_decref(lockh, mode);
2484                         LDLM_LOCK_PUT(matched);
2485                         RETURN(-ECANCELED);
2486                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2487                         *flags |= LDLM_FL_LVB_READY;
2488                         /* addref the lock only if not async requests and PW
2489                          * lock is matched whereas we asked for PR. */
2490                         if (!rqset && einfo->ei_mode != mode)
2491                                 ldlm_lock_addref(lockh, LCK_PR);
2492                         if (intent) {
2493                                 /* I would like to be able to ASSERT here that
2494                                  * rss <= kms, but I can't, for reasons which
2495                                  * are explained in lov_enqueue() */
2496                         }
2497
2498                         /* We already have a lock, and it's referenced.
2499                          *
2500                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2501                          * AGL upcall may change it to CLS_HELD directly. */
2502                         (*upcall)(cookie, ELDLM_OK);
2503
2504                         if (einfo->ei_mode != mode)
2505                                 ldlm_lock_decref(lockh, LCK_PW);
2506                         else if (rqset)
2507                                 /* For async requests, decref the lock. */
2508                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2509                         LDLM_LOCK_PUT(matched);
2510                         RETURN(ELDLM_OK);
2511                 } else {
                     /* The matched lock could not be bound to einfo: drop
                      * it and fall through to a fresh enqueue. */
2512                         ldlm_lock_decref(lockh, mode);
2513                         LDLM_LOCK_PUT(matched);
2514                 }
2515         }
2516
2517  no_match:
2518         if (intent) {
2519                 CFS_LIST_HEAD(cancels);
2520                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2521                                            &RQF_LDLM_ENQUEUE_LVB);
2522                 if (req == NULL)
2523                         RETURN(-ENOMEM);
2524
2525                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2526                 if (rc) {
2527                         ptlrpc_request_free(req);
2528                         RETURN(rc);
2529                 }
2530
2531                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2532                                      sizeof *lvb);
2533                 ptlrpc_request_set_replen(req);
2534         }
2535
2536         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2537         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2538
2539         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2540                               sizeof(*lvb), lockh, async);
2541         if (rqset) {
2542                 if (!rc) {
                     /* Stash everything osc_enqueue_interpret() needs in
                      * the request's async-args area and queue the request. */
2543                         struct osc_enqueue_args *aa;
2544                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2545                         aa = ptlrpc_req_async_args(req);
2546                         aa->oa_ei = einfo;
2547                         aa->oa_exp = exp;
2548                         aa->oa_flags  = flags;
2549                         aa->oa_upcall = upcall;
2550                         aa->oa_cookie = cookie;
2551                         aa->oa_lvb    = lvb;
2552                         aa->oa_lockh  = lockh;
2553                         aa->oa_agl    = !!agl;
2554
2555                         req->rq_interpret_reply =
2556                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2557                         if (rqset == PTLRPCD_SET)
2558                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2559                         else
2560                                 ptlrpc_set_add_req(rqset, req);
2561                 } else if (intent) {
                     /* Enqueue failed: release the intent request that was
                      * allocated above. */
2562                         ptlrpc_req_finished(req);
2563                 }
2564                 RETURN(rc);
2565         }
2566
             /* Synchronous case: finish here, then release the intent request
              * allocated above. */
2567         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2568         if (intent)
2569                 ptlrpc_req_finished(req);
2570
2571         RETURN(rc);
2572 }
2573
2574 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2575                        struct ldlm_enqueue_info *einfo,
2576                        struct ptlrpc_request_set *rqset)
2577 {
2578         struct ldlm_res_id res_id;
2579         int rc;
2580         ENTRY;
2581
2582         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2583                            oinfo->oi_md->lsm_object_seq, &res_id);
2584
2585         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2586                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2587                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2588                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2589                               rqset, rqset != NULL, 0);
2590         RETURN(rc);
2591 }
2592
2593 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2594                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2595                    int *flags, void *data, struct lustre_handle *lockh,
2596                    int unref)
2597 {
2598         struct obd_device *obd = exp->exp_obd;
2599         int lflags = *flags;
2600         ldlm_mode_t rc;
2601         ENTRY;
2602
2603         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2604                 RETURN(-EIO);
2605
2606         /* Filesystem lock extents are extended to page boundaries so that
2607          * dealing with the page cache is a little smoother */
2608         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2609         policy->l_extent.end |= ~CFS_PAGE_MASK;
2610
2611         /* Next, search for already existing extent locks that will cover us */
2612         /* If we're trying to read, we also search for an existing PW lock.  The
2613          * VFS and page cache already protect us locally, so lots of readers/
2614          * writers can share a single PW lock. */
2615         rc = mode;
2616         if (mode == LCK_PR)
2617                 rc |= LCK_PW;
2618         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2619                              res_id, type, policy, rc, lockh, unref);
2620         if (rc) {
2621                 if (data != NULL) {
2622                         if (!osc_set_data_with_check(lockh, data)) {
2623                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2624                                         ldlm_lock_decref(lockh, rc);
2625                                 RETURN(0);
2626                         }
2627                 }
2628                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2629                         ldlm_lock_addref(lockh, LCK_PR);
2630                         ldlm_lock_decref(lockh, LCK_PW);
2631                 }
2632                 RETURN(rc);
2633         }
2634         RETURN(rc);
2635 }
2636
2637 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2638 {
2639         ENTRY;
2640
2641         if (unlikely(mode == LCK_GROUP))
2642                 ldlm_lock_decref_and_cancel(lockh, mode);
2643         else
2644                 ldlm_lock_decref(lockh, mode);
2645
2646         RETURN(0);
2647 }
2648
2649 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2650                       __u32 mode, struct lustre_handle *lockh)
2651 {
2652         ENTRY;
2653         RETURN(osc_cancel_base(lockh, mode));
2654 }
2655
2656 static int osc_cancel_unused(struct obd_export *exp,
2657                              struct lov_stripe_md *lsm,
2658                              ldlm_cancel_flags_t flags,
2659                              void *opaque)
2660 {
2661         struct obd_device *obd = class_exp2obd(exp);
2662         struct ldlm_res_id res_id, *resp = NULL;
2663
2664         if (lsm != NULL) {
2665                 resp = osc_build_res_name(lsm->lsm_object_id,
2666                                           lsm->lsm_object_seq, &res_id);
2667         }
2668
2669         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2670 }
2671
/*
 * Reply interpreter for the OST_STATFS request queued by osc_statfs_async().
 *
 * @env  unused here
 * @req  the completed statfs request
 * @aa   async arguments; aa->aa_oi is the caller's obd_info
 * @rc   status of the RPC
 *
 * -EBADR is returned at once, without invoking the caller's oi_cb_up
 * callback.  -ENOTCONN / -EAGAIN are turned into success when the caller
 * asked for OBD_STATFS_NODELAY (no statfs data is copied in that case).
 * On a good reply the DEGRADED, RDONLY, NOSPC and NOSPC_BLK bits of
 * cl_oscc.oscc_flags are refreshed under oscc_lock and the reply is copied
 * to *aa->aa_oi->oi_osfs.  Except for -EBADR the final status is passed to
 * oi_cb_up and that callback's return value is returned.
 */
2672 static int osc_statfs_interpret(const struct lu_env *env,
2673                                 struct ptlrpc_request *req,
2674                                 struct osc_async_args *aa, int rc)
2675 {
2676         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2677         struct obd_statfs *msfs;
2678         __u64 used;
2679         ENTRY;
2680
2681         if (rc == -EBADR)
2682                 /* The request has in fact never been sent
2683                  * due to issues at a higher level (LOV).
2684                  * Exit immediately since the caller is
2685                  * aware of the problem and takes care
2686                  * of the clean up */
2687                  RETURN(rc);
2688
2689         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2690             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2691                 GOTO(out, rc = 0);
2692
2693         if (rc != 0)
2694                 GOTO(out, rc);
2695
2696         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2697         if (msfs == NULL) {
2698                 GOTO(out, rc = -EPROTO);
2699         }
2700
2701         /* Reinitialize the RDONLY and DEGRADED flags at the client
2702          * on each statfs, so they don't stay set permanently. */
2703         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2704
2705         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2706                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2707         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2708                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2709
2710         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2711                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2712         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2713                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2714
2715         /* Add a bit of hysteresis so this flag isn't continually flapping,
2716          * and ensure that new files don't get extremely fragmented due to
2717          * only a small amount of available space in the filesystem.
2718          * We want to set the NOSPC flag when there is less than ~0.1% free
2719          * and clear it when there is at least ~0.2% free space, so:
2720          *                   avail < ~0.1% max          max = avail + used
2721          *            1025 * avail < avail + used       used = blocks - free
2722          *            1024 * avail < used
2723          *            1024 * avail < blocks - free
2724          *                   avail < ((blocks - free) >> 10)
2725          *
2726          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2727          * lose that amount of space so in those cases we report no space left
2728          * if there is less than 1 GB left.
              *
              * NOTE(review): "used" is compared against os_bavail, so the cap
              * 1 << 30 is in the same unit as os_bavail; whether that equals
              * the 1 GB mentioned above depends on the block size -- confirm. */
2729         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
             /* Set NOSPC when free objects drop below 32 or available blocks
              * below the threshold; clear it (together with NOSPC_BLK) only
              * once there are more than 64 free objects and more than twice
              * the threshold available. */
2730         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2731                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2732                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2733         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2734                           (msfs->os_ffree > 64) &&
2735                           (msfs->os_bavail > (used << 1)))) {
2736                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2737                                              OSCC_FLAG_NOSPC_BLK);
2738         }
2739
             /* NOSPC_BLK additionally records that the shortage is in blocks. */
2740         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2741                      (msfs->os_bavail < used)))
2742                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2743
2744         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2745
2746         *aa->aa_oi->oi_osfs = *msfs;
2747 out:
2748         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2749         RETURN(rc);
2750 }
2751
2752 static int osc_statfs_async(struct obd_export *exp,
2753                             struct obd_info *oinfo, __u64 max_age,
2754                             struct ptlrpc_request_set *rqset)
2755 {
2756         struct obd_device     *obd = class_exp2obd(exp);
2757         struct ptlrpc_request *req;
2758         struct osc_async_args *aa;
2759         int                    rc;
2760         ENTRY;
2761
2762         /* We could possibly pass max_age in the request (as an absolute
2763          * timestamp or a "seconds.usec ago") so the target can avoid doing
2764          * extra calls into the filesystem if that isn't necessary (e.g.
2765          * during mount that would help a bit).  Having relative timestamps
2766          * is not so great if request processing is slow, while absolute
2767          * timestamps are not ideal because they need time synchronization. */
2768         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2769         if (req == NULL)
2770                 RETURN(-ENOMEM);
2771
2772         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2773         if (rc) {
2774                 ptlrpc_request_free(req);
2775                 RETURN(rc);
2776         }
2777         ptlrpc_request_set_replen(req);
2778         req->rq_request_portal = OST_CREATE_PORTAL;
2779         ptlrpc_at_set_req_timeout(req);
2780
2781         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2782                 /* procfs requests not want stat in wait for avoid deadlock */
2783                 req->rq_no_resend = 1;
2784                 req->rq_no_delay = 1;
2785         }
2786
2787         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2788         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2789         aa = ptlrpc_req_async_args(req);
2790         aa->aa_oi = oinfo;
2791
2792         ptlrpc_set_add_req(rqset, req);
2793         RETURN(0);
2794 }
2795
2796 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2797                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2798 {
2799         struct obd_device     *obd = class_exp2obd(exp);
2800         struct obd_statfs     *msfs;
2801         struct ptlrpc_request *req;
2802         struct obd_import     *imp = NULL;
2803         int rc;
2804         ENTRY;
2805
2806         /*Since the request might also come from lprocfs, so we need
2807          *sync this with client_disconnect_export Bug15684*/
2808         cfs_down_read(&obd->u.cli.cl_sem);
2809         if (obd->u.cli.cl_import)
2810                 imp = class_import_get(obd->u.cli.cl_import);
2811         cfs_up_read(&obd->u.cli.cl_sem);
2812         if (!imp)
2813                 RETURN(-ENODEV);
2814
2815         /* We could possibly pass max_age in the request (as an absolute
2816          * timestamp or a "seconds.usec ago") so the target can avoid doing
2817          * extra calls into the filesystem if that isn't necessary (e.g.
2818          * during mount that would help a bit).  Having relative timestamps
2819          * is not so great if request processing is slow, while absolute
2820          * timestamps are not ideal because they need time synchronization. */
2821         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2822
2823         class_import_put(imp);
2824
2825         if (req == NULL)
2826                 RETURN(-ENOMEM);
2827
2828         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2829         if (rc) {
2830                 ptlrpc_request_free(req);
2831                 RETURN(rc);
2832         }
2833         ptlrpc_request_set_replen(req);
2834         req->rq_request_portal = OST_CREATE_PORTAL;
2835         ptlrpc_at_set_req_timeout(req);
2836
2837         if (flags & OBD_STATFS_NODELAY) {
2838                 /* procfs requests not want stat in wait for avoid deadlock */
2839                 req->rq_no_resend = 1;
2840                 req->rq_no_delay = 1;
2841         }
2842
2843         rc = ptlrpc_queue_wait(req);
2844         if (rc)
2845                 GOTO(out, rc);
2846
2847         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2848         if (msfs == NULL) {
2849                 GOTO(out, rc = -EPROTO);
2850         }
2851
2852         *osfs = *msfs;
2853
2854         EXIT;
2855  out:
2856         ptlrpc_req_finished(req);
2857         return rc;
2858 }
2859
2860 /* Retrieve object striping information.
2861  *
2862  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2863  * the maximum number of OST indices which will fit in the user buffer.
2864  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2865  */
2866 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2867 {
2868         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2869         struct lov_user_md_v3 lum, *lumk;
2870         struct lov_user_ost_data_v1 *lmm_objects;
2871         int rc = 0, lum_size;
2872         ENTRY;
2873
2874         if (!lsm)
2875                 RETURN(-ENODATA);
2876
2877         /* we only need the header part from user space to get lmm_magic and
2878          * lmm_stripe_count, (the header part is common to v1 and v3) */
2879         lum_size = sizeof(struct lov_user_md_v1);
2880         if (cfs_copy_from_user(&lum, lump, lum_size))
2881                 RETURN(-EFAULT);
2882
2883         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2884             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2885                 RETURN(-EINVAL);
2886
2887         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2888         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2889         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2890         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2891
2892         /* we can use lov_mds_md_size() to compute lum_size
2893          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2894         if (lum.lmm_stripe_count > 0) {
2895                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2896                 OBD_ALLOC(lumk, lum_size);
2897                 if (!lumk)
2898                         RETURN(-ENOMEM);
2899
2900                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2901                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2902                 else
2903                         lmm_objects = &(lumk->lmm_objects[0]);
2904                 lmm_objects->l_object_id = lsm->lsm_object_id;
2905         } else {
2906                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2907                 lumk = &lum;
2908         }
2909
2910         lumk->lmm_object_id = lsm->lsm_object_id;
2911         lumk->lmm_object_seq = lsm->lsm_object_seq;
2912         lumk->lmm_stripe_count = 1;
2913
2914         if (cfs_copy_to_user(lump, lumk, lum_size))
2915                 rc = -EFAULT;
2916
2917         if (lumk != &lum)
2918                 OBD_FREE(lumk, lum_size);
2919
2920         RETURN(rc);
2921 }
2922
2923
2924 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2925                          void *karg, void *uarg)
2926 {
2927         struct obd_device *obd = exp->exp_obd;
2928         struct obd_ioctl_data *data = karg;
2929         int err = 0;
2930         ENTRY;
2931
2932         if (!cfs_try_module_get(THIS_MODULE)) {
2933                 CERROR("Can't get module. Is it alive?");
2934                 return -EINVAL;
2935         }
2936         switch (cmd) {
2937         case OBD_IOC_LOV_GET_CONFIG: {
2938                 char *buf;
2939                 struct lov_desc *desc;
2940                 struct obd_uuid uuid;
2941
2942                 buf = NULL;
2943                 len = 0;
2944                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2945                         GOTO(out, err = -EINVAL);
2946
2947                 data = (struct obd_ioctl_data *)buf;
2948
2949                 if (sizeof(*desc) > data->ioc_inllen1) {
2950                         obd_ioctl_freedata(buf, len);
2951                         GOTO(out, err = -EINVAL);
2952                 }
2953
2954                 if (data->ioc_inllen2 < sizeof(uuid)) {
2955                         obd_ioctl_freedata(buf, len);
2956                         GOTO(out, err = -EINVAL);
2957                 }
2958
2959                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2960                 desc->ld_tgt_count = 1;
2961                 desc->ld_active_tgt_count = 1;
2962                 desc->ld_default_stripe_count = 1;
2963                 desc->ld_default_stripe_size = 0;
2964                 desc->ld_default_stripe_offset = 0;
2965                 desc->ld_pattern = 0;
2966                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2967
2968                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2969
2970                 err = cfs_copy_to_user((void *)uarg, buf, len);
2971                 if (err)
2972                         err = -EFAULT;
2973                 obd_ioctl_freedata(buf, len);
2974                 GOTO(out, err);
2975         }
2976         case LL_IOC_LOV_SETSTRIPE:
2977                 err = obd_alloc_memmd(exp, karg);
2978                 if (err > 0)
2979                         err = 0;
2980                 GOTO(out, err);
2981         case LL_IOC_LOV_GETSTRIPE:
2982                 err = osc_getstripe(karg, uarg);
2983                 GOTO(out, err);
2984         case OBD_IOC_CLIENT_RECOVER:
2985                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2986                                             data->ioc_inlbuf1, 0);
2987                 if (err > 0)
2988                         err = 0;
2989                 GOTO(out, err);
2990         case IOC_OSC_SET_ACTIVE:
2991                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2992                                                data->ioc_offset);
2993                 GOTO(out, err);
2994         case OBD_IOC_POLL_QUOTACHECK:
2995                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2996                 GOTO(out, err);
2997         case OBD_IOC_PING_TARGET:
2998                 err = ptlrpc_obd_ping(obd);
2999                 GOTO(out, err);
3000         default:
3001                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3002                        cmd, cfs_curproc_comm());
3003                 GOTO(out, err = -ENOTTY);
3004         }
3005 out:
3006         cfs_module_put(THIS_MODULE);
3007         return err;
3008 }
3009
3010 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3011                         obd_count keylen, void *key, __u32 *vallen, void *val,
3012                         struct lov_stripe_md *lsm)
3013 {
3014         ENTRY;
3015         if (!vallen || !val)
3016                 RETURN(-EFAULT);
3017
3018         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3019                 __u32 *stripe = val;
3020                 *vallen = sizeof(*stripe);
3021                 *stripe = 0;
3022                 RETURN(0);
3023         } else if (KEY_IS(KEY_LAST_ID)) {
3024                 struct ptlrpc_request *req;
3025                 obd_id                *reply;
3026                 char                  *tmp;
3027                 int                    rc;
3028
3029                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3030                                            &RQF_OST_GET_INFO_LAST_ID);
3031                 if (req == NULL)
3032                         RETURN(-ENOMEM);
3033
3034                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3035                                      RCL_CLIENT, keylen);
3036                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3037                 if (rc) {
3038                         ptlrpc_request_free(req);
3039                         RETURN(rc);
3040                 }
3041
3042                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3043                 memcpy(tmp, key, keylen);
3044
3045                 req->rq_no_delay = req->rq_no_resend = 1;
3046                 ptlrpc_request_set_replen(req);
3047                 rc = ptlrpc_queue_wait(req);
3048                 if (rc)
3049                         GOTO(out, rc);
3050
3051                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3052                 if (reply == NULL)
3053                         GOTO(out, rc = -EPROTO);
3054
3055                 *((obd_id *)val) = *reply;
3056         out:
3057                 ptlrpc_req_finished(req);
3058                 RETURN(rc);
3059         } else if (KEY_IS(KEY_FIEMAP)) {
3060                 struct ptlrpc_request *req;
3061                 struct ll_user_fiemap *reply;
3062                 char *tmp;
3063                 int rc;
3064
3065                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3066                                            &RQF_OST_GET_INFO_FIEMAP);
3067                 if (req == NULL)
3068                         RETURN(-ENOMEM);
3069
3070                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3071                                      RCL_CLIENT, keylen);
3072                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3073                                      RCL_CLIENT, *vallen);
3074                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3075                                      RCL_SERVER, *vallen);
3076
3077                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3078                 if (rc) {
3079                         ptlrpc_request_free(req);
3080                         RETURN(rc);
3081                 }
3082
3083                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3084                 memcpy(tmp, key, keylen);
3085                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3086                 memcpy(tmp, val, *vallen);
3087
3088                 ptlrpc_request_set_replen(req);
3089                 rc = ptlrpc_queue_wait(req);
3090                 if (rc)
3091                         GOTO(out1, rc);
3092
3093                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3094                 if (reply == NULL)
3095                         GOTO(out1, rc = -EPROTO);
3096
3097                 memcpy(val, reply, *vallen);
3098         out1:
3099                 ptlrpc_req_finished(req);
3100
3101                 RETURN(rc);
3102         }
3103
3104         RETURN(-EINVAL);
3105 }
3106
3107 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3108 {
3109         struct llog_ctxt *ctxt;
3110         int rc = 0;
3111         ENTRY;
3112
3113         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3114         if (ctxt) {
3115                 rc = llog_initiator_connect(ctxt);
3116                 llog_ctxt_put(ctxt);
3117         } else {
3118                 /* XXX return an error? skip setting below flags? */
3119         }
3120
3121         cfs_spin_lock(&imp->imp_lock);
3122         imp->imp_server_timeout = 1;
3123         imp->imp_pingable = 1;
3124         cfs_spin_unlock(&imp->imp_lock);
3125         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3126
3127         RETURN(rc);
3128 }
3129
3130 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3131                                           struct ptlrpc_request *req,
3132                                           void *aa, int rc)
3133 {
3134         ENTRY;
3135         if (rc != 0)
3136                 RETURN(rc);
3137
3138         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3139 }
3140
3141 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3142                               obd_count keylen, void *key, obd_count vallen,
3143                               void *val, struct ptlrpc_request_set *set)
3144 {
3145         struct ptlrpc_request *req;
3146         struct obd_device     *obd = exp->exp_obd;
3147         struct obd_import     *imp = class_exp2cliimp(exp);
3148         char                  *tmp;
3149         int                    rc;
3150         ENTRY;
3151
3152         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3153
3154         if (KEY_IS(KEY_NEXT_ID)) {
3155                 obd_id new_val;
3156                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3157
3158                 if (vallen != sizeof(obd_id))
3159                         RETURN(-ERANGE);
3160                 if (val == NULL)
3161                         RETURN(-EINVAL);
3162
3163                 if (vallen != sizeof(obd_id))
3164                         RETURN(-EINVAL);
3165
3166                 /* avoid race between allocate new object and set next id
3167                  * from ll_sync thread */
3168                 cfs_spin_lock(&oscc->oscc_lock);
3169                 new_val = *((obd_id*)val) + 1;
3170                 if (new_val > oscc->oscc_next_id)
3171                         oscc->oscc_next_id = new_val;
3172                 cfs_spin_unlock(&oscc->oscc_lock);
3173                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",