forked from aws/aws-cli
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathfactory.py
More file actions
141 lines (121 loc) · 5.45 KB
/
factory.py
File metadata and controls
141 lines (121 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.client import Config
from botocore.httpsession import DEFAULT_CA_BUNDLE
from s3transfer.manager import TransferManager
from s3transfer.crt import (
create_s3_crt_client, BotocoreCRTRequestSerializer, CRTTransferManager
)
from awscli.compat import urlparse
from awscli.customizations.s3 import constants
from awscli.customizations.s3.transferconfig import \
create_transfer_config_from_runtime_config
LOGGER = logging.getLogger(__name__)
class ClientFactory:
def __init__(self, session):
self._session = session
def create_client(self, params, is_source_client=False):
create_client_kwargs = {
'verify': params['verify_ssl']
}
if params.get('sse') == 'aws:kms':
create_client_kwargs['config'] = Config(signature_version='s3v4')
region = params['region']
endpoint_url = params['endpoint_url']
if is_source_client and params['source_region']:
if params['paths_type'] == 's3s3':
region = params['source_region']
endpoint_url = None
create_client_kwargs['region_name'] = region
create_client_kwargs['endpoint_url'] = endpoint_url
return self._session.create_client('s3', **create_client_kwargs)
class TransferManagerFactory:
_MAX_IN_MEMORY_CHUNKS = 6
def __init__(self, session):
self._session = session
self._botocore_client_factory = ClientFactory(self._session)
def create_transfer_manager(self, params, runtime_config,
botocore_client=None):
client_type = self._compute_transfer_client_type(
params, runtime_config)
if client_type == constants.CRT_TRANSFER_CLIENT:
return self._create_crt_transfer_manager(params, runtime_config)
else:
return self._create_default_transfer_manager(
params, runtime_config, botocore_client)
def _compute_transfer_client_type(self, params, runtime_config):
if params.get('paths_type') == 's3s3':
return constants.DEFAULT_TRANSFER_CLIENT
return runtime_config.get(
'preferred_transfer_client', constants.DEFAULT_TRANSFER_CLIENT)
def _create_crt_transfer_manager(self, params, runtime_config):
return CRTTransferManager(
self._create_crt_client(params, runtime_config),
self._create_crt_request_serializer(params)
)
def _create_crt_client(self, params, runtime_config):
create_crt_client_kwargs = {
'region': self._resolve_region(params),
'verify': self._resolve_verify(params),
}
endpoint_url = params.get('endpoint_url')
if endpoint_url and urlparse.urlparse(endpoint_url).scheme == 'http':
create_crt_client_kwargs['use_ssl'] = False
target_throughput = runtime_config.get('target_bandwidth', None)
multipart_chunksize = runtime_config.get('multipart_chunksize', None)
if target_throughput:
create_crt_client_kwargs['target_throughput'] = target_throughput
if multipart_chunksize:
create_crt_client_kwargs['part_size'] = multipart_chunksize
if params.get('sign_request', True):
create_crt_client_kwargs[
'botocore_credential_provider'] = self._session.get_component(
'credential_provider')
return create_s3_crt_client(**create_crt_client_kwargs)
def _create_crt_request_serializer(self, params):
return BotocoreCRTRequestSerializer(
self._session,
{
'region_name': self._resolve_region(params),
'endpoint_url': params.get('endpoint_url'),
}
)
def _create_default_transfer_manager(self, params, runtime_config,
client=None):
if client is None:
client = self._botocore_client_factory.create_client(params)
transfer_config = create_transfer_config_from_runtime_config(
runtime_config)
transfer_config.max_in_memory_upload_chunks = \
self._MAX_IN_MEMORY_CHUNKS
transfer_config.max_in_memory_download_chunks = \
self._MAX_IN_MEMORY_CHUNKS
LOGGER.debug(
"Using a multipart threshold of %s and a part size of %s",
transfer_config.multipart_threshold,
transfer_config.multipart_chunksize
)
return TransferManager(client, transfer_config)
def _resolve_region(self, params):
region = params.get('region')
if region is None:
region = self._session.get_config_variable('region')
return region
def _resolve_verify(self, params):
verify = params.get('verify_ssl')
if verify is None:
verify = self._session.get_config_variable('ca_bundle')
if verify is None:
verify = DEFAULT_CA_BUNDLE
return verify