Recently I had to connect to a Hadoop cluster with Camel. If you do a quick search you will see there is a nice hdfs2 component that you can use. Naturally I thought it could be done in just a few minutes: it has a Camel component, so point the component to the right cluster and start moving files! Imagine my surprise when I found out I had to use Kerberos. In this post I am not going into the details of Kerberos itself (in short: I don't like it). You can find good posts by just looking on the web; I found this post to be useful in explaining the basics.
When you look at the documentation page of the camel-hdfs2 component you won't find any hints on how to connect to HDFS using Kerberos. Looking at the code you won't find any references either. So I decided to first try to connect to the cluster directly with the HDFS client API, using the FileSystem class.
Long story short, the easiest way to do it is with a couple of files: one to define the cluster, one to define the Kerberos setup, and a keytab file to top it off. With these you can connect to the cluster relatively easily.
In the cluster configuration you define the cluster setup: the NameNodes of the cluster and so on. This allows the client to automatically switch NameNodes when a failover is triggered on the cluster side.
If you google this, it is normally done with a configuration file. I didn't like that, because all our configuration is stored in the database, so I went looking for a way to do it in code. In essence you have to create a Configuration object.
static Configuration createHdfsConfiguration(String namedNodes, int replicationFactor) {
    Configuration hdfsConfiguration = new Configuration();
    // Switch from the default "simple" authentication to Kerberos
    hdfsConfiguration.set("hadoop.security.authentication", "kerberos");
    String nameService = "hdfsNamedService";
    hdfsConfiguration.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(replicationFactor));
    hdfsConfiguration.set(DFSConfigKeys.DFS_NAMESERVICES, nameService);
    // getName(..) is a small helper (not shown here) that derives the logical node name(s) from the "host:port" address(es);
    // the bare DFS_* constants are statically imported from DFSConfigKeys
    hdfsConfiguration.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameService), getName(namedNodes));
    // Register the RPC address of every distinct NameNode under the name service
    List<String> distinctNodes = Arrays.stream(namedNodes.split(",")).distinct().collect(Collectors.toList());
    for (String address : distinctNodes) {
        hdfsConfiguration.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, nameService, getName(address)), address);
    }
    // The failover proxy provider takes care of switching to the active NameNode
    hdfsConfiguration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameService, ConfiguredFailoverProxyProvider.class.getName());
    hdfsConfiguration.set("fs.defaultFS", "hdfs://" + nameService);
    return hdfsConfiguration;
}
This method creates the Configuration object and sets several configuration parameters. First of all it sets the authentication to Kerberos. Second it registers the NameNodes that are in the cluster: for each NameNode a new entry is added that sets its RPC address. The last item that is configured is the default FileSystem. In our case the name service was always hdfsNamedService.
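For illustration, calling it could look like this (the host names, ports and the replication factor of 3 are made up for the example):

// Hypothetical NameNode addresses; replace with your own host:port pairs
Configuration hdfsConfiguration =
        createHdfsConfiguration("namenode1.rubix.nl:8020,namenode2.rubix.nl:8020", 3);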
Now that we have the cluster configuration, we also need to specify some details about the Kerberos server itself. This is done in a Kerberos config file (explained here).
[libdefaults]
  default_realm = FUSE.RUBIX.NL

[realms]
  FUSE.RUBIX.NL = {
    kdc = kerberos.rubix.nl
    kpasswd_server = kerberos.rubix.nl
    admin_server = kerberos.rubix.nl
    kpasswd_protocol = SET_CHANGE
  }
I have tried to get this file converted to code but I haven't found a plausible solution. If you have one, please comment with an example or a link to one. In order for the file to be discovered you have to set a system property.
System.setProperty("java.security.krb5.conf", kerberosConfigFileLocation);
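One partial code-only alternative I am aware of is the pair of system properties the JDK's Kerberos provider reads when no config file is given; note that these only cover the default realm and the KDC, not the kpasswd settings shown above:

// Partial alternative to a krb5.conf file: the JDK Kerberos provider also accepts
// these two system properties (realm and KDC only; the rest still needs the file)
System.setProperty("java.security.krb5.realm", "FUSE.RUBIX.NL");
System.setProperty("java.security.krb5.kdc", "kerberos.rubix.nl");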
Now the last thing we need to do is log in and set up a connection.
public static FileSystem loginWithKeytab(Configuration hdfsConfiguration, String username,
        String keyTabFileLocation) throws IOException, URISyntaxException {
    checkFileExists(keyTabFileLocation);
    // Point the Hadoop security layer at our Kerberos-enabled configuration
    UserGroupInformation.setConfiguration(hdfsConfiguration);
    // Log in with the keytab, so no password prompt is needed
    UserGroupInformation.loginUserFromKeytab(username, keyTabFileLocation);
    return FileSystem.get(hdfsConfiguration);
}
The static methods that UserGroupInformation exposes can be used to log in to the cluster, and with the static get method of FileSystem you create the actual connection. The FileSystem class is your gateway to the Hadoop cluster: with it you can retrieve, create, delete, etc. files on the cluster.
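To give an idea, a few basic operations could look like this (the paths are made up for illustration):

// A few basic FileSystem operations; the paths are hypothetical
Path remote = new Path("/data/incoming/example.txt");
fileSystem.copyFromLocalFile(new Path("/tmp/example.txt"), remote); // upload a local file
if (fileSystem.exists(remote)) {
    FileStatus status = fileSystem.getFileStatus(remote);           // read file metadata
    fileSystem.delete(remote, false);                               // delete it (non-recursive)
}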
Now that you have a connection, you ideally want to get it working in combination with the hdfs2 component of Camel. I went back to the documentation to see if I had missed anything. In the OSGi section I found the following line of text: "Provide boilerplate initialization code which populates internal, static cache inside org.apache.hadoop.fs.FileSystem class:"
When debugging the camel-hdfs2 component I noticed that the component uses a slightly different method to get the FileSystem:
this.fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
When you look at the get method you see that the FileSystem instances are cached based on the URI provided. So in order to get Kerberos working with the camel-hdfs2 component you only have to preload (and thus cache) the FileSystem with the correct URI.
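A minimal sketch of what that caching means in practice (my own illustration, not Camel code; the cache key is built from the URI scheme and authority plus the logged-in user, so the path part does not matter):

// Two lookups with the same scheme and authority return the same cached instance
FileSystem first = FileSystem.get(URI.create("hdfs://hdfsNamedService/some/dir"), hdfsConfiguration);
FileSystem second = FileSystem.get(URI.create("hdfs://hdfsNamedService/other/dir"), hdfsConfiguration);
assert first == second;

With that in mind I changed the login method a bit: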
public static FileSystem loginWithKeytab(Configuration hdfsConfiguration, String username,
        String keyTabFileLocation, URI uri) throws IOException, URISyntaxException {
    checkFileExists(keyTabFileLocation);
    UserGroupInformation.setConfiguration(hdfsConfiguration);
    UserGroupInformation.loginUserFromKeytab(username, keyTabFileLocation);
    // Build the FileSystem with the "hdfs" scheme so it is cached under the same
    // URI that the Camel route will look up later
    FileSystem fileSystem = FileSystem.get(new URI("hdfs:" + uri.getSchemeSpecificPart()), hdfsConfiguration);
    // Touch the cluster once to make sure the connection actually works
    fileSystem.getStatus();
    return fileSystem;
}
Call this method before you initialize your route. On startup of the route the FileSystem will be loaded from the cache and you are good to go with your Camel route! Just make sure that you use the same URI that you define in your route (from or to). I had to change the URI scheme to hdfs (without the 2) and it all worked like a charm.
System.setProperty("java.security.krb5.conf", kerberosConfigFileLocation);
URI uri = new URI("hdfs2:hdfsNamedService:/fileLocation/");
Configuration hdfsConfiguration = createHdfsConfiguration(namedNodes, replicationFactor);
loginWithKeytab(hdfsConfiguration, userName, keytabLocation, uri);

from(uri.toString())
    .to("sftp://localhost:/destination");