4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context carried through an asynchronous getattr enqueue: the export the
 * RPC was sent on plus the caller's enqueue info.
 * NOTE(review): the struct's closing brace is elided in this extract. */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * it_open_error() - report the intent status for a given open phase.
 *
 * The server records which phases of an intent it executed as disposition
 * bits; this walks the phases in server execution order (LEASE, OPEN,
 * CREATE, LOOKUP_EXECD, IT_EXECD) and compares each against @phase.
 *
 * NOTE(review): the return statements and closing braces between these
 * checks are elided in this extract — presumably each branch returns
 * it->it_status (or 0) when the requested phase was reached; confirm
 * against the full file. The CERROR below fires only if no disposition
 * bit matched at all, which indicates a protocol inconsistency.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No disposition bit covered the requested phase: log the raw state. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach an inode to a DLM lock's LVB slot and
 * optionally report the lock's inodebits to the caller via @bits.
 *
 * @data is the new inode to store in lock->l_resource->lr_lvb_inode.
 * If a different inode is already attached, it must be on its way out
 * (I_FREEING) — otherwise the LASSERTF fires, since two live inodes
 * must never share one metadata lock resource.
 *
 * NOTE(review): several lines (early return for an unused handle, the
 * closing braces, LDLM_LOCK_PUT and the return) are elided in this
 * extract.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* A never-used handle has no lock to annotate; bail out early. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* The previously attached inode may only differ if it is being freed. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
/* Report the granted inodebits back to the caller when requested. */
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for a local DLM lock on @fid matching
 * @type/@policy/@mode, filling @lockh and returning the matched mode
 * (0 when nothing matches).
 *
 * Inodebits the server did not advertise at connect time are masked out
 * of the request first (see LU-4405 comment below), so we never demand
 * bits an old server cannot grant.
 *
 * NOTE(review): the return statement is elided in this extract;
 * presumably it returns rc from ldlm_lock_match().
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused locks on the resource derived
 * from @fid that match @policy/@mode, passing @flags and the caller's
 * @opaque cookie through to the LDLM cancel machinery.
 *
 * NOTE(review): the final RETURN(rc) is elided in this extract.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any inode pointer cached on @fid's lock
 * resource (lr_lvb_inode = NULL), typically while the inode is being
 * destroyed so stale pointers don't linger on the resource.
 *
 * NOTE(review): the resource NULL-check after ldlm_resource_get(), the
 * res lock/unlock around the assignment, and the return are elided in
 * this extract.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Look up the existing resource only (create = 0): nothing to clear
 * if no lock resource exists for this FID. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag() - drop a request from the replay list.
 *
 * Error replies must not be replayed after recovery; clear rq_replay
 * under rq_lock. If the server already assigned a transno to a request
 * that then failed (@rc != 0), that is worth a loud DEBUG_REQ since the
 * transaction may have been committed server-side.
 *
 * NOTE(review): the `req->rq_replay = 0` line inside the spinlock and
 * the closing braces are elided in this extract.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/*
 * mdc_save_lovea() - copy @size bytes of EA @data into @field of the
 * request's client buffer, growing the buffer via sptlrpc if the
 * currently reserved space is too small (or shrinking it to fit).
 *
 * NOTE(review): the error-return path after the enlarge failure, the
 * `rc` / `lmm` declarations and the final return are elided in this
 * extract.
 */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
/* Buffer too small for the EA: ask sptlrpc to reallocate it. */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Buffer large enough (or oversized): trim it to the exact EA size. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/*
 * mdc_intent_open_pack() - build an LDLM_INTENT_OPEN request.
 *
 * Collects conflicting local locks to cancel (open locks on the child
 * FID, the parent's UPDATE lock for creates), allocates the request,
 * sizes the client-side buffers (name, LOV EA, security context), packs
 * the intent and open body, then sizes the reply buffers — including an
 * inline niobuf so small Data-on-MDT file content can ride back in the
 * reply. Returns the prepared request or an ERR_PTR.
 *
 * NOTE(review): numerous lines are elided in this extract (mode/bit
 * computations for the cancel set, NULL checks after allocation, a few
 * closing braces, the trailing RETURN(req)); the surviving lines are
 * annotated below but the full control flow must be read in the
 * original file.
 */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
259 int repsize, repsize_estimate;
/* An open always targets a regular file as far as the create mode
 * is concerned; force S_IFREG into it_create_mode. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 op_data->op_file_secctx_name_size : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
324 ptlrpc_request_free(req);
/* Open requests are replayed if the import supports replay; flag it
 * now, under rq_lock. */
328 spin_lock(&req->rq_lock);
329 req->rq_replay = req->rq_import->imp_replayable;
330 spin_unlock(&req->rq_lock);
332 /* pack the intent */
333 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334 lit->opc = (__u64)it->it_op;
336 /* pack the intended request */
337 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341 obddev->u.cli.cl_max_mds_easize);
342 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Plain open of an existing file (no create): send the security xattr
 * name so the server can return the file's security context inline. */
