Whamcloud - gitweb
LU-3274 osc: allow to call brw_commit() multiple times
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50
51 #ifdef  __CYGWIN__
52 # include <ctype.h>
53 #endif
54
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
/* Forward declarations for helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (lmmp == NULL)
78                 RETURN(lmm_size);
79
80         if (*lmmp != NULL && lsm == NULL) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
85                 RETURN(-EBADF);
86         }
87
88         if (*lmmp == NULL) {
89                 OBD_ALLOC(*lmmp, lmm_size);
90                 if (*lmmp == NULL)
91                         RETURN(-ENOMEM);
92         }
93
94         if (lsm)
95                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102                         struct lov_mds_md *lmm, int lmm_bytes)
103 {
104         int lsm_size;
105         struct obd_import *imp = class_exp2cliimp(exp);
106         ENTRY;
107
108         if (lmm != NULL) {
109                 if (lmm_bytes < sizeof(*lmm)) {
110                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
111                                exp->exp_obd->obd_name, lmm_bytes,
112                                (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
118                         CERROR("%s: zero lmm_object_id: rc = %d\n",
119                                exp->exp_obd->obd_name, -EINVAL);
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (unlikely(*lsmp == NULL))
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
146                 RETURN(-EBADF);
147         }
148
149         if (lmm != NULL)
150                 /* XXX zero *lsmp? */
151                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
152
153         if (imp != NULL &&
154             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
155                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
156         else
157                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
158
159         RETURN(lsm_size);
160 }
161
162 static inline void osc_pack_capa(struct ptlrpc_request *req,
163                                  struct ost_body *body, void *capa)
164 {
165         struct obd_capa *oc = (struct obd_capa *)capa;
166         struct lustre_capa *c;
167
168         if (!capa)
169                 return;
170
171         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
172         LASSERT(c);
173         capa_cpy(c, oc);
174         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
175         DEBUG_CAPA(D_SEC, c, "pack");
176 }
177
178 static inline void osc_pack_req_body(struct ptlrpc_request *req,
179                                      struct obd_info *oinfo)
180 {
181         struct ost_body *body;
182
183         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
184         LASSERT(body);
185
186         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
187                              oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
216                                      aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
296                              &body->oa);
297
298         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
341                              &body->oa);
342
343         EXIT;
344 out:
345         ptlrpc_req_finished(req);
346         RETURN(rc);
347 }
348
349 static int osc_setattr_interpret(const struct lu_env *env,
350                                  struct ptlrpc_request *req,
351                                  struct osc_setattr_args *sa, int rc)
352 {
353         struct ost_body *body;
354         ENTRY;
355
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360         if (body == NULL)
361                 GOTO(out, rc = -EPROTO);
362
363         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
364                              &body->oa);
365 out:
366         rc = sa->sa_upcall(sa->sa_cookie, rc);
367         RETURN(rc);
368 }
369
370 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
371                            struct obd_trans_info *oti,
372                            obd_enqueue_update_f upcall, void *cookie,
373                            struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request   *req;
376         struct osc_setattr_args *sa;
377         int                      rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
392                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
393
394         osc_pack_req_body(req, oinfo);
395
396         ptlrpc_request_set_replen(req);
397
398         /* do mds to ost setattr asynchronously */
399         if (!rqset) {
400                 /* Do not wait for response. */
401                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402         } else {
403                 req->rq_interpret_reply =
404                         (ptlrpc_interpterer_t)osc_setattr_interpret;
405
406                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
407                 sa = ptlrpc_req_async_args(req);
408                 sa->sa_oa = oinfo->oi_oa;
409                 sa->sa_upcall = upcall;
410                 sa->sa_cookie = cookie;
411
412                 if (rqset == PTLRPCD_SET)
413                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
414                 else
415                         ptlrpc_set_add_req(rqset, req);
416         }
417
418         RETURN(0);
419 }
420
421 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
422                              struct obd_trans_info *oti,
423                              struct ptlrpc_request_set *rqset)
424 {
425         return osc_setattr_async_base(exp, oinfo, oti,
426                                       oinfo->oi_cb_up, oinfo, rqset);
427 }
428
/* Send a real OST_CREATE RPC and wait for the reply.
 *
 * On entry *ea may be NULL, in which case an in-memory lsm is allocated
 * here (and freed again on failure).  On success the new object id is
 * stored into the lsm and *ea, and llog unlink cookies/transno are
 * propagated into @oti when the server supplied them.
 *
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Allocate an lsm on behalf of the caller if none was passed in;
         * it is freed below on any error (see the "out" label). */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* o_flags is only meaningful when OBD_MD_FLFLAGS is set. */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The blocksize is not returned by the OST; use client BRW size. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        /* Hand transno and (optionally) the llog unlink cookie back to
         * the caller's transaction info. */
        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm allocated above: *ea is still NULL on any error
         * path when the caller did not pass one in. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
513
514 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
515                    obd_enqueue_update_f upcall, void *cookie,
516                    struct ptlrpc_request_set *rqset)
517 {
518         struct ptlrpc_request   *req;
519         struct osc_setattr_args *sa;
520         struct ost_body         *body;
521         int                      rc;
522         ENTRY;
523
524         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
525         if (req == NULL)
526                 RETURN(-ENOMEM);
527
528         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
529         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
530         if (rc) {
531                 ptlrpc_request_free(req);
532                 RETURN(rc);
533         }
534         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
535         ptlrpc_at_set_req_timeout(req);
536
537         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
538         LASSERT(body);
539         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
540                              oinfo->oi_oa);
541         osc_pack_capa(req, body, oinfo->oi_capa);
542
543         ptlrpc_request_set_replen(req);
544
545         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
546         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
547         sa = ptlrpc_req_async_args(req);
548         sa->sa_oa     = oinfo->oi_oa;
549         sa->sa_upcall = upcall;
550         sa->sa_cookie = cookie;
551         if (rqset == PTLRPCD_SET)
552                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
553         else
554                 ptlrpc_set_add_req(rqset, req);
555
556         RETURN(0);
557 }
558
559 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
560                      struct obd_info *oinfo, struct obd_trans_info *oti,
561                      struct ptlrpc_request_set *rqset)
562 {
563         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
564         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
565         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
566         return osc_punch_base(exp, oinfo,
567                               oinfo->oi_cb_up, oinfo, rqset);
568 }
569
570 static int osc_sync_interpret(const struct lu_env *env,
571                               struct ptlrpc_request *req,
572                               void *arg, int rc)
573 {
574         struct osc_fsync_args *fa = arg;
575         struct ost_body *body;
576         ENTRY;
577
578         if (rc)
579                 GOTO(out, rc);
580
581         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
582         if (body == NULL) {
583                 CERROR ("can't unpack ost_body\n");
584                 GOTO(out, rc = -EPROTO);
585         }
586
587         *fa->fa_oi->oi_oa = body->oa;
588 out:
589         rc = fa->fa_upcall(fa->fa_cookie, rc);
590         RETURN(rc);
591 }
592
593 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
594                   obd_enqueue_update_f upcall, void *cookie,
595                   struct ptlrpc_request_set *rqset)
596 {
597         struct ptlrpc_request *req;
598         struct ost_body       *body;
599         struct osc_fsync_args *fa;
600         int                    rc;
601         ENTRY;
602
603         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
604         if (req == NULL)
605                 RETURN(-ENOMEM);
606
607         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
608         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
609         if (rc) {
610                 ptlrpc_request_free(req);
611                 RETURN(rc);
612         }
613
614         /* overload the size and blocks fields in the oa with start/end */
615         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
616         LASSERT(body);
617         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
618                              oinfo->oi_oa);
619         osc_pack_capa(req, body, oinfo->oi_capa);
620
621         ptlrpc_request_set_replen(req);
622         req->rq_interpret_reply = osc_sync_interpret;
623
624         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
625         fa = ptlrpc_req_async_args(req);
626         fa->fa_oi = oinfo;
627         fa->fa_upcall = upcall;
628         fa->fa_cookie = cookie;
629
630         if (rqset == PTLRPCD_SET)
631                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
632         else
633                 ptlrpc_set_add_req(rqset, req);
634
635         RETURN (0);
636 }
637
638 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
639                     struct obd_info *oinfo, obd_size start, obd_size end,
640                     struct ptlrpc_request_set *set)
641 {
642         ENTRY;
643
644         if (!oinfo->oi_oa) {
645                 CDEBUG(D_INFO, "oa NULL\n");
646                 RETURN(-EINVAL);
647         }
648
649         oinfo->oi_oa->o_size = start;
650         oinfo->oi_oa->o_blocks = end;
651         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
652
653         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
654 }
655
/* Find and cancel locally the locks matched by @mode on the resource
 * named by the object id in @oa.  Found locks are added to the @cancels
 * list.  Returns the number of locks added to @cancels (0 if the
 * resource does not exist or early-cancel is administratively off). */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
