network.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015-2020
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. import contextlib
  22. from distutils import spawn
  23. import asyncio
  24. import subprocess
  25. import sys
  26. import time
  27. import unittest
  28. import qubes.tests
  29. import qubes.firewall
  30. import qubes.vm.qubesvm
  31. import qubes.vm.appvm
  32. # noinspection PyAttributeOutsideInit,PyPep8Naming
  33. class VmNetworkingMixin(object):
  34. test_ip = '192.168.123.45'
  35. test_name = 'test.example.com'
  36. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  37. ping_ip = ping_cmd.format(target=test_ip)
  38. ping_name = ping_cmd.format(target=test_name)
  39. # filled by load_tests
  40. template = None
  41. def run_cmd(self, vm, cmd, user="root"):
  42. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  43. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  44. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  45. :param str cmd: command to execute
  46. :param std user: user to execute command as
  47. :return int: command exit code
  48. '''
  49. try:
  50. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  51. except subprocess.CalledProcessError as e:
  52. return e.returncode
  53. return 0
  54. def setUp(self):
  55. '''
  56. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  57. '''
  58. super(VmNetworkingMixin, self).setUp()
  59. if self.template.startswith('whonix-'):
  60. self.skipTest("Test not supported here - Whonix uses its own "
  61. "firewall settings")
  62. if self.template.endswith('-minimal'):
  63. self.skipTest(
  64. "Test not supported here - minimal template don't have "
  65. "networking packages by default")
  66. self.init_default_template(self.template)
  67. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  68. name=self.make_vm_name('netvm1'),
  69. label='red')
  70. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  71. self.testnetvm.provides_network = True
  72. self.testnetvm.netvm = None
  73. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  74. name=self.make_vm_name('vm1'),
  75. label='red')
  76. self.loop.run_until_complete(self.testvm1.create_on_disk())
  77. self.testvm1.netvm = self.testnetvm
  78. self.app.save()
  79. self.configure_netvm()
  80. def _run_cmd_and_log_output(self, vm, cmd):
  81. """Used in tearDown to collect more info"""
  82. if not vm.is_running():
  83. return
  84. with contextlib.suppress(subprocess.CalledProcessError):
  85. output, _ = self.loop.run_until_complete(
  86. vm.run_for_stdio(cmd, user='root', stderr=subprocess.STDOUT))
  87. self.log.critical('{}: {}: {}'.format(vm.name, cmd, output))
  88. def tearDown(self):
  89. # collect more info on failure
  90. if self._outcome and not self._outcome.success:
  91. for vm in (self.testnetvm, self.testvm1, getattr(self, 'proxy', None)):
  92. if vm is None:
  93. continue
  94. self._run_cmd_and_log_output(vm, 'ip a')
  95. self._run_cmd_and_log_output(vm, 'ip r')
  96. self._run_cmd_and_log_output(vm, 'iptables -vnL')
  97. self._run_cmd_and_log_output(vm, 'iptables -vnL -t nat')
  98. self._run_cmd_and_log_output(vm, 'nft list table qubes-firewall')
  99. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status qubes-firewall')
  100. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status qubes-iptables')
  101. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status xendriverdomain')
  102. self._run_cmd_and_log_output(vm, 'cat /var/log/xen/xen-hotplug.log')
  103. super(VmNetworkingMixin, self).tearDown()
  104. def configure_netvm(self):
  105. '''
  106. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  107. '''
  108. def run_netvm_cmd(cmd):
  109. try:
  110. self.loop.run_until_complete(
  111. self.testnetvm.run_for_stdio(cmd, user='root'))
  112. except subprocess.CalledProcessError as e:
  113. self.fail("Command '%s' failed: %s%s" %
  114. (cmd, e.stdout.decode(), e.stderr.decode()))
  115. if not self.testnetvm.is_running():
  116. self.loop.run_until_complete(self.testnetvm.start())
  117. # Ensure that dnsmasq is installed:
  118. try:
  119. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  120. 'dnsmasq --version', user='root'))
  121. except subprocess.CalledProcessError:
  122. self.skipTest("dnsmasq not installed")
  123. run_netvm_cmd("ip link add test0 type dummy")
  124. run_netvm_cmd("ip link set test0 up")
  125. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  126. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  127. self.test_ip))
  128. # ignore failure
  129. self.run_cmd(self.testnetvm, "while pkill dnsmasq; do sleep 1; done")
  130. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  131. ip=self.test_ip, name=self.test_name))
  132. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  133. self.test_ip))
  134. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  135. def test_000_simple_networking(self):
  136. '''
  137. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  138. '''
  139. self.loop.run_until_complete(self.start_vm(self.testvm1))
  140. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  141. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  142. def test_010_simple_proxyvm(self):
  143. '''
  144. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  145. '''
  146. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  147. name=self.make_vm_name('proxy'),
  148. label='red')
  149. self.proxy.provides_network = True
  150. self.proxy.netvm = self.testnetvm
  151. self.loop.run_until_complete(self.proxy.create_on_disk())
  152. self.testvm1.netvm = self.proxy
  153. self.app.save()
  154. self.loop.run_until_complete(self.start_vm(self.testvm1))
  155. self.assertTrue(self.proxy.is_running())
  156. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  157. "Ping by IP from ProxyVM failed")
  158. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  159. "Ping by name from ProxyVM failed")
  160. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  161. "Ping by IP from AppVM failed")
  162. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  163. "Ping by IP from AppVM failed")
  164. @qubes.tests.expectedFailureIfTemplate('debian-7')
  165. @unittest.skipUnless(spawn.find_executable('xdotool'),
  166. "xdotool not installed")
  167. def test_020_simple_proxyvm_nm(self):
  168. '''
  169. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  170. '''
  171. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  172. name=self.make_vm_name('proxy'),
  173. label='red')
  174. self.proxy.provides_network = True
  175. self.loop.run_until_complete(self.proxy.create_on_disk())
  176. self.proxy.netvm = self.testnetvm
  177. self.proxy.features['service.network-manager'] = True
  178. self.testvm1.netvm = self.proxy
  179. self.app.save()
  180. self.loop.run_until_complete(self.start_vm(self.testvm1))
  181. self.assertTrue(self.proxy.is_running())
  182. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  183. "Ping by IP failed")
  184. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  185. "Ping by name failed")
  186. # reconnect to make sure that device was configured by NM
  187. self.assertEqual(
  188. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  189. user="user"),
  190. 0, "Failed to disconnect eth0 using nmcli")
  191. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  192. "Network should be disabled, but apparently it isn't")
  193. self.assertEqual(
  194. self.run_cmd(self.proxy,
  195. 'nmcli connection up "VM uplink eth0" ifname eth0',
  196. user="user"),
  197. 0, "Failed to connect eth0 using nmcli")
  198. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  199. "Failed to wait for NM connection")
  200. # check for nm-applet presence
  201. self.assertEqual(subprocess.call([
  202. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  203. self.proxy.name)],
  204. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  205. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  206. "Ping by IP failed (after NM reconnection")
  207. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  208. "Ping by name failed (after NM reconnection)")
  209. def test_030_firewallvm_firewall(self):
  210. '''
  211. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  212. '''
  213. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  214. name=self.make_vm_name('proxy'),
  215. label='red')
  216. self.proxy.provides_network = True
  217. self.loop.run_until_complete(self.proxy.create_on_disk())
  218. self.proxy.netvm = self.testnetvm
  219. self.testvm1.netvm = self.proxy
  220. self.app.save()
  221. # block all for first
  222. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  223. self.testvm1.firewall.save()
  224. self.loop.run_until_complete(self.start_vm(self.testvm1))
  225. self.assertTrue(self.proxy.is_running())
  226. server = self.loop.run_until_complete(self.testnetvm.run(
  227. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  228. try:
  229. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  230. "Ping by IP from ProxyVM failed")
  231. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  232. "Ping by name from ProxyVM failed")
  233. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  234. "Ping by IP should be blocked")
  235. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  236. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  237. "TCP connection should be blocked")
  238. # block all except ICMP
  239. self.testvm1.firewall.rules = [(
  240. qubes.firewall.Rule(None, action='accept', proto='icmp')
  241. )]
  242. self.testvm1.firewall.save()
  243. # Ugly hack b/c there is no feedback when the rules are actually
  244. # applied
  245. time.sleep(3)
  246. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  247. "Ping by IP failed (should be allowed now)")
  248. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  249. "Ping by name should be blocked")
  250. # all TCP still blocked
  251. self.testvm1.firewall.rules = [
  252. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  253. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  254. ]
  255. self.testvm1.firewall.save()
  256. # Ugly hack b/c there is no feedback when the rules are actually
  257. # applied
  258. time.sleep(3)
  259. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  260. "Ping by name failed (should be allowed now)")
  261. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  262. "TCP connection should be blocked")
  263. # block all except target
  264. self.testvm1.firewall.rules = [
  265. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  266. proto='tcp', dstports=1234),
  267. ]
  268. self.testvm1.firewall.save()
  269. # Ugly hack b/c there is no feedback when the rules are actually
  270. # applied
  271. time.sleep(3)
  272. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  273. "TCP connection failed (should be allowed now)")
  274. # allow all except target
  275. self.testvm1.firewall.rules = [
  276. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  277. proto='tcp', dstports=1234),
  278. qubes.firewall.Rule(action='accept'),
  279. ]
  280. self.testvm1.firewall.save()
  281. # Ugly hack b/c there is no feedback when the rules are actually
  282. # applied
  283. time.sleep(3)
  284. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  285. "TCP connection should be blocked")
  286. finally:
  287. server.terminate()
  288. self.loop.run_until_complete(server.wait())
  289. def test_040_inter_vm(self):
  290. '''
  291. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  292. '''
  293. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  294. name=self.make_vm_name('proxy'),
  295. label='red')
  296. self.loop.run_until_complete(self.proxy.create_on_disk())
  297. self.proxy.provides_network = True
  298. self.proxy.netvm = self.testnetvm
  299. self.testvm1.netvm = self.proxy
  300. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  301. name=self.make_vm_name('vm2'),
  302. label='red')
  303. self.loop.run_until_complete(self.testvm2.create_on_disk())
  304. self.testvm2.netvm = self.proxy
  305. self.app.save()
  306. self.loop.run_until_complete(asyncio.gather(
  307. self.start_vm(self.testvm1),
  308. self.start_vm(self.testvm2)))
  309. self.assertNotEqual(self.run_cmd(self.testvm1,
  310. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  311. self.testvm2.netvm = self.testnetvm
  312. self.assertNotEqual(self.run_cmd(self.testvm1,
  313. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  314. self.assertNotEqual(self.run_cmd(self.testvm2,
  315. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  316. self.testvm1.netvm = self.testnetvm
  317. self.assertNotEqual(self.run_cmd(self.testvm1,
  318. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  319. self.assertNotEqual(self.run_cmd(self.testvm2,
  320. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  321. def test_050_spoof_ip(self):
  322. '''Test if VM IP spoofing is blocked
  323. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  324. '''
  325. self.loop.run_until_complete(self.start_vm(self.testvm1))
  326. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  327. self.assertEqual(self.run_cmd(self.testnetvm,
  328. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  329. self.testvm1.ip)), 0)
  330. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  331. 'ip addr flush dev eth0 && '
  332. 'ip addr add 10.137.1.128/24 dev eth0 && '
  333. 'ip route add default dev eth0',
  334. user='root'))
  335. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  336. "Spoofed ping should be blocked")
  337. try:
  338. (output, _) = self.loop.run_until_complete(
  339. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  340. user='root'))
  341. except subprocess.CalledProcessError:
  342. self.fail('iptables -nxvL INPUT failed')
  343. output = output.decode().splitlines()
  344. packets = output[2].lstrip().split()[0]
  345. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  346. def test_100_late_xldevd_startup(self):
  347. '''Regression test for #1990
  348. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  349. '''
  350. # Simulater late xl devd startup
  351. cmd = "systemctl stop xendriverdomain"
  352. if self.run_cmd(self.testnetvm, cmd) != 0:
  353. self.fail("Command '%s' failed" % cmd)
  354. self.loop.run_until_complete(self.start_vm(self.testvm1))
  355. cmd = "systemctl start xendriverdomain"
  356. if self.run_cmd(self.testnetvm, cmd) != 0:
  357. self.fail("Command '%s' failed" % cmd)
  358. # let it initialize the interface(s)
  359. time.sleep(1)
  360. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  361. def test_110_dynamic_attach(self):
  362. self.testvm1.netvm = None
  363. self.loop.run_until_complete(self.start_vm(self.testvm1))
  364. self.testvm1.netvm = self.testnetvm
  365. # wait for it to settle down
  366. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  367. 'udevadm settle'))
  368. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  369. def test_111_dynamic_detach_attach(self):
  370. self.loop.run_until_complete(self.start_vm(self.testvm1))
  371. self.testvm1.netvm = None
  372. # wait for it to settle down
  373. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  374. 'udevadm settle'))
  375. self.testvm1.netvm = self.testnetvm
  376. # wait for it to settle down
  377. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  378. 'udevadm settle'))
  379. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  380. def test_112_reattach_after_provider_shutdown(self):
  381. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  382. name=self.make_vm_name('proxy'),
  383. label='red')
  384. self.proxy.provides_network = True
  385. self.proxy.netvm = self.testnetvm
  386. self.loop.run_until_complete(self.proxy.create_on_disk())
  387. self.testvm1.netvm = self.proxy
  388. self.loop.run_until_complete(self.start_vm(self.testvm1))
  389. self.loop.run_until_complete(self.proxy.shutdown(force=True, wait=True))
  390. self.loop.run_until_complete(self.start_vm(self.proxy))
  391. # wait for it to settle down
  392. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  393. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  394. def test_113_reattach_after_provider_kill(self):
  395. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  396. name=self.make_vm_name('proxy'),
  397. label='red')
  398. self.proxy.provides_network = True
  399. self.proxy.netvm = self.testnetvm
  400. self.loop.run_until_complete(self.proxy.create_on_disk())
  401. self.testvm1.netvm = self.proxy
  402. self.loop.run_until_complete(self.start_vm(self.testvm1))
  403. self.loop.run_until_complete(self.proxy.kill())
  404. self.loop.run_until_complete(self.start_vm(self.proxy))
  405. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  406. def test_114_reattach_after_provider_crash(self):
  407. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  408. name=self.make_vm_name('proxy'),
  409. label='red')
  410. self.proxy.provides_network = True
  411. self.proxy.netvm = self.testnetvm
  412. self.loop.run_until_complete(self.proxy.create_on_disk())
  413. self.testvm1.netvm = self.proxy
  414. self.loop.run_until_complete(self.start_vm(self.testvm1))
  415. p = self.loop.run_until_complete(self.proxy.run(
  416. 'echo c > /proc/sysrq-trigger', user='root'))
  417. self.loop.run_until_complete(p.wait())
  418. timeout = 10
  419. while self.proxy.is_running():
  420. self.loop.run_until_complete(asyncio.sleep(1))
  421. timeout -= 1
  422. self.assertGreater(timeout, 0,
  423. 'timeout waiting for crash cleanup')
  424. self.loop.run_until_complete(self.start_vm(self.proxy))
  425. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  426. def test_200_fake_ip_simple(self):
  427. '''Test hiding VM real IP
  428. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  429. '''
  430. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  431. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  432. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  433. self.app.save()
  434. self.loop.run_until_complete(self.start_vm(self.testvm1))
  435. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  436. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  437. try:
  438. (output, _) = self.loop.run_until_complete(
  439. self.testvm1.run_for_stdio(
  440. 'ip addr show dev eth0', user='root'))
  441. except subprocess.CalledProcessError:
  442. self.fail('ip addr show dev eth0 failed')
  443. output = output.decode()
  444. self.assertIn('192.168.1.128', output)
  445. self.assertNotIn(str(self.testvm1.ip), output)
  446. try:
  447. (output, _) = self.loop.run_until_complete(
  448. self.testvm1.run_for_stdio('ip route show', user='root'))
  449. except subprocess.CalledProcessError:
  450. self.fail('ip route show failed')
  451. output = output.decode()
  452. self.assertIn('192.168.1.1', output)
  453. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  454. def test_201_fake_ip_without_gw(self):
  455. '''Test hiding VM real IP
  456. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  457. '''
  458. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  459. self.app.save()
  460. self.loop.run_until_complete(self.start_vm(self.testvm1))
  461. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  462. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  463. try:
  464. (output, _) = self.loop.run_until_complete(
  465. self.testvm1.run_for_stdio('ip addr show dev eth0',
  466. user='root'))
  467. except subprocess.CalledProcessError:
  468. self.fail('ip addr show dev eth0 failed')
  469. output = output.decode()
  470. self.assertIn('192.168.1.128', output)
  471. self.assertNotIn(str(self.testvm1.ip), output)
  472. def test_202_fake_ip_firewall(self):
  473. '''Test hiding VM real IP, firewall
  474. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  475. '''
  476. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  477. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  478. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  479. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  480. name=self.make_vm_name('proxy'),
  481. label='red')
  482. self.proxy.provides_network = True
  483. self.loop.run_until_complete(self.proxy.create_on_disk())
  484. self.proxy.netvm = self.testnetvm
  485. self.testvm1.netvm = self.proxy
  486. self.app.save()
  487. # block all but ICMP and DNS
  488. self.testvm1.firewall.rules = [
  489. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  490. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  491. ]
  492. self.testvm1.firewall.save()
  493. self.loop.run_until_complete(self.start_vm(self.testvm1))
  494. self.assertTrue(self.proxy.is_running())
  495. server = self.loop.run_until_complete(self.testnetvm.run(
  496. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  497. try:
  498. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  499. "Ping by IP from ProxyVM failed")
  500. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  501. "Ping by name from ProxyVM failed")
  502. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  503. "Ping by IP should be allowed")
  504. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  505. "Ping by name should be allowed")
  506. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  507. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  508. "TCP connection should be blocked")
  509. finally:
  510. server.terminate()
  511. self.loop.run_until_complete(server.wait())
  512. def test_203_fake_ip_inter_vm_allow(self):
  513. '''Access VM with "fake IP" from other VM (when firewall allows)
  514. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  515. '''
  516. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  517. name=self.make_vm_name('proxy'),
  518. label='red')
  519. self.loop.run_until_complete(self.proxy.create_on_disk())
  520. self.proxy.provides_network = True
  521. self.proxy.netvm = self.testnetvm
  522. self.testvm1.netvm = self.proxy
  523. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  524. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  525. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  526. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  527. name=self.make_vm_name('vm2'),
  528. label='red')
  529. self.loop.run_until_complete(self.testvm2.create_on_disk())
  530. self.testvm2.netvm = self.proxy
  531. self.app.save()
  532. self.loop.run_until_complete(self.start_vm(self.testvm1))
  533. self.loop.run_until_complete(self.start_vm(self.testvm2))
  534. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  535. self.testvm2.ip, self.testvm1.ip)
  536. try:
  537. self.loop.run_until_complete(self.proxy.run_for_stdio(
  538. cmd, user='root'))
  539. except subprocess.CalledProcessError as e:
  540. raise AssertionError(
  541. '{} failed with: {}'.format(cmd, e.returncode)) from None
  542. try:
  543. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  544. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  545. cmd, user='root'))
  546. except subprocess.CalledProcessError as e:
  547. raise AssertionError(
  548. '{} failed with: {}'.format(cmd, e.returncode)) from None
  549. self.assertEqual(self.run_cmd(self.testvm2,
  550. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  551. try:
  552. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  553. (stdout, _) = self.loop.run_until_complete(
  554. self.testvm1.run_for_stdio(cmd, user='root'))
  555. except subprocess.CalledProcessError as e:
  556. raise AssertionError(
  557. '{} failed with {}'.format(cmd, e.returncode)) from None
  558. self.assertNotEqual(stdout.decode().split()[0], '0',
  559. 'Packets didn\'t managed to the VM')
  560. def test_204_fake_ip_proxy(self):
  561. '''Test hiding VM real IP
  562. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  563. '''
  564. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  565. name=self.make_vm_name('proxy'),
  566. label='red')
  567. self.loop.run_until_complete(self.proxy.create_on_disk())
  568. self.proxy.provides_network = True
  569. self.proxy.netvm = self.testnetvm
  570. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  571. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  572. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  573. self.testvm1.netvm = self.proxy
  574. self.app.save()
  575. self.loop.run_until_complete(self.start_vm(self.testvm1))
  576. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  577. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  578. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  579. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  580. try:
  581. (output, _) = self.loop.run_until_complete(
  582. self.proxy.run_for_stdio(
  583. 'ip addr show dev eth0', user='root'))
  584. except subprocess.CalledProcessError:
  585. self.fail('ip addr show dev eth0 failed')
  586. output = output.decode()
  587. self.assertIn('192.168.1.128', output)
  588. self.assertNotIn(str(self.testvm1.ip), output)
  589. try:
  590. (output, _) = self.loop.run_until_complete(
  591. self.proxy.run_for_stdio(
  592. 'ip route show', user='root'))
  593. except subprocess.CalledProcessError:
  594. self.fail('ip route show failed')
  595. output = output.decode()
  596. self.assertIn('192.168.1.1', output)
  597. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  598. try:
  599. (output, _) = self.loop.run_until_complete(
  600. self.testvm1.run_for_stdio(
  601. 'ip addr show dev eth0', user='root'))
  602. except subprocess.CalledProcessError:
  603. self.fail('ip addr show dev eth0 failed')
  604. output = output.decode()
  605. self.assertNotIn('192.168.1.128', output)
  606. self.assertIn(str(self.testvm1.ip), output)
  607. try:
  608. (output, _) = self.loop.run_until_complete(
  609. self.testvm1.run_for_stdio(
  610. 'ip route show', user='root'))
  611. except subprocess.CalledProcessError:
  612. self.fail('ip route show failed')
  613. output = output.decode()
  614. self.assertIn('192.168.1.128', output)
  615. self.assertNotIn(str(self.proxy.ip), output)
  616. def test_210_custom_ip_simple(self):
  617. '''Custom AppVM IP
  618. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  619. '''
  620. self.testvm1.ip = '192.168.1.1'
  621. self.app.save()
  622. self.loop.run_until_complete(self.start_vm(self.testvm1))
  623. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  624. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  625. def test_211_custom_ip_proxy(self):
  626. '''Custom ProxyVM IP
  627. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  628. '''
  629. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  630. name=self.make_vm_name('proxy'),
  631. label='red')
  632. self.loop.run_until_complete(self.proxy.create_on_disk())
  633. self.proxy.provides_network = True
  634. self.proxy.netvm = self.testnetvm
  635. self.proxy.ip = '192.168.1.1'
  636. self.testvm1.netvm = self.proxy
  637. self.app.save()
  638. self.loop.run_until_complete(self.start_vm(self.testvm1))
  639. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  640. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  641. def test_212_custom_ip_firewall(self):
  642. '''Custom VM IP and firewall
  643. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  644. '''
  645. self.testvm1.ip = '192.168.1.1'
  646. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  647. name=self.make_vm_name('proxy'),
  648. label='red')
  649. self.proxy.provides_network = True
  650. self.loop.run_until_complete(self.proxy.create_on_disk())
  651. self.proxy.netvm = self.testnetvm
  652. self.testvm1.netvm = self.proxy
  653. self.app.save()
  654. # block all but ICMP and DNS
  655. self.testvm1.firewall.rules = [
  656. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  657. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  658. ]
  659. self.testvm1.firewall.save()
  660. self.loop.run_until_complete(self.start_vm(self.testvm1))
  661. self.assertTrue(self.proxy.is_running())
  662. server = self.loop.run_until_complete(self.testnetvm.run(
  663. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  664. try:
  665. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  666. "Ping by IP from ProxyVM failed")
  667. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  668. "Ping by name from ProxyVM failed")
  669. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  670. "Ping by IP should be allowed")
  671. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  672. "Ping by name should be allowed")
  673. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  674. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  675. "TCP connection should be blocked")
  676. finally:
  677. server.terminate()
  678. self.loop.run_until_complete(server.wait())
  679. def create_testcases_for_templates():
  680. yield from qubes.tests.create_testcases_for_templates('VmNetworking',
  681. VmNetworkingMixin, qubes.tests.SystemTestCase,
  682. module=sys.modules[__name__])
  683. def load_tests(loader, tests, pattern):
  684. tests.addTests(loader.loadTestsFromNames(
  685. create_testcases_for_templates()))
  686. return tests
# NOTE(review): presumably this builds the per-template test classes at
# import time, for runners that do not go through load_tests() -- confirm
# against qubes.tests.maybe_create_testcases_on_import.
qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)