33from .base import BaseYarnAPI
44from .constants import YarnApplicationState , FinalApplicationStatus
55from .errors import IllegalArgumentError
6- from .hadoop_conf import get_resource_manager_host_port
6+ from .hadoop_conf import get_resource_manager_host_port , check_is_active_rm , CONF_DIR
77
88
99class ResourceManager (BaseYarnAPI ):
@@ -14,20 +14,38 @@ class ResourceManager(BaseYarnAPI):
1414 and information about applications on the cluster.
1515
1616 If `address` argument is `None` client will try to extract `address` and
17- `port` from Hadoop configuration files.
17+ `port` from Hadoop configuration files. If both `address` and `alt_address`
18+ are provided, the address corresponding to the ACTIVE HA Resource Manager will
19+ be used.
1820
1921 :param str address: ResourceManager HTTP address
2022 :param int port: ResourceManager HTTP port
23+ :param str alt_address: Alternate ResourceManager HTTP address for HA configurations
24+ :param int alt_port: Alternate ResourceManager HTTP port for HA configurations
2125 :param int timeout: API connection timeout in seconds
2226 :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN
2327 """
24- def __init__ (self , address = None , port = 8088 , timeout = 30 , kerberos_enabled = False ):
28+ def __init__ (self , address = None , port = 8088 , alt_address = None , alt_port = 8088 , timeout = 30 , kerberos_enabled = False ):
2529 if address is None :
26- self .logger .debug ('Get configuration from hadoop conf dir' )
30+ self .logger .debug ('Get configuration from hadoop conf dir: {conf_dir}' . format ( conf_dir = CONF_DIR ) )
2731 address , port = get_resource_manager_host_port ()
32+ else :
33+ if alt_address : # Determine active RM
34+ if not check_is_active_rm (address , port ):
35+ # Default is not active, check alternate
36+ if check_is_active_rm (alt_address , alt_port ):
37+ address , port = alt_address , alt_port
2838
2939 super (ResourceManager , self ).__init__ (address , port , timeout , kerberos_enabled )
3040
41+ def get_active_host_port (self ):
42+ """
43+ The active address, port tuple to which this instance is associated.
44+
45+ :return: Tuple (str, int) corresponding to the active address and port
46+ """
47+ return self .address , self .port
48+
3149 def cluster_information (self ):
3250 """
3351 The cluster information resource provides overall information about
0 commit comments