690
/* Reply/interpret callback for throttled OST_DESTROY RPCs.
 * Releases the in-flight slot taken in osc_can_send_destroy() and wakes
 * any thread throttled in osc_destroy() waiting for a free slot.
 * Always returns 0: a destroy failure is not actionable on the client. */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}
701
/* Try to reserve a slot for one in-flight destroy RPC.
 *
 * Returns 1 when the destroy may be sent: the counter was incremented and
 * stays within cl_max_rpcs_in_flight.  Returns 0 otherwise, after undoing
 * the increment; if the decrement reveals that another thread released a
 * slot between the two atomic operations, a waiter is woken so it can
 * retry rather than sleep on a slot that is already free. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
719
720 int osc_create(const struct lu_env *env, struct obd_export *exp,
721                struct obdo *oa, struct lov_stripe_md **ea,
722                struct obd_trans_info *oti)
723 {
724         int rc = 0;
725         ENTRY;
726
727         LASSERT(oa);
728         LASSERT(ea);
729         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
730
731         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
732             oa->o_flags == OBD_FL_RECREATE_OBJS) {
733                 RETURN(osc_real_create(exp, oa, ea, oti));
734         }
735
736         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
737                 RETURN(osc_real_create(exp, oa, ea, oti));
738
739         /* we should not get here anymore */
740         LBUG();
741
742         RETURN(rc);
743 }
744
745 /* Destroy requests can be async always on the client, and we don't even really
746  * care about the return code since the client cannot do anything at all about
747  * a destroy failure.
748  * When the MDS is unlinking a filename, it saves the file objects into a
749  * recovery llog, and these object records are cancelled when the OST reports
750  * they were destroyed and sync'd to disk (i.e. transaction committed).
751  * If the client dies, or the OST is down when the object should be destroyed,
752  * the records are not cancelled, and when the OST reconnects to the MDS next,
753  * it will retrieve the llog unlink logs and then sends the log cancellation
754  * cookies to the MDS after committing destroy transactions. */
755 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
756                        struct obdo *oa, struct lov_stripe_md *ea,
757                        struct obd_trans_info *oti, struct obd_export *md_export,
758                        void *capa)
759 {
760         struct client_obd     *cli = &exp->exp_obd->u.cli;
761         struct ptlrpc_request *req;
762         struct ost_body       *body;
763         CFS_LIST_HEAD(cancels);
764         int rc, count;
765         ENTRY;
766
767         if (!oa) {
768                 CDEBUG(D_INFO, "oa NULL\n");
769                 RETURN(-EINVAL);
770         }
771
772         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
773                                         LDLM_FL_DISCARD_DATA);
774
775         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
776         if (req == NULL) {
777                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
778                 RETURN(-ENOMEM);
779         }
780
781         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
782         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
783                                0, &cancels, count);
784         if (rc) {
785                 ptlrpc_request_free(req);
786                 RETURN(rc);
787         }
788
789         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
790         ptlrpc_at_set_req_timeout(req);
791
792         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
793                 oa->o_lcookie = *oti->oti_logcookies;
794         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
795         LASSERT(body);
796         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
797
798         osc_pack_capa(req, body, (struct obd_capa *)capa);
799         ptlrpc_request_set_replen(req);
800
801         /* If osc_destory is for destroying the unlink orphan,
802          * sent from MDT to OST, which should not be blocked here,
803          * because the process might be triggered by ptlrpcd, and
804          * it is not good to block ptlrpcd thread (b=16006)*/
805         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
806                 req->rq_interpret_reply = osc_destroy_interpret;
807                 if (!osc_can_send_destroy(cli)) {
808                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
809                                                           NULL);
810
811                         /*
812                          * Wait until the number of on-going destroy RPCs drops
813                          * under max_rpc_in_flight
814                          */
815                         l_wait_event_exclusive(cli->cl_destroy_waitq,
816                                                osc_can_send_destroy(cli), &lwi);
817                 }
818         }
819
820         /* Do not wait for response */
821         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
822         RETURN(0);
823 }
824
/* Fill @oa with this client's cache/grant accounting so the OST learns
 * our state with every BRW: dirty byte count, how much more dirty data
 * we could accept (o_undirty), current grant and grant lost (e.g. over
 * eviction).  o_undirty is clamped to 0 whenever the counters look
 * inconsistent or would overflow the 32-bit wire field.  The caller must
 * not already have OBD_MD_FLBLOCKS/OBD_MD_FLGRANT set in o_valid. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_unstable_pages) +
                            atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_read(&obd_unstable_pages),
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                /* Guard the 32-bit o_undirty wire field against overflow. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise capacity for up to max_rpcs_in_flight + 1
                 * full-size RPCs, but never less than dirty_max. */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
