2 """Test Mammoth connection for all data categories and CU options. 4 This test module contains test cases for testing connection 5 to Mammoth and validating response for all CUs (eg. SCRUBCU) 6 and all data categories (eg. settings, members, etc.). 8 Note: Each test method runs the same test case on multiple 9 CUs and multiple applicable data categories. 12 $ export ODY_MIGR_SECRET_KEY=**** &&\ 13 python3 opt/odyssey/tools/bin/ody_migr_mmth_test.py 20 from ody_migr_mmth_endpoint
import MammothMigration
21 from ody_migr_utils
import generate_hash, get_valid_json
22 from ody_migr_config
import (ENV_MIGR_SECRET_KEY,
32 DATA_CHOICES_FOR_TEST,
40 logging.disable(logging.ERROR)
43 assert os.getenv(ENV_MIGR_SECRET_KEY)
is not None,\
44 "Environment variable: `{}` is not set!".format(ENV_MIGR_SECRET_KEY)
48 """Test cases for Mammoth connection and received data response.""" 51 """Set up test environment. 53 Prepare essential Mammoth connectors to test connections and 69 for data_category
in DATA_CHOICES_FOR_TEST:
70 for cu
in SUPPORTED_CU_TEST:
73 if data_category == DATA_OPT_MEMDATA:
76 KEY_MEMBERS: get_valid_json([6680, 779999])}
78 elif (cu ==
"CRUISECU" and 79 data_category == DATA_OPT_MEMDATA):
80 memdata_params = {KEY_MEMBERS: get_valid_json([25198])}
91 elif data_category == DATA_OPT_CREATE_HIST:
98 params={
"restart":
"Y"}
114 this_hash = EXPECTED_HASH_DICT[data_category][cu]
117 this_url = (
"https://{}.homecu.net/hcuadm/mOdysseyMigrExp.prg" 118 "?cu={}&passphrase={}&action={}").format(
119 this_migrator.server,
122 this_migrator.data_category)
134 """Test authentication code generation. 136 Obtained authentication code must match the expected code 137 generated for valid cu and action when generated with correct 141 obtained_hash = generate_hash(
142 this_migrator.data_category,
144 os.environ.get(ENV_MIGR_SECRET_KEY)
146 self.assertEqual(this_expected_hash, obtained_hash)
149 """Test url destination correctness given cu and data_category.""" 152 obtained_url = this_migrator._dest_url()
153 self.assertEqual(this_expected_url, obtained_url)
156 """Invalid credential should not allow access the Mammoth resources. 158 Expect SystemExit exception raised when tried to authenticate 159 with incorrect credentials 164 with self.assertRaises(SystemExit):
165 this_migrator._initiate_http_request()
168 """Test connection with invalid secret key.""" 171 original_secret = os.environ.get(ENV_MIGR_SECRET_KEY)
174 this_migrator._initiate_http_request()
176 with self.assertRaises((SystemExit)):
177 this_migrator._validate_response()
178 os.environ[ENV_MIGR_SECRET_KEY] = original_secret
181 """Test file not exist for gethistoryfile executed before creation. 183 SOCU, CU that we do not use for testing and is expected not to have 184 memhist.gz and memhist lock file in /home/SOCU/tmp directory in 185 Mammoth. If such files exist, delete first. 195 this_migrator._initiate_http_request()
197 with self.assertRaises(SystemExit):
198 this_migrator._validate_response()
201 """Test successful data (all categories) pull from Mammoth.""" 205 if (this_migrator.data_category == DATA_OPT_MEMDATA
and 209 this_migrator._initiate_http_request()
211 resp = this_migrator.response
213 if this_migrator.data_category == DATA_OPT_CREATE_HIST:
214 self.assertIsInstance(resp, dict)
218 self.assertIn(expected_resp_key, resp.keys())
221 chist_error = resp[
"error"]
222 data_fname = resp[
"file"]
223 assert (data_fname ==
"" and chist_error !=
"") \
224 or (data_fname !=
"" and chist_error ==
"")
226 elif this_migrator.data_category == DATA_OPT_GET_HIST:
227 self.assertIsInstance(resp.content, bytes)
230 with self.assertRaises((ValueError)):
235 self.assertIsInstance(resp, dict)
240 self.assertIn(expected_resp_key, resp.keys())
242 data_dict = resp[
"data"]
245 if this_migrator.data_category == DATA_OPT_MEMDATA:
247 this_migrator.data_category]
249 self.assertEqual(len(data_dict.keys()), 1)
250 self.assertEqual(list(data_dict.keys())[0],
"account")
252 for member_response
in data_dict[
"account"].keys():
253 self.assertEqual(sorted(
254 data_dict[
"account"][member_response].keys()),
255 sorted(memdata_true_dict_struct))
260 this_migrator.data_category]:
261 self.assertIn(expected_data_key, data_dict.keys())
264 if this_migrator.data_category == DATA_OPT_SETTINGS:
266 self.assertEqual(len(data_dict[
"cuadmin"]), 1)
267 self.assertEqual(data_dict[
"cuadmin"][
268 0][
"cu"].strip(), this_migrator.cu)
274 if __name__ ==
"__main__":
def test_response_with_error(self)
def test_successful_connection_validate_data(self)
def test_invalid_authentication(self)
def test_hash_generation(self)
expected_response_hist_keys
def test_request_url_correctness(self)
def test_file_not_found_memhist(self)