344 if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
345 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
347 op_data->op_file_secctx_name_size > 0 &&
348 op_data->op_file_secctx_name != NULL) {
351 secctx_name = req_capsule_client_get(&req->rq_pill,
352 &RMF_FILE_SECCTX_NAME);
353 memcpy(secctx_name, op_data->op_file_secctx_name,
354 op_data->op_file_secctx_name_size);
355 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
357 obddev->u.cli.cl_max_mds_easize);
359 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
360 op_data->op_file_secctx_name_size,
361 op_data->op_file_secctx_name);
364 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
369 * Inline buffer for possible data from Data-on-MDT files.
371 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
372 sizeof(struct niobuf_remote));
373 ptlrpc_request_set_replen(req);
375 /* Get real repbuf allocated size as rounded up power of 2 */
376 repsize = size_roundup_power2(req->rq_replen +
377 lustre_msg_early_size());
378 /* Estimate free space for DoM files in repbuf */
379 repsize_estimate = repsize - (req->rq_replen -
380 obddev->u.cli.cl_max_mds_easize +
381 sizeof(struct lov_comp_md_v1) +
382 sizeof(struct lov_comp_md_entry_v1) +
383 lov_mds_md_size(0, LOV_MAGIC_V3));
/* If the power-of-2 slack isn't big enough for the configured minimum
 * inline DoM reply, grow the inline niobuf and recompute. */
385 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
386 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
387 repsize_estimate + sizeof(struct niobuf_remote);
388 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
390 sizeof(struct niobuf_remote) + repsize);
391 ptlrpc_request_set_replen(req);
392 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
393 repsize, req->rq_replen);
394 repsize = size_roundup_power2(req->rq_replen +
395 lustre_msg_early_size());
397 /* The only way to report real allocated repbuf size to the server
398 * is the lm_repsize but it must be set prior buffer allocation itself
399 * due to security reasons - it is part of buffer used in signature
400 * calculation (see LU-11414). Therefore the saved size is predicted
401 * value as rq_replen rounded to the next higher power of 2.
402 * Such estimation is safe. Though the final allocated buffer might
403 * be even larger, it is not possible to know that at this point.
405 req->rq_reqmsg->lm_repsize = repsize;
409 #define GA_DEFAULT_EA_NAME_LEN 20
410 #define GA_DEFAULT_EA_VAL_LEN 250
411 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR request
 * that asks the server to return all xattr names/values in one round
 * trip, sized by the GA_DEFAULT_EA_* heuristics above.
 *
 * NOTE(review): several lines are elided in this extract (NULL check
 * after allocation, `rc`/`count` declarations, the RMF_EAVALS size
 * argument, closing braces and the trailing RETURN(req)).
 */
413 static struct ptlrpc_request *
414 mdc_intent_getxattr_pack(struct obd_export *exp,
415 struct lookup_intent *it,
416 struct md_op_data *op_data)
418 struct ptlrpc_request *req;
419 struct ldlm_intent *lit;
421 struct list_head cancels = LIST_HEAD_INIT(cancels);
422 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
426 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
427 &RQF_LDLM_INTENT_GETXATTR);
429 RETURN(ERR_PTR(-ENOMEM));
431 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
433 ptlrpc_request_free(req);
437 /* pack the intent */
438 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439 lit->opc = IT_GETXATTR;
440 CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
441 exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
443 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
444 /* If the supplied buffer is too small then the server will
445 * return -ERANGE and llite will fallback to using non cached
446 * xattr operations. On servers before 2.10.1 a (non-cached)
447 * listxattr RPC for an orphan or dead file causes an oops. So
448 * let's try to avoid sending too small a buffer to too old a
449 * server. This is effectively undoing the memory conservation
450 * of LU-9417 when it would be *more* likely to crash the
451 * server. See LU-9856. */
452 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
453 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
454 exp->exp_connect_data.ocd_max_easize);
457 /* pack the intended request */
458 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
459 ea_vals_buf_size, -1, 0);
/* Reply buffers: one for the newline-separated name list, one for the
 * concatenated values, one for the per-value length array. */
461 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
462 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
464 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
467 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
468 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is wanted on this path. */
470 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
472 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR request for
 * lookup/getattr intents, asking for attributes, EA, ACL (sized by
 * @acl_bufsize) and, when the caller supplied one, the named security
 * xattr.
 *
 * NOTE(review): this extract is missing lines (NULL check after
 * allocation, the `have_secctx = true` assignment and its use guarding
 * the secctx packing at the bottom, `easize`/`secctx_name` declarations,
 * closing braces and the trailing RETURN(req)).
 */