872
873 void osc_update_next_shrink(struct client_obd *cli)
874 {
875         cli->cl_next_shrink_grant =
876                 cfs_time_shift(cli->cl_grant_shrink_interval);
877         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
878                cli->cl_next_shrink_grant);
879 }
880
/* Add @grant bytes back into the locally available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
887
888 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
889 {
890         if (body->oa.o_valid & OBD_MD_FLGRANT) {
891                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
892                 __osc_update_grant(cli, body->oa.o_grant);
893         }
894 }
895
896 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
897                               obd_count keylen, void *key, obd_count vallen,
898                               void *val, struct ptlrpc_request_set *set);
899
/* Completion callback for a grant-shrink set_info RPC.  If the RPC failed,
 * the amount carried in oa->o_grant (deducted locally when the shrink was
 * prepared) is restored; on success any replacement grant in the reply is
 * applied.  The obdo allocated by the sender is freed here in all cases. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink RPC failed: give the grant back locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
920
/* Give a quarter of the currently available grant back to the server by
 * packing it into @oa for the next outgoing request.  Marks the obdo with
 * OBD_FL_SHRINK_GRANT and pushes out the next-shrink deadline. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* deduct locally first; o_grant carries the amount being returned */
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
934
935 /* Shrink the current grant, either from some large amount to enough for a
936  * full set of in-flight RPCs, or if we have already shrunk to that limit
937  * then to enough for a single RPC.  This avoids keeping more grant than
938  * needed, and avoids shrinking the grant piecemeal. */
939 static int osc_shrink_grant(struct client_obd *cli)
940 {
941         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
942                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
943
944         client_obd_list_lock(&cli->cl_loi_list_lock);
945         if (cli->cl_avail_grant <= target_bytes)
946                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
947         client_obd_list_unlock(&cli->cl_loi_list_lock);
948
949         return osc_shrink_grant_to_target(cli, target_bytes);
950 }
951
/* Shrink this client's grant down to @target_bytes by returning the excess
 * to the server via a KEY_GRANT_SHRINK set_info RPC.  The target is clamped
 * to at least one RPC worth of grant; if we already hold no more than the
 * target, nothing is sent.  If the RPC could not be issued, the locally
 * deducted grant is restored.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* re-take the lock: avail_grant may have changed since the check
         * above, so the deducted amount is recomputed here */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC not issued: restore what we deducted above */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
996
/* Decide whether it is time to return excess grant to the server.
 * Returns 1 when the server supports grant shrink, the import is FULL,
 * the shrink deadline has (nearly) passed, and we hold more grant than a
 * single RPC needs; returns 0 otherwise.  When the deadline passed but
 * shrinking is not worthwhile, the deadline is simply pushed out. */
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        /* fire slightly (5 ticks) early so we don't skip a whole interval */
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
1020
/* Periodic timeout callback: walk every client registered on this timeout
 * item and shrink the grant of those that are due.  Always returns 0. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}
1032
1033 static int osc_add_shrink_grant(struct client_obd *client)
1034 {
1035         int rc;
1036
1037         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1038                                        TIMEOUT_GRANT,
1039                                        osc_grant_shrink_grant_cb, NULL,
1040                                        &client->cl_grant_shrink_list);
1041         if (rc) {
1042                 CERROR("add grant client %s error %d\n",
1043                         client->cl_import->imp_obd->obd_name, rc);
1044                 return rc;
1045         }
1046         CDEBUG(D_CACHE, "add grant client %s \n",
1047                client->cl_import->imp_obd->obd_name);
1048         osc_update_next_shrink(client);
1049         return 0;
1050 }
1051
/* Unregister @client from the periodic grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1057
/* Initialize grant accounting from the server's connect reply @ocd.
 * Also derives cl_chunkbits from the server block size, and registers the
 * client for periodic grant shrinking when the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1095
1096 /* We assume that the reason this OSC got a short read is because it read
1097  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1098  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1099  * this stripe never got written at or beyond this stripe offset yet. */
1100 static void handle_short_read(int nob_read, obd_count page_count,
1101                               struct brw_page **pga)
1102 {
1103         char *ptr;
1104         int i = 0;
1105
1106         /* skip bytes read OK */
1107         while (nob_read > 0) {
1108                 LASSERT (page_count > 0);
1109
1110                 if (pga[i]->count > nob_read) {
1111                         /* EOF inside this page */
1112                         ptr = kmap(pga[i]->pg) +
1113                                 (pga[i]->off & ~CFS_PAGE_MASK);
1114                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1115                         kunmap(pga[i]->pg);
1116                         page_count--;
1117                         i++;
1118                         break;
1119                 }
1120
1121                 nob_read -= pga[i]->count;
1122                 page_count--;
1123                 i++;
1124         }
1125
1126         /* zero remaining pages */
1127         while (page_count-- > 0) {
1128                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1129                 memset(ptr, 0, pga[i]->count);
1130                 kunmap(pga[i]->pg);
1131                 i++;
1132         }
1133 }
1134
1135 static int check_write_rcs(struct ptlrpc_request *req,
1136                            int requested_nob, int niocount,
1137                            obd_count page_count, struct brw_page **pga)
1138 {
1139         int     i;
1140         __u32   *remote_rcs;
1141
1142         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1143                                                   sizeof(*remote_rcs) *
1144                                                   niocount);
1145         if (remote_rcs == NULL) {
1146                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1147                 return(-EPROTO);
1148         }
1149
1150         /* return error if any niobuf was in error */
1151         for (i = 0; i < niocount; i++) {
1152                 if ((int)remote_rcs[i] < 0)
1153                         return(remote_rcs[i]);
1154
1155                 if (remote_rcs[i] != 0) {
1156                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1157                                 i, remote_rcs[i], req);
1158                         return(-EPROTO);
1159                 }
1160         }
1161
1162         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1163                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1164                        req->rq_bulk->bd_nob_transferred, requested_nob);
1165                 return(-EPROTO);
1166         }
1167
1168         return (0);
1169 }
1170
1171 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1172 {
1173         if (p1->flag != p2->flag) {
1174                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1175                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1176                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1177
1178                 /* warn if we try to combine flags that we don't know to be
1179                  * safe to combine */
1180                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1181                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1182                               "report this at http://bugs.whamcloud.com/\n",
1183                               p1->flag, p2->flag);
1184                 }
1185                 return 0;
1186         }
1187
1188         return (p1->off + p1->count == p2->off);
1189 }
1190
/* Compute the checksum of @nob bytes spread over @pga[0..pg_count) using
 * the algorithm selected by @cksum_type.  Fault-injection hooks may corrupt
 * the data (reads) or bump the checksum (writes) to exercise resend paths.
 * NOTE(review): on hash-init failure this returns PTR_ERR() — a negative
 * value — through the unsigned obd_count return type; confirm that callers
 * treat that sentinel correctly. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;
                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                  pga[i]->off & ~CFS_PAGE_MASK,
                                  count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* nob may go negative on the final partial page; that
                 * terminates the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                /* presumably just releases hdesc when given no output
                 * buffer — TODO confirm against the crypto API */
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1247
/* Build a bulk read/write RPC for the @page_count pages described by @pga.
 * On success the prepared request is returned via @reqp; @oa and @pga are
 * stashed in the request's async args for the completion path, so the
 * caller must keep them alive until the RPC is interpreted.
 *
 * @cmd     OBD_BRW_WRITE selects OST_WRITE (bulk source), else OST_READ
 * @ocapa   capability for the object, may be NULL; with @reserve set an
 *          extra reference is taken for the interpret callback
 * @resend  marks the RPC with OBD_FL_RECOV_RESEND for server-side dedup
 * Returns 0 or a negative errno; on failure the request is freed here. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count distinct niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* pin every page into the bulk descriptor, merging contiguous
         * same-flag pages into a single niobuf as we go */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one (niobuf was pre-incremented above) */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically piggyback a grant shrink on this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1455
/* Diagnose a client/server write-checksum mismatch.  Re-checksums the
 * pages still in memory (with the checksum type the server reported) to
 * distinguish client-side modification, in-transit corruption, or a
 * checksum-type protocol problem, and logs the verdict.
 * Returns 0 when the checksums actually match, 1 on a confirmed mismatch
 * (the caller then retries the write). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* redo the checksum with the type the server used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1500
1501 /* Note rc enters this function as number of bytes transferred */
1502 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1503 {
1504         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1505         const lnet_process_id_t *peer =
1506                         &req->rq_import->imp_connection->c_peer;
1507         struct client_obd *cli = aa->aa_cli;
1508         struct ost_body *body;
1509         __u32 client_cksum = 0;
1510         ENTRY;
1511
1512         if (rc < 0 && rc != -EDQUOT) {
1513                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1514                 RETURN(rc);
1515         }
1516
1517         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1518         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1519         if (body == NULL) {
1520                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1521                 RETURN(-EPROTO);
1522         }
1523
1524         /* set/clear over quota flag for a uid/gid */
1525         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1526             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1527                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1528
1529                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1530                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1531                        body->oa.o_flags);
1532                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1533         }
1534
1535         osc_update_grant(cli, body);
1536
1537         if (rc < 0)
1538                 RETURN(rc);
1539
1540         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1541                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1542
1543         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1544                 if (rc > 0) {
1545                         CERROR("Unexpected +ve rc %d\n", rc);
1546                         RETURN(-EPROTO);
1547                 }
1548                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1549
1550                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1551                         RETURN(-EAGAIN);
1552
1553                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1554                     check_write_checksum(&body->oa, peer, client_cksum,
1555                                          body->oa.o_cksum, aa->aa_requested_nob,
1556                                          aa->aa_page_count, aa->aa_ppga,
1557                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1558                         RETURN(-EAGAIN);
1559
1560                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1561                                      aa->aa_page_count, aa->aa_ppga);
1562                 GOTO(out, rc);
1563         }
1564
1565         /* The rest of this function executes only for OST_READs */
1566
1567         /* if unwrap_bulk failed, return -EAGAIN to retry */
1568         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1569         if (rc < 0)
1570                 GOTO(out, rc = -EAGAIN);
1571
1572         if (rc > aa->aa_requested_nob) {
1573                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1574                        aa->aa_requested_nob);
1575                 RETURN(-EPROTO);
1576         }
1577
1578         if (rc != req->rq_bulk->bd_nob_transferred) {
1579                 CERROR ("Unexpected rc %d (%d transferred)\n",
1580                         rc, req->rq_bulk->bd_nob_transferred);
1581                 return (-EPROTO);
1582         }
1583
1584         if (rc < aa->aa_requested_nob)
1585                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1586
1587         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1588                 static int cksum_counter;
1589                 __u32      server_cksum = body->oa.o_cksum;
1590                 char      *via;
1591                 char      *router;
1592                 cksum_type_t cksum_type;
1593
1594                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1595                                                body->oa.o_flags : 0);
1596                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1597                                                  aa->aa_ppga, OST_READ,
1598                                                  cksum_type);
1599
1600                 if (peer->nid == req->rq_bulk->bd_sender) {
1601                         via = router = "";
1602                 } else {
1603                         via = " via ";
1604                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1605                 }
1606
1607                 if (server_cksum == ~0 && rc > 0) {
1608                         CERROR("Protocol error: server %s set the 'checksum' "
1609                                "bit, but didn't send a checksum.  Not fatal, "
1610                                "but please notify on http://bugs.whamcloud.com/\n",
1611                                libcfs_nid2str(peer->nid));
1612                 } else if (server_cksum != client_cksum) {
1613                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1614                                            "%s%s%s inode "DFID" object "DOSTID
1615                                            " extent ["LPU64"-"LPU64"]\n",
1616                                            req->rq_import->imp_obd->obd_name,
1617                                            libcfs_nid2str(peer->nid),
1618                                            via, router,
1619                                            body->oa.o_valid & OBD_MD_FLFID ?
1620                                                 body->oa.o_parent_seq : (__u64)0,
1621                                            body->oa.o_valid & OBD_MD_FLFID ?
1622                                                 body->oa.o_parent_oid : 0,
1623                                            body->oa.o_valid & OBD_MD_FLFID ?
1624                                                 body->oa.o_parent_ver : 0,
1625                                            POSTID(&body->oa.o_oi),
1626                                            aa->aa_ppga[0]->off,
1627                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1628                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1629                                                                         1);
1630                         CERROR("client %x, server %x, cksum_type %x\n",
1631                                client_cksum, server_cksum, cksum_type);
1632                         cksum_counter = 0;
1633                         aa->aa_oa->o_cksum = client_cksum;
1634                         rc = -EAGAIN;
1635                 } else {
1636                         cksum_counter++;
1637                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1638                         rc = 0;
1639                 }
1640         } else if (unlikely(client_cksum)) {
1641                 static int cksum_missed;
1642
1643                 cksum_missed++;
1644                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1645                         CERROR("Checksum %u requested from %s but not sent\n",
1646                                cksum_missed, libcfs_nid2str(peer->nid));
1647         } else {
1648                 rc = 0;
1649         }
1650 out:
1651         if (rc >= 0)
1652                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1653                                      aa->aa_oa, &body->oa);
1654
1655         RETURN(rc);
1656 }
1657
/*
 * Synchronous bulk I/O: build a single BRW request for @page_count
 * pages described by @pga, queue it and wait for completion inline
 * (no ptlrpcd involvement).
 *
 * Retry policy:
 *  - a bulk timeout with rq_resend set restarts the request at once;
 *  - other recoverable errors back off for 'resends' seconds between
 *    attempts.  -EINPROGRESS is always retried regardless of the
 *    resend count; everything else is bounded by client_should_resend().
 *  - a resend is abandoned if the import generation changed (i.e. the
 *    client was evicted while the request was in flight).
 * -EAGAIN / -EINPROGRESS are folded into -EIO before returning.
 */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        wait_queue_head_t            waitq;
        int                    generation, resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        init_waitqueue_head(&waitq);
        /* remember the import generation so eviction can be detected */
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return (rc);

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                /* delay the actual send by 'resends' seconds */
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.*/
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }

                /* interruptible backoff before the next attempt; the
                 * wait condition is always false so this is a pure
                 * (signal-aware) sleep of 'resends' seconds */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN (rc);
}
1727
/*
 * Rebuild a failed async BRW request and hand the copy to ptlrpcd.
 *
 * The new request takes over the page array, the oap list and the
 * extent list from @request, inherits its interpret/commit callbacks
 * and import generation, and is delayed by aa_resends seconds (capped
 * at the request timeout, mirroring what ptlrpc's after_reply() does).
 *
 * Returns 0 once the replacement request is queued, -EINTR if any page
 * of the original request was interrupted, or the error from
 * osc_brw_prep_request().
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected, so don't log them as errors */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* if any oap was interrupted, give up the resend entirely */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* splice (not copy) the list heads: a plain struct copy would
         * leave the moved lists pointing back at the old heads */
        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
        cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* repoint each oap's request reference at the replacement */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capa ownership moves to the new request */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1801