477 static struct ptlrpc_request *
478 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
479 struct md_op_data *op_data, __u32 acl_bufsize)
481 struct ptlrpc_request *req;
482 struct obd_device *obddev = class_exp2obd(exp);
483 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
484 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
485 OBD_MD_MEA | OBD_MD_FLACL;
486 struct ldlm_intent *lit;
489 bool have_secctx = false;
492 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493 &RQF_LDLM_INTENT_GETATTR);
495 RETURN(ERR_PTR(-ENOMEM));
497 /* send name of security xattr to get upon intent */
498 if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
499 req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
501 op_data->op_file_secctx_name_size > 0 &&
502 op_data->op_file_secctx_name != NULL) {
504 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
506 op_data->op_file_secctx_name_size);
509 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
510 op_data->op_namelen + 1);
512 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
514 ptlrpc_request_free(req);
518 /* pack the intent */
519 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
520 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
522 if (obddev->u.cli.cl_default_mds_easize > 0)
523 easize = obddev->u.cli.cl_default_mds_easize;
525 easize = obddev->u.cli.cl_max_mds_easize;
527 /* pack the intended request */
528 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
530 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
531 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
/* Copy the requested security xattr name into the request and reserve
 * reply room for its value (guard condition elided in this extract). */
536 secctx_name = req_capsule_client_get(&req->rq_pill,
537 &RMF_FILE_SECCTX_NAME);
538 memcpy(secctx_name, op_data->op_file_secctx_name,
539 op_data->op_file_secctx_name_size);
541 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
544 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
545 op_data->op_file_secctx_name_size,
546 op_data->op_file_secctx_name);
548 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
552 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT request; the
 * caller's layout_intent (carried in op_data->op_data) is copied into
 * the request and reply room is reserved for the layout LVB.
 *
 * NOTE(review): the NULL check after allocation, error return after
 * ldlm_prep_enqueue_req() failure, and the trailing RETURN(req) are
 * elided in this extract.
 */
556 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
557 struct lookup_intent *it,
558 struct md_op_data *op_data)
560 struct obd_device *obd = class_exp2obd(exp);
561 struct ptlrpc_request *req;
562 struct ldlm_intent *lit;
563 struct layout_intent *layout;
567 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
568 &RQF_LDLM_INTENT_LAYOUT);
570 RETURN(ERR_PTR(-ENOMEM));
572 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
573 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
575 ptlrpc_request_free(req);
579 /* pack the intent */
580 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
581 lit->opc = (__u64)it->it_op;
583 /* pack the layout intent request */
584 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
/* The caller must supply exactly one struct layout_intent. */
585 LASSERT(op_data->op_data != NULL);
586 LASSERT(op_data->op_data_size == sizeof(*layout));
587 memcpy(layout, op_data->op_data, sizeof(*layout));
589 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
590 obd->u.cli.cl_default_mds_easize);
591 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM_ENQUEUE request
 * with @lvb_len bytes reserved in the reply for the LVB.
 *
 * NOTE(review): the NULL check after allocation, error return after
 * ldlm_prep_enqueue_req() failure, and the trailing RETURN(req) are
 * elided in this extract.
 */
595 static struct ptlrpc_request *
596 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
598 struct ptlrpc_request *req;
602 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
604 RETURN(ERR_PTR(-ENOMEM));
606 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
608 ptlrpc_request_free(req);
612 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
613 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process the reply of an intent enqueue.
 *
 * Responsibilities visible in this extract:
 *  - mark replayable requests INTENT_ONLY so replay re-runs the intent
 *    rather than re-acquiring the lock;
 *  - reconcile einfo->ei_mode with the mode the server actually granted
 *    (or zero @lockh when the lock was aborted);
 *  - copy the server's disposition/status into @it and clear the replay
 *    flag for failed opens;
 *  - stash the reply LOV EA (for open replay) or layout LVB (for
 *    IT_LAYOUT) via mdc_save_lovea();
 *  - install the layout LVB on the lock and validate Data-on-MDT size
 *    info.
 *
 * NOTE(review): many lines are elided in this extract — the trailing
 * `int rc` parameter, early-return paths, eadata/lvb NULL checks,
 * several closing braces, and the out_lock/LDLM_LOCK_PUT epilogue.
 * Read the full file for the exact control flow.
 */