1802 /*
1803  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1804  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1805  * fine for our small page arrays and doesn't require allocation.  its an
1806  * insertion sort that swaps elements that are strides apart, shrinking the
1807  * stride down until its '1' and the array is sorted.
1808  */
1809 static void sort_brw_pages(struct brw_page **array, int num)
1810 {
1811         int stride, i, j;
1812         struct brw_page *tmp;
1813
1814         if (num == 1)
1815                 return;
1816         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1817                 ;
1818
1819         do {
1820                 stride /= 3;
1821                 for (i = stride ; i < num ; i++) {
1822                         tmp = array[i];
1823                         j = i;
1824                         while (j >= stride && array[j - stride]->off > tmp->off) {
1825                                 array[j] = array[j - stride];
1826                                 j -= stride;
1827                         }
1828                         array[j] = tmp;
1829                 }
1830         } while (stride > 1);
1831 }
1832
1833 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1834 {
1835         int count = 1;
1836         int offset;
1837         int i = 0;
1838
1839         LASSERT (pages > 0);
1840         offset = pg[i]->off & ~CFS_PAGE_MASK;
1841
1842         for (;;) {
1843                 pages--;
1844                 if (pages == 0)         /* that's all */
1845                         return count;
1846
1847                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1848                         return count;   /* doesn't end on page boundary */
1849
1850                 i++;
1851                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1852                 if (offset != 0)        /* doesn't start on page boundary */
1853                         return count;
1854
1855                 count++;
1856         }
1857 }
1858
1859 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1860 {
1861         struct brw_page **ppga;
1862         int i;
1863
1864         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1865         if (ppga == NULL)
1866                 return NULL;
1867
1868         for (i = 0; i < count; i++)
1869                 ppga[i] = pga + i;
1870         return ppga;
1871 }
1872
/* Free a pointer array built by osc_build_ppga().  Only the array of
 * pointers is freed; the brw_page entries it points at are owned by
 * the caller. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1878
/*
 * Synchronous brw entry point: sort the pages by offset and issue them
 * as a series of osc_brw_internal() RPCs, each at most
 * cl_max_pages_per_rpc pages and covering only an unfragmented
 * (page-aligned, contiguous) run.
 *
 * Because osc_brw_internal() clobbers the caller's obdo, a copy is
 * saved before the first multi-RPC chunk and restored before each
 * subsequent one.  With OBD_BRW_CHECK set, no I/O is done: only the
 * import validity is reported (-EIO if invalid, 0 otherwise).
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        /* keep the original array/count for release at 'out' since the
         * loop below advances ppga and decrements page_count */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink further so the chunk stays unfragmented */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1952
/*
 * Interpret callback for an async BRW RPC (set as rq_interpret_reply
 * by osc_build_rpc()).  Finishes the request, resends it on
 * recoverable errors, and on success publishes the server-returned
 * attributes (blocks/times, and for writes size/KMS) to the cl_object.
 * Finally it finishes all extents of the RPC, releases the page array
 * and adjusts the r/w in-flight counters so more RPCs can be sent.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* evicted while in flight: don't resend */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo request was queued; this
                 * incarnation is done and must not touch aa further */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* fold server-returned attributes into the cl_object */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* finish (and detach) every extent this RPC covered, passing
         * the final rc so pages get completed or errored accordingly */
        cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                cfs_list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(cfs_list_empty(&aa->aa_exts));
        LASSERT(cfs_list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* kick the queue: an RPC slot just freed up */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
2072
/* rq_commit_cb for BRW requests: account committed (no longer
 * "unstable") pages exactly once, even though this callback and
 * osc_extent_finish() can race.  NOTE(review): rq_unstable/rq_committed
 * appear to form a handshake with osc_inc/dec_unstable_pages so that
 * whichever side runs second performs the decrement — confirm against
 * osc_inc_unstable_pages(). */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                /* we got here after the increment: do the decrement now */
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                /* increment hasn't happened yet; flag the request so the
                 * other side knows the commit already occurred */
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
2090
2091 /**
2092  * Build an RPC by the list of extent @ext_list. The caller must ensure
2093  * that the total pages in this list are NOT over max pages per RPC.
2094  * Extents in the list must be in OES_RPC state.
2095  */
2096 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2097                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2098 {
2099         struct ptlrpc_request           *req = NULL;
2100         struct osc_extent               *ext;
2101         struct brw_page                 **pga = NULL;
2102         struct osc_brw_async_args       *aa = NULL;
2103         struct obdo                     *oa = NULL;
2104         struct osc_async_page           *oap;
2105         struct osc_async_page           *tmp;
2106         struct cl_req                   *clerq = NULL;
2107         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2108                                                                       CRT_READ;
2109         struct ldlm_lock                *lock = NULL;
2110         struct cl_req_attr              *crattr = NULL;
2111         obd_off                         starting_offset = OBD_OBJECT_EOF;
2112         obd_off                         ending_offset = 0;
2113         int                             mpflag = 0;
2114         int                             mem_tight = 0;
2115         int                             page_count = 0;
2116         int                             i;
2117         int                             rc;
2118         CFS_LIST_HEAD(rpc_list);
2119
2120         ENTRY;
2121         LASSERT(!cfs_list_empty(ext_list));
2122
2123         /* add pages into rpc_list to build BRW rpc */
2124         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2125                 LASSERT(ext->oe_state == OES_RPC);
2126                 mem_tight |= ext->oe_memalloc;
2127                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2128                         ++page_count;
2129                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2130                         if (starting_offset > oap->oap_obj_off)
2131                                 starting_offset = oap->oap_obj_off;
2132                         else
2133                                 LASSERT(oap->oap_page_off == 0);
2134                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2135                                 ending_offset = oap->oap_obj_off +
2136                                                 oap->oap_count;
2137                         else
2138                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2139                                         PAGE_CACHE_SIZE);
2140                 }
2141         }
2142
2143         if (mem_tight)
2144                 mpflag = cfs_memory_pressure_get_and_set();
2145
2146         OBD_ALLOC(crattr, sizeof(*crattr));
2147         if (crattr == NULL)
2148                 GOTO(out, rc = -ENOMEM);
2149
2150         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2151         if (pga == NULL)
2152                 GOTO(out, rc = -ENOMEM);
2153
2154         OBDO_ALLOC(oa);
2155         if (oa == NULL)
2156                 GOTO(out, rc = -ENOMEM);
2157
2158         i = 0;
2159         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2160                 struct cl_page *page = oap2cl_page(oap);
2161                 if (clerq == NULL) {
2162                         clerq = cl_req_alloc(env, page, crt,
2163                                              1 /* only 1-object rpcs for now */);
2164                         if (IS_ERR(clerq))
2165                                 GOTO(out, rc = PTR_ERR(clerq));
2166                         lock = oap->oap_ldlm_lock;
2167                 }
2168                 if (mem_tight)
2169                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2170                 pga[i] = &oap->oap_brw_page;
2171                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2172                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2173                        pga[i]->pg, page_index(oap->oap_page), oap,
2174                        pga[i]->flag);
2175                 i++;
2176                 cl_req_page_add(env, clerq, page);
2177         }
2178
2179         /* always get the data for the obdo for the rpc */
2180         LASSERT(clerq != NULL);
2181         crattr->cra_oa = oa;
2182         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2183         if (lock) {
2184                 oa->o_handle = lock->l_remote_handle;
2185                 oa->o_valid |= OBD_MD_FLHANDLE;
2186         }
2187
2188         rc = cl_req_prep(env, clerq);
2189         if (rc != 0) {
2190                 CERROR("cl_req_prep failed: %d\n", rc);
2191                 GOTO(out, rc);
2192         }
2193
2194         sort_brw_pages(pga, page_count);
2195         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2196                         pga, &req, crattr->cra_capa, 1, 0);
2197         if (rc != 0) {
2198                 CERROR("prep_req failed: %d\n", rc);
2199                 GOTO(out, rc);
2200         }
2201
2202         req->rq_commit_cb = brw_commit;
2203         req->rq_interpret_reply = brw_interpret;
2204
2205         if (mem_tight != 0)
2206                 req->rq_memalloc = 1;
2207
2208         /* Need to update the timestamps after the request is built in case
2209          * we race with setattr (locally or in queue at OST).  If OST gets
2210          * later setattr before earlier BRW (as determined by the request xid),
2211          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2212          * way to do this in a single call.  bug 10150 */
2213         cl_req_attr_set(env, clerq, crattr,
2214                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2215
2216         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2217
2218         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2219         aa = ptlrpc_req_async_args(req);
2220         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2221         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2222         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2223         cfs_list_splice_init(ext_list, &aa->aa_exts);
2224         aa->aa_clerq = clerq;
2225
2226         /* queued sync pages can be torn down while the pages
2227          * were between the pending list and the rpc */
2228         tmp = NULL;
2229         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2230                 /* only one oap gets a request reference */
2231                 if (tmp == NULL)
2232                         tmp = oap;
2233                 if (oap->oap_interrupted && !req->rq_intr) {
2234                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2235                                         oap, req);
2236                         ptlrpc_mark_interrupted(req);
2237                 }
2238         }
2239         if (tmp != NULL)
2240                 tmp->oap_request = ptlrpc_request_addref(req);
2241
2242         client_obd_list_lock(&cli->cl_loi_list_lock);
2243         starting_offset >>= PAGE_CACHE_SHIFT;
2244         if (cmd == OBD_BRW_READ) {
2245                 cli->cl_r_in_flight++;
2246                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2247                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2248                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2249                                       starting_offset + 1);
2250         } else {
2251                 cli->cl_w_in_flight++;
2252                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2253                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2254                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2255                                       starting_offset + 1);
2256         }
2257         client_obd_list_unlock(&cli->cl_loi_list_lock);
2258
2259         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2260                   page_count, aa, cli->cl_r_in_flight,
2261                   cli->cl_w_in_flight);
2262
2263         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2264          * see which CPU/NUMA node the majority of pages were allocated
2265          * on, and try to assign the async RPC to the CPU core
2266          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2267          *
2268          * But on the other hand, we expect that multiple ptlrpcd
2269          * threads and the initial write sponsor can run in parallel,
2270          * especially when data checksum is enabled, which is CPU-bound
2271          * operation and single ptlrpcd thread cannot process in time.
2272          * So more ptlrpcd threads sharing BRW load
2273          * (with PDL_POLICY_ROUND) seems better.
2274          */
2275         ptlrpcd_add_req(req, pol, -1);
2276         rc = 0;
2277         EXIT;
2278
2279 out:
2280         if (mem_tight != 0)
2281                 cfs_memory_pressure_restore(mpflag);
2282
2283         if (crattr != NULL) {
2284                 capa_put(crattr->cra_capa);
2285                 OBD_FREE(crattr, sizeof(*crattr));
2286         }
2287
2288         if (rc != 0) {
2289                 LASSERT(req == NULL);
2290
2291                 if (oa)
2292                         OBDO_FREE(oa);
2293                 if (pga)
2294                         OBD_FREE(pga, sizeof(*pga) * page_count);
2295                 /* this should happen rarely and is pretty bad, it makes the
2296                  * pending list not follow the dirty order */
2297                 while (!cfs_list_empty(ext_list)) {
2298                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2299                                              oe_link);
2300                         cfs_list_del_init(&ext->oe_link);
2301                         osc_extent_finish(env, ext, 0, rc);
2302                 }
2303                 if (clerq && !IS_ERR(clerq))
2304                         cl_req_completion(env, clerq, rc);
2305         }
2306         RETURN(rc);
2307 }
2308
2309 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2310                                         struct ldlm_enqueue_info *einfo)
2311 {
2312         void *data = einfo->ei_cbdata;
2313         int set = 0;
2314
2315         LASSERT(lock != NULL);
2316         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2317         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2318         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2319         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2320
2321         lock_res_and_lock(lock);
2322         spin_lock(&osc_ast_guard);
2323
2324         if (lock->l_ast_data == NULL)
2325                 lock->l_ast_data = data;
2326         if (lock->l_ast_data == data)
2327                 set = 1;
2328
2329         spin_unlock(&osc_ast_guard);
2330         unlock_res_and_lock(lock);
2331
2332         return set;
2333 }
2334
2335 static int osc_set_data_with_check(struct lustre_handle *lockh,
2336                                    struct ldlm_enqueue_info *einfo)
2337 {
2338         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2339         int set = 0;
2340
2341         if (lock != NULL) {
2342                 set = osc_set_lock_data_with_check(lock, einfo);
2343                 LDLM_LOCK_PUT(lock);
2344         } else
2345                 CERROR("lockh %p, data %p - client evicted?\n",
2346                        lockh, einfo->ei_cbdata);
2347         return set;
2348 }
2349
2350 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2351                              ldlm_iterator_t replace, void *data)
2352 {
2353         struct ldlm_res_id res_id;
2354         struct obd_device *obd = class_exp2obd(exp);
2355
2356         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2357         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2358         return 0;
2359 }
2360
2361 /* find any ldlm lock of the inode in osc
2362  * return 0    not find
2363  *        1    find one
2364  *      < 0    error */
2365 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2366                            ldlm_iterator_t replace, void *data)
2367 {
2368         struct ldlm_res_id res_id;
2369         struct obd_device *obd = class_exp2obd(exp);
2370         int rc = 0;
2371
2372         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2373         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2374         if (rc == LDLM_ITER_STOP)
2375                 return(1);
2376         if (rc == LDLM_ITER_CONTINUE)
2377                 return(0);
2378         return(rc);
2379 }
2380
/* Post-process an OST lock enqueue result and invoke the caller's upcall.
 *
 * \param req    the enqueue RPC; only examined for intent requests
 * \param lvb    lock value block (size/blocks/mtime) filled by the enqueue
 * \param upcall completion callback supplied by the enqueue initiator
 * \param cookie opaque argument passed through to \a upcall
 * \param flags  in/out enqueue flags; LDLM_FL_LVB_READY is set when \a lvb
 *               is valid on return
 * \param agl    non-zero for an asynchronous glimpse lock (AGL) request
 * \param rc     enqueue result so far
 *
 * \retval the value returned by \a upcall
 */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            __u64 *flags, int agl, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* Server aborted the intent enqueue; the real result
                         * is carried in the DLM reply's policy field. */
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        rep->lock_policy_res1 =
                                ptlrpc_status_ntoh(rep->lock_policy_res1);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* LVB is valid either on a granted lock, or on a non-AGL glimpse
         * that came back as an aborted intent. */
        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
            (rc == 0)) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