617 static int mdc_finish_enqueue(struct obd_export *exp,
618 struct ptlrpc_request *req,
619 struct ldlm_enqueue_info *einfo,
620 struct lookup_intent *it,
621 struct lustre_handle *lockh,
624 struct req_capsule *pill = &req->rq_pill;
625 struct ldlm_request *lockreq;
626 struct ldlm_reply *lockrep;
627 struct ldlm_lock *lock;
628 struct mdt_body *body = NULL;
629 void *lvb_data = NULL;
635 /* Similarly, if we're going to replay this request, we don't want to
636 * actually get a lock, just perform the intent. */
637 if (req->rq_transno || req->rq_replay) {
638 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
639 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
642 if (rc == ELDLM_LOCK_ABORTED) {
/* Lock was aborted: no lock to hand back, clear the handle. */
644 memset(lockh, 0, sizeof(*lockh));
646 } else { /* rc = 0 */
647 lock = ldlm_handle2lock(lockh);
648 LASSERT(lock != NULL);
650 /* If the server gave us back a different lock mode, we should
651 * fix up our variables. */
652 if (lock->l_req_mode != einfo->ei_mode) {
653 ldlm_lock_addref(lockh, lock->l_req_mode);
654 ldlm_lock_decref(lockh, einfo->ei_mode);
655 einfo->ei_mode = lock->l_req_mode;
660 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
661 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's verdict into the lookup intent. */
663 it->it_disposition = (int)lockrep->lock_policy_res1;
664 it->it_status = (int)lockrep->lock_policy_res2;
665 it->it_lock_mode = einfo->ei_mode;
666 it->it_lock_handle = lockh->cookie;
667 it->it_request = req;
669 /* Technically speaking rq_transno must already be zero if
670 * it_status is in error, so the check is a bit redundant */
671 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
672 mdc_clear_replay_flag(req, it->it_status);
674 /* If we're doing an IT_OPEN which did not result in an actual
675 * successful open, then we need to remove the bit which saves
676 * this request for unconditional replay.
678 * It's important that we do this first! Otherwise we might exit the
679 * function without doing so, and try to replay a failed create
681 if (it->it_op & IT_OPEN && req->rq_replay &&
682 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
683 mdc_clear_replay_flag(req, it->it_status);
685 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
686 it->it_op, it->it_disposition, it->it_status);
688 /* We know what to expect, so we do any byte flipping required here */
689 if (it_has_reply_body(it)) {
690 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
692 CERROR ("Can't swab mdt_body\n");
696 if (it_disposition(it, DISP_OPEN_OPEN) &&
697 !it_open_error(DISP_OPEN_OPEN, it)) {
699 * If this is a successful OPEN request, we need to set
700 * replay handler and data early, so that if replay
701 * happens immediately after swabbing below, new reply
702 * is swabbed by that handler correctly.
704 mdc_set_open_replay_data(NULL, NULL, it);
707 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
710 mdc_update_max_ea_from_body(exp, body);
713 * The eadata is opaque; just check that it is there.
714 * Eventually, obd_unpackmd() will check the contents.
716 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
717 body->mbo_eadatasize);
721 /* save lvb data and length in case this is for layout
724 lvb_len = body->mbo_eadatasize;
727 * We save the reply LOV EA in case we have to replay a
728 * create for recovery. If we didn't allocate a large
729 * enough request buffer above we need to reallocate it
730 * here to hold the actual LOV EA.
732 * To not save LOV EA if request is not going to replay
733 * (for example error one).
735 if ((it->it_op & IT_OPEN) && req->rq_replay) {
736 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
737 body->mbo_eadatasize);
/* Could not save the EA: drop it from the body so replay does
 * not reference data we failed to keep. */
739 body->mbo_valid &= ~OBD_MD_FLEASIZE;
740 body->mbo_eadatasize = 0;
745 } else if (it->it_op & IT_LAYOUT) {
746 /* maybe the lock was granted right away and layout
747 * is packed into RMF_DLM_LVB of req */
748 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
749 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
750 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
752 lvb_data = req_capsule_server_sized_get(pill,
753 &RMF_DLM_LVB, lvb_len);
754 if (lvb_data == NULL)
758 * save replied layout data to the request buffer for
759 * recovery consideration (lest MDS reinitialize
760 * another set of OST objects).
763 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
768 /* fill in stripe data for layout lock.
769 * LU-6581: trust layout data only if layout lock is granted. The MDT
770 * has stopped sending layout unless the layout lock is granted. The
771 * client still does this checking in case it's talking with an old
772 * server. - Jinshan */
773 lock = ldlm_handle2lock(lockh);
777 if (ldlm_has_layout(lock) && lvb_data != NULL &&
778 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
781 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
782 ldlm_it2str(it->it_op), lvb_len);
784 OBD_ALLOC_LARGE(lmm, lvb_len);
786 GOTO(out_lock, rc = -ENOMEM);
788 memcpy(lmm, lvb_data, lvb_len);
790 /* install lvb_data */
791 lock_res_and_lock(lock);
792 if (lock->l_lvb_data == NULL) {
793 lock->l_lvb_type = LVB_T_LAYOUT;
794 lock->l_lvb_data = lmm;
795 lock->l_lvb_len = lvb_len;
798 unlock_res_and_lock(lock);
/* Someone else installed an LVB first: free our copy. */
800 OBD_FREE_LARGE(lmm, lvb_len);
803 if (ldlm_has_dom(lock)) {
804 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
806 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
/* A Data-on-MDT lock must always carry a valid file size. */
807 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
808 LDLM_ERROR(lock, "%s: DoM lock without size.",
809 exp->exp_obd->obd_name);
810 GOTO(out_lock, rc = -EPROTO);
813 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
814 ldlm_it2str(it->it_op), body->mbo_dom_size);
816 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
824 /* We always reserve enough space in the reply packet for a stripe MD, because
825 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base() - the main intent-enqueue driver for the MDC.
 *
 * Chooses the inodebits policy from the intent op, packs the matching
 * intent request (open / getattr / readdir / layout / getxattr, or a
 * plain FLOCK enqueue when @it is NULL), throttles via the modify-RPC
 * and request slots, sends it through ldlm_cli_enqueue(), then handles
 * resend loops (-EINPROGRESS) and the ACL-buffer-too-small retry
 * (-ERANGE) before finishing via mdc_finish_enqueue().
 *
 * NOTE(review): this extract is missing a substantial number of lines —
 * the `resend:` label, `acl_bufsize` declaration, several closing
 * braces, goto targets and RETURN statements among them. The comments
 * below describe only what the surviving lines show.
 */
826 static int mdc_enqueue_base(struct obd_export *exp,
827 struct ldlm_enqueue_info *einfo,
828 const union ldlm_policy_data *policy,
829 struct lookup_intent *it,
830 struct md_op_data *op_data,
831 struct lustre_handle *lockh,
832 __u64 extra_lock_flags)
834 struct obd_device *obddev = class_exp2obd(exp);
835 struct ptlrpc_request *req = NULL;
836 __u64 flags, saved_flags = extra_lock_flags;
837 struct ldlm_res_id res_id;
838 static const union ldlm_policy_data lookup_policy = {
839 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
840 static const union ldlm_policy_data update_policy = {
841 .l_inodebits = { MDS_INODELOCK_UPDATE } };
842 static const union ldlm_policy_data layout_policy = {
843 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
844 static const union ldlm_policy_data getxattr_policy = {
845 .l_inodebits = { MDS_INODELOCK_XATTR } };
846 int generation, resends = 0;
847 struct ldlm_reply *lockrep;
848 struct obd_import *imp = class_exp2cliimp(exp);
850 enum lvb_type lvb_type = 0;
854 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
856 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived from it_op, never passed in. */
859 LASSERT(policy == NULL);
861 saved_flags |= LDLM_FL_HAS_INTENT;
862 if (it->it_op & (IT_GETATTR | IT_READDIR))
863 policy = &update_policy;
864 else if (it->it_op & IT_LAYOUT)
865 policy = &layout_policy;
866 else if (it->it_op & IT_GETXATTR)
867 policy = &getxattr_policy;
869 policy = &lookup_policy;
/* Remember the import generation so a resend after eviction can be
 * detected below. */
872 generation = obddev->u.cli.cl_import->imp_generation;
873 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
874 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
877 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
882 /* The only way right now is FLOCK. */
883 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
885 res_id.name[3] = LDLM_FLOCK;
886 } else if (it->it_op & IT_OPEN) {
887 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
888 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
889 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
890 } else if (it->it_op & IT_READDIR) {
891 req = mdc_enqueue_pack(exp, 0);
892 } else if (it->it_op & IT_LAYOUT) {
893 if (!imp_connect_lvb_type(imp))
895 req = mdc_intent_layout_pack(exp, it, op_data);
896 lvb_type = LVB_T_LAYOUT;
897 } else if (it->it_op & IT_GETXATTR) {
898 req = mdc_intent_getxattr_pack(exp, it, op_data);
905 RETURN(PTR_ERR(req));
908 req->rq_generation_set = 1;
909 req->rq_import_generation = generation;
910 req->rq_sent = ktime_get_real_seconds() + resends;
913 /* It is important to obtain modify RPC slot first (if applicable), so
914 * that threads that are waiting for a modify RPC slot are not polluting
915 * our rpcs in flight counter.
916 * We do not do flock request limiting, though */
918 mdc_get_mod_rpc_slot(req, it);
919 rc = obd_get_request_slot(&obddev->u.cli);
/* Slot acquisition failed: release the mod slot and the request. */
921 mdc_put_mod_rpc_slot(req, it);
922 mdc_clear_replay_flag(req, 0);
923 ptlrpc_req_finished(req);
928 /* With Data-on-MDT the glimpse callback is needed too.
929 * It is set here in advance but not in mdc_finish_enqueue()
930 * to avoid possible races. It is safe to have glimpse handler
931 * for non-DOM locks and costs nothing.*/
932 if (einfo->ei_cb_gl == NULL)
933 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
935 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
936 0, lvb_type, lockh, 0);
938 /* For flock requests we immediatelly return without further
939 delay and let caller deal with the rest, since rest of
940 this function metadata processing makes no sense for flock
941 requests anyway. But in case of problem during comms with
942 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
943 can not rely on caller and this mainly for F_UNLCKs
944 (explicits or automatically generated by Kernel to clean
945 current FLocks upon exit) that can't be trashed */
946 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
947 (einfo->ei_type == LDLM_FLOCK) &&
948 (einfo->ei_mode == LCK_NL))
/* Release the throttling slots taken above. */
953 obd_put_request_slot(&obddev->u.cli);
954 mdc_put_mod_rpc_slot(req, it);
958 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
959 obddev->obd_name, PFID(&op_data->op_fid1),
960 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
962 mdc_clear_replay_flag(req, rc);
963 ptlrpc_req_finished(req);
967 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
968 LASSERT(lockrep != NULL);
/* Convert wire status to host-endian/host errno space. */
970 lockrep->lock_policy_res2 =
971 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
973 /* Retry infinitely when the server returns -EINPROGRESS for the
974 * intent operation, when server returns -EINPROGRESS for acquiring
975 * intent lock, we'll retry in after_reply(). */
976 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
977 mdc_clear_replay_flag(req, rc);
978 ptlrpc_req_finished(req);
/* Same import generation: safe to resend unless a signal arrived. */
979 if (generation == obddev->u.cli.cl_import->imp_generation) {
980 if (signal_pending(current))
984 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
985 obddev->obd_name, resends, it->it_op,
986 PFID(&op_data->op_fid1),
987 PFID(&op_data->op_fid2));
990 CDEBUG(D_HA, "resend cross eviction\n");
/* Server ACL data didn't fit the old-style buffer: retry once with
 * the large (connect-negotiated) ACL buffer size. */
995 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
996 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
997 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
998 mdc_clear_replay_flag(req, -ERANGE);
999 ptlrpc_req_finished(req);
1000 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1005 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error path: drop any lock we did acquire and the request, then
 * scrub the intent's lock fields. */
1007 if (lustre_handle_is_used(lockh)) {
1008 ldlm_lock_decref(lockh, einfo->ei_mode);
1009 memset(lockh, 0, sizeof(*lockh));
1011 ptlrpc_req_finished(req);
1013 it->it_lock_handle = 0;
1014 it->it_lock_mode = 0;
1015 it->it_request = NULL;
/*
 * mdc_enqueue() - public non-intent wrapper around mdc_enqueue_base():
 * identical behavior with a NULL lookup_intent (used e.g. for flock).
 */
1021 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1022 const union ldlm_policy_data *policy,
1023 struct md_op_data *op_data,
1024 struct lustre_handle *lockh, __u64 extra_lock_flags)
1026 return mdc_enqueue_base(exp, einfo, policy, NULL,
1027 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - translate the server's intent disposition
 * into the state the VFS-facing layers expect.
 *
 * Visible responsibilities: validate the reply, map disposition/status
 * to an error code, take extra request references for successful
 * CREATE/OPEN phases (released in ll_create_node / ll_file_open), and
 * if an equivalent local lock already exists, cancel the new one and
 * reuse the old handle so l_ast_data matching works after iget().
 *
 * NOTE(review): several lines are elided in this extract (the `rc`
 * declaration/initialization, early RETURNs, the `out:` label, some
 * closing braces); the surviving lines are annotated below.
 */
1030 static int mdc_finish_intent_lock(struct obd_export *exp,
1031 struct ptlrpc_request *request,
1032 struct md_op_data *op_data,
1033 struct lookup_intent *it,
1034 struct lustre_handle *lockh)
1036 struct lustre_handle old_lock;
1037 struct ldlm_lock *lock;
1041 LASSERT(request != NULL);
1042 LASSERT(request != LP_POISON);
1043 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir has no per-name intent processing to do here. */
1045 if (it->it_op & IT_READDIR)
1048 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1049 if (it->it_status != 0)
1050 GOTO(out, rc = it->it_status);
1052 if (!it_disposition(it, DISP_IT_EXECD)) {
1053 /* The server failed before it even started executing
1054 * the intent, i.e. because it couldn't unpack the
1057 LASSERT(it->it_status != 0);
1058 GOTO(out, rc = it->it_status);
1060 rc = it_open_error(DISP_IT_EXECD, it);
1064 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1068 /* keep requests around for the multiple phases of the call
1069 * this shows the DISP_XX must guarantee we make it into the
1072 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1073 it_disposition(it, DISP_OPEN_CREATE) &&
1074 !it_open_error(DISP_OPEN_CREATE, it)) {
1075 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1076 /* balanced in ll_create_node */
1077 ptlrpc_request_addref(request);
1079 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1080 it_disposition(it, DISP_OPEN_OPEN) &&
1081 !it_open_error(DISP_OPEN_OPEN, it)) {
1082 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1083 /* balanced in ll_file_open */
1084 ptlrpc_request_addref(request);
1085 /* BUG 11546 - eviction in the middle of open rpc
1088 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1092 if (it->it_op & IT_CREAT) {
1093 /* XXX this belongs in ll_create_it */
1094 } else if (it->it_op == IT_OPEN) {
1095 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1097 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1101 /* If we already have a matching lock, then cancel the new
1102 * one. We have to set the data here instead of in
1103 * mdc_enqueue, because we need to use the child's inode as
1104 * the l_ast_data to match, and that's not available until
1105 * intent_finish has performed the iget().) */
1106 lock = ldlm_handle2lock(lockh);
1108 union ldlm_policy_data policy = lock->l_policy_data;
1109 LDLM_DEBUG(lock, "matching against this");
1111 if (it_has_reply_body(it)) {
1112 struct mdt_body *body;
1114 body = req_capsule_server_get(&request->rq_pill,
1116 /* mdc_enqueue checked */
1117 LASSERT(body != NULL);
/* Sanity: the granted lock's resource must match the FID the
 * server replied about. */
1118 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1119 &lock->l_resource->lr_name),
1120 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1121 PLDLMRES(lock->l_resource),
1122 PFID(&body->mbo_fid1));
1124 LDLM_LOCK_PUT(lock);
1126 memcpy(&old_lock, lockh, sizeof(*lockh));
/* An equivalent lock is already cached locally: drop the new one
 * and continue with the pre-existing handle. */
1127 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1128 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1129 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1130 memcpy(lockh, &old_lock, sizeof(old_lock));
1131 it->it_lock_handle = lockh->cookie;
1137 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1138 (int)op_data->op_namelen, op_data->op_name,
1139 ldlm_it2str(it->it_op), it->it_status,
1140 it->it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether a cached DLM lock still covers
 * the intent on @fid, returning the matched mode (stored into the
 * intent) and the covered inodebits via @bits.
 *
 * If the intent already carries a lock handle, just revalidate it;
 * otherwise build the required inodebits policy from it_op and search
 * the local namespace with mdc_lock_match().
 *
 * NOTE(review): the switch-case labels (IT_GETATTR / IT_READDIR /
 * IT_LAYOUT / default), a trailing bits term on the getattr policy,
 * the match-flags argument, closing braces and the final RETURN are
 * elided in this extract.
 */
1144 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1145 struct lu_fid *fid, __u64 *bits)
1147 /* We could just return 1 immediately, but since we should only
1148 * be called in revalidate_it if we already have a lock, let's
1150 struct ldlm_res_id res_id;
1151 struct lustre_handle lockh;
1152 union ldlm_policy_data policy;
1153 enum ldlm_mode mode;
/* Fast path: the intent already references a lock — revalidate it. */
1156 if (it->it_lock_handle) {
1157 lockh.cookie = it->it_lock_handle;
1158 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1160 fid_build_reg_res_name(fid, &res_id);
1161 switch (it->it_op) {
1163 /* File attributes are held under multiple bits:
1164 * nlink is under lookup lock, size and times are
1165 * under UPDATE lock and recently we've also got
1166 * a separate permissions lock for owner/group/acl that
1167 * were protected by lookup lock before.
1168 * Getattr must provide all of that information,
1169 * so we need to ensure we have all of those locks.
1170 * Unfortunately, if the bits are split across multiple
1171 * locks, there's no easy way to match all of them here,
1172 * so an extra RPC would be performed to fetch all
1173 * of those bits at once for now. */
1174 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1175 * but for old MDTs (< 2.4), permission is covered
1176 * by LOOKUP lock, so it needs to match all bits here.*/
1177 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1178 MDS_INODELOCK_LOOKUP |
1182 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1185 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1188 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any compatible read/write mode is acceptable for revalidation. */
1192 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1193 LDLM_IBITS, &policy,
1194 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Record (or clear) the result on the intent for the caller. */
1199 it->it_lock_handle = lockh.cookie;
1200 it->it_lock_mode = mode;
1202 it->it_lock_handle = 0;
1203 it->it_lock_mode = 0;
1210 * This long block is all about fixing up the lock and request state
1211 * so that it is correct as of the moment _before_ the operation was
1212 * applied; that way, the VFS will think that everything is normal and
1213 * call Lustre's regular VFS methods.
1215 * If we're performing a creation, that means that unless the creation
1216 * failed with EEXIST, we should fake up a negative dentry.
1218 * For everything else, we want to lookup to succeed.
1220 * One additional note: if CREATE or OPEN succeeded, we add an extra
1221 * reference to the request because we need to keep it around until
1222 * ll_create/ll_open gets called.
1224 * The server will return to us, in it_disposition, an indication of
1225 * exactly what it_status refers to.
1227 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1228 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1229 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1230 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1233 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * mdc_intent_lock - entry point for intent-based metadata lock acquisition
 * on the MDC; see the long comment above this function for the full
 * protocol description of it_disposition/it_status handling.
 *
 * \param exp               export to the MDT
 * \param op_data           operation data (fids, name, etc.)
 * \param it                the lookup intent to execute
 * \param[out] reqp         on success, the intent RPC request is handed
 *                          back to the caller via *reqp
 * \param cb_blocking       LDLM blocking callback for the acquired lock
 * \param extra_lock_flags  extra LDLM flags for the enqueue
 *
 * NOTE(review): this excerpt elides some lines (returns, closing braces);
 * comments below describe the visible code only.
 */
1236 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1237 struct lookup_intent *it, struct ptlrpc_request **reqp,
1238 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1240 struct ldlm_enqueue_info einfo = {
1241 .ei_type = LDLM_IBITS,
1242 .ei_mode = it_to_lock_mode(it),
1243 .ei_cb_bl = cb_blocking,
1244 .ei_cb_cp = ldlm_completion_ast,
1245 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1247 struct lustre_handle lockh;
1252 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1253 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1254 op_data->op_name, PFID(&op_data->op_fid2),
1255 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* For lookup/getattr/readdir on an already-known (sane) fid, first try to
 * revalidate a lock the client may already hold, avoiding an RPC. */
1259 if (fid_is_sane(&op_data->op_fid2) &&
1260 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1261 /* We could just return 1 immediately, but since we should only
1262 * be called in revalidate_it if we already have a lock, let's
1264 it->it_lock_handle = 0;
1265 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1266 /* Only return failure if it was not GETATTR by cfid
1267 (from inode_revalidate) */
1268 if (rc || op_data->op_namelen != 0)
/* For case if upper layer did not alloc fid, do it now. */
1272 /* For case if upper layer did not alloc fid, do it now. */
1273 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1274 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1276 CERROR("Can't alloc new fid, rc %d\n", rc);
/* Issue the actual intent enqueue RPC to the MDT. */
1281 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request to the caller, then fix up intent/lock state
 * as of the moment before the operation was applied (see comment above
 * the function). */
1286 *reqp = it->it_request;
1287 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for the async getattr enqueue issued by
 * mdc_intent_getattr_async(): releases the client request slot, finishes
 * the LDLM enqueue and the intent processing, then invokes the caller's
 * completion callback minfo->mi_cb with the final rc.
 * Runs from the rq_interpret_reply path of a ptlrpcd-driven request
 * (the request is queued via ptlrpcd_add_req by the sender).
 *
 * NOTE(review): this excerpt elides some lines (the assignment of @it,
 * error-path labels, RETURN); comments below describe visible code only.
 */
1291 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1292 struct ptlrpc_request *req,
1295 struct mdc_getattr_args *ga = args;
1296 struct obd_export *exp = ga->ga_exp;
1297 struct md_enqueue_info *minfo = ga->ga_minfo;
1298 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1299 struct lookup_intent *it;
1300 struct lustre_handle *lockh;
1301 struct obd_device *obddev;
1302 struct ldlm_reply *lockrep;
1303 __u64 flags = LDLM_FL_HAS_INTENT;
1307 lockh = &minfo->mi_lockh;
1309 obddev = class_exp2obd(exp);
/* Release the request slot taken by the sender in
 * mdc_intent_getattr_async(). */
1311 obd_put_request_slot(&obddev->u.cli);
/* Fault-injection hook for testing the getattr-enqueue error path. */
1312 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the enqueue at the LDLM level for the single (count == 1)
 * lock requested. */
1315 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1316 &flags, NULL, 0, lockh, rc);
1318 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* On failure, make sure this request will not be replayed. */
1319 mdc_clear_replay_flag(req, rc);
1323 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1324 LASSERT(lockrep != NULL);
/* Convert the server intent status from wire to host representation. */
1326 lockrep->lock_policy_res2 =
1327 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
/* Fix up lock and intent state from the enqueue reply, then complete
 * the intent as mdc_intent_lock() would in the synchronous path. */
1329 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1333 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Notify the sponsor of the async getattr of the final result. */
1337 minfo->mi_cb(req, minfo, rc);
1341 int mdc_intent_getattr_async(struct obd_export *exp,
1342 struct md_enqueue_info *minfo)
1344 struct md_op_data *op_data = &minfo->mi_data;
1345 struct lookup_intent *it = &minfo->mi_it;
1346 struct ptlrpc_request *req;
1347 struct mdc_getattr_args *ga;
1348 struct obd_device *obddev = class_exp2obd(exp);
1349 struct ldlm_res_id res_id;
1350 union ldlm_policy_data policy = {
1351 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1352 MDS_INODELOCK_UPDATE } };
1354 __u64 flags = LDLM_FL_HAS_INTENT;
1357 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1358 (int)op_data->op_namelen, op_data->op_name,
1359 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1361 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1362 /* If the MDT returns -ERANGE because of a large ACL, then the sponsor
1363 * of the async getattr RPC will handle that by itself. */
1364 req = mdc_intent_getattr_pack(exp, it, op_data,
1365 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1367 RETURN(PTR_ERR(req));
1369 rc = obd_get_request_slot(&obddev->u.cli);
1371 ptlrpc_req_finished(req);
1375 /* With Data-on-MDT the glimpse callback is needed too.
1376 * It is set here in advance but not in mdc_finish_enqueue()
1377 * to avoid possible races. It is safe to have glimpse handler
1378 * for non-DOM locks and costs nothing.*/
1379 if (minfo->mi_einfo.ei_cb_gl == NULL)
1380 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1382 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1383 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1385 obd_put_request_slot(&obddev->u.cli);
1386 ptlrpc_req_finished(req);
1390 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1391 ga = ptlrpc_req_async_args(req);
1393 ga->ga_minfo = minfo;
1395 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1396 ptlrpcd_add_req(req);