2414
/* Interpret callback for an asynchronous lock enqueue RPC.
 *
 * Runs (in ptlrpcd context) once the LDLM_ENQUEUE reply has arrived:
 * completes the enqueue via ldlm_cli_enqueue_fini(), runs the caller's
 * upcall through osc_enqueue_fini(), and drops the lock references held
 * for the async request.  The reference juggling below is deliberate --
 * do not reorder.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                /* AGL enqueue was not granted: there is no LVB to unpack. */
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken at the top of this function. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2479
/* Record the outcome of a lock enqueue in the per-stripe \a loi state.
 *
 * On ELDLM_OK the stripe's cached LVB is refreshed and the known minimum
 * size (KMS) is extended up to the end of the granted extent.  A glimpse
 * that came back as an aborted intent (ELDLM_LOCK_ABORTED with
 * LDLM_FL_HAS_INTENT) still carries a valid LVB, so it is cached too, but
 * KMS is left untouched.  On any other failure the lock is marked so that
 * concurrent matchers stop waiting on it.
 */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, __u64 flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        /* The lock ends before the current KMS: keep KMS. */
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* Glimpse result: cache the LVB, report success to caller. */
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        /* Unblock matchers waiting on this failed lock. */
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
2524
/* Sentinel value: passing PTLRPCD_SET as the request set to
 * osc_enqueue_base() means "hand the request to ptlrpcd" rather than
 * adding it to a real caller-owned set.  Never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2526
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on \a res_id, first trying to match an already
 * granted local lock.
 *
 * \param flags      in/out enqueue flags; LDLM_FL_HAS_INTENT selects the
 *                   intent (glimpse) path
 * \param kms_valid  zero when the object is fresh or was evicted, in which
 *                   case cached locks must not be matched
 * \param upcall     caller's completion callback (always invoked exactly
 *                   once, directly or from the RPC interpret callback)
 * \param rqset      NULL for a synchronous enqueue, PTLRPCD_SET to hand the
 *                   RPC to ptlrpc daemons, or a caller-owned set
 * \param agl        non-zero for an asynchronous glimpse lock request
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL does not require the LVB to be ready when matching. */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Lock belongs to another cl_lock: cannot reuse it. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueue carries an LVB buffer in the reply. */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Async path: finish the enqueue from the interpret
                         * callback once the reply arrives. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2677
2678 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2679                        struct ldlm_enqueue_info *einfo,
2680                        struct ptlrpc_request_set *rqset)
2681 {
2682         struct ldlm_res_id res_id;
2683         int rc;
2684         ENTRY;
2685
2686         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2687         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2688                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2689                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2690                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2691                               rqset, rqset != NULL, 0);
2692         RETURN(rc);
2693 }
2694
/* Try to match an already granted DLM extent lock covering \a policy.
 *
 * A PR request may also be satisfied by an existing PW lock; in that case
 * (unless LDLM_FL_TEST_LOCK) the PR reference is taken and the extra PW
 * reference dropped, so the caller ends up holding the mode it asked for.
 *
 * \retval non-zero matched lock mode on success, 0 if nothing matched,
 *         -EIO under the OBD_FAIL_OSC_MATCH fault injection
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        /* Reject the match if the lock's ast data belongs to
                         * another user; drop the reference we just took. */
                        if (!osc_set_data_with_check(lockh, data)) {
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                /* Asked for PR, matched PW: convert the held reference. */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2738
2739 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2740 {
2741         ENTRY;
2742
2743         if (unlikely(mode == LCK_GROUP))
2744                 ldlm_lock_decref_and_cancel(lockh, mode);
2745         else
2746                 ldlm_lock_decref(lockh, mode);
2747
2748         RETURN(0);
2749 }
2750
2751 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2752                       __u32 mode, struct lustre_handle *lockh)
2753 {
2754         ENTRY;
2755         RETURN(osc_cancel_base(lockh, mode));
2756 }
2757
2758 static int osc_cancel_unused(struct obd_export *exp,
2759                              struct lov_stripe_md *lsm,
2760                              ldlm_cancel_flags_t flags,
2761                              void *opaque)
2762 {
2763         struct obd_device *obd = class_exp2obd(exp);
2764         struct ldlm_res_id res_id, *resp = NULL;
2765
2766         if (lsm != NULL) {
2767                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2768                 resp = &res_id;
2769         }
2770
2771         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2772 }
2773
/* Interpret callback for an asynchronous OST_STATFS RPC.
 *
 * Copies the server's obd_statfs into the caller's buffer and invokes the
 * caller's oi_cb_up callback with the final status.  The -EBADR case is
 * the only path that skips the callback, because the caller (LOV) never
 * actually sent the request and handles cleanup itself.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A NODELAY statfs is best-effort: mask transient connection
         * errors so the caller sees stale-but-usable data. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2806
/* Send an asynchronous OST_STATFS request and add it to \a rqset.
 *
 * The reply is handled by osc_statfs_interpret(), which fills
 * \a oinfo->oi_osfs and fires \a oinfo->oi_cb_up.  \a max_age is currently
 * unused on the wire (see the comment below).
 *
 * \retval 0 on successful submission, negative errno otherwise
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2850
/* Synchronous OST_STATFS: query the target's filesystem statistics into
 * \a osfs.  \a max_age is currently unused on the wire (see below).
 *
 * \retval 0 on success, negative errno otherwise
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        /* Take our own import reference under cl_sem so a concurrent
         * disconnect cannot free it underneath us. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request now holds the import; drop our temporary reference. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2914
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* NOTE(review): lum_size is sized for the user-supplied
                 * stripe count, but only one object entry is filled and
                 * lmm_stripe_count is forced to 1 below; the remainder is
                 * zero from OBD_ALLOC.  Presumably the caller's count is
                 * bounded by the LOV layer -- confirm against callers. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects =
                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_ost_oi = lsm->lsm_oi;
        } else {
                /* No room for objects: return just the header. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_oi = lsm->lsm_oi;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2977
2978
2979 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2980                          void *karg, void *uarg)
2981 {
2982         struct obd_device *obd = exp->exp_obd;
2983         struct obd_ioctl_data *data = karg;
2984         int err = 0;
2985         ENTRY;
2986
2987         if (!try_module_get(THIS_MODULE)) {
2988                 CERROR("Can't get module. Is it alive?");
2989                 return -EINVAL;
2990         }
2991         switch (cmd) {
2992         case OBD_IOC_LOV_GET_CONFIG: {
2993                 char *buf;
2994                 struct lov_desc *desc;
2995                 struct obd_uuid uuid;
2996
2997                 buf = NULL;
2998                 len = 0;
2999                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3000                         GOTO(out, err = -EINVAL);
3001
3002                 data = (struct obd_ioctl_data *)buf;
3003
3004                 if (sizeof(*desc) > data->ioc_inllen1) {
3005                         obd_ioctl_freedata(buf, len);
3006                         GOTO(out, err = -EINVAL);
3007                 }
3008
3009                 if (data->ioc_inllen2 < sizeof(uuid)) {
3010                         obd_ioctl_freedata(buf, len);
3011                         GOTO(out, err = -EINVAL);
3012                 }
3013
3014                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3015                 desc->ld_tgt_count = 1;
3016                 desc->ld_active_tgt_count = 1;
3017                 desc->ld_default_stripe_count = 1;
3018                 desc->ld_default_stripe_size = 0;
3019                 desc->ld_default_stripe_offset = 0;
3020                 desc->ld_pattern = 0;
3021                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3022
3023                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3024
3025                 err = copy_to_user((void *)uarg, buf, len);
3026                 if (err)
3027                         err = -EFAULT;
3028                 obd_ioctl_freedata(buf, len);
3029                 GOTO(out, err);
3030         }
3031         case LL_IOC_LOV_SETSTRIPE:
3032                 err = obd_alloc_memmd(exp, karg);
3033                 if (err > 0)
3034                         err = 0;
3035                 GOTO(out, err);
3036         case LL_IOC_LOV_GETSTRIPE:
3037                 err = osc_getstripe(karg, uarg);
3038                 GOTO(out, err);
3039         case OBD_IOC_CLIENT_RECOVER:
3040                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3041                                             data->ioc_inlbuf1, 0);
3042                 if (err > 0)
3043                         err = 0;
3044                 GOTO(out, err);
3045         case IOC_OSC_SET_ACTIVE:
3046                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3047                                                data->ioc_offset);
3048                 GOTO(out, err);
3049         case OBD_IOC_POLL_QUOTACHECK:
3050                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3051                 GOTO(out, err);
3052         case OBD_IOC_PING_TARGET:
3053                 err = ptlrpc_obd_ping(obd);
3054                 GOTO(out, err);
3055         default:
3056                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3057                        cmd, current_comm());
3058                 GOTO(out, err = -ENOTTY);
3059         }
3060 out:
3061         module_put(THIS_MODULE);
3062         return err;
3063 }
3064
/**
 * Handle obd_get_info() queries on an OSC device.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - stripe index for a lock; an OSC covers a single
 *                        OST object so the answer is always 0.
 *   KEY_LAST_ID        - fetch the last allocated object id from the OST
 *                        with a synchronous OST_GET_INFO RPC.
 *   KEY_FIEMAP         - forward a fiemap request to the OST; when
 *                        FIEMAP_FLAG_SYNC is set, match a cached PR/PW
 *                        extent lock (or ask the server to lock) first.
 *
 * \retval 0 on success, -EINVAL for an unknown key, other negative
 *         errno on failure
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* fail fast: do not wait for an inactive import and do
                 * not resend this request */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* build a page-aligned extent covering the requested
                 * range, clamping to OBD_OBJECT_EOF on overflow */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* swap the PW reference for a PR one so
                                 * the decref at drop_lock matches */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3200
/**
 * Handle obd_set_info_async() on an OSC device.
 *
 * Local keys (KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX,
 * KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK) are processed entirely on the
 * client and return immediately.  Any other key is packed into an
 * OST_SET_INFO RPC: KEY_GRANT_SHRINK requests are queued on ptlrpcd,
 * everything else is added to the caller-supplied request \a set
 * (which is therefore mandatory for non-grant-shrink remote keys).
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                /* attach this OSC to the shared client LRU cache passed
                 * in via \a val */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(cfs_list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this OSC's in-list LRU pages,
                 * and report back how many were actually freed */
                int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant-shrink requests carry an ost_body instead of a plain
         * value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a copy of the obdo for the reply interpreter */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
3318
3319
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        /* Obsolete llog setup path: with LOD/OSP this entry point must
         * never be reached, so trap any caller.  Slated for removal. */
        LBUG();
        return 0;
}
3328
3329 static int osc_llog_finish(struct obd_device *obd, int count)
3330 {
3331         struct llog_ctxt *ctxt;
3332
3333         ENTRY;
3334
3335         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3336         if (ctxt) {
3337                 llog_cat_close(NULL, ctxt->loc_handle);
3338                 llog_cleanup(NULL, ctxt);
3339         }
3340
3341         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3342         if (ctxt)
3343                 llog_cleanup(NULL, ctxt);
3344         RETURN(0);
3345 }
3346
3347 static int osc_reconnect(const struct lu_env *env,
3348                          struct obd_export *exp, struct obd_device *obd,
3349                          struct obd_uuid *cluuid,
3350                          struct obd_connect_data *data,
3351                          void *localdata)
3352 {
3353         struct client_obd *cli = &obd->u.cli;
3354
3355         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3356                 long lost_grant;
3357
3358                 client_obd_list_lock(&cli->cl_loi_list_lock);
3359                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3360                                 2 * cli_brw_size(obd);
3361                 lost_grant = cli->cl_lost_grant;
3362                 cli->cl_lost_grant = 0;
3363                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3364
3365                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3366                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3367                        data->ocd_version, data->ocd_grant, lost_grant);
3368         }
3369
3370         RETURN(0);
3371 }
3372
/* Disconnect this OSC's export: flush remaining llog cancels on the
 * last connection, disconnect the client export, and only then remove
 * the device from the grant-shrink list (ordering rationale below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
3414
/* Dispatch import state-change events for this OSC and forward
 * notifications up to the observer device (the layer above us). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: any accumulated grant is now stale */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data negotiated: (re)initialize grant support
                 * and switch to the OST request portal if available */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3488
3489 /**
3490  * Determine whether the lock can be canceled before replaying the lock
3491  * during recovery, see bug16774 for detailed information.
3492  *
3493  * \retval zero the lock can't be canceled
3494  * \retval other ok to cancel
3495  */
3496 static int osc_cancel_weight(struct ldlm_lock *lock)
3497 {
3498         /*
3499          * Cancel all unused and granted extent lock.
3500          */
3501         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3502             lock->l_granted_mode == lock->l_req_mode &&
3503             osc_ldlm_weigh_ast(lock) == 0)
3504                 RETURN(1);
3505
3506         RETURN(0);
3507 }
3508
/* ptlrpcd work callback: kick writeback for the client_obd passed in
 * \a data by unplugging its pending OSC IO from ptlrpcd context. */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}
3518
/* Set up an OSC device: take a ptlrpcd reference, establish the client
 * import, allocate the writeback and LRU ptlrpcd work items, initialize
 * quota and grant shrinking, register procfs entries and pre-allocate a
 * request pool for BRW RPCs.  On failure the goto ladder at the bottom
 * unwinds whatever was set up. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd          *cli = &obd->u.cli;
        void                       *handler;
        int                        rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* procfs registration failures are non-fatal */
        if (lprocfs_seq_obd_setup(obd) == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(rc);

out_ptlrpcd_work:
        /* destroy whichever work items were successfully allocated */
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3587
/* Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY: deactivate the import and stop pinging so no new
 * RPCs are issued while teardown proceeds.
 *
 * OBD_CLEANUP_EXPORTS: wait for zombie exports, destroy the ptlrpcd
 * work items, drop the client import and unregister procfs/llog state. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3636
/* Final cleanup of an OSC device: detach from the shared client LRU
 * cache, free the quota cache, tear down the client obd state, and
 * drop the ptlrpcd reference taken in osc_setup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3663
3664 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3665 {
3666         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3667                                               lcfg, obd);
3668         return rc > 0 ? 0: rc;
3669 }
3670
3671 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3672 {
3673         return osc_process_config_base(obd, buf);
3674 }
3675
/* Method table wiring the generic obd_* entry points to their OSC
 * implementations; registered via class_register_type() in osc_init(). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3714
3715 extern struct lu_kmem_descr osc_caches[];
3716 extern spinlock_t osc_ast_guard;
3717 extern struct lock_class_key osc_ast_guard_class;
3718
/* Module init: set up the OSC slab caches, register the OSC obd type
 * and initialize the AST guard lock.  Caches are torn back down if
 * type registration fails. */
int __init osc_init(void)
{
        int rc;
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        rc = class_register_type(&osc_obd_ops, NULL, NULL,
#ifndef HAVE_ONLY_PROCFS_SEQ
                                NULL,
#endif
                                LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        spin_lock_init(&osc_ast_guard);
        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

        RETURN(rc);
}
3748
3749 #ifdef __KERNEL__
/* Module unload: unregister the OSC obd type and release its caches. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3755
3756 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3757 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3758 MODULE_LICENSE("GPL");
3759
3760 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3761 #endif